35c14b13f04cba6c3d0e8ca47c0b41d6dd5aa7f8
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384         if to_result.failed or not to_result.data:
1385           logging.error("Copy of file %s to node %s failed",
1386                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1387
1388     finally:
1389       result = self.rpc.call_node_start_master(master, False)
1390       if result.failed or not result.data:
1391         self.LogWarning("Could not re-enable the master role on"
1392                         " the master, please restart manually.")
1393
1394
1395 def _RecursiveCheckIfLVMBased(disk):
1396   """Check if the given disk or its children are lvm-based.
1397
1398   @type disk: L{objects.Disk}
1399   @param disk: the disk to check
1400   @rtype: booleean
1401   @return: boolean indicating whether a LD_LV dev_type was found or not
1402
1403   """
1404   if disk.children:
1405     for chdisk in disk.children:
1406       if _RecursiveCheckIfLVMBased(chdisk):
1407         return True
1408   return disk.dev_type == constants.LD_LV
1409
1410
1411 class LUSetClusterParams(LogicalUnit):
1412   """Change the parameters of the cluster.
1413
1414   """
1415   HPATH = "cluster-modify"
1416   HTYPE = constants.HTYPE_CLUSTER
1417   _OP_REQP = []
1418   REQ_BGL = False
1419
1420   def CheckArguments(self):
1421     """Check parameters
1422
1423     """
1424     if not hasattr(self.op, "candidate_pool_size"):
1425       self.op.candidate_pool_size = None
1426     if self.op.candidate_pool_size is not None:
1427       try:
1428         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1429       except (ValueError, TypeError), err:
1430         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1431                                    str(err))
1432       if self.op.candidate_pool_size < 1:
1433         raise errors.OpPrereqError("At least one master candidate needed")
1434
1435   def ExpandNames(self):
1436     # FIXME: in the future maybe other cluster params won't require checking on
1437     # all nodes to be modified.
1438     self.needed_locks = {
1439       locking.LEVEL_NODE: locking.ALL_SET,
1440     }
1441     self.share_locks[locking.LEVEL_NODE] = 1
1442
1443   def BuildHooksEnv(self):
1444     """Build hooks env.
1445
1446     """
1447     env = {
1448       "OP_TARGET": self.cfg.GetClusterName(),
1449       "NEW_VG_NAME": self.op.vg_name,
1450       }
1451     mn = self.cfg.GetMasterNode()
1452     return env, [mn], [mn]
1453
1454   def CheckPrereq(self):
1455     """Check prerequisites.
1456
1457     This checks whether the given params don't conflict and
1458     if the given volume group is valid.
1459
1460     """
1461     if self.op.vg_name is not None and not self.op.vg_name:
1462       instances = self.cfg.GetAllInstancesInfo().values()
1463       for inst in instances:
1464         for disk in inst.disks:
1465           if _RecursiveCheckIfLVMBased(disk):
1466             raise errors.OpPrereqError("Cannot disable lvm storage while"
1467                                        " lvm-based instances exist")
1468
1469     node_list = self.acquired_locks[locking.LEVEL_NODE]
1470
1471     # if vg_name not None, checks given volume group on all nodes
1472     if self.op.vg_name:
1473       vglist = self.rpc.call_vg_list(node_list)
1474       for node in node_list:
1475         if vglist[node].failed:
1476           # ignoring down node
1477           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1478           continue
1479         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1480                                               self.op.vg_name,
1481                                               constants.MIN_VG_SIZE)
1482         if vgstatus:
1483           raise errors.OpPrereqError("Error on node '%s': %s" %
1484                                      (node, vgstatus))
1485
1486     self.cluster = cluster = self.cfg.GetClusterInfo()
1487     # validate beparams changes
1488     if self.op.beparams:
1489       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1490       self.new_beparams = cluster.FillDict(
1491         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1492
1493     # hypervisor list/parameters
1494     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1495     if self.op.hvparams:
1496       if not isinstance(self.op.hvparams, dict):
1497         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1498       for hv_name, hv_dict in self.op.hvparams.items():
1499         if hv_name not in self.new_hvparams:
1500           self.new_hvparams[hv_name] = hv_dict
1501         else:
1502           self.new_hvparams[hv_name].update(hv_dict)
1503
1504     if self.op.enabled_hypervisors is not None:
1505       self.hv_list = self.op.enabled_hypervisors
1506     else:
1507       self.hv_list = cluster.enabled_hypervisors
1508
1509     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1510       # either the enabled list has changed, or the parameters have, validate
1511       for hv_name, hv_params in self.new_hvparams.items():
1512         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1513             (self.op.enabled_hypervisors and
1514              hv_name in self.op.enabled_hypervisors)):
1515           # either this is a new hypervisor, or its parameters have changed
1516           hv_class = hypervisor.GetHypervisor(hv_name)
1517           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1518           hv_class.CheckParameterSyntax(hv_params)
1519           _CheckHVParams(self, node_list, hv_name, hv_params)
1520
1521   def Exec(self, feedback_fn):
1522     """Change the parameters of the cluster.
1523
1524     """
1525     if self.op.vg_name is not None:
1526       new_volume = self.op.vg_name
1527       if not new_volume:
1528         new_volume = None
1529       if new_volume != self.cfg.GetVGName():
1530         self.cfg.SetVGName(new_volume)
1531       else:
1532         feedback_fn("Cluster LVM configuration already in desired"
1533                     " state, not changing")
1534     if self.op.hvparams:
1535       self.cluster.hvparams = self.new_hvparams
1536     if self.op.enabled_hypervisors is not None:
1537       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1538     if self.op.beparams:
1539       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1540     if self.op.candidate_pool_size is not None:
1541       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1542
1543     self.cfg.Update(self.cluster)
1544
1545     # we want to update nodes after the cluster so that if any errors
1546     # happen, we have recorded and saved the cluster info
1547     if self.op.candidate_pool_size is not None:
1548       _AdjustCandidatePool(self)
1549
1550
1551 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1552   """Distribute additional files which are part of the cluster configuration.
1553
1554   ConfigWriter takes care of distributing the config and ssconf files, but
1555   there are more files which should be distributed to all nodes. This function
1556   makes sure those are copied.
1557
1558   @param lu: calling logical unit
1559   @param additional_nodes: list of nodes not in the config to distribute to
1560
1561   """
1562   # 1. Gather target nodes
1563   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1564   dist_nodes = lu.cfg.GetNodeList()
1565   if additional_nodes is not None:
1566     dist_nodes.extend(additional_nodes)
1567   if myself.name in dist_nodes:
1568     dist_nodes.remove(myself.name)
1569   # 2. Gather files to distribute
1570   dist_files = set([constants.ETC_HOSTS,
1571                     constants.SSH_KNOWN_HOSTS_FILE,
1572                     constants.RAPI_CERT_FILE,
1573                     constants.RAPI_USERS_FILE,
1574                    ])
1575   # 3. Perform the files upload
1576   for fname in dist_files:
1577     if os.path.exists(fname):
1578       result = lu.rpc.call_upload_file(dist_nodes, fname)
1579       for to_node, to_result in result.items():
1580         if to_result.failed or not to_result.data:
1581           logging.error("Copy of file %s to node %s failed", fname, to_node)
1582
1583
1584 class LURedistributeConfig(NoHooksLU):
1585   """Force the redistribution of cluster configuration.
1586
1587   This is a very simple LU.
1588
1589   """
1590   _OP_REQP = []
1591   REQ_BGL = False
1592
1593   def ExpandNames(self):
1594     self.needed_locks = {
1595       locking.LEVEL_NODE: locking.ALL_SET,
1596     }
1597     self.share_locks[locking.LEVEL_NODE] = 1
1598
1599   def CheckPrereq(self):
1600     """Check prerequisites.
1601
1602     """
1603
1604   def Exec(self, feedback_fn):
1605     """Redistribute the configuration.
1606
1607     """
1608     self.cfg.Update(self.cfg.GetClusterInfo())
1609     _RedistributeAncillaryFiles(self)
1610
1611
1612 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1613   """Sleep and poll for an instance's disk to sync.
1614
1615   """
1616   if not instance.disks:
1617     return True
1618
1619   if not oneshot:
1620     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1621
1622   node = instance.primary_node
1623
1624   for dev in instance.disks:
1625     lu.cfg.SetDiskID(dev, node)
1626
1627   retries = 0
1628   while True:
1629     max_time = 0
1630     done = True
1631     cumul_degraded = False
1632     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1633     if rstats.failed or not rstats.data:
1634       lu.LogWarning("Can't get any data from node %s", node)
1635       retries += 1
1636       if retries >= 10:
1637         raise errors.RemoteError("Can't contact node %s for mirror data,"
1638                                  " aborting." % node)
1639       time.sleep(6)
1640       continue
1641     rstats = rstats.data
1642     retries = 0
1643     for i, mstat in enumerate(rstats):
1644       if mstat is None:
1645         lu.LogWarning("Can't compute data for node %s/%s",
1646                            node, instance.disks[i].iv_name)
1647         continue
1648       # we ignore the ldisk parameter
1649       perc_done, est_time, is_degraded, _ = mstat
1650       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1651       if perc_done is not None:
1652         done = False
1653         if est_time is not None:
1654           rem_time = "%d estimated seconds remaining" % est_time
1655           max_time = est_time
1656         else:
1657           rem_time = "no time estimate"
1658         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1659                         (instance.disks[i].iv_name, perc_done, rem_time))
1660     if done or oneshot:
1661       break
1662
1663     time.sleep(min(60, max_time))
1664
1665   if done:
1666     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1667   return not cumul_degraded
1668
1669
1670 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1671   """Check that mirrors are not degraded.
1672
1673   The ldisk parameter, if True, will change the test from the
1674   is_degraded attribute (which represents overall non-ok status for
1675   the device(s)) to the ldisk (representing the local storage status).
1676
1677   """
1678   lu.cfg.SetDiskID(dev, node)
1679   if ldisk:
1680     idx = 6
1681   else:
1682     idx = 5
1683
1684   result = True
1685   if on_primary or dev.AssembleOnSecondary():
1686     rstats = lu.rpc.call_blockdev_find(node, dev)
1687     msg = rstats.RemoteFailMsg()
1688     if msg:
1689       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1690       result = False
1691     elif not rstats.payload:
1692       lu.LogWarning("Can't find disk on node %s", node)
1693       result = False
1694     else:
1695       result = result and (not rstats.payload[idx])
1696   if dev.children:
1697     for child in dev.children:
1698       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1699
1700   return result
1701
1702
1703 class LUDiagnoseOS(NoHooksLU):
1704   """Logical unit for OS diagnose/query.
1705
1706   """
1707   _OP_REQP = ["output_fields", "names"]
1708   REQ_BGL = False
1709   _FIELDS_STATIC = utils.FieldSet()
1710   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1711
1712   def ExpandNames(self):
1713     if self.op.names:
1714       raise errors.OpPrereqError("Selective OS query not supported")
1715
1716     _CheckOutputFields(static=self._FIELDS_STATIC,
1717                        dynamic=self._FIELDS_DYNAMIC,
1718                        selected=self.op.output_fields)
1719
1720     # Lock all nodes, in shared mode
1721     # Temporary removal of locks, should be reverted later
1722     # TODO: reintroduce locks when they are lighter-weight
1723     self.needed_locks = {}
1724     #self.share_locks[locking.LEVEL_NODE] = 1
1725     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1726
1727   def CheckPrereq(self):
1728     """Check prerequisites.
1729
1730     """
1731
1732   @staticmethod
1733   def _DiagnoseByOS(node_list, rlist):
1734     """Remaps a per-node return list into an a per-os per-node dictionary
1735
1736     @param node_list: a list with the names of all nodes
1737     @param rlist: a map with node names as keys and OS objects as values
1738
1739     @rtype: dict
1740     @return: a dictionary with osnames as keys and as value another map, with
1741         nodes as keys and list of OS objects as values, eg::
1742
1743           {"debian-etch": {"node1": [<object>,...],
1744                            "node2": [<object>,]}
1745           }
1746
1747     """
1748     all_os = {}
1749     # we build here the list of nodes that didn't fail the RPC (at RPC
1750     # level), so that nodes with a non-responding node daemon don't
1751     # make all OSes invalid
1752     good_nodes = [node_name for node_name in rlist
1753                   if not rlist[node_name].failed]
1754     for node_name, nr in rlist.iteritems():
1755       if nr.failed or not nr.data:
1756         continue
1757       for os_obj in nr.data:
1758         if os_obj.name not in all_os:
1759           # build a list of nodes for this os containing empty lists
1760           # for each node in node_list
1761           all_os[os_obj.name] = {}
1762           for nname in good_nodes:
1763             all_os[os_obj.name][nname] = []
1764         all_os[os_obj.name][node_name].append(os_obj)
1765     return all_os
1766
1767   def Exec(self, feedback_fn):
1768     """Compute the list of OSes.
1769
1770     """
1771     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1772     node_data = self.rpc.call_os_diagnose(valid_nodes)
1773     if node_data == False:
1774       raise errors.OpExecError("Can't gather the list of OSes")
1775     pol = self._DiagnoseByOS(valid_nodes, node_data)
1776     output = []
1777     for os_name, os_data in pol.iteritems():
1778       row = []
1779       for field in self.op.output_fields:
1780         if field == "name":
1781           val = os_name
1782         elif field == "valid":
1783           val = utils.all([osl and osl[0] for osl in os_data.values()])
1784         elif field == "node_status":
1785           val = {}
1786           for node_name, nos_list in os_data.iteritems():
1787             val[node_name] = [(v.status, v.path) for v in nos_list]
1788         else:
1789           raise errors.ParameterError(field)
1790         row.append(val)
1791       output.append(row)
1792
1793     return output
1794
1795
1796 class LURemoveNode(LogicalUnit):
1797   """Logical unit for removing a node.
1798
1799   """
1800   HPATH = "node-remove"
1801   HTYPE = constants.HTYPE_NODE
1802   _OP_REQP = ["node_name"]
1803
1804   def BuildHooksEnv(self):
1805     """Build hooks env.
1806
1807     This doesn't run on the target node in the pre phase as a failed
1808     node would then be impossible to remove.
1809
1810     """
1811     env = {
1812       "OP_TARGET": self.op.node_name,
1813       "NODE_NAME": self.op.node_name,
1814       }
1815     all_nodes = self.cfg.GetNodeList()
1816     all_nodes.remove(self.op.node_name)
1817     return env, all_nodes, all_nodes
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks:
1823      - the node exists in the configuration
1824      - it does not have primary or secondary instances
1825      - it's not the master
1826
1827     Any errors are signalled by raising errors.OpPrereqError.
1828
1829     """
1830     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1831     if node is None:
1832       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1833
1834     instance_list = self.cfg.GetInstanceList()
1835
1836     masternode = self.cfg.GetMasterNode()
1837     if node.name == masternode:
1838       raise errors.OpPrereqError("Node is the master node,"
1839                                  " you need to failover first.")
1840
1841     for instance_name in instance_list:
1842       instance = self.cfg.GetInstanceInfo(instance_name)
1843       if node.name in instance.all_nodes:
1844         raise errors.OpPrereqError("Instance %s is still running on the node,"
1845                                    " please remove first." % instance_name)
1846     self.op.node_name = node.name
1847     self.node = node
1848
1849   def Exec(self, feedback_fn):
1850     """Removes the node from the cluster.
1851
1852     """
1853     node = self.node
1854     logging.info("Stopping the node daemon and removing configs from node %s",
1855                  node.name)
1856
1857     self.context.RemoveNode(node.name)
1858
1859     self.rpc.call_node_leave_cluster(node.name)
1860
1861     # Promote nodes to master candidate as needed
1862     _AdjustCandidatePool(self)
1863
1864
1865 class LUQueryNodes(NoHooksLU):
1866   """Logical unit for querying nodes.
1867
1868   """
1869   _OP_REQP = ["output_fields", "names", "use_locking"]
1870   REQ_BGL = False
1871   _FIELDS_DYNAMIC = utils.FieldSet(
1872     "dtotal", "dfree",
1873     "mtotal", "mnode", "mfree",
1874     "bootid",
1875     "ctotal", "cnodes", "csockets",
1876     )
1877
1878   _FIELDS_STATIC = utils.FieldSet(
1879     "name", "pinst_cnt", "sinst_cnt",
1880     "pinst_list", "sinst_list",
1881     "pip", "sip", "tags",
1882     "serial_no",
1883     "master_candidate",
1884     "master",
1885     "offline",
1886     "drained",
1887     )
1888
1889   def ExpandNames(self):
1890     _CheckOutputFields(static=self._FIELDS_STATIC,
1891                        dynamic=self._FIELDS_DYNAMIC,
1892                        selected=self.op.output_fields)
1893
1894     self.needed_locks = {}
1895     self.share_locks[locking.LEVEL_NODE] = 1
1896
1897     if self.op.names:
1898       self.wanted = _GetWantedNodes(self, self.op.names)
1899     else:
1900       self.wanted = locking.ALL_SET
1901
1902     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1903     self.do_locking = self.do_node_query and self.op.use_locking
1904     if self.do_locking:
1905       # if we don't request only static fields, we need to lock the nodes
1906       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1907
1908
1909   def CheckPrereq(self):
1910     """Check prerequisites.
1911
1912     """
1913     # The validation of the node list is done in the _GetWantedNodes,
1914     # if non empty, and if empty, there's no validation to do
1915     pass
1916
1917   def Exec(self, feedback_fn):
1918     """Computes the list of nodes and their attributes.
1919
1920     """
1921     all_info = self.cfg.GetAllNodesInfo()
1922     if self.do_locking:
1923       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1924     elif self.wanted != locking.ALL_SET:
1925       nodenames = self.wanted
1926       missing = set(nodenames).difference(all_info.keys())
1927       if missing:
1928         raise errors.OpExecError(
1929           "Some nodes were removed before retrieving their data: %s" % missing)
1930     else:
1931       nodenames = all_info.keys()
1932
1933     nodenames = utils.NiceSort(nodenames)
1934     nodelist = [all_info[name] for name in nodenames]
1935
1936     # begin data gathering
1937
1938     if self.do_node_query:
1939       live_data = {}
1940       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1941                                           self.cfg.GetHypervisorType())
1942       for name in nodenames:
1943         nodeinfo = node_data[name]
1944         if not nodeinfo.failed and nodeinfo.data:
1945           nodeinfo = nodeinfo.data
1946           fn = utils.TryConvert
1947           live_data[name] = {
1948             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1949             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1950             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1951             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1952             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1953             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1954             "bootid": nodeinfo.get('bootid', None),
1955             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1956             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1957             }
1958         else:
1959           live_data[name] = {}
1960     else:
1961       live_data = dict.fromkeys(nodenames, {})
1962
1963     node_to_primary = dict([(name, set()) for name in nodenames])
1964     node_to_secondary = dict([(name, set()) for name in nodenames])
1965
1966     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1967                              "sinst_cnt", "sinst_list"))
1968     if inst_fields & frozenset(self.op.output_fields):
1969       instancelist = self.cfg.GetInstanceList()
1970
1971       for instance_name in instancelist:
1972         inst = self.cfg.GetInstanceInfo(instance_name)
1973         if inst.primary_node in node_to_primary:
1974           node_to_primary[inst.primary_node].add(inst.name)
1975         for secnode in inst.secondary_nodes:
1976           if secnode in node_to_secondary:
1977             node_to_secondary[secnode].add(inst.name)
1978
1979     master_node = self.cfg.GetMasterNode()
1980
1981     # end data gathering
1982
1983     output = []
1984     for node in nodelist:
1985       node_output = []
1986       for field in self.op.output_fields:
1987         if field == "name":
1988           val = node.name
1989         elif field == "pinst_list":
1990           val = list(node_to_primary[node.name])
1991         elif field == "sinst_list":
1992           val = list(node_to_secondary[node.name])
1993         elif field == "pinst_cnt":
1994           val = len(node_to_primary[node.name])
1995         elif field == "sinst_cnt":
1996           val = len(node_to_secondary[node.name])
1997         elif field == "pip":
1998           val = node.primary_ip
1999         elif field == "sip":
2000           val = node.secondary_ip
2001         elif field == "tags":
2002           val = list(node.GetTags())
2003         elif field == "serial_no":
2004           val = node.serial_no
2005         elif field == "master_candidate":
2006           val = node.master_candidate
2007         elif field == "master":
2008           val = node.name == master_node
2009         elif field == "offline":
2010           val = node.offline
2011         elif field == "drained":
2012           val = node.drained
2013         elif self._FIELDS_DYNAMIC.Matches(field):
2014           val = live_data[node.name].get(field, None)
2015         else:
2016           raise errors.ParameterError(field)
2017         node_output.append(val)
2018       output.append(node_output)
2019
2020     return output
2021
2022
2023 class LUQueryNodeVolumes(NoHooksLU):
2024   """Logical unit for getting volumes on node(s).
2025
2026   """
2027   _OP_REQP = ["nodes", "output_fields"]
2028   REQ_BGL = False
2029   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2030   _FIELDS_STATIC = utils.FieldSet("node")
2031
2032   def ExpandNames(self):
2033     _CheckOutputFields(static=self._FIELDS_STATIC,
2034                        dynamic=self._FIELDS_DYNAMIC,
2035                        selected=self.op.output_fields)
2036
2037     self.needed_locks = {}
2038     self.share_locks[locking.LEVEL_NODE] = 1
2039     if not self.op.nodes:
2040       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2041     else:
2042       self.needed_locks[locking.LEVEL_NODE] = \
2043         _GetWantedNodes(self, self.op.nodes)
2044
2045   def CheckPrereq(self):
2046     """Check prerequisites.
2047
2048     This checks that the fields required are valid output fields.
2049
2050     """
2051     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2052
2053   def Exec(self, feedback_fn):
2054     """Computes the list of nodes and their attributes.
2055
2056     """
2057     nodenames = self.nodes
2058     volumes = self.rpc.call_node_volumes(nodenames)
2059
2060     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2061              in self.cfg.GetInstanceList()]
2062
2063     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2064
2065     output = []
2066     for node in nodenames:
2067       if node not in volumes or volumes[node].failed or not volumes[node].data:
2068         continue
2069
2070       node_vols = volumes[node].data[:]
2071       node_vols.sort(key=lambda vol: vol['dev'])
2072
2073       for vol in node_vols:
2074         node_output = []
2075         for field in self.op.output_fields:
2076           if field == "node":
2077             val = node
2078           elif field == "phys":
2079             val = vol['dev']
2080           elif field == "vg":
2081             val = vol['vg']
2082           elif field == "name":
2083             val = vol['name']
2084           elif field == "size":
2085             val = int(float(vol['size']))
2086           elif field == "instance":
2087             for inst in ilist:
2088               if node not in lv_by_node[inst]:
2089                 continue
2090               if vol['name'] in lv_by_node[inst][node]:
2091                 val = inst.name
2092                 break
2093             else:
2094               val = '-'
2095           else:
2096             raise errors.ParameterError(field)
2097           node_output.append(str(val))
2098
2099         output.append(node_output)
2100
2101     return output
2102
2103
2104 class LUAddNode(LogicalUnit):
2105   """Logical unit for adding node to the cluster.
2106
2107   """
2108   HPATH = "node-add"
2109   HTYPE = constants.HTYPE_NODE
2110   _OP_REQP = ["node_name"]
2111
2112   def BuildHooksEnv(self):
2113     """Build hooks env.
2114
2115     This will run on all nodes before, and on all nodes + the new node after.
2116
2117     """
2118     env = {
2119       "OP_TARGET": self.op.node_name,
2120       "NODE_NAME": self.op.node_name,
2121       "NODE_PIP": self.op.primary_ip,
2122       "NODE_SIP": self.op.secondary_ip,
2123       }
2124     nodes_0 = self.cfg.GetNodeList()
2125     nodes_1 = nodes_0 + [self.op.node_name, ]
2126     return env, nodes_0, nodes_1
2127
2128   def CheckPrereq(self):
2129     """Check prerequisites.
2130
2131     This checks:
2132      - the new node is not already in the config
2133      - it is resolvable
2134      - its parameters (single/dual homed) matches the cluster
2135
2136     Any errors are signalled by raising errors.OpPrereqError.
2137
2138     """
2139     node_name = self.op.node_name
2140     cfg = self.cfg
2141
2142     dns_data = utils.HostInfo(node_name)
2143
2144     node = dns_data.name
2145     primary_ip = self.op.primary_ip = dns_data.ip
2146     secondary_ip = getattr(self.op, "secondary_ip", None)
2147     if secondary_ip is None:
2148       secondary_ip = primary_ip
2149     if not utils.IsValidIP(secondary_ip):
2150       raise errors.OpPrereqError("Invalid secondary IP given")
2151     self.op.secondary_ip = secondary_ip
2152
2153     node_list = cfg.GetNodeList()
2154     if not self.op.readd and node in node_list:
2155       raise errors.OpPrereqError("Node %s is already in the configuration" %
2156                                  node)
2157     elif self.op.readd and node not in node_list:
2158       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2159
2160     for existing_node_name in node_list:
2161       existing_node = cfg.GetNodeInfo(existing_node_name)
2162
2163       if self.op.readd and node == existing_node_name:
2164         if (existing_node.primary_ip != primary_ip or
2165             existing_node.secondary_ip != secondary_ip):
2166           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2167                                      " address configuration as before")
2168         continue
2169
2170       if (existing_node.primary_ip == primary_ip or
2171           existing_node.secondary_ip == primary_ip or
2172           existing_node.primary_ip == secondary_ip or
2173           existing_node.secondary_ip == secondary_ip):
2174         raise errors.OpPrereqError("New node ip address(es) conflict with"
2175                                    " existing node %s" % existing_node.name)
2176
2177     # check that the type of the node (single versus dual homed) is the
2178     # same as for the master
2179     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2180     master_singlehomed = myself.secondary_ip == myself.primary_ip
2181     newbie_singlehomed = secondary_ip == primary_ip
2182     if master_singlehomed != newbie_singlehomed:
2183       if master_singlehomed:
2184         raise errors.OpPrereqError("The master has no private ip but the"
2185                                    " new node has one")
2186       else:
2187         raise errors.OpPrereqError("The master has a private ip but the"
2188                                    " new node doesn't have one")
2189
2190     # checks reachablity
2191     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2192       raise errors.OpPrereqError("Node not reachable by ping")
2193
2194     if not newbie_singlehomed:
2195       # check reachability from my secondary ip to newbie's secondary ip
2196       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2197                            source=myself.secondary_ip):
2198         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2199                                    " based ping to noded port")
2200
2201     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2202     mc_now, _ = self.cfg.GetMasterCandidateStats()
2203     master_candidate = mc_now < cp_size
2204
2205     self.new_node = objects.Node(name=node,
2206                                  primary_ip=primary_ip,
2207                                  secondary_ip=secondary_ip,
2208                                  master_candidate=master_candidate,
2209                                  offline=False, drained=False)
2210
2211   def Exec(self, feedback_fn):
2212     """Adds the new node to the cluster.
2213
2214     """
2215     new_node = self.new_node
2216     node = new_node.name
2217
2218     # check connectivity
2219     result = self.rpc.call_version([node])[node]
2220     result.Raise()
2221     if result.data:
2222       if constants.PROTOCOL_VERSION == result.data:
2223         logging.info("Communication to node %s fine, sw version %s match",
2224                      node, result.data)
2225       else:
2226         raise errors.OpExecError("Version mismatch master version %s,"
2227                                  " node version %s" %
2228                                  (constants.PROTOCOL_VERSION, result.data))
2229     else:
2230       raise errors.OpExecError("Cannot get version from the new node")
2231
2232     # setup ssh on node
2233     logging.info("Copy ssh key to node %s", node)
2234     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2235     keyarray = []
2236     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2237                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2238                 priv_key, pub_key]
2239
2240     for i in keyfiles:
2241       f = open(i, 'r')
2242       try:
2243         keyarray.append(f.read())
2244       finally:
2245         f.close()
2246
2247     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2248                                     keyarray[2],
2249                                     keyarray[3], keyarray[4], keyarray[5])
2250
2251     msg = result.RemoteFailMsg()
2252     if msg:
2253       raise errors.OpExecError("Cannot transfer ssh keys to the"
2254                                " new node: %s" % msg)
2255
2256     # Add node to our /etc/hosts, and add key to known_hosts
2257     utils.AddHostToEtcHosts(new_node.name)
2258
2259     if new_node.secondary_ip != new_node.primary_ip:
2260       result = self.rpc.call_node_has_ip_address(new_node.name,
2261                                                  new_node.secondary_ip)
2262       if result.failed or not result.data:
2263         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2264                                  " you gave (%s). Please fix and re-run this"
2265                                  " command." % new_node.secondary_ip)
2266
2267     node_verify_list = [self.cfg.GetMasterNode()]
2268     node_verify_param = {
2269       'nodelist': [node],
2270       # TODO: do a node-net-test as well?
2271     }
2272
2273     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2274                                        self.cfg.GetClusterName())
2275     for verifier in node_verify_list:
2276       if result[verifier].failed or not result[verifier].data:
2277         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2278                                  " for remote verification" % verifier)
2279       if result[verifier].data['nodelist']:
2280         for failed in result[verifier].data['nodelist']:
2281           feedback_fn("ssh/hostname verification failed %s -> %s" %
2282                       (verifier, result[verifier].data['nodelist'][failed]))
2283         raise errors.OpExecError("ssh/hostname verification failed.")
2284
2285     if self.op.readd:
2286       _RedistributeAncillaryFiles(self)
2287       self.context.ReaddNode(new_node)
2288     else:
2289       _RedistributeAncillaryFiles(self, additional_nodes=node)
2290       self.context.AddNode(new_node)
2291
2292
2293 class LUSetNodeParams(LogicalUnit):
2294   """Modifies the parameters of a node.
2295
2296   """
2297   HPATH = "node-modify"
2298   HTYPE = constants.HTYPE_NODE
2299   _OP_REQP = ["node_name"]
2300   REQ_BGL = False
2301
2302   def CheckArguments(self):
2303     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2304     if node_name is None:
2305       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2306     self.op.node_name = node_name
2307     _CheckBooleanOpField(self.op, 'master_candidate')
2308     _CheckBooleanOpField(self.op, 'offline')
2309     _CheckBooleanOpField(self.op, 'drained')
2310     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2311     if all_mods.count(None) == 3:
2312       raise errors.OpPrereqError("Please pass at least one modification")
2313     if all_mods.count(True) > 1:
2314       raise errors.OpPrereqError("Can't set the node into more than one"
2315                                  " state at the same time")
2316
2317   def ExpandNames(self):
2318     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2319
2320   def BuildHooksEnv(self):
2321     """Build hooks env.
2322
2323     This runs on the master node.
2324
2325     """
2326     env = {
2327       "OP_TARGET": self.op.node_name,
2328       "MASTER_CANDIDATE": str(self.op.master_candidate),
2329       "OFFLINE": str(self.op.offline),
2330       "DRAINED": str(self.op.drained),
2331       }
2332     nl = [self.cfg.GetMasterNode(),
2333           self.op.node_name]
2334     return env, nl, nl
2335
2336   def CheckPrereq(self):
2337     """Check prerequisites.
2338
2339     This only checks the instance list against the existing names.
2340
2341     """
2342     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2343
2344     if ((self.op.master_candidate == False or self.op.offline == True or
2345          self.op.drained == True) and node.master_candidate):
2346       # we will demote the node from master_candidate
2347       if self.op.node_name == self.cfg.GetMasterNode():
2348         raise errors.OpPrereqError("The master node has to be a"
2349                                    " master candidate, online and not drained")
2350       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2351       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2352       if num_candidates <= cp_size:
2353         msg = ("Not enough master candidates (desired"
2354                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2355         if self.op.force:
2356           self.LogWarning(msg)
2357         else:
2358           raise errors.OpPrereqError(msg)
2359
2360     if (self.op.master_candidate == True and
2361         ((node.offline and not self.op.offline == False) or
2362          (node.drained and not self.op.drained == False))):
2363       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2364                                  " to master_candidate" % node.name)
2365
2366     return
2367
2368   def Exec(self, feedback_fn):
2369     """Modifies a node.
2370
2371     """
2372     node = self.node
2373
2374     result = []
2375     changed_mc = False
2376
2377     if self.op.offline is not None:
2378       node.offline = self.op.offline
2379       result.append(("offline", str(self.op.offline)))
2380       if self.op.offline == True:
2381         if node.master_candidate:
2382           node.master_candidate = False
2383           changed_mc = True
2384           result.append(("master_candidate", "auto-demotion due to offline"))
2385         if node.drained:
2386           node.drained = False
2387           result.append(("drained", "clear drained status due to offline"))
2388
2389     if self.op.master_candidate is not None:
2390       node.master_candidate = self.op.master_candidate
2391       changed_mc = True
2392       result.append(("master_candidate", str(self.op.master_candidate)))
2393       if self.op.master_candidate == False:
2394         rrc = self.rpc.call_node_demote_from_mc(node.name)
2395         msg = rrc.RemoteFailMsg()
2396         if msg:
2397           self.LogWarning("Node failed to demote itself: %s" % msg)
2398
2399     if self.op.drained is not None:
2400       node.drained = self.op.drained
2401       result.append(("drained", str(self.op.drained)))
2402       if self.op.drained == True:
2403         if node.master_candidate:
2404           node.master_candidate = False
2405           changed_mc = True
2406           result.append(("master_candidate", "auto-demotion due to drain"))
2407         if node.offline:
2408           node.offline = False
2409           result.append(("offline", "clear offline status due to drain"))
2410
2411     # this will trigger configuration file update, if needed
2412     self.cfg.Update(node)
2413     # this will trigger job queue propagation or cleanup
2414     if changed_mc:
2415       self.context.ReaddNode(node)
2416
2417     return result
2418
2419
2420 class LUQueryClusterInfo(NoHooksLU):
2421   """Query cluster configuration.
2422
2423   """
2424   _OP_REQP = []
2425   REQ_BGL = False
2426
2427   def ExpandNames(self):
2428     self.needed_locks = {}
2429
2430   def CheckPrereq(self):
2431     """No prerequsites needed for this LU.
2432
2433     """
2434     pass
2435
2436   def Exec(self, feedback_fn):
2437     """Return cluster config.
2438
2439     """
2440     cluster = self.cfg.GetClusterInfo()
2441     result = {
2442       "software_version": constants.RELEASE_VERSION,
2443       "protocol_version": constants.PROTOCOL_VERSION,
2444       "config_version": constants.CONFIG_VERSION,
2445       "os_api_version": constants.OS_API_VERSION,
2446       "export_version": constants.EXPORT_VERSION,
2447       "architecture": (platform.architecture()[0], platform.machine()),
2448       "name": cluster.cluster_name,
2449       "master": cluster.master_node,
2450       "default_hypervisor": cluster.default_hypervisor,
2451       "enabled_hypervisors": cluster.enabled_hypervisors,
2452       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2453                         for hypervisor in cluster.enabled_hypervisors]),
2454       "beparams": cluster.beparams,
2455       "candidate_pool_size": cluster.candidate_pool_size,
2456       "default_bridge": cluster.default_bridge,
2457       "master_netdev": cluster.master_netdev,
2458       "volume_group_name": cluster.volume_group_name,
2459       "file_storage_dir": cluster.file_storage_dir,
2460       }
2461
2462     return result
2463
2464
2465 class LUQueryConfigValues(NoHooksLU):
2466   """Return configuration values.
2467
2468   """
2469   _OP_REQP = []
2470   REQ_BGL = False
2471   _FIELDS_DYNAMIC = utils.FieldSet()
2472   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2473
2474   def ExpandNames(self):
2475     self.needed_locks = {}
2476
2477     _CheckOutputFields(static=self._FIELDS_STATIC,
2478                        dynamic=self._FIELDS_DYNAMIC,
2479                        selected=self.op.output_fields)
2480
2481   def CheckPrereq(self):
2482     """No prerequisites.
2483
2484     """
2485     pass
2486
2487   def Exec(self, feedback_fn):
2488     """Dump a representation of the cluster config to the standard output.
2489
2490     """
2491     values = []
2492     for field in self.op.output_fields:
2493       if field == "cluster_name":
2494         entry = self.cfg.GetClusterName()
2495       elif field == "master_node":
2496         entry = self.cfg.GetMasterNode()
2497       elif field == "drain_flag":
2498         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2499       else:
2500         raise errors.ParameterError(field)
2501       values.append(entry)
2502     return values
2503
2504
2505 class LUActivateInstanceDisks(NoHooksLU):
2506   """Bring up an instance's disks.
2507
2508   """
2509   _OP_REQP = ["instance_name"]
2510   REQ_BGL = False
2511
2512   def ExpandNames(self):
2513     self._ExpandAndLockInstance()
2514     self.needed_locks[locking.LEVEL_NODE] = []
2515     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2516
2517   def DeclareLocks(self, level):
2518     if level == locking.LEVEL_NODE:
2519       self._LockInstancesNodes()
2520
2521   def CheckPrereq(self):
2522     """Check prerequisites.
2523
2524     This checks that the instance is in the cluster.
2525
2526     """
2527     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2528     assert self.instance is not None, \
2529       "Cannot retrieve locked instance %s" % self.op.instance_name
2530     _CheckNodeOnline(self, self.instance.primary_node)
2531
2532   def Exec(self, feedback_fn):
2533     """Activate the disks.
2534
2535     """
2536     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2537     if not disks_ok:
2538       raise errors.OpExecError("Cannot activate block devices")
2539
2540     return disks_info
2541
2542
2543 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2544   """Prepare the block devices for an instance.
2545
2546   This sets up the block devices on all nodes.
2547
2548   @type lu: L{LogicalUnit}
2549   @param lu: the logical unit on whose behalf we execute
2550   @type instance: L{objects.Instance}
2551   @param instance: the instance for whose disks we assemble
2552   @type ignore_secondaries: boolean
2553   @param ignore_secondaries: if true, errors on secondary nodes
2554       won't result in an error return from the function
2555   @return: False if the operation failed, otherwise a list of
2556       (host, instance_visible_name, node_visible_name)
2557       with the mapping from node devices to instance devices
2558
2559   """
2560   device_info = []
2561   disks_ok = True
2562   iname = instance.name
2563   # With the two passes mechanism we try to reduce the window of
2564   # opportunity for the race condition of switching DRBD to primary
2565   # before handshaking occured, but we do not eliminate it
2566
2567   # The proper fix would be to wait (with some limits) until the
2568   # connection has been made and drbd transitions from WFConnection
2569   # into any other network-connected state (Connected, SyncTarget,
2570   # SyncSource, etc.)
2571
2572   # 1st pass, assemble on all nodes in secondary mode
2573   for inst_disk in instance.disks:
2574     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2575       lu.cfg.SetDiskID(node_disk, node)
2576       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2577       msg = result.RemoteFailMsg()
2578       if msg:
2579         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2580                            " (is_primary=False, pass=1): %s",
2581                            inst_disk.iv_name, node, msg)
2582         if not ignore_secondaries:
2583           disks_ok = False
2584
2585   # FIXME: race condition on drbd migration to primary
2586
2587   # 2nd pass, do only the primary node
2588   for inst_disk in instance.disks:
2589     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2590       if node != instance.primary_node:
2591         continue
2592       lu.cfg.SetDiskID(node_disk, node)
2593       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2594       msg = result.RemoteFailMsg()
2595       if msg:
2596         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2597                            " (is_primary=True, pass=2): %s",
2598                            inst_disk.iv_name, node, msg)
2599         disks_ok = False
2600     device_info.append((instance.primary_node, inst_disk.iv_name,
2601                         result.payload))
2602
2603   # leave the disks configured for the primary node
2604   # this is a workaround that would be fixed better by
2605   # improving the logical/physical id handling
2606   for disk in instance.disks:
2607     lu.cfg.SetDiskID(disk, instance.primary_node)
2608
2609   return disks_ok, device_info
2610
2611
2612 def _StartInstanceDisks(lu, instance, force):
2613   """Start the disks of an instance.
2614
2615   """
2616   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2617                                            ignore_secondaries=force)
2618   if not disks_ok:
2619     _ShutdownInstanceDisks(lu, instance)
2620     if force is not None and not force:
2621       lu.proc.LogWarning("", hint="If the message above refers to a"
2622                          " secondary node,"
2623                          " you can retry the operation using '--force'.")
2624     raise errors.OpExecError("Disk consistency error")
2625
2626
2627 class LUDeactivateInstanceDisks(NoHooksLU):
2628   """Shutdown an instance's disks.
2629
2630   """
2631   _OP_REQP = ["instance_name"]
2632   REQ_BGL = False
2633
2634   def ExpandNames(self):
2635     self._ExpandAndLockInstance()
2636     self.needed_locks[locking.LEVEL_NODE] = []
2637     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2638
2639   def DeclareLocks(self, level):
2640     if level == locking.LEVEL_NODE:
2641       self._LockInstancesNodes()
2642
2643   def CheckPrereq(self):
2644     """Check prerequisites.
2645
2646     This checks that the instance is in the cluster.
2647
2648     """
2649     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2650     assert self.instance is not None, \
2651       "Cannot retrieve locked instance %s" % self.op.instance_name
2652
2653   def Exec(self, feedback_fn):
2654     """Deactivate the disks
2655
2656     """
2657     instance = self.instance
2658     _SafeShutdownInstanceDisks(self, instance)
2659
2660
2661 def _SafeShutdownInstanceDisks(lu, instance):
2662   """Shutdown block devices of an instance.
2663
2664   This function checks if an instance is running, before calling
2665   _ShutdownInstanceDisks.
2666
2667   """
2668   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2669                                       [instance.hypervisor])
2670   ins_l = ins_l[instance.primary_node]
2671   if ins_l.failed or not isinstance(ins_l.data, list):
2672     raise errors.OpExecError("Can't contact node '%s'" %
2673                              instance.primary_node)
2674
2675   if instance.name in ins_l.data:
2676     raise errors.OpExecError("Instance is running, can't shutdown"
2677                              " block devices.")
2678
2679   _ShutdownInstanceDisks(lu, instance)
2680
2681
2682 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2683   """Shutdown block devices of an instance.
2684
2685   This does the shutdown on all nodes of the instance.
2686
2687   If the ignore_primary is false, errors on the primary node are
2688   ignored.
2689
2690   """
2691   all_result = True
2692   for disk in instance.disks:
2693     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2694       lu.cfg.SetDiskID(top_disk, node)
2695       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2696       msg = result.RemoteFailMsg()
2697       if msg:
2698         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2699                       disk.iv_name, node, msg)
2700         if not ignore_primary or node != instance.primary_node:
2701           all_result = False
2702   return all_result
2703
2704
2705 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2706   """Checks if a node has enough free memory.
2707
2708   This function check if a given node has the needed amount of free
2709   memory. In case the node has less memory or we cannot get the
2710   information from the node, this function raise an OpPrereqError
2711   exception.
2712
2713   @type lu: C{LogicalUnit}
2714   @param lu: a logical unit from which we get configuration data
2715   @type node: C{str}
2716   @param node: the node to check
2717   @type reason: C{str}
2718   @param reason: string to use in the error message
2719   @type requested: C{int}
2720   @param requested: the amount of memory in MiB to check for
2721   @type hypervisor_name: C{str}
2722   @param hypervisor_name: the hypervisor to ask for memory stats
2723   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2724       we cannot check the node
2725
2726   """
2727   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2728   nodeinfo[node].Raise()
2729   free_mem = nodeinfo[node].data.get('memory_free')
2730   if not isinstance(free_mem, int):
2731     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2732                              " was '%s'" % (node, free_mem))
2733   if requested > free_mem:
2734     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2735                              " needed %s MiB, available %s MiB" %
2736                              (node, reason, requested, free_mem))
2737
2738
2739 class LUStartupInstance(LogicalUnit):
2740   """Starts an instance.
2741
2742   """
2743   HPATH = "instance-start"
2744   HTYPE = constants.HTYPE_INSTANCE
2745   _OP_REQP = ["instance_name", "force"]
2746   REQ_BGL = False
2747
2748   def ExpandNames(self):
2749     self._ExpandAndLockInstance()
2750
2751   def BuildHooksEnv(self):
2752     """Build hooks env.
2753
2754     This runs on master, primary and secondary nodes of the instance.
2755
2756     """
2757     env = {
2758       "FORCE": self.op.force,
2759       }
2760     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2761     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2762     return env, nl, nl
2763
2764   def CheckPrereq(self):
2765     """Check prerequisites.
2766
2767     This checks that the instance is in the cluster.
2768
2769     """
2770     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2771     assert self.instance is not None, \
2772       "Cannot retrieve locked instance %s" % self.op.instance_name
2773
2774     # extra beparams
2775     self.beparams = getattr(self.op, "beparams", {})
2776     if self.beparams:
2777       if not isinstance(self.beparams, dict):
2778         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2779                                    " dict" % (type(self.beparams), ))
2780       # fill the beparams dict
2781       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2782       self.op.beparams = self.beparams
2783
2784     # extra hvparams
2785     self.hvparams = getattr(self.op, "hvparams", {})
2786     if self.hvparams:
2787       if not isinstance(self.hvparams, dict):
2788         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2789                                    " dict" % (type(self.hvparams), ))
2790
2791       # check hypervisor parameter syntax (locally)
2792       cluster = self.cfg.GetClusterInfo()
2793       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2794       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2795                                     instance.hvparams)
2796       filled_hvp.update(self.hvparams)
2797       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2798       hv_type.CheckParameterSyntax(filled_hvp)
2799       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2800       self.op.hvparams = self.hvparams
2801
2802     _CheckNodeOnline(self, instance.primary_node)
2803
2804     bep = self.cfg.GetClusterInfo().FillBE(instance)
2805     # check bridges existance
2806     _CheckInstanceBridgesExist(self, instance)
2807
2808     remote_info = self.rpc.call_instance_info(instance.primary_node,
2809                                               instance.name,
2810                                               instance.hypervisor)
2811     remote_info.Raise()
2812     if not remote_info.data:
2813       _CheckNodeFreeMemory(self, instance.primary_node,
2814                            "starting instance %s" % instance.name,
2815                            bep[constants.BE_MEMORY], instance.hypervisor)
2816
2817   def Exec(self, feedback_fn):
2818     """Start the instance.
2819
2820     """
2821     instance = self.instance
2822     force = self.op.force
2823
2824     self.cfg.MarkInstanceUp(instance.name)
2825
2826     node_current = instance.primary_node
2827
2828     _StartInstanceDisks(self, instance, force)
2829
2830     result = self.rpc.call_instance_start(node_current, instance,
2831                                           self.hvparams, self.beparams)
2832     msg = result.RemoteFailMsg()
2833     if msg:
2834       _ShutdownInstanceDisks(self, instance)
2835       raise errors.OpExecError("Could not start instance: %s" % msg)
2836
2837
2838 class LURebootInstance(LogicalUnit):
2839   """Reboot an instance.
2840
2841   """
2842   HPATH = "instance-reboot"
2843   HTYPE = constants.HTYPE_INSTANCE
2844   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2845   REQ_BGL = False
2846
2847   def ExpandNames(self):
2848     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2849                                    constants.INSTANCE_REBOOT_HARD,
2850                                    constants.INSTANCE_REBOOT_FULL]:
2851       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2852                                   (constants.INSTANCE_REBOOT_SOFT,
2853                                    constants.INSTANCE_REBOOT_HARD,
2854                                    constants.INSTANCE_REBOOT_FULL))
2855     self._ExpandAndLockInstance()
2856
2857   def BuildHooksEnv(self):
2858     """Build hooks env.
2859
2860     This runs on master, primary and secondary nodes of the instance.
2861
2862     """
2863     env = {
2864       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2865       "REBOOT_TYPE": self.op.reboot_type,
2866       }
2867     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2868     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2869     return env, nl, nl
2870
2871   def CheckPrereq(self):
2872     """Check prerequisites.
2873
2874     This checks that the instance is in the cluster.
2875
2876     """
2877     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2878     assert self.instance is not None, \
2879       "Cannot retrieve locked instance %s" % self.op.instance_name
2880
2881     _CheckNodeOnline(self, instance.primary_node)
2882
2883     # check bridges existance
2884     _CheckInstanceBridgesExist(self, instance)
2885
2886   def Exec(self, feedback_fn):
2887     """Reboot the instance.
2888
2889     """
2890     instance = self.instance
2891     ignore_secondaries = self.op.ignore_secondaries
2892     reboot_type = self.op.reboot_type
2893
2894     node_current = instance.primary_node
2895
2896     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2897                        constants.INSTANCE_REBOOT_HARD]:
2898       for disk in instance.disks:
2899         self.cfg.SetDiskID(disk, node_current)
2900       result = self.rpc.call_instance_reboot(node_current, instance,
2901                                              reboot_type)
2902       msg = result.RemoteFailMsg()
2903       if msg:
2904         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2905     else:
2906       result = self.rpc.call_instance_shutdown(node_current, instance)
2907       msg = result.RemoteFailMsg()
2908       if msg:
2909         raise errors.OpExecError("Could not shutdown instance for"
2910                                  " full reboot: %s" % msg)
2911       _ShutdownInstanceDisks(self, instance)
2912       _StartInstanceDisks(self, instance, ignore_secondaries)
2913       result = self.rpc.call_instance_start(node_current, instance, None, None)
2914       msg = result.RemoteFailMsg()
2915       if msg:
2916         _ShutdownInstanceDisks(self, instance)
2917         raise errors.OpExecError("Could not start instance for"
2918                                  " full reboot: %s" % msg)
2919
2920     self.cfg.MarkInstanceUp(instance.name)
2921
2922
2923 class LUShutdownInstance(LogicalUnit):
2924   """Shutdown an instance.
2925
2926   """
2927   HPATH = "instance-stop"
2928   HTYPE = constants.HTYPE_INSTANCE
2929   _OP_REQP = ["instance_name"]
2930   REQ_BGL = False
2931
2932   def ExpandNames(self):
2933     self._ExpandAndLockInstance()
2934
2935   def BuildHooksEnv(self):
2936     """Build hooks env.
2937
2938     This runs on master, primary and secondary nodes of the instance.
2939
2940     """
2941     env = _BuildInstanceHookEnvByObject(self, self.instance)
2942     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2943     return env, nl, nl
2944
2945   def CheckPrereq(self):
2946     """Check prerequisites.
2947
2948     This checks that the instance is in the cluster.
2949
2950     """
2951     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2952     assert self.instance is not None, \
2953       "Cannot retrieve locked instance %s" % self.op.instance_name
2954     _CheckNodeOnline(self, self.instance.primary_node)
2955
2956   def Exec(self, feedback_fn):
2957     """Shutdown the instance.
2958
2959     """
2960     instance = self.instance
2961     node_current = instance.primary_node
2962     self.cfg.MarkInstanceDown(instance.name)
2963     result = self.rpc.call_instance_shutdown(node_current, instance)
2964     msg = result.RemoteFailMsg()
2965     if msg:
2966       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2967
2968     _ShutdownInstanceDisks(self, instance)
2969
2970
2971 class LUReinstallInstance(LogicalUnit):
2972   """Reinstall an instance.
2973
2974   """
2975   HPATH = "instance-reinstall"
2976   HTYPE = constants.HTYPE_INSTANCE
2977   _OP_REQP = ["instance_name"]
2978   REQ_BGL = False
2979
2980   def ExpandNames(self):
2981     self._ExpandAndLockInstance()
2982
2983   def BuildHooksEnv(self):
2984     """Build hooks env.
2985
2986     This runs on master, primary and secondary nodes of the instance.
2987
2988     """
2989     env = _BuildInstanceHookEnvByObject(self, self.instance)
2990     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2991     return env, nl, nl
2992
2993   def CheckPrereq(self):
2994     """Check prerequisites.
2995
2996     This checks that the instance is in the cluster and is not running.
2997
2998     """
2999     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3000     assert instance is not None, \
3001       "Cannot retrieve locked instance %s" % self.op.instance_name
3002     _CheckNodeOnline(self, instance.primary_node)
3003
3004     if instance.disk_template == constants.DT_DISKLESS:
3005       raise errors.OpPrereqError("Instance '%s' has no disks" %
3006                                  self.op.instance_name)
3007     if instance.admin_up:
3008       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3009                                  self.op.instance_name)
3010     remote_info = self.rpc.call_instance_info(instance.primary_node,
3011                                               instance.name,
3012                                               instance.hypervisor)
3013     remote_info.Raise()
3014     if remote_info.data:
3015       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3016                                  (self.op.instance_name,
3017                                   instance.primary_node))
3018
3019     self.op.os_type = getattr(self.op, "os_type", None)
3020     if self.op.os_type is not None:
3021       # OS verification
3022       pnode = self.cfg.GetNodeInfo(
3023         self.cfg.ExpandNodeName(instance.primary_node))
3024       if pnode is None:
3025         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3026                                    self.op.pnode)
3027       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3028       result.Raise()
3029       if not isinstance(result.data, objects.OS):
3030         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3031                                    " primary node"  % self.op.os_type)
3032
3033     self.instance = instance
3034
3035   def Exec(self, feedback_fn):
3036     """Reinstall the instance.
3037
3038     """
3039     inst = self.instance
3040
3041     if self.op.os_type is not None:
3042       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3043       inst.os = self.op.os_type
3044       self.cfg.Update(inst)
3045
3046     _StartInstanceDisks(self, inst, None)
3047     try:
3048       feedback_fn("Running the instance OS create scripts...")
3049       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3050       msg = result.RemoteFailMsg()
3051       if msg:
3052         raise errors.OpExecError("Could not install OS for instance %s"
3053                                  " on node %s: %s" %
3054                                  (inst.name, inst.primary_node, msg))
3055     finally:
3056       _ShutdownInstanceDisks(self, inst)
3057
3058
3059 class LURenameInstance(LogicalUnit):
3060   """Rename an instance.
3061
3062   """
3063   HPATH = "instance-rename"
3064   HTYPE = constants.HTYPE_INSTANCE
3065   _OP_REQP = ["instance_name", "new_name"]
3066
3067   def BuildHooksEnv(self):
3068     """Build hooks env.
3069
3070     This runs on master, primary and secondary nodes of the instance.
3071
3072     """
3073     env = _BuildInstanceHookEnvByObject(self, self.instance)
3074     env["INSTANCE_NEW_NAME"] = self.op.new_name
3075     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3076     return env, nl, nl
3077
3078   def CheckPrereq(self):
3079     """Check prerequisites.
3080
3081     This checks that the instance is in the cluster and is not running.
3082
3083     """
3084     instance = self.cfg.GetInstanceInfo(
3085       self.cfg.ExpandInstanceName(self.op.instance_name))
3086     if instance is None:
3087       raise errors.OpPrereqError("Instance '%s' not known" %
3088                                  self.op.instance_name)
3089     _CheckNodeOnline(self, instance.primary_node)
3090
3091     if instance.admin_up:
3092       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3093                                  self.op.instance_name)
3094     remote_info = self.rpc.call_instance_info(instance.primary_node,
3095                                               instance.name,
3096                                               instance.hypervisor)
3097     remote_info.Raise()
3098     if remote_info.data:
3099       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3100                                  (self.op.instance_name,
3101                                   instance.primary_node))
3102     self.instance = instance
3103
3104     # new name verification
3105     name_info = utils.HostInfo(self.op.new_name)
3106
3107     self.op.new_name = new_name = name_info.name
3108     instance_list = self.cfg.GetInstanceList()
3109     if new_name in instance_list:
3110       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3111                                  new_name)
3112
3113     if not getattr(self.op, "ignore_ip", False):
3114       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3115         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3116                                    (name_info.ip, new_name))
3117
3118
3119   def Exec(self, feedback_fn):
3120     """Reinstall the instance.
3121
3122     """
3123     inst = self.instance
3124     old_name = inst.name
3125
3126     if inst.disk_template == constants.DT_FILE:
3127       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3128
3129     self.cfg.RenameInstance(inst.name, self.op.new_name)
3130     # Change the instance lock. This is definitely safe while we hold the BGL
3131     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3132     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3133
3134     # re-read the instance from the configuration after rename
3135     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3136
3137     if inst.disk_template == constants.DT_FILE:
3138       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3139       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3140                                                      old_file_storage_dir,
3141                                                      new_file_storage_dir)
3142       result.Raise()
3143       if not result.data:
3144         raise errors.OpExecError("Could not connect to node '%s' to rename"
3145                                  " directory '%s' to '%s' (but the instance"
3146                                  " has been renamed in Ganeti)" % (
3147                                  inst.primary_node, old_file_storage_dir,
3148                                  new_file_storage_dir))
3149
3150       if not result.data[0]:
3151         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3152                                  " (but the instance has been renamed in"
3153                                  " Ganeti)" % (old_file_storage_dir,
3154                                                new_file_storage_dir))
3155
3156     _StartInstanceDisks(self, inst, None)
3157     try:
3158       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3159                                                  old_name)
3160       msg = result.RemoteFailMsg()
3161       if msg:
3162         msg = ("Could not run OS rename script for instance %s on node %s"
3163                " (but the instance has been renamed in Ganeti): %s" %
3164                (inst.name, inst.primary_node, msg))
3165         self.proc.LogWarning(msg)
3166     finally:
3167       _ShutdownInstanceDisks(self, inst)
3168
3169
3170 class LURemoveInstance(LogicalUnit):
3171   """Remove an instance.
3172
3173   """
3174   HPATH = "instance-remove"
3175   HTYPE = constants.HTYPE_INSTANCE
3176   _OP_REQP = ["instance_name", "ignore_failures"]
3177   REQ_BGL = False
3178
3179   def ExpandNames(self):
3180     self._ExpandAndLockInstance()
3181     self.needed_locks[locking.LEVEL_NODE] = []
3182     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3183
3184   def DeclareLocks(self, level):
3185     if level == locking.LEVEL_NODE:
3186       self._LockInstancesNodes()
3187
3188   def BuildHooksEnv(self):
3189     """Build hooks env.
3190
3191     This runs on master, primary and secondary nodes of the instance.
3192
3193     """
3194     env = _BuildInstanceHookEnvByObject(self, self.instance)
3195     nl = [self.cfg.GetMasterNode()]
3196     return env, nl, nl
3197
3198   def CheckPrereq(self):
3199     """Check prerequisites.
3200
3201     This checks that the instance is in the cluster.
3202
3203     """
3204     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3205     assert self.instance is not None, \
3206       "Cannot retrieve locked instance %s" % self.op.instance_name
3207
3208   def Exec(self, feedback_fn):
3209     """Remove the instance.
3210
3211     """
3212     instance = self.instance
3213     logging.info("Shutting down instance %s on node %s",
3214                  instance.name, instance.primary_node)
3215
3216     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3217     msg = result.RemoteFailMsg()
3218     if msg:
3219       if self.op.ignore_failures:
3220         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3221       else:
3222         raise errors.OpExecError("Could not shutdown instance %s on"
3223                                  " node %s: %s" %
3224                                  (instance.name, instance.primary_node, msg))
3225
3226     logging.info("Removing block devices for instance %s", instance.name)
3227
3228     if not _RemoveDisks(self, instance):
3229       if self.op.ignore_failures:
3230         feedback_fn("Warning: can't remove instance's disks")
3231       else:
3232         raise errors.OpExecError("Can't remove instance's disks")
3233
3234     logging.info("Removing instance %s out of cluster config", instance.name)
3235
3236     self.cfg.RemoveInstance(instance.name)
3237     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3238
3239
3240 class LUQueryInstances(NoHooksLU):
3241   """Logical unit for querying instances.
3242
3243   """
3244   _OP_REQP = ["output_fields", "names", "use_locking"]
3245   REQ_BGL = False
3246   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3247                                     "admin_state",
3248                                     "disk_template", "ip", "mac", "bridge",
3249                                     "sda_size", "sdb_size", "vcpus", "tags",
3250                                     "network_port", "beparams",
3251                                     r"(disk)\.(size)/([0-9]+)",
3252                                     r"(disk)\.(sizes)", "disk_usage",
3253                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3254                                     r"(nic)\.(macs|ips|bridges)",
3255                                     r"(disk|nic)\.(count)",
3256                                     "serial_no", "hypervisor", "hvparams",] +
3257                                   ["hv/%s" % name
3258                                    for name in constants.HVS_PARAMETERS] +
3259                                   ["be/%s" % name
3260                                    for name in constants.BES_PARAMETERS])
3261   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3262
3263
3264   def ExpandNames(self):
3265     _CheckOutputFields(static=self._FIELDS_STATIC,
3266                        dynamic=self._FIELDS_DYNAMIC,
3267                        selected=self.op.output_fields)
3268
3269     self.needed_locks = {}
3270     self.share_locks[locking.LEVEL_INSTANCE] = 1
3271     self.share_locks[locking.LEVEL_NODE] = 1
3272
3273     if self.op.names:
3274       self.wanted = _GetWantedInstances(self, self.op.names)
3275     else:
3276       self.wanted = locking.ALL_SET
3277
3278     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3279     self.do_locking = self.do_node_query and self.op.use_locking
3280     if self.do_locking:
3281       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3282       self.needed_locks[locking.LEVEL_NODE] = []
3283       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3284
3285   def DeclareLocks(self, level):
3286     if level == locking.LEVEL_NODE and self.do_locking:
3287       self._LockInstancesNodes()
3288
3289   def CheckPrereq(self):
3290     """Check prerequisites.
3291
3292     """
3293     pass
3294
3295   def Exec(self, feedback_fn):
3296     """Computes the list of nodes and their attributes.
3297
3298     """
3299     all_info = self.cfg.GetAllInstancesInfo()
3300     if self.wanted == locking.ALL_SET:
3301       # caller didn't specify instance names, so ordering is not important
3302       if self.do_locking:
3303         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3304       else:
3305         instance_names = all_info.keys()
3306       instance_names = utils.NiceSort(instance_names)
3307     else:
3308       # caller did specify names, so we must keep the ordering
3309       if self.do_locking:
3310         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3311       else:
3312         tgt_set = all_info.keys()
3313       missing = set(self.wanted).difference(tgt_set)
3314       if missing:
3315         raise errors.OpExecError("Some instances were removed before"
3316                                  " retrieving their data: %s" % missing)
3317       instance_names = self.wanted
3318
3319     instance_list = [all_info[iname] for iname in instance_names]
3320
3321     # begin data gathering
3322
3323     nodes = frozenset([inst.primary_node for inst in instance_list])
3324     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3325
3326     bad_nodes = []
3327     off_nodes = []
3328     if self.do_node_query:
3329       live_data = {}
3330       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3331       for name in nodes:
3332         result = node_data[name]
3333         if result.offline:
3334           # offline nodes will be in both lists
3335           off_nodes.append(name)
3336         if result.failed:
3337           bad_nodes.append(name)
3338         else:
3339           if result.data:
3340             live_data.update(result.data)
3341             # else no instance is alive
3342     else:
3343       live_data = dict([(name, {}) for name in instance_names])
3344
3345     # end data gathering
3346
3347     HVPREFIX = "hv/"
3348     BEPREFIX = "be/"
3349     output = []
3350     for instance in instance_list:
3351       iout = []
3352       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3353       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3354       for field in self.op.output_fields:
3355         st_match = self._FIELDS_STATIC.Matches(field)
3356         if field == "name":
3357           val = instance.name
3358         elif field == "os":
3359           val = instance.os
3360         elif field == "pnode":
3361           val = instance.primary_node
3362         elif field == "snodes":
3363           val = list(instance.secondary_nodes)
3364         elif field == "admin_state":
3365           val = instance.admin_up
3366         elif field == "oper_state":
3367           if instance.primary_node in bad_nodes:
3368             val = None
3369           else:
3370             val = bool(live_data.get(instance.name))
3371         elif field == "status":
3372           if instance.primary_node in off_nodes:
3373             val = "ERROR_nodeoffline"
3374           elif instance.primary_node in bad_nodes:
3375             val = "ERROR_nodedown"
3376           else:
3377             running = bool(live_data.get(instance.name))
3378             if running:
3379               if instance.admin_up:
3380                 val = "running"
3381               else:
3382                 val = "ERROR_up"
3383             else:
3384               if instance.admin_up:
3385                 val = "ERROR_down"
3386               else:
3387                 val = "ADMIN_down"
3388         elif field == "oper_ram":
3389           if instance.primary_node in bad_nodes:
3390             val = None
3391           elif instance.name in live_data:
3392             val = live_data[instance.name].get("memory", "?")
3393           else:
3394             val = "-"
3395         elif field == "disk_template":
3396           val = instance.disk_template
3397         elif field == "ip":
3398           val = instance.nics[0].ip
3399         elif field == "bridge":
3400           val = instance.nics[0].bridge
3401         elif field == "mac":
3402           val = instance.nics[0].mac
3403         elif field == "sda_size" or field == "sdb_size":
3404           idx = ord(field[2]) - ord('a')
3405           try:
3406             val = instance.FindDisk(idx).size
3407           except errors.OpPrereqError:
3408             val = None
3409         elif field == "disk_usage": # total disk usage per node
3410           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3411           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3412         elif field == "tags":
3413           val = list(instance.GetTags())
3414         elif field == "serial_no":
3415           val = instance.serial_no
3416         elif field == "network_port":
3417           val = instance.network_port
3418         elif field == "hypervisor":
3419           val = instance.hypervisor
3420         elif field == "hvparams":
3421           val = i_hv
3422         elif (field.startswith(HVPREFIX) and
3423               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3424           val = i_hv.get(field[len(HVPREFIX):], None)
3425         elif field == "beparams":
3426           val = i_be
3427         elif (field.startswith(BEPREFIX) and
3428               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3429           val = i_be.get(field[len(BEPREFIX):], None)
3430         elif st_match and st_match.groups():
3431           # matches a variable list
3432           st_groups = st_match.groups()
3433           if st_groups and st_groups[0] == "disk":
3434             if st_groups[1] == "count":
3435               val = len(instance.disks)
3436             elif st_groups[1] == "sizes":
3437               val = [disk.size for disk in instance.disks]
3438             elif st_groups[1] == "size":
3439               try:
3440                 val = instance.FindDisk(st_groups[2]).size
3441               except errors.OpPrereqError:
3442                 val = None
3443             else:
3444               assert False, "Unhandled disk parameter"
3445           elif st_groups[0] == "nic":
3446             if st_groups[1] == "count":
3447               val = len(instance.nics)
3448             elif st_groups[1] == "macs":
3449               val = [nic.mac for nic in instance.nics]
3450             elif st_groups[1] == "ips":
3451               val = [nic.ip for nic in instance.nics]
3452             elif st_groups[1] == "bridges":
3453               val = [nic.bridge for nic in instance.nics]
3454             else:
3455               # index-based item
3456               nic_idx = int(st_groups[2])
3457               if nic_idx >= len(instance.nics):
3458                 val = None
3459               else:
3460                 if st_groups[1] == "mac":
3461                   val = instance.nics[nic_idx].mac
3462                 elif st_groups[1] == "ip":
3463                   val = instance.nics[nic_idx].ip
3464                 elif st_groups[1] == "bridge":
3465                   val = instance.nics[nic_idx].bridge
3466                 else:
3467                   assert False, "Unhandled NIC parameter"
3468           else:
3469             assert False, "Unhandled variable parameter"
3470         else:
3471           raise errors.ParameterError(field)
3472         iout.append(val)
3473       output.append(iout)
3474
3475     return output
3476
3477
3478 class LUFailoverInstance(LogicalUnit):
3479   """Failover an instance.
3480
3481   """
3482   HPATH = "instance-failover"
3483   HTYPE = constants.HTYPE_INSTANCE
3484   _OP_REQP = ["instance_name", "ignore_consistency"]
3485   REQ_BGL = False
3486
3487   def ExpandNames(self):
3488     self._ExpandAndLockInstance()
3489     self.needed_locks[locking.LEVEL_NODE] = []
3490     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3491
3492   def DeclareLocks(self, level):
3493     if level == locking.LEVEL_NODE:
3494       self._LockInstancesNodes()
3495
3496   def BuildHooksEnv(self):
3497     """Build hooks env.
3498
3499     This runs on master, primary and secondary nodes of the instance.
3500
3501     """
3502     env = {
3503       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3504       }
3505     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3506     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3507     return env, nl, nl
3508
3509   def CheckPrereq(self):
3510     """Check prerequisites.
3511
3512     This checks that the instance is in the cluster.
3513
3514     """
3515     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3516     assert self.instance is not None, \
3517       "Cannot retrieve locked instance %s" % self.op.instance_name
3518
3519     bep = self.cfg.GetClusterInfo().FillBE(instance)
3520     if instance.disk_template not in constants.DTS_NET_MIRROR:
3521       raise errors.OpPrereqError("Instance's disk layout is not"
3522                                  " network mirrored, cannot failover.")
3523
3524     secondary_nodes = instance.secondary_nodes
3525     if not secondary_nodes:
3526       raise errors.ProgrammerError("no secondary node but using "
3527                                    "a mirrored disk template")
3528
3529     target_node = secondary_nodes[0]
3530     _CheckNodeOnline(self, target_node)
3531     _CheckNodeNotDrained(self, target_node)
3532     # check memory requirements on the secondary node
3533     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3534                          instance.name, bep[constants.BE_MEMORY],
3535                          instance.hypervisor)
3536
3537     # check bridge existance
3538     brlist = [nic.bridge for nic in instance.nics]
3539     result = self.rpc.call_bridges_exist(target_node, brlist)
3540     result.Raise()
3541     if not result.data:
3542       raise errors.OpPrereqError("One or more target bridges %s does not"
3543                                  " exist on destination node '%s'" %
3544                                  (brlist, target_node))
3545
3546   def Exec(self, feedback_fn):
3547     """Failover an instance.
3548
3549     The failover is done by shutting it down on its present node and
3550     starting it on the secondary.
3551
3552     """
3553     instance = self.instance
3554
3555     source_node = instance.primary_node
3556     target_node = instance.secondary_nodes[0]
3557
3558     feedback_fn("* checking disk consistency between source and target")
3559     for dev in instance.disks:
3560       # for drbd, these are drbd over lvm
3561       if not _CheckDiskConsistency(self, dev, target_node, False):
3562         if instance.admin_up and not self.op.ignore_consistency:
3563           raise errors.OpExecError("Disk %s is degraded on target node,"
3564                                    " aborting failover." % dev.iv_name)
3565
3566     feedback_fn("* shutting down instance on source node")
3567     logging.info("Shutting down instance %s on node %s",
3568                  instance.name, source_node)
3569
3570     result = self.rpc.call_instance_shutdown(source_node, instance)
3571     msg = result.RemoteFailMsg()
3572     if msg:
3573       if self.op.ignore_consistency:
3574         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3575                              " Proceeding anyway. Please make sure node"
3576                              " %s is down. Error details: %s",
3577                              instance.name, source_node, source_node, msg)
3578       else:
3579         raise errors.OpExecError("Could not shutdown instance %s on"
3580                                  " node %s: %s" %
3581                                  (instance.name, source_node, msg))
3582
3583     feedback_fn("* deactivating the instance's disks on source node")
3584     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3585       raise errors.OpExecError("Can't shut down the instance's disks.")
3586
3587     instance.primary_node = target_node
3588     # distribute new instance config to the other nodes
3589     self.cfg.Update(instance)
3590
3591     # Only start the instance if it's marked as up
3592     if instance.admin_up:
3593       feedback_fn("* activating the instance's disks on target node")
3594       logging.info("Starting instance %s on node %s",
3595                    instance.name, target_node)
3596
3597       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3598                                                ignore_secondaries=True)
3599       if not disks_ok:
3600         _ShutdownInstanceDisks(self, instance)
3601         raise errors.OpExecError("Can't activate the instance's disks")
3602
3603       feedback_fn("* starting the instance on the target node")
3604       result = self.rpc.call_instance_start(target_node, instance, None, None)
3605       msg = result.RemoteFailMsg()
3606       if msg:
3607         _ShutdownInstanceDisks(self, instance)
3608         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3609                                  (instance.name, target_node, msg))
3610
3611
3612 class LUMigrateInstance(LogicalUnit):
3613   """Migrate an instance.
3614
3615   This is migration without shutting down, compared to the failover,
3616   which is done with shutdown.
3617
3618   """
3619   HPATH = "instance-migrate"
3620   HTYPE = constants.HTYPE_INSTANCE
3621   _OP_REQP = ["instance_name", "live", "cleanup"]
3622
3623   REQ_BGL = False
3624
3625   def ExpandNames(self):
3626     self._ExpandAndLockInstance()
3627     self.needed_locks[locking.LEVEL_NODE] = []
3628     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3629
3630   def DeclareLocks(self, level):
3631     if level == locking.LEVEL_NODE:
3632       self._LockInstancesNodes()
3633
3634   def BuildHooksEnv(self):
3635     """Build hooks env.
3636
3637     This runs on master, primary and secondary nodes of the instance.
3638
3639     """
3640     env = _BuildInstanceHookEnvByObject(self, self.instance)
3641     env["MIGRATE_LIVE"] = self.op.live
3642     env["MIGRATE_CLEANUP"] = self.op.cleanup
3643     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3644     return env, nl, nl
3645
3646   def CheckPrereq(self):
3647     """Check prerequisites.
3648
3649     This checks that the instance is in the cluster.
3650
3651     """
3652     instance = self.cfg.GetInstanceInfo(
3653       self.cfg.ExpandInstanceName(self.op.instance_name))
3654     if instance is None:
3655       raise errors.OpPrereqError("Instance '%s' not known" %
3656                                  self.op.instance_name)
3657
3658     if instance.disk_template != constants.DT_DRBD8:
3659       raise errors.OpPrereqError("Instance's disk layout is not"
3660                                  " drbd8, cannot migrate.")
3661
3662     secondary_nodes = instance.secondary_nodes
3663     if not secondary_nodes:
3664       raise errors.ConfigurationError("No secondary node but using"
3665                                       " drbd8 disk template")
3666
3667     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3668
3669     target_node = secondary_nodes[0]
3670     # check memory requirements on the secondary node
3671     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3672                          instance.name, i_be[constants.BE_MEMORY],
3673                          instance.hypervisor)
3674
3675     # check bridge existance
3676     brlist = [nic.bridge for nic in instance.nics]
3677     result = self.rpc.call_bridges_exist(target_node, brlist)
3678     if result.failed or not result.data:
3679       raise errors.OpPrereqError("One or more target bridges %s does not"
3680                                  " exist on destination node '%s'" %
3681                                  (brlist, target_node))
3682
3683     if not self.op.cleanup:
3684       _CheckNodeNotDrained(self, target_node)
3685       result = self.rpc.call_instance_migratable(instance.primary_node,
3686                                                  instance)
3687       msg = result.RemoteFailMsg()
3688       if msg:
3689         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3690                                    msg)
3691
3692     self.instance = instance
3693
3694   def _WaitUntilSync(self):
3695     """Poll with custom rpc for disk sync.
3696
3697     This uses our own step-based rpc call.
3698
3699     """
3700     self.feedback_fn("* wait until resync is done")
3701     all_done = False
3702     while not all_done:
3703       all_done = True
3704       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3705                                             self.nodes_ip,
3706                                             self.instance.disks)
3707       min_percent = 100
3708       for node, nres in result.items():
3709         msg = nres.RemoteFailMsg()
3710         if msg:
3711           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3712                                    (node, msg))
3713         node_done, node_percent = nres.payload
3714         all_done = all_done and node_done
3715         if node_percent is not None:
3716           min_percent = min(min_percent, node_percent)
3717       if not all_done:
3718         if min_percent < 100:
3719           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3720         time.sleep(2)
3721
3722   def _EnsureSecondary(self, node):
3723     """Demote a node to secondary.
3724
3725     """
3726     self.feedback_fn("* switching node %s to secondary mode" % node)
3727
3728     for dev in self.instance.disks:
3729       self.cfg.SetDiskID(dev, node)
3730
3731     result = self.rpc.call_blockdev_close(node, self.instance.name,
3732                                           self.instance.disks)
3733     msg = result.RemoteFailMsg()
3734     if msg:
3735       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3736                                " error %s" % (node, msg))
3737
3738   def _GoStandalone(self):
3739     """Disconnect from the network.
3740
3741     """
3742     self.feedback_fn("* changing into standalone mode")
3743     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3744                                                self.instance.disks)
3745     for node, nres in result.items():
3746       msg = nres.RemoteFailMsg()
3747       if msg:
3748         raise errors.OpExecError("Cannot disconnect disks node %s,"
3749                                  " error %s" % (node, msg))
3750
3751   def _GoReconnect(self, multimaster):
3752     """Reconnect to the network.
3753
3754     """
3755     if multimaster:
3756       msg = "dual-master"
3757     else:
3758       msg = "single-master"
3759     self.feedback_fn("* changing disks into %s mode" % msg)
3760     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3761                                            self.instance.disks,
3762                                            self.instance.name, multimaster)
3763     for node, nres in result.items():
3764       msg = nres.RemoteFailMsg()
3765       if msg:
3766         raise errors.OpExecError("Cannot change disks config on node %s,"
3767                                  " error: %s" % (node, msg))
3768
3769   def _ExecCleanup(self):
3770     """Try to cleanup after a failed migration.
3771
3772     The cleanup is done by:
3773       - check that the instance is running only on one node
3774         (and update the config if needed)
3775       - change disks on its secondary node to secondary
3776       - wait until disks are fully synchronized
3777       - disconnect from the network
3778       - change disks into single-master mode
3779       - wait again until disks are fully synchronized
3780
3781     """
3782     instance = self.instance
3783     target_node = self.target_node
3784     source_node = self.source_node
3785
3786     # check running on only one node
3787     self.feedback_fn("* checking where the instance actually runs"
3788                      " (if this hangs, the hypervisor might be in"
3789                      " a bad state)")
3790     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3791     for node, result in ins_l.items():
3792       result.Raise()
3793       if not isinstance(result.data, list):
3794         raise errors.OpExecError("Can't contact node '%s'" % node)
3795
3796     runningon_source = instance.name in ins_l[source_node].data
3797     runningon_target = instance.name in ins_l[target_node].data
3798
3799     if runningon_source and runningon_target:
3800       raise errors.OpExecError("Instance seems to be running on two nodes,"
3801                                " or the hypervisor is confused. You will have"
3802                                " to ensure manually that it runs only on one"
3803                                " and restart this operation.")
3804
3805     if not (runningon_source or runningon_target):
3806       raise errors.OpExecError("Instance does not seem to be running at all."
3807                                " In this case, it's safer to repair by"
3808                                " running 'gnt-instance stop' to ensure disk"
3809                                " shutdown, and then restarting it.")
3810
3811     if runningon_target:
3812       # the migration has actually succeeded, we need to update the config
3813       self.feedback_fn("* instance running on secondary node (%s),"
3814                        " updating config" % target_node)
3815       instance.primary_node = target_node
3816       self.cfg.Update(instance)
3817       demoted_node = source_node
3818     else:
3819       self.feedback_fn("* instance confirmed to be running on its"
3820                        " primary node (%s)" % source_node)
3821       demoted_node = target_node
3822
3823     self._EnsureSecondary(demoted_node)
3824     try:
3825       self._WaitUntilSync()
3826     except errors.OpExecError:
3827       # we ignore here errors, since if the device is standalone, it
3828       # won't be able to sync
3829       pass
3830     self._GoStandalone()
3831     self._GoReconnect(False)
3832     self._WaitUntilSync()
3833
3834     self.feedback_fn("* done")
3835
3836   def _RevertDiskStatus(self):
3837     """Try to revert the disk status after a failed migration.
3838
3839     """
3840     target_node = self.target_node
3841     try:
3842       self._EnsureSecondary(target_node)
3843       self._GoStandalone()
3844       self._GoReconnect(False)
3845       self._WaitUntilSync()
3846     except errors.OpExecError, err:
3847       self.LogWarning("Migration failed and I can't reconnect the"
3848                       " drives: error '%s'\n"
3849                       "Please look and recover the instance status" %
3850                       str(err))
3851
3852   def _AbortMigration(self):
3853     """Call the hypervisor code to abort a started migration.
3854
3855     """
3856     instance = self.instance
3857     target_node = self.target_node
3858     migration_info = self.migration_info
3859
3860     abort_result = self.rpc.call_finalize_migration(target_node,
3861                                                     instance,
3862                                                     migration_info,
3863                                                     False)
3864     abort_msg = abort_result.RemoteFailMsg()
3865     if abort_msg:
3866       logging.error("Aborting migration failed on target node %s: %s" %
3867                     (target_node, abort_msg))
3868       # Don't raise an exception here, as we stil have to try to revert the
3869       # disk status, even if this step failed.
3870
3871   def _ExecMigration(self):
3872     """Migrate an instance.
3873
3874     The migrate is done by:
3875       - change the disks into dual-master mode
3876       - wait until disks are fully synchronized again
3877       - migrate the instance
3878       - change disks on the new secondary node (the old primary) to secondary
3879       - wait until disks are fully synchronized
3880       - change disks into single-master mode
3881
3882     """
3883     instance = self.instance
3884     target_node = self.target_node
3885     source_node = self.source_node
3886
3887     self.feedback_fn("* checking disk consistency between source and target")
3888     for dev in instance.disks:
3889       if not _CheckDiskConsistency(self, dev, target_node, False):
3890         raise errors.OpExecError("Disk %s is degraded or not fully"
3891                                  " synchronized on target node,"
3892                                  " aborting migrate." % dev.iv_name)
3893
3894     # First get the migration information from the remote node
3895     result = self.rpc.call_migration_info(source_node, instance)
3896     msg = result.RemoteFailMsg()
3897     if msg:
3898       log_err = ("Failed fetching source migration information from %s: %s" %
3899                  (source_node, msg))
3900       logging.error(log_err)
3901       raise errors.OpExecError(log_err)
3902
3903     self.migration_info = migration_info = result.payload
3904
3905     # Then switch the disks to master/master mode
3906     self._EnsureSecondary(target_node)
3907     self._GoStandalone()
3908     self._GoReconnect(True)
3909     self._WaitUntilSync()
3910
3911     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3912     result = self.rpc.call_accept_instance(target_node,
3913                                            instance,
3914                                            migration_info,
3915                                            self.nodes_ip[target_node])
3916
3917     msg = result.RemoteFailMsg()
3918     if msg:
3919       logging.error("Instance pre-migration failed, trying to revert"
3920                     " disk status: %s", msg)
3921       self._AbortMigration()
3922       self._RevertDiskStatus()
3923       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3924                                (instance.name, msg))
3925
3926     self.feedback_fn("* migrating instance to %s" % target_node)
3927     time.sleep(10)
3928     result = self.rpc.call_instance_migrate(source_node, instance,
3929                                             self.nodes_ip[target_node],
3930                                             self.op.live)
3931     msg = result.RemoteFailMsg()
3932     if msg:
3933       logging.error("Instance migration failed, trying to revert"
3934                     " disk status: %s", msg)
3935       self._AbortMigration()
3936       self._RevertDiskStatus()
3937       raise errors.OpExecError("Could not migrate instance %s: %s" %
3938                                (instance.name, msg))
3939     time.sleep(10)
3940
3941     instance.primary_node = target_node
3942     # distribute new instance config to the other nodes
3943     self.cfg.Update(instance)
3944
3945     result = self.rpc.call_finalize_migration(target_node,
3946                                               instance,
3947                                               migration_info,
3948                                               True)
3949     msg = result.RemoteFailMsg()
3950     if msg:
3951       logging.error("Instance migration succeeded, but finalization failed:"
3952                     " %s" % msg)
3953       raise errors.OpExecError("Could not finalize instance migration: %s" %
3954                                msg)
3955
3956     self._EnsureSecondary(source_node)
3957     self._WaitUntilSync()
3958     self._GoStandalone()
3959     self._GoReconnect(False)
3960     self._WaitUntilSync()
3961
3962     self.feedback_fn("* done")
3963
3964   def Exec(self, feedback_fn):
3965     """Perform the migration.
3966
3967     """
3968     self.feedback_fn = feedback_fn
3969
3970     self.source_node = self.instance.primary_node
3971     self.target_node = self.instance.secondary_nodes[0]
3972     self.all_nodes = [self.source_node, self.target_node]
3973     self.nodes_ip = {
3974       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3975       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3976       }
3977     if self.op.cleanup:
3978       return self._ExecCleanup()
3979     else:
3980       return self._ExecMigration()
3981
3982
3983 def _CreateBlockDev(lu, node, instance, device, force_create,
3984                     info, force_open):
3985   """Create a tree of block devices on a given node.
3986
3987   If this device type has to be created on secondaries, create it and
3988   all its children.
3989
3990   If not, just recurse to children keeping the same 'force' value.
3991
3992   @param lu: the lu on whose behalf we execute
3993   @param node: the node on which to create the device
3994   @type instance: L{objects.Instance}
3995   @param instance: the instance which owns the device
3996   @type device: L{objects.Disk}
3997   @param device: the device to create
3998   @type force_create: boolean
3999   @param force_create: whether to force creation of this device; this
4000       will be change to True whenever we find a device which has
4001       CreateOnSecondary() attribute
4002   @param info: the extra 'metadata' we should attach to the device
4003       (this will be represented as a LVM tag)
4004   @type force_open: boolean
4005   @param force_open: this parameter will be passes to the
4006       L{backend.BlockdevCreate} function where it specifies
4007       whether we run on primary or not, and it affects both
4008       the child assembly and the device own Open() execution
4009
4010   """
4011   if device.CreateOnSecondary():
4012     force_create = True
4013
4014   if device.children:
4015     for child in device.children:
4016       _CreateBlockDev(lu, node, instance, child, force_create,
4017                       info, force_open)
4018
4019   if not force_create:
4020     return
4021
4022   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4023
4024
4025 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4026   """Create a single block device on a given node.
4027
4028   This will not recurse over children of the device, so they must be
4029   created in advance.
4030
4031   @param lu: the lu on whose behalf we execute
4032   @param node: the node on which to create the device
4033   @type instance: L{objects.Instance}
4034   @param instance: the instance which owns the device
4035   @type device: L{objects.Disk}
4036   @param device: the device to create
4037   @param info: the extra 'metadata' we should attach to the device
4038       (this will be represented as a LVM tag)
4039   @type force_open: boolean
4040   @param force_open: this parameter will be passes to the
4041       L{backend.BlockdevCreate} function where it specifies
4042       whether we run on primary or not, and it affects both
4043       the child assembly and the device own Open() execution
4044
4045   """
4046   lu.cfg.SetDiskID(device, node)
4047   result = lu.rpc.call_blockdev_create(node, device, device.size,
4048                                        instance.name, force_open, info)
4049   msg = result.RemoteFailMsg()
4050   if msg:
4051     raise errors.OpExecError("Can't create block device %s on"
4052                              " node %s for instance %s: %s" %
4053                              (device, node, instance.name, msg))
4054   if device.physical_id is None:
4055     device.physical_id = result.payload
4056
4057
4058 def _GenerateUniqueNames(lu, exts):
4059   """Generate a suitable LV name.
4060
4061   This will generate a logical volume name for the given instance.
4062
4063   """
4064   results = []
4065   for val in exts:
4066     new_id = lu.cfg.GenerateUniqueID()
4067     results.append("%s%s" % (new_id, val))
4068   return results
4069
4070
4071 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4072                          p_minor, s_minor):
4073   """Generate a drbd8 device complete with its children.
4074
4075   """
4076   port = lu.cfg.AllocatePort()
4077   vgname = lu.cfg.GetVGName()
4078   shared_secret = lu.cfg.GenerateDRBDSecret()
4079   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4080                           logical_id=(vgname, names[0]))
4081   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4082                           logical_id=(vgname, names[1]))
4083   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4084                           logical_id=(primary, secondary, port,
4085                                       p_minor, s_minor,
4086                                       shared_secret),
4087                           children=[dev_data, dev_meta],
4088                           iv_name=iv_name)
4089   return drbd_dev
4090
4091
4092 def _GenerateDiskTemplate(lu, template_name,
4093                           instance_name, primary_node,
4094                           secondary_nodes, disk_info,
4095                           file_storage_dir, file_driver,
4096                           base_index):
4097   """Generate the entire disk layout for a given template type.
4098
4099   """
4100   #TODO: compute space requirements
4101
4102   vgname = lu.cfg.GetVGName()
4103   disk_count = len(disk_info)
4104   disks = []
4105   if template_name == constants.DT_DISKLESS:
4106     pass
4107   elif template_name == constants.DT_PLAIN:
4108     if len(secondary_nodes) != 0:
4109       raise errors.ProgrammerError("Wrong template configuration")
4110
4111     names = _GenerateUniqueNames(lu, [".disk%d" % i
4112                                       for i in range(disk_count)])
4113     for idx, disk in enumerate(disk_info):
4114       disk_index = idx + base_index
4115       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4116                               logical_id=(vgname, names[idx]),
4117                               iv_name="disk/%d" % disk_index,
4118                               mode=disk["mode"])
4119       disks.append(disk_dev)
4120   elif template_name == constants.DT_DRBD8:
4121     if len(secondary_nodes) != 1:
4122       raise errors.ProgrammerError("Wrong template configuration")
4123     remote_node = secondary_nodes[0]
4124     minors = lu.cfg.AllocateDRBDMinor(
4125       [primary_node, remote_node] * len(disk_info), instance_name)
4126
4127     names = []
4128     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4129                                                for i in range(disk_count)]):
4130       names.append(lv_prefix + "_data")
4131       names.append(lv_prefix + "_meta")
4132     for idx, disk in enumerate(disk_info):
4133       disk_index = idx + base_index
4134       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4135                                       disk["size"], names[idx*2:idx*2+2],
4136                                       "disk/%d" % disk_index,
4137                                       minors[idx*2], minors[idx*2+1])
4138       disk_dev.mode = disk["mode"]
4139       disks.append(disk_dev)
4140   elif template_name == constants.DT_FILE:
4141     if len(secondary_nodes) != 0:
4142       raise errors.ProgrammerError("Wrong template configuration")
4143
4144     for idx, disk in enumerate(disk_info):
4145       disk_index = idx + base_index
4146       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4147                               iv_name="disk/%d" % disk_index,
4148                               logical_id=(file_driver,
4149                                           "%s/disk%d" % (file_storage_dir,
4150                                                          disk_index)),
4151                               mode=disk["mode"])
4152       disks.append(disk_dev)
4153   else:
4154     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4155   return disks
4156
4157
4158 def _GetInstanceInfoText(instance):
4159   """Compute that text that should be added to the disk's metadata.
4160
4161   """
4162   return "originstname+%s" % instance.name
4163
4164
4165 def _CreateDisks(lu, instance):
4166   """Create all disks for an instance.
4167
4168   This abstracts away some work from AddInstance.
4169
4170   @type lu: L{LogicalUnit}
4171   @param lu: the logical unit on whose behalf we execute
4172   @type instance: L{objects.Instance}
4173   @param instance: the instance whose disks we should create
4174   @rtype: boolean
4175   @return: the success of the creation
4176
4177   """
4178   info = _GetInstanceInfoText(instance)
4179   pnode = instance.primary_node
4180
4181   if instance.disk_template == constants.DT_FILE:
4182     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4183     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4184
4185     if result.failed or not result.data:
4186       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4187
4188     if not result.data[0]:
4189       raise errors.OpExecError("Failed to create directory '%s'" %
4190                                file_storage_dir)
4191
4192   # Note: this needs to be kept in sync with adding of disks in
4193   # LUSetInstanceParams
4194   for device in instance.disks:
4195     logging.info("Creating volume %s for instance %s",
4196                  device.iv_name, instance.name)
4197     #HARDCODE
4198     for node in instance.all_nodes:
4199       f_create = node == pnode
4200       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4201
4202
4203 def _RemoveDisks(lu, instance):
4204   """Remove all disks for an instance.
4205
4206   This abstracts away some work from `AddInstance()` and
4207   `RemoveInstance()`. Note that in case some of the devices couldn't
4208   be removed, the removal will continue with the other ones (compare
4209   with `_CreateDisks()`).
4210
4211   @type lu: L{LogicalUnit}
4212   @param lu: the logical unit on whose behalf we execute
4213   @type instance: L{objects.Instance}
4214   @param instance: the instance whose disks we should remove
4215   @rtype: boolean
4216   @return: the success of the removal
4217
4218   """
4219   logging.info("Removing block devices for instance %s", instance.name)
4220
4221   all_result = True
4222   for device in instance.disks:
4223     for node, disk in device.ComputeNodeTree(instance.primary_node):
4224       lu.cfg.SetDiskID(disk, node)
4225       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4226       if msg:
4227         lu.LogWarning("Could not remove block device %s on node %s,"
4228                       " continuing anyway: %s", device.iv_name, node, msg)
4229         all_result = False
4230
4231   if instance.disk_template == constants.DT_FILE:
4232     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4233     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4234                                                  file_storage_dir)
4235     if result.failed or not result.data:
4236       logging.error("Could not remove directory '%s'", file_storage_dir)
4237       all_result = False
4238
4239   return all_result
4240
4241
4242 def _ComputeDiskSize(disk_template, disks):
4243   """Compute disk size requirements in the volume group
4244
4245   """
4246   # Required free disk space as a function of disk and swap space
4247   req_size_dict = {
4248     constants.DT_DISKLESS: None,
4249     constants.DT_PLAIN: sum(d["size"] for d in disks),
4250     # 128 MB are added for drbd metadata for each disk
4251     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4252     constants.DT_FILE: None,
4253   }
4254
4255   if disk_template not in req_size_dict:
4256     raise errors.ProgrammerError("Disk template '%s' size requirement"
4257                                  " is unknown" %  disk_template)
4258
4259   return req_size_dict[disk_template]
4260
4261
4262 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4263   """Hypervisor parameter validation.
4264
4265   This function abstract the hypervisor parameter validation to be
4266   used in both instance create and instance modify.
4267
4268   @type lu: L{LogicalUnit}
4269   @param lu: the logical unit for which we check
4270   @type nodenames: list
4271   @param nodenames: the list of nodes on which we should check
4272   @type hvname: string
4273   @param hvname: the name of the hypervisor we should use
4274   @type hvparams: dict
4275   @param hvparams: the parameters which we need to check
4276   @raise errors.OpPrereqError: if the parameters are not valid
4277
4278   """
4279   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4280                                                   hvname,
4281                                                   hvparams)
4282   for node in nodenames:
4283     info = hvinfo[node]
4284     if info.offline:
4285       continue
4286     msg = info.RemoteFailMsg()
4287     if msg:
4288       raise errors.OpPrereqError("Hypervisor parameter validation"
4289                                  " failed on node %s: %s" % (node, msg))
4290
4291
4292 class LUCreateInstance(LogicalUnit):
4293   """Create an instance.
4294
4295   """
4296   HPATH = "instance-add"
4297   HTYPE = constants.HTYPE_INSTANCE
4298   _OP_REQP = ["instance_name", "disks", "disk_template",
4299               "mode", "start",
4300               "wait_for_sync", "ip_check", "nics",
4301               "hvparams", "beparams"]
4302   REQ_BGL = False
4303
4304   def _ExpandNode(self, node):
4305     """Expands and checks one node name.
4306
4307     """
4308     node_full = self.cfg.ExpandNodeName(node)
4309     if node_full is None:
4310       raise errors.OpPrereqError("Unknown node %s" % node)
4311     return node_full
4312
4313   def ExpandNames(self):
4314     """ExpandNames for CreateInstance.
4315
4316     Figure out the right locks for instance creation.
4317
4318     """
4319     self.needed_locks = {}
4320
4321     # set optional parameters to none if they don't exist
4322     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4323       if not hasattr(self.op, attr):
4324         setattr(self.op, attr, None)
4325
4326     # cheap checks, mostly valid constants given
4327
4328     # verify creation mode
4329     if self.op.mode not in (constants.INSTANCE_CREATE,
4330                             constants.INSTANCE_IMPORT):
4331       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4332                                  self.op.mode)
4333
4334     # disk template and mirror node verification
4335     if self.op.disk_template not in constants.DISK_TEMPLATES:
4336       raise errors.OpPrereqError("Invalid disk template name")
4337
4338     if self.op.hypervisor is None:
4339       self.op.hypervisor = self.cfg.GetHypervisorType()
4340
4341     cluster = self.cfg.GetClusterInfo()
4342     enabled_hvs = cluster.enabled_hypervisors
4343     if self.op.hypervisor not in enabled_hvs:
4344       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4345                                  " cluster (%s)" % (self.op.hypervisor,
4346                                   ",".join(enabled_hvs)))
4347
4348     # check hypervisor parameter syntax (locally)
4349     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4350     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4351                                   self.op.hvparams)
4352     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4353     hv_type.CheckParameterSyntax(filled_hvp)
4354
4355     # fill and remember the beparams dict
4356     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4357     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4358                                     self.op.beparams)
4359
4360     #### instance parameters check
4361
4362     # instance name verification
4363     hostname1 = utils.HostInfo(self.op.instance_name)
4364     self.op.instance_name = instance_name = hostname1.name
4365
4366     # this is just a preventive check, but someone might still add this
4367     # instance in the meantime, and creation will fail at lock-add time
4368     if instance_name in self.cfg.GetInstanceList():
4369       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4370                                  instance_name)
4371
4372     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4373
4374     # NIC buildup
4375     self.nics = []
4376     for nic in self.op.nics:
4377       # ip validity checks
4378       ip = nic.get("ip", None)
4379       if ip is None or ip.lower() == "none":
4380         nic_ip = None
4381       elif ip.lower() == constants.VALUE_AUTO:
4382         nic_ip = hostname1.ip
4383       else:
4384         if not utils.IsValidIP(ip):
4385           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4386                                      " like a valid IP" % ip)
4387         nic_ip = ip
4388
4389       # MAC address verification
4390       mac = nic.get("mac", constants.VALUE_AUTO)
4391       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4392         if not utils.IsValidMac(mac.lower()):
4393           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4394                                      mac)
4395       # bridge verification
4396       bridge = nic.get("bridge", None)
4397       if bridge is None:
4398         bridge = self.cfg.GetDefBridge()
4399       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4400
4401     # disk checks/pre-build
4402     self.disks = []
4403     for disk in self.op.disks:
4404       mode = disk.get("mode", constants.DISK_RDWR)
4405       if mode not in constants.DISK_ACCESS_SET:
4406         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4407                                    mode)
4408       size = disk.get("size", None)
4409       if size is None:
4410         raise errors.OpPrereqError("Missing disk size")
4411       try:
4412         size = int(size)
4413       except ValueError:
4414         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4415       self.disks.append({"size": size, "mode": mode})
4416
4417     # used in CheckPrereq for ip ping check
4418     self.check_ip = hostname1.ip
4419
4420     # file storage checks
4421     if (self.op.file_driver and
4422         not self.op.file_driver in constants.FILE_DRIVER):
4423       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4424                                  self.op.file_driver)
4425
4426     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4427       raise errors.OpPrereqError("File storage directory path not absolute")
4428
4429     ### Node/iallocator related checks
4430     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4431       raise errors.OpPrereqError("One and only one of iallocator and primary"
4432                                  " node must be given")
4433
4434     if self.op.iallocator:
4435       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4436     else:
4437       self.op.pnode = self._ExpandNode(self.op.pnode)
4438       nodelist = [self.op.pnode]
4439       if self.op.snode is not None:
4440         self.op.snode = self._ExpandNode(self.op.snode)
4441         nodelist.append(self.op.snode)
4442       self.needed_locks[locking.LEVEL_NODE] = nodelist
4443
4444     # in case of import lock the source node too
4445     if self.op.mode == constants.INSTANCE_IMPORT:
4446       src_node = getattr(self.op, "src_node", None)
4447       src_path = getattr(self.op, "src_path", None)
4448
4449       if src_path is None:
4450         self.op.src_path = src_path = self.op.instance_name
4451
4452       if src_node is None:
4453         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4454         self.op.src_node = None
4455         if os.path.isabs(src_path):
4456           raise errors.OpPrereqError("Importing an instance from an absolute"
4457                                      " path requires a source node option.")
4458       else:
4459         self.op.src_node = src_node = self._ExpandNode(src_node)
4460         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4461           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4462         if not os.path.isabs(src_path):
4463           self.op.src_path = src_path = \
4464             os.path.join(constants.EXPORT_DIR, src_path)
4465
4466     else: # INSTANCE_CREATE
4467       if getattr(self.op, "os_type", None) is None:
4468         raise errors.OpPrereqError("No guest OS specified")
4469
4470   def _RunAllocator(self):
4471     """Run the allocator based on input opcode.
4472
4473     """
4474     nics = [n.ToDict() for n in self.nics]
4475     ial = IAllocator(self,
4476                      mode=constants.IALLOCATOR_MODE_ALLOC,
4477                      name=self.op.instance_name,
4478                      disk_template=self.op.disk_template,
4479                      tags=[],
4480                      os=self.op.os_type,
4481                      vcpus=self.be_full[constants.BE_VCPUS],
4482                      mem_size=self.be_full[constants.BE_MEMORY],
4483                      disks=self.disks,
4484                      nics=nics,
4485                      hypervisor=self.op.hypervisor,
4486                      )
4487
4488     ial.Run(self.op.iallocator)
4489
4490     if not ial.success:
4491       raise errors.OpPrereqError("Can't compute nodes using"
4492                                  " iallocator '%s': %s" % (self.op.iallocator,
4493                                                            ial.info))
4494     if len(ial.nodes) != ial.required_nodes:
4495       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4496                                  " of nodes (%s), required %s" %
4497                                  (self.op.iallocator, len(ial.nodes),
4498                                   ial.required_nodes))
4499     self.op.pnode = ial.nodes[0]
4500     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4501                  self.op.instance_name, self.op.iallocator,
4502                  ", ".join(ial.nodes))
4503     if ial.required_nodes == 2:
4504       self.op.snode = ial.nodes[1]
4505
4506   def BuildHooksEnv(self):
4507     """Build hooks env.
4508
4509     This runs on master, primary and secondary nodes of the instance.
4510
4511     """
4512     env = {
4513       "ADD_MODE": self.op.mode,
4514       }
4515     if self.op.mode == constants.INSTANCE_IMPORT:
4516       env["SRC_NODE"] = self.op.src_node
4517       env["SRC_PATH"] = self.op.src_path
4518       env["SRC_IMAGES"] = self.src_images
4519
4520     env.update(_BuildInstanceHookEnv(
4521       name=self.op.instance_name,
4522       primary_node=self.op.pnode,
4523       secondary_nodes=self.secondaries,
4524       status=self.op.start,
4525       os_type=self.op.os_type,
4526       memory=self.be_full[constants.BE_MEMORY],
4527       vcpus=self.be_full[constants.BE_VCPUS],
4528       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4529       disk_template=self.op.disk_template,
4530       disks=[(d["size"], d["mode"]) for d in self.disks],
4531     ))
4532
4533     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4534           self.secondaries)
4535     return env, nl, nl
4536
4537
4538   def CheckPrereq(self):
4539     """Check prerequisites.
4540
4541     """
4542     if (not self.cfg.GetVGName() and
4543         self.op.disk_template not in constants.DTS_NOT_LVM):
4544       raise errors.OpPrereqError("Cluster does not support lvm-based"
4545                                  " instances")
4546
4547     if self.op.mode == constants.INSTANCE_IMPORT:
4548       src_node = self.op.src_node
4549       src_path = self.op.src_path
4550
4551       if src_node is None:
4552         exp_list = self.rpc.call_export_list(
4553           self.acquired_locks[locking.LEVEL_NODE])
4554         found = False
4555         for node in exp_list:
4556           if not exp_list[node].failed and src_path in exp_list[node].data:
4557             found = True
4558             self.op.src_node = src_node = node
4559             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4560                                                        src_path)
4561             break
4562         if not found:
4563           raise errors.OpPrereqError("No export found for relative path %s" %
4564                                       src_path)
4565
4566       _CheckNodeOnline(self, src_node)
4567       result = self.rpc.call_export_info(src_node, src_path)
4568       result.Raise()
4569       if not result.data:
4570         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4571
4572       export_info = result.data
4573       if not export_info.has_section(constants.INISECT_EXP):
4574         raise errors.ProgrammerError("Corrupted export config")
4575
4576       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4577       if (int(ei_version) != constants.EXPORT_VERSION):
4578         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4579                                    (ei_version, constants.EXPORT_VERSION))
4580
4581       # Check that the new instance doesn't have less disks than the export
4582       instance_disks = len(self.disks)
4583       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4584       if instance_disks < export_disks:
4585         raise errors.OpPrereqError("Not enough disks to import."
4586                                    " (instance: %d, export: %d)" %
4587                                    (instance_disks, export_disks))
4588
4589       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4590       disk_images = []
4591       for idx in range(export_disks):
4592         option = 'disk%d_dump' % idx
4593         if export_info.has_option(constants.INISECT_INS, option):
4594           # FIXME: are the old os-es, disk sizes, etc. useful?
4595           export_name = export_info.get(constants.INISECT_INS, option)
4596           image = os.path.join(src_path, export_name)
4597           disk_images.append(image)
4598         else:
4599           disk_images.append(False)
4600
4601       self.src_images = disk_images
4602
4603       old_name = export_info.get(constants.INISECT_INS, 'name')
4604       # FIXME: int() here could throw a ValueError on broken exports
4605       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4606       if self.op.instance_name == old_name:
4607         for idx, nic in enumerate(self.nics):
4608           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4609             nic_mac_ini = 'nic%d_mac' % idx
4610             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4611
4612     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4613     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4614     if self.op.start and not self.op.ip_check:
4615       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4616                                  " adding an instance in start mode")
4617
4618     if self.op.ip_check:
4619       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4620         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4621                                    (self.check_ip, self.op.instance_name))
4622
4623     #### mac address generation
4624     # By generating here the mac address both the allocator and the hooks get
4625     # the real final mac address rather than the 'auto' or 'generate' value.
4626     # There is a race condition between the generation and the instance object
4627     # creation, which means that we know the mac is valid now, but we're not
4628     # sure it will be when we actually add the instance. If things go bad
4629     # adding the instance will abort because of a duplicate mac, and the
4630     # creation job will fail.
4631     for nic in self.nics:
4632       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4633         nic.mac = self.cfg.GenerateMAC()
4634
4635     #### allocator run
4636
4637     if self.op.iallocator is not None:
4638       self._RunAllocator()
4639
4640     #### node related checks
4641
4642     # check primary node
4643     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4644     assert self.pnode is not None, \
4645       "Cannot retrieve locked node %s" % self.op.pnode
4646     if pnode.offline:
4647       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4648                                  pnode.name)
4649     if pnode.drained:
4650       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4651                                  pnode.name)
4652
4653     self.secondaries = []
4654
4655     # mirror node verification
4656     if self.op.disk_template in constants.DTS_NET_MIRROR:
4657       if self.op.snode is None:
4658         raise errors.OpPrereqError("The networked disk templates need"
4659                                    " a mirror node")
4660       if self.op.snode == pnode.name:
4661         raise errors.OpPrereqError("The secondary node cannot be"
4662                                    " the primary node.")
4663       _CheckNodeOnline(self, self.op.snode)
4664       _CheckNodeNotDrained(self, self.op.snode)
4665       self.secondaries.append(self.op.snode)
4666
4667     nodenames = [pnode.name] + self.secondaries
4668
4669     req_size = _ComputeDiskSize(self.op.disk_template,
4670                                 self.disks)
4671
4672     # Check lv size requirements
4673     if req_size is not None:
4674       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4675                                          self.op.hypervisor)
4676       for node in nodenames:
4677         info = nodeinfo[node]
4678         info.Raise()
4679         info = info.data
4680         if not info:
4681           raise errors.OpPrereqError("Cannot get current information"
4682                                      " from node '%s'" % node)
4683         vg_free = info.get('vg_free', None)
4684         if not isinstance(vg_free, int):
4685           raise errors.OpPrereqError("Can't compute free disk space on"
4686                                      " node %s" % node)
4687         if req_size > info['vg_free']:
4688           raise errors.OpPrereqError("Not enough disk space on target node %s."
4689                                      " %d MB available, %d MB required" %
4690                                      (node, info['vg_free'], req_size))
4691
4692     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4693
4694     # os verification
4695     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4696     result.Raise()
4697     if not isinstance(result.data, objects.OS):
4698       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4699                                  " primary node"  % self.op.os_type)
4700
4701     # bridge check on primary node
4702     bridges = [n.bridge for n in self.nics]
4703     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4704     result.Raise()
4705     if not result.data:
4706       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4707                                  " exist on destination node '%s'" %
4708                                  (",".join(bridges), pnode.name))
4709
4710     # memory check on primary node
4711     if self.op.start:
4712       _CheckNodeFreeMemory(self, self.pnode.name,
4713                            "creating instance %s" % self.op.instance_name,
4714                            self.be_full[constants.BE_MEMORY],
4715                            self.op.hypervisor)
4716
4717   def Exec(self, feedback_fn):
4718     """Create and add the instance to the cluster.
4719
4720     """
4721     instance = self.op.instance_name
4722     pnode_name = self.pnode.name
4723
4724     ht_kind = self.op.hypervisor
4725     if ht_kind in constants.HTS_REQ_PORT:
4726       network_port = self.cfg.AllocatePort()
4727     else:
4728       network_port = None
4729
4730     ##if self.op.vnc_bind_address is None:
4731     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4732
4733     # this is needed because os.path.join does not accept None arguments
4734     if self.op.file_storage_dir is None:
4735       string_file_storage_dir = ""
4736     else:
4737       string_file_storage_dir = self.op.file_storage_dir
4738
4739     # build the full file storage dir path
4740     file_storage_dir = os.path.normpath(os.path.join(
4741                                         self.cfg.GetFileStorageDir(),
4742                                         string_file_storage_dir, instance))
4743
4744
4745     disks = _GenerateDiskTemplate(self,
4746                                   self.op.disk_template,
4747                                   instance, pnode_name,
4748                                   self.secondaries,
4749                                   self.disks,
4750                                   file_storage_dir,
4751                                   self.op.file_driver,
4752                                   0)
4753
4754     iobj = objects.Instance(name=instance, os=self.op.os_type,
4755                             primary_node=pnode_name,
4756                             nics=self.nics, disks=disks,
4757                             disk_template=self.op.disk_template,
4758                             admin_up=False,
4759                             network_port=network_port,
4760                             beparams=self.op.beparams,
4761                             hvparams=self.op.hvparams,
4762                             hypervisor=self.op.hypervisor,
4763                             )
4764
4765     feedback_fn("* creating instance disks...")
4766     try:
4767       _CreateDisks(self, iobj)
4768     except errors.OpExecError:
4769       self.LogWarning("Device creation failed, reverting...")
4770       try:
4771         _RemoveDisks(self, iobj)
4772       finally:
4773         self.cfg.ReleaseDRBDMinors(instance)
4774         raise
4775
4776     feedback_fn("adding instance %s to cluster config" % instance)
4777
4778     self.cfg.AddInstance(iobj)
4779     # Declare that we don't want to remove the instance lock anymore, as we've
4780     # added the instance to the config
4781     del self.remove_locks[locking.LEVEL_INSTANCE]
4782     # Unlock all the nodes
4783     if self.op.mode == constants.INSTANCE_IMPORT:
4784       nodes_keep = [self.op.src_node]
4785       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4786                        if node != self.op.src_node]
4787       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4788       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4789     else:
4790       self.context.glm.release(locking.LEVEL_NODE)
4791       del self.acquired_locks[locking.LEVEL_NODE]
4792
4793     if self.op.wait_for_sync:
4794       disk_abort = not _WaitForSync(self, iobj)
4795     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4796       # make sure the disks are not degraded (still sync-ing is ok)
4797       time.sleep(15)
4798       feedback_fn("* checking mirrors status")
4799       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4800     else:
4801       disk_abort = False
4802
4803     if disk_abort:
4804       _RemoveDisks(self, iobj)
4805       self.cfg.RemoveInstance(iobj.name)
4806       # Make sure the instance lock gets removed
4807       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4808       raise errors.OpExecError("There are some degraded disks for"
4809                                " this instance")
4810
4811     feedback_fn("creating os for instance %s on node %s" %
4812                 (instance, pnode_name))
4813
4814     if iobj.disk_template != constants.DT_DISKLESS:
4815       if self.op.mode == constants.INSTANCE_CREATE:
4816         feedback_fn("* running the instance OS create scripts...")
4817         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4818         msg = result.RemoteFailMsg()
4819         if msg:
4820           raise errors.OpExecError("Could not add os for instance %s"
4821                                    " on node %s: %s" %
4822                                    (instance, pnode_name, msg))
4823
4824       elif self.op.mode == constants.INSTANCE_IMPORT:
4825         feedback_fn("* running the instance OS import scripts...")
4826         src_node = self.op.src_node
4827         src_images = self.src_images
4828         cluster_name = self.cfg.GetClusterName()
4829         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4830                                                          src_node, src_images,
4831                                                          cluster_name)
4832         import_result.Raise()
4833         for idx, result in enumerate(import_result.data):
4834           if not result:
4835             self.LogWarning("Could not import the image %s for instance"
4836                             " %s, disk %d, on node %s" %
4837                             (src_images[idx], instance, idx, pnode_name))
4838       else:
4839         # also checked in the prereq part
4840         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4841                                      % self.op.mode)
4842
4843     if self.op.start:
4844       iobj.admin_up = True
4845       self.cfg.Update(iobj)
4846       logging.info("Starting instance %s on node %s", instance, pnode_name)
4847       feedback_fn("* starting instance...")
4848       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4849       msg = result.RemoteFailMsg()
4850       if msg:
4851         raise errors.OpExecError("Could not start instance: %s" % msg)
4852
4853
4854 class LUConnectConsole(NoHooksLU):
4855   """Connect to an instance's console.
4856
4857   This is somewhat special in that it returns the command line that
4858   you need to run on the master node in order to connect to the
4859   console.
4860
4861   """
4862   _OP_REQP = ["instance_name"]
4863   REQ_BGL = False
4864
4865   def ExpandNames(self):
4866     self._ExpandAndLockInstance()
4867
4868   def CheckPrereq(self):
4869     """Check prerequisites.
4870
4871     This checks that the instance is in the cluster.
4872
4873     """
4874     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4875     assert self.instance is not None, \
4876       "Cannot retrieve locked instance %s" % self.op.instance_name
4877     _CheckNodeOnline(self, self.instance.primary_node)
4878
4879   def Exec(self, feedback_fn):
4880     """Connect to the console of an instance
4881
4882     """
4883     instance = self.instance
4884     node = instance.primary_node
4885
4886     node_insts = self.rpc.call_instance_list([node],
4887                                              [instance.hypervisor])[node]
4888     node_insts.Raise()
4889
4890     if instance.name not in node_insts.data:
4891       raise errors.OpExecError("Instance %s is not running." % instance.name)
4892
4893     logging.debug("Connecting to console of %s on %s", instance.name, node)
4894
4895     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4896     cluster = self.cfg.GetClusterInfo()
4897     # beparams and hvparams are passed separately, to avoid editing the
4898     # instance and then saving the defaults in the instance itself.
4899     hvparams = cluster.FillHV(instance)
4900     beparams = cluster.FillBE(instance)
4901     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4902
4903     # build ssh cmdline
4904     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4905
4906
4907 class LUReplaceDisks(LogicalUnit):
4908   """Replace the disks of an instance.
4909
4910   """
4911   HPATH = "mirrors-replace"
4912   HTYPE = constants.HTYPE_INSTANCE
4913   _OP_REQP = ["instance_name", "mode", "disks"]
4914   REQ_BGL = False
4915
4916   def CheckArguments(self):
4917     if not hasattr(self.op, "remote_node"):
4918       self.op.remote_node = None
4919     if not hasattr(self.op, "iallocator"):
4920       self.op.iallocator = None
4921
4922     # check for valid parameter combination
4923     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4924     if self.op.mode == constants.REPLACE_DISK_CHG:
4925       if cnt == 2:
4926         raise errors.OpPrereqError("When changing the secondary either an"
4927                                    " iallocator script must be used or the"
4928                                    " new node given")
4929       elif cnt == 0:
4930         raise errors.OpPrereqError("Give either the iallocator or the new"
4931                                    " secondary, not both")
4932     else: # not replacing the secondary
4933       if cnt != 2:
4934         raise errors.OpPrereqError("The iallocator and new node options can"
4935                                    " be used only when changing the"
4936                                    " secondary node")
4937
4938   def ExpandNames(self):
4939     self._ExpandAndLockInstance()
4940
4941     if self.op.iallocator is not None:
4942       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4943     elif self.op.remote_node is not None:
4944       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4945       if remote_node is None:
4946         raise errors.OpPrereqError("Node '%s' not known" %
4947                                    self.op.remote_node)
4948       self.op.remote_node = remote_node
4949       # Warning: do not remove the locking of the new secondary here
4950       # unless DRBD8.AddChildren is changed to work in parallel;
4951       # currently it doesn't since parallel invocations of
4952       # FindUnusedMinor will conflict
4953       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4954       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4955     else:
4956       self.needed_locks[locking.LEVEL_NODE] = []
4957       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4958
4959   def DeclareLocks(self, level):
4960     # If we're not already locking all nodes in the set we have to declare the
4961     # instance's primary/secondary nodes.
4962     if (level == locking.LEVEL_NODE and
4963         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4964       self._LockInstancesNodes()
4965
4966   def _RunAllocator(self):
4967     """Compute a new secondary node using an IAllocator.
4968
4969     """
4970     ial = IAllocator(self,
4971                      mode=constants.IALLOCATOR_MODE_RELOC,
4972                      name=self.op.instance_name,
4973                      relocate_from=[self.sec_node])
4974
4975     ial.Run(self.op.iallocator)
4976
4977     if not ial.success:
4978       raise errors.OpPrereqError("Can't compute nodes using"
4979                                  " iallocator '%s': %s" % (self.op.iallocator,
4980                                                            ial.info))
4981     if len(ial.nodes) != ial.required_nodes:
4982       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4983                                  " of nodes (%s), required %s" %
4984                                  (len(ial.nodes), ial.required_nodes))
4985     self.op.remote_node = ial.nodes[0]
4986     self.LogInfo("Selected new secondary for the instance: %s",
4987                  self.op.remote_node)
4988
4989   def BuildHooksEnv(self):
4990     """Build hooks env.
4991
4992     This runs on the master, the primary and all the secondaries.
4993
4994     """
4995     env = {
4996       "MODE": self.op.mode,
4997       "NEW_SECONDARY": self.op.remote_node,
4998       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4999       }
5000     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5001     nl = [
5002       self.cfg.GetMasterNode(),
5003       self.instance.primary_node,
5004       ]
5005     if self.op.remote_node is not None:
5006       nl.append(self.op.remote_node)
5007     return env, nl, nl
5008
5009   def CheckPrereq(self):
5010     """Check prerequisites.
5011
5012     This checks that the instance is in the cluster.
5013
5014     """
5015     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5016     assert instance is not None, \
5017       "Cannot retrieve locked instance %s" % self.op.instance_name
5018     self.instance = instance
5019
5020     if instance.disk_template != constants.DT_DRBD8:
5021       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5022                                  " instances")
5023
5024     if len(instance.secondary_nodes) != 1:
5025       raise errors.OpPrereqError("The instance has a strange layout,"
5026                                  " expected one secondary but found %d" %
5027                                  len(instance.secondary_nodes))
5028
5029     self.sec_node = instance.secondary_nodes[0]
5030
5031     if self.op.iallocator is not None:
5032       self._RunAllocator()
5033
5034     remote_node = self.op.remote_node
5035     if remote_node is not None:
5036       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5037       assert self.remote_node_info is not None, \
5038         "Cannot retrieve locked node %s" % remote_node
5039     else:
5040       self.remote_node_info = None
5041     if remote_node == instance.primary_node:
5042       raise errors.OpPrereqError("The specified node is the primary node of"
5043                                  " the instance.")
5044     elif remote_node == self.sec_node:
5045       raise errors.OpPrereqError("The specified node is already the"
5046                                  " secondary node of the instance.")
5047
5048     if self.op.mode == constants.REPLACE_DISK_PRI:
5049       n1 = self.tgt_node = instance.primary_node
5050       n2 = self.oth_node = self.sec_node
5051     elif self.op.mode == constants.REPLACE_DISK_SEC:
5052       n1 = self.tgt_node = self.sec_node
5053       n2 = self.oth_node = instance.primary_node
5054     elif self.op.mode == constants.REPLACE_DISK_CHG:
5055       n1 = self.new_node = remote_node
5056       n2 = self.oth_node = instance.primary_node
5057       self.tgt_node = self.sec_node
5058       _CheckNodeNotDrained(self, remote_node)
5059     else:
5060       raise errors.ProgrammerError("Unhandled disk replace mode")
5061
5062     _CheckNodeOnline(self, n1)
5063     _CheckNodeOnline(self, n2)
5064
5065     if not self.op.disks:
5066       self.op.disks = range(len(instance.disks))
5067
5068     for disk_idx in self.op.disks:
5069       instance.FindDisk(disk_idx)
5070
5071   def _ExecD8DiskOnly(self, feedback_fn):
5072     """Replace a disk on the primary or secondary for dbrd8.
5073
5074     The algorithm for replace is quite complicated:
5075
5076       1. for each disk to be replaced:
5077
5078         1. create new LVs on the target node with unique names
5079         1. detach old LVs from the drbd device
5080         1. rename old LVs to name_replaced.<time_t>
5081         1. rename new LVs to old LVs
5082         1. attach the new LVs (with the old names now) to the drbd device
5083
5084       1. wait for sync across all devices
5085
5086       1. for each modified disk:
5087
5088         1. remove old LVs (which have the name name_replaces.<time_t>)
5089
5090     Failures are not very well handled.
5091
5092     """
5093     steps_total = 6
5094     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5095     instance = self.instance
5096     iv_names = {}
5097     vgname = self.cfg.GetVGName()
5098     # start of work
5099     cfg = self.cfg
5100     tgt_node = self.tgt_node
5101     oth_node = self.oth_node
5102
5103     # Step: check device activation
5104     self.proc.LogStep(1, steps_total, "check device existence")
5105     info("checking volume groups")
5106     my_vg = cfg.GetVGName()
5107     results = self.rpc.call_vg_list([oth_node, tgt_node])
5108     if not results:
5109       raise errors.OpExecError("Can't list volume groups on the nodes")
5110     for node in oth_node, tgt_node:
5111       res = results[node]
5112       if res.failed or not res.data or my_vg not in res.data:
5113         raise errors.OpExecError("Volume group '%s' not found on %s" %
5114                                  (my_vg, node))
5115     for idx, dev in enumerate(instance.disks):
5116       if idx not in self.op.disks:
5117         continue
5118       for node in tgt_node, oth_node:
5119         info("checking disk/%d on %s" % (idx, node))
5120         cfg.SetDiskID(dev, node)
5121         result = self.rpc.call_blockdev_find(node, dev)
5122         msg = result.RemoteFailMsg()
5123         if not msg and not result.payload:
5124           msg = "disk not found"
5125         if msg:
5126           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5127                                    (idx, node, msg))
5128
5129     # Step: check other node consistency
5130     self.proc.LogStep(2, steps_total, "check peer consistency")
5131     for idx, dev in enumerate(instance.disks):
5132       if idx not in self.op.disks:
5133         continue
5134       info("checking disk/%d consistency on %s" % (idx, oth_node))
5135       if not _CheckDiskConsistency(self, dev, oth_node,
5136                                    oth_node==instance.primary_node):
5137         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5138                                  " to replace disks on this node (%s)" %
5139                                  (oth_node, tgt_node))
5140
5141     # Step: create new storage
5142     self.proc.LogStep(3, steps_total, "allocate new storage")
5143     for idx, dev in enumerate(instance.disks):
5144       if idx not in self.op.disks:
5145         continue
5146       size = dev.size
5147       cfg.SetDiskID(dev, tgt_node)
5148       lv_names = [".disk%d_%s" % (idx, suf)
5149                   for suf in ["data", "meta"]]
5150       names = _GenerateUniqueNames(self, lv_names)
5151       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5152                              logical_id=(vgname, names[0]))
5153       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5154                              logical_id=(vgname, names[1]))
5155       new_lvs = [lv_data, lv_meta]
5156       old_lvs = dev.children
5157       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5158       info("creating new local storage on %s for %s" %
5159            (tgt_node, dev.iv_name))
5160       # we pass force_create=True to force the LVM creation
5161       for new_lv in new_lvs:
5162         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5163                         _GetInstanceInfoText(instance), False)
5164
5165     # Step: for each lv, detach+rename*2+attach
5166     self.proc.LogStep(4, steps_total, "change drbd configuration")
5167     for dev, old_lvs, new_lvs in iv_names.itervalues():
5168       info("detaching %s drbd from local storage" % dev.iv_name)
5169       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5170       result.Raise()
5171       if not result.data:
5172         raise errors.OpExecError("Can't detach drbd from local storage on node"
5173                                  " %s for device %s" % (tgt_node, dev.iv_name))
5174       #dev.children = []
5175       #cfg.Update(instance)
5176
5177       # ok, we created the new LVs, so now we know we have the needed
5178       # storage; as such, we proceed on the target node to rename
5179       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5180       # using the assumption that logical_id == physical_id (which in
5181       # turn is the unique_id on that node)
5182
5183       # FIXME(iustin): use a better name for the replaced LVs
5184       temp_suffix = int(time.time())
5185       ren_fn = lambda d, suff: (d.physical_id[0],
5186                                 d.physical_id[1] + "_replaced-%s" % suff)
5187       # build the rename list based on what LVs exist on the node
5188       rlist = []
5189       for to_ren in old_lvs:
5190         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5191         if not result.RemoteFailMsg() and result.payload:
5192           # device exists
5193           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5194
5195       info("renaming the old LVs on the target node")
5196       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5197       result.Raise()
5198       if not result.data:
5199         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5200       # now we rename the new LVs to the old LVs
5201       info("renaming the new LVs on the target node")
5202       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5203       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5204       result.Raise()
5205       if not result.data:
5206         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5207
5208       for old, new in zip(old_lvs, new_lvs):
5209         new.logical_id = old.logical_id
5210         cfg.SetDiskID(new, tgt_node)
5211
5212       for disk in old_lvs:
5213         disk.logical_id = ren_fn(disk, temp_suffix)
5214         cfg.SetDiskID(disk, tgt_node)
5215
5216       # now that the new lvs have the old name, we can add them to the device
5217       info("adding new mirror component on %s" % tgt_node)
5218       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5219       if result.failed or not result.data:
5220         for new_lv in new_lvs:
5221           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5222           if msg:
5223             warning("Can't rollback device %s: %s", dev, msg,
5224                     hint="cleanup manually the unused logical volumes")
5225         raise errors.OpExecError("Can't add local storage to drbd")
5226
5227       dev.children = new_lvs
5228       cfg.Update(instance)
5229
5230     # Step: wait for sync
5231
5232     # this can fail as the old devices are degraded and _WaitForSync
5233     # does a combined result over all disks, so we don't check its
5234     # return value
5235     self.proc.LogStep(5, steps_total, "sync devices")
5236     _WaitForSync(self, instance, unlock=True)
5237
5238     # so check manually all the devices
5239     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5240       cfg.SetDiskID(dev, instance.primary_node)
5241       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5242       msg = result.RemoteFailMsg()
5243       if not msg and not result.payload:
5244         msg = "disk not found"
5245       if msg:
5246         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5247                                  (name, msg))
5248       if result.payload[5]:
5249         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5250
5251     # Step: remove old storage
5252     self.proc.LogStep(6, steps_total, "removing old storage")
5253     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5254       info("remove logical volumes for %s" % name)
5255       for lv in old_lvs:
5256         cfg.SetDiskID(lv, tgt_node)
5257         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5258         if msg:
5259           warning("Can't remove old LV: %s" % msg,
5260                   hint="manually remove unused LVs")
5261           continue
5262
5263   def _ExecD8Secondary(self, feedback_fn):
5264     """Replace the secondary node for drbd8.
5265
5266     The algorithm for replace is quite complicated:
5267       - for all disks of the instance:
5268         - create new LVs on the new node with same names
5269         - shutdown the drbd device on the old secondary
5270         - disconnect the drbd network on the primary
5271         - create the drbd device on the new secondary
5272         - network attach the drbd on the primary, using an artifice:
5273           the drbd code for Attach() will connect to the network if it
5274           finds a device which is connected to the good local disks but
5275           not network enabled
5276       - wait for sync across all devices
5277       - remove all disks from the old secondary
5278
5279     Failures are not very well handled.
5280
5281     """
5282     steps_total = 6
5283     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5284     instance = self.instance
5285     iv_names = {}
5286     # start of work
5287     cfg = self.cfg
5288     old_node = self.tgt_node
5289     new_node = self.new_node
5290     pri_node = instance.primary_node
5291     nodes_ip = {
5292       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5293       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5294       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5295       }
5296
5297     # Step: check device activation
5298     self.proc.LogStep(1, steps_total, "check device existence")
5299     info("checking volume groups")
5300     my_vg = cfg.GetVGName()
5301     results = self.rpc.call_vg_list([pri_node, new_node])
5302     for node in pri_node, new_node:
5303       res = results[node]
5304       if res.failed or not res.data or my_vg not in res.data:
5305         raise errors.OpExecError("Volume group '%s' not found on %s" %
5306                                  (my_vg, node))
5307     for idx, dev in enumerate(instance.disks):
5308       if idx not in self.op.disks:
5309         continue
5310       info("checking disk/%d on %s" % (idx, pri_node))
5311       cfg.SetDiskID(dev, pri_node)
5312       result = self.rpc.call_blockdev_find(pri_node, dev)
5313       msg = result.RemoteFailMsg()
5314       if not msg and not result.payload:
5315         msg = "disk not found"
5316       if msg:
5317         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5318                                  (idx, pri_node, msg))
5319
5320     # Step: check other node consistency
5321     self.proc.LogStep(2, steps_total, "check peer consistency")
5322     for idx, dev in enumerate(instance.disks):
5323       if idx not in self.op.disks:
5324         continue
5325       info("checking disk/%d consistency on %s" % (idx, pri_node))
5326       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5327         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5328                                  " unsafe to replace the secondary" %
5329                                  pri_node)
5330
5331     # Step: create new storage
5332     self.proc.LogStep(3, steps_total, "allocate new storage")
5333     for idx, dev in enumerate(instance.disks):
5334       info("adding new local storage on %s for disk/%d" %
5335            (new_node, idx))
5336       # we pass force_create=True to force LVM creation
5337       for new_lv in dev.children:
5338         _CreateBlockDev(self, new_node, instance, new_lv, True,
5339                         _GetInstanceInfoText(instance), False)
5340
5341     # Step 4: dbrd minors and drbd setups changes
5342     # after this, we must manually remove the drbd minors on both the
5343     # error and the success paths
5344     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5345                                    instance.name)
5346     logging.debug("Allocated minors %s" % (minors,))
5347     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5348     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5349       size = dev.size
5350       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5351       # create new devices on new_node; note that we create two IDs:
5352       # one without port, so the drbd will be activated without
5353       # networking information on the new node at this stage, and one
5354       # with network, for the latter activation in step 4
5355       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5356       if pri_node == o_node1:
5357         p_minor = o_minor1
5358       else:
5359         p_minor = o_minor2
5360
5361       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5362       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5363
5364       iv_names[idx] = (dev, dev.children, new_net_id)
5365       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5366                     new_net_id)
5367       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5368                               logical_id=new_alone_id,
5369                               children=dev.children)
5370       try:
5371         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5372                               _GetInstanceInfoText(instance), False)
5373       except errors.GenericError:
5374         self.cfg.ReleaseDRBDMinors(instance.name)
5375         raise
5376
5377     for idx, dev in enumerate(instance.disks):
5378       # we have new devices, shutdown the drbd on the old secondary
5379       info("shutting down drbd for disk/%d on old node" % idx)
5380       cfg.SetDiskID(dev, old_node)
5381       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5382       if msg:
5383         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5384                 (idx, msg),
5385                 hint="Please cleanup this device manually as soon as possible")
5386
5387     info("detaching primary drbds from the network (=> standalone)")
5388     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5389                                                instance.disks)[pri_node]
5390
5391     msg = result.RemoteFailMsg()
5392     if msg:
5393       # detaches didn't succeed (unlikely)
5394       self.cfg.ReleaseDRBDMinors(instance.name)
5395       raise errors.OpExecError("Can't detach the disks from the network on"
5396                                " old node: %s" % (msg,))
5397
5398     # if we managed to detach at least one, we update all the disks of
5399     # the instance to point to the new secondary
5400     info("updating instance configuration")
5401     for dev, _, new_logical_id in iv_names.itervalues():
5402       dev.logical_id = new_logical_id
5403       cfg.SetDiskID(dev, pri_node)
5404     cfg.Update(instance)
5405
5406     # and now perform the drbd attach
5407     info("attaching primary drbds to new secondary (standalone => connected)")
5408     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5409                                            instance.disks, instance.name,
5410                                            False)
5411     for to_node, to_result in result.items():
5412       msg = to_result.RemoteFailMsg()
5413       if msg:
5414         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5415                 hint="please do a gnt-instance info to see the"
5416                 " status of disks")
5417
5418     # this can fail as the old devices are degraded and _WaitForSync
5419     # does a combined result over all disks, so we don't check its
5420     # return value
5421     self.proc.LogStep(5, steps_total, "sync devices")
5422     _WaitForSync(self, instance, unlock=True)
5423
5424     # so check manually all the devices
5425     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5426       cfg.SetDiskID(dev, pri_node)
5427       result = self.rpc.call_blockdev_find(pri_node, dev)
5428       msg = result.RemoteFailMsg()
5429       if not msg and not result.payload:
5430         msg = "disk not found"
5431       if msg:
5432         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5433                                  (idx, msg))
5434       if result.payload[5]:
5435         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5436
5437     self.proc.LogStep(6, steps_total, "removing old storage")
5438     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5439       info("remove logical volumes for disk/%d" % idx)
5440       for lv in old_lvs:
5441         cfg.SetDiskID(lv, old_node)
5442         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5443         if msg:
5444           warning("Can't remove LV on old secondary: %s", msg,
5445                   hint="Cleanup stale volumes by hand")
5446
5447   def Exec(self, feedback_fn):
5448     """Execute disk replacement.
5449
5450     This dispatches the disk replacement to the appropriate handler.
5451
5452     """
5453     instance = self.instance
5454
5455     # Activate the instance disks if we're replacing them on a down instance
5456     if not instance.admin_up:
5457       _StartInstanceDisks(self, instance, True)
5458
5459     if self.op.mode == constants.REPLACE_DISK_CHG:
5460       fn = self._ExecD8Secondary
5461     else:
5462       fn = self._ExecD8DiskOnly
5463
5464     ret = fn(feedback_fn)
5465
5466     # Deactivate the instance disks if we're replacing them on a down instance
5467     if not instance.admin_up:
5468       _SafeShutdownInstanceDisks(self, instance)
5469
5470     return ret
5471
5472
5473 class LUGrowDisk(LogicalUnit):
5474   """Grow a disk of an instance.
5475
5476   """
5477   HPATH = "disk-grow"
5478   HTYPE = constants.HTYPE_INSTANCE
5479   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5480   REQ_BGL = False
5481
5482   def ExpandNames(self):
5483     self._ExpandAndLockInstance()
5484     self.needed_locks[locking.LEVEL_NODE] = []
5485     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5486
5487   def DeclareLocks(self, level):
5488     if level == locking.LEVEL_NODE:
5489       self._LockInstancesNodes()
5490
5491   def BuildHooksEnv(self):
5492     """Build hooks env.
5493
5494     This runs on the master, the primary and all the secondaries.
5495
5496     """
5497     env = {
5498       "DISK": self.op.disk,
5499       "AMOUNT": self.op.amount,
5500       }
5501     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5502     nl = [
5503       self.cfg.GetMasterNode(),
5504       self.instance.primary_node,
5505       ]
5506     return env, nl, nl
5507
5508   def CheckPrereq(self):
5509     """Check prerequisites.
5510
5511     This checks that the instance is in the cluster.
5512
5513     """
5514     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5515     assert instance is not None, \
5516       "Cannot retrieve locked instance %s" % self.op.instance_name
5517     nodenames = list(instance.all_nodes)
5518     for node in nodenames:
5519       _CheckNodeOnline(self, node)
5520
5521
5522     self.instance = instance
5523
5524     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5525       raise errors.OpPrereqError("Instance's disk layout does not support"
5526                                  " growing.")
5527
5528     self.disk = instance.FindDisk(self.op.disk)
5529
5530     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5531                                        instance.hypervisor)
5532     for node in nodenames:
5533       info = nodeinfo[node]
5534       if info.failed or not info.data:
5535         raise errors.OpPrereqError("Cannot get current information"
5536                                    " from node '%s'" % node)
5537       vg_free = info.data.get('vg_free', None)
5538       if not isinstance(vg_free, int):
5539         raise errors.OpPrereqError("Can't compute free disk space on"
5540                                    " node %s" % node)
5541       if self.op.amount > vg_free:
5542         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5543                                    " %d MiB available, %d MiB required" %
5544                                    (node, vg_free, self.op.amount))
5545
5546   def Exec(self, feedback_fn):
5547     """Execute disk grow.
5548
5549     """
5550     instance = self.instance
5551     disk = self.disk
5552     for node in instance.all_nodes:
5553       self.cfg.SetDiskID(disk, node)
5554       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5555       msg = result.RemoteFailMsg()
5556       if msg:
5557         raise errors.OpExecError("Grow request failed to node %s: %s" %
5558                                  (node, msg))
5559     disk.RecordGrow(self.op.amount)
5560     self.cfg.Update(instance)
5561     if self.op.wait_for_sync:
5562       disk_abort = not _WaitForSync(self, instance)
5563       if disk_abort:
5564         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5565                              " status.\nPlease check the instance.")
5566
5567
5568 class LUQueryInstanceData(NoHooksLU):
5569   """Query runtime instance data.
5570
5571   """
5572   _OP_REQP = ["instances", "static"]
5573   REQ_BGL = False
5574
5575   def ExpandNames(self):
5576     self.needed_locks = {}
5577     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5578
5579     if not isinstance(self.op.instances, list):
5580       raise errors.OpPrereqError("Invalid argument type 'instances'")
5581
5582     if self.op.instances:
5583       self.wanted_names = []
5584       for name in self.op.instances:
5585         full_name = self.cfg.ExpandInstanceName(name)
5586         if full_name is None:
5587           raise errors.OpPrereqError("Instance '%s' not known" % name)
5588         self.wanted_names.append(full_name)
5589       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5590     else:
5591       self.wanted_names = None
5592       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5593
5594     self.needed_locks[locking.LEVEL_NODE] = []
5595     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5596
5597   def DeclareLocks(self, level):
5598     if level == locking.LEVEL_NODE:
5599       self._LockInstancesNodes()
5600
5601   def CheckPrereq(self):
5602     """Check prerequisites.
5603
5604     This only checks the optional instance list against the existing names.
5605
5606     """
5607     if self.wanted_names is None:
5608       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5609
5610     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5611                              in self.wanted_names]
5612     return
5613
5614   def _ComputeDiskStatus(self, instance, snode, dev):
5615     """Compute block device status.
5616
5617     """
5618     static = self.op.static
5619     if not static:
5620       self.cfg.SetDiskID(dev, instance.primary_node)
5621       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5622       if dev_pstatus.offline:
5623         dev_pstatus = None
5624       else:
5625         msg = dev_pstatus.RemoteFailMsg()
5626         if msg:
5627           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5628                                    (instance.name, msg))
5629         dev_pstatus = dev_pstatus.payload
5630     else:
5631       dev_pstatus = None
5632
5633     if dev.dev_type in constants.LDS_DRBD:
5634       # we change the snode then (otherwise we use the one passed in)
5635       if dev.logical_id[0] == instance.primary_node:
5636         snode = dev.logical_id[1]
5637       else:
5638         snode = dev.logical_id[0]
5639
5640     if snode and not static:
5641       self.cfg.SetDiskID(dev, snode)
5642       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5643       if dev_sstatus.offline:
5644         dev_sstatus = None
5645       else:
5646         msg = dev_sstatus.RemoteFailMsg()
5647         if msg:
5648           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5649                                    (instance.name, msg))
5650         dev_sstatus = dev_sstatus.payload
5651     else:
5652       dev_sstatus = None
5653
5654     if dev.children:
5655       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5656                       for child in dev.children]
5657     else:
5658       dev_children = []
5659
5660     data = {
5661       "iv_name": dev.iv_name,
5662       "dev_type": dev.dev_type,
5663       "logical_id": dev.logical_id,
5664       "physical_id": dev.physical_id,
5665       "pstatus": dev_pstatus,
5666       "sstatus": dev_sstatus,
5667       "children": dev_children,
5668       "mode": dev.mode,
5669       }
5670
5671     return data
5672
5673   def Exec(self, feedback_fn):
5674     """Gather and return data"""
5675     result = {}
5676
5677     cluster = self.cfg.GetClusterInfo()
5678
5679     for instance in self.wanted_instances:
5680       if not self.op.static:
5681         remote_info = self.rpc.call_instance_info(instance.primary_node,
5682                                                   instance.name,
5683                                                   instance.hypervisor)
5684         remote_info.Raise()
5685         remote_info = remote_info.data
5686         if remote_info and "state" in remote_info:
5687           remote_state = "up"
5688         else:
5689           remote_state = "down"
5690       else:
5691         remote_state = None
5692       if instance.admin_up:
5693         config_state = "up"
5694       else:
5695         config_state = "down"
5696
5697       disks = [self._ComputeDiskStatus(instance, None, device)
5698                for device in instance.disks]
5699
5700       idict = {
5701         "name": instance.name,
5702         "config_state": config_state,
5703         "run_state": remote_state,
5704         "pnode": instance.primary_node,
5705         "snodes": instance.secondary_nodes,
5706         "os": instance.os,
5707         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5708         "disks": disks,
5709         "hypervisor": instance.hypervisor,
5710         "network_port": instance.network_port,
5711         "hv_instance": instance.hvparams,
5712         "hv_actual": cluster.FillHV(instance),
5713         "be_instance": instance.beparams,
5714         "be_actual": cluster.FillBE(instance),
5715         }
5716
5717       result[instance.name] = idict
5718
5719     return result
5720
5721
5722 class LUSetInstanceParams(LogicalUnit):
5723   """Modifies an instances's parameters.
5724
5725   """
5726   HPATH = "instance-modify"
5727   HTYPE = constants.HTYPE_INSTANCE
5728   _OP_REQP = ["instance_name"]
5729   REQ_BGL = False
5730
5731   def CheckArguments(self):
5732     if not hasattr(self.op, 'nics'):
5733       self.op.nics = []
5734     if not hasattr(self.op, 'disks'):
5735       self.op.disks = []
5736     if not hasattr(self.op, 'beparams'):
5737       self.op.beparams = {}
5738     if not hasattr(self.op, 'hvparams'):
5739       self.op.hvparams = {}
5740     self.op.force = getattr(self.op, "force", False)
5741     if not (self.op.nics or self.op.disks or
5742             self.op.hvparams or self.op.beparams):
5743       raise errors.OpPrereqError("No changes submitted")
5744
5745     # Disk validation
5746     disk_addremove = 0
5747     for disk_op, disk_dict in self.op.disks:
5748       if disk_op == constants.DDM_REMOVE:
5749         disk_addremove += 1
5750         continue
5751       elif disk_op == constants.DDM_ADD:
5752         disk_addremove += 1
5753       else:
5754         if not isinstance(disk_op, int):
5755           raise errors.OpPrereqError("Invalid disk index")
5756       if disk_op == constants.DDM_ADD:
5757         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5758         if mode not in constants.DISK_ACCESS_SET:
5759           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5760         size = disk_dict.get('size', None)
5761         if size is None:
5762           raise errors.OpPrereqError("Required disk parameter size missing")
5763         try:
5764           size = int(size)
5765         except ValueError, err:
5766           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5767                                      str(err))
5768         disk_dict['size'] = size
5769       else:
5770         # modification of disk
5771         if 'size' in disk_dict:
5772           raise errors.OpPrereqError("Disk size change not possible, use"
5773                                      " grow-disk")
5774
5775     if disk_addremove > 1:
5776       raise errors.OpPrereqError("Only one disk add or remove operation"
5777                                  " supported at a time")
5778
5779     # NIC validation
5780     nic_addremove = 0
5781     for nic_op, nic_dict in self.op.nics:
5782       if nic_op == constants.DDM_REMOVE:
5783         nic_addremove += 1
5784         continue
5785       elif nic_op == constants.DDM_ADD:
5786         nic_addremove += 1
5787       else:
5788         if not isinstance(nic_op, int):
5789           raise errors.OpPrereqError("Invalid nic index")
5790
5791       # nic_dict should be a dict
5792       nic_ip = nic_dict.get('ip', None)
5793       if nic_ip is not None:
5794         if nic_ip.lower() == constants.VALUE_NONE:
5795           nic_dict['ip'] = None
5796         else:
5797           if not utils.IsValidIP(nic_ip):
5798             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5799
5800       if nic_op == constants.DDM_ADD:
5801         nic_bridge = nic_dict.get('bridge', None)
5802         if nic_bridge is None:
5803           nic_dict['bridge'] = self.cfg.GetDefBridge()
5804         nic_mac = nic_dict.get('mac', None)
5805         if nic_mac is None:
5806           nic_dict['mac'] = constants.VALUE_AUTO
5807
5808       if 'mac' in nic_dict:
5809         nic_mac = nic_dict['mac']
5810         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5811           if not utils.IsValidMac(nic_mac):
5812             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5813         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5814           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5815                                      " modifying an existing nic")
5816
5817     if nic_addremove > 1:
5818       raise errors.OpPrereqError("Only one NIC add or remove operation"
5819                                  " supported at a time")
5820
5821   def ExpandNames(self):
5822     self._ExpandAndLockInstance()
5823     self.needed_locks[locking.LEVEL_NODE] = []
5824     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5825
5826   def DeclareLocks(self, level):
5827     if level == locking.LEVEL_NODE:
5828       self._LockInstancesNodes()
5829
5830   def BuildHooksEnv(self):
5831     """Build hooks env.
5832
5833     This runs on the master, primary and secondaries.
5834
5835     """
5836     args = dict()
5837     if constants.BE_MEMORY in self.be_new:
5838       args['memory'] = self.be_new[constants.BE_MEMORY]
5839     if constants.BE_VCPUS in self.be_new:
5840       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5841     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5842     # information at all.
5843     if self.op.nics:
5844       args['nics'] = []
5845       nic_override = dict(self.op.nics)
5846       for idx, nic in enumerate(self.instance.nics):
5847         if idx in nic_override:
5848           this_nic_override = nic_override[idx]
5849         else:
5850           this_nic_override = {}
5851         if 'ip' in this_nic_override:
5852           ip = this_nic_override['ip']
5853         else:
5854           ip = nic.ip
5855         if 'bridge' in this_nic_override:
5856           bridge = this_nic_override['bridge']
5857         else:
5858           bridge = nic.bridge
5859         if 'mac' in this_nic_override:
5860           mac = this_nic_override['mac']
5861         else:
5862           mac = nic.mac
5863         args['nics'].append((ip, bridge, mac))
5864       if constants.DDM_ADD in nic_override:
5865         ip = nic_override[constants.DDM_ADD].get('ip', None)
5866         bridge = nic_override[constants.DDM_ADD]['bridge']
5867         mac = nic_override[constants.DDM_ADD]['mac']
5868         args['nics'].append((ip, bridge, mac))
5869       elif constants.DDM_REMOVE in nic_override:
5870         del args['nics'][-1]
5871
5872     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5873     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5874     return env, nl, nl
5875
5876   def CheckPrereq(self):
5877     """Check prerequisites.
5878
5879     This only checks the instance list against the existing names.
5880
5881     """
5882     force = self.force = self.op.force
5883
5884     # checking the new params on the primary/secondary nodes
5885
5886     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5887     assert self.instance is not None, \
5888       "Cannot retrieve locked instance %s" % self.op.instance_name
5889     pnode = instance.primary_node
5890     nodelist = list(instance.all_nodes)
5891
5892     # hvparams processing
5893     if self.op.hvparams:
5894       i_hvdict = copy.deepcopy(instance.hvparams)
5895       for key, val in self.op.hvparams.iteritems():
5896         if val == constants.VALUE_DEFAULT:
5897           try:
5898             del i_hvdict[key]
5899           except KeyError:
5900             pass
5901         else:
5902           i_hvdict[key] = val
5903       cluster = self.cfg.GetClusterInfo()
5904       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5905       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5906                                 i_hvdict)
5907       # local check
5908       hypervisor.GetHypervisor(
5909         instance.hypervisor).CheckParameterSyntax(hv_new)
5910       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5911       self.hv_new = hv_new # the new actual values
5912       self.hv_inst = i_hvdict # the new dict (without defaults)
5913     else:
5914       self.hv_new = self.hv_inst = {}
5915
5916     # beparams processing
5917     if self.op.beparams:
5918       i_bedict = copy.deepcopy(instance.beparams)
5919       for key, val in self.op.beparams.iteritems():
5920         if val == constants.VALUE_DEFAULT:
5921           try:
5922             del i_bedict[key]
5923           except KeyError:
5924             pass
5925         else:
5926           i_bedict[key] = val
5927       cluster = self.cfg.GetClusterInfo()
5928       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5929       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5930                                 i_bedict)
5931       self.be_new = be_new # the new actual values
5932       self.be_inst = i_bedict # the new dict (without defaults)
5933     else:
5934       self.be_new = self.be_inst = {}
5935
5936     self.warn = []
5937
5938     if constants.BE_MEMORY in self.op.beparams and not self.force:
5939       mem_check_list = [pnode]
5940       if be_new[constants.BE_AUTO_BALANCE]:
5941         # either we changed auto_balance to yes or it was from before
5942         mem_check_list.extend(instance.secondary_nodes)
5943       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5944                                                   instance.hypervisor)
5945       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5946                                          instance.hypervisor)
5947       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5948         # Assume the primary node is unreachable and go ahead
5949         self.warn.append("Can't get info from primary node %s" % pnode)
5950       else:
5951         if not instance_info.failed and instance_info.data:
5952           current_mem = int(instance_info.data['memory'])
5953         else:
5954           # Assume instance not running
5955           # (there is a slight race condition here, but it's not very probable,
5956           # and we have no other way to check)
5957           current_mem = 0
5958         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5959                     nodeinfo[pnode].data['memory_free'])
5960         if miss_mem > 0:
5961           raise errors.OpPrereqError("This change will prevent the instance"
5962                                      " from starting, due to %d MB of memory"
5963                                      " missing on its primary node" % miss_mem)
5964
5965       if be_new[constants.BE_AUTO_BALANCE]:
5966         for node, nres in nodeinfo.iteritems():
5967           if node not in instance.secondary_nodes:
5968             continue
5969           if nres.failed or not isinstance(nres.data, dict):
5970             self.warn.append("Can't get info from secondary node %s" % node)
5971           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5972             self.warn.append("Not enough memory to failover instance to"
5973                              " secondary node %s" % node)
5974
5975     # NIC processing
5976     for nic_op, nic_dict in self.op.nics:
5977       if nic_op == constants.DDM_REMOVE:
5978         if not instance.nics:
5979           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5980         continue
5981       if nic_op != constants.DDM_ADD:
5982         # an existing nic
5983         if nic_op < 0 or nic_op >= len(instance.nics):
5984           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5985                                      " are 0 to %d" %
5986                                      (nic_op, len(instance.nics)))
5987       if 'bridge' in nic_dict:
5988         nic_bridge = nic_dict['bridge']
5989         if nic_bridge is None:
5990           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5991         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5992           msg = ("Bridge '%s' doesn't exist on one of"
5993                  " the instance nodes" % nic_bridge)
5994           if self.force:
5995             self.warn.append(msg)
5996           else:
5997             raise errors.OpPrereqError(msg)
5998       if 'mac' in nic_dict:
5999         nic_mac = nic_dict['mac']
6000         if nic_mac is None:
6001           raise errors.OpPrereqError('Cannot set the nic mac to None')
6002         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6003           # otherwise generate the mac
6004           nic_dict['mac'] = self.cfg.GenerateMAC()
6005         else:
6006           # or validate/reserve the current one
6007           if self.cfg.IsMacInUse(nic_mac):
6008             raise errors.OpPrereqError("MAC address %s already in use"
6009                                        " in cluster" % nic_mac)
6010
6011     # DISK processing
6012     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6013       raise errors.OpPrereqError("Disk operations not supported for"
6014                                  " diskless instances")
6015     for disk_op, disk_dict in self.op.disks:
6016       if disk_op == constants.DDM_REMOVE:
6017         if len(instance.disks) == 1:
6018           raise errors.OpPrereqError("Cannot remove the last disk of"
6019                                      " an instance")
6020         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6021         ins_l = ins_l[pnode]
6022         if ins_l.failed or not isinstance(ins_l.data, list):
6023           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6024         if instance.name in ins_l.data:
6025           raise errors.OpPrereqError("Instance is running, can't remove"
6026                                      " disks.")
6027
6028       if (disk_op == constants.DDM_ADD and
6029           len(instance.nics) >= constants.MAX_DISKS):
6030         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6031                                    " add more" % constants.MAX_DISKS)
6032       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6033         # an existing disk
6034         if disk_op < 0 or disk_op >= len(instance.disks):
6035           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6036                                      " are 0 to %d" %
6037                                      (disk_op, len(instance.disks)))
6038
6039     return
6040
6041   def Exec(self, feedback_fn):
6042     """Modifies an instance.
6043
6044     All parameters take effect only at the next restart of the instance.
6045
6046     """
6047     # Process here the warnings from CheckPrereq, as we don't have a
6048     # feedback_fn there.
6049     for warn in self.warn:
6050       feedback_fn("WARNING: %s" % warn)
6051
6052     result = []
6053     instance = self.instance
6054     # disk changes
6055     for disk_op, disk_dict in self.op.disks:
6056       if disk_op == constants.DDM_REMOVE:
6057         # remove the last disk
6058         device = instance.disks.pop()
6059         device_idx = len(instance.disks)
6060         for node, disk in device.ComputeNodeTree(instance.primary_node):
6061           self.cfg.SetDiskID(disk, node)
6062           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6063           if msg:
6064             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6065                             " continuing anyway", device_idx, node, msg)
6066         result.append(("disk/%d" % device_idx, "remove"))
6067       elif disk_op == constants.DDM_ADD:
6068         # add a new disk
6069         if instance.disk_template == constants.DT_FILE:
6070           file_driver, file_path = instance.disks[0].logical_id
6071           file_path = os.path.dirname(file_path)
6072         else:
6073           file_driver = file_path = None
6074         disk_idx_base = len(instance.disks)
6075         new_disk = _GenerateDiskTemplate(self,
6076                                          instance.disk_template,
6077                                          instance.name, instance.primary_node,
6078                                          instance.secondary_nodes,
6079                                          [disk_dict],
6080                                          file_path,
6081                                          file_driver,
6082                                          disk_idx_base)[0]
6083         instance.disks.append(new_disk)
6084         info = _GetInstanceInfoText(instance)
6085
6086         logging.info("Creating volume %s for instance %s",
6087                      new_disk.iv_name, instance.name)
6088         # Note: this needs to be kept in sync with _CreateDisks
6089         #HARDCODE
6090         for node in instance.all_nodes:
6091           f_create = node == instance.primary_node
6092           try:
6093             _CreateBlockDev(self, node, instance, new_disk,
6094                             f_create, info, f_create)
6095           except errors.OpExecError, err:
6096             self.LogWarning("Failed to create volume %s (%s) on"
6097                             " node %s: %s",
6098                             new_disk.iv_name, new_disk, node, err)
6099         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6100                        (new_disk.size, new_disk.mode)))
6101       else:
6102         # change a given disk
6103         instance.disks[disk_op].mode = disk_dict['mode']
6104         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6105     # NIC changes
6106     for nic_op, nic_dict in self.op.nics:
6107       if nic_op == constants.DDM_REMOVE:
6108         # remove the last nic
6109         del instance.nics[-1]
6110         result.append(("nic.%d" % len(instance.nics), "remove"))
6111       elif nic_op == constants.DDM_ADD:
6112         # mac and bridge should be set, by now
6113         mac = nic_dict['mac']
6114         bridge = nic_dict['bridge']
6115         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6116                               bridge=bridge)
6117         instance.nics.append(new_nic)
6118         result.append(("nic.%d" % (len(instance.nics) - 1),
6119                        "add:mac=%s,ip=%s,bridge=%s" %
6120                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6121       else:
6122         # change a given nic
6123         for key in 'mac', 'ip', 'bridge':
6124           if key in nic_dict:
6125             setattr(instance.nics[nic_op], key, nic_dict[key])
6126             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6127
6128     # hvparams changes
6129     if self.op.hvparams:
6130       instance.hvparams = self.hv_inst
6131       for key, val in self.op.hvparams.iteritems():
6132         result.append(("hv/%s" % key, val))
6133
6134     # beparams changes
6135     if self.op.beparams:
6136       instance.beparams = self.be_inst
6137       for key, val in self.op.beparams.iteritems():
6138         result.append(("be/%s" % key, val))
6139
6140     self.cfg.Update(instance)
6141
6142     return result
6143
6144
6145 class LUQueryExports(NoHooksLU):
6146   """Query the exports list
6147
6148   """
6149   _OP_REQP = ['nodes']
6150   REQ_BGL = False
6151
6152   def ExpandNames(self):
6153     self.needed_locks = {}
6154     self.share_locks[locking.LEVEL_NODE] = 1
6155     if not self.op.nodes:
6156       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6157     else:
6158       self.needed_locks[locking.LEVEL_NODE] = \
6159         _GetWantedNodes(self, self.op.nodes)
6160
6161   def CheckPrereq(self):
6162     """Check prerequisites.
6163
6164     """
6165     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6166
6167   def Exec(self, feedback_fn):
6168     """Compute the list of all the exported system images.
6169
6170     @rtype: dict
6171     @return: a dictionary with the structure node->(export-list)
6172         where export-list is a list of the instances exported on
6173         that node.
6174
6175     """
6176     rpcresult = self.rpc.call_export_list(self.nodes)
6177     result = {}
6178     for node in rpcresult:
6179       if rpcresult[node].failed:
6180         result[node] = False
6181       else:
6182         result[node] = rpcresult[node].data
6183
6184     return result
6185
6186
6187 class LUExportInstance(LogicalUnit):
6188   """Export an instance to an image in the cluster.
6189
6190   """
6191   HPATH = "instance-export"
6192   HTYPE = constants.HTYPE_INSTANCE
6193   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6194   REQ_BGL = False
6195
6196   def ExpandNames(self):
6197     self._ExpandAndLockInstance()
6198     # FIXME: lock only instance primary and destination node
6199     #
6200     # Sad but true, for now we have do lock all nodes, as we don't know where
6201     # the previous export might be, and and in this LU we search for it and
6202     # remove it from its current node. In the future we could fix this by:
6203     #  - making a tasklet to search (share-lock all), then create the new one,
6204     #    then one to remove, after
6205     #  - removing the removal operation altoghether
6206     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6207
6208   def DeclareLocks(self, level):
6209     """Last minute lock declaration."""
6210     # All nodes are locked anyway, so nothing to do here.
6211
6212   def BuildHooksEnv(self):
6213     """Build hooks env.
6214
6215     This will run on the master, primary node and target node.
6216
6217     """
6218     env = {
6219       "EXPORT_NODE": self.op.target_node,
6220       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6221       }
6222     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6223     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6224           self.op.target_node]
6225     return env, nl, nl
6226
6227   def CheckPrereq(self):
6228     """Check prerequisites.
6229
6230     This checks that the instance and node names are valid.
6231
6232     """
6233     instance_name = self.op.instance_name
6234     self.instance = self.cfg.GetInstanceInfo(instance_name)
6235     assert self.instance is not None, \
6236           "Cannot retrieve locked instance %s" % self.op.instance_name
6237     _CheckNodeOnline(self, self.instance.primary_node)
6238
6239     self.dst_node = self.cfg.GetNodeInfo(
6240       self.cfg.ExpandNodeName(self.op.target_node))
6241
6242     if self.dst_node is None:
6243       # This is wrong node name, not a non-locked node
6244       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6245     _CheckNodeOnline(self, self.dst_node.name)
6246     _CheckNodeNotDrained(self, self.dst_node.name)
6247
6248     # instance disk type verification
6249     for disk in self.instance.disks:
6250       if disk.dev_type == constants.LD_FILE:
6251         raise errors.OpPrereqError("Export not supported for instances with"
6252                                    " file-based disks")
6253
6254   def Exec(self, feedback_fn):
6255     """Export an instance to an image in the cluster.
6256
6257     """
6258     instance = self.instance
6259     dst_node = self.dst_node
6260     src_node = instance.primary_node
6261     if self.op.shutdown:
6262       # shutdown the instance, but not the disks
6263       result = self.rpc.call_instance_shutdown(src_node, instance)
6264       msg = result.RemoteFailMsg()
6265       if msg:
6266         raise errors.OpExecError("Could not shutdown instance %s on"
6267                                  " node %s: %s" %
6268                                  (instance.name, src_node, msg))
6269
6270     vgname = self.cfg.GetVGName()
6271
6272     snap_disks = []
6273
6274     # set the disks ID correctly since call_instance_start needs the
6275     # correct drbd minor to create the symlinks
6276     for disk in instance.disks:
6277       self.cfg.SetDiskID(disk, src_node)
6278
6279     try:
6280       for disk in instance.disks:
6281         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6282         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6283         if new_dev_name.failed or not new_dev_name.data:
6284           self.LogWarning("Could not snapshot block device %s on node %s",
6285                           disk.logical_id[1], src_node)
6286           snap_disks.append(False)
6287         else:
6288           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6289                                  logical_id=(vgname, new_dev_name.data),
6290                                  physical_id=(vgname, new_dev_name.data),
6291                                  iv_name=disk.iv_name)
6292           snap_disks.append(new_dev)
6293
6294     finally:
6295       if self.op.shutdown and instance.admin_up:
6296         result = self.rpc.call_instance_start(src_node, instance, None, None)
6297         msg = result.RemoteFailMsg()
6298         if msg:
6299           _ShutdownInstanceDisks(self, instance)
6300           raise errors.OpExecError("Could not start instance: %s" % msg)
6301
6302     # TODO: check for size
6303
6304     cluster_name = self.cfg.GetClusterName()
6305     for idx, dev in enumerate(snap_disks):
6306       if dev:
6307         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6308                                                instance, cluster_name, idx)
6309         if result.failed or not result.data:
6310           self.LogWarning("Could not export block device %s from node %s to"
6311                           " node %s", dev.logical_id[1], src_node,
6312                           dst_node.name)
6313         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6314         if msg:
6315           self.LogWarning("Could not remove snapshot block device %s from node"
6316                           " %s: %s", dev.logical_id[1], src_node, msg)
6317
6318     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6319     if result.failed or not result.data:
6320       self.LogWarning("Could not finalize export for instance %s on node %s",
6321                       instance.name, dst_node.name)
6322
6323     nodelist = self.cfg.GetNodeList()
6324     nodelist.remove(dst_node.name)
6325
6326     # on one-node clusters nodelist will be empty after the removal
6327     # if we proceed the backup would be removed because OpQueryExports
6328     # substitutes an empty list with the full cluster node list.
6329     if nodelist:
6330       exportlist = self.rpc.call_export_list(nodelist)
6331       for node in exportlist:
6332         if exportlist[node].failed:
6333           continue
6334         if instance.name in exportlist[node].data:
6335           if not self.rpc.call_export_remove(node, instance.name):
6336             self.LogWarning("Could not remove older export for instance %s"
6337                             " on node %s", instance.name, node)
6338
6339
6340 class LURemoveExport(NoHooksLU):
6341   """Remove exports related to the named instance.
6342
6343   """
6344   _OP_REQP = ["instance_name"]
6345   REQ_BGL = False
6346
6347   def ExpandNames(self):
6348     self.needed_locks = {}
6349     # We need all nodes to be locked in order for RemoveExport to work, but we
6350     # don't need to lock the instance itself, as nothing will happen to it (and
6351     # we can remove exports also for a removed instance)
6352     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6353
6354   def CheckPrereq(self):
6355     """Check prerequisites.
6356     """
6357     pass
6358
6359   def Exec(self, feedback_fn):
6360     """Remove any export.
6361
6362     """
6363     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6364     # If the instance was not found we'll try with the name that was passed in.
6365     # This will only work if it was an FQDN, though.
6366     fqdn_warn = False
6367     if not instance_name:
6368       fqdn_warn = True
6369       instance_name = self.op.instance_name
6370
6371     exportlist = self.rpc.call_export_list(self.acquired_locks[
6372       locking.LEVEL_NODE])
6373     found = False
6374     for node in exportlist:
6375       if exportlist[node].failed:
6376         self.LogWarning("Failed to query node %s, continuing" % node)
6377         continue
6378       if instance_name in exportlist[node].data:
6379         found = True
6380         result = self.rpc.call_export_remove(node, instance_name)
6381         if result.failed or not result.data:
6382           logging.error("Could not remove export for instance %s"
6383                         " on node %s", instance_name, node)
6384
6385     if fqdn_warn and not found:
6386       feedback_fn("Export not found. If trying to remove an export belonging"
6387                   " to a deleted instance please use its Fully Qualified"
6388                   " Domain Name.")
6389
6390
6391 class TagsLU(NoHooksLU):
6392   """Generic tags LU.
6393
6394   This is an abstract class which is the parent of all the other tags LUs.
6395
6396   """
6397
6398   def ExpandNames(self):
6399     self.needed_locks = {}
6400     if self.op.kind == constants.TAG_NODE:
6401       name = self.cfg.ExpandNodeName(self.op.name)
6402       if name is None:
6403         raise errors.OpPrereqError("Invalid node name (%s)" %
6404                                    (self.op.name,))
6405       self.op.name = name
6406       self.needed_locks[locking.LEVEL_NODE] = name
6407     elif self.op.kind == constants.TAG_INSTANCE:
6408       name = self.cfg.ExpandInstanceName(self.op.name)
6409       if name is None:
6410         raise errors.OpPrereqError("Invalid instance name (%s)" %
6411                                    (self.op.name,))
6412       self.op.name = name
6413       self.needed_locks[locking.LEVEL_INSTANCE] = name
6414
6415   def CheckPrereq(self):
6416     """Check prerequisites.
6417
6418     """
6419     if self.op.kind == constants.TAG_CLUSTER:
6420       self.target = self.cfg.GetClusterInfo()
6421     elif self.op.kind == constants.TAG_NODE:
6422       self.target = self.cfg.GetNodeInfo(self.op.name)
6423     elif self.op.kind == constants.TAG_INSTANCE:
6424       self.target = self.cfg.GetInstanceInfo(self.op.name)
6425     else:
6426       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6427                                  str(self.op.kind))
6428
6429
6430 class LUGetTags(TagsLU):
6431   """Returns the tags of a given object.
6432
6433   """
6434   _OP_REQP = ["kind", "name"]
6435   REQ_BGL = False
6436
6437   def Exec(self, feedback_fn):
6438     """Returns the tag list.
6439
6440     """
6441     return list(self.target.GetTags())
6442
6443
6444 class LUSearchTags(NoHooksLU):
6445   """Searches the tags for a given pattern.
6446
6447   """
6448   _OP_REQP = ["pattern"]
6449   REQ_BGL = False
6450
6451   def ExpandNames(self):
6452     self.needed_locks = {}
6453
6454   def CheckPrereq(self):
6455     """Check prerequisites.
6456
6457     This checks the pattern passed for validity by compiling it.
6458
6459     """
6460     try:
6461       self.re = re.compile(self.op.pattern)
6462     except re.error, err:
6463       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6464                                  (self.op.pattern, err))
6465
6466   def Exec(self, feedback_fn):
6467     """Returns the tag list.
6468
6469     """
6470     cfg = self.cfg
6471     tgts = [("/cluster", cfg.GetClusterInfo())]
6472     ilist = cfg.GetAllInstancesInfo().values()
6473     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6474     nlist = cfg.GetAllNodesInfo().values()
6475     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6476     results = []
6477     for path, target in tgts:
6478       for tag in target.GetTags():
6479         if self.re.search(tag):
6480           results.append((path, tag))
6481     return results
6482
6483
6484 class LUAddTags(TagsLU):
6485   """Sets a tag on a given object.
6486
6487   """
6488   _OP_REQP = ["kind", "name", "tags"]
6489   REQ_BGL = False
6490
6491   def CheckPrereq(self):
6492     """Check prerequisites.
6493
6494     This checks the type and length of the tag name and value.
6495
6496     """
6497     TagsLU.CheckPrereq(self)
6498     for tag in self.op.tags:
6499       objects.TaggableObject.ValidateTag(tag)
6500
6501   def Exec(self, feedback_fn):
6502     """Sets the tag.
6503
6504     """
6505     try:
6506       for tag in self.op.tags:
6507         self.target.AddTag(tag)
6508     except errors.TagError, err:
6509       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6510     try:
6511       self.cfg.Update(self.target)
6512     except errors.ConfigurationError:
6513       raise errors.OpRetryError("There has been a modification to the"
6514                                 " config file and the operation has been"
6515                                 " aborted. Please retry.")
6516
6517
6518 class LUDelTags(TagsLU):
6519   """Delete a list of tags from a given object.
6520
6521   """
6522   _OP_REQP = ["kind", "name", "tags"]
6523   REQ_BGL = False
6524
6525   def CheckPrereq(self):
6526     """Check prerequisites.
6527
6528     This checks that we have the given tag.
6529
6530     """
6531     TagsLU.CheckPrereq(self)
6532     for tag in self.op.tags:
6533       objects.TaggableObject.ValidateTag(tag)
6534     del_tags = frozenset(self.op.tags)
6535     cur_tags = self.target.GetTags()
6536     if not del_tags <= cur_tags:
6537       diff_tags = del_tags - cur_tags
6538       diff_names = ["'%s'" % tag for tag in diff_tags]
6539       diff_names.sort()
6540       raise errors.OpPrereqError("Tag(s) %s not found" %
6541                                  (",".join(diff_names)))
6542
6543   def Exec(self, feedback_fn):
6544     """Remove the tag from the object.
6545
6546     """
6547     for tag in self.op.tags:
6548       self.target.RemoveTag(tag)
6549     try:
6550       self.cfg.Update(self.target)
6551     except errors.ConfigurationError:
6552       raise errors.OpRetryError("There has been a modification to the"
6553                                 " config file and the operation has been"
6554                                 " aborted. Please retry.")
6555
6556
6557 class LUTestDelay(NoHooksLU):
6558   """Sleep for a specified amount of time.
6559
6560   This LU sleeps on the master and/or nodes for a specified amount of
6561   time.
6562
6563   """
6564   _OP_REQP = ["duration", "on_master", "on_nodes"]
6565   REQ_BGL = False
6566
6567   def ExpandNames(self):
6568     """Expand names and set required locks.
6569
6570     This expands the node list, if any.
6571
6572     """
6573     self.needed_locks = {}
6574     if self.op.on_nodes:
6575       # _GetWantedNodes can be used here, but is not always appropriate to use
6576       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6577       # more information.
6578       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6579       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6580
6581   def CheckPrereq(self):
6582     """Check prerequisites.
6583
6584     """
6585
6586   def Exec(self, feedback_fn):
6587     """Do the actual sleep.
6588
6589     """
6590     if self.op.on_master:
6591       if not utils.TestDelay(self.op.duration):
6592         raise errors.OpExecError("Error during master delay test")
6593     if self.op.on_nodes:
6594       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6595       if not result:
6596         raise errors.OpExecError("Complete failure from rpc call")
6597       for node, node_result in result.items():
6598         node_result.Raise()
6599         if not node_result.data:
6600           raise errors.OpExecError("Failure during rpc call to node %s,"
6601                                    " result: %s" % (node, node_result.data))
6602
6603
6604 class IAllocator(object):
6605   """IAllocator framework.
6606
6607   An IAllocator instance has three sets of attributes:
6608     - cfg that is needed to query the cluster
6609     - input data (all members of the _KEYS class attribute are required)
6610     - four buffer attributes (in|out_data|text), that represent the
6611       input (to the external script) in text and data structure format,
6612       and the output from it, again in two formats
6613     - the result variables from the script (success, info, nodes) for
6614       easy usage
6615
6616   """
6617   _ALLO_KEYS = [
6618     "mem_size", "disks", "disk_template",
6619     "os", "tags", "nics", "vcpus", "hypervisor",
6620     ]
6621   _RELO_KEYS = [
6622     "relocate_from",
6623     ]
6624
6625   def __init__(self, lu, mode, name, **kwargs):
6626     self.lu = lu
6627     # init buffer variables
6628     self.in_text = self.out_text = self.in_data = self.out_data = None
6629     # init all input fields so that pylint is happy
6630     self.mode = mode
6631     self.name = name
6632     self.mem_size = self.disks = self.disk_template = None
6633     self.os = self.tags = self.nics = self.vcpus = None
6634     self.hypervisor = None
6635     self.relocate_from = None
6636     # computed fields
6637     self.required_nodes = None
6638     # init result fields
6639     self.success = self.info = self.nodes = None
6640     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6641       keyset = self._ALLO_KEYS
6642     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6643       keyset = self._RELO_KEYS
6644     else:
6645       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6646                                    " IAllocator" % self.mode)
6647     for key in kwargs:
6648       if key not in keyset:
6649         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6650                                      " IAllocator" % key)
6651       setattr(self, key, kwargs[key])
6652     for key in keyset:
6653       if key not in kwargs:
6654         raise errors.ProgrammerError("Missing input parameter '%s' to"
6655                                      " IAllocator" % key)
6656     self._BuildInputData()
6657
6658   def _ComputeClusterData(self):
6659     """Compute the generic allocator input data.
6660
6661     This is the data that is independent of the actual operation.
6662
6663     """
6664     cfg = self.lu.cfg
6665     cluster_info = cfg.GetClusterInfo()
6666     # cluster data
6667     data = {
6668       "version": constants.IALLOCATOR_VERSION,
6669       "cluster_name": cfg.GetClusterName(),
6670       "cluster_tags": list(cluster_info.GetTags()),
6671       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6672       # we don't have job IDs
6673       }
6674     iinfo = cfg.GetAllInstancesInfo().values()
6675     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6676
6677     # node data
6678     node_results = {}
6679     node_list = cfg.GetNodeList()
6680
6681     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6682       hypervisor_name = self.hypervisor
6683     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6684       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6685
6686     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6687                                            hypervisor_name)
6688     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6689                        cluster_info.enabled_hypervisors)
6690     for nname, nresult in node_data.items():
6691       # first fill in static (config-based) values
6692       ninfo = cfg.GetNodeInfo(nname)
6693       pnr = {
6694         "tags": list(ninfo.GetTags()),
6695         "primary_ip": ninfo.primary_ip,
6696         "secondary_ip": ninfo.secondary_ip,
6697         "offline": ninfo.offline,
6698         "drained": ninfo.drained,
6699         "master_candidate": ninfo.master_candidate,
6700         }
6701
6702       if not ninfo.offline:
6703         nresult.Raise()
6704         if not isinstance(nresult.data, dict):
6705           raise errors.OpExecError("Can't get data for node %s" % nname)
6706         remote_info = nresult.data
6707         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6708                      'vg_size', 'vg_free', 'cpu_total']:
6709           if attr not in remote_info:
6710             raise errors.OpExecError("Node '%s' didn't return attribute"
6711                                      " '%s'" % (nname, attr))
6712           try:
6713             remote_info[attr] = int(remote_info[attr])
6714           except ValueError, err:
6715             raise errors.OpExecError("Node '%s' returned invalid value"
6716                                      " for '%s': %s" % (nname, attr, err))
6717         # compute memory used by primary instances
6718         i_p_mem = i_p_up_mem = 0
6719         for iinfo, beinfo in i_list:
6720           if iinfo.primary_node == nname:
6721             i_p_mem += beinfo[constants.BE_MEMORY]
6722             if iinfo.name not in node_iinfo[nname].data:
6723               i_used_mem = 0
6724             else:
6725               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6726             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6727             remote_info['memory_free'] -= max(0, i_mem_diff)
6728
6729             if iinfo.admin_up:
6730               i_p_up_mem += beinfo[constants.BE_MEMORY]
6731
6732         # compute memory used by instances
6733         pnr_dyn = {
6734           "total_memory": remote_info['memory_total'],
6735           "reserved_memory": remote_info['memory_dom0'],
6736           "free_memory": remote_info['memory_free'],
6737           "total_disk": remote_info['vg_size'],
6738           "free_disk": remote_info['vg_free'],
6739           "total_cpus": remote_info['cpu_total'],
6740           "i_pri_memory": i_p_mem,
6741           "i_pri_up_memory": i_p_up_mem,
6742           }
6743         pnr.update(pnr_dyn)
6744
6745       node_results[nname] = pnr
6746     data["nodes"] = node_results
6747
6748     # instance data
6749     instance_data = {}
6750     for iinfo, beinfo in i_list:
6751       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6752                   for n in iinfo.nics]
6753       pir = {
6754         "tags": list(iinfo.GetTags()),
6755         "admin_up": iinfo.admin_up,
6756         "vcpus": beinfo[constants.BE_VCPUS],
6757         "memory": beinfo[constants.BE_MEMORY],
6758         "os": iinfo.os,
6759         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6760         "nics": nic_data,
6761         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6762         "disk_template": iinfo.disk_template,
6763         "hypervisor": iinfo.hypervisor,
6764         }
6765       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6766                                                  pir["disks"])
6767       instance_data[iinfo.name] = pir
6768
6769     data["instances"] = instance_data
6770
6771     self.in_data = data
6772
6773   def _AddNewInstance(self):
6774     """Add new instance data to allocator structure.
6775
6776     This in combination with _AllocatorGetClusterData will create the
6777     correct structure needed as input for the allocator.
6778
6779     The checks for the completeness of the opcode must have already been
6780     done.
6781
6782     """
6783     data = self.in_data
6784
6785     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6786
6787     if self.disk_template in constants.DTS_NET_MIRROR:
6788       self.required_nodes = 2
6789     else:
6790       self.required_nodes = 1
6791     request = {
6792       "type": "allocate",
6793       "name": self.name,
6794       "disk_template": self.disk_template,
6795       "tags": self.tags,
6796       "os": self.os,
6797       "vcpus": self.vcpus,
6798       "memory": self.mem_size,
6799       "disks": self.disks,
6800       "disk_space_total": disk_space,
6801       "nics": self.nics,
6802       "required_nodes": self.required_nodes,
6803       }
6804     data["request"] = request
6805
6806   def _AddRelocateInstance(self):
6807     """Add relocate instance data to allocator structure.
6808
6809     This in combination with _IAllocatorGetClusterData will create the
6810     correct structure needed as input for the allocator.
6811
6812     The checks for the completeness of the opcode must have already been
6813     done.
6814
6815     """
6816     instance = self.lu.cfg.GetInstanceInfo(self.name)
6817     if instance is None:
6818       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6819                                    " IAllocator" % self.name)
6820
6821     if instance.disk_template not in constants.DTS_NET_MIRROR:
6822       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6823
6824     if len(instance.secondary_nodes) != 1:
6825       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6826
6827     self.required_nodes = 1
6828     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6829     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6830
6831     request = {
6832       "type": "relocate",
6833       "name": self.name,
6834       "disk_space_total": disk_space,
6835       "required_nodes": self.required_nodes,
6836       "relocate_from": self.relocate_from,
6837       }
6838     self.in_data["request"] = request
6839
6840   def _BuildInputData(self):
6841     """Build input data structures.
6842
6843     """
6844     self._ComputeClusterData()
6845
6846     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6847       self._AddNewInstance()
6848     else:
6849       self._AddRelocateInstance()
6850
6851     self.in_text = serializer.Dump(self.in_data)
6852
6853   def Run(self, name, validate=True, call_fn=None):
6854     """Run an instance allocator and return the results.
6855
6856     """
6857     if call_fn is None:
6858       call_fn = self.lu.rpc.call_iallocator_runner
6859     data = self.in_text
6860
6861     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6862     result.Raise()
6863
6864     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6865       raise errors.OpExecError("Invalid result from master iallocator runner")
6866
6867     rcode, stdout, stderr, fail = result.data
6868
6869     if rcode == constants.IARUN_NOTFOUND:
6870       raise errors.OpExecError("Can't find allocator '%s'" % name)
6871     elif rcode == constants.IARUN_FAILURE:
6872       raise errors.OpExecError("Instance allocator call failed: %s,"
6873                                " output: %s" % (fail, stdout+stderr))
6874     self.out_text = stdout
6875     if validate:
6876       self._ValidateResult()
6877
6878   def _ValidateResult(self):
6879     """Process the allocator results.
6880
6881     This will process and if successful save the result in
6882     self.out_data and the other parameters.
6883
6884     """
6885     try:
6886       rdict = serializer.Load(self.out_text)
6887     except Exception, err:
6888       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6889
6890     if not isinstance(rdict, dict):
6891       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6892
6893     for key in "success", "info", "nodes":
6894       if key not in rdict:
6895         raise errors.OpExecError("Can't parse iallocator results:"
6896                                  " missing key '%s'" % key)
6897       setattr(self, key, rdict[key])
6898
6899     if not isinstance(rdict["nodes"], list):
6900       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6901                                " is not a list")
6902     self.out_data = rdict
6903
6904
6905 class LUTestAllocator(NoHooksLU):
6906   """Run allocator tests.
6907
6908   This LU runs the allocator tests
6909
6910   """
6911   _OP_REQP = ["direction", "mode", "name"]
6912
6913   def CheckPrereq(self):
6914     """Check prerequisites.
6915
6916     This checks the opcode parameters depending on the director and mode test.
6917
6918     """
6919     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6920       for attr in ["name", "mem_size", "disks", "disk_template",
6921                    "os", "tags", "nics", "vcpus"]:
6922         if not hasattr(self.op, attr):
6923           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6924                                      attr)
6925       iname = self.cfg.ExpandInstanceName(self.op.name)
6926       if iname is not None:
6927         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6928                                    iname)
6929       if not isinstance(self.op.nics, list):
6930         raise errors.OpPrereqError("Invalid parameter 'nics'")
6931       for row in self.op.nics:
6932         if (not isinstance(row, dict) or
6933             "mac" not in row or
6934             "ip" not in row or
6935             "bridge" not in row):
6936           raise errors.OpPrereqError("Invalid contents of the"
6937                                      " 'nics' parameter")
6938       if not isinstance(self.op.disks, list):
6939         raise errors.OpPrereqError("Invalid parameter 'disks'")
6940       for row in self.op.disks:
6941         if (not isinstance(row, dict) or
6942             "size" not in row or
6943             not isinstance(row["size"], int) or
6944             "mode" not in row or
6945             row["mode"] not in ['r', 'w']):
6946           raise errors.OpPrereqError("Invalid contents of the"
6947                                      " 'disks' parameter")
6948       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6949         self.op.hypervisor = self.cfg.GetHypervisorType()
6950     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6951       if not hasattr(self.op, "name"):
6952         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6953       fname = self.cfg.ExpandInstanceName(self.op.name)
6954       if fname is None:
6955         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6956                                    self.op.name)
6957       self.op.name = fname
6958       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6959     else:
6960       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6961                                  self.op.mode)
6962
6963     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6964       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6965         raise errors.OpPrereqError("Missing allocator name")
6966     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6967       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6968                                  self.op.direction)
6969
6970   def Exec(self, feedback_fn):
6971     """Run the allocator test.
6972
6973     """
6974     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6975       ial = IAllocator(self,
6976                        mode=self.op.mode,
6977                        name=self.op.name,
6978                        mem_size=self.op.mem_size,
6979                        disks=self.op.disks,
6980                        disk_template=self.op.disk_template,
6981                        os=self.op.os,
6982                        tags=self.op.tags,
6983                        nics=self.op.nics,
6984                        vcpus=self.op.vcpus,
6985                        hypervisor=self.op.hypervisor,
6986                        )
6987     else:
6988       ial = IAllocator(self,
6989                        mode=self.op.mode,
6990                        name=self.op.name,
6991                        relocate_from=list(self.relocate_from),
6992                        )
6993
6994     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6995       result = ial.in_text
6996     else:
6997       ial.Run(self.op.allocator, validate=False)
6998       result = ial.out_text
6999     return result