2f419c9eacac584b4eac32047e161f62d0cdb6bc
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
755                         " '%s'" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURenameCluster(LogicalUnit):
1333   """Rename the cluster.
1334
1335   """
1336   HPATH = "cluster-rename"
1337   HTYPE = constants.HTYPE_CLUSTER
1338   _OP_REQP = ["name"]
1339
1340   def BuildHooksEnv(self):
1341     """Build hooks env.
1342
1343     """
1344     env = {
1345       "OP_TARGET": self.cfg.GetClusterName(),
1346       "NEW_NAME": self.op.name,
1347       }
1348     mn = self.cfg.GetMasterNode()
1349     return env, [mn], [mn]
1350
1351   def CheckPrereq(self):
1352     """Verify that the passed name is a valid one.
1353
1354     """
1355     hostname = utils.HostInfo(self.op.name)
1356
1357     new_name = hostname.name
1358     self.ip = new_ip = hostname.ip
1359     old_name = self.cfg.GetClusterName()
1360     old_ip = self.cfg.GetMasterIP()
1361     if new_name == old_name and new_ip == old_ip:
1362       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363                                  " cluster has changed")
1364     if new_ip != old_ip:
1365       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367                                    " reachable on the network. Aborting." %
1368                                    new_ip)
1369
1370     self.op.name = new_name
1371
1372   def Exec(self, feedback_fn):
1373     """Rename the cluster.
1374
1375     """
1376     clustername = self.op.name
1377     ip = self.ip
1378
1379     # shutdown the master IP
1380     master = self.cfg.GetMasterNode()
1381     result = self.rpc.call_node_stop_master(master, False)
1382     if result.failed or not result.data:
1383       raise errors.OpExecError("Could not disable the master role")
1384
1385     try:
1386       cluster = self.cfg.GetClusterInfo()
1387       cluster.cluster_name = clustername
1388       cluster.master_ip = ip
1389       self.cfg.Update(cluster)
1390
1391       # update the known hosts file
1392       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393       node_list = self.cfg.GetNodeList()
1394       try:
1395         node_list.remove(master)
1396       except ValueError:
1397         pass
1398       result = self.rpc.call_upload_file(node_list,
1399                                          constants.SSH_KNOWN_HOSTS_FILE)
1400       for to_node, to_result in result.iteritems():
1401         if to_result.failed or not to_result.data:
1402           logging.error("Copy of file %s to node %s failed",
1403                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1404
1405     finally:
1406       result = self.rpc.call_node_start_master(master, False)
1407       if result.failed or not result.data:
1408         self.LogWarning("Could not re-enable the master role on"
1409                         " the master, please restart manually.")
1410
1411
1412 def _RecursiveCheckIfLVMBased(disk):
1413   """Check if the given disk or its children are lvm-based.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to check
1417   @rtype: booleean
1418   @return: boolean indicating whether a LD_LV dev_type was found or not
1419
1420   """
1421   if disk.children:
1422     for chdisk in disk.children:
1423       if _RecursiveCheckIfLVMBased(chdisk):
1424         return True
1425   return disk.dev_type == constants.LD_LV
1426
1427
1428 class LUSetClusterParams(LogicalUnit):
1429   """Change the parameters of the cluster.
1430
1431   """
1432   HPATH = "cluster-modify"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = []
1435   REQ_BGL = False
1436
1437   def CheckArguments(self):
1438     """Check parameters
1439
1440     """
1441     if not hasattr(self.op, "candidate_pool_size"):
1442       self.op.candidate_pool_size = None
1443     if self.op.candidate_pool_size is not None:
1444       try:
1445         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446       except (ValueError, TypeError), err:
1447         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1448                                    str(err))
1449       if self.op.candidate_pool_size < 1:
1450         raise errors.OpPrereqError("At least one master candidate needed")
1451
1452   def ExpandNames(self):
1453     # FIXME: in the future maybe other cluster params won't require checking on
1454     # all nodes to be modified.
1455     self.needed_locks = {
1456       locking.LEVEL_NODE: locking.ALL_SET,
1457     }
1458     self.share_locks[locking.LEVEL_NODE] = 1
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     """
1464     env = {
1465       "OP_TARGET": self.cfg.GetClusterName(),
1466       "NEW_VG_NAME": self.op.vg_name,
1467       }
1468     mn = self.cfg.GetMasterNode()
1469     return env, [mn], [mn]
1470
1471   def CheckPrereq(self):
1472     """Check prerequisites.
1473
1474     This checks whether the given params don't conflict and
1475     if the given volume group is valid.
1476
1477     """
1478     if self.op.vg_name is not None and not self.op.vg_name:
1479       instances = self.cfg.GetAllInstancesInfo().values()
1480       for inst in instances:
1481         for disk in inst.disks:
1482           if _RecursiveCheckIfLVMBased(disk):
1483             raise errors.OpPrereqError("Cannot disable lvm storage while"
1484                                        " lvm-based instances exist")
1485
1486     node_list = self.acquired_locks[locking.LEVEL_NODE]
1487
1488     # if vg_name not None, checks given volume group on all nodes
1489     if self.op.vg_name:
1490       vglist = self.rpc.call_vg_list(node_list)
1491       for node in node_list:
1492         if vglist[node].failed:
1493           # ignoring down node
1494           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1495           continue
1496         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1497                                               self.op.vg_name,
1498                                               constants.MIN_VG_SIZE)
1499         if vgstatus:
1500           raise errors.OpPrereqError("Error on node '%s': %s" %
1501                                      (node, vgstatus))
1502
1503     self.cluster = cluster = self.cfg.GetClusterInfo()
1504     # validate beparams changes
1505     if self.op.beparams:
1506       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507       self.new_beparams = cluster.FillDict(
1508         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1509
1510     # hypervisor list/parameters
1511     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512     if self.op.hvparams:
1513       if not isinstance(self.op.hvparams, dict):
1514         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515       for hv_name, hv_dict in self.op.hvparams.items():
1516         if hv_name not in self.new_hvparams:
1517           self.new_hvparams[hv_name] = hv_dict
1518         else:
1519           self.new_hvparams[hv_name].update(hv_dict)
1520
1521     if self.op.enabled_hypervisors is not None:
1522       self.hv_list = self.op.enabled_hypervisors
1523     else:
1524       self.hv_list = cluster.enabled_hypervisors
1525
1526     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527       # either the enabled list has changed, or the parameters have, validate
1528       for hv_name, hv_params in self.new_hvparams.items():
1529         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530             (self.op.enabled_hypervisors and
1531              hv_name in self.op.enabled_hypervisors)):
1532           # either this is a new hypervisor, or its parameters have changed
1533           hv_class = hypervisor.GetHypervisor(hv_name)
1534           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535           hv_class.CheckParameterSyntax(hv_params)
1536           _CheckHVParams(self, node_list, hv_name, hv_params)
1537
1538   def Exec(self, feedback_fn):
1539     """Change the parameters of the cluster.
1540
1541     """
1542     if self.op.vg_name is not None:
1543       new_volume = self.op.vg_name
1544       if not new_volume:
1545         new_volume = None
1546       if new_volume != self.cfg.GetVGName():
1547         self.cfg.SetVGName(new_volume)
1548       else:
1549         feedback_fn("Cluster LVM configuration already in desired"
1550                     " state, not changing")
1551     if self.op.hvparams:
1552       self.cluster.hvparams = self.new_hvparams
1553     if self.op.enabled_hypervisors is not None:
1554       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555     if self.op.beparams:
1556       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557     if self.op.candidate_pool_size is not None:
1558       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559
1560     self.cfg.Update(self.cluster)
1561
1562     # we want to update nodes after the cluster so that if any errors
1563     # happen, we have recorded and saved the cluster info
1564     if self.op.candidate_pool_size is not None:
1565       _AdjustCandidatePool(self)
1566
1567
1568 class LURedistributeConfig(NoHooksLU):
1569   """Force the redistribution of cluster configuration.
1570
1571   This is a very simple LU.
1572
1573   """
1574   _OP_REQP = []
1575   REQ_BGL = False
1576
1577   def ExpandNames(self):
1578     self.needed_locks = {
1579       locking.LEVEL_NODE: locking.ALL_SET,
1580     }
1581     self.share_locks[locking.LEVEL_NODE] = 1
1582
1583   def CheckPrereq(self):
1584     """Check prerequisites.
1585
1586     """
1587
1588   def Exec(self, feedback_fn):
1589     """Redistribute the configuration.
1590
1591     """
1592     self.cfg.Update(self.cfg.GetClusterInfo())
1593
1594
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596   """Sleep and poll for an instance's disk to sync.
1597
1598   """
1599   if not instance.disks:
1600     return True
1601
1602   if not oneshot:
1603     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1604
1605   node = instance.primary_node
1606
1607   for dev in instance.disks:
1608     lu.cfg.SetDiskID(dev, node)
1609
1610   retries = 0
1611   while True:
1612     max_time = 0
1613     done = True
1614     cumul_degraded = False
1615     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1616     if rstats.failed or not rstats.data:
1617       lu.LogWarning("Can't get any data from node %s", node)
1618       retries += 1
1619       if retries >= 10:
1620         raise errors.RemoteError("Can't contact node %s for mirror data,"
1621                                  " aborting." % node)
1622       time.sleep(6)
1623       continue
1624     rstats = rstats.data
1625     retries = 0
1626     for i, mstat in enumerate(rstats):
1627       if mstat is None:
1628         lu.LogWarning("Can't compute data for node %s/%s",
1629                            node, instance.disks[i].iv_name)
1630         continue
1631       # we ignore the ldisk parameter
1632       perc_done, est_time, is_degraded, _ = mstat
1633       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1634       if perc_done is not None:
1635         done = False
1636         if est_time is not None:
1637           rem_time = "%d estimated seconds remaining" % est_time
1638           max_time = est_time
1639         else:
1640           rem_time = "no time estimate"
1641         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1642                         (instance.disks[i].iv_name, perc_done, rem_time))
1643     if done or oneshot:
1644       break
1645
1646     time.sleep(min(60, max_time))
1647
1648   if done:
1649     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1650   return not cumul_degraded
1651
1652
1653 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1654   """Check that mirrors are not degraded.
1655
1656   The ldisk parameter, if True, will change the test from the
1657   is_degraded attribute (which represents overall non-ok status for
1658   the device(s)) to the ldisk (representing the local storage status).
1659
1660   """
1661   lu.cfg.SetDiskID(dev, node)
1662   if ldisk:
1663     idx = 6
1664   else:
1665     idx = 5
1666
1667   result = True
1668   if on_primary or dev.AssembleOnSecondary():
1669     rstats = lu.rpc.call_blockdev_find(node, dev)
1670     msg = rstats.RemoteFailMsg()
1671     if msg:
1672       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1673       result = False
1674     elif not rstats.payload:
1675       lu.LogWarning("Can't find disk on node %s", node)
1676       result = False
1677     else:
1678       result = result and (not rstats.payload[idx])
1679   if dev.children:
1680     for child in dev.children:
1681       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1682
1683   return result
1684
1685
1686 class LUDiagnoseOS(NoHooksLU):
1687   """Logical unit for OS diagnose/query.
1688
1689   """
1690   _OP_REQP = ["output_fields", "names"]
1691   REQ_BGL = False
1692   _FIELDS_STATIC = utils.FieldSet()
1693   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1694
1695   def ExpandNames(self):
1696     if self.op.names:
1697       raise errors.OpPrereqError("Selective OS query not supported")
1698
1699     _CheckOutputFields(static=self._FIELDS_STATIC,
1700                        dynamic=self._FIELDS_DYNAMIC,
1701                        selected=self.op.output_fields)
1702
1703     # Lock all nodes, in shared mode
1704     # Temporary removal of locks, should be reverted later
1705     # TODO: reintroduce locks when they are lighter-weight
1706     self.needed_locks = {}
1707     #self.share_locks[locking.LEVEL_NODE] = 1
1708     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1709
1710   def CheckPrereq(self):
1711     """Check prerequisites.
1712
1713     """
1714
1715   @staticmethod
1716   def _DiagnoseByOS(node_list, rlist):
1717     """Remaps a per-node return list into an a per-os per-node dictionary
1718
1719     @param node_list: a list with the names of all nodes
1720     @param rlist: a map with node names as keys and OS objects as values
1721
1722     @rtype: dict
1723     @return: a dictionary with osnames as keys and as value another map, with
1724         nodes as keys and list of OS objects as values, eg::
1725
1726           {"debian-etch": {"node1": [<object>,...],
1727                            "node2": [<object>,]}
1728           }
1729
1730     """
1731     all_os = {}
1732     # we build here the list of nodes that didn't fail the RPC (at RPC
1733     # level), so that nodes with a non-responding node daemon don't
1734     # make all OSes invalid
1735     good_nodes = [node_name for node_name in rlist
1736                   if not rlist[node_name].failed]
1737     for node_name, nr in rlist.iteritems():
1738       if nr.failed or not nr.data:
1739         continue
1740       for os_obj in nr.data:
1741         if os_obj.name not in all_os:
1742           # build a list of nodes for this os containing empty lists
1743           # for each node in node_list
1744           all_os[os_obj.name] = {}
1745           for nname in good_nodes:
1746             all_os[os_obj.name][nname] = []
1747         all_os[os_obj.name][node_name].append(os_obj)
1748     return all_os
1749
1750   def Exec(self, feedback_fn):
1751     """Compute the list of OSes.
1752
1753     """
1754     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1755     node_data = self.rpc.call_os_diagnose(valid_nodes)
1756     if node_data == False:
1757       raise errors.OpExecError("Can't gather the list of OSes")
1758     pol = self._DiagnoseByOS(valid_nodes, node_data)
1759     output = []
1760     for os_name, os_data in pol.iteritems():
1761       row = []
1762       for field in self.op.output_fields:
1763         if field == "name":
1764           val = os_name
1765         elif field == "valid":
1766           val = utils.all([osl and osl[0] for osl in os_data.values()])
1767         elif field == "node_status":
1768           val = {}
1769           for node_name, nos_list in os_data.iteritems():
1770             val[node_name] = [(v.status, v.path) for v in nos_list]
1771         else:
1772           raise errors.ParameterError(field)
1773         row.append(val)
1774       output.append(row)
1775
1776     return output
1777
1778
1779 class LURemoveNode(LogicalUnit):
1780   """Logical unit for removing a node.
1781
1782   """
1783   HPATH = "node-remove"
1784   HTYPE = constants.HTYPE_NODE
1785   _OP_REQP = ["node_name"]
1786
1787   def BuildHooksEnv(self):
1788     """Build hooks env.
1789
1790     This doesn't run on the target node in the pre phase as a failed
1791     node would then be impossible to remove.
1792
1793     """
1794     env = {
1795       "OP_TARGET": self.op.node_name,
1796       "NODE_NAME": self.op.node_name,
1797       }
1798     all_nodes = self.cfg.GetNodeList()
1799     all_nodes.remove(self.op.node_name)
1800     return env, all_nodes, all_nodes
1801
1802   def CheckPrereq(self):
1803     """Check prerequisites.
1804
1805     This checks:
1806      - the node exists in the configuration
1807      - it does not have primary or secondary instances
1808      - it's not the master
1809
1810     Any errors are signalled by raising errors.OpPrereqError.
1811
1812     """
1813     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1814     if node is None:
1815       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1816
1817     instance_list = self.cfg.GetInstanceList()
1818
1819     masternode = self.cfg.GetMasterNode()
1820     if node.name == masternode:
1821       raise errors.OpPrereqError("Node is the master node,"
1822                                  " you need to failover first.")
1823
1824     for instance_name in instance_list:
1825       instance = self.cfg.GetInstanceInfo(instance_name)
1826       if node.name in instance.all_nodes:
1827         raise errors.OpPrereqError("Instance %s is still running on the node,"
1828                                    " please remove first." % instance_name)
1829     self.op.node_name = node.name
1830     self.node = node
1831
1832   def Exec(self, feedback_fn):
1833     """Removes the node from the cluster.
1834
1835     """
1836     node = self.node
1837     logging.info("Stopping the node daemon and removing configs from node %s",
1838                  node.name)
1839
1840     self.context.RemoveNode(node.name)
1841
1842     self.rpc.call_node_leave_cluster(node.name)
1843
1844     # Promote nodes to master candidate as needed
1845     _AdjustCandidatePool(self)
1846
1847
1848 class LUQueryNodes(NoHooksLU):
1849   """Logical unit for querying nodes.
1850
1851   """
1852   _OP_REQP = ["output_fields", "names", "use_locking"]
1853   REQ_BGL = False
1854   _FIELDS_DYNAMIC = utils.FieldSet(
1855     "dtotal", "dfree",
1856     "mtotal", "mnode", "mfree",
1857     "bootid",
1858     "ctotal", "cnodes", "csockets",
1859     )
1860
1861   _FIELDS_STATIC = utils.FieldSet(
1862     "name", "pinst_cnt", "sinst_cnt",
1863     "pinst_list", "sinst_list",
1864     "pip", "sip", "tags",
1865     "serial_no",
1866     "master_candidate",
1867     "master",
1868     "offline",
1869     "drained",
1870     )
1871
1872   def ExpandNames(self):
1873     _CheckOutputFields(static=self._FIELDS_STATIC,
1874                        dynamic=self._FIELDS_DYNAMIC,
1875                        selected=self.op.output_fields)
1876
1877     self.needed_locks = {}
1878     self.share_locks[locking.LEVEL_NODE] = 1
1879
1880     if self.op.names:
1881       self.wanted = _GetWantedNodes(self, self.op.names)
1882     else:
1883       self.wanted = locking.ALL_SET
1884
1885     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1886     self.do_locking = self.do_node_query and self.op.use_locking
1887     if self.do_locking:
1888       # if we don't request only static fields, we need to lock the nodes
1889       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1890
1891
1892   def CheckPrereq(self):
1893     """Check prerequisites.
1894
1895     """
1896     # The validation of the node list is done in the _GetWantedNodes,
1897     # if non empty, and if empty, there's no validation to do
1898     pass
1899
1900   def Exec(self, feedback_fn):
1901     """Computes the list of nodes and their attributes.
1902
1903     """
1904     all_info = self.cfg.GetAllNodesInfo()
1905     if self.do_locking:
1906       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1907     elif self.wanted != locking.ALL_SET:
1908       nodenames = self.wanted
1909       missing = set(nodenames).difference(all_info.keys())
1910       if missing:
1911         raise errors.OpExecError(
1912           "Some nodes were removed before retrieving their data: %s" % missing)
1913     else:
1914       nodenames = all_info.keys()
1915
1916     nodenames = utils.NiceSort(nodenames)
1917     nodelist = [all_info[name] for name in nodenames]
1918
1919     # begin data gathering
1920
1921     if self.do_node_query:
1922       live_data = {}
1923       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1924                                           self.cfg.GetHypervisorType())
1925       for name in nodenames:
1926         nodeinfo = node_data[name]
1927         if not nodeinfo.failed and nodeinfo.data:
1928           nodeinfo = nodeinfo.data
1929           fn = utils.TryConvert
1930           live_data[name] = {
1931             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1932             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1933             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1934             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1935             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1936             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1937             "bootid": nodeinfo.get('bootid', None),
1938             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1939             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1940             }
1941         else:
1942           live_data[name] = {}
1943     else:
1944       live_data = dict.fromkeys(nodenames, {})
1945
1946     node_to_primary = dict([(name, set()) for name in nodenames])
1947     node_to_secondary = dict([(name, set()) for name in nodenames])
1948
1949     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1950                              "sinst_cnt", "sinst_list"))
1951     if inst_fields & frozenset(self.op.output_fields):
1952       instancelist = self.cfg.GetInstanceList()
1953
1954       for instance_name in instancelist:
1955         inst = self.cfg.GetInstanceInfo(instance_name)
1956         if inst.primary_node in node_to_primary:
1957           node_to_primary[inst.primary_node].add(inst.name)
1958         for secnode in inst.secondary_nodes:
1959           if secnode in node_to_secondary:
1960             node_to_secondary[secnode].add(inst.name)
1961
1962     master_node = self.cfg.GetMasterNode()
1963
1964     # end data gathering
1965
1966     output = []
1967     for node in nodelist:
1968       node_output = []
1969       for field in self.op.output_fields:
1970         if field == "name":
1971           val = node.name
1972         elif field == "pinst_list":
1973           val = list(node_to_primary[node.name])
1974         elif field == "sinst_list":
1975           val = list(node_to_secondary[node.name])
1976         elif field == "pinst_cnt":
1977           val = len(node_to_primary[node.name])
1978         elif field == "sinst_cnt":
1979           val = len(node_to_secondary[node.name])
1980         elif field == "pip":
1981           val = node.primary_ip
1982         elif field == "sip":
1983           val = node.secondary_ip
1984         elif field == "tags":
1985           val = list(node.GetTags())
1986         elif field == "serial_no":
1987           val = node.serial_no
1988         elif field == "master_candidate":
1989           val = node.master_candidate
1990         elif field == "master":
1991           val = node.name == master_node
1992         elif field == "offline":
1993           val = node.offline
1994         elif field == "drained":
1995           val = node.drained
1996         elif self._FIELDS_DYNAMIC.Matches(field):
1997           val = live_data[node.name].get(field, None)
1998         else:
1999           raise errors.ParameterError(field)
2000         node_output.append(val)
2001       output.append(node_output)
2002
2003     return output
2004
2005
2006 class LUQueryNodeVolumes(NoHooksLU):
2007   """Logical unit for getting volumes on node(s).
2008
2009   """
2010   _OP_REQP = ["nodes", "output_fields"]
2011   REQ_BGL = False
2012   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2013   _FIELDS_STATIC = utils.FieldSet("node")
2014
2015   def ExpandNames(self):
2016     _CheckOutputFields(static=self._FIELDS_STATIC,
2017                        dynamic=self._FIELDS_DYNAMIC,
2018                        selected=self.op.output_fields)
2019
2020     self.needed_locks = {}
2021     self.share_locks[locking.LEVEL_NODE] = 1
2022     if not self.op.nodes:
2023       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2024     else:
2025       self.needed_locks[locking.LEVEL_NODE] = \
2026         _GetWantedNodes(self, self.op.nodes)
2027
2028   def CheckPrereq(self):
2029     """Check prerequisites.
2030
2031     This checks that the fields required are valid output fields.
2032
2033     """
2034     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2035
2036   def Exec(self, feedback_fn):
2037     """Computes the list of nodes and their attributes.
2038
2039     """
2040     nodenames = self.nodes
2041     volumes = self.rpc.call_node_volumes(nodenames)
2042
2043     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2044              in self.cfg.GetInstanceList()]
2045
2046     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2047
2048     output = []
2049     for node in nodenames:
2050       if node not in volumes or volumes[node].failed or not volumes[node].data:
2051         continue
2052
2053       node_vols = volumes[node].data[:]
2054       node_vols.sort(key=lambda vol: vol['dev'])
2055
2056       for vol in node_vols:
2057         node_output = []
2058         for field in self.op.output_fields:
2059           if field == "node":
2060             val = node
2061           elif field == "phys":
2062             val = vol['dev']
2063           elif field == "vg":
2064             val = vol['vg']
2065           elif field == "name":
2066             val = vol['name']
2067           elif field == "size":
2068             val = int(float(vol['size']))
2069           elif field == "instance":
2070             for inst in ilist:
2071               if node not in lv_by_node[inst]:
2072                 continue
2073               if vol['name'] in lv_by_node[inst][node]:
2074                 val = inst.name
2075                 break
2076             else:
2077               val = '-'
2078           else:
2079             raise errors.ParameterError(field)
2080           node_output.append(str(val))
2081
2082         output.append(node_output)
2083
2084     return output
2085
2086
2087 class LUAddNode(LogicalUnit):
2088   """Logical unit for adding node to the cluster.
2089
2090   """
2091   HPATH = "node-add"
2092   HTYPE = constants.HTYPE_NODE
2093   _OP_REQP = ["node_name"]
2094
2095   def BuildHooksEnv(self):
2096     """Build hooks env.
2097
2098     This will run on all nodes before, and on all nodes + the new node after.
2099
2100     """
2101     env = {
2102       "OP_TARGET": self.op.node_name,
2103       "NODE_NAME": self.op.node_name,
2104       "NODE_PIP": self.op.primary_ip,
2105       "NODE_SIP": self.op.secondary_ip,
2106       }
2107     nodes_0 = self.cfg.GetNodeList()
2108     nodes_1 = nodes_0 + [self.op.node_name, ]
2109     return env, nodes_0, nodes_1
2110
2111   def CheckPrereq(self):
2112     """Check prerequisites.
2113
2114     This checks:
2115      - the new node is not already in the config
2116      - it is resolvable
2117      - its parameters (single/dual homed) matches the cluster
2118
2119     Any errors are signalled by raising errors.OpPrereqError.
2120
2121     """
2122     node_name = self.op.node_name
2123     cfg = self.cfg
2124
2125     dns_data = utils.HostInfo(node_name)
2126
2127     node = dns_data.name
2128     primary_ip = self.op.primary_ip = dns_data.ip
2129     secondary_ip = getattr(self.op, "secondary_ip", None)
2130     if secondary_ip is None:
2131       secondary_ip = primary_ip
2132     if not utils.IsValidIP(secondary_ip):
2133       raise errors.OpPrereqError("Invalid secondary IP given")
2134     self.op.secondary_ip = secondary_ip
2135
2136     node_list = cfg.GetNodeList()
2137     if not self.op.readd and node in node_list:
2138       raise errors.OpPrereqError("Node %s is already in the configuration" %
2139                                  node)
2140     elif self.op.readd and node not in node_list:
2141       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2142
2143     for existing_node_name in node_list:
2144       existing_node = cfg.GetNodeInfo(existing_node_name)
2145
2146       if self.op.readd and node == existing_node_name:
2147         if (existing_node.primary_ip != primary_ip or
2148             existing_node.secondary_ip != secondary_ip):
2149           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2150                                      " address configuration as before")
2151         continue
2152
2153       if (existing_node.primary_ip == primary_ip or
2154           existing_node.secondary_ip == primary_ip or
2155           existing_node.primary_ip == secondary_ip or
2156           existing_node.secondary_ip == secondary_ip):
2157         raise errors.OpPrereqError("New node ip address(es) conflict with"
2158                                    " existing node %s" % existing_node.name)
2159
2160     # check that the type of the node (single versus dual homed) is the
2161     # same as for the master
2162     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2163     master_singlehomed = myself.secondary_ip == myself.primary_ip
2164     newbie_singlehomed = secondary_ip == primary_ip
2165     if master_singlehomed != newbie_singlehomed:
2166       if master_singlehomed:
2167         raise errors.OpPrereqError("The master has no private ip but the"
2168                                    " new node has one")
2169       else:
2170         raise errors.OpPrereqError("The master has a private ip but the"
2171                                    " new node doesn't have one")
2172
2173     # checks reachablity
2174     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2175       raise errors.OpPrereqError("Node not reachable by ping")
2176
2177     if not newbie_singlehomed:
2178       # check reachability from my secondary ip to newbie's secondary ip
2179       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2180                            source=myself.secondary_ip):
2181         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2182                                    " based ping to noded port")
2183
2184     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2185     mc_now, _ = self.cfg.GetMasterCandidateStats()
2186     master_candidate = mc_now < cp_size
2187
2188     self.new_node = objects.Node(name=node,
2189                                  primary_ip=primary_ip,
2190                                  secondary_ip=secondary_ip,
2191                                  master_candidate=master_candidate,
2192                                  offline=False, drained=False)
2193
2194   def Exec(self, feedback_fn):
2195     """Adds the new node to the cluster.
2196
2197     """
2198     new_node = self.new_node
2199     node = new_node.name
2200
2201     # check connectivity
2202     result = self.rpc.call_version([node])[node]
2203     result.Raise()
2204     if result.data:
2205       if constants.PROTOCOL_VERSION == result.data:
2206         logging.info("Communication to node %s fine, sw version %s match",
2207                      node, result.data)
2208       else:
2209         raise errors.OpExecError("Version mismatch master version %s,"
2210                                  " node version %s" %
2211                                  (constants.PROTOCOL_VERSION, result.data))
2212     else:
2213       raise errors.OpExecError("Cannot get version from the new node")
2214
2215     # setup ssh on node
2216     logging.info("Copy ssh key to node %s", node)
2217     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2218     keyarray = []
2219     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2220                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2221                 priv_key, pub_key]
2222
2223     for i in keyfiles:
2224       f = open(i, 'r')
2225       try:
2226         keyarray.append(f.read())
2227       finally:
2228         f.close()
2229
2230     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2231                                     keyarray[2],
2232                                     keyarray[3], keyarray[4], keyarray[5])
2233
2234     msg = result.RemoteFailMsg()
2235     if msg:
2236       raise errors.OpExecError("Cannot transfer ssh keys to the"
2237                                " new node: %s" % msg)
2238
2239     # Add node to our /etc/hosts, and add key to known_hosts
2240     utils.AddHostToEtcHosts(new_node.name)
2241
2242     if new_node.secondary_ip != new_node.primary_ip:
2243       result = self.rpc.call_node_has_ip_address(new_node.name,
2244                                                  new_node.secondary_ip)
2245       if result.failed or not result.data:
2246         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2247                                  " you gave (%s). Please fix and re-run this"
2248                                  " command." % new_node.secondary_ip)
2249
2250     node_verify_list = [self.cfg.GetMasterNode()]
2251     node_verify_param = {
2252       'nodelist': [node],
2253       # TODO: do a node-net-test as well?
2254     }
2255
2256     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2257                                        self.cfg.GetClusterName())
2258     for verifier in node_verify_list:
2259       if result[verifier].failed or not result[verifier].data:
2260         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2261                                  " for remote verification" % verifier)
2262       if result[verifier].data['nodelist']:
2263         for failed in result[verifier].data['nodelist']:
2264           feedback_fn("ssh/hostname verification failed %s -> %s" %
2265                       (verifier, result[verifier].data['nodelist'][failed]))
2266         raise errors.OpExecError("ssh/hostname verification failed.")
2267
2268     # Distribute updated /etc/hosts and known_hosts to all nodes,
2269     # including the node just added
2270     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2271     dist_nodes = self.cfg.GetNodeList()
2272     if not self.op.readd:
2273       dist_nodes.append(node)
2274     if myself.name in dist_nodes:
2275       dist_nodes.remove(myself.name)
2276
2277     logging.debug("Copying hosts and known_hosts to all nodes")
2278     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2279       result = self.rpc.call_upload_file(dist_nodes, fname)
2280       for to_node, to_result in result.iteritems():
2281         if to_result.failed or not to_result.data:
2282           logging.error("Copy of file %s to node %s failed", fname, to_node)
2283
2284     to_copy = []
2285     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2286     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2287       to_copy.append(constants.VNC_PASSWORD_FILE)
2288
2289     for fname in to_copy:
2290       result = self.rpc.call_upload_file([node], fname)
2291       if result[node].failed or not result[node]:
2292         logging.error("Could not copy file %s to node %s", fname, node)
2293
2294     if self.op.readd:
2295       self.context.ReaddNode(new_node)
2296     else:
2297       self.context.AddNode(new_node)
2298
2299
2300 class LUSetNodeParams(LogicalUnit):
2301   """Modifies the parameters of a node.
2302
2303   """
2304   HPATH = "node-modify"
2305   HTYPE = constants.HTYPE_NODE
2306   _OP_REQP = ["node_name"]
2307   REQ_BGL = False
2308
2309   def CheckArguments(self):
2310     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2311     if node_name is None:
2312       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2313     self.op.node_name = node_name
2314     _CheckBooleanOpField(self.op, 'master_candidate')
2315     _CheckBooleanOpField(self.op, 'offline')
2316     _CheckBooleanOpField(self.op, 'drained')
2317     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2318     if all_mods.count(None) == 3:
2319       raise errors.OpPrereqError("Please pass at least one modification")
2320     if all_mods.count(True) > 1:
2321       raise errors.OpPrereqError("Can't set the node into more than one"
2322                                  " state at the same time")
2323
2324   def ExpandNames(self):
2325     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2326
2327   def BuildHooksEnv(self):
2328     """Build hooks env.
2329
2330     This runs on the master node.
2331
2332     """
2333     env = {
2334       "OP_TARGET": self.op.node_name,
2335       "MASTER_CANDIDATE": str(self.op.master_candidate),
2336       "OFFLINE": str(self.op.offline),
2337       "DRAINED": str(self.op.drained),
2338       }
2339     nl = [self.cfg.GetMasterNode(),
2340           self.op.node_name]
2341     return env, nl, nl
2342
2343   def CheckPrereq(self):
2344     """Check prerequisites.
2345
2346     This only checks the instance list against the existing names.
2347
2348     """
2349     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2350
2351     if ((self.op.master_candidate == False or self.op.offline == True or
2352          self.op.drained == True) and node.master_candidate):
2353       # we will demote the node from master_candidate
2354       if self.op.node_name == self.cfg.GetMasterNode():
2355         raise errors.OpPrereqError("The master node has to be a"
2356                                    " master candidate, online and not drained")
2357       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2358       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2359       if num_candidates <= cp_size:
2360         msg = ("Not enough master candidates (desired"
2361                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2362         if self.op.force:
2363           self.LogWarning(msg)
2364         else:
2365           raise errors.OpPrereqError(msg)
2366
2367     if (self.op.master_candidate == True and
2368         ((node.offline and not self.op.offline == False) or
2369          (node.drained and not self.op.drained == False))):
2370       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2371                                  " to master_candidate" % node.name)
2372
2373     return
2374
2375   def Exec(self, feedback_fn):
2376     """Modifies a node.
2377
2378     """
2379     node = self.node
2380
2381     result = []
2382     changed_mc = False
2383
2384     if self.op.offline is not None:
2385       node.offline = self.op.offline
2386       result.append(("offline", str(self.op.offline)))
2387       if self.op.offline == True:
2388         if node.master_candidate:
2389           node.master_candidate = False
2390           changed_mc = True
2391           result.append(("master_candidate", "auto-demotion due to offline"))
2392         if node.drained:
2393           node.drained = False
2394           result.append(("drained", "clear drained status due to offline"))
2395
2396     if self.op.master_candidate is not None:
2397       node.master_candidate = self.op.master_candidate
2398       changed_mc = True
2399       result.append(("master_candidate", str(self.op.master_candidate)))
2400       if self.op.master_candidate == False:
2401         rrc = self.rpc.call_node_demote_from_mc(node.name)
2402         msg = rrc.RemoteFailMsg()
2403         if msg:
2404           self.LogWarning("Node failed to demote itself: %s" % msg)
2405
2406     if self.op.drained is not None:
2407       node.drained = self.op.drained
2408       result.append(("drained", str(self.op.drained)))
2409       if self.op.drained == True:
2410         if node.master_candidate:
2411           node.master_candidate = False
2412           changed_mc = True
2413           result.append(("master_candidate", "auto-demotion due to drain"))
2414         if node.offline:
2415           node.offline = False
2416           result.append(("offline", "clear offline status due to drain"))
2417
2418     # this will trigger configuration file update, if needed
2419     self.cfg.Update(node)
2420     # this will trigger job queue propagation or cleanup
2421     if changed_mc:
2422       self.context.ReaddNode(node)
2423
2424     return result
2425
2426
2427 class LUQueryClusterInfo(NoHooksLU):
2428   """Query cluster configuration.
2429
2430   """
2431   _OP_REQP = []
2432   REQ_BGL = False
2433
2434   def ExpandNames(self):
2435     self.needed_locks = {}
2436
2437   def CheckPrereq(self):
2438     """No prerequsites needed for this LU.
2439
2440     """
2441     pass
2442
2443   def Exec(self, feedback_fn):
2444     """Return cluster config.
2445
2446     """
2447     cluster = self.cfg.GetClusterInfo()
2448     result = {
2449       "software_version": constants.RELEASE_VERSION,
2450       "protocol_version": constants.PROTOCOL_VERSION,
2451       "config_version": constants.CONFIG_VERSION,
2452       "os_api_version": constants.OS_API_VERSION,
2453       "export_version": constants.EXPORT_VERSION,
2454       "architecture": (platform.architecture()[0], platform.machine()),
2455       "name": cluster.cluster_name,
2456       "master": cluster.master_node,
2457       "default_hypervisor": cluster.default_hypervisor,
2458       "enabled_hypervisors": cluster.enabled_hypervisors,
2459       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2460                         for hypervisor in cluster.enabled_hypervisors]),
2461       "beparams": cluster.beparams,
2462       "candidate_pool_size": cluster.candidate_pool_size,
2463       "default_bridge": cluster.default_bridge,
2464       "master_netdev": cluster.master_netdev,
2465       "volume_group_name": cluster.volume_group_name,
2466       "file_storage_dir": cluster.file_storage_dir,
2467       }
2468
2469     return result
2470
2471
2472 class LUQueryConfigValues(NoHooksLU):
2473   """Return configuration values.
2474
2475   """
2476   _OP_REQP = []
2477   REQ_BGL = False
2478   _FIELDS_DYNAMIC = utils.FieldSet()
2479   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2480
2481   def ExpandNames(self):
2482     self.needed_locks = {}
2483
2484     _CheckOutputFields(static=self._FIELDS_STATIC,
2485                        dynamic=self._FIELDS_DYNAMIC,
2486                        selected=self.op.output_fields)
2487
2488   def CheckPrereq(self):
2489     """No prerequisites.
2490
2491     """
2492     pass
2493
2494   def Exec(self, feedback_fn):
2495     """Dump a representation of the cluster config to the standard output.
2496
2497     """
2498     values = []
2499     for field in self.op.output_fields:
2500       if field == "cluster_name":
2501         entry = self.cfg.GetClusterName()
2502       elif field == "master_node":
2503         entry = self.cfg.GetMasterNode()
2504       elif field == "drain_flag":
2505         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2506       else:
2507         raise errors.ParameterError(field)
2508       values.append(entry)
2509     return values
2510
2511
2512 class LUActivateInstanceDisks(NoHooksLU):
2513   """Bring up an instance's disks.
2514
2515   """
2516   _OP_REQP = ["instance_name"]
2517   REQ_BGL = False
2518
2519   def ExpandNames(self):
2520     self._ExpandAndLockInstance()
2521     self.needed_locks[locking.LEVEL_NODE] = []
2522     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2523
2524   def DeclareLocks(self, level):
2525     if level == locking.LEVEL_NODE:
2526       self._LockInstancesNodes()
2527
2528   def CheckPrereq(self):
2529     """Check prerequisites.
2530
2531     This checks that the instance is in the cluster.
2532
2533     """
2534     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2535     assert self.instance is not None, \
2536       "Cannot retrieve locked instance %s" % self.op.instance_name
2537     _CheckNodeOnline(self, self.instance.primary_node)
2538
2539   def Exec(self, feedback_fn):
2540     """Activate the disks.
2541
2542     """
2543     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2544     if not disks_ok:
2545       raise errors.OpExecError("Cannot activate block devices")
2546
2547     return disks_info
2548
2549
2550 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2551   """Prepare the block devices for an instance.
2552
2553   This sets up the block devices on all nodes.
2554
2555   @type lu: L{LogicalUnit}
2556   @param lu: the logical unit on whose behalf we execute
2557   @type instance: L{objects.Instance}
2558   @param instance: the instance for whose disks we assemble
2559   @type ignore_secondaries: boolean
2560   @param ignore_secondaries: if true, errors on secondary nodes
2561       won't result in an error return from the function
2562   @return: False if the operation failed, otherwise a list of
2563       (host, instance_visible_name, node_visible_name)
2564       with the mapping from node devices to instance devices
2565
2566   """
2567   device_info = []
2568   disks_ok = True
2569   iname = instance.name
2570   # With the two passes mechanism we try to reduce the window of
2571   # opportunity for the race condition of switching DRBD to primary
2572   # before handshaking occured, but we do not eliminate it
2573
2574   # The proper fix would be to wait (with some limits) until the
2575   # connection has been made and drbd transitions from WFConnection
2576   # into any other network-connected state (Connected, SyncTarget,
2577   # SyncSource, etc.)
2578
2579   # 1st pass, assemble on all nodes in secondary mode
2580   for inst_disk in instance.disks:
2581     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2582       lu.cfg.SetDiskID(node_disk, node)
2583       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2584       msg = result.RemoteFailMsg()
2585       if msg:
2586         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2587                            " (is_primary=False, pass=1): %s",
2588                            inst_disk.iv_name, node, msg)
2589         if not ignore_secondaries:
2590           disks_ok = False
2591
2592   # FIXME: race condition on drbd migration to primary
2593
2594   # 2nd pass, do only the primary node
2595   for inst_disk in instance.disks:
2596     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2597       if node != instance.primary_node:
2598         continue
2599       lu.cfg.SetDiskID(node_disk, node)
2600       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2601       msg = result.RemoteFailMsg()
2602       if msg:
2603         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2604                            " (is_primary=True, pass=2): %s",
2605                            inst_disk.iv_name, node, msg)
2606         disks_ok = False
2607     device_info.append((instance.primary_node, inst_disk.iv_name,
2608                         result.payload))
2609
2610   # leave the disks configured for the primary node
2611   # this is a workaround that would be fixed better by
2612   # improving the logical/physical id handling
2613   for disk in instance.disks:
2614     lu.cfg.SetDiskID(disk, instance.primary_node)
2615
2616   return disks_ok, device_info
2617
2618
2619 def _StartInstanceDisks(lu, instance, force):
2620   """Start the disks of an instance.
2621
2622   """
2623   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2624                                            ignore_secondaries=force)
2625   if not disks_ok:
2626     _ShutdownInstanceDisks(lu, instance)
2627     if force is not None and not force:
2628       lu.proc.LogWarning("", hint="If the message above refers to a"
2629                          " secondary node,"
2630                          " you can retry the operation using '--force'.")
2631     raise errors.OpExecError("Disk consistency error")
2632
2633
2634 class LUDeactivateInstanceDisks(NoHooksLU):
2635   """Shutdown an instance's disks.
2636
2637   """
2638   _OP_REQP = ["instance_name"]
2639   REQ_BGL = False
2640
2641   def ExpandNames(self):
2642     self._ExpandAndLockInstance()
2643     self.needed_locks[locking.LEVEL_NODE] = []
2644     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2645
2646   def DeclareLocks(self, level):
2647     if level == locking.LEVEL_NODE:
2648       self._LockInstancesNodes()
2649
2650   def CheckPrereq(self):
2651     """Check prerequisites.
2652
2653     This checks that the instance is in the cluster.
2654
2655     """
2656     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2657     assert self.instance is not None, \
2658       "Cannot retrieve locked instance %s" % self.op.instance_name
2659
2660   def Exec(self, feedback_fn):
2661     """Deactivate the disks
2662
2663     """
2664     instance = self.instance
2665     _SafeShutdownInstanceDisks(self, instance)
2666
2667
2668 def _SafeShutdownInstanceDisks(lu, instance):
2669   """Shutdown block devices of an instance.
2670
2671   This function checks if an instance is running, before calling
2672   _ShutdownInstanceDisks.
2673
2674   """
2675   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2676                                       [instance.hypervisor])
2677   ins_l = ins_l[instance.primary_node]
2678   if ins_l.failed or not isinstance(ins_l.data, list):
2679     raise errors.OpExecError("Can't contact node '%s'" %
2680                              instance.primary_node)
2681
2682   if instance.name in ins_l.data:
2683     raise errors.OpExecError("Instance is running, can't shutdown"
2684                              " block devices.")
2685
2686   _ShutdownInstanceDisks(lu, instance)
2687
2688
2689 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2690   """Shutdown block devices of an instance.
2691
2692   This does the shutdown on all nodes of the instance.
2693
2694   If the ignore_primary is false, errors on the primary node are
2695   ignored.
2696
2697   """
2698   all_result = True
2699   for disk in instance.disks:
2700     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2701       lu.cfg.SetDiskID(top_disk, node)
2702       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2703       msg = result.RemoteFailMsg()
2704       if msg:
2705         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2706                       disk.iv_name, node, msg)
2707         if not ignore_primary or node != instance.primary_node:
2708           all_result = False
2709   return all_result
2710
2711
2712 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2713   """Checks if a node has enough free memory.
2714
2715   This function check if a given node has the needed amount of free
2716   memory. In case the node has less memory or we cannot get the
2717   information from the node, this function raise an OpPrereqError
2718   exception.
2719
2720   @type lu: C{LogicalUnit}
2721   @param lu: a logical unit from which we get configuration data
2722   @type node: C{str}
2723   @param node: the node to check
2724   @type reason: C{str}
2725   @param reason: string to use in the error message
2726   @type requested: C{int}
2727   @param requested: the amount of memory in MiB to check for
2728   @type hypervisor_name: C{str}
2729   @param hypervisor_name: the hypervisor to ask for memory stats
2730   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2731       we cannot check the node
2732
2733   """
2734   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2735   nodeinfo[node].Raise()
2736   free_mem = nodeinfo[node].data.get('memory_free')
2737   if not isinstance(free_mem, int):
2738     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2739                              " was '%s'" % (node, free_mem))
2740   if requested > free_mem:
2741     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2742                              " needed %s MiB, available %s MiB" %
2743                              (node, reason, requested, free_mem))
2744
2745
2746 class LUStartupInstance(LogicalUnit):
2747   """Starts an instance.
2748
2749   """
2750   HPATH = "instance-start"
2751   HTYPE = constants.HTYPE_INSTANCE
2752   _OP_REQP = ["instance_name", "force"]
2753   REQ_BGL = False
2754
2755   def ExpandNames(self):
2756     self._ExpandAndLockInstance()
2757
2758   def BuildHooksEnv(self):
2759     """Build hooks env.
2760
2761     This runs on master, primary and secondary nodes of the instance.
2762
2763     """
2764     env = {
2765       "FORCE": self.op.force,
2766       }
2767     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2768     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2769     return env, nl, nl
2770
2771   def CheckPrereq(self):
2772     """Check prerequisites.
2773
2774     This checks that the instance is in the cluster.
2775
2776     """
2777     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2778     assert self.instance is not None, \
2779       "Cannot retrieve locked instance %s" % self.op.instance_name
2780
2781     # extra beparams
2782     self.beparams = getattr(self.op, "beparams", {})
2783     if self.beparams:
2784       if not isinstance(self.beparams, dict):
2785         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2786                                    " dict" % (type(self.beparams), ))
2787       # fill the beparams dict
2788       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2789       self.op.beparams = self.beparams
2790
2791     # extra hvparams
2792     self.hvparams = getattr(self.op, "hvparams", {})
2793     if self.hvparams:
2794       if not isinstance(self.hvparams, dict):
2795         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2796                                    " dict" % (type(self.hvparams), ))
2797
2798       # check hypervisor parameter syntax (locally)
2799       cluster = self.cfg.GetClusterInfo()
2800       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2801       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2802                                     instance.hvparams)
2803       filled_hvp.update(self.hvparams)
2804       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2805       hv_type.CheckParameterSyntax(filled_hvp)
2806       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2807       self.op.hvparams = self.hvparams
2808
2809     _CheckNodeOnline(self, instance.primary_node)
2810
2811     bep = self.cfg.GetClusterInfo().FillBE(instance)
2812     # check bridges existance
2813     _CheckInstanceBridgesExist(self, instance)
2814
2815     remote_info = self.rpc.call_instance_info(instance.primary_node,
2816                                               instance.name,
2817                                               instance.hypervisor)
2818     remote_info.Raise()
2819     if not remote_info.data:
2820       _CheckNodeFreeMemory(self, instance.primary_node,
2821                            "starting instance %s" % instance.name,
2822                            bep[constants.BE_MEMORY], instance.hypervisor)
2823
2824   def Exec(self, feedback_fn):
2825     """Start the instance.
2826
2827     """
2828     instance = self.instance
2829     force = self.op.force
2830
2831     self.cfg.MarkInstanceUp(instance.name)
2832
2833     node_current = instance.primary_node
2834
2835     _StartInstanceDisks(self, instance, force)
2836
2837     result = self.rpc.call_instance_start(node_current, instance,
2838                                           self.hvparams, self.beparams)
2839     msg = result.RemoteFailMsg()
2840     if msg:
2841       _ShutdownInstanceDisks(self, instance)
2842       raise errors.OpExecError("Could not start instance: %s" % msg)
2843
2844
2845 class LURebootInstance(LogicalUnit):
2846   """Reboot an instance.
2847
2848   """
2849   HPATH = "instance-reboot"
2850   HTYPE = constants.HTYPE_INSTANCE
2851   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2852   REQ_BGL = False
2853
2854   def ExpandNames(self):
2855     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2856                                    constants.INSTANCE_REBOOT_HARD,
2857                                    constants.INSTANCE_REBOOT_FULL]:
2858       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2859                                   (constants.INSTANCE_REBOOT_SOFT,
2860                                    constants.INSTANCE_REBOOT_HARD,
2861                                    constants.INSTANCE_REBOOT_FULL))
2862     self._ExpandAndLockInstance()
2863
2864   def BuildHooksEnv(self):
2865     """Build hooks env.
2866
2867     This runs on master, primary and secondary nodes of the instance.
2868
2869     """
2870     env = {
2871       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2872       "REBOOT_TYPE": self.op.reboot_type,
2873       }
2874     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2875     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2876     return env, nl, nl
2877
2878   def CheckPrereq(self):
2879     """Check prerequisites.
2880
2881     This checks that the instance is in the cluster.
2882
2883     """
2884     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2885     assert self.instance is not None, \
2886       "Cannot retrieve locked instance %s" % self.op.instance_name
2887
2888     _CheckNodeOnline(self, instance.primary_node)
2889
2890     # check bridges existance
2891     _CheckInstanceBridgesExist(self, instance)
2892
2893   def Exec(self, feedback_fn):
2894     """Reboot the instance.
2895
2896     """
2897     instance = self.instance
2898     ignore_secondaries = self.op.ignore_secondaries
2899     reboot_type = self.op.reboot_type
2900
2901     node_current = instance.primary_node
2902
2903     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2904                        constants.INSTANCE_REBOOT_HARD]:
2905       for disk in instance.disks:
2906         self.cfg.SetDiskID(disk, node_current)
2907       result = self.rpc.call_instance_reboot(node_current, instance,
2908                                              reboot_type)
2909       msg = result.RemoteFailMsg()
2910       if msg:
2911         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2912     else:
2913       result = self.rpc.call_instance_shutdown(node_current, instance)
2914       msg = result.RemoteFailMsg()
2915       if msg:
2916         raise errors.OpExecError("Could not shutdown instance for"
2917                                  " full reboot: %s" % msg)
2918       _ShutdownInstanceDisks(self, instance)
2919       _StartInstanceDisks(self, instance, ignore_secondaries)
2920       result = self.rpc.call_instance_start(node_current, instance, None, None)
2921       msg = result.RemoteFailMsg()
2922       if msg:
2923         _ShutdownInstanceDisks(self, instance)
2924         raise errors.OpExecError("Could not start instance for"
2925                                  " full reboot: %s" % msg)
2926
2927     self.cfg.MarkInstanceUp(instance.name)
2928
2929
2930 class LUShutdownInstance(LogicalUnit):
2931   """Shutdown an instance.
2932
2933   """
2934   HPATH = "instance-stop"
2935   HTYPE = constants.HTYPE_INSTANCE
2936   _OP_REQP = ["instance_name"]
2937   REQ_BGL = False
2938
2939   def ExpandNames(self):
2940     self._ExpandAndLockInstance()
2941
2942   def BuildHooksEnv(self):
2943     """Build hooks env.
2944
2945     This runs on master, primary and secondary nodes of the instance.
2946
2947     """
2948     env = _BuildInstanceHookEnvByObject(self, self.instance)
2949     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2950     return env, nl, nl
2951
2952   def CheckPrereq(self):
2953     """Check prerequisites.
2954
2955     This checks that the instance is in the cluster.
2956
2957     """
2958     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2959     assert self.instance is not None, \
2960       "Cannot retrieve locked instance %s" % self.op.instance_name
2961     _CheckNodeOnline(self, self.instance.primary_node)
2962
2963   def Exec(self, feedback_fn):
2964     """Shutdown the instance.
2965
2966     """
2967     instance = self.instance
2968     node_current = instance.primary_node
2969     self.cfg.MarkInstanceDown(instance.name)
2970     result = self.rpc.call_instance_shutdown(node_current, instance)
2971     msg = result.RemoteFailMsg()
2972     if msg:
2973       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2974
2975     _ShutdownInstanceDisks(self, instance)
2976
2977
2978 class LUReinstallInstance(LogicalUnit):
2979   """Reinstall an instance.
2980
2981   """
2982   HPATH = "instance-reinstall"
2983   HTYPE = constants.HTYPE_INSTANCE
2984   _OP_REQP = ["instance_name"]
2985   REQ_BGL = False
2986
2987   def ExpandNames(self):
2988     self._ExpandAndLockInstance()
2989
2990   def BuildHooksEnv(self):
2991     """Build hooks env.
2992
2993     This runs on master, primary and secondary nodes of the instance.
2994
2995     """
2996     env = _BuildInstanceHookEnvByObject(self, self.instance)
2997     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2998     return env, nl, nl
2999
3000   def CheckPrereq(self):
3001     """Check prerequisites.
3002
3003     This checks that the instance is in the cluster and is not running.
3004
3005     """
3006     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3007     assert instance is not None, \
3008       "Cannot retrieve locked instance %s" % self.op.instance_name
3009     _CheckNodeOnline(self, instance.primary_node)
3010
3011     if instance.disk_template == constants.DT_DISKLESS:
3012       raise errors.OpPrereqError("Instance '%s' has no disks" %
3013                                  self.op.instance_name)
3014     if instance.admin_up:
3015       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3016                                  self.op.instance_name)
3017     remote_info = self.rpc.call_instance_info(instance.primary_node,
3018                                               instance.name,
3019                                               instance.hypervisor)
3020     remote_info.Raise()
3021     if remote_info.data:
3022       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3023                                  (self.op.instance_name,
3024                                   instance.primary_node))
3025
3026     self.op.os_type = getattr(self.op, "os_type", None)
3027     if self.op.os_type is not None:
3028       # OS verification
3029       pnode = self.cfg.GetNodeInfo(
3030         self.cfg.ExpandNodeName(instance.primary_node))
3031       if pnode is None:
3032         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3033                                    self.op.pnode)
3034       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3035       result.Raise()
3036       if not isinstance(result.data, objects.OS):
3037         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3038                                    " primary node"  % self.op.os_type)
3039
3040     self.instance = instance
3041
3042   def Exec(self, feedback_fn):
3043     """Reinstall the instance.
3044
3045     """
3046     inst = self.instance
3047
3048     if self.op.os_type is not None:
3049       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3050       inst.os = self.op.os_type
3051       self.cfg.Update(inst)
3052
3053     _StartInstanceDisks(self, inst, None)
3054     try:
3055       feedback_fn("Running the instance OS create scripts...")
3056       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3057       msg = result.RemoteFailMsg()
3058       if msg:
3059         raise errors.OpExecError("Could not install OS for instance %s"
3060                                  " on node %s: %s" %
3061                                  (inst.name, inst.primary_node, msg))
3062     finally:
3063       _ShutdownInstanceDisks(self, inst)
3064
3065
3066 class LURenameInstance(LogicalUnit):
3067   """Rename an instance.
3068
3069   """
3070   HPATH = "instance-rename"
3071   HTYPE = constants.HTYPE_INSTANCE
3072   _OP_REQP = ["instance_name", "new_name"]
3073
3074   def BuildHooksEnv(self):
3075     """Build hooks env.
3076
3077     This runs on master, primary and secondary nodes of the instance.
3078
3079     """
3080     env = _BuildInstanceHookEnvByObject(self, self.instance)
3081     env["INSTANCE_NEW_NAME"] = self.op.new_name
3082     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3083     return env, nl, nl
3084
3085   def CheckPrereq(self):
3086     """Check prerequisites.
3087
3088     This checks that the instance is in the cluster and is not running.
3089
3090     """
3091     instance = self.cfg.GetInstanceInfo(
3092       self.cfg.ExpandInstanceName(self.op.instance_name))
3093     if instance is None:
3094       raise errors.OpPrereqError("Instance '%s' not known" %
3095                                  self.op.instance_name)
3096     _CheckNodeOnline(self, instance.primary_node)
3097
3098     if instance.admin_up:
3099       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3100                                  self.op.instance_name)
3101     remote_info = self.rpc.call_instance_info(instance.primary_node,
3102                                               instance.name,
3103                                               instance.hypervisor)
3104     remote_info.Raise()
3105     if remote_info.data:
3106       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3107                                  (self.op.instance_name,
3108                                   instance.primary_node))
3109     self.instance = instance
3110
3111     # new name verification
3112     name_info = utils.HostInfo(self.op.new_name)
3113
3114     self.op.new_name = new_name = name_info.name
3115     instance_list = self.cfg.GetInstanceList()
3116     if new_name in instance_list:
3117       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3118                                  new_name)
3119
3120     if not getattr(self.op, "ignore_ip", False):
3121       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3122         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3123                                    (name_info.ip, new_name))
3124
3125
3126   def Exec(self, feedback_fn):
3127     """Reinstall the instance.
3128
3129     """
3130     inst = self.instance
3131     old_name = inst.name
3132
3133     if inst.disk_template == constants.DT_FILE:
3134       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3135
3136     self.cfg.RenameInstance(inst.name, self.op.new_name)
3137     # Change the instance lock. This is definitely safe while we hold the BGL
3138     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3139     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3140
3141     # re-read the instance from the configuration after rename
3142     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3143
3144     if inst.disk_template == constants.DT_FILE:
3145       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3146       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3147                                                      old_file_storage_dir,
3148                                                      new_file_storage_dir)
3149       result.Raise()
3150       if not result.data:
3151         raise errors.OpExecError("Could not connect to node '%s' to rename"
3152                                  " directory '%s' to '%s' (but the instance"
3153                                  " has been renamed in Ganeti)" % (
3154                                  inst.primary_node, old_file_storage_dir,
3155                                  new_file_storage_dir))
3156
3157       if not result.data[0]:
3158         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3159                                  " (but the instance has been renamed in"
3160                                  " Ganeti)" % (old_file_storage_dir,
3161                                                new_file_storage_dir))
3162
3163     _StartInstanceDisks(self, inst, None)
3164     try:
3165       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3166                                                  old_name)
3167       msg = result.RemoteFailMsg()
3168       if msg:
3169         msg = ("Could not run OS rename script for instance %s on node %s"
3170                " (but the instance has been renamed in Ganeti): %s" %
3171                (inst.name, inst.primary_node, msg))
3172         self.proc.LogWarning(msg)
3173     finally:
3174       _ShutdownInstanceDisks(self, inst)
3175
3176
3177 class LURemoveInstance(LogicalUnit):
3178   """Remove an instance.
3179
3180   """
3181   HPATH = "instance-remove"
3182   HTYPE = constants.HTYPE_INSTANCE
3183   _OP_REQP = ["instance_name", "ignore_failures"]
3184   REQ_BGL = False
3185
3186   def ExpandNames(self):
3187     self._ExpandAndLockInstance()
3188     self.needed_locks[locking.LEVEL_NODE] = []
3189     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3190
3191   def DeclareLocks(self, level):
3192     if level == locking.LEVEL_NODE:
3193       self._LockInstancesNodes()
3194
3195   def BuildHooksEnv(self):
3196     """Build hooks env.
3197
3198     This runs on master, primary and secondary nodes of the instance.
3199
3200     """
3201     env = _BuildInstanceHookEnvByObject(self, self.instance)
3202     nl = [self.cfg.GetMasterNode()]
3203     return env, nl, nl
3204
3205   def CheckPrereq(self):
3206     """Check prerequisites.
3207
3208     This checks that the instance is in the cluster.
3209
3210     """
3211     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212     assert self.instance is not None, \
3213       "Cannot retrieve locked instance %s" % self.op.instance_name
3214
3215   def Exec(self, feedback_fn):
3216     """Remove the instance.
3217
3218     """
3219     instance = self.instance
3220     logging.info("Shutting down instance %s on node %s",
3221                  instance.name, instance.primary_node)
3222
3223     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3224     msg = result.RemoteFailMsg()
3225     if msg:
3226       if self.op.ignore_failures:
3227         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3228       else:
3229         raise errors.OpExecError("Could not shutdown instance %s on"
3230                                  " node %s: %s" %
3231                                  (instance.name, instance.primary_node, msg))
3232
3233     logging.info("Removing block devices for instance %s", instance.name)
3234
3235     if not _RemoveDisks(self, instance):
3236       if self.op.ignore_failures:
3237         feedback_fn("Warning: can't remove instance's disks")
3238       else:
3239         raise errors.OpExecError("Can't remove instance's disks")
3240
3241     logging.info("Removing instance %s out of cluster config", instance.name)
3242
3243     self.cfg.RemoveInstance(instance.name)
3244     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3245
3246
3247 class LUQueryInstances(NoHooksLU):
3248   """Logical unit for querying instances.
3249
3250   """
3251   _OP_REQP = ["output_fields", "names", "use_locking"]
3252   REQ_BGL = False
3253   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3254                                     "admin_state",
3255                                     "disk_template", "ip", "mac", "bridge",
3256                                     "sda_size", "sdb_size", "vcpus", "tags",
3257                                     "network_port", "beparams",
3258                                     r"(disk)\.(size)/([0-9]+)",
3259                                     r"(disk)\.(sizes)", "disk_usage",
3260                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3261                                     r"(nic)\.(macs|ips|bridges)",
3262                                     r"(disk|nic)\.(count)",
3263                                     "serial_no", "hypervisor", "hvparams",] +
3264                                   ["hv/%s" % name
3265                                    for name in constants.HVS_PARAMETERS] +
3266                                   ["be/%s" % name
3267                                    for name in constants.BES_PARAMETERS])
3268   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3269
3270
3271   def ExpandNames(self):
3272     _CheckOutputFields(static=self._FIELDS_STATIC,
3273                        dynamic=self._FIELDS_DYNAMIC,
3274                        selected=self.op.output_fields)
3275
3276     self.needed_locks = {}
3277     self.share_locks[locking.LEVEL_INSTANCE] = 1
3278     self.share_locks[locking.LEVEL_NODE] = 1
3279
3280     if self.op.names:
3281       self.wanted = _GetWantedInstances(self, self.op.names)
3282     else:
3283       self.wanted = locking.ALL_SET
3284
3285     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3286     self.do_locking = self.do_node_query and self.op.use_locking
3287     if self.do_locking:
3288       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3289       self.needed_locks[locking.LEVEL_NODE] = []
3290       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3291
3292   def DeclareLocks(self, level):
3293     if level == locking.LEVEL_NODE and self.do_locking:
3294       self._LockInstancesNodes()
3295
3296   def CheckPrereq(self):
3297     """Check prerequisites.
3298
3299     """
3300     pass
3301
3302   def Exec(self, feedback_fn):
3303     """Computes the list of nodes and their attributes.
3304
3305     """
3306     all_info = self.cfg.GetAllInstancesInfo()
3307     if self.wanted == locking.ALL_SET:
3308       # caller didn't specify instance names, so ordering is not important
3309       if self.do_locking:
3310         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3311       else:
3312         instance_names = all_info.keys()
3313       instance_names = utils.NiceSort(instance_names)
3314     else:
3315       # caller did specify names, so we must keep the ordering
3316       if self.do_locking:
3317         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3318       else:
3319         tgt_set = all_info.keys()
3320       missing = set(self.wanted).difference(tgt_set)
3321       if missing:
3322         raise errors.OpExecError("Some instances were removed before"
3323                                  " retrieving their data: %s" % missing)
3324       instance_names = self.wanted
3325
3326     instance_list = [all_info[iname] for iname in instance_names]
3327
3328     # begin data gathering
3329
3330     nodes = frozenset([inst.primary_node for inst in instance_list])
3331     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3332
3333     bad_nodes = []
3334     off_nodes = []
3335     if self.do_node_query:
3336       live_data = {}
3337       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3338       for name in nodes:
3339         result = node_data[name]
3340         if result.offline:
3341           # offline nodes will be in both lists
3342           off_nodes.append(name)
3343         if result.failed:
3344           bad_nodes.append(name)
3345         else:
3346           if result.data:
3347             live_data.update(result.data)
3348             # else no instance is alive
3349     else:
3350       live_data = dict([(name, {}) for name in instance_names])
3351
3352     # end data gathering
3353
3354     HVPREFIX = "hv/"
3355     BEPREFIX = "be/"
3356     output = []
3357     for instance in instance_list:
3358       iout = []
3359       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3360       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3361       for field in self.op.output_fields:
3362         st_match = self._FIELDS_STATIC.Matches(field)
3363         if field == "name":
3364           val = instance.name
3365         elif field == "os":
3366           val = instance.os
3367         elif field == "pnode":
3368           val = instance.primary_node
3369         elif field == "snodes":
3370           val = list(instance.secondary_nodes)
3371         elif field == "admin_state":
3372           val = instance.admin_up
3373         elif field == "oper_state":
3374           if instance.primary_node in bad_nodes:
3375             val = None
3376           else:
3377             val = bool(live_data.get(instance.name))
3378         elif field == "status":
3379           if instance.primary_node in off_nodes:
3380             val = "ERROR_nodeoffline"
3381           elif instance.primary_node in bad_nodes:
3382             val = "ERROR_nodedown"
3383           else:
3384             running = bool(live_data.get(instance.name))
3385             if running:
3386               if instance.admin_up:
3387                 val = "running"
3388               else:
3389                 val = "ERROR_up"
3390             else:
3391               if instance.admin_up:
3392                 val = "ERROR_down"
3393               else:
3394                 val = "ADMIN_down"
3395         elif field == "oper_ram":
3396           if instance.primary_node in bad_nodes:
3397             val = None
3398           elif instance.name in live_data:
3399             val = live_data[instance.name].get("memory", "?")
3400           else:
3401             val = "-"
3402         elif field == "disk_template":
3403           val = instance.disk_template
3404         elif field == "ip":
3405           val = instance.nics[0].ip
3406         elif field == "bridge":
3407           val = instance.nics[0].bridge
3408         elif field == "mac":
3409           val = instance.nics[0].mac
3410         elif field == "sda_size" or field == "sdb_size":
3411           idx = ord(field[2]) - ord('a')
3412           try:
3413             val = instance.FindDisk(idx).size
3414           except errors.OpPrereqError:
3415             val = None
3416         elif field == "disk_usage": # total disk usage per node
3417           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3418           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3419         elif field == "tags":
3420           val = list(instance.GetTags())
3421         elif field == "serial_no":
3422           val = instance.serial_no
3423         elif field == "network_port":
3424           val = instance.network_port
3425         elif field == "hypervisor":
3426           val = instance.hypervisor
3427         elif field == "hvparams":
3428           val = i_hv
3429         elif (field.startswith(HVPREFIX) and
3430               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3431           val = i_hv.get(field[len(HVPREFIX):], None)
3432         elif field == "beparams":
3433           val = i_be
3434         elif (field.startswith(BEPREFIX) and
3435               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3436           val = i_be.get(field[len(BEPREFIX):], None)
3437         elif st_match and st_match.groups():
3438           # matches a variable list
3439           st_groups = st_match.groups()
3440           if st_groups and st_groups[0] == "disk":
3441             if st_groups[1] == "count":
3442               val = len(instance.disks)
3443             elif st_groups[1] == "sizes":
3444               val = [disk.size for disk in instance.disks]
3445             elif st_groups[1] == "size":
3446               try:
3447                 val = instance.FindDisk(st_groups[2]).size
3448               except errors.OpPrereqError:
3449                 val = None
3450             else:
3451               assert False, "Unhandled disk parameter"
3452           elif st_groups[0] == "nic":
3453             if st_groups[1] == "count":
3454               val = len(instance.nics)
3455             elif st_groups[1] == "macs":
3456               val = [nic.mac for nic in instance.nics]
3457             elif st_groups[1] == "ips":
3458               val = [nic.ip for nic in instance.nics]
3459             elif st_groups[1] == "bridges":
3460               val = [nic.bridge for nic in instance.nics]
3461             else:
3462               # index-based item
3463               nic_idx = int(st_groups[2])
3464               if nic_idx >= len(instance.nics):
3465                 val = None
3466               else:
3467                 if st_groups[1] == "mac":
3468                   val = instance.nics[nic_idx].mac
3469                 elif st_groups[1] == "ip":
3470                   val = instance.nics[nic_idx].ip
3471                 elif st_groups[1] == "bridge":
3472                   val = instance.nics[nic_idx].bridge
3473                 else:
3474                   assert False, "Unhandled NIC parameter"
3475           else:
3476             assert False, "Unhandled variable parameter"
3477         else:
3478           raise errors.ParameterError(field)
3479         iout.append(val)
3480       output.append(iout)
3481
3482     return output
3483
3484
3485 class LUFailoverInstance(LogicalUnit):
3486   """Failover an instance.
3487
3488   """
3489   HPATH = "instance-failover"
3490   HTYPE = constants.HTYPE_INSTANCE
3491   _OP_REQP = ["instance_name", "ignore_consistency"]
3492   REQ_BGL = False
3493
3494   def ExpandNames(self):
3495     self._ExpandAndLockInstance()
3496     self.needed_locks[locking.LEVEL_NODE] = []
3497     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3498
3499   def DeclareLocks(self, level):
3500     if level == locking.LEVEL_NODE:
3501       self._LockInstancesNodes()
3502
3503   def BuildHooksEnv(self):
3504     """Build hooks env.
3505
3506     This runs on master, primary and secondary nodes of the instance.
3507
3508     """
3509     env = {
3510       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3511       }
3512     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3513     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3514     return env, nl, nl
3515
3516   def CheckPrereq(self):
3517     """Check prerequisites.
3518
3519     This checks that the instance is in the cluster.
3520
3521     """
3522     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3523     assert self.instance is not None, \
3524       "Cannot retrieve locked instance %s" % self.op.instance_name
3525
3526     bep = self.cfg.GetClusterInfo().FillBE(instance)
3527     if instance.disk_template not in constants.DTS_NET_MIRROR:
3528       raise errors.OpPrereqError("Instance's disk layout is not"
3529                                  " network mirrored, cannot failover.")
3530
3531     secondary_nodes = instance.secondary_nodes
3532     if not secondary_nodes:
3533       raise errors.ProgrammerError("no secondary node but using "
3534                                    "a mirrored disk template")
3535
3536     target_node = secondary_nodes[0]
3537     _CheckNodeOnline(self, target_node)
3538     _CheckNodeNotDrained(self, target_node)
3539     # check memory requirements on the secondary node
3540     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3541                          instance.name, bep[constants.BE_MEMORY],
3542                          instance.hypervisor)
3543
3544     # check bridge existance
3545     brlist = [nic.bridge for nic in instance.nics]
3546     result = self.rpc.call_bridges_exist(target_node, brlist)
3547     result.Raise()
3548     if not result.data:
3549       raise errors.OpPrereqError("One or more target bridges %s does not"
3550                                  " exist on destination node '%s'" %
3551                                  (brlist, target_node))
3552
3553   def Exec(self, feedback_fn):
3554     """Failover an instance.
3555
3556     The failover is done by shutting it down on its present node and
3557     starting it on the secondary.
3558
3559     """
3560     instance = self.instance
3561
3562     source_node = instance.primary_node
3563     target_node = instance.secondary_nodes[0]
3564
3565     feedback_fn("* checking disk consistency between source and target")
3566     for dev in instance.disks:
3567       # for drbd, these are drbd over lvm
3568       if not _CheckDiskConsistency(self, dev, target_node, False):
3569         if instance.admin_up and not self.op.ignore_consistency:
3570           raise errors.OpExecError("Disk %s is degraded on target node,"
3571                                    " aborting failover." % dev.iv_name)
3572
3573     feedback_fn("* shutting down instance on source node")
3574     logging.info("Shutting down instance %s on node %s",
3575                  instance.name, source_node)
3576
3577     result = self.rpc.call_instance_shutdown(source_node, instance)
3578     msg = result.RemoteFailMsg()
3579     if msg:
3580       if self.op.ignore_consistency:
3581         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3582                              " Proceeding anyway. Please make sure node"
3583                              " %s is down. Error details: %s",
3584                              instance.name, source_node, source_node, msg)
3585       else:
3586         raise errors.OpExecError("Could not shutdown instance %s on"
3587                                  " node %s: %s" %
3588                                  (instance.name, source_node, msg))
3589
3590     feedback_fn("* deactivating the instance's disks on source node")
3591     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3592       raise errors.OpExecError("Can't shut down the instance's disks.")
3593
3594     instance.primary_node = target_node
3595     # distribute new instance config to the other nodes
3596     self.cfg.Update(instance)
3597
3598     # Only start the instance if it's marked as up
3599     if instance.admin_up:
3600       feedback_fn("* activating the instance's disks on target node")
3601       logging.info("Starting instance %s on node %s",
3602                    instance.name, target_node)
3603
3604       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3605                                                ignore_secondaries=True)
3606       if not disks_ok:
3607         _ShutdownInstanceDisks(self, instance)
3608         raise errors.OpExecError("Can't activate the instance's disks")
3609
3610       feedback_fn("* starting the instance on the target node")
3611       result = self.rpc.call_instance_start(target_node, instance, None, None)
3612       msg = result.RemoteFailMsg()
3613       if msg:
3614         _ShutdownInstanceDisks(self, instance)
3615         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3616                                  (instance.name, target_node, msg))
3617
3618
3619 class LUMigrateInstance(LogicalUnit):
3620   """Migrate an instance.
3621
3622   This is migration without shutting down, compared to the failover,
3623   which is done with shutdown.
3624
3625   """
3626   HPATH = "instance-migrate"
3627   HTYPE = constants.HTYPE_INSTANCE
3628   _OP_REQP = ["instance_name", "live", "cleanup"]
3629
3630   REQ_BGL = False
3631
3632   def ExpandNames(self):
3633     self._ExpandAndLockInstance()
3634     self.needed_locks[locking.LEVEL_NODE] = []
3635     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3636
3637   def DeclareLocks(self, level):
3638     if level == locking.LEVEL_NODE:
3639       self._LockInstancesNodes()
3640
3641   def BuildHooksEnv(self):
3642     """Build hooks env.
3643
3644     This runs on master, primary and secondary nodes of the instance.
3645
3646     """
3647     env = _BuildInstanceHookEnvByObject(self, self.instance)
3648     env["MIGRATE_LIVE"] = self.op.live
3649     env["MIGRATE_CLEANUP"] = self.op.cleanup
3650     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3651     return env, nl, nl
3652
3653   def CheckPrereq(self):
3654     """Check prerequisites.
3655
3656     This checks that the instance is in the cluster.
3657
3658     """
3659     instance = self.cfg.GetInstanceInfo(
3660       self.cfg.ExpandInstanceName(self.op.instance_name))
3661     if instance is None:
3662       raise errors.OpPrereqError("Instance '%s' not known" %
3663                                  self.op.instance_name)
3664
3665     if instance.disk_template != constants.DT_DRBD8:
3666       raise errors.OpPrereqError("Instance's disk layout is not"
3667                                  " drbd8, cannot migrate.")
3668
3669     secondary_nodes = instance.secondary_nodes
3670     if not secondary_nodes:
3671       raise errors.ConfigurationError("No secondary node but using"
3672                                       " drbd8 disk template")
3673
3674     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3675
3676     target_node = secondary_nodes[0]
3677     # check memory requirements on the secondary node
3678     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3679                          instance.name, i_be[constants.BE_MEMORY],
3680                          instance.hypervisor)
3681
3682     # check bridge existance
3683     brlist = [nic.bridge for nic in instance.nics]
3684     result = self.rpc.call_bridges_exist(target_node, brlist)
3685     if result.failed or not result.data:
3686       raise errors.OpPrereqError("One or more target bridges %s does not"
3687                                  " exist on destination node '%s'" %
3688                                  (brlist, target_node))
3689
3690     if not self.op.cleanup:
3691       _CheckNodeNotDrained(self, target_node)
3692       result = self.rpc.call_instance_migratable(instance.primary_node,
3693                                                  instance)
3694       msg = result.RemoteFailMsg()
3695       if msg:
3696         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3697                                    msg)
3698
3699     self.instance = instance
3700
3701   def _WaitUntilSync(self):
3702     """Poll with custom rpc for disk sync.
3703
3704     This uses our own step-based rpc call.
3705
3706     """
3707     self.feedback_fn("* wait until resync is done")
3708     all_done = False
3709     while not all_done:
3710       all_done = True
3711       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3712                                             self.nodes_ip,
3713                                             self.instance.disks)
3714       min_percent = 100
3715       for node, nres in result.items():
3716         msg = nres.RemoteFailMsg()
3717         if msg:
3718           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3719                                    (node, msg))
3720         node_done, node_percent = nres.payload
3721         all_done = all_done and node_done
3722         if node_percent is not None:
3723           min_percent = min(min_percent, node_percent)
3724       if not all_done:
3725         if min_percent < 100:
3726           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3727         time.sleep(2)
3728
3729   def _EnsureSecondary(self, node):
3730     """Demote a node to secondary.
3731
3732     """
3733     self.feedback_fn("* switching node %s to secondary mode" % node)
3734
3735     for dev in self.instance.disks:
3736       self.cfg.SetDiskID(dev, node)
3737
3738     result = self.rpc.call_blockdev_close(node, self.instance.name,
3739                                           self.instance.disks)
3740     msg = result.RemoteFailMsg()
3741     if msg:
3742       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3743                                " error %s" % (node, msg))
3744
3745   def _GoStandalone(self):
3746     """Disconnect from the network.
3747
3748     """
3749     self.feedback_fn("* changing into standalone mode")
3750     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3751                                                self.instance.disks)
3752     for node, nres in result.items():
3753       msg = nres.RemoteFailMsg()
3754       if msg:
3755         raise errors.OpExecError("Cannot disconnect disks node %s,"
3756                                  " error %s" % (node, msg))
3757
3758   def _GoReconnect(self, multimaster):
3759     """Reconnect to the network.
3760
3761     """
3762     if multimaster:
3763       msg = "dual-master"
3764     else:
3765       msg = "single-master"
3766     self.feedback_fn("* changing disks into %s mode" % msg)
3767     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3768                                            self.instance.disks,
3769                                            self.instance.name, multimaster)
3770     for node, nres in result.items():
3771       msg = nres.RemoteFailMsg()
3772       if msg:
3773         raise errors.OpExecError("Cannot change disks config on node %s,"
3774                                  " error: %s" % (node, msg))
3775
3776   def _ExecCleanup(self):
3777     """Try to cleanup after a failed migration.
3778
3779     The cleanup is done by:
3780       - check that the instance is running only on one node
3781         (and update the config if needed)
3782       - change disks on its secondary node to secondary
3783       - wait until disks are fully synchronized
3784       - disconnect from the network
3785       - change disks into single-master mode
3786       - wait again until disks are fully synchronized
3787
3788     """
3789     instance = self.instance
3790     target_node = self.target_node
3791     source_node = self.source_node
3792
3793     # check running on only one node
3794     self.feedback_fn("* checking where the instance actually runs"
3795                      " (if this hangs, the hypervisor might be in"
3796                      " a bad state)")
3797     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3798     for node, result in ins_l.items():
3799       result.Raise()
3800       if not isinstance(result.data, list):
3801         raise errors.OpExecError("Can't contact node '%s'" % node)
3802
3803     runningon_source = instance.name in ins_l[source_node].data
3804     runningon_target = instance.name in ins_l[target_node].data
3805
3806     if runningon_source and runningon_target:
3807       raise errors.OpExecError("Instance seems to be running on two nodes,"
3808                                " or the hypervisor is confused. You will have"
3809                                " to ensure manually that it runs only on one"
3810                                " and restart this operation.")
3811
3812     if not (runningon_source or runningon_target):
3813       raise errors.OpExecError("Instance does not seem to be running at all."
3814                                " In this case, it's safer to repair by"
3815                                " running 'gnt-instance stop' to ensure disk"
3816                                " shutdown, and then restarting it.")
3817
3818     if runningon_target:
3819       # the migration has actually succeeded, we need to update the config
3820       self.feedback_fn("* instance running on secondary node (%s),"
3821                        " updating config" % target_node)
3822       instance.primary_node = target_node
3823       self.cfg.Update(instance)
3824       demoted_node = source_node
3825     else:
3826       self.feedback_fn("* instance confirmed to be running on its"
3827                        " primary node (%s)" % source_node)
3828       demoted_node = target_node
3829
3830     self._EnsureSecondary(demoted_node)
3831     try:
3832       self._WaitUntilSync()
3833     except errors.OpExecError:
3834       # we ignore here errors, since if the device is standalone, it
3835       # won't be able to sync
3836       pass
3837     self._GoStandalone()
3838     self._GoReconnect(False)
3839     self._WaitUntilSync()
3840
3841     self.feedback_fn("* done")
3842
3843   def _RevertDiskStatus(self):
3844     """Try to revert the disk status after a failed migration.
3845
3846     """
3847     target_node = self.target_node
3848     try:
3849       self._EnsureSecondary(target_node)
3850       self._GoStandalone()
3851       self._GoReconnect(False)
3852       self._WaitUntilSync()
3853     except errors.OpExecError, err:
3854       self.LogWarning("Migration failed and I can't reconnect the"
3855                       " drives: error '%s'\n"
3856                       "Please look and recover the instance status" %
3857                       str(err))
3858
3859   def _AbortMigration(self):
3860     """Call the hypervisor code to abort a started migration.
3861
3862     """
3863     instance = self.instance
3864     target_node = self.target_node
3865     migration_info = self.migration_info
3866
3867     abort_result = self.rpc.call_finalize_migration(target_node,
3868                                                     instance,
3869                                                     migration_info,
3870                                                     False)
3871     abort_msg = abort_result.RemoteFailMsg()
3872     if abort_msg:
3873       logging.error("Aborting migration failed on target node %s: %s" %
3874                     (target_node, abort_msg))
3875       # Don't raise an exception here, as we stil have to try to revert the
3876       # disk status, even if this step failed.
3877
3878   def _ExecMigration(self):
3879     """Migrate an instance.
3880
3881     The migrate is done by:
3882       - change the disks into dual-master mode
3883       - wait until disks are fully synchronized again
3884       - migrate the instance
3885       - change disks on the new secondary node (the old primary) to secondary
3886       - wait until disks are fully synchronized
3887       - change disks into single-master mode
3888
3889     """
3890     instance = self.instance
3891     target_node = self.target_node
3892     source_node = self.source_node
3893
3894     self.feedback_fn("* checking disk consistency between source and target")
3895     for dev in instance.disks:
3896       if not _CheckDiskConsistency(self, dev, target_node, False):
3897         raise errors.OpExecError("Disk %s is degraded or not fully"
3898                                  " synchronized on target node,"
3899                                  " aborting migrate." % dev.iv_name)
3900
3901     # First get the migration information from the remote node
3902     result = self.rpc.call_migration_info(source_node, instance)
3903     msg = result.RemoteFailMsg()
3904     if msg:
3905       log_err = ("Failed fetching source migration information from %s: %s" %
3906                  (source_node, msg))
3907       logging.error(log_err)
3908       raise errors.OpExecError(log_err)
3909
3910     self.migration_info = migration_info = result.payload
3911
3912     # Then switch the disks to master/master mode
3913     self._EnsureSecondary(target_node)
3914     self._GoStandalone()
3915     self._GoReconnect(True)
3916     self._WaitUntilSync()
3917
3918     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3919     result = self.rpc.call_accept_instance(target_node,
3920                                            instance,
3921                                            migration_info,
3922                                            self.nodes_ip[target_node])
3923
3924     msg = result.RemoteFailMsg()
3925     if msg:
3926       logging.error("Instance pre-migration failed, trying to revert"
3927                     " disk status: %s", msg)
3928       self._AbortMigration()
3929       self._RevertDiskStatus()
3930       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3931                                (instance.name, msg))
3932
3933     self.feedback_fn("* migrating instance to %s" % target_node)
3934     time.sleep(10)
3935     result = self.rpc.call_instance_migrate(source_node, instance,
3936                                             self.nodes_ip[target_node],
3937                                             self.op.live)
3938     msg = result.RemoteFailMsg()
3939     if msg:
3940       logging.error("Instance migration failed, trying to revert"
3941                     " disk status: %s", msg)
3942       self._AbortMigration()
3943       self._RevertDiskStatus()
3944       raise errors.OpExecError("Could not migrate instance %s: %s" %
3945                                (instance.name, msg))
3946     time.sleep(10)
3947
3948     instance.primary_node = target_node
3949     # distribute new instance config to the other nodes
3950     self.cfg.Update(instance)
3951
3952     result = self.rpc.call_finalize_migration(target_node,
3953                                               instance,
3954                                               migration_info,
3955                                               True)
3956     msg = result.RemoteFailMsg()
3957     if msg:
3958       logging.error("Instance migration succeeded, but finalization failed:"
3959                     " %s" % msg)
3960       raise errors.OpExecError("Could not finalize instance migration: %s" %
3961                                msg)
3962
3963     self._EnsureSecondary(source_node)
3964     self._WaitUntilSync()
3965     self._GoStandalone()
3966     self._GoReconnect(False)
3967     self._WaitUntilSync()
3968
3969     self.feedback_fn("* done")
3970
3971   def Exec(self, feedback_fn):
3972     """Perform the migration.
3973
3974     """
3975     self.feedback_fn = feedback_fn
3976
3977     self.source_node = self.instance.primary_node
3978     self.target_node = self.instance.secondary_nodes[0]
3979     self.all_nodes = [self.source_node, self.target_node]
3980     self.nodes_ip = {
3981       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3982       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3983       }
3984     if self.op.cleanup:
3985       return self._ExecCleanup()
3986     else:
3987       return self._ExecMigration()
3988
3989
3990 def _CreateBlockDev(lu, node, instance, device, force_create,
3991                     info, force_open):
3992   """Create a tree of block devices on a given node.
3993
3994   If this device type has to be created on secondaries, create it and
3995   all its children.
3996
3997   If not, just recurse to children keeping the same 'force' value.
3998
3999   @param lu: the lu on whose behalf we execute
4000   @param node: the node on which to create the device
4001   @type instance: L{objects.Instance}
4002   @param instance: the instance which owns the device
4003   @type device: L{objects.Disk}
4004   @param device: the device to create
4005   @type force_create: boolean
4006   @param force_create: whether to force creation of this device; this
4007       will be change to True whenever we find a device which has
4008       CreateOnSecondary() attribute
4009   @param info: the extra 'metadata' we should attach to the device
4010       (this will be represented as a LVM tag)
4011   @type force_open: boolean
4012   @param force_open: this parameter will be passes to the
4013       L{backend.BlockdevCreate} function where it specifies
4014       whether we run on primary or not, and it affects both
4015       the child assembly and the device own Open() execution
4016
4017   """
4018   if device.CreateOnSecondary():
4019     force_create = True
4020
4021   if device.children:
4022     for child in device.children:
4023       _CreateBlockDev(lu, node, instance, child, force_create,
4024                       info, force_open)
4025
4026   if not force_create:
4027     return
4028
4029   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4030
4031
4032 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4033   """Create a single block device on a given node.
4034
4035   This will not recurse over children of the device, so they must be
4036   created in advance.
4037
4038   @param lu: the lu on whose behalf we execute
4039   @param node: the node on which to create the device
4040   @type instance: L{objects.Instance}
4041   @param instance: the instance which owns the device
4042   @type device: L{objects.Disk}
4043   @param device: the device to create
4044   @param info: the extra 'metadata' we should attach to the device
4045       (this will be represented as a LVM tag)
4046   @type force_open: boolean
4047   @param force_open: this parameter will be passes to the
4048       L{backend.BlockdevCreate} function where it specifies
4049       whether we run on primary or not, and it affects both
4050       the child assembly and the device own Open() execution
4051
4052   """
4053   lu.cfg.SetDiskID(device, node)
4054   result = lu.rpc.call_blockdev_create(node, device, device.size,
4055                                        instance.name, force_open, info)
4056   msg = result.RemoteFailMsg()
4057   if msg:
4058     raise errors.OpExecError("Can't create block device %s on"
4059                              " node %s for instance %s: %s" %
4060                              (device, node, instance.name, msg))
4061   if device.physical_id is None:
4062     device.physical_id = result.payload
4063
4064
4065 def _GenerateUniqueNames(lu, exts):
4066   """Generate a suitable LV name.
4067
4068   This will generate a logical volume name for the given instance.
4069
4070   """
4071   results = []
4072   for val in exts:
4073     new_id = lu.cfg.GenerateUniqueID()
4074     results.append("%s%s" % (new_id, val))
4075   return results
4076
4077
4078 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4079                          p_minor, s_minor):
4080   """Generate a drbd8 device complete with its children.
4081
4082   """
4083   port = lu.cfg.AllocatePort()
4084   vgname = lu.cfg.GetVGName()
4085   shared_secret = lu.cfg.GenerateDRBDSecret()
4086   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4087                           logical_id=(vgname, names[0]))
4088   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4089                           logical_id=(vgname, names[1]))
4090   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4091                           logical_id=(primary, secondary, port,
4092                                       p_minor, s_minor,
4093                                       shared_secret),
4094                           children=[dev_data, dev_meta],
4095                           iv_name=iv_name)
4096   return drbd_dev
4097
4098
4099 def _GenerateDiskTemplate(lu, template_name,
4100                           instance_name, primary_node,
4101                           secondary_nodes, disk_info,
4102                           file_storage_dir, file_driver,
4103                           base_index):
4104   """Generate the entire disk layout for a given template type.
4105
4106   """
4107   #TODO: compute space requirements
4108
4109   vgname = lu.cfg.GetVGName()
4110   disk_count = len(disk_info)
4111   disks = []
4112   if template_name == constants.DT_DISKLESS:
4113     pass
4114   elif template_name == constants.DT_PLAIN:
4115     if len(secondary_nodes) != 0:
4116       raise errors.ProgrammerError("Wrong template configuration")
4117
4118     names = _GenerateUniqueNames(lu, [".disk%d" % i
4119                                       for i in range(disk_count)])
4120     for idx, disk in enumerate(disk_info):
4121       disk_index = idx + base_index
4122       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4123                               logical_id=(vgname, names[idx]),
4124                               iv_name="disk/%d" % disk_index,
4125                               mode=disk["mode"])
4126       disks.append(disk_dev)
4127   elif template_name == constants.DT_DRBD8:
4128     if len(secondary_nodes) != 1:
4129       raise errors.ProgrammerError("Wrong template configuration")
4130     remote_node = secondary_nodes[0]
4131     minors = lu.cfg.AllocateDRBDMinor(
4132       [primary_node, remote_node] * len(disk_info), instance_name)
4133
4134     names = []
4135     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4136                                                for i in range(disk_count)]):
4137       names.append(lv_prefix + "_data")
4138       names.append(lv_prefix + "_meta")
4139     for idx, disk in enumerate(disk_info):
4140       disk_index = idx + base_index
4141       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4142                                       disk["size"], names[idx*2:idx*2+2],
4143                                       "disk/%d" % disk_index,
4144                                       minors[idx*2], minors[idx*2+1])
4145       disk_dev.mode = disk["mode"]
4146       disks.append(disk_dev)
4147   elif template_name == constants.DT_FILE:
4148     if len(secondary_nodes) != 0:
4149       raise errors.ProgrammerError("Wrong template configuration")
4150
4151     for idx, disk in enumerate(disk_info):
4152       disk_index = idx + base_index
4153       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4154                               iv_name="disk/%d" % disk_index,
4155                               logical_id=(file_driver,
4156                                           "%s/disk%d" % (file_storage_dir,
4157                                                          disk_index)),
4158                               mode=disk["mode"])
4159       disks.append(disk_dev)
4160   else:
4161     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4162   return disks
4163
4164
4165 def _GetInstanceInfoText(instance):
4166   """Compute that text that should be added to the disk's metadata.
4167
4168   """
4169   return "originstname+%s" % instance.name
4170
4171
4172 def _CreateDisks(lu, instance):
4173   """Create all disks for an instance.
4174
4175   This abstracts away some work from AddInstance.
4176
4177   @type lu: L{LogicalUnit}
4178   @param lu: the logical unit on whose behalf we execute
4179   @type instance: L{objects.Instance}
4180   @param instance: the instance whose disks we should create
4181   @rtype: boolean
4182   @return: the success of the creation
4183
4184   """
4185   info = _GetInstanceInfoText(instance)
4186   pnode = instance.primary_node
4187
4188   if instance.disk_template == constants.DT_FILE:
4189     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4190     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4191
4192     if result.failed or not result.data:
4193       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4194
4195     if not result.data[0]:
4196       raise errors.OpExecError("Failed to create directory '%s'" %
4197                                file_storage_dir)
4198
4199   # Note: this needs to be kept in sync with adding of disks in
4200   # LUSetInstanceParams
4201   for device in instance.disks:
4202     logging.info("Creating volume %s for instance %s",
4203                  device.iv_name, instance.name)
4204     #HARDCODE
4205     for node in instance.all_nodes:
4206       f_create = node == pnode
4207       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4208
4209
4210 def _RemoveDisks(lu, instance):
4211   """Remove all disks for an instance.
4212
4213   This abstracts away some work from `AddInstance()` and
4214   `RemoveInstance()`. Note that in case some of the devices couldn't
4215   be removed, the removal will continue with the other ones (compare
4216   with `_CreateDisks()`).
4217
4218   @type lu: L{LogicalUnit}
4219   @param lu: the logical unit on whose behalf we execute
4220   @type instance: L{objects.Instance}
4221   @param instance: the instance whose disks we should remove
4222   @rtype: boolean
4223   @return: the success of the removal
4224
4225   """
4226   logging.info("Removing block devices for instance %s", instance.name)
4227
4228   all_result = True
4229   for device in instance.disks:
4230     for node, disk in device.ComputeNodeTree(instance.primary_node):
4231       lu.cfg.SetDiskID(disk, node)
4232       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4233       if msg:
4234         lu.LogWarning("Could not remove block device %s on node %s,"
4235                       " continuing anyway: %s", device.iv_name, node, msg)
4236         all_result = False
4237
4238   if instance.disk_template == constants.DT_FILE:
4239     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4240     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4241                                                  file_storage_dir)
4242     if result.failed or not result.data:
4243       logging.error("Could not remove directory '%s'", file_storage_dir)
4244       all_result = False
4245
4246   return all_result
4247
4248
4249 def _ComputeDiskSize(disk_template, disks):
4250   """Compute disk size requirements in the volume group
4251
4252   """
4253   # Required free disk space as a function of disk and swap space
4254   req_size_dict = {
4255     constants.DT_DISKLESS: None,
4256     constants.DT_PLAIN: sum(d["size"] for d in disks),
4257     # 128 MB are added for drbd metadata for each disk
4258     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4259     constants.DT_FILE: None,
4260   }
4261
4262   if disk_template not in req_size_dict:
4263     raise errors.ProgrammerError("Disk template '%s' size requirement"
4264                                  " is unknown" %  disk_template)
4265
4266   return req_size_dict[disk_template]
4267
4268
4269 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4270   """Hypervisor parameter validation.
4271
4272   This function abstract the hypervisor parameter validation to be
4273   used in both instance create and instance modify.
4274
4275   @type lu: L{LogicalUnit}
4276   @param lu: the logical unit for which we check
4277   @type nodenames: list
4278   @param nodenames: the list of nodes on which we should check
4279   @type hvname: string
4280   @param hvname: the name of the hypervisor we should use
4281   @type hvparams: dict
4282   @param hvparams: the parameters which we need to check
4283   @raise errors.OpPrereqError: if the parameters are not valid
4284
4285   """
4286   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4287                                                   hvname,
4288                                                   hvparams)
4289   for node in nodenames:
4290     info = hvinfo[node]
4291     if info.offline:
4292       continue
4293     msg = info.RemoteFailMsg()
4294     if msg:
4295       raise errors.OpPrereqError("Hypervisor parameter validation"
4296                                  " failed on node %s: %s" % (node, msg))
4297
4298
4299 class LUCreateInstance(LogicalUnit):
4300   """Create an instance.
4301
4302   """
4303   HPATH = "instance-add"
4304   HTYPE = constants.HTYPE_INSTANCE
4305   _OP_REQP = ["instance_name", "disks", "disk_template",
4306               "mode", "start",
4307               "wait_for_sync", "ip_check", "nics",
4308               "hvparams", "beparams"]
4309   REQ_BGL = False
4310
4311   def _ExpandNode(self, node):
4312     """Expands and checks one node name.
4313
4314     """
4315     node_full = self.cfg.ExpandNodeName(node)
4316     if node_full is None:
4317       raise errors.OpPrereqError("Unknown node %s" % node)
4318     return node_full
4319
4320   def ExpandNames(self):
4321     """ExpandNames for CreateInstance.
4322
4323     Figure out the right locks for instance creation.
4324
4325     """
4326     self.needed_locks = {}
4327
4328     # set optional parameters to none if they don't exist
4329     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4330       if not hasattr(self.op, attr):
4331         setattr(self.op, attr, None)
4332
4333     # cheap checks, mostly valid constants given
4334
4335     # verify creation mode
4336     if self.op.mode not in (constants.INSTANCE_CREATE,
4337                             constants.INSTANCE_IMPORT):
4338       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4339                                  self.op.mode)
4340
4341     # disk template and mirror node verification
4342     if self.op.disk_template not in constants.DISK_TEMPLATES:
4343       raise errors.OpPrereqError("Invalid disk template name")
4344
4345     if self.op.hypervisor is None:
4346       self.op.hypervisor = self.cfg.GetHypervisorType()
4347
4348     cluster = self.cfg.GetClusterInfo()
4349     enabled_hvs = cluster.enabled_hypervisors
4350     if self.op.hypervisor not in enabled_hvs:
4351       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4352                                  " cluster (%s)" % (self.op.hypervisor,
4353                                   ",".join(enabled_hvs)))
4354
4355     # check hypervisor parameter syntax (locally)
4356     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4357     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4358                                   self.op.hvparams)
4359     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4360     hv_type.CheckParameterSyntax(filled_hvp)
4361     self.hv_full = filled_hvp
4362
4363     # fill and remember the beparams dict
4364     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4365     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4366                                     self.op.beparams)
4367
4368     #### instance parameters check
4369
4370     # instance name verification
4371     hostname1 = utils.HostInfo(self.op.instance_name)
4372     self.op.instance_name = instance_name = hostname1.name
4373
4374     # this is just a preventive check, but someone might still add this
4375     # instance in the meantime, and creation will fail at lock-add time
4376     if instance_name in self.cfg.GetInstanceList():
4377       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4378                                  instance_name)
4379
4380     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4381
4382     # NIC buildup
4383     self.nics = []
4384     for nic in self.op.nics:
4385       # ip validity checks
4386       ip = nic.get("ip", None)
4387       if ip is None or ip.lower() == "none":
4388         nic_ip = None
4389       elif ip.lower() == constants.VALUE_AUTO:
4390         nic_ip = hostname1.ip
4391       else:
4392         if not utils.IsValidIP(ip):
4393           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4394                                      " like a valid IP" % ip)
4395         nic_ip = ip
4396
4397       # MAC address verification
4398       mac = nic.get("mac", constants.VALUE_AUTO)
4399       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4400         if not utils.IsValidMac(mac.lower()):
4401           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4402                                      mac)
4403       # bridge verification
4404       bridge = nic.get("bridge", None)
4405       if bridge is None:
4406         bridge = self.cfg.GetDefBridge()
4407       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4408
4409     # disk checks/pre-build
4410     self.disks = []
4411     for disk in self.op.disks:
4412       mode = disk.get("mode", constants.DISK_RDWR)
4413       if mode not in constants.DISK_ACCESS_SET:
4414         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4415                                    mode)
4416       size = disk.get("size", None)
4417       if size is None:
4418         raise errors.OpPrereqError("Missing disk size")
4419       try:
4420         size = int(size)
4421       except ValueError:
4422         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4423       self.disks.append({"size": size, "mode": mode})
4424
4425     # used in CheckPrereq for ip ping check
4426     self.check_ip = hostname1.ip
4427
4428     # file storage checks
4429     if (self.op.file_driver and
4430         not self.op.file_driver in constants.FILE_DRIVER):
4431       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4432                                  self.op.file_driver)
4433
4434     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4435       raise errors.OpPrereqError("File storage directory path not absolute")
4436
4437     ### Node/iallocator related checks
4438     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4439       raise errors.OpPrereqError("One and only one of iallocator and primary"
4440                                  " node must be given")
4441
4442     if self.op.iallocator:
4443       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4444     else:
4445       self.op.pnode = self._ExpandNode(self.op.pnode)
4446       nodelist = [self.op.pnode]
4447       if self.op.snode is not None:
4448         self.op.snode = self._ExpandNode(self.op.snode)
4449         nodelist.append(self.op.snode)
4450       self.needed_locks[locking.LEVEL_NODE] = nodelist
4451
4452     # in case of import lock the source node too
4453     if self.op.mode == constants.INSTANCE_IMPORT:
4454       src_node = getattr(self.op, "src_node", None)
4455       src_path = getattr(self.op, "src_path", None)
4456
4457       if src_path is None:
4458         self.op.src_path = src_path = self.op.instance_name
4459
4460       if src_node is None:
4461         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4462         self.op.src_node = None
4463         if os.path.isabs(src_path):
4464           raise errors.OpPrereqError("Importing an instance from an absolute"
4465                                      " path requires a source node option.")
4466       else:
4467         self.op.src_node = src_node = self._ExpandNode(src_node)
4468         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4469           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4470         if not os.path.isabs(src_path):
4471           self.op.src_path = src_path = \
4472             os.path.join(constants.EXPORT_DIR, src_path)
4473
4474     else: # INSTANCE_CREATE
4475       if getattr(self.op, "os_type", None) is None:
4476         raise errors.OpPrereqError("No guest OS specified")
4477
4478   def _RunAllocator(self):
4479     """Run the allocator based on input opcode.
4480
4481     """
4482     nics = [n.ToDict() for n in self.nics]
4483     ial = IAllocator(self,
4484                      mode=constants.IALLOCATOR_MODE_ALLOC,
4485                      name=self.op.instance_name,
4486                      disk_template=self.op.disk_template,
4487                      tags=[],
4488                      os=self.op.os_type,
4489                      vcpus=self.be_full[constants.BE_VCPUS],
4490                      mem_size=self.be_full[constants.BE_MEMORY],
4491                      disks=self.disks,
4492                      nics=nics,
4493                      hypervisor=self.op.hypervisor,
4494                      )
4495
4496     ial.Run(self.op.iallocator)
4497
4498     if not ial.success:
4499       raise errors.OpPrereqError("Can't compute nodes using"
4500                                  " iallocator '%s': %s" % (self.op.iallocator,
4501                                                            ial.info))
4502     if len(ial.nodes) != ial.required_nodes:
4503       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4504                                  " of nodes (%s), required %s" %
4505                                  (self.op.iallocator, len(ial.nodes),
4506                                   ial.required_nodes))
4507     self.op.pnode = ial.nodes[0]
4508     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4509                  self.op.instance_name, self.op.iallocator,
4510                  ", ".join(ial.nodes))
4511     if ial.required_nodes == 2:
4512       self.op.snode = ial.nodes[1]
4513
4514   def BuildHooksEnv(self):
4515     """Build hooks env.
4516
4517     This runs on master, primary and secondary nodes of the instance.
4518
4519     """
4520     env = {
4521       "ADD_MODE": self.op.mode,
4522       }
4523     if self.op.mode == constants.INSTANCE_IMPORT:
4524       env["SRC_NODE"] = self.op.src_node
4525       env["SRC_PATH"] = self.op.src_path
4526       env["SRC_IMAGES"] = self.src_images
4527
4528     env.update(_BuildInstanceHookEnv(
4529       name=self.op.instance_name,
4530       primary_node=self.op.pnode,
4531       secondary_nodes=self.secondaries,
4532       status=self.op.start,
4533       os_type=self.op.os_type,
4534       memory=self.be_full[constants.BE_MEMORY],
4535       vcpus=self.be_full[constants.BE_VCPUS],
4536       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4537       disk_template=self.op.disk_template,
4538       disks=[(d["size"], d["mode"]) for d in self.disks],
4539       bep=self.be_full,
4540       hvp=self.hv_full,
4541       hypervisor=self.op.hypervisor,
4542     ))
4543
4544     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4545           self.secondaries)
4546     return env, nl, nl
4547
4548
4549   def CheckPrereq(self):
4550     """Check prerequisites.
4551
4552     """
4553     if (not self.cfg.GetVGName() and
4554         self.op.disk_template not in constants.DTS_NOT_LVM):
4555       raise errors.OpPrereqError("Cluster does not support lvm-based"
4556                                  " instances")
4557
4558     if self.op.mode == constants.INSTANCE_IMPORT:
4559       src_node = self.op.src_node
4560       src_path = self.op.src_path
4561
4562       if src_node is None:
4563         exp_list = self.rpc.call_export_list(
4564           self.acquired_locks[locking.LEVEL_NODE])
4565         found = False
4566         for node in exp_list:
4567           if not exp_list[node].failed and src_path in exp_list[node].data:
4568             found = True
4569             self.op.src_node = src_node = node
4570             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4571                                                        src_path)
4572             break
4573         if not found:
4574           raise errors.OpPrereqError("No export found for relative path %s" %
4575                                       src_path)
4576
4577       _CheckNodeOnline(self, src_node)
4578       result = self.rpc.call_export_info(src_node, src_path)
4579       result.Raise()
4580       if not result.data:
4581         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4582
4583       export_info = result.data
4584       if not export_info.has_section(constants.INISECT_EXP):
4585         raise errors.ProgrammerError("Corrupted export config")
4586
4587       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4588       if (int(ei_version) != constants.EXPORT_VERSION):
4589         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4590                                    (ei_version, constants.EXPORT_VERSION))
4591
4592       # Check that the new instance doesn't have less disks than the export
4593       instance_disks = len(self.disks)
4594       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4595       if instance_disks < export_disks:
4596         raise errors.OpPrereqError("Not enough disks to import."
4597                                    " (instance: %d, export: %d)" %
4598                                    (instance_disks, export_disks))
4599
4600       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4601       disk_images = []
4602       for idx in range(export_disks):
4603         option = 'disk%d_dump' % idx
4604         if export_info.has_option(constants.INISECT_INS, option):
4605           # FIXME: are the old os-es, disk sizes, etc. useful?
4606           export_name = export_info.get(constants.INISECT_INS, option)
4607           image = os.path.join(src_path, export_name)
4608           disk_images.append(image)
4609         else:
4610           disk_images.append(False)
4611
4612       self.src_images = disk_images
4613
4614       old_name = export_info.get(constants.INISECT_INS, 'name')
4615       # FIXME: int() here could throw a ValueError on broken exports
4616       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4617       if self.op.instance_name == old_name:
4618         for idx, nic in enumerate(self.nics):
4619           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4620             nic_mac_ini = 'nic%d_mac' % idx
4621             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4622
4623     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4624     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4625     if self.op.start and not self.op.ip_check:
4626       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4627                                  " adding an instance in start mode")
4628
4629     if self.op.ip_check:
4630       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4631         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4632                                    (self.check_ip, self.op.instance_name))
4633
4634     #### mac address generation
4635     # By generating here the mac address both the allocator and the hooks get
4636     # the real final mac address rather than the 'auto' or 'generate' value.
4637     # There is a race condition between the generation and the instance object
4638     # creation, which means that we know the mac is valid now, but we're not
4639     # sure it will be when we actually add the instance. If things go bad
4640     # adding the instance will abort because of a duplicate mac, and the
4641     # creation job will fail.
4642     for nic in self.nics:
4643       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4644         nic.mac = self.cfg.GenerateMAC()
4645
4646     #### allocator run
4647
4648     if self.op.iallocator is not None:
4649       self._RunAllocator()
4650
4651     #### node related checks
4652
4653     # check primary node
4654     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4655     assert self.pnode is not None, \
4656       "Cannot retrieve locked node %s" % self.op.pnode
4657     if pnode.offline:
4658       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4659                                  pnode.name)
4660     if pnode.drained:
4661       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4662                                  pnode.name)
4663
4664     self.secondaries = []
4665
4666     # mirror node verification
4667     if self.op.disk_template in constants.DTS_NET_MIRROR:
4668       if self.op.snode is None:
4669         raise errors.OpPrereqError("The networked disk templates need"
4670                                    " a mirror node")
4671       if self.op.snode == pnode.name:
4672         raise errors.OpPrereqError("The secondary node cannot be"
4673                                    " the primary node.")
4674       _CheckNodeOnline(self, self.op.snode)
4675       _CheckNodeNotDrained(self, self.op.snode)
4676       self.secondaries.append(self.op.snode)
4677
4678     nodenames = [pnode.name] + self.secondaries
4679
4680     req_size = _ComputeDiskSize(self.op.disk_template,
4681                                 self.disks)
4682
4683     # Check lv size requirements
4684     if req_size is not None:
4685       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4686                                          self.op.hypervisor)
4687       for node in nodenames:
4688         info = nodeinfo[node]
4689         info.Raise()
4690         info = info.data
4691         if not info:
4692           raise errors.OpPrereqError("Cannot get current information"
4693                                      " from node '%s'" % node)
4694         vg_free = info.get('vg_free', None)
4695         if not isinstance(vg_free, int):
4696           raise errors.OpPrereqError("Can't compute free disk space on"
4697                                      " node %s" % node)
4698         if req_size > info['vg_free']:
4699           raise errors.OpPrereqError("Not enough disk space on target node %s."
4700                                      " %d MB available, %d MB required" %
4701                                      (node, info['vg_free'], req_size))
4702
4703     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4704
4705     # os verification
4706     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4707     result.Raise()
4708     if not isinstance(result.data, objects.OS):
4709       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4710                                  " primary node"  % self.op.os_type)
4711
4712     # bridge check on primary node
4713     bridges = [n.bridge for n in self.nics]
4714     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4715     result.Raise()
4716     if not result.data:
4717       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4718                                  " exist on destination node '%s'" %
4719                                  (",".join(bridges), pnode.name))
4720
4721     # memory check on primary node
4722     if self.op.start:
4723       _CheckNodeFreeMemory(self, self.pnode.name,
4724                            "creating instance %s" % self.op.instance_name,
4725                            self.be_full[constants.BE_MEMORY],
4726                            self.op.hypervisor)
4727
4728   def Exec(self, feedback_fn):
4729     """Create and add the instance to the cluster.
4730
4731     """
4732     instance = self.op.instance_name
4733     pnode_name = self.pnode.name
4734
4735     ht_kind = self.op.hypervisor
4736     if ht_kind in constants.HTS_REQ_PORT:
4737       network_port = self.cfg.AllocatePort()
4738     else:
4739       network_port = None
4740
4741     ##if self.op.vnc_bind_address is None:
4742     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4743
4744     # this is needed because os.path.join does not accept None arguments
4745     if self.op.file_storage_dir is None:
4746       string_file_storage_dir = ""
4747     else:
4748       string_file_storage_dir = self.op.file_storage_dir
4749
4750     # build the full file storage dir path
4751     file_storage_dir = os.path.normpath(os.path.join(
4752                                         self.cfg.GetFileStorageDir(),
4753                                         string_file_storage_dir, instance))
4754
4755
4756     disks = _GenerateDiskTemplate(self,
4757                                   self.op.disk_template,
4758                                   instance, pnode_name,
4759                                   self.secondaries,
4760                                   self.disks,
4761                                   file_storage_dir,
4762                                   self.op.file_driver,
4763                                   0)
4764
4765     iobj = objects.Instance(name=instance, os=self.op.os_type,
4766                             primary_node=pnode_name,
4767                             nics=self.nics, disks=disks,
4768                             disk_template=self.op.disk_template,
4769                             admin_up=False,
4770                             network_port=network_port,
4771                             beparams=self.op.beparams,
4772                             hvparams=self.op.hvparams,
4773                             hypervisor=self.op.hypervisor,
4774                             )
4775
4776     feedback_fn("* creating instance disks...")
4777     try:
4778       _CreateDisks(self, iobj)
4779     except errors.OpExecError:
4780       self.LogWarning("Device creation failed, reverting...")
4781       try:
4782         _RemoveDisks(self, iobj)
4783       finally:
4784         self.cfg.ReleaseDRBDMinors(instance)
4785         raise
4786
4787     feedback_fn("adding instance %s to cluster config" % instance)
4788
4789     self.cfg.AddInstance(iobj)
4790     # Declare that we don't want to remove the instance lock anymore, as we've
4791     # added the instance to the config
4792     del self.remove_locks[locking.LEVEL_INSTANCE]
4793     # Unlock all the nodes
4794     if self.op.mode == constants.INSTANCE_IMPORT:
4795       nodes_keep = [self.op.src_node]
4796       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4797                        if node != self.op.src_node]
4798       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4799       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4800     else:
4801       self.context.glm.release(locking.LEVEL_NODE)
4802       del self.acquired_locks[locking.LEVEL_NODE]
4803
4804     if self.op.wait_for_sync:
4805       disk_abort = not _WaitForSync(self, iobj)
4806     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4807       # make sure the disks are not degraded (still sync-ing is ok)
4808       time.sleep(15)
4809       feedback_fn("* checking mirrors status")
4810       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4811     else:
4812       disk_abort = False
4813
4814     if disk_abort:
4815       _RemoveDisks(self, iobj)
4816       self.cfg.RemoveInstance(iobj.name)
4817       # Make sure the instance lock gets removed
4818       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4819       raise errors.OpExecError("There are some degraded disks for"
4820                                " this instance")
4821
4822     feedback_fn("creating os for instance %s on node %s" %
4823                 (instance, pnode_name))
4824
4825     if iobj.disk_template != constants.DT_DISKLESS:
4826       if self.op.mode == constants.INSTANCE_CREATE:
4827         feedback_fn("* running the instance OS create scripts...")
4828         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4829         msg = result.RemoteFailMsg()
4830         if msg:
4831           raise errors.OpExecError("Could not add os for instance %s"
4832                                    " on node %s: %s" %
4833                                    (instance, pnode_name, msg))
4834
4835       elif self.op.mode == constants.INSTANCE_IMPORT:
4836         feedback_fn("* running the instance OS import scripts...")
4837         src_node = self.op.src_node
4838         src_images = self.src_images
4839         cluster_name = self.cfg.GetClusterName()
4840         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4841                                                          src_node, src_images,
4842                                                          cluster_name)
4843         import_result.Raise()
4844         for idx, result in enumerate(import_result.data):
4845           if not result:
4846             self.LogWarning("Could not import the image %s for instance"
4847                             " %s, disk %d, on node %s" %
4848                             (src_images[idx], instance, idx, pnode_name))
4849       else:
4850         # also checked in the prereq part
4851         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4852                                      % self.op.mode)
4853
4854     if self.op.start:
4855       iobj.admin_up = True
4856       self.cfg.Update(iobj)
4857       logging.info("Starting instance %s on node %s", instance, pnode_name)
4858       feedback_fn("* starting instance...")
4859       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4860       msg = result.RemoteFailMsg()
4861       if msg:
4862         raise errors.OpExecError("Could not start instance: %s" % msg)
4863
4864
4865 class LUConnectConsole(NoHooksLU):
4866   """Connect to an instance's console.
4867
4868   This is somewhat special in that it returns the command line that
4869   you need to run on the master node in order to connect to the
4870   console.
4871
4872   """
4873   _OP_REQP = ["instance_name"]
4874   REQ_BGL = False
4875
4876   def ExpandNames(self):
4877     self._ExpandAndLockInstance()
4878
4879   def CheckPrereq(self):
4880     """Check prerequisites.
4881
4882     This checks that the instance is in the cluster.
4883
4884     """
4885     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4886     assert self.instance is not None, \
4887       "Cannot retrieve locked instance %s" % self.op.instance_name
4888     _CheckNodeOnline(self, self.instance.primary_node)
4889
4890   def Exec(self, feedback_fn):
4891     """Connect to the console of an instance
4892
4893     """
4894     instance = self.instance
4895     node = instance.primary_node
4896
4897     node_insts = self.rpc.call_instance_list([node],
4898                                              [instance.hypervisor])[node]
4899     node_insts.Raise()
4900
4901     if instance.name not in node_insts.data:
4902       raise errors.OpExecError("Instance %s is not running." % instance.name)
4903
4904     logging.debug("Connecting to console of %s on %s", instance.name, node)
4905
4906     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4907     cluster = self.cfg.GetClusterInfo()
4908     # beparams and hvparams are passed separately, to avoid editing the
4909     # instance and then saving the defaults in the instance itself.
4910     hvparams = cluster.FillHV(instance)
4911     beparams = cluster.FillBE(instance)
4912     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4913
4914     # build ssh cmdline
4915     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4916
4917
4918 class LUReplaceDisks(LogicalUnit):
4919   """Replace the disks of an instance.
4920
4921   """
4922   HPATH = "mirrors-replace"
4923   HTYPE = constants.HTYPE_INSTANCE
4924   _OP_REQP = ["instance_name", "mode", "disks"]
4925   REQ_BGL = False
4926
4927   def CheckArguments(self):
4928     if not hasattr(self.op, "remote_node"):
4929       self.op.remote_node = None
4930     if not hasattr(self.op, "iallocator"):
4931       self.op.iallocator = None
4932
4933     # check for valid parameter combination
4934     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4935     if self.op.mode == constants.REPLACE_DISK_CHG:
4936       if cnt == 2:
4937         raise errors.OpPrereqError("When changing the secondary either an"
4938                                    " iallocator script must be used or the"
4939                                    " new node given")
4940       elif cnt == 0:
4941         raise errors.OpPrereqError("Give either the iallocator or the new"
4942                                    " secondary, not both")
4943     else: # not replacing the secondary
4944       if cnt != 2:
4945         raise errors.OpPrereqError("The iallocator and new node options can"
4946                                    " be used only when changing the"
4947                                    " secondary node")
4948
4949   def ExpandNames(self):
4950     self._ExpandAndLockInstance()
4951
4952     if self.op.iallocator is not None:
4953       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4954     elif self.op.remote_node is not None:
4955       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4956       if remote_node is None:
4957         raise errors.OpPrereqError("Node '%s' not known" %
4958                                    self.op.remote_node)
4959       self.op.remote_node = remote_node
4960       # Warning: do not remove the locking of the new secondary here
4961       # unless DRBD8.AddChildren is changed to work in parallel;
4962       # currently it doesn't since parallel invocations of
4963       # FindUnusedMinor will conflict
4964       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4965       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4966     else:
4967       self.needed_locks[locking.LEVEL_NODE] = []
4968       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4969
4970   def DeclareLocks(self, level):
4971     # If we're not already locking all nodes in the set we have to declare the
4972     # instance's primary/secondary nodes.
4973     if (level == locking.LEVEL_NODE and
4974         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4975       self._LockInstancesNodes()
4976
4977   def _RunAllocator(self):
4978     """Compute a new secondary node using an IAllocator.
4979
4980     """
4981     ial = IAllocator(self,
4982                      mode=constants.IALLOCATOR_MODE_RELOC,
4983                      name=self.op.instance_name,
4984                      relocate_from=[self.sec_node])
4985
4986     ial.Run(self.op.iallocator)
4987
4988     if not ial.success:
4989       raise errors.OpPrereqError("Can't compute nodes using"
4990                                  " iallocator '%s': %s" % (self.op.iallocator,
4991                                                            ial.info))
4992     if len(ial.nodes) != ial.required_nodes:
4993       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4994                                  " of nodes (%s), required %s" %
4995                                  (len(ial.nodes), ial.required_nodes))
4996     self.op.remote_node = ial.nodes[0]
4997     self.LogInfo("Selected new secondary for the instance: %s",
4998                  self.op.remote_node)
4999
5000   def BuildHooksEnv(self):
5001     """Build hooks env.
5002
5003     This runs on the master, the primary and all the secondaries.
5004
5005     """
5006     env = {
5007       "MODE": self.op.mode,
5008       "NEW_SECONDARY": self.op.remote_node,
5009       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5010       }
5011     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5012     nl = [
5013       self.cfg.GetMasterNode(),
5014       self.instance.primary_node,
5015       ]
5016     if self.op.remote_node is not None:
5017       nl.append(self.op.remote_node)
5018     return env, nl, nl
5019
5020   def CheckPrereq(self):
5021     """Check prerequisites.
5022
5023     This checks that the instance is in the cluster.
5024
5025     """
5026     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5027     assert instance is not None, \
5028       "Cannot retrieve locked instance %s" % self.op.instance_name
5029     self.instance = instance
5030
5031     if instance.disk_template != constants.DT_DRBD8:
5032       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5033                                  " instances")
5034
5035     if len(instance.secondary_nodes) != 1:
5036       raise errors.OpPrereqError("The instance has a strange layout,"
5037                                  " expected one secondary but found %d" %
5038                                  len(instance.secondary_nodes))
5039
5040     self.sec_node = instance.secondary_nodes[0]
5041
5042     if self.op.iallocator is not None:
5043       self._RunAllocator()
5044
5045     remote_node = self.op.remote_node
5046     if remote_node is not None:
5047       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5048       assert self.remote_node_info is not None, \
5049         "Cannot retrieve locked node %s" % remote_node
5050     else:
5051       self.remote_node_info = None
5052     if remote_node == instance.primary_node:
5053       raise errors.OpPrereqError("The specified node is the primary node of"
5054                                  " the instance.")
5055     elif remote_node == self.sec_node:
5056       raise errors.OpPrereqError("The specified node is already the"
5057                                  " secondary node of the instance.")
5058
5059     if self.op.mode == constants.REPLACE_DISK_PRI:
5060       n1 = self.tgt_node = instance.primary_node
5061       n2 = self.oth_node = self.sec_node
5062     elif self.op.mode == constants.REPLACE_DISK_SEC:
5063       n1 = self.tgt_node = self.sec_node
5064       n2 = self.oth_node = instance.primary_node
5065     elif self.op.mode == constants.REPLACE_DISK_CHG:
5066       n1 = self.new_node = remote_node
5067       n2 = self.oth_node = instance.primary_node
5068       self.tgt_node = self.sec_node
5069       _CheckNodeNotDrained(self, remote_node)
5070     else:
5071       raise errors.ProgrammerError("Unhandled disk replace mode")
5072
5073     _CheckNodeOnline(self, n1)
5074     _CheckNodeOnline(self, n2)
5075
5076     if not self.op.disks:
5077       self.op.disks = range(len(instance.disks))
5078
5079     for disk_idx in self.op.disks:
5080       instance.FindDisk(disk_idx)
5081
5082   def _ExecD8DiskOnly(self, feedback_fn):
5083     """Replace a disk on the primary or secondary for dbrd8.
5084
5085     The algorithm for replace is quite complicated:
5086
5087       1. for each disk to be replaced:
5088
5089         1. create new LVs on the target node with unique names
5090         1. detach old LVs from the drbd device
5091         1. rename old LVs to name_replaced.<time_t>
5092         1. rename new LVs to old LVs
5093         1. attach the new LVs (with the old names now) to the drbd device
5094
5095       1. wait for sync across all devices
5096
5097       1. for each modified disk:
5098
5099         1. remove old LVs (which have the name name_replaces.<time_t>)
5100
5101     Failures are not very well handled.
5102
5103     """
5104     steps_total = 6
5105     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5106     instance = self.instance
5107     iv_names = {}
5108     vgname = self.cfg.GetVGName()
5109     # start of work
5110     cfg = self.cfg
5111     tgt_node = self.tgt_node
5112     oth_node = self.oth_node
5113
5114     # Step: check device activation
5115     self.proc.LogStep(1, steps_total, "check device existence")
5116     info("checking volume groups")
5117     my_vg = cfg.GetVGName()
5118     results = self.rpc.call_vg_list([oth_node, tgt_node])
5119     if not results:
5120       raise errors.OpExecError("Can't list volume groups on the nodes")
5121     for node in oth_node, tgt_node:
5122       res = results[node]
5123       if res.failed or not res.data or my_vg not in res.data:
5124         raise errors.OpExecError("Volume group '%s' not found on %s" %
5125                                  (my_vg, node))
5126     for idx, dev in enumerate(instance.disks):
5127       if idx not in self.op.disks:
5128         continue
5129       for node in tgt_node, oth_node:
5130         info("checking disk/%d on %s" % (idx, node))
5131         cfg.SetDiskID(dev, node)
5132         result = self.rpc.call_blockdev_find(node, dev)
5133         msg = result.RemoteFailMsg()
5134         if not msg and not result.payload:
5135           msg = "disk not found"
5136         if msg:
5137           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5138                                    (idx, node, msg))
5139
5140     # Step: check other node consistency
5141     self.proc.LogStep(2, steps_total, "check peer consistency")
5142     for idx, dev in enumerate(instance.disks):
5143       if idx not in self.op.disks:
5144         continue
5145       info("checking disk/%d consistency on %s" % (idx, oth_node))
5146       if not _CheckDiskConsistency(self, dev, oth_node,
5147                                    oth_node==instance.primary_node):
5148         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5149                                  " to replace disks on this node (%s)" %
5150                                  (oth_node, tgt_node))
5151
5152     # Step: create new storage
5153     self.proc.LogStep(3, steps_total, "allocate new storage")
5154     for idx, dev in enumerate(instance.disks):
5155       if idx not in self.op.disks:
5156         continue
5157       size = dev.size
5158       cfg.SetDiskID(dev, tgt_node)
5159       lv_names = [".disk%d_%s" % (idx, suf)
5160                   for suf in ["data", "meta"]]
5161       names = _GenerateUniqueNames(self, lv_names)
5162       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5163                              logical_id=(vgname, names[0]))
5164       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5165                              logical_id=(vgname, names[1]))
5166       new_lvs = [lv_data, lv_meta]
5167       old_lvs = dev.children
5168       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5169       info("creating new local storage on %s for %s" %
5170            (tgt_node, dev.iv_name))
5171       # we pass force_create=True to force the LVM creation
5172       for new_lv in new_lvs:
5173         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5174                         _GetInstanceInfoText(instance), False)
5175
5176     # Step: for each lv, detach+rename*2+attach
5177     self.proc.LogStep(4, steps_total, "change drbd configuration")
5178     for dev, old_lvs, new_lvs in iv_names.itervalues():
5179       info("detaching %s drbd from local storage" % dev.iv_name)
5180       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5181       result.Raise()
5182       if not result.data:
5183         raise errors.OpExecError("Can't detach drbd from local storage on node"
5184                                  " %s for device %s" % (tgt_node, dev.iv_name))
5185       #dev.children = []
5186       #cfg.Update(instance)
5187
5188       # ok, we created the new LVs, so now we know we have the needed
5189       # storage; as such, we proceed on the target node to rename
5190       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5191       # using the assumption that logical_id == physical_id (which in
5192       # turn is the unique_id on that node)
5193
5194       # FIXME(iustin): use a better name for the replaced LVs
5195       temp_suffix = int(time.time())
5196       ren_fn = lambda d, suff: (d.physical_id[0],
5197                                 d.physical_id[1] + "_replaced-%s" % suff)
5198       # build the rename list based on what LVs exist on the node
5199       rlist = []
5200       for to_ren in old_lvs:
5201         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5202         if not result.RemoteFailMsg() and result.payload:
5203           # device exists
5204           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5205
5206       info("renaming the old LVs on the target node")
5207       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5208       result.Raise()
5209       if not result.data:
5210         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5211       # now we rename the new LVs to the old LVs
5212       info("renaming the new LVs on the target node")
5213       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5214       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5215       result.Raise()
5216       if not result.data:
5217         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5218
5219       for old, new in zip(old_lvs, new_lvs):
5220         new.logical_id = old.logical_id
5221         cfg.SetDiskID(new, tgt_node)
5222
5223       for disk in old_lvs:
5224         disk.logical_id = ren_fn(disk, temp_suffix)
5225         cfg.SetDiskID(disk, tgt_node)
5226
5227       # now that the new lvs have the old name, we can add them to the device
5228       info("adding new mirror component on %s" % tgt_node)
5229       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5230       if result.failed or not result.data:
5231         for new_lv in new_lvs:
5232           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5233           if msg:
5234             warning("Can't rollback device %s: %s", dev, msg,
5235                     hint="cleanup manually the unused logical volumes")
5236         raise errors.OpExecError("Can't add local storage to drbd")
5237
5238       dev.children = new_lvs
5239       cfg.Update(instance)
5240
5241     # Step: wait for sync
5242
5243     # this can fail as the old devices are degraded and _WaitForSync
5244     # does a combined result over all disks, so we don't check its
5245     # return value
5246     self.proc.LogStep(5, steps_total, "sync devices")
5247     _WaitForSync(self, instance, unlock=True)
5248
5249     # so check manually all the devices
5250     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5251       cfg.SetDiskID(dev, instance.primary_node)
5252       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5253       msg = result.RemoteFailMsg()
5254       if not msg and not result.payload:
5255         msg = "disk not found"
5256       if msg:
5257         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5258                                  (name, msg))
5259       if result.payload[5]:
5260         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5261
5262     # Step: remove old storage
5263     self.proc.LogStep(6, steps_total, "removing old storage")
5264     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5265       info("remove logical volumes for %s" % name)
5266       for lv in old_lvs:
5267         cfg.SetDiskID(lv, tgt_node)
5268         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5269         if msg:
5270           warning("Can't remove old LV: %s" % msg,
5271                   hint="manually remove unused LVs")
5272           continue
5273
5274   def _ExecD8Secondary(self, feedback_fn):
5275     """Replace the secondary node for drbd8.
5276
5277     The algorithm for replace is quite complicated:
5278       - for all disks of the instance:
5279         - create new LVs on the new node with same names
5280         - shutdown the drbd device on the old secondary
5281         - disconnect the drbd network on the primary
5282         - create the drbd device on the new secondary
5283         - network attach the drbd on the primary, using an artifice:
5284           the drbd code for Attach() will connect to the network if it
5285           finds a device which is connected to the good local disks but
5286           not network enabled
5287       - wait for sync across all devices
5288       - remove all disks from the old secondary
5289
5290     Failures are not very well handled.
5291
5292     """
5293     steps_total = 6
5294     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5295     instance = self.instance
5296     iv_names = {}
5297     # start of work
5298     cfg = self.cfg
5299     old_node = self.tgt_node
5300     new_node = self.new_node
5301     pri_node = instance.primary_node
5302     nodes_ip = {
5303       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5304       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5305       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5306       }
5307
5308     # Step: check device activation
5309     self.proc.LogStep(1, steps_total, "check device existence")
5310     info("checking volume groups")
5311     my_vg = cfg.GetVGName()
5312     results = self.rpc.call_vg_list([pri_node, new_node])
5313     for node in pri_node, new_node:
5314       res = results[node]
5315       if res.failed or not res.data or my_vg not in res.data:
5316         raise errors.OpExecError("Volume group '%s' not found on %s" %
5317                                  (my_vg, node))
5318     for idx, dev in enumerate(instance.disks):
5319       if idx not in self.op.disks:
5320         continue
5321       info("checking disk/%d on %s" % (idx, pri_node))
5322       cfg.SetDiskID(dev, pri_node)
5323       result = self.rpc.call_blockdev_find(pri_node, dev)
5324       msg = result.RemoteFailMsg()
5325       if not msg and not result.payload:
5326         msg = "disk not found"
5327       if msg:
5328         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5329                                  (idx, pri_node, msg))
5330
5331     # Step: check other node consistency
5332     self.proc.LogStep(2, steps_total, "check peer consistency")
5333     for idx, dev in enumerate(instance.disks):
5334       if idx not in self.op.disks:
5335         continue
5336       info("checking disk/%d consistency on %s" % (idx, pri_node))
5337       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5338         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5339                                  " unsafe to replace the secondary" %
5340                                  pri_node)
5341
5342     # Step: create new storage
5343     self.proc.LogStep(3, steps_total, "allocate new storage")
5344     for idx, dev in enumerate(instance.disks):
5345       info("adding new local storage on %s for disk/%d" %
5346            (new_node, idx))
5347       # we pass force_create=True to force LVM creation
5348       for new_lv in dev.children:
5349         _CreateBlockDev(self, new_node, instance, new_lv, True,
5350                         _GetInstanceInfoText(instance), False)
5351
5352     # Step 4: dbrd minors and drbd setups changes
5353     # after this, we must manually remove the drbd minors on both the
5354     # error and the success paths
5355     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5356                                    instance.name)
5357     logging.debug("Allocated minors %s" % (minors,))
5358     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5359     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5360       size = dev.size
5361       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5362       # create new devices on new_node; note that we create two IDs:
5363       # one without port, so the drbd will be activated without
5364       # networking information on the new node at this stage, and one
5365       # with network, for the latter activation in step 4
5366       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5367       if pri_node == o_node1:
5368         p_minor = o_minor1
5369       else:
5370         p_minor = o_minor2
5371
5372       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5373       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5374
5375       iv_names[idx] = (dev, dev.children, new_net_id)
5376       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5377                     new_net_id)
5378       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5379                               logical_id=new_alone_id,
5380                               children=dev.children)
5381       try:
5382         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5383                               _GetInstanceInfoText(instance), False)
5384       except errors.GenericError:
5385         self.cfg.ReleaseDRBDMinors(instance.name)
5386         raise
5387
5388     for idx, dev in enumerate(instance.disks):
5389       # we have new devices, shutdown the drbd on the old secondary
5390       info("shutting down drbd for disk/%d on old node" % idx)
5391       cfg.SetDiskID(dev, old_node)
5392       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5393       if msg:
5394         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5395                 (idx, msg),
5396                 hint="Please cleanup this device manually as soon as possible")
5397
5398     info("detaching primary drbds from the network (=> standalone)")
5399     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5400                                                instance.disks)[pri_node]
5401
5402     msg = result.RemoteFailMsg()
5403     if msg:
5404       # detaches didn't succeed (unlikely)
5405       self.cfg.ReleaseDRBDMinors(instance.name)
5406       raise errors.OpExecError("Can't detach the disks from the network on"
5407                                " old node: %s" % (msg,))
5408
5409     # if we managed to detach at least one, we update all the disks of
5410     # the instance to point to the new secondary
5411     info("updating instance configuration")
5412     for dev, _, new_logical_id in iv_names.itervalues():
5413       dev.logical_id = new_logical_id
5414       cfg.SetDiskID(dev, pri_node)
5415     cfg.Update(instance)
5416
5417     # and now perform the drbd attach
5418     info("attaching primary drbds to new secondary (standalone => connected)")
5419     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5420                                            instance.disks, instance.name,
5421                                            False)
5422     for to_node, to_result in result.items():
5423       msg = to_result.RemoteFailMsg()
5424       if msg:
5425         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5426                 hint="please do a gnt-instance info to see the"
5427                 " status of disks")
5428
5429     # this can fail as the old devices are degraded and _WaitForSync
5430     # does a combined result over all disks, so we don't check its
5431     # return value
5432     self.proc.LogStep(5, steps_total, "sync devices")
5433     _WaitForSync(self, instance, unlock=True)
5434
5435     # so check manually all the devices
5436     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5437       cfg.SetDiskID(dev, pri_node)
5438       result = self.rpc.call_blockdev_find(pri_node, dev)
5439       msg = result.RemoteFailMsg()
5440       if not msg and not result.payload:
5441         msg = "disk not found"
5442       if msg:
5443         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5444                                  (idx, msg))
5445       if result.payload[5]:
5446         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5447
5448     self.proc.LogStep(6, steps_total, "removing old storage")
5449     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5450       info("remove logical volumes for disk/%d" % idx)
5451       for lv in old_lvs:
5452         cfg.SetDiskID(lv, old_node)
5453         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5454         if msg:
5455           warning("Can't remove LV on old secondary: %s", msg,
5456                   hint="Cleanup stale volumes by hand")
5457
5458   def Exec(self, feedback_fn):
5459     """Execute disk replacement.
5460
5461     This dispatches the disk replacement to the appropriate handler.
5462
5463     """
5464     instance = self.instance
5465
5466     # Activate the instance disks if we're replacing them on a down instance
5467     if not instance.admin_up:
5468       _StartInstanceDisks(self, instance, True)
5469
5470     if self.op.mode == constants.REPLACE_DISK_CHG:
5471       fn = self._ExecD8Secondary
5472     else:
5473       fn = self._ExecD8DiskOnly
5474
5475     ret = fn(feedback_fn)
5476
5477     # Deactivate the instance disks if we're replacing them on a down instance
5478     if not instance.admin_up:
5479       _SafeShutdownInstanceDisks(self, instance)
5480
5481     return ret
5482
5483
5484 class LUGrowDisk(LogicalUnit):
5485   """Grow a disk of an instance.
5486
5487   """
5488   HPATH = "disk-grow"
5489   HTYPE = constants.HTYPE_INSTANCE
5490   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5491   REQ_BGL = False
5492
5493   def ExpandNames(self):
5494     self._ExpandAndLockInstance()
5495     self.needed_locks[locking.LEVEL_NODE] = []
5496     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5497
5498   def DeclareLocks(self, level):
5499     if level == locking.LEVEL_NODE:
5500       self._LockInstancesNodes()
5501
5502   def BuildHooksEnv(self):
5503     """Build hooks env.
5504
5505     This runs on the master, the primary and all the secondaries.
5506
5507     """
5508     env = {
5509       "DISK": self.op.disk,
5510       "AMOUNT": self.op.amount,
5511       }
5512     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5513     nl = [
5514       self.cfg.GetMasterNode(),
5515       self.instance.primary_node,
5516       ]
5517     return env, nl, nl
5518
5519   def CheckPrereq(self):
5520     """Check prerequisites.
5521
5522     This checks that the instance is in the cluster.
5523
5524     """
5525     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5526     assert instance is not None, \
5527       "Cannot retrieve locked instance %s" % self.op.instance_name
5528     nodenames = list(instance.all_nodes)
5529     for node in nodenames:
5530       _CheckNodeOnline(self, node)
5531
5532
5533     self.instance = instance
5534
5535     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5536       raise errors.OpPrereqError("Instance's disk layout does not support"
5537                                  " growing.")
5538
5539     self.disk = instance.FindDisk(self.op.disk)
5540
5541     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5542                                        instance.hypervisor)
5543     for node in nodenames:
5544       info = nodeinfo[node]
5545       if info.failed or not info.data:
5546         raise errors.OpPrereqError("Cannot get current information"
5547                                    " from node '%s'" % node)
5548       vg_free = info.data.get('vg_free', None)
5549       if not isinstance(vg_free, int):
5550         raise errors.OpPrereqError("Can't compute free disk space on"
5551                                    " node %s" % node)
5552       if self.op.amount > vg_free:
5553         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5554                                    " %d MiB available, %d MiB required" %
5555                                    (node, vg_free, self.op.amount))
5556
5557   def Exec(self, feedback_fn):
5558     """Execute disk grow.
5559
5560     """
5561     instance = self.instance
5562     disk = self.disk
5563     for node in instance.all_nodes:
5564       self.cfg.SetDiskID(disk, node)
5565       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5566       msg = result.RemoteFailMsg()
5567       if msg:
5568         raise errors.OpExecError("Grow request failed to node %s: %s" %
5569                                  (node, msg))
5570     disk.RecordGrow(self.op.amount)
5571     self.cfg.Update(instance)
5572     if self.op.wait_for_sync:
5573       disk_abort = not _WaitForSync(self, instance)
5574       if disk_abort:
5575         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5576                              " status.\nPlease check the instance.")
5577
5578
5579 class LUQueryInstanceData(NoHooksLU):
5580   """Query runtime instance data.
5581
5582   """
5583   _OP_REQP = ["instances", "static"]
5584   REQ_BGL = False
5585
5586   def ExpandNames(self):
5587     self.needed_locks = {}
5588     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5589
5590     if not isinstance(self.op.instances, list):
5591       raise errors.OpPrereqError("Invalid argument type 'instances'")
5592
5593     if self.op.instances:
5594       self.wanted_names = []
5595       for name in self.op.instances:
5596         full_name = self.cfg.ExpandInstanceName(name)
5597         if full_name is None:
5598           raise errors.OpPrereqError("Instance '%s' not known" % name)
5599         self.wanted_names.append(full_name)
5600       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5601     else:
5602       self.wanted_names = None
5603       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5604
5605     self.needed_locks[locking.LEVEL_NODE] = []
5606     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5607
5608   def DeclareLocks(self, level):
5609     if level == locking.LEVEL_NODE:
5610       self._LockInstancesNodes()
5611
5612   def CheckPrereq(self):
5613     """Check prerequisites.
5614
5615     This only checks the optional instance list against the existing names.
5616
5617     """
5618     if self.wanted_names is None:
5619       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5620
5621     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5622                              in self.wanted_names]
5623     return
5624
5625   def _ComputeDiskStatus(self, instance, snode, dev):
5626     """Compute block device status.
5627
5628     """
5629     static = self.op.static
5630     if not static:
5631       self.cfg.SetDiskID(dev, instance.primary_node)
5632       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5633       if dev_pstatus.offline:
5634         dev_pstatus = None
5635       else:
5636         msg = dev_pstatus.RemoteFailMsg()
5637         if msg:
5638           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5639                                    (instance.name, msg))
5640         dev_pstatus = dev_pstatus.payload
5641     else:
5642       dev_pstatus = None
5643
5644     if dev.dev_type in constants.LDS_DRBD:
5645       # we change the snode then (otherwise we use the one passed in)
5646       if dev.logical_id[0] == instance.primary_node:
5647         snode = dev.logical_id[1]
5648       else:
5649         snode = dev.logical_id[0]
5650
5651     if snode and not static:
5652       self.cfg.SetDiskID(dev, snode)
5653       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5654       if dev_sstatus.offline:
5655         dev_sstatus = None
5656       else:
5657         msg = dev_sstatus.RemoteFailMsg()
5658         if msg:
5659           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5660                                    (instance.name, msg))
5661         dev_sstatus = dev_sstatus.payload
5662     else:
5663       dev_sstatus = None
5664
5665     if dev.children:
5666       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5667                       for child in dev.children]
5668     else:
5669       dev_children = []
5670
5671     data = {
5672       "iv_name": dev.iv_name,
5673       "dev_type": dev.dev_type,
5674       "logical_id": dev.logical_id,
5675       "physical_id": dev.physical_id,
5676       "pstatus": dev_pstatus,
5677       "sstatus": dev_sstatus,
5678       "children": dev_children,
5679       "mode": dev.mode,
5680       }
5681
5682     return data
5683
5684   def Exec(self, feedback_fn):
5685     """Gather and return data"""
5686     result = {}
5687
5688     cluster = self.cfg.GetClusterInfo()
5689
5690     for instance in self.wanted_instances:
5691       if not self.op.static:
5692         remote_info = self.rpc.call_instance_info(instance.primary_node,
5693                                                   instance.name,
5694                                                   instance.hypervisor)
5695         remote_info.Raise()
5696         remote_info = remote_info.data
5697         if remote_info and "state" in remote_info:
5698           remote_state = "up"
5699         else:
5700           remote_state = "down"
5701       else:
5702         remote_state = None
5703       if instance.admin_up:
5704         config_state = "up"
5705       else:
5706         config_state = "down"
5707
5708       disks = [self._ComputeDiskStatus(instance, None, device)
5709                for device in instance.disks]
5710
5711       idict = {
5712         "name": instance.name,
5713         "config_state": config_state,
5714         "run_state": remote_state,
5715         "pnode": instance.primary_node,
5716         "snodes": instance.secondary_nodes,
5717         "os": instance.os,
5718         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5719         "disks": disks,
5720         "hypervisor": instance.hypervisor,
5721         "network_port": instance.network_port,
5722         "hv_instance": instance.hvparams,
5723         "hv_actual": cluster.FillHV(instance),
5724         "be_instance": instance.beparams,
5725         "be_actual": cluster.FillBE(instance),
5726         }
5727
5728       result[instance.name] = idict
5729
5730     return result
5731
5732
5733 class LUSetInstanceParams(LogicalUnit):
5734   """Modifies an instances's parameters.
5735
5736   """
5737   HPATH = "instance-modify"
5738   HTYPE = constants.HTYPE_INSTANCE
5739   _OP_REQP = ["instance_name"]
5740   REQ_BGL = False
5741
5742   def CheckArguments(self):
5743     if not hasattr(self.op, 'nics'):
5744       self.op.nics = []
5745     if not hasattr(self.op, 'disks'):
5746       self.op.disks = []
5747     if not hasattr(self.op, 'beparams'):
5748       self.op.beparams = {}
5749     if not hasattr(self.op, 'hvparams'):
5750       self.op.hvparams = {}
5751     self.op.force = getattr(self.op, "force", False)
5752     if not (self.op.nics or self.op.disks or
5753             self.op.hvparams or self.op.beparams):
5754       raise errors.OpPrereqError("No changes submitted")
5755
5756     # Disk validation
5757     disk_addremove = 0
5758     for disk_op, disk_dict in self.op.disks:
5759       if disk_op == constants.DDM_REMOVE:
5760         disk_addremove += 1
5761         continue
5762       elif disk_op == constants.DDM_ADD:
5763         disk_addremove += 1
5764       else:
5765         if not isinstance(disk_op, int):
5766           raise errors.OpPrereqError("Invalid disk index")
5767       if disk_op == constants.DDM_ADD:
5768         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5769         if mode not in constants.DISK_ACCESS_SET:
5770           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5771         size = disk_dict.get('size', None)
5772         if size is None:
5773           raise errors.OpPrereqError("Required disk parameter size missing")
5774         try:
5775           size = int(size)
5776         except ValueError, err:
5777           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5778                                      str(err))
5779         disk_dict['size'] = size
5780       else:
5781         # modification of disk
5782         if 'size' in disk_dict:
5783           raise errors.OpPrereqError("Disk size change not possible, use"
5784                                      " grow-disk")
5785
5786     if disk_addremove > 1:
5787       raise errors.OpPrereqError("Only one disk add or remove operation"
5788                                  " supported at a time")
5789
5790     # NIC validation
5791     nic_addremove = 0
5792     for nic_op, nic_dict in self.op.nics:
5793       if nic_op == constants.DDM_REMOVE:
5794         nic_addremove += 1
5795         continue
5796       elif nic_op == constants.DDM_ADD:
5797         nic_addremove += 1
5798       else:
5799         if not isinstance(nic_op, int):
5800           raise errors.OpPrereqError("Invalid nic index")
5801
5802       # nic_dict should be a dict
5803       nic_ip = nic_dict.get('ip', None)
5804       if nic_ip is not None:
5805         if nic_ip.lower() == constants.VALUE_NONE:
5806           nic_dict['ip'] = None
5807         else:
5808           if not utils.IsValidIP(nic_ip):
5809             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5810
5811       if nic_op == constants.DDM_ADD:
5812         nic_bridge = nic_dict.get('bridge', None)
5813         if nic_bridge is None:
5814           nic_dict['bridge'] = self.cfg.GetDefBridge()
5815         nic_mac = nic_dict.get('mac', None)
5816         if nic_mac is None:
5817           nic_dict['mac'] = constants.VALUE_AUTO
5818
5819       if 'mac' in nic_dict:
5820         nic_mac = nic_dict['mac']
5821         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5822           if not utils.IsValidMac(nic_mac):
5823             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5824         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5825           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5826                                      " modifying an existing nic")
5827
5828     if nic_addremove > 1:
5829       raise errors.OpPrereqError("Only one NIC add or remove operation"
5830                                  " supported at a time")
5831
5832   def ExpandNames(self):
5833     self._ExpandAndLockInstance()
5834     self.needed_locks[locking.LEVEL_NODE] = []
5835     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5836
5837   def DeclareLocks(self, level):
5838     if level == locking.LEVEL_NODE:
5839       self._LockInstancesNodes()
5840
5841   def BuildHooksEnv(self):
5842     """Build hooks env.
5843
5844     This runs on the master, primary and secondaries.
5845
5846     """
5847     args = dict()
5848     if constants.BE_MEMORY in self.be_new:
5849       args['memory'] = self.be_new[constants.BE_MEMORY]
5850     if constants.BE_VCPUS in self.be_new:
5851       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5852     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5853     # information at all.
5854     if self.op.nics:
5855       args['nics'] = []
5856       nic_override = dict(self.op.nics)
5857       for idx, nic in enumerate(self.instance.nics):
5858         if idx in nic_override:
5859           this_nic_override = nic_override[idx]
5860         else:
5861           this_nic_override = {}
5862         if 'ip' in this_nic_override:
5863           ip = this_nic_override['ip']
5864         else:
5865           ip = nic.ip
5866         if 'bridge' in this_nic_override:
5867           bridge = this_nic_override['bridge']
5868         else:
5869           bridge = nic.bridge
5870         if 'mac' in this_nic_override:
5871           mac = this_nic_override['mac']
5872         else:
5873           mac = nic.mac
5874         args['nics'].append((ip, bridge, mac))
5875       if constants.DDM_ADD in nic_override:
5876         ip = nic_override[constants.DDM_ADD].get('ip', None)
5877         bridge = nic_override[constants.DDM_ADD]['bridge']
5878         mac = nic_override[constants.DDM_ADD]['mac']
5879         args['nics'].append((ip, bridge, mac))
5880       elif constants.DDM_REMOVE in nic_override:
5881         del args['nics'][-1]
5882
5883     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5884     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5885     return env, nl, nl
5886
5887   def CheckPrereq(self):
5888     """Check prerequisites.
5889
5890     This only checks the instance list against the existing names.
5891
5892     """
5893     force = self.force = self.op.force
5894
5895     # checking the new params on the primary/secondary nodes
5896
5897     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5898     assert self.instance is not None, \
5899       "Cannot retrieve locked instance %s" % self.op.instance_name
5900     pnode = instance.primary_node
5901     nodelist = list(instance.all_nodes)
5902
5903     # hvparams processing
5904     if self.op.hvparams:
5905       i_hvdict = copy.deepcopy(instance.hvparams)
5906       for key, val in self.op.hvparams.iteritems():
5907         if val == constants.VALUE_DEFAULT:
5908           try:
5909             del i_hvdict[key]
5910           except KeyError:
5911             pass
5912         else:
5913           i_hvdict[key] = val
5914       cluster = self.cfg.GetClusterInfo()
5915       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5916       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5917                                 i_hvdict)
5918       # local check
5919       hypervisor.GetHypervisor(
5920         instance.hypervisor).CheckParameterSyntax(hv_new)
5921       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5922       self.hv_new = hv_new # the new actual values
5923       self.hv_inst = i_hvdict # the new dict (without defaults)
5924     else:
5925       self.hv_new = self.hv_inst = {}
5926
5927     # beparams processing
5928     if self.op.beparams:
5929       i_bedict = copy.deepcopy(instance.beparams)
5930       for key, val in self.op.beparams.iteritems():
5931         if val == constants.VALUE_DEFAULT:
5932           try:
5933             del i_bedict[key]
5934           except KeyError:
5935             pass
5936         else:
5937           i_bedict[key] = val
5938       cluster = self.cfg.GetClusterInfo()
5939       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5940       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5941                                 i_bedict)
5942       self.be_new = be_new # the new actual values
5943       self.be_inst = i_bedict # the new dict (without defaults)
5944     else:
5945       self.be_new = self.be_inst = {}
5946
5947     self.warn = []
5948
5949     if constants.BE_MEMORY in self.op.beparams and not self.force:
5950       mem_check_list = [pnode]
5951       if be_new[constants.BE_AUTO_BALANCE]:
5952         # either we changed auto_balance to yes or it was from before
5953         mem_check_list.extend(instance.secondary_nodes)
5954       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5955                                                   instance.hypervisor)
5956       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5957                                          instance.hypervisor)
5958       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5959         # Assume the primary node is unreachable and go ahead
5960         self.warn.append("Can't get info from primary node %s" % pnode)
5961       else:
5962         if not instance_info.failed and instance_info.data:
5963           current_mem = int(instance_info.data['memory'])
5964         else:
5965           # Assume instance not running
5966           # (there is a slight race condition here, but it's not very probable,
5967           # and we have no other way to check)
5968           current_mem = 0
5969         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5970                     nodeinfo[pnode].data['memory_free'])
5971         if miss_mem > 0:
5972           raise errors.OpPrereqError("This change will prevent the instance"
5973                                      " from starting, due to %d MB of memory"
5974                                      " missing on its primary node" % miss_mem)
5975
5976       if be_new[constants.BE_AUTO_BALANCE]:
5977         for node, nres in nodeinfo.iteritems():
5978           if node not in instance.secondary_nodes:
5979             continue
5980           if nres.failed or not isinstance(nres.data, dict):
5981             self.warn.append("Can't get info from secondary node %s" % node)
5982           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5983             self.warn.append("Not enough memory to failover instance to"
5984                              " secondary node %s" % node)
5985
5986     # NIC processing
5987     for nic_op, nic_dict in self.op.nics:
5988       if nic_op == constants.DDM_REMOVE:
5989         if not instance.nics:
5990           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5991         continue
5992       if nic_op != constants.DDM_ADD:
5993         # an existing nic
5994         if nic_op < 0 or nic_op >= len(instance.nics):
5995           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5996                                      " are 0 to %d" %
5997                                      (nic_op, len(instance.nics)))
5998       if 'bridge' in nic_dict:
5999         nic_bridge = nic_dict['bridge']
6000         if nic_bridge is None:
6001           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6002         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6003           msg = ("Bridge '%s' doesn't exist on one of"
6004                  " the instance nodes" % nic_bridge)
6005           if self.force:
6006             self.warn.append(msg)
6007           else:
6008             raise errors.OpPrereqError(msg)
6009       if 'mac' in nic_dict:
6010         nic_mac = nic_dict['mac']
6011         if nic_mac is None:
6012           raise errors.OpPrereqError('Cannot set the nic mac to None')
6013         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6014           # otherwise generate the mac
6015           nic_dict['mac'] = self.cfg.GenerateMAC()
6016         else:
6017           # or validate/reserve the current one
6018           if self.cfg.IsMacInUse(nic_mac):
6019             raise errors.OpPrereqError("MAC address %s already in use"
6020                                        " in cluster" % nic_mac)
6021
6022     # DISK processing
6023     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6024       raise errors.OpPrereqError("Disk operations not supported for"
6025                                  " diskless instances")
6026     for disk_op, disk_dict in self.op.disks:
6027       if disk_op == constants.DDM_REMOVE:
6028         if len(instance.disks) == 1:
6029           raise errors.OpPrereqError("Cannot remove the last disk of"
6030                                      " an instance")
6031         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6032         ins_l = ins_l[pnode]
6033         if ins_l.failed or not isinstance(ins_l.data, list):
6034           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6035         if instance.name in ins_l.data:
6036           raise errors.OpPrereqError("Instance is running, can't remove"
6037                                      " disks.")
6038
6039       if (disk_op == constants.DDM_ADD and
6040           len(instance.nics) >= constants.MAX_DISKS):
6041         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6042                                    " add more" % constants.MAX_DISKS)
6043       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6044         # an existing disk
6045         if disk_op < 0 or disk_op >= len(instance.disks):
6046           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6047                                      " are 0 to %d" %
6048                                      (disk_op, len(instance.disks)))
6049
6050     return
6051
6052   def Exec(self, feedback_fn):
6053     """Modifies an instance.
6054
6055     All parameters take effect only at the next restart of the instance.
6056
6057     """
6058     # Process here the warnings from CheckPrereq, as we don't have a
6059     # feedback_fn there.
6060     for warn in self.warn:
6061       feedback_fn("WARNING: %s" % warn)
6062
6063     result = []
6064     instance = self.instance
6065     # disk changes
6066     for disk_op, disk_dict in self.op.disks:
6067       if disk_op == constants.DDM_REMOVE:
6068         # remove the last disk
6069         device = instance.disks.pop()
6070         device_idx = len(instance.disks)
6071         for node, disk in device.ComputeNodeTree(instance.primary_node):
6072           self.cfg.SetDiskID(disk, node)
6073           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6074           if msg:
6075             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6076                             " continuing anyway", device_idx, node, msg)
6077         result.append(("disk/%d" % device_idx, "remove"))
6078       elif disk_op == constants.DDM_ADD:
6079         # add a new disk
6080         if instance.disk_template == constants.DT_FILE:
6081           file_driver, file_path = instance.disks[0].logical_id
6082           file_path = os.path.dirname(file_path)
6083         else:
6084           file_driver = file_path = None
6085         disk_idx_base = len(instance.disks)
6086         new_disk = _GenerateDiskTemplate(self,
6087                                          instance.disk_template,
6088                                          instance.name, instance.primary_node,
6089                                          instance.secondary_nodes,
6090                                          [disk_dict],
6091                                          file_path,
6092                                          file_driver,
6093                                          disk_idx_base)[0]
6094         instance.disks.append(new_disk)
6095         info = _GetInstanceInfoText(instance)
6096
6097         logging.info("Creating volume %s for instance %s",
6098                      new_disk.iv_name, instance.name)
6099         # Note: this needs to be kept in sync with _CreateDisks
6100         #HARDCODE
6101         for node in instance.all_nodes:
6102           f_create = node == instance.primary_node
6103           try:
6104             _CreateBlockDev(self, node, instance, new_disk,
6105                             f_create, info, f_create)
6106           except errors.OpExecError, err:
6107             self.LogWarning("Failed to create volume %s (%s) on"
6108                             " node %s: %s",
6109                             new_disk.iv_name, new_disk, node, err)
6110         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6111                        (new_disk.size, new_disk.mode)))
6112       else:
6113         # change a given disk
6114         instance.disks[disk_op].mode = disk_dict['mode']
6115         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6116     # NIC changes
6117     for nic_op, nic_dict in self.op.nics:
6118       if nic_op == constants.DDM_REMOVE:
6119         # remove the last nic
6120         del instance.nics[-1]
6121         result.append(("nic.%d" % len(instance.nics), "remove"))
6122       elif nic_op == constants.DDM_ADD:
6123         # mac and bridge should be set, by now
6124         mac = nic_dict['mac']
6125         bridge = nic_dict['bridge']
6126         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6127                               bridge=bridge)
6128         instance.nics.append(new_nic)
6129         result.append(("nic.%d" % (len(instance.nics) - 1),
6130                        "add:mac=%s,ip=%s,bridge=%s" %
6131                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6132       else:
6133         # change a given nic
6134         for key in 'mac', 'ip', 'bridge':
6135           if key in nic_dict:
6136             setattr(instance.nics[nic_op], key, nic_dict[key])
6137             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6138
6139     # hvparams changes
6140     if self.op.hvparams:
6141       instance.hvparams = self.hv_inst
6142       for key, val in self.op.hvparams.iteritems():
6143         result.append(("hv/%s" % key, val))
6144
6145     # beparams changes
6146     if self.op.beparams:
6147       instance.beparams = self.be_inst
6148       for key, val in self.op.beparams.iteritems():
6149         result.append(("be/%s" % key, val))
6150
6151     self.cfg.Update(instance)
6152
6153     return result
6154
6155
6156 class LUQueryExports(NoHooksLU):
6157   """Query the exports list
6158
6159   """
6160   _OP_REQP = ['nodes']
6161   REQ_BGL = False
6162
6163   def ExpandNames(self):
6164     self.needed_locks = {}
6165     self.share_locks[locking.LEVEL_NODE] = 1
6166     if not self.op.nodes:
6167       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6168     else:
6169       self.needed_locks[locking.LEVEL_NODE] = \
6170         _GetWantedNodes(self, self.op.nodes)
6171
6172   def CheckPrereq(self):
6173     """Check prerequisites.
6174
6175     """
6176     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6177
6178   def Exec(self, feedback_fn):
6179     """Compute the list of all the exported system images.
6180
6181     @rtype: dict
6182     @return: a dictionary with the structure node->(export-list)
6183         where export-list is a list of the instances exported on
6184         that node.
6185
6186     """
6187     rpcresult = self.rpc.call_export_list(self.nodes)
6188     result = {}
6189     for node in rpcresult:
6190       if rpcresult[node].failed:
6191         result[node] = False
6192       else:
6193         result[node] = rpcresult[node].data
6194
6195     return result
6196
6197
6198 class LUExportInstance(LogicalUnit):
6199   """Export an instance to an image in the cluster.
6200
6201   """
6202   HPATH = "instance-export"
6203   HTYPE = constants.HTYPE_INSTANCE
6204   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6205   REQ_BGL = False
6206
6207   def ExpandNames(self):
6208     self._ExpandAndLockInstance()
6209     # FIXME: lock only instance primary and destination node
6210     #
6211     # Sad but true, for now we have do lock all nodes, as we don't know where
6212     # the previous export might be, and and in this LU we search for it and
6213     # remove it from its current node. In the future we could fix this by:
6214     #  - making a tasklet to search (share-lock all), then create the new one,
6215     #    then one to remove, after
6216     #  - removing the removal operation altoghether
6217     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6218
6219   def DeclareLocks(self, level):
6220     """Last minute lock declaration."""
6221     # All nodes are locked anyway, so nothing to do here.
6222
6223   def BuildHooksEnv(self):
6224     """Build hooks env.
6225
6226     This will run on the master, primary node and target node.
6227
6228     """
6229     env = {
6230       "EXPORT_NODE": self.op.target_node,
6231       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6232       }
6233     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6234     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6235           self.op.target_node]
6236     return env, nl, nl
6237
6238   def CheckPrereq(self):
6239     """Check prerequisites.
6240
6241     This checks that the instance and node names are valid.
6242
6243     """
6244     instance_name = self.op.instance_name
6245     self.instance = self.cfg.GetInstanceInfo(instance_name)
6246     assert self.instance is not None, \
6247           "Cannot retrieve locked instance %s" % self.op.instance_name
6248     _CheckNodeOnline(self, self.instance.primary_node)
6249
6250     self.dst_node = self.cfg.GetNodeInfo(
6251       self.cfg.ExpandNodeName(self.op.target_node))
6252
6253     if self.dst_node is None:
6254       # This is wrong node name, not a non-locked node
6255       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6256     _CheckNodeOnline(self, self.dst_node.name)
6257     _CheckNodeNotDrained(self, self.dst_node.name)
6258
6259     # instance disk type verification
6260     for disk in self.instance.disks:
6261       if disk.dev_type == constants.LD_FILE:
6262         raise errors.OpPrereqError("Export not supported for instances with"
6263                                    " file-based disks")
6264
6265   def Exec(self, feedback_fn):
6266     """Export an instance to an image in the cluster.
6267
6268     """
6269     instance = self.instance
6270     dst_node = self.dst_node
6271     src_node = instance.primary_node
6272     if self.op.shutdown:
6273       # shutdown the instance, but not the disks
6274       result = self.rpc.call_instance_shutdown(src_node, instance)
6275       msg = result.RemoteFailMsg()
6276       if msg:
6277         raise errors.OpExecError("Could not shutdown instance %s on"
6278                                  " node %s: %s" %
6279                                  (instance.name, src_node, msg))
6280
6281     vgname = self.cfg.GetVGName()
6282
6283     snap_disks = []
6284
6285     # set the disks ID correctly since call_instance_start needs the
6286     # correct drbd minor to create the symlinks
6287     for disk in instance.disks:
6288       self.cfg.SetDiskID(disk, src_node)
6289
6290     try:
6291       for disk in instance.disks:
6292         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6293         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6294         if new_dev_name.failed or not new_dev_name.data:
6295           self.LogWarning("Could not snapshot block device %s on node %s",
6296                           disk.logical_id[1], src_node)
6297           snap_disks.append(False)
6298         else:
6299           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6300                                  logical_id=(vgname, new_dev_name.data),
6301                                  physical_id=(vgname, new_dev_name.data),
6302                                  iv_name=disk.iv_name)
6303           snap_disks.append(new_dev)
6304
6305     finally:
6306       if self.op.shutdown and instance.admin_up:
6307         result = self.rpc.call_instance_start(src_node, instance, None, None)
6308         msg = result.RemoteFailMsg()
6309         if msg:
6310           _ShutdownInstanceDisks(self, instance)
6311           raise errors.OpExecError("Could not start instance: %s" % msg)
6312
6313     # TODO: check for size
6314
6315     cluster_name = self.cfg.GetClusterName()
6316     for idx, dev in enumerate(snap_disks):
6317       if dev:
6318         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6319                                                instance, cluster_name, idx)
6320         if result.failed or not result.data:
6321           self.LogWarning("Could not export block device %s from node %s to"
6322                           " node %s", dev.logical_id[1], src_node,
6323                           dst_node.name)
6324         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6325         if msg:
6326           self.LogWarning("Could not remove snapshot block device %s from node"
6327                           " %s: %s", dev.logical_id[1], src_node, msg)
6328
6329     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6330     if result.failed or not result.data:
6331       self.LogWarning("Could not finalize export for instance %s on node %s",
6332                       instance.name, dst_node.name)
6333
6334     nodelist = self.cfg.GetNodeList()
6335     nodelist.remove(dst_node.name)
6336
6337     # on one-node clusters nodelist will be empty after the removal
6338     # if we proceed the backup would be removed because OpQueryExports
6339     # substitutes an empty list with the full cluster node list.
6340     if nodelist:
6341       exportlist = self.rpc.call_export_list(nodelist)
6342       for node in exportlist:
6343         if exportlist[node].failed:
6344           continue
6345         if instance.name in exportlist[node].data:
6346           if not self.rpc.call_export_remove(node, instance.name):
6347             self.LogWarning("Could not remove older export for instance %s"
6348                             " on node %s", instance.name, node)
6349
6350
6351 class LURemoveExport(NoHooksLU):
6352   """Remove exports related to the named instance.
6353
6354   """
6355   _OP_REQP = ["instance_name"]
6356   REQ_BGL = False
6357
6358   def ExpandNames(self):
6359     self.needed_locks = {}
6360     # We need all nodes to be locked in order for RemoveExport to work, but we
6361     # don't need to lock the instance itself, as nothing will happen to it (and
6362     # we can remove exports also for a removed instance)
6363     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6364
6365   def CheckPrereq(self):
6366     """Check prerequisites.
6367     """
6368     pass
6369
6370   def Exec(self, feedback_fn):
6371     """Remove any export.
6372
6373     """
6374     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6375     # If the instance was not found we'll try with the name that was passed in.
6376     # This will only work if it was an FQDN, though.
6377     fqdn_warn = False
6378     if not instance_name:
6379       fqdn_warn = True
6380       instance_name = self.op.instance_name
6381
6382     exportlist = self.rpc.call_export_list(self.acquired_locks[
6383       locking.LEVEL_NODE])
6384     found = False
6385     for node in exportlist:
6386       if exportlist[node].failed:
6387         self.LogWarning("Failed to query node %s, continuing" % node)
6388         continue
6389       if instance_name in exportlist[node].data:
6390         found = True
6391         result = self.rpc.call_export_remove(node, instance_name)
6392         if result.failed or not result.data:
6393           logging.error("Could not remove export for instance %s"
6394                         " on node %s", instance_name, node)
6395
6396     if fqdn_warn and not found:
6397       feedback_fn("Export not found. If trying to remove an export belonging"
6398                   " to a deleted instance please use its Fully Qualified"
6399                   " Domain Name.")
6400
6401
6402 class TagsLU(NoHooksLU):
6403   """Generic tags LU.
6404
6405   This is an abstract class which is the parent of all the other tags LUs.
6406
6407   """
6408
6409   def ExpandNames(self):
6410     self.needed_locks = {}
6411     if self.op.kind == constants.TAG_NODE:
6412       name = self.cfg.ExpandNodeName(self.op.name)
6413       if name is None:
6414         raise errors.OpPrereqError("Invalid node name (%s)" %
6415                                    (self.op.name,))
6416       self.op.name = name
6417       self.needed_locks[locking.LEVEL_NODE] = name
6418     elif self.op.kind == constants.TAG_INSTANCE:
6419       name = self.cfg.ExpandInstanceName(self.op.name)
6420       if name is None:
6421         raise errors.OpPrereqError("Invalid instance name (%s)" %
6422                                    (self.op.name,))
6423       self.op.name = name
6424       self.needed_locks[locking.LEVEL_INSTANCE] = name
6425
6426   def CheckPrereq(self):
6427     """Check prerequisites.
6428
6429     """
6430     if self.op.kind == constants.TAG_CLUSTER:
6431       self.target = self.cfg.GetClusterInfo()
6432     elif self.op.kind == constants.TAG_NODE:
6433       self.target = self.cfg.GetNodeInfo(self.op.name)
6434     elif self.op.kind == constants.TAG_INSTANCE:
6435       self.target = self.cfg.GetInstanceInfo(self.op.name)
6436     else:
6437       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6438                                  str(self.op.kind))
6439
6440
6441 class LUGetTags(TagsLU):
6442   """Returns the tags of a given object.
6443
6444   """
6445   _OP_REQP = ["kind", "name"]
6446   REQ_BGL = False
6447
6448   def Exec(self, feedback_fn):
6449     """Returns the tag list.
6450
6451     """
6452     return list(self.target.GetTags())
6453
6454
6455 class LUSearchTags(NoHooksLU):
6456   """Searches the tags for a given pattern.
6457
6458   """
6459   _OP_REQP = ["pattern"]
6460   REQ_BGL = False
6461
6462   def ExpandNames(self):
6463     self.needed_locks = {}
6464
6465   def CheckPrereq(self):
6466     """Check prerequisites.
6467
6468     This checks the pattern passed for validity by compiling it.
6469
6470     """
6471     try:
6472       self.re = re.compile(self.op.pattern)
6473     except re.error, err:
6474       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6475                                  (self.op.pattern, err))
6476
6477   def Exec(self, feedback_fn):
6478     """Returns the tag list.
6479
6480     """
6481     cfg = self.cfg
6482     tgts = [("/cluster", cfg.GetClusterInfo())]
6483     ilist = cfg.GetAllInstancesInfo().values()
6484     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6485     nlist = cfg.GetAllNodesInfo().values()
6486     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6487     results = []
6488     for path, target in tgts:
6489       for tag in target.GetTags():
6490         if self.re.search(tag):
6491           results.append((path, tag))
6492     return results
6493
6494
6495 class LUAddTags(TagsLU):
6496   """Sets a tag on a given object.
6497
6498   """
6499   _OP_REQP = ["kind", "name", "tags"]
6500   REQ_BGL = False
6501
6502   def CheckPrereq(self):
6503     """Check prerequisites.
6504
6505     This checks the type and length of the tag name and value.
6506
6507     """
6508     TagsLU.CheckPrereq(self)
6509     for tag in self.op.tags:
6510       objects.TaggableObject.ValidateTag(tag)
6511
6512   def Exec(self, feedback_fn):
6513     """Sets the tag.
6514
6515     """
6516     try:
6517       for tag in self.op.tags:
6518         self.target.AddTag(tag)
6519     except errors.TagError, err:
6520       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6521     try:
6522       self.cfg.Update(self.target)
6523     except errors.ConfigurationError:
6524       raise errors.OpRetryError("There has been a modification to the"
6525                                 " config file and the operation has been"
6526                                 " aborted. Please retry.")
6527
6528
6529 class LUDelTags(TagsLU):
6530   """Delete a list of tags from a given object.
6531
6532   """
6533   _OP_REQP = ["kind", "name", "tags"]
6534   REQ_BGL = False
6535
6536   def CheckPrereq(self):
6537     """Check prerequisites.
6538
6539     This checks that we have the given tag.
6540
6541     """
6542     TagsLU.CheckPrereq(self)
6543     for tag in self.op.tags:
6544       objects.TaggableObject.ValidateTag(tag)
6545     del_tags = frozenset(self.op.tags)
6546     cur_tags = self.target.GetTags()
6547     if not del_tags <= cur_tags:
6548       diff_tags = del_tags - cur_tags
6549       diff_names = ["'%s'" % tag for tag in diff_tags]
6550       diff_names.sort()
6551       raise errors.OpPrereqError("Tag(s) %s not found" %
6552                                  (",".join(diff_names)))
6553
6554   def Exec(self, feedback_fn):
6555     """Remove the tag from the object.
6556
6557     """
6558     for tag in self.op.tags:
6559       self.target.RemoveTag(tag)
6560     try:
6561       self.cfg.Update(self.target)
6562     except errors.ConfigurationError:
6563       raise errors.OpRetryError("There has been a modification to the"
6564                                 " config file and the operation has been"
6565                                 " aborted. Please retry.")
6566
6567
6568 class LUTestDelay(NoHooksLU):
6569   """Sleep for a specified amount of time.
6570
6571   This LU sleeps on the master and/or nodes for a specified amount of
6572   time.
6573
6574   """
6575   _OP_REQP = ["duration", "on_master", "on_nodes"]
6576   REQ_BGL = False
6577
6578   def ExpandNames(self):
6579     """Expand names and set required locks.
6580
6581     This expands the node list, if any.
6582
6583     """
6584     self.needed_locks = {}
6585     if self.op.on_nodes:
6586       # _GetWantedNodes can be used here, but is not always appropriate to use
6587       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6588       # more information.
6589       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6590       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6591
6592   def CheckPrereq(self):
6593     """Check prerequisites.
6594
6595     """
6596
6597   def Exec(self, feedback_fn):
6598     """Do the actual sleep.
6599
6600     """
6601     if self.op.on_master:
6602       if not utils.TestDelay(self.op.duration):
6603         raise errors.OpExecError("Error during master delay test")
6604     if self.op.on_nodes:
6605       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6606       if not result:
6607         raise errors.OpExecError("Complete failure from rpc call")
6608       for node, node_result in result.items():
6609         node_result.Raise()
6610         if not node_result.data:
6611           raise errors.OpExecError("Failure during rpc call to node %s,"
6612                                    " result: %s" % (node, node_result.data))
6613
6614
6615 class IAllocator(object):
6616   """IAllocator framework.
6617
6618   An IAllocator instance has three sets of attributes:
6619     - cfg that is needed to query the cluster
6620     - input data (all members of the _KEYS class attribute are required)
6621     - four buffer attributes (in|out_data|text), that represent the
6622       input (to the external script) in text and data structure format,
6623       and the output from it, again in two formats
6624     - the result variables from the script (success, info, nodes) for
6625       easy usage
6626
6627   """
6628   _ALLO_KEYS = [
6629     "mem_size", "disks", "disk_template",
6630     "os", "tags", "nics", "vcpus", "hypervisor",
6631     ]
6632   _RELO_KEYS = [
6633     "relocate_from",
6634     ]
6635
6636   def __init__(self, lu, mode, name, **kwargs):
6637     self.lu = lu
6638     # init buffer variables
6639     self.in_text = self.out_text = self.in_data = self.out_data = None
6640     # init all input fields so that pylint is happy
6641     self.mode = mode
6642     self.name = name
6643     self.mem_size = self.disks = self.disk_template = None
6644     self.os = self.tags = self.nics = self.vcpus = None
6645     self.hypervisor = None
6646     self.relocate_from = None
6647     # computed fields
6648     self.required_nodes = None
6649     # init result fields
6650     self.success = self.info = self.nodes = None
6651     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6652       keyset = self._ALLO_KEYS
6653     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6654       keyset = self._RELO_KEYS
6655     else:
6656       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6657                                    " IAllocator" % self.mode)
6658     for key in kwargs:
6659       if key not in keyset:
6660         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6661                                      " IAllocator" % key)
6662       setattr(self, key, kwargs[key])
6663     for key in keyset:
6664       if key not in kwargs:
6665         raise errors.ProgrammerError("Missing input parameter '%s' to"
6666                                      " IAllocator" % key)
6667     self._BuildInputData()
6668
6669   def _ComputeClusterData(self):
6670     """Compute the generic allocator input data.
6671
6672     This is the data that is independent of the actual operation.
6673
6674     """
6675     cfg = self.lu.cfg
6676     cluster_info = cfg.GetClusterInfo()
6677     # cluster data
6678     data = {
6679       "version": constants.IALLOCATOR_VERSION,
6680       "cluster_name": cfg.GetClusterName(),
6681       "cluster_tags": list(cluster_info.GetTags()),
6682       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6683       # we don't have job IDs
6684       }
6685     iinfo = cfg.GetAllInstancesInfo().values()
6686     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6687
6688     # node data
6689     node_results = {}
6690     node_list = cfg.GetNodeList()
6691
6692     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6693       hypervisor_name = self.hypervisor
6694     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6695       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6696
6697     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6698                                            hypervisor_name)
6699     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6700                        cluster_info.enabled_hypervisors)
6701     for nname, nresult in node_data.items():
6702       # first fill in static (config-based) values
6703       ninfo = cfg.GetNodeInfo(nname)
6704       pnr = {
6705         "tags": list(ninfo.GetTags()),
6706         "primary_ip": ninfo.primary_ip,
6707         "secondary_ip": ninfo.secondary_ip,
6708         "offline": ninfo.offline,
6709         "drained": ninfo.drained,
6710         "master_candidate": ninfo.master_candidate,
6711         }
6712
6713       if not ninfo.offline:
6714         nresult.Raise()
6715         if not isinstance(nresult.data, dict):
6716           raise errors.OpExecError("Can't get data for node %s" % nname)
6717         remote_info = nresult.data
6718         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6719                      'vg_size', 'vg_free', 'cpu_total']:
6720           if attr not in remote_info:
6721             raise errors.OpExecError("Node '%s' didn't return attribute"
6722                                      " '%s'" % (nname, attr))
6723           try:
6724             remote_info[attr] = int(remote_info[attr])
6725           except ValueError, err:
6726             raise errors.OpExecError("Node '%s' returned invalid value"
6727                                      " for '%s': %s" % (nname, attr, err))
6728         # compute memory used by primary instances
6729         i_p_mem = i_p_up_mem = 0
6730         for iinfo, beinfo in i_list:
6731           if iinfo.primary_node == nname:
6732             i_p_mem += beinfo[constants.BE_MEMORY]
6733             if iinfo.name not in node_iinfo[nname].data:
6734               i_used_mem = 0
6735             else:
6736               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6737             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6738             remote_info['memory_free'] -= max(0, i_mem_diff)
6739
6740             if iinfo.admin_up:
6741               i_p_up_mem += beinfo[constants.BE_MEMORY]
6742
6743         # compute memory used by instances
6744         pnr_dyn = {
6745           "total_memory": remote_info['memory_total'],
6746           "reserved_memory": remote_info['memory_dom0'],
6747           "free_memory": remote_info['memory_free'],
6748           "total_disk": remote_info['vg_size'],
6749           "free_disk": remote_info['vg_free'],
6750           "total_cpus": remote_info['cpu_total'],
6751           "i_pri_memory": i_p_mem,
6752           "i_pri_up_memory": i_p_up_mem,
6753           }
6754         pnr.update(pnr_dyn)
6755
6756       node_results[nname] = pnr
6757     data["nodes"] = node_results
6758
6759     # instance data
6760     instance_data = {}
6761     for iinfo, beinfo in i_list:
6762       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6763                   for n in iinfo.nics]
6764       pir = {
6765         "tags": list(iinfo.GetTags()),
6766         "admin_up": iinfo.admin_up,
6767         "vcpus": beinfo[constants.BE_VCPUS],
6768         "memory": beinfo[constants.BE_MEMORY],
6769         "os": iinfo.os,
6770         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6771         "nics": nic_data,
6772         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6773         "disk_template": iinfo.disk_template,
6774         "hypervisor": iinfo.hypervisor,
6775         }
6776       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6777                                                  pir["disks"])
6778       instance_data[iinfo.name] = pir
6779
6780     data["instances"] = instance_data
6781
6782     self.in_data = data
6783
6784   def _AddNewInstance(self):
6785     """Add new instance data to allocator structure.
6786
6787     This in combination with _AllocatorGetClusterData will create the
6788     correct structure needed as input for the allocator.
6789
6790     The checks for the completeness of the opcode must have already been
6791     done.
6792
6793     """
6794     data = self.in_data
6795
6796     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6797
6798     if self.disk_template in constants.DTS_NET_MIRROR:
6799       self.required_nodes = 2
6800     else:
6801       self.required_nodes = 1
6802     request = {
6803       "type": "allocate",
6804       "name": self.name,
6805       "disk_template": self.disk_template,
6806       "tags": self.tags,
6807       "os": self.os,
6808       "vcpus": self.vcpus,
6809       "memory": self.mem_size,
6810       "disks": self.disks,
6811       "disk_space_total": disk_space,
6812       "nics": self.nics,
6813       "required_nodes": self.required_nodes,
6814       }
6815     data["request"] = request
6816
6817   def _AddRelocateInstance(self):
6818     """Add relocate instance data to allocator structure.
6819
6820     This in combination with _IAllocatorGetClusterData will create the
6821     correct structure needed as input for the allocator.
6822
6823     The checks for the completeness of the opcode must have already been
6824     done.
6825
6826     """
6827     instance = self.lu.cfg.GetInstanceInfo(self.name)
6828     if instance is None:
6829       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6830                                    " IAllocator" % self.name)
6831
6832     if instance.disk_template not in constants.DTS_NET_MIRROR:
6833       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6834
6835     if len(instance.secondary_nodes) != 1:
6836       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6837
6838     self.required_nodes = 1
6839     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6840     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6841
6842     request = {
6843       "type": "relocate",
6844       "name": self.name,
6845       "disk_space_total": disk_space,
6846       "required_nodes": self.required_nodes,
6847       "relocate_from": self.relocate_from,
6848       }
6849     self.in_data["request"] = request
6850
6851   def _BuildInputData(self):
6852     """Build input data structures.
6853
6854     """
6855     self._ComputeClusterData()
6856
6857     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6858       self._AddNewInstance()
6859     else:
6860       self._AddRelocateInstance()
6861
6862     self.in_text = serializer.Dump(self.in_data)
6863
6864   def Run(self, name, validate=True, call_fn=None):
6865     """Run an instance allocator and return the results.
6866
6867     """
6868     if call_fn is None:
6869       call_fn = self.lu.rpc.call_iallocator_runner
6870     data = self.in_text
6871
6872     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6873     result.Raise()
6874
6875     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6876       raise errors.OpExecError("Invalid result from master iallocator runner")
6877
6878     rcode, stdout, stderr, fail = result.data
6879
6880     if rcode == constants.IARUN_NOTFOUND:
6881       raise errors.OpExecError("Can't find allocator '%s'" % name)
6882     elif rcode == constants.IARUN_FAILURE:
6883       raise errors.OpExecError("Instance allocator call failed: %s,"
6884                                " output: %s" % (fail, stdout+stderr))
6885     self.out_text = stdout
6886     if validate:
6887       self._ValidateResult()
6888
6889   def _ValidateResult(self):
6890     """Process the allocator results.
6891
6892     This will process and if successful save the result in
6893     self.out_data and the other parameters.
6894
6895     """
6896     try:
6897       rdict = serializer.Load(self.out_text)
6898     except Exception, err:
6899       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6900
6901     if not isinstance(rdict, dict):
6902       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6903
6904     for key in "success", "info", "nodes":
6905       if key not in rdict:
6906         raise errors.OpExecError("Can't parse iallocator results:"
6907                                  " missing key '%s'" % key)
6908       setattr(self, key, rdict[key])
6909
6910     if not isinstance(rdict["nodes"], list):
6911       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6912                                " is not a list")
6913     self.out_data = rdict
6914
6915
6916 class LUTestAllocator(NoHooksLU):
6917   """Run allocator tests.
6918
6919   This LU runs the allocator tests
6920
6921   """
6922   _OP_REQP = ["direction", "mode", "name"]
6923
6924   def CheckPrereq(self):
6925     """Check prerequisites.
6926
6927     This checks the opcode parameters depending on the director and mode test.
6928
6929     """
6930     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6931       for attr in ["name", "mem_size", "disks", "disk_template",
6932                    "os", "tags", "nics", "vcpus"]:
6933         if not hasattr(self.op, attr):
6934           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6935                                      attr)
6936       iname = self.cfg.ExpandInstanceName(self.op.name)
6937       if iname is not None:
6938         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6939                                    iname)
6940       if not isinstance(self.op.nics, list):
6941         raise errors.OpPrereqError("Invalid parameter 'nics'")
6942       for row in self.op.nics:
6943         if (not isinstance(row, dict) or
6944             "mac" not in row or
6945             "ip" not in row or
6946             "bridge" not in row):
6947           raise errors.OpPrereqError("Invalid contents of the"
6948                                      " 'nics' parameter")
6949       if not isinstance(self.op.disks, list):
6950         raise errors.OpPrereqError("Invalid parameter 'disks'")
6951       for row in self.op.disks:
6952         if (not isinstance(row, dict) or
6953             "size" not in row or
6954             not isinstance(row["size"], int) or
6955             "mode" not in row or
6956             row["mode"] not in ['r', 'w']):
6957           raise errors.OpPrereqError("Invalid contents of the"
6958                                      " 'disks' parameter")
6959       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6960         self.op.hypervisor = self.cfg.GetHypervisorType()
6961     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6962       if not hasattr(self.op, "name"):
6963         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6964       fname = self.cfg.ExpandInstanceName(self.op.name)
6965       if fname is None:
6966         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6967                                    self.op.name)
6968       self.op.name = fname
6969       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6970     else:
6971       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6972                                  self.op.mode)
6973
6974     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6975       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6976         raise errors.OpPrereqError("Missing allocator name")
6977     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6978       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6979                                  self.op.direction)
6980
6981   def Exec(self, feedback_fn):
6982     """Run the allocator test.
6983
6984     """
6985     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6986       ial = IAllocator(self,
6987                        mode=self.op.mode,
6988                        name=self.op.name,
6989                        mem_size=self.op.mem_size,
6990                        disks=self.op.disks,
6991                        disk_template=self.op.disk_template,
6992                        os=self.op.os,
6993                        tags=self.op.tags,
6994                        nics=self.op.nics,
6995                        vcpus=self.op.vcpus,
6996                        hypervisor=self.op.hypervisor,
6997                        )
6998     else:
6999       ial = IAllocator(self,
7000                        mode=self.op.mode,
7001                        name=self.op.name,
7002                        relocate_from=list(self.relocate_from),
7003                        )
7004
7005     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7006       result = ial.in_text
7007     else:
7008       ial.Run(self.op.allocator, validate=False)
7009       result = ial.out_text
7010     return result