Change BEGR_DEFAULT to PP_DEFAULT
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384          msg = to_result.RemoteFailMsg()
1385          if msg:
1386            msg = ("Copy of file %s to node %s failed: %s" %
1387                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388            self.proc.LogWarning(msg)
1389
1390     finally:
1391       result = self.rpc.call_node_start_master(master, False)
1392       if result.failed or not result.data:
1393         self.LogWarning("Could not re-enable the master role on"
1394                         " the master, please restart manually.")
1395
1396
1397 def _RecursiveCheckIfLVMBased(disk):
1398   """Check if the given disk or its children are lvm-based.
1399
1400   @type disk: L{objects.Disk}
1401   @param disk: the disk to check
1402   @rtype: booleean
1403   @return: boolean indicating whether a LD_LV dev_type was found or not
1404
1405   """
1406   if disk.children:
1407     for chdisk in disk.children:
1408       if _RecursiveCheckIfLVMBased(chdisk):
1409         return True
1410   return disk.dev_type == constants.LD_LV
1411
1412
1413 class LUSetClusterParams(LogicalUnit):
1414   """Change the parameters of the cluster.
1415
1416   """
1417   HPATH = "cluster-modify"
1418   HTYPE = constants.HTYPE_CLUSTER
1419   _OP_REQP = []
1420   REQ_BGL = False
1421
1422   def CheckArguments(self):
1423     """Check parameters
1424
1425     """
1426     if not hasattr(self.op, "candidate_pool_size"):
1427       self.op.candidate_pool_size = None
1428     if self.op.candidate_pool_size is not None:
1429       try:
1430         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431       except (ValueError, TypeError), err:
1432         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433                                    str(err))
1434       if self.op.candidate_pool_size < 1:
1435         raise errors.OpPrereqError("At least one master candidate needed")
1436
1437   def ExpandNames(self):
1438     # FIXME: in the future maybe other cluster params won't require checking on
1439     # all nodes to be modified.
1440     self.needed_locks = {
1441       locking.LEVEL_NODE: locking.ALL_SET,
1442     }
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445   def BuildHooksEnv(self):
1446     """Build hooks env.
1447
1448     """
1449     env = {
1450       "OP_TARGET": self.cfg.GetClusterName(),
1451       "NEW_VG_NAME": self.op.vg_name,
1452       }
1453     mn = self.cfg.GetMasterNode()
1454     return env, [mn], [mn]
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     This checks whether the given params don't conflict and
1460     if the given volume group is valid.
1461
1462     """
1463     if self.op.vg_name is not None and not self.op.vg_name:
1464       instances = self.cfg.GetAllInstancesInfo().values()
1465       for inst in instances:
1466         for disk in inst.disks:
1467           if _RecursiveCheckIfLVMBased(disk):
1468             raise errors.OpPrereqError("Cannot disable lvm storage while"
1469                                        " lvm-based instances exist")
1470
1471     node_list = self.acquired_locks[locking.LEVEL_NODE]
1472
1473     # if vg_name not None, checks given volume group on all nodes
1474     if self.op.vg_name:
1475       vglist = self.rpc.call_vg_list(node_list)
1476       for node in node_list:
1477         if vglist[node].failed:
1478           # ignoring down node
1479           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480           continue
1481         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482                                               self.op.vg_name,
1483                                               constants.MIN_VG_SIZE)
1484         if vgstatus:
1485           raise errors.OpPrereqError("Error on node '%s': %s" %
1486                                      (node, vgstatus))
1487
1488     self.cluster = cluster = self.cfg.GetClusterInfo()
1489     # validate beparams changes
1490     if self.op.beparams:
1491       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492       self.new_beparams = objects.FillDict(
1493         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1494
1495     # hypervisor list/parameters
1496     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1497     if self.op.hvparams:
1498       if not isinstance(self.op.hvparams, dict):
1499         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1500       for hv_name, hv_dict in self.op.hvparams.items():
1501         if hv_name not in self.new_hvparams:
1502           self.new_hvparams[hv_name] = hv_dict
1503         else:
1504           self.new_hvparams[hv_name].update(hv_dict)
1505
1506     if self.op.enabled_hypervisors is not None:
1507       self.hv_list = self.op.enabled_hypervisors
1508     else:
1509       self.hv_list = cluster.enabled_hypervisors
1510
1511     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1512       # either the enabled list has changed, or the parameters have, validate
1513       for hv_name, hv_params in self.new_hvparams.items():
1514         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1515             (self.op.enabled_hypervisors and
1516              hv_name in self.op.enabled_hypervisors)):
1517           # either this is a new hypervisor, or its parameters have changed
1518           hv_class = hypervisor.GetHypervisor(hv_name)
1519           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1520           hv_class.CheckParameterSyntax(hv_params)
1521           _CheckHVParams(self, node_list, hv_name, hv_params)
1522
1523   def Exec(self, feedback_fn):
1524     """Change the parameters of the cluster.
1525
1526     """
1527     if self.op.vg_name is not None:
1528       new_volume = self.op.vg_name
1529       if not new_volume:
1530         new_volume = None
1531       if new_volume != self.cfg.GetVGName():
1532         self.cfg.SetVGName(new_volume)
1533       else:
1534         feedback_fn("Cluster LVM configuration already in desired"
1535                     " state, not changing")
1536     if self.op.hvparams:
1537       self.cluster.hvparams = self.new_hvparams
1538     if self.op.enabled_hypervisors is not None:
1539       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1540     if self.op.beparams:
1541       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1542     if self.op.candidate_pool_size is not None:
1543       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1544
1545     self.cfg.Update(self.cluster)
1546
1547     # we want to update nodes after the cluster so that if any errors
1548     # happen, we have recorded and saved the cluster info
1549     if self.op.candidate_pool_size is not None:
1550       _AdjustCandidatePool(self)
1551
1552
1553 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1554   """Distribute additional files which are part of the cluster configuration.
1555
1556   ConfigWriter takes care of distributing the config and ssconf files, but
1557   there are more files which should be distributed to all nodes. This function
1558   makes sure those are copied.
1559
1560   @param lu: calling logical unit
1561   @param additional_nodes: list of nodes not in the config to distribute to
1562
1563   """
1564   # 1. Gather target nodes
1565   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1566   dist_nodes = lu.cfg.GetNodeList()
1567   if additional_nodes is not None:
1568     dist_nodes.extend(additional_nodes)
1569   if myself.name in dist_nodes:
1570     dist_nodes.remove(myself.name)
1571   # 2. Gather files to distribute
1572   dist_files = set([constants.ETC_HOSTS,
1573                     constants.SSH_KNOWN_HOSTS_FILE,
1574                     constants.RAPI_CERT_FILE,
1575                     constants.RAPI_USERS_FILE,
1576                    ])
1577
1578   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1579   for hv_name in enabled_hypervisors:
1580     hv_class = hypervisor.GetHypervisor(hv_name)
1581     dist_files.update(hv_class.GetAncillaryFiles())
1582
1583   # 3. Perform the files upload
1584   for fname in dist_files:
1585     if os.path.exists(fname):
1586       result = lu.rpc.call_upload_file(dist_nodes, fname)
1587       for to_node, to_result in result.items():
1588          msg = to_result.RemoteFailMsg()
1589          if msg:
1590            msg = ("Copy of file %s to node %s failed: %s" %
1591                    (fname, to_node, msg))
1592            lu.proc.LogWarning(msg)
1593
1594
1595 class LURedistributeConfig(NoHooksLU):
1596   """Force the redistribution of cluster configuration.
1597
1598   This is a very simple LU.
1599
1600   """
1601   _OP_REQP = []
1602   REQ_BGL = False
1603
1604   def ExpandNames(self):
1605     self.needed_locks = {
1606       locking.LEVEL_NODE: locking.ALL_SET,
1607     }
1608     self.share_locks[locking.LEVEL_NODE] = 1
1609
1610   def CheckPrereq(self):
1611     """Check prerequisites.
1612
1613     """
1614
1615   def Exec(self, feedback_fn):
1616     """Redistribute the configuration.
1617
1618     """
1619     self.cfg.Update(self.cfg.GetClusterInfo())
1620     _RedistributeAncillaryFiles(self)
1621
1622
1623 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1624   """Sleep and poll for an instance's disk to sync.
1625
1626   """
1627   if not instance.disks:
1628     return True
1629
1630   if not oneshot:
1631     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1632
1633   node = instance.primary_node
1634
1635   for dev in instance.disks:
1636     lu.cfg.SetDiskID(dev, node)
1637
1638   retries = 0
1639   while True:
1640     max_time = 0
1641     done = True
1642     cumul_degraded = False
1643     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1644     if rstats.failed or not rstats.data:
1645       lu.LogWarning("Can't get any data from node %s", node)
1646       retries += 1
1647       if retries >= 10:
1648         raise errors.RemoteError("Can't contact node %s for mirror data,"
1649                                  " aborting." % node)
1650       time.sleep(6)
1651       continue
1652     rstats = rstats.data
1653     retries = 0
1654     for i, mstat in enumerate(rstats):
1655       if mstat is None:
1656         lu.LogWarning("Can't compute data for node %s/%s",
1657                            node, instance.disks[i].iv_name)
1658         continue
1659       # we ignore the ldisk parameter
1660       perc_done, est_time, is_degraded, _ = mstat
1661       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1662       if perc_done is not None:
1663         done = False
1664         if est_time is not None:
1665           rem_time = "%d estimated seconds remaining" % est_time
1666           max_time = est_time
1667         else:
1668           rem_time = "no time estimate"
1669         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1670                         (instance.disks[i].iv_name, perc_done, rem_time))
1671     if done or oneshot:
1672       break
1673
1674     time.sleep(min(60, max_time))
1675
1676   if done:
1677     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1678   return not cumul_degraded
1679
1680
1681 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1682   """Check that mirrors are not degraded.
1683
1684   The ldisk parameter, if True, will change the test from the
1685   is_degraded attribute (which represents overall non-ok status for
1686   the device(s)) to the ldisk (representing the local storage status).
1687
1688   """
1689   lu.cfg.SetDiskID(dev, node)
1690   if ldisk:
1691     idx = 6
1692   else:
1693     idx = 5
1694
1695   result = True
1696   if on_primary or dev.AssembleOnSecondary():
1697     rstats = lu.rpc.call_blockdev_find(node, dev)
1698     msg = rstats.RemoteFailMsg()
1699     if msg:
1700       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1701       result = False
1702     elif not rstats.payload:
1703       lu.LogWarning("Can't find disk on node %s", node)
1704       result = False
1705     else:
1706       result = result and (not rstats.payload[idx])
1707   if dev.children:
1708     for child in dev.children:
1709       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1710
1711   return result
1712
1713
1714 class LUDiagnoseOS(NoHooksLU):
1715   """Logical unit for OS diagnose/query.
1716
1717   """
1718   _OP_REQP = ["output_fields", "names"]
1719   REQ_BGL = False
1720   _FIELDS_STATIC = utils.FieldSet()
1721   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1722
1723   def ExpandNames(self):
1724     if self.op.names:
1725       raise errors.OpPrereqError("Selective OS query not supported")
1726
1727     _CheckOutputFields(static=self._FIELDS_STATIC,
1728                        dynamic=self._FIELDS_DYNAMIC,
1729                        selected=self.op.output_fields)
1730
1731     # Lock all nodes, in shared mode
1732     # Temporary removal of locks, should be reverted later
1733     # TODO: reintroduce locks when they are lighter-weight
1734     self.needed_locks = {}
1735     #self.share_locks[locking.LEVEL_NODE] = 1
1736     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1737
1738   def CheckPrereq(self):
1739     """Check prerequisites.
1740
1741     """
1742
1743   @staticmethod
1744   def _DiagnoseByOS(node_list, rlist):
1745     """Remaps a per-node return list into an a per-os per-node dictionary
1746
1747     @param node_list: a list with the names of all nodes
1748     @param rlist: a map with node names as keys and OS objects as values
1749
1750     @rtype: dict
1751     @return: a dictionary with osnames as keys and as value another map, with
1752         nodes as keys and list of OS objects as values, eg::
1753
1754           {"debian-etch": {"node1": [<object>,...],
1755                            "node2": [<object>,]}
1756           }
1757
1758     """
1759     all_os = {}
1760     # we build here the list of nodes that didn't fail the RPC (at RPC
1761     # level), so that nodes with a non-responding node daemon don't
1762     # make all OSes invalid
1763     good_nodes = [node_name for node_name in rlist
1764                   if not rlist[node_name].failed]
1765     for node_name, nr in rlist.iteritems():
1766       if nr.failed or not nr.data:
1767         continue
1768       for os_obj in nr.data:
1769         if os_obj.name not in all_os:
1770           # build a list of nodes for this os containing empty lists
1771           # for each node in node_list
1772           all_os[os_obj.name] = {}
1773           for nname in good_nodes:
1774             all_os[os_obj.name][nname] = []
1775         all_os[os_obj.name][node_name].append(os_obj)
1776     return all_os
1777
1778   def Exec(self, feedback_fn):
1779     """Compute the list of OSes.
1780
1781     """
1782     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1783     node_data = self.rpc.call_os_diagnose(valid_nodes)
1784     if node_data == False:
1785       raise errors.OpExecError("Can't gather the list of OSes")
1786     pol = self._DiagnoseByOS(valid_nodes, node_data)
1787     output = []
1788     for os_name, os_data in pol.iteritems():
1789       row = []
1790       for field in self.op.output_fields:
1791         if field == "name":
1792           val = os_name
1793         elif field == "valid":
1794           val = utils.all([osl and osl[0] for osl in os_data.values()])
1795         elif field == "node_status":
1796           val = {}
1797           for node_name, nos_list in os_data.iteritems():
1798             val[node_name] = [(v.status, v.path) for v in nos_list]
1799         else:
1800           raise errors.ParameterError(field)
1801         row.append(val)
1802       output.append(row)
1803
1804     return output
1805
1806
1807 class LURemoveNode(LogicalUnit):
1808   """Logical unit for removing a node.
1809
1810   """
1811   HPATH = "node-remove"
1812   HTYPE = constants.HTYPE_NODE
1813   _OP_REQP = ["node_name"]
1814
1815   def BuildHooksEnv(self):
1816     """Build hooks env.
1817
1818     This doesn't run on the target node in the pre phase as a failed
1819     node would then be impossible to remove.
1820
1821     """
1822     env = {
1823       "OP_TARGET": self.op.node_name,
1824       "NODE_NAME": self.op.node_name,
1825       }
1826     all_nodes = self.cfg.GetNodeList()
1827     all_nodes.remove(self.op.node_name)
1828     return env, all_nodes, all_nodes
1829
1830   def CheckPrereq(self):
1831     """Check prerequisites.
1832
1833     This checks:
1834      - the node exists in the configuration
1835      - it does not have primary or secondary instances
1836      - it's not the master
1837
1838     Any errors are signalled by raising errors.OpPrereqError.
1839
1840     """
1841     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1842     if node is None:
1843       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1844
1845     instance_list = self.cfg.GetInstanceList()
1846
1847     masternode = self.cfg.GetMasterNode()
1848     if node.name == masternode:
1849       raise errors.OpPrereqError("Node is the master node,"
1850                                  " you need to failover first.")
1851
1852     for instance_name in instance_list:
1853       instance = self.cfg.GetInstanceInfo(instance_name)
1854       if node.name in instance.all_nodes:
1855         raise errors.OpPrereqError("Instance %s is still running on the node,"
1856                                    " please remove first." % instance_name)
1857     self.op.node_name = node.name
1858     self.node = node
1859
1860   def Exec(self, feedback_fn):
1861     """Removes the node from the cluster.
1862
1863     """
1864     node = self.node
1865     logging.info("Stopping the node daemon and removing configs from node %s",
1866                  node.name)
1867
1868     self.context.RemoveNode(node.name)
1869
1870     self.rpc.call_node_leave_cluster(node.name)
1871
1872     # Promote nodes to master candidate as needed
1873     _AdjustCandidatePool(self)
1874
1875
1876 class LUQueryNodes(NoHooksLU):
1877   """Logical unit for querying nodes.
1878
1879   """
1880   _OP_REQP = ["output_fields", "names", "use_locking"]
1881   REQ_BGL = False
1882   _FIELDS_DYNAMIC = utils.FieldSet(
1883     "dtotal", "dfree",
1884     "mtotal", "mnode", "mfree",
1885     "bootid",
1886     "ctotal", "cnodes", "csockets",
1887     )
1888
1889   _FIELDS_STATIC = utils.FieldSet(
1890     "name", "pinst_cnt", "sinst_cnt",
1891     "pinst_list", "sinst_list",
1892     "pip", "sip", "tags",
1893     "serial_no",
1894     "master_candidate",
1895     "master",
1896     "offline",
1897     "drained",
1898     )
1899
1900   def ExpandNames(self):
1901     _CheckOutputFields(static=self._FIELDS_STATIC,
1902                        dynamic=self._FIELDS_DYNAMIC,
1903                        selected=self.op.output_fields)
1904
1905     self.needed_locks = {}
1906     self.share_locks[locking.LEVEL_NODE] = 1
1907
1908     if self.op.names:
1909       self.wanted = _GetWantedNodes(self, self.op.names)
1910     else:
1911       self.wanted = locking.ALL_SET
1912
1913     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1914     self.do_locking = self.do_node_query and self.op.use_locking
1915     if self.do_locking:
1916       # if we don't request only static fields, we need to lock the nodes
1917       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1918
1919
1920   def CheckPrereq(self):
1921     """Check prerequisites.
1922
1923     """
1924     # The validation of the node list is done in the _GetWantedNodes,
1925     # if non empty, and if empty, there's no validation to do
1926     pass
1927
1928   def Exec(self, feedback_fn):
1929     """Computes the list of nodes and their attributes.
1930
1931     """
1932     all_info = self.cfg.GetAllNodesInfo()
1933     if self.do_locking:
1934       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1935     elif self.wanted != locking.ALL_SET:
1936       nodenames = self.wanted
1937       missing = set(nodenames).difference(all_info.keys())
1938       if missing:
1939         raise errors.OpExecError(
1940           "Some nodes were removed before retrieving their data: %s" % missing)
1941     else:
1942       nodenames = all_info.keys()
1943
1944     nodenames = utils.NiceSort(nodenames)
1945     nodelist = [all_info[name] for name in nodenames]
1946
1947     # begin data gathering
1948
1949     if self.do_node_query:
1950       live_data = {}
1951       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1952                                           self.cfg.GetHypervisorType())
1953       for name in nodenames:
1954         nodeinfo = node_data[name]
1955         if not nodeinfo.failed and nodeinfo.data:
1956           nodeinfo = nodeinfo.data
1957           fn = utils.TryConvert
1958           live_data[name] = {
1959             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1960             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1961             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1962             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1963             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1964             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1965             "bootid": nodeinfo.get('bootid', None),
1966             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1967             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1968             }
1969         else:
1970           live_data[name] = {}
1971     else:
1972       live_data = dict.fromkeys(nodenames, {})
1973
1974     node_to_primary = dict([(name, set()) for name in nodenames])
1975     node_to_secondary = dict([(name, set()) for name in nodenames])
1976
1977     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1978                              "sinst_cnt", "sinst_list"))
1979     if inst_fields & frozenset(self.op.output_fields):
1980       instancelist = self.cfg.GetInstanceList()
1981
1982       for instance_name in instancelist:
1983         inst = self.cfg.GetInstanceInfo(instance_name)
1984         if inst.primary_node in node_to_primary:
1985           node_to_primary[inst.primary_node].add(inst.name)
1986         for secnode in inst.secondary_nodes:
1987           if secnode in node_to_secondary:
1988             node_to_secondary[secnode].add(inst.name)
1989
1990     master_node = self.cfg.GetMasterNode()
1991
1992     # end data gathering
1993
1994     output = []
1995     for node in nodelist:
1996       node_output = []
1997       for field in self.op.output_fields:
1998         if field == "name":
1999           val = node.name
2000         elif field == "pinst_list":
2001           val = list(node_to_primary[node.name])
2002         elif field == "sinst_list":
2003           val = list(node_to_secondary[node.name])
2004         elif field == "pinst_cnt":
2005           val = len(node_to_primary[node.name])
2006         elif field == "sinst_cnt":
2007           val = len(node_to_secondary[node.name])
2008         elif field == "pip":
2009           val = node.primary_ip
2010         elif field == "sip":
2011           val = node.secondary_ip
2012         elif field == "tags":
2013           val = list(node.GetTags())
2014         elif field == "serial_no":
2015           val = node.serial_no
2016         elif field == "master_candidate":
2017           val = node.master_candidate
2018         elif field == "master":
2019           val = node.name == master_node
2020         elif field == "offline":
2021           val = node.offline
2022         elif field == "drained":
2023           val = node.drained
2024         elif self._FIELDS_DYNAMIC.Matches(field):
2025           val = live_data[node.name].get(field, None)
2026         else:
2027           raise errors.ParameterError(field)
2028         node_output.append(val)
2029       output.append(node_output)
2030
2031     return output
2032
2033
2034 class LUQueryNodeVolumes(NoHooksLU):
2035   """Logical unit for getting volumes on node(s).
2036
2037   """
2038   _OP_REQP = ["nodes", "output_fields"]
2039   REQ_BGL = False
2040   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2041   _FIELDS_STATIC = utils.FieldSet("node")
2042
2043   def ExpandNames(self):
2044     _CheckOutputFields(static=self._FIELDS_STATIC,
2045                        dynamic=self._FIELDS_DYNAMIC,
2046                        selected=self.op.output_fields)
2047
2048     self.needed_locks = {}
2049     self.share_locks[locking.LEVEL_NODE] = 1
2050     if not self.op.nodes:
2051       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2052     else:
2053       self.needed_locks[locking.LEVEL_NODE] = \
2054         _GetWantedNodes(self, self.op.nodes)
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     This checks that the fields required are valid output fields.
2060
2061     """
2062     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2063
2064   def Exec(self, feedback_fn):
2065     """Computes the list of nodes and their attributes.
2066
2067     """
2068     nodenames = self.nodes
2069     volumes = self.rpc.call_node_volumes(nodenames)
2070
2071     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2072              in self.cfg.GetInstanceList()]
2073
2074     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2075
2076     output = []
2077     for node in nodenames:
2078       if node not in volumes or volumes[node].failed or not volumes[node].data:
2079         continue
2080
2081       node_vols = volumes[node].data[:]
2082       node_vols.sort(key=lambda vol: vol['dev'])
2083
2084       for vol in node_vols:
2085         node_output = []
2086         for field in self.op.output_fields:
2087           if field == "node":
2088             val = node
2089           elif field == "phys":
2090             val = vol['dev']
2091           elif field == "vg":
2092             val = vol['vg']
2093           elif field == "name":
2094             val = vol['name']
2095           elif field == "size":
2096             val = int(float(vol['size']))
2097           elif field == "instance":
2098             for inst in ilist:
2099               if node not in lv_by_node[inst]:
2100                 continue
2101               if vol['name'] in lv_by_node[inst][node]:
2102                 val = inst.name
2103                 break
2104             else:
2105               val = '-'
2106           else:
2107             raise errors.ParameterError(field)
2108           node_output.append(str(val))
2109
2110         output.append(node_output)
2111
2112     return output
2113
2114
2115 class LUAddNode(LogicalUnit):
2116   """Logical unit for adding node to the cluster.
2117
2118   """
2119   HPATH = "node-add"
2120   HTYPE = constants.HTYPE_NODE
2121   _OP_REQP = ["node_name"]
2122
2123   def BuildHooksEnv(self):
2124     """Build hooks env.
2125
2126     This will run on all nodes before, and on all nodes + the new node after.
2127
2128     """
2129     env = {
2130       "OP_TARGET": self.op.node_name,
2131       "NODE_NAME": self.op.node_name,
2132       "NODE_PIP": self.op.primary_ip,
2133       "NODE_SIP": self.op.secondary_ip,
2134       }
2135     nodes_0 = self.cfg.GetNodeList()
2136     nodes_1 = nodes_0 + [self.op.node_name, ]
2137     return env, nodes_0, nodes_1
2138
2139   def CheckPrereq(self):
2140     """Check prerequisites.
2141
2142     This checks:
2143      - the new node is not already in the config
2144      - it is resolvable
2145      - its parameters (single/dual homed) matches the cluster
2146
2147     Any errors are signalled by raising errors.OpPrereqError.
2148
2149     """
2150     node_name = self.op.node_name
2151     cfg = self.cfg
2152
2153     dns_data = utils.HostInfo(node_name)
2154
2155     node = dns_data.name
2156     primary_ip = self.op.primary_ip = dns_data.ip
2157     secondary_ip = getattr(self.op, "secondary_ip", None)
2158     if secondary_ip is None:
2159       secondary_ip = primary_ip
2160     if not utils.IsValidIP(secondary_ip):
2161       raise errors.OpPrereqError("Invalid secondary IP given")
2162     self.op.secondary_ip = secondary_ip
2163
2164     node_list = cfg.GetNodeList()
2165     if not self.op.readd and node in node_list:
2166       raise errors.OpPrereqError("Node %s is already in the configuration" %
2167                                  node)
2168     elif self.op.readd and node not in node_list:
2169       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2170
2171     for existing_node_name in node_list:
2172       existing_node = cfg.GetNodeInfo(existing_node_name)
2173
2174       if self.op.readd and node == existing_node_name:
2175         if (existing_node.primary_ip != primary_ip or
2176             existing_node.secondary_ip != secondary_ip):
2177           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2178                                      " address configuration as before")
2179         continue
2180
2181       if (existing_node.primary_ip == primary_ip or
2182           existing_node.secondary_ip == primary_ip or
2183           existing_node.primary_ip == secondary_ip or
2184           existing_node.secondary_ip == secondary_ip):
2185         raise errors.OpPrereqError("New node ip address(es) conflict with"
2186                                    " existing node %s" % existing_node.name)
2187
2188     # check that the type of the node (single versus dual homed) is the
2189     # same as for the master
2190     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2191     master_singlehomed = myself.secondary_ip == myself.primary_ip
2192     newbie_singlehomed = secondary_ip == primary_ip
2193     if master_singlehomed != newbie_singlehomed:
2194       if master_singlehomed:
2195         raise errors.OpPrereqError("The master has no private ip but the"
2196                                    " new node has one")
2197       else:
2198         raise errors.OpPrereqError("The master has a private ip but the"
2199                                    " new node doesn't have one")
2200
2201     # checks reachablity
2202     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2203       raise errors.OpPrereqError("Node not reachable by ping")
2204
2205     if not newbie_singlehomed:
2206       # check reachability from my secondary ip to newbie's secondary ip
2207       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2208                            source=myself.secondary_ip):
2209         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2210                                    " based ping to noded port")
2211
2212     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2213     mc_now, _ = self.cfg.GetMasterCandidateStats()
2214     master_candidate = mc_now < cp_size
2215
2216     self.new_node = objects.Node(name=node,
2217                                  primary_ip=primary_ip,
2218                                  secondary_ip=secondary_ip,
2219                                  master_candidate=master_candidate,
2220                                  offline=False, drained=False)
2221
2222   def Exec(self, feedback_fn):
2223     """Adds the new node to the cluster.
2224
2225     """
2226     new_node = self.new_node
2227     node = new_node.name
2228
2229     # check connectivity
2230     result = self.rpc.call_version([node])[node]
2231     result.Raise()
2232     if result.data:
2233       if constants.PROTOCOL_VERSION == result.data:
2234         logging.info("Communication to node %s fine, sw version %s match",
2235                      node, result.data)
2236       else:
2237         raise errors.OpExecError("Version mismatch master version %s,"
2238                                  " node version %s" %
2239                                  (constants.PROTOCOL_VERSION, result.data))
2240     else:
2241       raise errors.OpExecError("Cannot get version from the new node")
2242
2243     # setup ssh on node
2244     logging.info("Copy ssh key to node %s", node)
2245     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2246     keyarray = []
2247     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2248                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2249                 priv_key, pub_key]
2250
2251     for i in keyfiles:
2252       f = open(i, 'r')
2253       try:
2254         keyarray.append(f.read())
2255       finally:
2256         f.close()
2257
2258     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2259                                     keyarray[2],
2260                                     keyarray[3], keyarray[4], keyarray[5])
2261
2262     msg = result.RemoteFailMsg()
2263     if msg:
2264       raise errors.OpExecError("Cannot transfer ssh keys to the"
2265                                " new node: %s" % msg)
2266
2267     # Add node to our /etc/hosts, and add key to known_hosts
2268     if self.cfg.GetClusterInfo().modify_etc_hosts:
2269       utils.AddHostToEtcHosts(new_node.name)
2270
2271     if new_node.secondary_ip != new_node.primary_ip:
2272       result = self.rpc.call_node_has_ip_address(new_node.name,
2273                                                  new_node.secondary_ip)
2274       if result.failed or not result.data:
2275         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2276                                  " you gave (%s). Please fix and re-run this"
2277                                  " command." % new_node.secondary_ip)
2278
2279     node_verify_list = [self.cfg.GetMasterNode()]
2280     node_verify_param = {
2281       'nodelist': [node],
2282       # TODO: do a node-net-test as well?
2283     }
2284
2285     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2286                                        self.cfg.GetClusterName())
2287     for verifier in node_verify_list:
2288       if result[verifier].failed or not result[verifier].data:
2289         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2290                                  " for remote verification" % verifier)
2291       if result[verifier].data['nodelist']:
2292         for failed in result[verifier].data['nodelist']:
2293           feedback_fn("ssh/hostname verification failed %s -> %s" %
2294                       (verifier, result[verifier].data['nodelist'][failed]))
2295         raise errors.OpExecError("ssh/hostname verification failed.")
2296
2297     if self.op.readd:
2298       _RedistributeAncillaryFiles(self)
2299       self.context.ReaddNode(new_node)
2300     else:
2301       _RedistributeAncillaryFiles(self, additional_nodes=node)
2302       self.context.AddNode(new_node)
2303
2304
2305 class LUSetNodeParams(LogicalUnit):
2306   """Modifies the parameters of a node.
2307
2308   """
2309   HPATH = "node-modify"
2310   HTYPE = constants.HTYPE_NODE
2311   _OP_REQP = ["node_name"]
2312   REQ_BGL = False
2313
2314   def CheckArguments(self):
2315     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2316     if node_name is None:
2317       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2318     self.op.node_name = node_name
2319     _CheckBooleanOpField(self.op, 'master_candidate')
2320     _CheckBooleanOpField(self.op, 'offline')
2321     _CheckBooleanOpField(self.op, 'drained')
2322     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2323     if all_mods.count(None) == 3:
2324       raise errors.OpPrereqError("Please pass at least one modification")
2325     if all_mods.count(True) > 1:
2326       raise errors.OpPrereqError("Can't set the node into more than one"
2327                                  " state at the same time")
2328
2329   def ExpandNames(self):
2330     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2331
2332   def BuildHooksEnv(self):
2333     """Build hooks env.
2334
2335     This runs on the master node.
2336
2337     """
2338     env = {
2339       "OP_TARGET": self.op.node_name,
2340       "MASTER_CANDIDATE": str(self.op.master_candidate),
2341       "OFFLINE": str(self.op.offline),
2342       "DRAINED": str(self.op.drained),
2343       }
2344     nl = [self.cfg.GetMasterNode(),
2345           self.op.node_name]
2346     return env, nl, nl
2347
2348   def CheckPrereq(self):
2349     """Check prerequisites.
2350
2351     This only checks the instance list against the existing names.
2352
2353     """
2354     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2355
2356     if ((self.op.master_candidate == False or self.op.offline == True or
2357          self.op.drained == True) and node.master_candidate):
2358       # we will demote the node from master_candidate
2359       if self.op.node_name == self.cfg.GetMasterNode():
2360         raise errors.OpPrereqError("The master node has to be a"
2361                                    " master candidate, online and not drained")
2362       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2363       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2364       if num_candidates <= cp_size:
2365         msg = ("Not enough master candidates (desired"
2366                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2367         if self.op.force:
2368           self.LogWarning(msg)
2369         else:
2370           raise errors.OpPrereqError(msg)
2371
2372     if (self.op.master_candidate == True and
2373         ((node.offline and not self.op.offline == False) or
2374          (node.drained and not self.op.drained == False))):
2375       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2376                                  " to master_candidate" % node.name)
2377
2378     return
2379
2380   def Exec(self, feedback_fn):
2381     """Modifies a node.
2382
2383     """
2384     node = self.node
2385
2386     result = []
2387     changed_mc = False
2388
2389     if self.op.offline is not None:
2390       node.offline = self.op.offline
2391       result.append(("offline", str(self.op.offline)))
2392       if self.op.offline == True:
2393         if node.master_candidate:
2394           node.master_candidate = False
2395           changed_mc = True
2396           result.append(("master_candidate", "auto-demotion due to offline"))
2397         if node.drained:
2398           node.drained = False
2399           result.append(("drained", "clear drained status due to offline"))
2400
2401     if self.op.master_candidate is not None:
2402       node.master_candidate = self.op.master_candidate
2403       changed_mc = True
2404       result.append(("master_candidate", str(self.op.master_candidate)))
2405       if self.op.master_candidate == False:
2406         rrc = self.rpc.call_node_demote_from_mc(node.name)
2407         msg = rrc.RemoteFailMsg()
2408         if msg:
2409           self.LogWarning("Node failed to demote itself: %s" % msg)
2410
2411     if self.op.drained is not None:
2412       node.drained = self.op.drained
2413       result.append(("drained", str(self.op.drained)))
2414       if self.op.drained == True:
2415         if node.master_candidate:
2416           node.master_candidate = False
2417           changed_mc = True
2418           result.append(("master_candidate", "auto-demotion due to drain"))
2419         if node.offline:
2420           node.offline = False
2421           result.append(("offline", "clear offline status due to drain"))
2422
2423     # this will trigger configuration file update, if needed
2424     self.cfg.Update(node)
2425     # this will trigger job queue propagation or cleanup
2426     if changed_mc:
2427       self.context.ReaddNode(node)
2428
2429     return result
2430
2431
2432 class LUPowercycleNode(NoHooksLU):
2433   """Powercycles a node.
2434
2435   """
2436   _OP_REQP = ["node_name", "force"]
2437   REQ_BGL = False
2438
2439   def CheckArguments(self):
2440     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2441     if node_name is None:
2442       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2443     self.op.node_name = node_name
2444     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2445       raise errors.OpPrereqError("The node is the master and the force"
2446                                  " parameter was not set")
2447
2448   def ExpandNames(self):
2449     """Locking for PowercycleNode.
2450
2451     This is a last-resource option and shouldn't block on other
2452     jobs. Therefore, we grab no locks.
2453
2454     """
2455     self.needed_locks = {}
2456
2457   def CheckPrereq(self):
2458     """Check prerequisites.
2459
2460     This LU has no prereqs.
2461
2462     """
2463     pass
2464
2465   def Exec(self, feedback_fn):
2466     """Reboots a node.
2467
2468     """
2469     result = self.rpc.call_node_powercycle(self.op.node_name,
2470                                            self.cfg.GetHypervisorType())
2471     msg = result.RemoteFailMsg()
2472     if msg:
2473       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2474     return result.payload
2475
2476
2477 class LUQueryClusterInfo(NoHooksLU):
2478   """Query cluster configuration.
2479
2480   """
2481   _OP_REQP = []
2482   REQ_BGL = False
2483
2484   def ExpandNames(self):
2485     self.needed_locks = {}
2486
2487   def CheckPrereq(self):
2488     """No prerequsites needed for this LU.
2489
2490     """
2491     pass
2492
2493   def Exec(self, feedback_fn):
2494     """Return cluster config.
2495
2496     """
2497     cluster = self.cfg.GetClusterInfo()
2498     result = {
2499       "software_version": constants.RELEASE_VERSION,
2500       "protocol_version": constants.PROTOCOL_VERSION,
2501       "config_version": constants.CONFIG_VERSION,
2502       "os_api_version": constants.OS_API_VERSION,
2503       "export_version": constants.EXPORT_VERSION,
2504       "architecture": (platform.architecture()[0], platform.machine()),
2505       "name": cluster.cluster_name,
2506       "master": cluster.master_node,
2507       "default_hypervisor": cluster.default_hypervisor,
2508       "enabled_hypervisors": cluster.enabled_hypervisors,
2509       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2510                         for hypervisor in cluster.enabled_hypervisors]),
2511       "beparams": cluster.beparams,
2512       "candidate_pool_size": cluster.candidate_pool_size,
2513       "default_bridge": cluster.default_bridge,
2514       "master_netdev": cluster.master_netdev,
2515       "volume_group_name": cluster.volume_group_name,
2516       "file_storage_dir": cluster.file_storage_dir,
2517       }
2518
2519     return result
2520
2521
2522 class LUQueryConfigValues(NoHooksLU):
2523   """Return configuration values.
2524
2525   """
2526   _OP_REQP = []
2527   REQ_BGL = False
2528   _FIELDS_DYNAMIC = utils.FieldSet()
2529   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2530
2531   def ExpandNames(self):
2532     self.needed_locks = {}
2533
2534     _CheckOutputFields(static=self._FIELDS_STATIC,
2535                        dynamic=self._FIELDS_DYNAMIC,
2536                        selected=self.op.output_fields)
2537
2538   def CheckPrereq(self):
2539     """No prerequisites.
2540
2541     """
2542     pass
2543
2544   def Exec(self, feedback_fn):
2545     """Dump a representation of the cluster config to the standard output.
2546
2547     """
2548     values = []
2549     for field in self.op.output_fields:
2550       if field == "cluster_name":
2551         entry = self.cfg.GetClusterName()
2552       elif field == "master_node":
2553         entry = self.cfg.GetMasterNode()
2554       elif field == "drain_flag":
2555         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2556       else:
2557         raise errors.ParameterError(field)
2558       values.append(entry)
2559     return values
2560
2561
2562 class LUActivateInstanceDisks(NoHooksLU):
2563   """Bring up an instance's disks.
2564
2565   """
2566   _OP_REQP = ["instance_name"]
2567   REQ_BGL = False
2568
2569   def ExpandNames(self):
2570     self._ExpandAndLockInstance()
2571     self.needed_locks[locking.LEVEL_NODE] = []
2572     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2573
2574   def DeclareLocks(self, level):
2575     if level == locking.LEVEL_NODE:
2576       self._LockInstancesNodes()
2577
2578   def CheckPrereq(self):
2579     """Check prerequisites.
2580
2581     This checks that the instance is in the cluster.
2582
2583     """
2584     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2585     assert self.instance is not None, \
2586       "Cannot retrieve locked instance %s" % self.op.instance_name
2587     _CheckNodeOnline(self, self.instance.primary_node)
2588
2589   def Exec(self, feedback_fn):
2590     """Activate the disks.
2591
2592     """
2593     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2594     if not disks_ok:
2595       raise errors.OpExecError("Cannot activate block devices")
2596
2597     return disks_info
2598
2599
2600 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2601   """Prepare the block devices for an instance.
2602
2603   This sets up the block devices on all nodes.
2604
2605   @type lu: L{LogicalUnit}
2606   @param lu: the logical unit on whose behalf we execute
2607   @type instance: L{objects.Instance}
2608   @param instance: the instance for whose disks we assemble
2609   @type ignore_secondaries: boolean
2610   @param ignore_secondaries: if true, errors on secondary nodes
2611       won't result in an error return from the function
2612   @return: False if the operation failed, otherwise a list of
2613       (host, instance_visible_name, node_visible_name)
2614       with the mapping from node devices to instance devices
2615
2616   """
2617   device_info = []
2618   disks_ok = True
2619   iname = instance.name
2620   # With the two passes mechanism we try to reduce the window of
2621   # opportunity for the race condition of switching DRBD to primary
2622   # before handshaking occured, but we do not eliminate it
2623
2624   # The proper fix would be to wait (with some limits) until the
2625   # connection has been made and drbd transitions from WFConnection
2626   # into any other network-connected state (Connected, SyncTarget,
2627   # SyncSource, etc.)
2628
2629   # 1st pass, assemble on all nodes in secondary mode
2630   for inst_disk in instance.disks:
2631     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2632       lu.cfg.SetDiskID(node_disk, node)
2633       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2634       msg = result.RemoteFailMsg()
2635       if msg:
2636         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2637                            " (is_primary=False, pass=1): %s",
2638                            inst_disk.iv_name, node, msg)
2639         if not ignore_secondaries:
2640           disks_ok = False
2641
2642   # FIXME: race condition on drbd migration to primary
2643
2644   # 2nd pass, do only the primary node
2645   for inst_disk in instance.disks:
2646     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2647       if node != instance.primary_node:
2648         continue
2649       lu.cfg.SetDiskID(node_disk, node)
2650       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2651       msg = result.RemoteFailMsg()
2652       if msg:
2653         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2654                            " (is_primary=True, pass=2): %s",
2655                            inst_disk.iv_name, node, msg)
2656         disks_ok = False
2657     device_info.append((instance.primary_node, inst_disk.iv_name,
2658                         result.payload))
2659
2660   # leave the disks configured for the primary node
2661   # this is a workaround that would be fixed better by
2662   # improving the logical/physical id handling
2663   for disk in instance.disks:
2664     lu.cfg.SetDiskID(disk, instance.primary_node)
2665
2666   return disks_ok, device_info
2667
2668
2669 def _StartInstanceDisks(lu, instance, force):
2670   """Start the disks of an instance.
2671
2672   """
2673   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2674                                            ignore_secondaries=force)
2675   if not disks_ok:
2676     _ShutdownInstanceDisks(lu, instance)
2677     if force is not None and not force:
2678       lu.proc.LogWarning("", hint="If the message above refers to a"
2679                          " secondary node,"
2680                          " you can retry the operation using '--force'.")
2681     raise errors.OpExecError("Disk consistency error")
2682
2683
2684 class LUDeactivateInstanceDisks(NoHooksLU):
2685   """Shutdown an instance's disks.
2686
2687   """
2688   _OP_REQP = ["instance_name"]
2689   REQ_BGL = False
2690
2691   def ExpandNames(self):
2692     self._ExpandAndLockInstance()
2693     self.needed_locks[locking.LEVEL_NODE] = []
2694     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2695
2696   def DeclareLocks(self, level):
2697     if level == locking.LEVEL_NODE:
2698       self._LockInstancesNodes()
2699
2700   def CheckPrereq(self):
2701     """Check prerequisites.
2702
2703     This checks that the instance is in the cluster.
2704
2705     """
2706     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2707     assert self.instance is not None, \
2708       "Cannot retrieve locked instance %s" % self.op.instance_name
2709
2710   def Exec(self, feedback_fn):
2711     """Deactivate the disks
2712
2713     """
2714     instance = self.instance
2715     _SafeShutdownInstanceDisks(self, instance)
2716
2717
2718 def _SafeShutdownInstanceDisks(lu, instance):
2719   """Shutdown block devices of an instance.
2720
2721   This function checks if an instance is running, before calling
2722   _ShutdownInstanceDisks.
2723
2724   """
2725   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2726                                       [instance.hypervisor])
2727   ins_l = ins_l[instance.primary_node]
2728   if ins_l.failed or not isinstance(ins_l.data, list):
2729     raise errors.OpExecError("Can't contact node '%s'" %
2730                              instance.primary_node)
2731
2732   if instance.name in ins_l.data:
2733     raise errors.OpExecError("Instance is running, can't shutdown"
2734                              " block devices.")
2735
2736   _ShutdownInstanceDisks(lu, instance)
2737
2738
2739 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2740   """Shutdown block devices of an instance.
2741
2742   This does the shutdown on all nodes of the instance.
2743
2744   If the ignore_primary is false, errors on the primary node are
2745   ignored.
2746
2747   """
2748   all_result = True
2749   for disk in instance.disks:
2750     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2751       lu.cfg.SetDiskID(top_disk, node)
2752       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2753       msg = result.RemoteFailMsg()
2754       if msg:
2755         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2756                       disk.iv_name, node, msg)
2757         if not ignore_primary or node != instance.primary_node:
2758           all_result = False
2759   return all_result
2760
2761
2762 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2763   """Checks if a node has enough free memory.
2764
2765   This function check if a given node has the needed amount of free
2766   memory. In case the node has less memory or we cannot get the
2767   information from the node, this function raise an OpPrereqError
2768   exception.
2769
2770   @type lu: C{LogicalUnit}
2771   @param lu: a logical unit from which we get configuration data
2772   @type node: C{str}
2773   @param node: the node to check
2774   @type reason: C{str}
2775   @param reason: string to use in the error message
2776   @type requested: C{int}
2777   @param requested: the amount of memory in MiB to check for
2778   @type hypervisor_name: C{str}
2779   @param hypervisor_name: the hypervisor to ask for memory stats
2780   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2781       we cannot check the node
2782
2783   """
2784   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2785   nodeinfo[node].Raise()
2786   free_mem = nodeinfo[node].data.get('memory_free')
2787   if not isinstance(free_mem, int):
2788     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2789                              " was '%s'" % (node, free_mem))
2790   if requested > free_mem:
2791     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2792                              " needed %s MiB, available %s MiB" %
2793                              (node, reason, requested, free_mem))
2794
2795
2796 class LUStartupInstance(LogicalUnit):
2797   """Starts an instance.
2798
2799   """
2800   HPATH = "instance-start"
2801   HTYPE = constants.HTYPE_INSTANCE
2802   _OP_REQP = ["instance_name", "force"]
2803   REQ_BGL = False
2804
2805   def ExpandNames(self):
2806     self._ExpandAndLockInstance()
2807
2808   def BuildHooksEnv(self):
2809     """Build hooks env.
2810
2811     This runs on master, primary and secondary nodes of the instance.
2812
2813     """
2814     env = {
2815       "FORCE": self.op.force,
2816       }
2817     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2818     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2819     return env, nl, nl
2820
2821   def CheckPrereq(self):
2822     """Check prerequisites.
2823
2824     This checks that the instance is in the cluster.
2825
2826     """
2827     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2828     assert self.instance is not None, \
2829       "Cannot retrieve locked instance %s" % self.op.instance_name
2830
2831     # extra beparams
2832     self.beparams = getattr(self.op, "beparams", {})
2833     if self.beparams:
2834       if not isinstance(self.beparams, dict):
2835         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2836                                    " dict" % (type(self.beparams), ))
2837       # fill the beparams dict
2838       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2839       self.op.beparams = self.beparams
2840
2841     # extra hvparams
2842     self.hvparams = getattr(self.op, "hvparams", {})
2843     if self.hvparams:
2844       if not isinstance(self.hvparams, dict):
2845         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2846                                    " dict" % (type(self.hvparams), ))
2847
2848       # check hypervisor parameter syntax (locally)
2849       cluster = self.cfg.GetClusterInfo()
2850       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2851       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2852                                     instance.hvparams)
2853       filled_hvp.update(self.hvparams)
2854       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2855       hv_type.CheckParameterSyntax(filled_hvp)
2856       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2857       self.op.hvparams = self.hvparams
2858
2859     _CheckNodeOnline(self, instance.primary_node)
2860
2861     bep = self.cfg.GetClusterInfo().FillBE(instance)
2862     # check bridges existance
2863     _CheckInstanceBridgesExist(self, instance)
2864
2865     remote_info = self.rpc.call_instance_info(instance.primary_node,
2866                                               instance.name,
2867                                               instance.hypervisor)
2868     remote_info.Raise()
2869     if not remote_info.data:
2870       _CheckNodeFreeMemory(self, instance.primary_node,
2871                            "starting instance %s" % instance.name,
2872                            bep[constants.BE_MEMORY], instance.hypervisor)
2873
2874   def Exec(self, feedback_fn):
2875     """Start the instance.
2876
2877     """
2878     instance = self.instance
2879     force = self.op.force
2880
2881     self.cfg.MarkInstanceUp(instance.name)
2882
2883     node_current = instance.primary_node
2884
2885     _StartInstanceDisks(self, instance, force)
2886
2887     result = self.rpc.call_instance_start(node_current, instance,
2888                                           self.hvparams, self.beparams)
2889     msg = result.RemoteFailMsg()
2890     if msg:
2891       _ShutdownInstanceDisks(self, instance)
2892       raise errors.OpExecError("Could not start instance: %s" % msg)
2893
2894
2895 class LURebootInstance(LogicalUnit):
2896   """Reboot an instance.
2897
2898   """
2899   HPATH = "instance-reboot"
2900   HTYPE = constants.HTYPE_INSTANCE
2901   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2902   REQ_BGL = False
2903
2904   def ExpandNames(self):
2905     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2906                                    constants.INSTANCE_REBOOT_HARD,
2907                                    constants.INSTANCE_REBOOT_FULL]:
2908       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2909                                   (constants.INSTANCE_REBOOT_SOFT,
2910                                    constants.INSTANCE_REBOOT_HARD,
2911                                    constants.INSTANCE_REBOOT_FULL))
2912     self._ExpandAndLockInstance()
2913
2914   def BuildHooksEnv(self):
2915     """Build hooks env.
2916
2917     This runs on master, primary and secondary nodes of the instance.
2918
2919     """
2920     env = {
2921       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2922       "REBOOT_TYPE": self.op.reboot_type,
2923       }
2924     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2925     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2926     return env, nl, nl
2927
2928   def CheckPrereq(self):
2929     """Check prerequisites.
2930
2931     This checks that the instance is in the cluster.
2932
2933     """
2934     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2935     assert self.instance is not None, \
2936       "Cannot retrieve locked instance %s" % self.op.instance_name
2937
2938     _CheckNodeOnline(self, instance.primary_node)
2939
2940     # check bridges existance
2941     _CheckInstanceBridgesExist(self, instance)
2942
2943   def Exec(self, feedback_fn):
2944     """Reboot the instance.
2945
2946     """
2947     instance = self.instance
2948     ignore_secondaries = self.op.ignore_secondaries
2949     reboot_type = self.op.reboot_type
2950
2951     node_current = instance.primary_node
2952
2953     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2954                        constants.INSTANCE_REBOOT_HARD]:
2955       for disk in instance.disks:
2956         self.cfg.SetDiskID(disk, node_current)
2957       result = self.rpc.call_instance_reboot(node_current, instance,
2958                                              reboot_type)
2959       msg = result.RemoteFailMsg()
2960       if msg:
2961         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2962     else:
2963       result = self.rpc.call_instance_shutdown(node_current, instance)
2964       msg = result.RemoteFailMsg()
2965       if msg:
2966         raise errors.OpExecError("Could not shutdown instance for"
2967                                  " full reboot: %s" % msg)
2968       _ShutdownInstanceDisks(self, instance)
2969       _StartInstanceDisks(self, instance, ignore_secondaries)
2970       result = self.rpc.call_instance_start(node_current, instance, None, None)
2971       msg = result.RemoteFailMsg()
2972       if msg:
2973         _ShutdownInstanceDisks(self, instance)
2974         raise errors.OpExecError("Could not start instance for"
2975                                  " full reboot: %s" % msg)
2976
2977     self.cfg.MarkInstanceUp(instance.name)
2978
2979
2980 class LUShutdownInstance(LogicalUnit):
2981   """Shutdown an instance.
2982
2983   """
2984   HPATH = "instance-stop"
2985   HTYPE = constants.HTYPE_INSTANCE
2986   _OP_REQP = ["instance_name"]
2987   REQ_BGL = False
2988
2989   def ExpandNames(self):
2990     self._ExpandAndLockInstance()
2991
2992   def BuildHooksEnv(self):
2993     """Build hooks env.
2994
2995     This runs on master, primary and secondary nodes of the instance.
2996
2997     """
2998     env = _BuildInstanceHookEnvByObject(self, self.instance)
2999     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3000     return env, nl, nl
3001
3002   def CheckPrereq(self):
3003     """Check prerequisites.
3004
3005     This checks that the instance is in the cluster.
3006
3007     """
3008     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3009     assert self.instance is not None, \
3010       "Cannot retrieve locked instance %s" % self.op.instance_name
3011     _CheckNodeOnline(self, self.instance.primary_node)
3012
3013   def Exec(self, feedback_fn):
3014     """Shutdown the instance.
3015
3016     """
3017     instance = self.instance
3018     node_current = instance.primary_node
3019     self.cfg.MarkInstanceDown(instance.name)
3020     result = self.rpc.call_instance_shutdown(node_current, instance)
3021     msg = result.RemoteFailMsg()
3022     if msg:
3023       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3024
3025     _ShutdownInstanceDisks(self, instance)
3026
3027
3028 class LUReinstallInstance(LogicalUnit):
3029   """Reinstall an instance.
3030
3031   """
3032   HPATH = "instance-reinstall"
3033   HTYPE = constants.HTYPE_INSTANCE
3034   _OP_REQP = ["instance_name"]
3035   REQ_BGL = False
3036
3037   def ExpandNames(self):
3038     self._ExpandAndLockInstance()
3039
3040   def BuildHooksEnv(self):
3041     """Build hooks env.
3042
3043     This runs on master, primary and secondary nodes of the instance.
3044
3045     """
3046     env = _BuildInstanceHookEnvByObject(self, self.instance)
3047     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3048     return env, nl, nl
3049
3050   def CheckPrereq(self):
3051     """Check prerequisites.
3052
3053     This checks that the instance is in the cluster and is not running.
3054
3055     """
3056     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3057     assert instance is not None, \
3058       "Cannot retrieve locked instance %s" % self.op.instance_name
3059     _CheckNodeOnline(self, instance.primary_node)
3060
3061     if instance.disk_template == constants.DT_DISKLESS:
3062       raise errors.OpPrereqError("Instance '%s' has no disks" %
3063                                  self.op.instance_name)
3064     if instance.admin_up:
3065       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3066                                  self.op.instance_name)
3067     remote_info = self.rpc.call_instance_info(instance.primary_node,
3068                                               instance.name,
3069                                               instance.hypervisor)
3070     remote_info.Raise()
3071     if remote_info.data:
3072       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3073                                  (self.op.instance_name,
3074                                   instance.primary_node))
3075
3076     self.op.os_type = getattr(self.op, "os_type", None)
3077     if self.op.os_type is not None:
3078       # OS verification
3079       pnode = self.cfg.GetNodeInfo(
3080         self.cfg.ExpandNodeName(instance.primary_node))
3081       if pnode is None:
3082         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3083                                    self.op.pnode)
3084       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3085       result.Raise()
3086       if not isinstance(result.data, objects.OS):
3087         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3088                                    " primary node"  % self.op.os_type)
3089
3090     self.instance = instance
3091
3092   def Exec(self, feedback_fn):
3093     """Reinstall the instance.
3094
3095     """
3096     inst = self.instance
3097
3098     if self.op.os_type is not None:
3099       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3100       inst.os = self.op.os_type
3101       self.cfg.Update(inst)
3102
3103     _StartInstanceDisks(self, inst, None)
3104     try:
3105       feedback_fn("Running the instance OS create scripts...")
3106       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3107       msg = result.RemoteFailMsg()
3108       if msg:
3109         raise errors.OpExecError("Could not install OS for instance %s"
3110                                  " on node %s: %s" %
3111                                  (inst.name, inst.primary_node, msg))
3112     finally:
3113       _ShutdownInstanceDisks(self, inst)
3114
3115
3116 class LURenameInstance(LogicalUnit):
3117   """Rename an instance.
3118
3119   """
3120   HPATH = "instance-rename"
3121   HTYPE = constants.HTYPE_INSTANCE
3122   _OP_REQP = ["instance_name", "new_name"]
3123
3124   def BuildHooksEnv(self):
3125     """Build hooks env.
3126
3127     This runs on master, primary and secondary nodes of the instance.
3128
3129     """
3130     env = _BuildInstanceHookEnvByObject(self, self.instance)
3131     env["INSTANCE_NEW_NAME"] = self.op.new_name
3132     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3133     return env, nl, nl
3134
3135   def CheckPrereq(self):
3136     """Check prerequisites.
3137
3138     This checks that the instance is in the cluster and is not running.
3139
3140     """
3141     instance = self.cfg.GetInstanceInfo(
3142       self.cfg.ExpandInstanceName(self.op.instance_name))
3143     if instance is None:
3144       raise errors.OpPrereqError("Instance '%s' not known" %
3145                                  self.op.instance_name)
3146     _CheckNodeOnline(self, instance.primary_node)
3147
3148     if instance.admin_up:
3149       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3150                                  self.op.instance_name)
3151     remote_info = self.rpc.call_instance_info(instance.primary_node,
3152                                               instance.name,
3153                                               instance.hypervisor)
3154     remote_info.Raise()
3155     if remote_info.data:
3156       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3157                                  (self.op.instance_name,
3158                                   instance.primary_node))
3159     self.instance = instance
3160
3161     # new name verification
3162     name_info = utils.HostInfo(self.op.new_name)
3163
3164     self.op.new_name = new_name = name_info.name
3165     instance_list = self.cfg.GetInstanceList()
3166     if new_name in instance_list:
3167       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3168                                  new_name)
3169
3170     if not getattr(self.op, "ignore_ip", False):
3171       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3172         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3173                                    (name_info.ip, new_name))
3174
3175
3176   def Exec(self, feedback_fn):
3177     """Reinstall the instance.
3178
3179     """
3180     inst = self.instance
3181     old_name = inst.name
3182
3183     if inst.disk_template == constants.DT_FILE:
3184       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3185
3186     self.cfg.RenameInstance(inst.name, self.op.new_name)
3187     # Change the instance lock. This is definitely safe while we hold the BGL
3188     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3189     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3190
3191     # re-read the instance from the configuration after rename
3192     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3193
3194     if inst.disk_template == constants.DT_FILE:
3195       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3196       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3197                                                      old_file_storage_dir,
3198                                                      new_file_storage_dir)
3199       result.Raise()
3200       if not result.data:
3201         raise errors.OpExecError("Could not connect to node '%s' to rename"
3202                                  " directory '%s' to '%s' (but the instance"
3203                                  " has been renamed in Ganeti)" % (
3204                                  inst.primary_node, old_file_storage_dir,
3205                                  new_file_storage_dir))
3206
3207       if not result.data[0]:
3208         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3209                                  " (but the instance has been renamed in"
3210                                  " Ganeti)" % (old_file_storage_dir,
3211                                                new_file_storage_dir))
3212
3213     _StartInstanceDisks(self, inst, None)
3214     try:
3215       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3216                                                  old_name)
3217       msg = result.RemoteFailMsg()
3218       if msg:
3219         msg = ("Could not run OS rename script for instance %s on node %s"
3220                " (but the instance has been renamed in Ganeti): %s" %
3221                (inst.name, inst.primary_node, msg))
3222         self.proc.LogWarning(msg)
3223     finally:
3224       _ShutdownInstanceDisks(self, inst)
3225
3226
3227 class LURemoveInstance(LogicalUnit):
3228   """Remove an instance.
3229
3230   """
3231   HPATH = "instance-remove"
3232   HTYPE = constants.HTYPE_INSTANCE
3233   _OP_REQP = ["instance_name", "ignore_failures"]
3234   REQ_BGL = False
3235
3236   def ExpandNames(self):
3237     self._ExpandAndLockInstance()
3238     self.needed_locks[locking.LEVEL_NODE] = []
3239     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3240
3241   def DeclareLocks(self, level):
3242     if level == locking.LEVEL_NODE:
3243       self._LockInstancesNodes()
3244
3245   def BuildHooksEnv(self):
3246     """Build hooks env.
3247
3248     This runs on master, primary and secondary nodes of the instance.
3249
3250     """
3251     env = _BuildInstanceHookEnvByObject(self, self.instance)
3252     nl = [self.cfg.GetMasterNode()]
3253     return env, nl, nl
3254
3255   def CheckPrereq(self):
3256     """Check prerequisites.
3257
3258     This checks that the instance is in the cluster.
3259
3260     """
3261     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3262     assert self.instance is not None, \
3263       "Cannot retrieve locked instance %s" % self.op.instance_name
3264
3265   def Exec(self, feedback_fn):
3266     """Remove the instance.
3267
3268     """
3269     instance = self.instance
3270     logging.info("Shutting down instance %s on node %s",
3271                  instance.name, instance.primary_node)
3272
3273     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3274     msg = result.RemoteFailMsg()
3275     if msg:
3276       if self.op.ignore_failures:
3277         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3278       else:
3279         raise errors.OpExecError("Could not shutdown instance %s on"
3280                                  " node %s: %s" %
3281                                  (instance.name, instance.primary_node, msg))
3282
3283     logging.info("Removing block devices for instance %s", instance.name)
3284
3285     if not _RemoveDisks(self, instance):
3286       if self.op.ignore_failures:
3287         feedback_fn("Warning: can't remove instance's disks")
3288       else:
3289         raise errors.OpExecError("Can't remove instance's disks")
3290
3291     logging.info("Removing instance %s out of cluster config", instance.name)
3292
3293     self.cfg.RemoveInstance(instance.name)
3294     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3295
3296
3297 class LUQueryInstances(NoHooksLU):
3298   """Logical unit for querying instances.
3299
3300   """
3301   _OP_REQP = ["output_fields", "names", "use_locking"]
3302   REQ_BGL = False
3303   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3304                                     "admin_state",
3305                                     "disk_template", "ip", "mac", "bridge",
3306                                     "sda_size", "sdb_size", "vcpus", "tags",
3307                                     "network_port", "beparams",
3308                                     r"(disk)\.(size)/([0-9]+)",
3309                                     r"(disk)\.(sizes)", "disk_usage",
3310                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3311                                     r"(nic)\.(macs|ips|bridges)",
3312                                     r"(disk|nic)\.(count)",
3313                                     "serial_no", "hypervisor", "hvparams",] +
3314                                   ["hv/%s" % name
3315                                    for name in constants.HVS_PARAMETERS] +
3316                                   ["be/%s" % name
3317                                    for name in constants.BES_PARAMETERS])
3318   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3319
3320
3321   def ExpandNames(self):
3322     _CheckOutputFields(static=self._FIELDS_STATIC,
3323                        dynamic=self._FIELDS_DYNAMIC,
3324                        selected=self.op.output_fields)
3325
3326     self.needed_locks = {}
3327     self.share_locks[locking.LEVEL_INSTANCE] = 1
3328     self.share_locks[locking.LEVEL_NODE] = 1
3329
3330     if self.op.names:
3331       self.wanted = _GetWantedInstances(self, self.op.names)
3332     else:
3333       self.wanted = locking.ALL_SET
3334
3335     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3336     self.do_locking = self.do_node_query and self.op.use_locking
3337     if self.do_locking:
3338       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3339       self.needed_locks[locking.LEVEL_NODE] = []
3340       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3341
3342   def DeclareLocks(self, level):
3343     if level == locking.LEVEL_NODE and self.do_locking:
3344       self._LockInstancesNodes()
3345
3346   def CheckPrereq(self):
3347     """Check prerequisites.
3348
3349     """
3350     pass
3351
3352   def Exec(self, feedback_fn):
3353     """Computes the list of nodes and their attributes.
3354
3355     """
3356     all_info = self.cfg.GetAllInstancesInfo()
3357     if self.wanted == locking.ALL_SET:
3358       # caller didn't specify instance names, so ordering is not important
3359       if self.do_locking:
3360         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3361       else:
3362         instance_names = all_info.keys()
3363       instance_names = utils.NiceSort(instance_names)
3364     else:
3365       # caller did specify names, so we must keep the ordering
3366       if self.do_locking:
3367         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3368       else:
3369         tgt_set = all_info.keys()
3370       missing = set(self.wanted).difference(tgt_set)
3371       if missing:
3372         raise errors.OpExecError("Some instances were removed before"
3373                                  " retrieving their data: %s" % missing)
3374       instance_names = self.wanted
3375
3376     instance_list = [all_info[iname] for iname in instance_names]
3377
3378     # begin data gathering
3379
3380     nodes = frozenset([inst.primary_node for inst in instance_list])
3381     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3382
3383     bad_nodes = []
3384     off_nodes = []
3385     if self.do_node_query:
3386       live_data = {}
3387       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3388       for name in nodes:
3389         result = node_data[name]
3390         if result.offline:
3391           # offline nodes will be in both lists
3392           off_nodes.append(name)
3393         if result.failed:
3394           bad_nodes.append(name)
3395         else:
3396           if result.data:
3397             live_data.update(result.data)
3398             # else no instance is alive
3399     else:
3400       live_data = dict([(name, {}) for name in instance_names])
3401
3402     # end data gathering
3403
3404     HVPREFIX = "hv/"
3405     BEPREFIX = "be/"
3406     output = []
3407     for instance in instance_list:
3408       iout = []
3409       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3410       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3411       for field in self.op.output_fields:
3412         st_match = self._FIELDS_STATIC.Matches(field)
3413         if field == "name":
3414           val = instance.name
3415         elif field == "os":
3416           val = instance.os
3417         elif field == "pnode":
3418           val = instance.primary_node
3419         elif field == "snodes":
3420           val = list(instance.secondary_nodes)
3421         elif field == "admin_state":
3422           val = instance.admin_up
3423         elif field == "oper_state":
3424           if instance.primary_node in bad_nodes:
3425             val = None
3426           else:
3427             val = bool(live_data.get(instance.name))
3428         elif field == "status":
3429           if instance.primary_node in off_nodes:
3430             val = "ERROR_nodeoffline"
3431           elif instance.primary_node in bad_nodes:
3432             val = "ERROR_nodedown"
3433           else:
3434             running = bool(live_data.get(instance.name))
3435             if running:
3436               if instance.admin_up:
3437                 val = "running"
3438               else:
3439                 val = "ERROR_up"
3440             else:
3441               if instance.admin_up:
3442                 val = "ERROR_down"
3443               else:
3444                 val = "ADMIN_down"
3445         elif field == "oper_ram":
3446           if instance.primary_node in bad_nodes:
3447             val = None
3448           elif instance.name in live_data:
3449             val = live_data[instance.name].get("memory", "?")
3450           else:
3451             val = "-"
3452         elif field == "disk_template":
3453           val = instance.disk_template
3454         elif field == "ip":
3455           val = instance.nics[0].ip
3456         elif field == "bridge":
3457           val = instance.nics[0].bridge
3458         elif field == "mac":
3459           val = instance.nics[0].mac
3460         elif field == "sda_size" or field == "sdb_size":
3461           idx = ord(field[2]) - ord('a')
3462           try:
3463             val = instance.FindDisk(idx).size
3464           except errors.OpPrereqError:
3465             val = None
3466         elif field == "disk_usage": # total disk usage per node
3467           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3468           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3469         elif field == "tags":
3470           val = list(instance.GetTags())
3471         elif field == "serial_no":
3472           val = instance.serial_no
3473         elif field == "network_port":
3474           val = instance.network_port
3475         elif field == "hypervisor":
3476           val = instance.hypervisor
3477         elif field == "hvparams":
3478           val = i_hv
3479         elif (field.startswith(HVPREFIX) and
3480               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3481           val = i_hv.get(field[len(HVPREFIX):], None)
3482         elif field == "beparams":
3483           val = i_be
3484         elif (field.startswith(BEPREFIX) and
3485               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3486           val = i_be.get(field[len(BEPREFIX):], None)
3487         elif st_match and st_match.groups():
3488           # matches a variable list
3489           st_groups = st_match.groups()
3490           if st_groups and st_groups[0] == "disk":
3491             if st_groups[1] == "count":
3492               val = len(instance.disks)
3493             elif st_groups[1] == "sizes":
3494               val = [disk.size for disk in instance.disks]
3495             elif st_groups[1] == "size":
3496               try:
3497                 val = instance.FindDisk(st_groups[2]).size
3498               except errors.OpPrereqError:
3499                 val = None
3500             else:
3501               assert False, "Unhandled disk parameter"
3502           elif st_groups[0] == "nic":
3503             if st_groups[1] == "count":
3504               val = len(instance.nics)
3505             elif st_groups[1] == "macs":
3506               val = [nic.mac for nic in instance.nics]
3507             elif st_groups[1] == "ips":
3508               val = [nic.ip for nic in instance.nics]
3509             elif st_groups[1] == "bridges":
3510               val = [nic.bridge for nic in instance.nics]
3511             else:
3512               # index-based item
3513               nic_idx = int(st_groups[2])
3514               if nic_idx >= len(instance.nics):
3515                 val = None
3516               else:
3517                 if st_groups[1] == "mac":
3518                   val = instance.nics[nic_idx].mac
3519                 elif st_groups[1] == "ip":
3520                   val = instance.nics[nic_idx].ip
3521                 elif st_groups[1] == "bridge":
3522                   val = instance.nics[nic_idx].bridge
3523                 else:
3524                   assert False, "Unhandled NIC parameter"
3525           else:
3526             assert False, "Unhandled variable parameter"
3527         else:
3528           raise errors.ParameterError(field)
3529         iout.append(val)
3530       output.append(iout)
3531
3532     return output
3533
3534
3535 class LUFailoverInstance(LogicalUnit):
3536   """Failover an instance.
3537
3538   """
3539   HPATH = "instance-failover"
3540   HTYPE = constants.HTYPE_INSTANCE
3541   _OP_REQP = ["instance_name", "ignore_consistency"]
3542   REQ_BGL = False
3543
3544   def ExpandNames(self):
3545     self._ExpandAndLockInstance()
3546     self.needed_locks[locking.LEVEL_NODE] = []
3547     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3548
3549   def DeclareLocks(self, level):
3550     if level == locking.LEVEL_NODE:
3551       self._LockInstancesNodes()
3552
3553   def BuildHooksEnv(self):
3554     """Build hooks env.
3555
3556     This runs on master, primary and secondary nodes of the instance.
3557
3558     """
3559     env = {
3560       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3561       }
3562     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3563     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3564     return env, nl, nl
3565
3566   def CheckPrereq(self):
3567     """Check prerequisites.
3568
3569     This checks that the instance is in the cluster.
3570
3571     """
3572     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3573     assert self.instance is not None, \
3574       "Cannot retrieve locked instance %s" % self.op.instance_name
3575
3576     bep = self.cfg.GetClusterInfo().FillBE(instance)
3577     if instance.disk_template not in constants.DTS_NET_MIRROR:
3578       raise errors.OpPrereqError("Instance's disk layout is not"
3579                                  " network mirrored, cannot failover.")
3580
3581     secondary_nodes = instance.secondary_nodes
3582     if not secondary_nodes:
3583       raise errors.ProgrammerError("no secondary node but using "
3584                                    "a mirrored disk template")
3585
3586     target_node = secondary_nodes[0]
3587     _CheckNodeOnline(self, target_node)
3588     _CheckNodeNotDrained(self, target_node)
3589     # check memory requirements on the secondary node
3590     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3591                          instance.name, bep[constants.BE_MEMORY],
3592                          instance.hypervisor)
3593
3594     # check bridge existance
3595     brlist = [nic.bridge for nic in instance.nics]
3596     result = self.rpc.call_bridges_exist(target_node, brlist)
3597     result.Raise()
3598     if not result.data:
3599       raise errors.OpPrereqError("One or more target bridges %s does not"
3600                                  " exist on destination node '%s'" %
3601                                  (brlist, target_node))
3602
3603   def Exec(self, feedback_fn):
3604     """Failover an instance.
3605
3606     The failover is done by shutting it down on its present node and
3607     starting it on the secondary.
3608
3609     """
3610     instance = self.instance
3611
3612     source_node = instance.primary_node
3613     target_node = instance.secondary_nodes[0]
3614
3615     feedback_fn("* checking disk consistency between source and target")
3616     for dev in instance.disks:
3617       # for drbd, these are drbd over lvm
3618       if not _CheckDiskConsistency(self, dev, target_node, False):
3619         if instance.admin_up and not self.op.ignore_consistency:
3620           raise errors.OpExecError("Disk %s is degraded on target node,"
3621                                    " aborting failover." % dev.iv_name)
3622
3623     feedback_fn("* shutting down instance on source node")
3624     logging.info("Shutting down instance %s on node %s",
3625                  instance.name, source_node)
3626
3627     result = self.rpc.call_instance_shutdown(source_node, instance)
3628     msg = result.RemoteFailMsg()
3629     if msg:
3630       if self.op.ignore_consistency:
3631         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3632                              " Proceeding anyway. Please make sure node"
3633                              " %s is down. Error details: %s",
3634                              instance.name, source_node, source_node, msg)
3635       else:
3636         raise errors.OpExecError("Could not shutdown instance %s on"
3637                                  " node %s: %s" %
3638                                  (instance.name, source_node, msg))
3639
3640     feedback_fn("* deactivating the instance's disks on source node")
3641     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3642       raise errors.OpExecError("Can't shut down the instance's disks.")
3643
3644     instance.primary_node = target_node
3645     # distribute new instance config to the other nodes
3646     self.cfg.Update(instance)
3647
3648     # Only start the instance if it's marked as up
3649     if instance.admin_up:
3650       feedback_fn("* activating the instance's disks on target node")
3651       logging.info("Starting instance %s on node %s",
3652                    instance.name, target_node)
3653
3654       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3655                                                ignore_secondaries=True)
3656       if not disks_ok:
3657         _ShutdownInstanceDisks(self, instance)
3658         raise errors.OpExecError("Can't activate the instance's disks")
3659
3660       feedback_fn("* starting the instance on the target node")
3661       result = self.rpc.call_instance_start(target_node, instance, None, None)
3662       msg = result.RemoteFailMsg()
3663       if msg:
3664         _ShutdownInstanceDisks(self, instance)
3665         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3666                                  (instance.name, target_node, msg))
3667
3668
3669 class LUMigrateInstance(LogicalUnit):
3670   """Migrate an instance.
3671
3672   This is migration without shutting down, compared to the failover,
3673   which is done with shutdown.
3674
3675   """
3676   HPATH = "instance-migrate"
3677   HTYPE = constants.HTYPE_INSTANCE
3678   _OP_REQP = ["instance_name", "live", "cleanup"]
3679
3680   REQ_BGL = False
3681
3682   def ExpandNames(self):
3683     self._ExpandAndLockInstance()
3684     self.needed_locks[locking.LEVEL_NODE] = []
3685     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3686
3687   def DeclareLocks(self, level):
3688     if level == locking.LEVEL_NODE:
3689       self._LockInstancesNodes()
3690
3691   def BuildHooksEnv(self):
3692     """Build hooks env.
3693
3694     This runs on master, primary and secondary nodes of the instance.
3695
3696     """
3697     env = _BuildInstanceHookEnvByObject(self, self.instance)
3698     env["MIGRATE_LIVE"] = self.op.live
3699     env["MIGRATE_CLEANUP"] = self.op.cleanup
3700     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3701     return env, nl, nl
3702
3703   def CheckPrereq(self):
3704     """Check prerequisites.
3705
3706     This checks that the instance is in the cluster.
3707
3708     """
3709     instance = self.cfg.GetInstanceInfo(
3710       self.cfg.ExpandInstanceName(self.op.instance_name))
3711     if instance is None:
3712       raise errors.OpPrereqError("Instance '%s' not known" %
3713                                  self.op.instance_name)
3714
3715     if instance.disk_template != constants.DT_DRBD8:
3716       raise errors.OpPrereqError("Instance's disk layout is not"
3717                                  " drbd8, cannot migrate.")
3718
3719     secondary_nodes = instance.secondary_nodes
3720     if not secondary_nodes:
3721       raise errors.ConfigurationError("No secondary node but using"
3722                                       " drbd8 disk template")
3723
3724     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3725
3726     target_node = secondary_nodes[0]
3727     # check memory requirements on the secondary node
3728     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3729                          instance.name, i_be[constants.BE_MEMORY],
3730                          instance.hypervisor)
3731
3732     # check bridge existance
3733     brlist = [nic.bridge for nic in instance.nics]
3734     result = self.rpc.call_bridges_exist(target_node, brlist)
3735     if result.failed or not result.data:
3736       raise errors.OpPrereqError("One or more target bridges %s does not"
3737                                  " exist on destination node '%s'" %
3738                                  (brlist, target_node))
3739
3740     if not self.op.cleanup:
3741       _CheckNodeNotDrained(self, target_node)
3742       result = self.rpc.call_instance_migratable(instance.primary_node,
3743                                                  instance)
3744       msg = result.RemoteFailMsg()
3745       if msg:
3746         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3747                                    msg)
3748
3749     self.instance = instance
3750
3751   def _WaitUntilSync(self):
3752     """Poll with custom rpc for disk sync.
3753
3754     This uses our own step-based rpc call.
3755
3756     """
3757     self.feedback_fn("* wait until resync is done")
3758     all_done = False
3759     while not all_done:
3760       all_done = True
3761       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3762                                             self.nodes_ip,
3763                                             self.instance.disks)
3764       min_percent = 100
3765       for node, nres in result.items():
3766         msg = nres.RemoteFailMsg()
3767         if msg:
3768           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3769                                    (node, msg))
3770         node_done, node_percent = nres.payload
3771         all_done = all_done and node_done
3772         if node_percent is not None:
3773           min_percent = min(min_percent, node_percent)
3774       if not all_done:
3775         if min_percent < 100:
3776           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3777         time.sleep(2)
3778
3779   def _EnsureSecondary(self, node):
3780     """Demote a node to secondary.
3781
3782     """
3783     self.feedback_fn("* switching node %s to secondary mode" % node)
3784
3785     for dev in self.instance.disks:
3786       self.cfg.SetDiskID(dev, node)
3787
3788     result = self.rpc.call_blockdev_close(node, self.instance.name,
3789                                           self.instance.disks)
3790     msg = result.RemoteFailMsg()
3791     if msg:
3792       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3793                                " error %s" % (node, msg))
3794
3795   def _GoStandalone(self):
3796     """Disconnect from the network.
3797
3798     """
3799     self.feedback_fn("* changing into standalone mode")
3800     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3801                                                self.instance.disks)
3802     for node, nres in result.items():
3803       msg = nres.RemoteFailMsg()
3804       if msg:
3805         raise errors.OpExecError("Cannot disconnect disks node %s,"
3806                                  " error %s" % (node, msg))
3807
3808   def _GoReconnect(self, multimaster):
3809     """Reconnect to the network.
3810
3811     """
3812     if multimaster:
3813       msg = "dual-master"
3814     else:
3815       msg = "single-master"
3816     self.feedback_fn("* changing disks into %s mode" % msg)
3817     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3818                                            self.instance.disks,
3819                                            self.instance.name, multimaster)
3820     for node, nres in result.items():
3821       msg = nres.RemoteFailMsg()
3822       if msg:
3823         raise errors.OpExecError("Cannot change disks config on node %s,"
3824                                  " error: %s" % (node, msg))
3825
3826   def _ExecCleanup(self):
3827     """Try to cleanup after a failed migration.
3828
3829     The cleanup is done by:
3830       - check that the instance is running only on one node
3831         (and update the config if needed)
3832       - change disks on its secondary node to secondary
3833       - wait until disks are fully synchronized
3834       - disconnect from the network
3835       - change disks into single-master mode
3836       - wait again until disks are fully synchronized
3837
3838     """
3839     instance = self.instance
3840     target_node = self.target_node
3841     source_node = self.source_node
3842
3843     # check running on only one node
3844     self.feedback_fn("* checking where the instance actually runs"
3845                      " (if this hangs, the hypervisor might be in"
3846                      " a bad state)")
3847     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3848     for node, result in ins_l.items():
3849       result.Raise()
3850       if not isinstance(result.data, list):
3851         raise errors.OpExecError("Can't contact node '%s'" % node)
3852
3853     runningon_source = instance.name in ins_l[source_node].data
3854     runningon_target = instance.name in ins_l[target_node].data
3855
3856     if runningon_source and runningon_target:
3857       raise errors.OpExecError("Instance seems to be running on two nodes,"
3858                                " or the hypervisor is confused. You will have"
3859                                " to ensure manually that it runs only on one"
3860                                " and restart this operation.")
3861
3862     if not (runningon_source or runningon_target):
3863       raise errors.OpExecError("Instance does not seem to be running at all."
3864                                " In this case, it's safer to repair by"
3865                                " running 'gnt-instance stop' to ensure disk"
3866                                " shutdown, and then restarting it.")
3867
3868     if runningon_target:
3869       # the migration has actually succeeded, we need to update the config
3870       self.feedback_fn("* instance running on secondary node (%s),"
3871                        " updating config" % target_node)
3872       instance.primary_node = target_node
3873       self.cfg.Update(instance)
3874       demoted_node = source_node
3875     else:
3876       self.feedback_fn("* instance confirmed to be running on its"
3877                        " primary node (%s)" % source_node)
3878       demoted_node = target_node
3879
3880     self._EnsureSecondary(demoted_node)
3881     try:
3882       self._WaitUntilSync()
3883     except errors.OpExecError:
3884       # we ignore here errors, since if the device is standalone, it
3885       # won't be able to sync
3886       pass
3887     self._GoStandalone()
3888     self._GoReconnect(False)
3889     self._WaitUntilSync()
3890
3891     self.feedback_fn("* done")
3892
3893   def _RevertDiskStatus(self):
3894     """Try to revert the disk status after a failed migration.
3895
3896     """
3897     target_node = self.target_node
3898     try:
3899       self._EnsureSecondary(target_node)
3900       self._GoStandalone()
3901       self._GoReconnect(False)
3902       self._WaitUntilSync()
3903     except errors.OpExecError, err:
3904       self.LogWarning("Migration failed and I can't reconnect the"
3905                       " drives: error '%s'\n"
3906                       "Please look and recover the instance status" %
3907                       str(err))
3908
3909   def _AbortMigration(self):
3910     """Call the hypervisor code to abort a started migration.
3911
3912     """
3913     instance = self.instance
3914     target_node = self.target_node
3915     migration_info = self.migration_info
3916
3917     abort_result = self.rpc.call_finalize_migration(target_node,
3918                                                     instance,
3919                                                     migration_info,
3920                                                     False)
3921     abort_msg = abort_result.RemoteFailMsg()
3922     if abort_msg:
3923       logging.error("Aborting migration failed on target node %s: %s" %
3924                     (target_node, abort_msg))
3925       # Don't raise an exception here, as we stil have to try to revert the
3926       # disk status, even if this step failed.
3927
3928   def _ExecMigration(self):
3929     """Migrate an instance.
3930
3931     The migrate is done by:
3932       - change the disks into dual-master mode
3933       - wait until disks are fully synchronized again
3934       - migrate the instance
3935       - change disks on the new secondary node (the old primary) to secondary
3936       - wait until disks are fully synchronized
3937       - change disks into single-master mode
3938
3939     """
3940     instance = self.instance
3941     target_node = self.target_node
3942     source_node = self.source_node
3943
3944     self.feedback_fn("* checking disk consistency between source and target")
3945     for dev in instance.disks:
3946       if not _CheckDiskConsistency(self, dev, target_node, False):
3947         raise errors.OpExecError("Disk %s is degraded or not fully"
3948                                  " synchronized on target node,"
3949                                  " aborting migrate." % dev.iv_name)
3950
3951     # First get the migration information from the remote node
3952     result = self.rpc.call_migration_info(source_node, instance)
3953     msg = result.RemoteFailMsg()
3954     if msg:
3955       log_err = ("Failed fetching source migration information from %s: %s" %
3956                  (source_node, msg))
3957       logging.error(log_err)
3958       raise errors.OpExecError(log_err)
3959
3960     self.migration_info = migration_info = result.payload
3961
3962     # Then switch the disks to master/master mode
3963     self._EnsureSecondary(target_node)
3964     self._GoStandalone()
3965     self._GoReconnect(True)
3966     self._WaitUntilSync()
3967
3968     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3969     result = self.rpc.call_accept_instance(target_node,
3970                                            instance,
3971                                            migration_info,
3972                                            self.nodes_ip[target_node])
3973
3974     msg = result.RemoteFailMsg()
3975     if msg:
3976       logging.error("Instance pre-migration failed, trying to revert"
3977                     " disk status: %s", msg)
3978       self._AbortMigration()
3979       self._RevertDiskStatus()
3980       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3981                                (instance.name, msg))
3982
3983     self.feedback_fn("* migrating instance to %s" % target_node)
3984     time.sleep(10)
3985     result = self.rpc.call_instance_migrate(source_node, instance,
3986                                             self.nodes_ip[target_node],
3987                                             self.op.live)
3988     msg = result.RemoteFailMsg()
3989     if msg:
3990       logging.error("Instance migration failed, trying to revert"
3991                     " disk status: %s", msg)
3992       self._AbortMigration()
3993       self._RevertDiskStatus()
3994       raise errors.OpExecError("Could not migrate instance %s: %s" %
3995                                (instance.name, msg))
3996     time.sleep(10)
3997
3998     instance.primary_node = target_node
3999     # distribute new instance config to the other nodes
4000     self.cfg.Update(instance)
4001
4002     result = self.rpc.call_finalize_migration(target_node,
4003                                               instance,
4004                                               migration_info,
4005                                               True)
4006     msg = result.RemoteFailMsg()
4007     if msg:
4008       logging.error("Instance migration succeeded, but finalization failed:"
4009                     " %s" % msg)
4010       raise errors.OpExecError("Could not finalize instance migration: %s" %
4011                                msg)
4012
4013     self._EnsureSecondary(source_node)
4014     self._WaitUntilSync()
4015     self._GoStandalone()
4016     self._GoReconnect(False)
4017     self._WaitUntilSync()
4018
4019     self.feedback_fn("* done")
4020
4021   def Exec(self, feedback_fn):
4022     """Perform the migration.
4023
4024     """
4025     self.feedback_fn = feedback_fn
4026
4027     self.source_node = self.instance.primary_node
4028     self.target_node = self.instance.secondary_nodes[0]
4029     self.all_nodes = [self.source_node, self.target_node]
4030     self.nodes_ip = {
4031       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4032       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4033       }
4034     if self.op.cleanup:
4035       return self._ExecCleanup()
4036     else:
4037       return self._ExecMigration()
4038
4039
4040 def _CreateBlockDev(lu, node, instance, device, force_create,
4041                     info, force_open):
4042   """Create a tree of block devices on a given node.
4043
4044   If this device type has to be created on secondaries, create it and
4045   all its children.
4046
4047   If not, just recurse to children keeping the same 'force' value.
4048
4049   @param lu: the lu on whose behalf we execute
4050   @param node: the node on which to create the device
4051   @type instance: L{objects.Instance}
4052   @param instance: the instance which owns the device
4053   @type device: L{objects.Disk}
4054   @param device: the device to create
4055   @type force_create: boolean
4056   @param force_create: whether to force creation of this device; this
4057       will be change to True whenever we find a device which has
4058       CreateOnSecondary() attribute
4059   @param info: the extra 'metadata' we should attach to the device
4060       (this will be represented as a LVM tag)
4061   @type force_open: boolean
4062   @param force_open: this parameter will be passes to the
4063       L{backend.BlockdevCreate} function where it specifies
4064       whether we run on primary or not, and it affects both
4065       the child assembly and the device own Open() execution
4066
4067   """
4068   if device.CreateOnSecondary():
4069     force_create = True
4070
4071   if device.children:
4072     for child in device.children:
4073       _CreateBlockDev(lu, node, instance, child, force_create,
4074                       info, force_open)
4075
4076   if not force_create:
4077     return
4078
4079   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4080
4081
4082 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4083   """Create a single block device on a given node.
4084
4085   This will not recurse over children of the device, so they must be
4086   created in advance.
4087
4088   @param lu: the lu on whose behalf we execute
4089   @param node: the node on which to create the device
4090   @type instance: L{objects.Instance}
4091   @param instance: the instance which owns the device
4092   @type device: L{objects.Disk}
4093   @param device: the device to create
4094   @param info: the extra 'metadata' we should attach to the device
4095       (this will be represented as a LVM tag)
4096   @type force_open: boolean
4097   @param force_open: this parameter will be passes to the
4098       L{backend.BlockdevCreate} function where it specifies
4099       whether we run on primary or not, and it affects both
4100       the child assembly and the device own Open() execution
4101
4102   """
4103   lu.cfg.SetDiskID(device, node)
4104   result = lu.rpc.call_blockdev_create(node, device, device.size,
4105                                        instance.name, force_open, info)
4106   msg = result.RemoteFailMsg()
4107   if msg:
4108     raise errors.OpExecError("Can't create block device %s on"
4109                              " node %s for instance %s: %s" %
4110                              (device, node, instance.name, msg))
4111   if device.physical_id is None:
4112     device.physical_id = result.payload
4113
4114
4115 def _GenerateUniqueNames(lu, exts):
4116   """Generate a suitable LV name.
4117
4118   This will generate a logical volume name for the given instance.
4119
4120   """
4121   results = []
4122   for val in exts:
4123     new_id = lu.cfg.GenerateUniqueID()
4124     results.append("%s%s" % (new_id, val))
4125   return results
4126
4127
4128 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4129                          p_minor, s_minor):
4130   """Generate a drbd8 device complete with its children.
4131
4132   """
4133   port = lu.cfg.AllocatePort()
4134   vgname = lu.cfg.GetVGName()
4135   shared_secret = lu.cfg.GenerateDRBDSecret()
4136   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4137                           logical_id=(vgname, names[0]))
4138   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4139                           logical_id=(vgname, names[1]))
4140   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4141                           logical_id=(primary, secondary, port,
4142                                       p_minor, s_minor,
4143                                       shared_secret),
4144                           children=[dev_data, dev_meta],
4145                           iv_name=iv_name)
4146   return drbd_dev
4147
4148
4149 def _GenerateDiskTemplate(lu, template_name,
4150                           instance_name, primary_node,
4151                           secondary_nodes, disk_info,
4152                           file_storage_dir, file_driver,
4153                           base_index):
4154   """Generate the entire disk layout for a given template type.
4155
4156   """
4157   #TODO: compute space requirements
4158
4159   vgname = lu.cfg.GetVGName()
4160   disk_count = len(disk_info)
4161   disks = []
4162   if template_name == constants.DT_DISKLESS:
4163     pass
4164   elif template_name == constants.DT_PLAIN:
4165     if len(secondary_nodes) != 0:
4166       raise errors.ProgrammerError("Wrong template configuration")
4167
4168     names = _GenerateUniqueNames(lu, [".disk%d" % i
4169                                       for i in range(disk_count)])
4170     for idx, disk in enumerate(disk_info):
4171       disk_index = idx + base_index
4172       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4173                               logical_id=(vgname, names[idx]),
4174                               iv_name="disk/%d" % disk_index,
4175                               mode=disk["mode"])
4176       disks.append(disk_dev)
4177   elif template_name == constants.DT_DRBD8:
4178     if len(secondary_nodes) != 1:
4179       raise errors.ProgrammerError("Wrong template configuration")
4180     remote_node = secondary_nodes[0]
4181     minors = lu.cfg.AllocateDRBDMinor(
4182       [primary_node, remote_node] * len(disk_info), instance_name)
4183
4184     names = []
4185     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4186                                                for i in range(disk_count)]):
4187       names.append(lv_prefix + "_data")
4188       names.append(lv_prefix + "_meta")
4189     for idx, disk in enumerate(disk_info):
4190       disk_index = idx + base_index
4191       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4192                                       disk["size"], names[idx*2:idx*2+2],
4193                                       "disk/%d" % disk_index,
4194                                       minors[idx*2], minors[idx*2+1])
4195       disk_dev.mode = disk["mode"]
4196       disks.append(disk_dev)
4197   elif template_name == constants.DT_FILE:
4198     if len(secondary_nodes) != 0:
4199       raise errors.ProgrammerError("Wrong template configuration")
4200
4201     for idx, disk in enumerate(disk_info):
4202       disk_index = idx + base_index
4203       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4204                               iv_name="disk/%d" % disk_index,
4205                               logical_id=(file_driver,
4206                                           "%s/disk%d" % (file_storage_dir,
4207                                                          disk_index)),
4208                               mode=disk["mode"])
4209       disks.append(disk_dev)
4210   else:
4211     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4212   return disks
4213
4214
4215 def _GetInstanceInfoText(instance):
4216   """Compute that text that should be added to the disk's metadata.
4217
4218   """
4219   return "originstname+%s" % instance.name
4220
4221
4222 def _CreateDisks(lu, instance):
4223   """Create all disks for an instance.
4224
4225   This abstracts away some work from AddInstance.
4226
4227   @type lu: L{LogicalUnit}
4228   @param lu: the logical unit on whose behalf we execute
4229   @type instance: L{objects.Instance}
4230   @param instance: the instance whose disks we should create
4231   @rtype: boolean
4232   @return: the success of the creation
4233
4234   """
4235   info = _GetInstanceInfoText(instance)
4236   pnode = instance.primary_node
4237
4238   if instance.disk_template == constants.DT_FILE:
4239     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4240     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4241
4242     if result.failed or not result.data:
4243       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4244
4245     if not result.data[0]:
4246       raise errors.OpExecError("Failed to create directory '%s'" %
4247                                file_storage_dir)
4248
4249   # Note: this needs to be kept in sync with adding of disks in
4250   # LUSetInstanceParams
4251   for device in instance.disks:
4252     logging.info("Creating volume %s for instance %s",
4253                  device.iv_name, instance.name)
4254     #HARDCODE
4255     for node in instance.all_nodes:
4256       f_create = node == pnode
4257       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4258
4259
4260 def _RemoveDisks(lu, instance):
4261   """Remove all disks for an instance.
4262
4263   This abstracts away some work from `AddInstance()` and
4264   `RemoveInstance()`. Note that in case some of the devices couldn't
4265   be removed, the removal will continue with the other ones (compare
4266   with `_CreateDisks()`).
4267
4268   @type lu: L{LogicalUnit}
4269   @param lu: the logical unit on whose behalf we execute
4270   @type instance: L{objects.Instance}
4271   @param instance: the instance whose disks we should remove
4272   @rtype: boolean
4273   @return: the success of the removal
4274
4275   """
4276   logging.info("Removing block devices for instance %s", instance.name)
4277
4278   all_result = True
4279   for device in instance.disks:
4280     for node, disk in device.ComputeNodeTree(instance.primary_node):
4281       lu.cfg.SetDiskID(disk, node)
4282       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4283       if msg:
4284         lu.LogWarning("Could not remove block device %s on node %s,"
4285                       " continuing anyway: %s", device.iv_name, node, msg)
4286         all_result = False
4287
4288   if instance.disk_template == constants.DT_FILE:
4289     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4290     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4291                                                  file_storage_dir)
4292     if result.failed or not result.data:
4293       logging.error("Could not remove directory '%s'", file_storage_dir)
4294       all_result = False
4295
4296   return all_result
4297
4298
4299 def _ComputeDiskSize(disk_template, disks):
4300   """Compute disk size requirements in the volume group
4301
4302   """
4303   # Required free disk space as a function of disk and swap space
4304   req_size_dict = {
4305     constants.DT_DISKLESS: None,
4306     constants.DT_PLAIN: sum(d["size"] for d in disks),
4307     # 128 MB are added for drbd metadata for each disk
4308     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4309     constants.DT_FILE: None,
4310   }
4311
4312   if disk_template not in req_size_dict:
4313     raise errors.ProgrammerError("Disk template '%s' size requirement"
4314                                  " is unknown" %  disk_template)
4315
4316   return req_size_dict[disk_template]
4317
4318
4319 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4320   """Hypervisor parameter validation.
4321
4322   This function abstract the hypervisor parameter validation to be
4323   used in both instance create and instance modify.
4324
4325   @type lu: L{LogicalUnit}
4326   @param lu: the logical unit for which we check
4327   @type nodenames: list
4328   @param nodenames: the list of nodes on which we should check
4329   @type hvname: string
4330   @param hvname: the name of the hypervisor we should use
4331   @type hvparams: dict
4332   @param hvparams: the parameters which we need to check
4333   @raise errors.OpPrereqError: if the parameters are not valid
4334
4335   """
4336   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4337                                                   hvname,
4338                                                   hvparams)
4339   for node in nodenames:
4340     info = hvinfo[node]
4341     if info.offline:
4342       continue
4343     msg = info.RemoteFailMsg()
4344     if msg:
4345       raise errors.OpPrereqError("Hypervisor parameter validation"
4346                                  " failed on node %s: %s" % (node, msg))
4347
4348
4349 class LUCreateInstance(LogicalUnit):
4350   """Create an instance.
4351
4352   """
4353   HPATH = "instance-add"
4354   HTYPE = constants.HTYPE_INSTANCE
4355   _OP_REQP = ["instance_name", "disks", "disk_template",
4356               "mode", "start",
4357               "wait_for_sync", "ip_check", "nics",
4358               "hvparams", "beparams"]
4359   REQ_BGL = False
4360
4361   def _ExpandNode(self, node):
4362     """Expands and checks one node name.
4363
4364     """
4365     node_full = self.cfg.ExpandNodeName(node)
4366     if node_full is None:
4367       raise errors.OpPrereqError("Unknown node %s" % node)
4368     return node_full
4369
4370   def ExpandNames(self):
4371     """ExpandNames for CreateInstance.
4372
4373     Figure out the right locks for instance creation.
4374
4375     """
4376     self.needed_locks = {}
4377
4378     # set optional parameters to none if they don't exist
4379     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4380       if not hasattr(self.op, attr):
4381         setattr(self.op, attr, None)
4382
4383     # cheap checks, mostly valid constants given
4384
4385     # verify creation mode
4386     if self.op.mode not in (constants.INSTANCE_CREATE,
4387                             constants.INSTANCE_IMPORT):
4388       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4389                                  self.op.mode)
4390
4391     # disk template and mirror node verification
4392     if self.op.disk_template not in constants.DISK_TEMPLATES:
4393       raise errors.OpPrereqError("Invalid disk template name")
4394
4395     if self.op.hypervisor is None:
4396       self.op.hypervisor = self.cfg.GetHypervisorType()
4397
4398     cluster = self.cfg.GetClusterInfo()
4399     enabled_hvs = cluster.enabled_hypervisors
4400     if self.op.hypervisor not in enabled_hvs:
4401       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4402                                  " cluster (%s)" % (self.op.hypervisor,
4403                                   ",".join(enabled_hvs)))
4404
4405     # check hypervisor parameter syntax (locally)
4406     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4407     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4408                                   self.op.hvparams)
4409     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4410     hv_type.CheckParameterSyntax(filled_hvp)
4411
4412     # fill and remember the beparams dict
4413     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4414     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4415                                     self.op.beparams)
4416
4417     #### instance parameters check
4418
4419     # instance name verification
4420     hostname1 = utils.HostInfo(self.op.instance_name)
4421     self.op.instance_name = instance_name = hostname1.name
4422
4423     # this is just a preventive check, but someone might still add this
4424     # instance in the meantime, and creation will fail at lock-add time
4425     if instance_name in self.cfg.GetInstanceList():
4426       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4427                                  instance_name)
4428
4429     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4430
4431     # NIC buildup
4432     self.nics = []
4433     for nic in self.op.nics:
4434       # ip validity checks
4435       ip = nic.get("ip", None)
4436       if ip is None or ip.lower() == "none":
4437         nic_ip = None
4438       elif ip.lower() == constants.VALUE_AUTO:
4439         nic_ip = hostname1.ip
4440       else:
4441         if not utils.IsValidIP(ip):
4442           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4443                                      " like a valid IP" % ip)
4444         nic_ip = ip
4445
4446       # MAC address verification
4447       mac = nic.get("mac", constants.VALUE_AUTO)
4448       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4449         if not utils.IsValidMac(mac.lower()):
4450           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4451                                      mac)
4452       # bridge verification
4453       bridge = nic.get("bridge", None)
4454       if bridge is None:
4455         bridge = self.cfg.GetDefBridge()
4456       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4457
4458     # disk checks/pre-build
4459     self.disks = []
4460     for disk in self.op.disks:
4461       mode = disk.get("mode", constants.DISK_RDWR)
4462       if mode not in constants.DISK_ACCESS_SET:
4463         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4464                                    mode)
4465       size = disk.get("size", None)
4466       if size is None:
4467         raise errors.OpPrereqError("Missing disk size")
4468       try:
4469         size = int(size)
4470       except ValueError:
4471         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4472       self.disks.append({"size": size, "mode": mode})
4473
4474     # used in CheckPrereq for ip ping check
4475     self.check_ip = hostname1.ip
4476
4477     # file storage checks
4478     if (self.op.file_driver and
4479         not self.op.file_driver in constants.FILE_DRIVER):
4480       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4481                                  self.op.file_driver)
4482
4483     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4484       raise errors.OpPrereqError("File storage directory path not absolute")
4485
4486     ### Node/iallocator related checks
4487     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4488       raise errors.OpPrereqError("One and only one of iallocator and primary"
4489                                  " node must be given")
4490
4491     if self.op.iallocator:
4492       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4493     else:
4494       self.op.pnode = self._ExpandNode(self.op.pnode)
4495       nodelist = [self.op.pnode]
4496       if self.op.snode is not None:
4497         self.op.snode = self._ExpandNode(self.op.snode)
4498         nodelist.append(self.op.snode)
4499       self.needed_locks[locking.LEVEL_NODE] = nodelist
4500
4501     # in case of import lock the source node too
4502     if self.op.mode == constants.INSTANCE_IMPORT:
4503       src_node = getattr(self.op, "src_node", None)
4504       src_path = getattr(self.op, "src_path", None)
4505
4506       if src_path is None:
4507         self.op.src_path = src_path = self.op.instance_name
4508
4509       if src_node is None:
4510         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4511         self.op.src_node = None
4512         if os.path.isabs(src_path):
4513           raise errors.OpPrereqError("Importing an instance from an absolute"
4514                                      " path requires a source node option.")
4515       else:
4516         self.op.src_node = src_node = self._ExpandNode(src_node)
4517         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4518           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4519         if not os.path.isabs(src_path):
4520           self.op.src_path = src_path = \
4521             os.path.join(constants.EXPORT_DIR, src_path)
4522
4523     else: # INSTANCE_CREATE
4524       if getattr(self.op, "os_type", None) is None:
4525         raise errors.OpPrereqError("No guest OS specified")
4526
4527   def _RunAllocator(self):
4528     """Run the allocator based on input opcode.
4529
4530     """
4531     nics = [n.ToDict() for n in self.nics]
4532     ial = IAllocator(self,
4533                      mode=constants.IALLOCATOR_MODE_ALLOC,
4534                      name=self.op.instance_name,
4535                      disk_template=self.op.disk_template,
4536                      tags=[],
4537                      os=self.op.os_type,
4538                      vcpus=self.be_full[constants.BE_VCPUS],
4539                      mem_size=self.be_full[constants.BE_MEMORY],
4540                      disks=self.disks,
4541                      nics=nics,
4542                      hypervisor=self.op.hypervisor,
4543                      )
4544
4545     ial.Run(self.op.iallocator)
4546
4547     if not ial.success:
4548       raise errors.OpPrereqError("Can't compute nodes using"
4549                                  " iallocator '%s': %s" % (self.op.iallocator,
4550                                                            ial.info))
4551     if len(ial.nodes) != ial.required_nodes:
4552       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4553                                  " of nodes (%s), required %s" %
4554                                  (self.op.iallocator, len(ial.nodes),
4555                                   ial.required_nodes))
4556     self.op.pnode = ial.nodes[0]
4557     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4558                  self.op.instance_name, self.op.iallocator,
4559                  ", ".join(ial.nodes))
4560     if ial.required_nodes == 2:
4561       self.op.snode = ial.nodes[1]
4562
4563   def BuildHooksEnv(self):
4564     """Build hooks env.
4565
4566     This runs on master, primary and secondary nodes of the instance.
4567
4568     """
4569     env = {
4570       "ADD_MODE": self.op.mode,
4571       }
4572     if self.op.mode == constants.INSTANCE_IMPORT:
4573       env["SRC_NODE"] = self.op.src_node
4574       env["SRC_PATH"] = self.op.src_path
4575       env["SRC_IMAGES"] = self.src_images
4576
4577     env.update(_BuildInstanceHookEnv(
4578       name=self.op.instance_name,
4579       primary_node=self.op.pnode,
4580       secondary_nodes=self.secondaries,
4581       status=self.op.start,
4582       os_type=self.op.os_type,
4583       memory=self.be_full[constants.BE_MEMORY],
4584       vcpus=self.be_full[constants.BE_VCPUS],
4585       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4586       disk_template=self.op.disk_template,
4587       disks=[(d["size"], d["mode"]) for d in self.disks],
4588     ))
4589
4590     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4591           self.secondaries)
4592     return env, nl, nl
4593
4594
4595   def CheckPrereq(self):
4596     """Check prerequisites.
4597
4598     """
4599     if (not self.cfg.GetVGName() and
4600         self.op.disk_template not in constants.DTS_NOT_LVM):
4601       raise errors.OpPrereqError("Cluster does not support lvm-based"
4602                                  " instances")
4603
4604     if self.op.mode == constants.INSTANCE_IMPORT:
4605       src_node = self.op.src_node
4606       src_path = self.op.src_path
4607
4608       if src_node is None:
4609         exp_list = self.rpc.call_export_list(
4610           self.acquired_locks[locking.LEVEL_NODE])
4611         found = False
4612         for node in exp_list:
4613           if not exp_list[node].failed and src_path in exp_list[node].data:
4614             found = True
4615             self.op.src_node = src_node = node
4616             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4617                                                        src_path)
4618             break
4619         if not found:
4620           raise errors.OpPrereqError("No export found for relative path %s" %
4621                                       src_path)
4622
4623       _CheckNodeOnline(self, src_node)
4624       result = self.rpc.call_export_info(src_node, src_path)
4625       result.Raise()
4626       if not result.data:
4627         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4628
4629       export_info = result.data
4630       if not export_info.has_section(constants.INISECT_EXP):
4631         raise errors.ProgrammerError("Corrupted export config")
4632
4633       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4634       if (int(ei_version) != constants.EXPORT_VERSION):
4635         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4636                                    (ei_version, constants.EXPORT_VERSION))
4637
4638       # Check that the new instance doesn't have less disks than the export
4639       instance_disks = len(self.disks)
4640       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4641       if instance_disks < export_disks:
4642         raise errors.OpPrereqError("Not enough disks to import."
4643                                    " (instance: %d, export: %d)" %
4644                                    (instance_disks, export_disks))
4645
4646       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4647       disk_images = []
4648       for idx in range(export_disks):
4649         option = 'disk%d_dump' % idx
4650         if export_info.has_option(constants.INISECT_INS, option):
4651           # FIXME: are the old os-es, disk sizes, etc. useful?
4652           export_name = export_info.get(constants.INISECT_INS, option)
4653           image = os.path.join(src_path, export_name)
4654           disk_images.append(image)
4655         else:
4656           disk_images.append(False)
4657
4658       self.src_images = disk_images
4659
4660       old_name = export_info.get(constants.INISECT_INS, 'name')
4661       # FIXME: int() here could throw a ValueError on broken exports
4662       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4663       if self.op.instance_name == old_name:
4664         for idx, nic in enumerate(self.nics):
4665           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4666             nic_mac_ini = 'nic%d_mac' % idx
4667             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4668
4669     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4670     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4671     if self.op.start and not self.op.ip_check:
4672       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4673                                  " adding an instance in start mode")
4674
4675     if self.op.ip_check:
4676       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4677         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4678                                    (self.check_ip, self.op.instance_name))
4679
4680     #### mac address generation
4681     # By generating here the mac address both the allocator and the hooks get
4682     # the real final mac address rather than the 'auto' or 'generate' value.
4683     # There is a race condition between the generation and the instance object
4684     # creation, which means that we know the mac is valid now, but we're not
4685     # sure it will be when we actually add the instance. If things go bad
4686     # adding the instance will abort because of a duplicate mac, and the
4687     # creation job will fail.
4688     for nic in self.nics:
4689       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4690         nic.mac = self.cfg.GenerateMAC()
4691
4692     #### allocator run
4693
4694     if self.op.iallocator is not None:
4695       self._RunAllocator()
4696
4697     #### node related checks
4698
4699     # check primary node
4700     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4701     assert self.pnode is not None, \
4702       "Cannot retrieve locked node %s" % self.op.pnode
4703     if pnode.offline:
4704       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4705                                  pnode.name)
4706     if pnode.drained:
4707       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4708                                  pnode.name)
4709
4710     self.secondaries = []
4711
4712     # mirror node verification
4713     if self.op.disk_template in constants.DTS_NET_MIRROR:
4714       if self.op.snode is None:
4715         raise errors.OpPrereqError("The networked disk templates need"
4716                                    " a mirror node")
4717       if self.op.snode == pnode.name:
4718         raise errors.OpPrereqError("The secondary node cannot be"
4719                                    " the primary node.")
4720       _CheckNodeOnline(self, self.op.snode)
4721       _CheckNodeNotDrained(self, self.op.snode)
4722       self.secondaries.append(self.op.snode)
4723
4724     nodenames = [pnode.name] + self.secondaries
4725
4726     req_size = _ComputeDiskSize(self.op.disk_template,
4727                                 self.disks)
4728
4729     # Check lv size requirements
4730     if req_size is not None:
4731       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4732                                          self.op.hypervisor)
4733       for node in nodenames:
4734         info = nodeinfo[node]
4735         info.Raise()
4736         info = info.data
4737         if not info:
4738           raise errors.OpPrereqError("Cannot get current information"
4739                                      " from node '%s'" % node)
4740         vg_free = info.get('vg_free', None)
4741         if not isinstance(vg_free, int):
4742           raise errors.OpPrereqError("Can't compute free disk space on"
4743                                      " node %s" % node)
4744         if req_size > info['vg_free']:
4745           raise errors.OpPrereqError("Not enough disk space on target node %s."
4746                                      " %d MB available, %d MB required" %
4747                                      (node, info['vg_free'], req_size))
4748
4749     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4750
4751     # os verification
4752     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4753     result.Raise()
4754     if not isinstance(result.data, objects.OS):
4755       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4756                                  " primary node"  % self.op.os_type)
4757
4758     # bridge check on primary node
4759     bridges = [n.bridge for n in self.nics]
4760     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4761     result.Raise()
4762     if not result.data:
4763       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4764                                  " exist on destination node '%s'" %
4765                                  (",".join(bridges), pnode.name))
4766
4767     # memory check on primary node
4768     if self.op.start:
4769       _CheckNodeFreeMemory(self, self.pnode.name,
4770                            "creating instance %s" % self.op.instance_name,
4771                            self.be_full[constants.BE_MEMORY],
4772                            self.op.hypervisor)
4773
4774   def Exec(self, feedback_fn):
4775     """Create and add the instance to the cluster.
4776
4777     """
4778     instance = self.op.instance_name
4779     pnode_name = self.pnode.name
4780
4781     ht_kind = self.op.hypervisor
4782     if ht_kind in constants.HTS_REQ_PORT:
4783       network_port = self.cfg.AllocatePort()
4784     else:
4785       network_port = None
4786
4787     ##if self.op.vnc_bind_address is None:
4788     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4789
4790     # this is needed because os.path.join does not accept None arguments
4791     if self.op.file_storage_dir is None:
4792       string_file_storage_dir = ""
4793     else:
4794       string_file_storage_dir = self.op.file_storage_dir
4795
4796     # build the full file storage dir path
4797     file_storage_dir = os.path.normpath(os.path.join(
4798                                         self.cfg.GetFileStorageDir(),
4799                                         string_file_storage_dir, instance))
4800
4801
4802     disks = _GenerateDiskTemplate(self,
4803                                   self.op.disk_template,
4804                                   instance, pnode_name,
4805                                   self.secondaries,
4806                                   self.disks,
4807                                   file_storage_dir,
4808                                   self.op.file_driver,
4809                                   0)
4810
4811     iobj = objects.Instance(name=instance, os=self.op.os_type,
4812                             primary_node=pnode_name,
4813                             nics=self.nics, disks=disks,
4814                             disk_template=self.op.disk_template,
4815                             admin_up=False,
4816                             network_port=network_port,
4817                             beparams=self.op.beparams,
4818                             hvparams=self.op.hvparams,
4819                             hypervisor=self.op.hypervisor,
4820                             )
4821
4822     feedback_fn("* creating instance disks...")
4823     try:
4824       _CreateDisks(self, iobj)
4825     except errors.OpExecError:
4826       self.LogWarning("Device creation failed, reverting...")
4827       try:
4828         _RemoveDisks(self, iobj)
4829       finally:
4830         self.cfg.ReleaseDRBDMinors(instance)
4831         raise
4832
4833     feedback_fn("adding instance %s to cluster config" % instance)
4834
4835     self.cfg.AddInstance(iobj)
4836     # Declare that we don't want to remove the instance lock anymore, as we've
4837     # added the instance to the config
4838     del self.remove_locks[locking.LEVEL_INSTANCE]
4839     # Unlock all the nodes
4840     if self.op.mode == constants.INSTANCE_IMPORT:
4841       nodes_keep = [self.op.src_node]
4842       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4843                        if node != self.op.src_node]
4844       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4845       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4846     else:
4847       self.context.glm.release(locking.LEVEL_NODE)
4848       del self.acquired_locks[locking.LEVEL_NODE]
4849
4850     if self.op.wait_for_sync:
4851       disk_abort = not _WaitForSync(self, iobj)
4852     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4853       # make sure the disks are not degraded (still sync-ing is ok)
4854       time.sleep(15)
4855       feedback_fn("* checking mirrors status")
4856       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4857     else:
4858       disk_abort = False
4859
4860     if disk_abort:
4861       _RemoveDisks(self, iobj)
4862       self.cfg.RemoveInstance(iobj.name)
4863       # Make sure the instance lock gets removed
4864       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4865       raise errors.OpExecError("There are some degraded disks for"
4866                                " this instance")
4867
4868     feedback_fn("creating os for instance %s on node %s" %
4869                 (instance, pnode_name))
4870
4871     if iobj.disk_template != constants.DT_DISKLESS:
4872       if self.op.mode == constants.INSTANCE_CREATE:
4873         feedback_fn("* running the instance OS create scripts...")
4874         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4875         msg = result.RemoteFailMsg()
4876         if msg:
4877           raise errors.OpExecError("Could not add os for instance %s"
4878                                    " on node %s: %s" %
4879                                    (instance, pnode_name, msg))
4880
4881       elif self.op.mode == constants.INSTANCE_IMPORT:
4882         feedback_fn("* running the instance OS import scripts...")
4883         src_node = self.op.src_node
4884         src_images = self.src_images
4885         cluster_name = self.cfg.GetClusterName()
4886         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4887                                                          src_node, src_images,
4888                                                          cluster_name)
4889         import_result.Raise()
4890         for idx, result in enumerate(import_result.data):
4891           if not result:
4892             self.LogWarning("Could not import the image %s for instance"
4893                             " %s, disk %d, on node %s" %
4894                             (src_images[idx], instance, idx, pnode_name))
4895       else:
4896         # also checked in the prereq part
4897         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4898                                      % self.op.mode)
4899
4900     if self.op.start:
4901       iobj.admin_up = True
4902       self.cfg.Update(iobj)
4903       logging.info("Starting instance %s on node %s", instance, pnode_name)
4904       feedback_fn("* starting instance...")
4905       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4906       msg = result.RemoteFailMsg()
4907       if msg:
4908         raise errors.OpExecError("Could not start instance: %s" % msg)
4909
4910
4911 class LUConnectConsole(NoHooksLU):
4912   """Connect to an instance's console.
4913
4914   This is somewhat special in that it returns the command line that
4915   you need to run on the master node in order to connect to the
4916   console.
4917
4918   """
4919   _OP_REQP = ["instance_name"]
4920   REQ_BGL = False
4921
4922   def ExpandNames(self):
4923     self._ExpandAndLockInstance()
4924
4925   def CheckPrereq(self):
4926     """Check prerequisites.
4927
4928     This checks that the instance is in the cluster.
4929
4930     """
4931     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4932     assert self.instance is not None, \
4933       "Cannot retrieve locked instance %s" % self.op.instance_name
4934     _CheckNodeOnline(self, self.instance.primary_node)
4935
4936   def Exec(self, feedback_fn):
4937     """Connect to the console of an instance
4938
4939     """
4940     instance = self.instance
4941     node = instance.primary_node
4942
4943     node_insts = self.rpc.call_instance_list([node],
4944                                              [instance.hypervisor])[node]
4945     node_insts.Raise()
4946
4947     if instance.name not in node_insts.data:
4948       raise errors.OpExecError("Instance %s is not running." % instance.name)
4949
4950     logging.debug("Connecting to console of %s on %s", instance.name, node)
4951
4952     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4953     cluster = self.cfg.GetClusterInfo()
4954     # beparams and hvparams are passed separately, to avoid editing the
4955     # instance and then saving the defaults in the instance itself.
4956     hvparams = cluster.FillHV(instance)
4957     beparams = cluster.FillBE(instance)
4958     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4959
4960     # build ssh cmdline
4961     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4962
4963
4964 class LUReplaceDisks(LogicalUnit):
4965   """Replace the disks of an instance.
4966
4967   """
4968   HPATH = "mirrors-replace"
4969   HTYPE = constants.HTYPE_INSTANCE
4970   _OP_REQP = ["instance_name", "mode", "disks"]
4971   REQ_BGL = False
4972
4973   def CheckArguments(self):
4974     if not hasattr(self.op, "remote_node"):
4975       self.op.remote_node = None
4976     if not hasattr(self.op, "iallocator"):
4977       self.op.iallocator = None
4978
4979     # check for valid parameter combination
4980     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4981     if self.op.mode == constants.REPLACE_DISK_CHG:
4982       if cnt == 2:
4983         raise errors.OpPrereqError("When changing the secondary either an"
4984                                    " iallocator script must be used or the"
4985                                    " new node given")
4986       elif cnt == 0:
4987         raise errors.OpPrereqError("Give either the iallocator or the new"
4988                                    " secondary, not both")
4989     else: # not replacing the secondary
4990       if cnt != 2:
4991         raise errors.OpPrereqError("The iallocator and new node options can"
4992                                    " be used only when changing the"
4993                                    " secondary node")
4994
4995   def ExpandNames(self):
4996     self._ExpandAndLockInstance()
4997
4998     if self.op.iallocator is not None:
4999       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5000     elif self.op.remote_node is not None:
5001       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5002       if remote_node is None:
5003         raise errors.OpPrereqError("Node '%s' not known" %
5004                                    self.op.remote_node)
5005       self.op.remote_node = remote_node
5006       # Warning: do not remove the locking of the new secondary here
5007       # unless DRBD8.AddChildren is changed to work in parallel;
5008       # currently it doesn't since parallel invocations of
5009       # FindUnusedMinor will conflict
5010       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5011       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5012     else:
5013       self.needed_locks[locking.LEVEL_NODE] = []
5014       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5015
5016   def DeclareLocks(self, level):
5017     # If we're not already locking all nodes in the set we have to declare the
5018     # instance's primary/secondary nodes.
5019     if (level == locking.LEVEL_NODE and
5020         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5021       self._LockInstancesNodes()
5022
5023   def _RunAllocator(self):
5024     """Compute a new secondary node using an IAllocator.
5025
5026     """
5027     ial = IAllocator(self,
5028                      mode=constants.IALLOCATOR_MODE_RELOC,
5029                      name=self.op.instance_name,
5030                      relocate_from=[self.sec_node])
5031
5032     ial.Run(self.op.iallocator)
5033
5034     if not ial.success:
5035       raise errors.OpPrereqError("Can't compute nodes using"
5036                                  " iallocator '%s': %s" % (self.op.iallocator,
5037                                                            ial.info))
5038     if len(ial.nodes) != ial.required_nodes:
5039       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5040                                  " of nodes (%s), required %s" %
5041                                  (len(ial.nodes), ial.required_nodes))
5042     self.op.remote_node = ial.nodes[0]
5043     self.LogInfo("Selected new secondary for the instance: %s",
5044                  self.op.remote_node)
5045
5046   def BuildHooksEnv(self):
5047     """Build hooks env.
5048
5049     This runs on the master, the primary and all the secondaries.
5050
5051     """
5052     env = {
5053       "MODE": self.op.mode,
5054       "NEW_SECONDARY": self.op.remote_node,
5055       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5056       }
5057     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5058     nl = [
5059       self.cfg.GetMasterNode(),
5060       self.instance.primary_node,
5061       ]
5062     if self.op.remote_node is not None:
5063       nl.append(self.op.remote_node)
5064     return env, nl, nl
5065
5066   def CheckPrereq(self):
5067     """Check prerequisites.
5068
5069     This checks that the instance is in the cluster.
5070
5071     """
5072     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5073     assert instance is not None, \
5074       "Cannot retrieve locked instance %s" % self.op.instance_name
5075     self.instance = instance
5076
5077     if instance.disk_template != constants.DT_DRBD8:
5078       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5079                                  " instances")
5080
5081     if len(instance.secondary_nodes) != 1:
5082       raise errors.OpPrereqError("The instance has a strange layout,"
5083                                  " expected one secondary but found %d" %
5084                                  len(instance.secondary_nodes))
5085
5086     self.sec_node = instance.secondary_nodes[0]
5087
5088     if self.op.iallocator is not None:
5089       self._RunAllocator()
5090
5091     remote_node = self.op.remote_node
5092     if remote_node is not None:
5093       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5094       assert self.remote_node_info is not None, \
5095         "Cannot retrieve locked node %s" % remote_node
5096     else:
5097       self.remote_node_info = None
5098     if remote_node == instance.primary_node:
5099       raise errors.OpPrereqError("The specified node is the primary node of"
5100                                  " the instance.")
5101     elif remote_node == self.sec_node:
5102       raise errors.OpPrereqError("The specified node is already the"
5103                                  " secondary node of the instance.")
5104
5105     if self.op.mode == constants.REPLACE_DISK_PRI:
5106       n1 = self.tgt_node = instance.primary_node
5107       n2 = self.oth_node = self.sec_node
5108     elif self.op.mode == constants.REPLACE_DISK_SEC:
5109       n1 = self.tgt_node = self.sec_node
5110       n2 = self.oth_node = instance.primary_node
5111     elif self.op.mode == constants.REPLACE_DISK_CHG:
5112       n1 = self.new_node = remote_node
5113       n2 = self.oth_node = instance.primary_node
5114       self.tgt_node = self.sec_node
5115       _CheckNodeNotDrained(self, remote_node)
5116     else:
5117       raise errors.ProgrammerError("Unhandled disk replace mode")
5118
5119     _CheckNodeOnline(self, n1)
5120     _CheckNodeOnline(self, n2)
5121
5122     if not self.op.disks:
5123       self.op.disks = range(len(instance.disks))
5124
5125     for disk_idx in self.op.disks:
5126       instance.FindDisk(disk_idx)
5127
5128   def _ExecD8DiskOnly(self, feedback_fn):
5129     """Replace a disk on the primary or secondary for dbrd8.
5130
5131     The algorithm for replace is quite complicated:
5132
5133       1. for each disk to be replaced:
5134
5135         1. create new LVs on the target node with unique names
5136         1. detach old LVs from the drbd device
5137         1. rename old LVs to name_replaced.<time_t>
5138         1. rename new LVs to old LVs
5139         1. attach the new LVs (with the old names now) to the drbd device
5140
5141       1. wait for sync across all devices
5142
5143       1. for each modified disk:
5144
5145         1. remove old LVs (which have the name name_replaces.<time_t>)
5146
5147     Failures are not very well handled.
5148
5149     """
5150     steps_total = 6
5151     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5152     instance = self.instance
5153     iv_names = {}
5154     vgname = self.cfg.GetVGName()
5155     # start of work
5156     cfg = self.cfg
5157     tgt_node = self.tgt_node
5158     oth_node = self.oth_node
5159
5160     # Step: check device activation
5161     self.proc.LogStep(1, steps_total, "check device existence")
5162     info("checking volume groups")
5163     my_vg = cfg.GetVGName()
5164     results = self.rpc.call_vg_list([oth_node, tgt_node])
5165     if not results:
5166       raise errors.OpExecError("Can't list volume groups on the nodes")
5167     for node in oth_node, tgt_node:
5168       res = results[node]
5169       if res.failed or not res.data or my_vg not in res.data:
5170         raise errors.OpExecError("Volume group '%s' not found on %s" %
5171                                  (my_vg, node))
5172     for idx, dev in enumerate(instance.disks):
5173       if idx not in self.op.disks:
5174         continue
5175       for node in tgt_node, oth_node:
5176         info("checking disk/%d on %s" % (idx, node))
5177         cfg.SetDiskID(dev, node)
5178         result = self.rpc.call_blockdev_find(node, dev)
5179         msg = result.RemoteFailMsg()
5180         if not msg and not result.payload:
5181           msg = "disk not found"
5182         if msg:
5183           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5184                                    (idx, node, msg))
5185
5186     # Step: check other node consistency
5187     self.proc.LogStep(2, steps_total, "check peer consistency")
5188     for idx, dev in enumerate(instance.disks):
5189       if idx not in self.op.disks:
5190         continue
5191       info("checking disk/%d consistency on %s" % (idx, oth_node))
5192       if not _CheckDiskConsistency(self, dev, oth_node,
5193                                    oth_node==instance.primary_node):
5194         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5195                                  " to replace disks on this node (%s)" %
5196                                  (oth_node, tgt_node))
5197
5198     # Step: create new storage
5199     self.proc.LogStep(3, steps_total, "allocate new storage")
5200     for idx, dev in enumerate(instance.disks):
5201       if idx not in self.op.disks:
5202         continue
5203       size = dev.size
5204       cfg.SetDiskID(dev, tgt_node)
5205       lv_names = [".disk%d_%s" % (idx, suf)
5206                   for suf in ["data", "meta"]]
5207       names = _GenerateUniqueNames(self, lv_names)
5208       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5209                              logical_id=(vgname, names[0]))
5210       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5211                              logical_id=(vgname, names[1]))
5212       new_lvs = [lv_data, lv_meta]
5213       old_lvs = dev.children
5214       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5215       info("creating new local storage on %s for %s" %
5216            (tgt_node, dev.iv_name))
5217       # we pass force_create=True to force the LVM creation
5218       for new_lv in new_lvs:
5219         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5220                         _GetInstanceInfoText(instance), False)
5221
5222     # Step: for each lv, detach+rename*2+attach
5223     self.proc.LogStep(4, steps_total, "change drbd configuration")
5224     for dev, old_lvs, new_lvs in iv_names.itervalues():
5225       info("detaching %s drbd from local storage" % dev.iv_name)
5226       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5227       msg = result.RemoteFailMsg()
5228       if msg:
5229         raise errors.OpExecError("Can't detach drbd from local storage on node"
5230                                  " %s for device %s: %s" %
5231                                  (tgt_node, dev.iv_name, msg))
5232       #dev.children = []
5233       #cfg.Update(instance)
5234
5235       # ok, we created the new LVs, so now we know we have the needed
5236       # storage; as such, we proceed on the target node to rename
5237       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5238       # using the assumption that logical_id == physical_id (which in
5239       # turn is the unique_id on that node)
5240
5241       # FIXME(iustin): use a better name for the replaced LVs
5242       temp_suffix = int(time.time())
5243       ren_fn = lambda d, suff: (d.physical_id[0],
5244                                 d.physical_id[1] + "_replaced-%s" % suff)
5245       # build the rename list based on what LVs exist on the node
5246       rlist = []
5247       for to_ren in old_lvs:
5248         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5249         if not result.RemoteFailMsg() and result.payload:
5250           # device exists
5251           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5252
5253       info("renaming the old LVs on the target node")
5254       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5255       msg = result.RemoteFailMsg()
5256       if msg:
5257         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5258                                  (tgt_node, msg))
5259       # now we rename the new LVs to the old LVs
5260       info("renaming the new LVs on the target node")
5261       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5262       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5263       msg = result.RemoteFailMsg()
5264       if msg:
5265         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5266                                  (tgt_node, msg))
5267
5268       for old, new in zip(old_lvs, new_lvs):
5269         new.logical_id = old.logical_id
5270         cfg.SetDiskID(new, tgt_node)
5271
5272       for disk in old_lvs:
5273         disk.logical_id = ren_fn(disk, temp_suffix)
5274         cfg.SetDiskID(disk, tgt_node)
5275
5276       # now that the new lvs have the old name, we can add them to the device
5277       info("adding new mirror component on %s" % tgt_node)
5278       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5279       msg = result.RemoteFailMsg()
5280       if msg:
5281         for new_lv in new_lvs:
5282           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5283           if msg:
5284             warning("Can't rollback device %s: %s", dev, msg,
5285                     hint="cleanup manually the unused logical volumes")
5286         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5287
5288       dev.children = new_lvs
5289       cfg.Update(instance)
5290
5291     # Step: wait for sync
5292
5293     # this can fail as the old devices are degraded and _WaitForSync
5294     # does a combined result over all disks, so we don't check its
5295     # return value
5296     self.proc.LogStep(5, steps_total, "sync devices")
5297     _WaitForSync(self, instance, unlock=True)
5298
5299     # so check manually all the devices
5300     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5301       cfg.SetDiskID(dev, instance.primary_node)
5302       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5303       msg = result.RemoteFailMsg()
5304       if not msg and not result.payload:
5305         msg = "disk not found"
5306       if msg:
5307         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5308                                  (name, msg))
5309       if result.payload[5]:
5310         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5311
5312     # Step: remove old storage
5313     self.proc.LogStep(6, steps_total, "removing old storage")
5314     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5315       info("remove logical volumes for %s" % name)
5316       for lv in old_lvs:
5317         cfg.SetDiskID(lv, tgt_node)
5318         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5319         if msg:
5320           warning("Can't remove old LV: %s" % msg,
5321                   hint="manually remove unused LVs")
5322           continue
5323
5324   def _ExecD8Secondary(self, feedback_fn):
5325     """Replace the secondary node for drbd8.
5326
5327     The algorithm for replace is quite complicated:
5328       - for all disks of the instance:
5329         - create new LVs on the new node with same names
5330         - shutdown the drbd device on the old secondary
5331         - disconnect the drbd network on the primary
5332         - create the drbd device on the new secondary
5333         - network attach the drbd on the primary, using an artifice:
5334           the drbd code for Attach() will connect to the network if it
5335           finds a device which is connected to the good local disks but
5336           not network enabled
5337       - wait for sync across all devices
5338       - remove all disks from the old secondary
5339
5340     Failures are not very well handled.
5341
5342     """
5343     steps_total = 6
5344     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5345     instance = self.instance
5346     iv_names = {}
5347     # start of work
5348     cfg = self.cfg
5349     old_node = self.tgt_node
5350     new_node = self.new_node
5351     pri_node = instance.primary_node
5352     nodes_ip = {
5353       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5354       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5355       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5356       }
5357
5358     # Step: check device activation
5359     self.proc.LogStep(1, steps_total, "check device existence")
5360     info("checking volume groups")
5361     my_vg = cfg.GetVGName()
5362     results = self.rpc.call_vg_list([pri_node, new_node])
5363     for node in pri_node, new_node:
5364       res = results[node]
5365       if res.failed or not res.data or my_vg not in res.data:
5366         raise errors.OpExecError("Volume group '%s' not found on %s" %
5367                                  (my_vg, node))
5368     for idx, dev in enumerate(instance.disks):
5369       if idx not in self.op.disks:
5370         continue
5371       info("checking disk/%d on %s" % (idx, pri_node))
5372       cfg.SetDiskID(dev, pri_node)
5373       result = self.rpc.call_blockdev_find(pri_node, dev)
5374       msg = result.RemoteFailMsg()
5375       if not msg and not result.payload:
5376         msg = "disk not found"
5377       if msg:
5378         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5379                                  (idx, pri_node, msg))
5380
5381     # Step: check other node consistency
5382     self.proc.LogStep(2, steps_total, "check peer consistency")
5383     for idx, dev in enumerate(instance.disks):
5384       if idx not in self.op.disks:
5385         continue
5386       info("checking disk/%d consistency on %s" % (idx, pri_node))
5387       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5388         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5389                                  " unsafe to replace the secondary" %
5390                                  pri_node)
5391
5392     # Step: create new storage
5393     self.proc.LogStep(3, steps_total, "allocate new storage")
5394     for idx, dev in enumerate(instance.disks):
5395       info("adding new local storage on %s for disk/%d" %
5396            (new_node, idx))
5397       # we pass force_create=True to force LVM creation
5398       for new_lv in dev.children:
5399         _CreateBlockDev(self, new_node, instance, new_lv, True,
5400                         _GetInstanceInfoText(instance), False)
5401
5402     # Step 4: dbrd minors and drbd setups changes
5403     # after this, we must manually remove the drbd minors on both the
5404     # error and the success paths
5405     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5406                                    instance.name)
5407     logging.debug("Allocated minors %s" % (minors,))
5408     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5409     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5410       size = dev.size
5411       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5412       # create new devices on new_node; note that we create two IDs:
5413       # one without port, so the drbd will be activated without
5414       # networking information on the new node at this stage, and one
5415       # with network, for the latter activation in step 4
5416       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5417       if pri_node == o_node1:
5418         p_minor = o_minor1
5419       else:
5420         p_minor = o_minor2
5421
5422       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5423       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5424
5425       iv_names[idx] = (dev, dev.children, new_net_id)
5426       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5427                     new_net_id)
5428       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5429                               logical_id=new_alone_id,
5430                               children=dev.children)
5431       try:
5432         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5433                               _GetInstanceInfoText(instance), False)
5434       except errors.GenericError:
5435         self.cfg.ReleaseDRBDMinors(instance.name)
5436         raise
5437
5438     for idx, dev in enumerate(instance.disks):
5439       # we have new devices, shutdown the drbd on the old secondary
5440       info("shutting down drbd for disk/%d on old node" % idx)
5441       cfg.SetDiskID(dev, old_node)
5442       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5443       if msg:
5444         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5445                 (idx, msg),
5446                 hint="Please cleanup this device manually as soon as possible")
5447
5448     info("detaching primary drbds from the network (=> standalone)")
5449     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5450                                                instance.disks)[pri_node]
5451
5452     msg = result.RemoteFailMsg()
5453     if msg:
5454       # detaches didn't succeed (unlikely)
5455       self.cfg.ReleaseDRBDMinors(instance.name)
5456       raise errors.OpExecError("Can't detach the disks from the network on"
5457                                " old node: %s" % (msg,))
5458
5459     # if we managed to detach at least one, we update all the disks of
5460     # the instance to point to the new secondary
5461     info("updating instance configuration")
5462     for dev, _, new_logical_id in iv_names.itervalues():
5463       dev.logical_id = new_logical_id
5464       cfg.SetDiskID(dev, pri_node)
5465     cfg.Update(instance)
5466
5467     # and now perform the drbd attach
5468     info("attaching primary drbds to new secondary (standalone => connected)")
5469     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5470                                            instance.disks, instance.name,
5471                                            False)
5472     for to_node, to_result in result.items():
5473       msg = to_result.RemoteFailMsg()
5474       if msg:
5475         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5476                 hint="please do a gnt-instance info to see the"
5477                 " status of disks")
5478
5479     # this can fail as the old devices are degraded and _WaitForSync
5480     # does a combined result over all disks, so we don't check its
5481     # return value
5482     self.proc.LogStep(5, steps_total, "sync devices")
5483     _WaitForSync(self, instance, unlock=True)
5484
5485     # so check manually all the devices
5486     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5487       cfg.SetDiskID(dev, pri_node)
5488       result = self.rpc.call_blockdev_find(pri_node, dev)
5489       msg = result.RemoteFailMsg()
5490       if not msg and not result.payload:
5491         msg = "disk not found"
5492       if msg:
5493         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5494                                  (idx, msg))
5495       if result.payload[5]:
5496         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5497
5498     self.proc.LogStep(6, steps_total, "removing old storage")
5499     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5500       info("remove logical volumes for disk/%d" % idx)
5501       for lv in old_lvs:
5502         cfg.SetDiskID(lv, old_node)
5503         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5504         if msg:
5505           warning("Can't remove LV on old secondary: %s", msg,
5506                   hint="Cleanup stale volumes by hand")
5507
5508   def Exec(self, feedback_fn):
5509     """Execute disk replacement.
5510
5511     This dispatches the disk replacement to the appropriate handler.
5512
5513     """
5514     instance = self.instance
5515
5516     # Activate the instance disks if we're replacing them on a down instance
5517     if not instance.admin_up:
5518       _StartInstanceDisks(self, instance, True)
5519
5520     if self.op.mode == constants.REPLACE_DISK_CHG:
5521       fn = self._ExecD8Secondary
5522     else:
5523       fn = self._ExecD8DiskOnly
5524
5525     ret = fn(feedback_fn)
5526
5527     # Deactivate the instance disks if we're replacing them on a down instance
5528     if not instance.admin_up:
5529       _SafeShutdownInstanceDisks(self, instance)
5530
5531     return ret
5532
5533
5534 class LUGrowDisk(LogicalUnit):
5535   """Grow a disk of an instance.
5536
5537   """
5538   HPATH = "disk-grow"
5539   HTYPE = constants.HTYPE_INSTANCE
5540   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5541   REQ_BGL = False
5542
5543   def ExpandNames(self):
5544     self._ExpandAndLockInstance()
5545     self.needed_locks[locking.LEVEL_NODE] = []
5546     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5547
5548   def DeclareLocks(self, level):
5549     if level == locking.LEVEL_NODE:
5550       self._LockInstancesNodes()
5551
5552   def BuildHooksEnv(self):
5553     """Build hooks env.
5554
5555     This runs on the master, the primary and all the secondaries.
5556
5557     """
5558     env = {
5559       "DISK": self.op.disk,
5560       "AMOUNT": self.op.amount,
5561       }
5562     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5563     nl = [
5564       self.cfg.GetMasterNode(),
5565       self.instance.primary_node,
5566       ]
5567     return env, nl, nl
5568
5569   def CheckPrereq(self):
5570     """Check prerequisites.
5571
5572     This checks that the instance is in the cluster.
5573
5574     """
5575     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5576     assert instance is not None, \
5577       "Cannot retrieve locked instance %s" % self.op.instance_name
5578     nodenames = list(instance.all_nodes)
5579     for node in nodenames:
5580       _CheckNodeOnline(self, node)
5581
5582
5583     self.instance = instance
5584
5585     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5586       raise errors.OpPrereqError("Instance's disk layout does not support"
5587                                  " growing.")
5588
5589     self.disk = instance.FindDisk(self.op.disk)
5590
5591     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5592                                        instance.hypervisor)
5593     for node in nodenames:
5594       info = nodeinfo[node]
5595       if info.failed or not info.data:
5596         raise errors.OpPrereqError("Cannot get current information"
5597                                    " from node '%s'" % node)
5598       vg_free = info.data.get('vg_free', None)
5599       if not isinstance(vg_free, int):
5600         raise errors.OpPrereqError("Can't compute free disk space on"
5601                                    " node %s" % node)
5602       if self.op.amount > vg_free:
5603         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5604                                    " %d MiB available, %d MiB required" %
5605                                    (node, vg_free, self.op.amount))
5606
5607   def Exec(self, feedback_fn):
5608     """Execute disk grow.
5609
5610     """
5611     instance = self.instance
5612     disk = self.disk
5613     for node in instance.all_nodes:
5614       self.cfg.SetDiskID(disk, node)
5615       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5616       msg = result.RemoteFailMsg()
5617       if msg:
5618         raise errors.OpExecError("Grow request failed to node %s: %s" %
5619                                  (node, msg))
5620     disk.RecordGrow(self.op.amount)
5621     self.cfg.Update(instance)
5622     if self.op.wait_for_sync:
5623       disk_abort = not _WaitForSync(self, instance)
5624       if disk_abort:
5625         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5626                              " status.\nPlease check the instance.")
5627
5628
5629 class LUQueryInstanceData(NoHooksLU):
5630   """Query runtime instance data.
5631
5632   """
5633   _OP_REQP = ["instances", "static"]
5634   REQ_BGL = False
5635
5636   def ExpandNames(self):
5637     self.needed_locks = {}
5638     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5639
5640     if not isinstance(self.op.instances, list):
5641       raise errors.OpPrereqError("Invalid argument type 'instances'")
5642
5643     if self.op.instances:
5644       self.wanted_names = []
5645       for name in self.op.instances:
5646         full_name = self.cfg.ExpandInstanceName(name)
5647         if full_name is None:
5648           raise errors.OpPrereqError("Instance '%s' not known" % name)
5649         self.wanted_names.append(full_name)
5650       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5651     else:
5652       self.wanted_names = None
5653       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5654
5655     self.needed_locks[locking.LEVEL_NODE] = []
5656     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5657
5658   def DeclareLocks(self, level):
5659     if level == locking.LEVEL_NODE:
5660       self._LockInstancesNodes()
5661
5662   def CheckPrereq(self):
5663     """Check prerequisites.
5664
5665     This only checks the optional instance list against the existing names.
5666
5667     """
5668     if self.wanted_names is None:
5669       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5670
5671     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5672                              in self.wanted_names]
5673     return
5674
5675   def _ComputeDiskStatus(self, instance, snode, dev):
5676     """Compute block device status.
5677
5678     """
5679     static = self.op.static
5680     if not static:
5681       self.cfg.SetDiskID(dev, instance.primary_node)
5682       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5683       if dev_pstatus.offline:
5684         dev_pstatus = None
5685       else:
5686         msg = dev_pstatus.RemoteFailMsg()
5687         if msg:
5688           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5689                                    (instance.name, msg))
5690         dev_pstatus = dev_pstatus.payload
5691     else:
5692       dev_pstatus = None
5693
5694     if dev.dev_type in constants.LDS_DRBD:
5695       # we change the snode then (otherwise we use the one passed in)
5696       if dev.logical_id[0] == instance.primary_node:
5697         snode = dev.logical_id[1]
5698       else:
5699         snode = dev.logical_id[0]
5700
5701     if snode and not static:
5702       self.cfg.SetDiskID(dev, snode)
5703       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5704       if dev_sstatus.offline:
5705         dev_sstatus = None
5706       else:
5707         msg = dev_sstatus.RemoteFailMsg()
5708         if msg:
5709           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5710                                    (instance.name, msg))
5711         dev_sstatus = dev_sstatus.payload
5712     else:
5713       dev_sstatus = None
5714
5715     if dev.children:
5716       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5717                       for child in dev.children]
5718     else:
5719       dev_children = []
5720
5721     data = {
5722       "iv_name": dev.iv_name,
5723       "dev_type": dev.dev_type,
5724       "logical_id": dev.logical_id,
5725       "physical_id": dev.physical_id,
5726       "pstatus": dev_pstatus,
5727       "sstatus": dev_sstatus,
5728       "children": dev_children,
5729       "mode": dev.mode,
5730       }
5731
5732     return data
5733
5734   def Exec(self, feedback_fn):
5735     """Gather and return data"""
5736     result = {}
5737
5738     cluster = self.cfg.GetClusterInfo()
5739
5740     for instance in self.wanted_instances:
5741       if not self.op.static:
5742         remote_info = self.rpc.call_instance_info(instance.primary_node,
5743                                                   instance.name,
5744                                                   instance.hypervisor)
5745         remote_info.Raise()
5746         remote_info = remote_info.data
5747         if remote_info and "state" in remote_info:
5748           remote_state = "up"
5749         else:
5750           remote_state = "down"
5751       else:
5752         remote_state = None
5753       if instance.admin_up:
5754         config_state = "up"
5755       else:
5756         config_state = "down"
5757
5758       disks = [self._ComputeDiskStatus(instance, None, device)
5759                for device in instance.disks]
5760
5761       idict = {
5762         "name": instance.name,
5763         "config_state": config_state,
5764         "run_state": remote_state,
5765         "pnode": instance.primary_node,
5766         "snodes": instance.secondary_nodes,
5767         "os": instance.os,
5768         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5769         "disks": disks,
5770         "hypervisor": instance.hypervisor,
5771         "network_port": instance.network_port,
5772         "hv_instance": instance.hvparams,
5773         "hv_actual": cluster.FillHV(instance),
5774         "be_instance": instance.beparams,
5775         "be_actual": cluster.FillBE(instance),
5776         }
5777
5778       result[instance.name] = idict
5779
5780     return result
5781
5782
5783 class LUSetInstanceParams(LogicalUnit):
5784   """Modifies an instances's parameters.
5785
5786   """
5787   HPATH = "instance-modify"
5788   HTYPE = constants.HTYPE_INSTANCE
5789   _OP_REQP = ["instance_name"]
5790   REQ_BGL = False
5791
5792   def CheckArguments(self):
5793     if not hasattr(self.op, 'nics'):
5794       self.op.nics = []
5795     if not hasattr(self.op, 'disks'):
5796       self.op.disks = []
5797     if not hasattr(self.op, 'beparams'):
5798       self.op.beparams = {}
5799     if not hasattr(self.op, 'hvparams'):
5800       self.op.hvparams = {}
5801     self.op.force = getattr(self.op, "force", False)
5802     if not (self.op.nics or self.op.disks or
5803             self.op.hvparams or self.op.beparams):
5804       raise errors.OpPrereqError("No changes submitted")
5805
5806     # Disk validation
5807     disk_addremove = 0
5808     for disk_op, disk_dict in self.op.disks:
5809       if disk_op == constants.DDM_REMOVE:
5810         disk_addremove += 1
5811         continue
5812       elif disk_op == constants.DDM_ADD:
5813         disk_addremove += 1
5814       else:
5815         if not isinstance(disk_op, int):
5816           raise errors.OpPrereqError("Invalid disk index")
5817       if disk_op == constants.DDM_ADD:
5818         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5819         if mode not in constants.DISK_ACCESS_SET:
5820           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5821         size = disk_dict.get('size', None)
5822         if size is None:
5823           raise errors.OpPrereqError("Required disk parameter size missing")
5824         try:
5825           size = int(size)
5826         except ValueError, err:
5827           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5828                                      str(err))
5829         disk_dict['size'] = size
5830       else:
5831         # modification of disk
5832         if 'size' in disk_dict:
5833           raise errors.OpPrereqError("Disk size change not possible, use"
5834                                      " grow-disk")
5835
5836     if disk_addremove > 1:
5837       raise errors.OpPrereqError("Only one disk add or remove operation"
5838                                  " supported at a time")
5839
5840     # NIC validation
5841     nic_addremove = 0
5842     for nic_op, nic_dict in self.op.nics:
5843       if nic_op == constants.DDM_REMOVE:
5844         nic_addremove += 1
5845         continue
5846       elif nic_op == constants.DDM_ADD:
5847         nic_addremove += 1
5848       else:
5849         if not isinstance(nic_op, int):
5850           raise errors.OpPrereqError("Invalid nic index")
5851
5852       # nic_dict should be a dict
5853       nic_ip = nic_dict.get('ip', None)
5854       if nic_ip is not None:
5855         if nic_ip.lower() == constants.VALUE_NONE:
5856           nic_dict['ip'] = None
5857         else:
5858           if not utils.IsValidIP(nic_ip):
5859             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5860
5861       if nic_op == constants.DDM_ADD:
5862         nic_bridge = nic_dict.get('bridge', None)
5863         if nic_bridge is None:
5864           nic_dict['bridge'] = self.cfg.GetDefBridge()
5865         nic_mac = nic_dict.get('mac', None)
5866         if nic_mac is None:
5867           nic_dict['mac'] = constants.VALUE_AUTO
5868
5869       if 'mac' in nic_dict:
5870         nic_mac = nic_dict['mac']
5871         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5872           if not utils.IsValidMac(nic_mac):
5873             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5874         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5875           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5876                                      " modifying an existing nic")
5877
5878     if nic_addremove > 1:
5879       raise errors.OpPrereqError("Only one NIC add or remove operation"
5880                                  " supported at a time")
5881
5882   def ExpandNames(self):
5883     self._ExpandAndLockInstance()
5884     self.needed_locks[locking.LEVEL_NODE] = []
5885     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5886
5887   def DeclareLocks(self, level):
5888     if level == locking.LEVEL_NODE:
5889       self._LockInstancesNodes()
5890
5891   def BuildHooksEnv(self):
5892     """Build hooks env.
5893
5894     This runs on the master, primary and secondaries.
5895
5896     """
5897     args = dict()
5898     if constants.BE_MEMORY in self.be_new:
5899       args['memory'] = self.be_new[constants.BE_MEMORY]
5900     if constants.BE_VCPUS in self.be_new:
5901       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5902     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5903     # information at all.
5904     if self.op.nics:
5905       args['nics'] = []
5906       nic_override = dict(self.op.nics)
5907       for idx, nic in enumerate(self.instance.nics):
5908         if idx in nic_override:
5909           this_nic_override = nic_override[idx]
5910         else:
5911           this_nic_override = {}
5912         if 'ip' in this_nic_override:
5913           ip = this_nic_override['ip']
5914         else:
5915           ip = nic.ip
5916         if 'bridge' in this_nic_override:
5917           bridge = this_nic_override['bridge']
5918         else:
5919           bridge = nic.bridge
5920         if 'mac' in this_nic_override:
5921           mac = this_nic_override['mac']
5922         else:
5923           mac = nic.mac
5924         args['nics'].append((ip, bridge, mac))
5925       if constants.DDM_ADD in nic_override:
5926         ip = nic_override[constants.DDM_ADD].get('ip', None)
5927         bridge = nic_override[constants.DDM_ADD]['bridge']
5928         mac = nic_override[constants.DDM_ADD]['mac']
5929         args['nics'].append((ip, bridge, mac))
5930       elif constants.DDM_REMOVE in nic_override:
5931         del args['nics'][-1]
5932
5933     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5934     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5935     return env, nl, nl
5936
5937   def CheckPrereq(self):
5938     """Check prerequisites.
5939
5940     This only checks the instance list against the existing names.
5941
5942     """
5943     force = self.force = self.op.force
5944
5945     # checking the new params on the primary/secondary nodes
5946
5947     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5948     assert self.instance is not None, \
5949       "Cannot retrieve locked instance %s" % self.op.instance_name
5950     pnode = instance.primary_node
5951     nodelist = list(instance.all_nodes)
5952
5953     # hvparams processing
5954     if self.op.hvparams:
5955       i_hvdict = copy.deepcopy(instance.hvparams)
5956       for key, val in self.op.hvparams.iteritems():
5957         if val == constants.VALUE_DEFAULT:
5958           try:
5959             del i_hvdict[key]
5960           except KeyError:
5961             pass
5962         else:
5963           i_hvdict[key] = val
5964       cluster = self.cfg.GetClusterInfo()
5965       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5966       hv_new = objects.FillDict(cluster.hvparams[instance.hypervisor],
5967                                 i_hvdict)
5968       # local check
5969       hypervisor.GetHypervisor(
5970         instance.hypervisor).CheckParameterSyntax(hv_new)
5971       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5972       self.hv_new = hv_new # the new actual values
5973       self.hv_inst = i_hvdict # the new dict (without defaults)
5974     else:
5975       self.hv_new = self.hv_inst = {}
5976
5977     # beparams processing
5978     if self.op.beparams:
5979       i_bedict = copy.deepcopy(instance.beparams)
5980       for key, val in self.op.beparams.iteritems():
5981         if val == constants.VALUE_DEFAULT:
5982           try:
5983             del i_bedict[key]
5984           except KeyError:
5985             pass
5986         else:
5987           i_bedict[key] = val
5988       cluster = self.cfg.GetClusterInfo()
5989       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5990       be_new = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5991                                 i_bedict)
5992       self.be_new = be_new # the new actual values
5993       self.be_inst = i_bedict # the new dict (without defaults)
5994     else:
5995       self.be_new = self.be_inst = {}
5996
5997     self.warn = []
5998
5999     if constants.BE_MEMORY in self.op.beparams and not self.force:
6000       mem_check_list = [pnode]
6001       if be_new[constants.BE_AUTO_BALANCE]:
6002         # either we changed auto_balance to yes or it was from before
6003         mem_check_list.extend(instance.secondary_nodes)
6004       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6005                                                   instance.hypervisor)
6006       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6007                                          instance.hypervisor)
6008       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6009         # Assume the primary node is unreachable and go ahead
6010         self.warn.append("Can't get info from primary node %s" % pnode)
6011       else:
6012         if not instance_info.failed and instance_info.data:
6013           current_mem = int(instance_info.data['memory'])
6014         else:
6015           # Assume instance not running
6016           # (there is a slight race condition here, but it's not very probable,
6017           # and we have no other way to check)
6018           current_mem = 0
6019         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6020                     nodeinfo[pnode].data['memory_free'])
6021         if miss_mem > 0:
6022           raise errors.OpPrereqError("This change will prevent the instance"
6023                                      " from starting, due to %d MB of memory"
6024                                      " missing on its primary node" % miss_mem)
6025
6026       if be_new[constants.BE_AUTO_BALANCE]:
6027         for node, nres in nodeinfo.iteritems():
6028           if node not in instance.secondary_nodes:
6029             continue
6030           if nres.failed or not isinstance(nres.data, dict):
6031             self.warn.append("Can't get info from secondary node %s" % node)
6032           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6033             self.warn.append("Not enough memory to failover instance to"
6034                              " secondary node %s" % node)
6035
6036     # NIC processing
6037     for nic_op, nic_dict in self.op.nics:
6038       if nic_op == constants.DDM_REMOVE:
6039         if not instance.nics:
6040           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6041         continue
6042       if nic_op != constants.DDM_ADD:
6043         # an existing nic
6044         if nic_op < 0 or nic_op >= len(instance.nics):
6045           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6046                                      " are 0 to %d" %
6047                                      (nic_op, len(instance.nics)))
6048       if 'bridge' in nic_dict:
6049         nic_bridge = nic_dict['bridge']
6050         if nic_bridge is None:
6051           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6052         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6053           msg = ("Bridge '%s' doesn't exist on one of"
6054                  " the instance nodes" % nic_bridge)
6055           if self.force:
6056             self.warn.append(msg)
6057           else:
6058             raise errors.OpPrereqError(msg)
6059       if 'mac' in nic_dict:
6060         nic_mac = nic_dict['mac']
6061         if nic_mac is None:
6062           raise errors.OpPrereqError('Cannot set the nic mac to None')
6063         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6064           # otherwise generate the mac
6065           nic_dict['mac'] = self.cfg.GenerateMAC()
6066         else:
6067           # or validate/reserve the current one
6068           if self.cfg.IsMacInUse(nic_mac):
6069             raise errors.OpPrereqError("MAC address %s already in use"
6070                                        " in cluster" % nic_mac)
6071
6072     # DISK processing
6073     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6074       raise errors.OpPrereqError("Disk operations not supported for"
6075                                  " diskless instances")
6076     for disk_op, disk_dict in self.op.disks:
6077       if disk_op == constants.DDM_REMOVE:
6078         if len(instance.disks) == 1:
6079           raise errors.OpPrereqError("Cannot remove the last disk of"
6080                                      " an instance")
6081         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6082         ins_l = ins_l[pnode]
6083         if ins_l.failed or not isinstance(ins_l.data, list):
6084           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6085         if instance.name in ins_l.data:
6086           raise errors.OpPrereqError("Instance is running, can't remove"
6087                                      " disks.")
6088
6089       if (disk_op == constants.DDM_ADD and
6090           len(instance.nics) >= constants.MAX_DISKS):
6091         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6092                                    " add more" % constants.MAX_DISKS)
6093       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6094         # an existing disk
6095         if disk_op < 0 or disk_op >= len(instance.disks):
6096           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6097                                      " are 0 to %d" %
6098                                      (disk_op, len(instance.disks)))
6099
6100     return
6101
6102   def Exec(self, feedback_fn):
6103     """Modifies an instance.
6104
6105     All parameters take effect only at the next restart of the instance.
6106
6107     """
6108     # Process here the warnings from CheckPrereq, as we don't have a
6109     # feedback_fn there.
6110     for warn in self.warn:
6111       feedback_fn("WARNING: %s" % warn)
6112
6113     result = []
6114     instance = self.instance
6115     # disk changes
6116     for disk_op, disk_dict in self.op.disks:
6117       if disk_op == constants.DDM_REMOVE:
6118         # remove the last disk
6119         device = instance.disks.pop()
6120         device_idx = len(instance.disks)
6121         for node, disk in device.ComputeNodeTree(instance.primary_node):
6122           self.cfg.SetDiskID(disk, node)
6123           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6124           if msg:
6125             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6126                             " continuing anyway", device_idx, node, msg)
6127         result.append(("disk/%d" % device_idx, "remove"))
6128       elif disk_op == constants.DDM_ADD:
6129         # add a new disk
6130         if instance.disk_template == constants.DT_FILE:
6131           file_driver, file_path = instance.disks[0].logical_id
6132           file_path = os.path.dirname(file_path)
6133         else:
6134           file_driver = file_path = None
6135         disk_idx_base = len(instance.disks)
6136         new_disk = _GenerateDiskTemplate(self,
6137                                          instance.disk_template,
6138                                          instance.name, instance.primary_node,
6139                                          instance.secondary_nodes,
6140                                          [disk_dict],
6141                                          file_path,
6142                                          file_driver,
6143                                          disk_idx_base)[0]
6144         instance.disks.append(new_disk)
6145         info = _GetInstanceInfoText(instance)
6146
6147         logging.info("Creating volume %s for instance %s",
6148                      new_disk.iv_name, instance.name)
6149         # Note: this needs to be kept in sync with _CreateDisks
6150         #HARDCODE
6151         for node in instance.all_nodes:
6152           f_create = node == instance.primary_node
6153           try:
6154             _CreateBlockDev(self, node, instance, new_disk,
6155                             f_create, info, f_create)
6156           except errors.OpExecError, err:
6157             self.LogWarning("Failed to create volume %s (%s) on"
6158                             " node %s: %s",
6159                             new_disk.iv_name, new_disk, node, err)
6160         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6161                        (new_disk.size, new_disk.mode)))
6162       else:
6163         # change a given disk
6164         instance.disks[disk_op].mode = disk_dict['mode']
6165         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6166     # NIC changes
6167     for nic_op, nic_dict in self.op.nics:
6168       if nic_op == constants.DDM_REMOVE:
6169         # remove the last nic
6170         del instance.nics[-1]
6171         result.append(("nic.%d" % len(instance.nics), "remove"))
6172       elif nic_op == constants.DDM_ADD:
6173         # mac and bridge should be set, by now
6174         mac = nic_dict['mac']
6175         bridge = nic_dict['bridge']
6176         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6177                               bridge=bridge)
6178         instance.nics.append(new_nic)
6179         result.append(("nic.%d" % (len(instance.nics) - 1),
6180                        "add:mac=%s,ip=%s,bridge=%s" %
6181                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6182       else:
6183         # change a given nic
6184         for key in 'mac', 'ip', 'bridge':
6185           if key in nic_dict:
6186             setattr(instance.nics[nic_op], key, nic_dict[key])
6187             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6188
6189     # hvparams changes
6190     if self.op.hvparams:
6191       instance.hvparams = self.hv_inst
6192       for key, val in self.op.hvparams.iteritems():
6193         result.append(("hv/%s" % key, val))
6194
6195     # beparams changes
6196     if self.op.beparams:
6197       instance.beparams = self.be_inst
6198       for key, val in self.op.beparams.iteritems():
6199         result.append(("be/%s" % key, val))
6200
6201     self.cfg.Update(instance)
6202
6203     return result
6204
6205
6206 class LUQueryExports(NoHooksLU):
6207   """Query the exports list
6208
6209   """
6210   _OP_REQP = ['nodes']
6211   REQ_BGL = False
6212
6213   def ExpandNames(self):
6214     self.needed_locks = {}
6215     self.share_locks[locking.LEVEL_NODE] = 1
6216     if not self.op.nodes:
6217       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6218     else:
6219       self.needed_locks[locking.LEVEL_NODE] = \
6220         _GetWantedNodes(self, self.op.nodes)
6221
6222   def CheckPrereq(self):
6223     """Check prerequisites.
6224
6225     """
6226     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6227
6228   def Exec(self, feedback_fn):
6229     """Compute the list of all the exported system images.
6230
6231     @rtype: dict
6232     @return: a dictionary with the structure node->(export-list)
6233         where export-list is a list of the instances exported on
6234         that node.
6235
6236     """
6237     rpcresult = self.rpc.call_export_list(self.nodes)
6238     result = {}
6239     for node in rpcresult:
6240       if rpcresult[node].failed:
6241         result[node] = False
6242       else:
6243         result[node] = rpcresult[node].data
6244
6245     return result
6246
6247
6248 class LUExportInstance(LogicalUnit):
6249   """Export an instance to an image in the cluster.
6250
6251   """
6252   HPATH = "instance-export"
6253   HTYPE = constants.HTYPE_INSTANCE
6254   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6255   REQ_BGL = False
6256
6257   def ExpandNames(self):
6258     self._ExpandAndLockInstance()
6259     # FIXME: lock only instance primary and destination node
6260     #
6261     # Sad but true, for now we have do lock all nodes, as we don't know where
6262     # the previous export might be, and and in this LU we search for it and
6263     # remove it from its current node. In the future we could fix this by:
6264     #  - making a tasklet to search (share-lock all), then create the new one,
6265     #    then one to remove, after
6266     #  - removing the removal operation altoghether
6267     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6268
6269   def DeclareLocks(self, level):
6270     """Last minute lock declaration."""
6271     # All nodes are locked anyway, so nothing to do here.
6272
6273   def BuildHooksEnv(self):
6274     """Build hooks env.
6275
6276     This will run on the master, primary node and target node.
6277
6278     """
6279     env = {
6280       "EXPORT_NODE": self.op.target_node,
6281       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6282       }
6283     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6284     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6285           self.op.target_node]
6286     return env, nl, nl
6287
6288   def CheckPrereq(self):
6289     """Check prerequisites.
6290
6291     This checks that the instance and node names are valid.
6292
6293     """
6294     instance_name = self.op.instance_name
6295     self.instance = self.cfg.GetInstanceInfo(instance_name)
6296     assert self.instance is not None, \
6297           "Cannot retrieve locked instance %s" % self.op.instance_name
6298     _CheckNodeOnline(self, self.instance.primary_node)
6299
6300     self.dst_node = self.cfg.GetNodeInfo(
6301       self.cfg.ExpandNodeName(self.op.target_node))
6302
6303     if self.dst_node is None:
6304       # This is wrong node name, not a non-locked node
6305       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6306     _CheckNodeOnline(self, self.dst_node.name)
6307     _CheckNodeNotDrained(self, self.dst_node.name)
6308
6309     # instance disk type verification
6310     for disk in self.instance.disks:
6311       if disk.dev_type == constants.LD_FILE:
6312         raise errors.OpPrereqError("Export not supported for instances with"
6313                                    " file-based disks")
6314
6315   def Exec(self, feedback_fn):
6316     """Export an instance to an image in the cluster.
6317
6318     """
6319     instance = self.instance
6320     dst_node = self.dst_node
6321     src_node = instance.primary_node
6322     if self.op.shutdown:
6323       # shutdown the instance, but not the disks
6324       result = self.rpc.call_instance_shutdown(src_node, instance)
6325       msg = result.RemoteFailMsg()
6326       if msg:
6327         raise errors.OpExecError("Could not shutdown instance %s on"
6328                                  " node %s: %s" %
6329                                  (instance.name, src_node, msg))
6330
6331     vgname = self.cfg.GetVGName()
6332
6333     snap_disks = []
6334
6335     # set the disks ID correctly since call_instance_start needs the
6336     # correct drbd minor to create the symlinks
6337     for disk in instance.disks:
6338       self.cfg.SetDiskID(disk, src_node)
6339
6340     try:
6341       for disk in instance.disks:
6342         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6343         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6344         if new_dev_name.failed or not new_dev_name.data:
6345           self.LogWarning("Could not snapshot block device %s on node %s",
6346                           disk.logical_id[1], src_node)
6347           snap_disks.append(False)
6348         else:
6349           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6350                                  logical_id=(vgname, new_dev_name.data),
6351                                  physical_id=(vgname, new_dev_name.data),
6352                                  iv_name=disk.iv_name)
6353           snap_disks.append(new_dev)
6354
6355     finally:
6356       if self.op.shutdown and instance.admin_up:
6357         result = self.rpc.call_instance_start(src_node, instance, None, None)
6358         msg = result.RemoteFailMsg()
6359         if msg:
6360           _ShutdownInstanceDisks(self, instance)
6361           raise errors.OpExecError("Could not start instance: %s" % msg)
6362
6363     # TODO: check for size
6364
6365     cluster_name = self.cfg.GetClusterName()
6366     for idx, dev in enumerate(snap_disks):
6367       if dev:
6368         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6369                                                instance, cluster_name, idx)
6370         if result.failed or not result.data:
6371           self.LogWarning("Could not export block device %s from node %s to"
6372                           " node %s", dev.logical_id[1], src_node,
6373                           dst_node.name)
6374         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6375         if msg:
6376           self.LogWarning("Could not remove snapshot block device %s from node"
6377                           " %s: %s", dev.logical_id[1], src_node, msg)
6378
6379     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6380     if result.failed or not result.data:
6381       self.LogWarning("Could not finalize export for instance %s on node %s",
6382                       instance.name, dst_node.name)
6383
6384     nodelist = self.cfg.GetNodeList()
6385     nodelist.remove(dst_node.name)
6386
6387     # on one-node clusters nodelist will be empty after the removal
6388     # if we proceed the backup would be removed because OpQueryExports
6389     # substitutes an empty list with the full cluster node list.
6390     if nodelist:
6391       exportlist = self.rpc.call_export_list(nodelist)
6392       for node in exportlist:
6393         if exportlist[node].failed:
6394           continue
6395         if instance.name in exportlist[node].data:
6396           if not self.rpc.call_export_remove(node, instance.name):
6397             self.LogWarning("Could not remove older export for instance %s"
6398                             " on node %s", instance.name, node)
6399
6400
6401 class LURemoveExport(NoHooksLU):
6402   """Remove exports related to the named instance.
6403
6404   """
6405   _OP_REQP = ["instance_name"]
6406   REQ_BGL = False
6407
6408   def ExpandNames(self):
6409     self.needed_locks = {}
6410     # We need all nodes to be locked in order for RemoveExport to work, but we
6411     # don't need to lock the instance itself, as nothing will happen to it (and
6412     # we can remove exports also for a removed instance)
6413     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6414
6415   def CheckPrereq(self):
6416     """Check prerequisites.
6417     """
6418     pass
6419
6420   def Exec(self, feedback_fn):
6421     """Remove any export.
6422
6423     """
6424     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6425     # If the instance was not found we'll try with the name that was passed in.
6426     # This will only work if it was an FQDN, though.
6427     fqdn_warn = False
6428     if not instance_name:
6429       fqdn_warn = True
6430       instance_name = self.op.instance_name
6431
6432     exportlist = self.rpc.call_export_list(self.acquired_locks[
6433       locking.LEVEL_NODE])
6434     found = False
6435     for node in exportlist:
6436       if exportlist[node].failed:
6437         self.LogWarning("Failed to query node %s, continuing" % node)
6438         continue
6439       if instance_name in exportlist[node].data:
6440         found = True
6441         result = self.rpc.call_export_remove(node, instance_name)
6442         if result.failed or not result.data:
6443           logging.error("Could not remove export for instance %s"
6444                         " on node %s", instance_name, node)
6445
6446     if fqdn_warn and not found:
6447       feedback_fn("Export not found. If trying to remove an export belonging"
6448                   " to a deleted instance please use its Fully Qualified"
6449                   " Domain Name.")
6450
6451
6452 class TagsLU(NoHooksLU):
6453   """Generic tags LU.
6454
6455   This is an abstract class which is the parent of all the other tags LUs.
6456
6457   """
6458
6459   def ExpandNames(self):
6460     self.needed_locks = {}
6461     if self.op.kind == constants.TAG_NODE:
6462       name = self.cfg.ExpandNodeName(self.op.name)
6463       if name is None:
6464         raise errors.OpPrereqError("Invalid node name (%s)" %
6465                                    (self.op.name,))
6466       self.op.name = name
6467       self.needed_locks[locking.LEVEL_NODE] = name
6468     elif self.op.kind == constants.TAG_INSTANCE:
6469       name = self.cfg.ExpandInstanceName(self.op.name)
6470       if name is None:
6471         raise errors.OpPrereqError("Invalid instance name (%s)" %
6472                                    (self.op.name,))
6473       self.op.name = name
6474       self.needed_locks[locking.LEVEL_INSTANCE] = name
6475
6476   def CheckPrereq(self):
6477     """Check prerequisites.
6478
6479     """
6480     if self.op.kind == constants.TAG_CLUSTER:
6481       self.target = self.cfg.GetClusterInfo()
6482     elif self.op.kind == constants.TAG_NODE:
6483       self.target = self.cfg.GetNodeInfo(self.op.name)
6484     elif self.op.kind == constants.TAG_INSTANCE:
6485       self.target = self.cfg.GetInstanceInfo(self.op.name)
6486     else:
6487       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6488                                  str(self.op.kind))
6489
6490
6491 class LUGetTags(TagsLU):
6492   """Returns the tags of a given object.
6493
6494   """
6495   _OP_REQP = ["kind", "name"]
6496   REQ_BGL = False
6497
6498   def Exec(self, feedback_fn):
6499     """Returns the tag list.
6500
6501     """
6502     return list(self.target.GetTags())
6503
6504
6505 class LUSearchTags(NoHooksLU):
6506   """Searches the tags for a given pattern.
6507
6508   """
6509   _OP_REQP = ["pattern"]
6510   REQ_BGL = False
6511
6512   def ExpandNames(self):
6513     self.needed_locks = {}
6514
6515   def CheckPrereq(self):
6516     """Check prerequisites.
6517
6518     This checks the pattern passed for validity by compiling it.
6519
6520     """
6521     try:
6522       self.re = re.compile(self.op.pattern)
6523     except re.error, err:
6524       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6525                                  (self.op.pattern, err))
6526
6527   def Exec(self, feedback_fn):
6528     """Returns the tag list.
6529
6530     """
6531     cfg = self.cfg
6532     tgts = [("/cluster", cfg.GetClusterInfo())]
6533     ilist = cfg.GetAllInstancesInfo().values()
6534     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6535     nlist = cfg.GetAllNodesInfo().values()
6536     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6537     results = []
6538     for path, target in tgts:
6539       for tag in target.GetTags():
6540         if self.re.search(tag):
6541           results.append((path, tag))
6542     return results
6543
6544
6545 class LUAddTags(TagsLU):
6546   """Sets a tag on a given object.
6547
6548   """
6549   _OP_REQP = ["kind", "name", "tags"]
6550   REQ_BGL = False
6551
6552   def CheckPrereq(self):
6553     """Check prerequisites.
6554
6555     This checks the type and length of the tag name and value.
6556
6557     """
6558     TagsLU.CheckPrereq(self)
6559     for tag in self.op.tags:
6560       objects.TaggableObject.ValidateTag(tag)
6561
6562   def Exec(self, feedback_fn):
6563     """Sets the tag.
6564
6565     """
6566     try:
6567       for tag in self.op.tags:
6568         self.target.AddTag(tag)
6569     except errors.TagError, err:
6570       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6571     try:
6572       self.cfg.Update(self.target)
6573     except errors.ConfigurationError:
6574       raise errors.OpRetryError("There has been a modification to the"
6575                                 " config file and the operation has been"
6576                                 " aborted. Please retry.")
6577
6578
6579 class LUDelTags(TagsLU):
6580   """Delete a list of tags from a given object.
6581
6582   """
6583   _OP_REQP = ["kind", "name", "tags"]
6584   REQ_BGL = False
6585
6586   def CheckPrereq(self):
6587     """Check prerequisites.
6588
6589     This checks that we have the given tag.
6590
6591     """
6592     TagsLU.CheckPrereq(self)
6593     for tag in self.op.tags:
6594       objects.TaggableObject.ValidateTag(tag)
6595     del_tags = frozenset(self.op.tags)
6596     cur_tags = self.target.GetTags()
6597     if not del_tags <= cur_tags:
6598       diff_tags = del_tags - cur_tags
6599       diff_names = ["'%s'" % tag for tag in diff_tags]
6600       diff_names.sort()
6601       raise errors.OpPrereqError("Tag(s) %s not found" %
6602                                  (",".join(diff_names)))
6603
6604   def Exec(self, feedback_fn):
6605     """Remove the tag from the object.
6606
6607     """
6608     for tag in self.op.tags:
6609       self.target.RemoveTag(tag)
6610     try:
6611       self.cfg.Update(self.target)
6612     except errors.ConfigurationError:
6613       raise errors.OpRetryError("There has been a modification to the"
6614                                 " config file and the operation has been"
6615                                 " aborted. Please retry.")
6616
6617
6618 class LUTestDelay(NoHooksLU):
6619   """Sleep for a specified amount of time.
6620
6621   This LU sleeps on the master and/or nodes for a specified amount of
6622   time.
6623
6624   """
6625   _OP_REQP = ["duration", "on_master", "on_nodes"]
6626   REQ_BGL = False
6627
6628   def ExpandNames(self):
6629     """Expand names and set required locks.
6630
6631     This expands the node list, if any.
6632
6633     """
6634     self.needed_locks = {}
6635     if self.op.on_nodes:
6636       # _GetWantedNodes can be used here, but is not always appropriate to use
6637       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6638       # more information.
6639       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6640       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6641
6642   def CheckPrereq(self):
6643     """Check prerequisites.
6644
6645     """
6646
6647   def Exec(self, feedback_fn):
6648     """Do the actual sleep.
6649
6650     """
6651     if self.op.on_master:
6652       if not utils.TestDelay(self.op.duration):
6653         raise errors.OpExecError("Error during master delay test")
6654     if self.op.on_nodes:
6655       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6656       if not result:
6657         raise errors.OpExecError("Complete failure from rpc call")
6658       for node, node_result in result.items():
6659         node_result.Raise()
6660         if not node_result.data:
6661           raise errors.OpExecError("Failure during rpc call to node %s,"
6662                                    " result: %s" % (node, node_result.data))
6663
6664
6665 class IAllocator(object):
6666   """IAllocator framework.
6667
6668   An IAllocator instance has three sets of attributes:
6669     - cfg that is needed to query the cluster
6670     - input data (all members of the _KEYS class attribute are required)
6671     - four buffer attributes (in|out_data|text), that represent the
6672       input (to the external script) in text and data structure format,
6673       and the output from it, again in two formats
6674     - the result variables from the script (success, info, nodes) for
6675       easy usage
6676
6677   """
6678   _ALLO_KEYS = [
6679     "mem_size", "disks", "disk_template",
6680     "os", "tags", "nics", "vcpus", "hypervisor",
6681     ]
6682   _RELO_KEYS = [
6683     "relocate_from",
6684     ]
6685
6686   def __init__(self, lu, mode, name, **kwargs):
6687     self.lu = lu
6688     # init buffer variables
6689     self.in_text = self.out_text = self.in_data = self.out_data = None
6690     # init all input fields so that pylint is happy
6691     self.mode = mode
6692     self.name = name
6693     self.mem_size = self.disks = self.disk_template = None
6694     self.os = self.tags = self.nics = self.vcpus = None
6695     self.hypervisor = None
6696     self.relocate_from = None
6697     # computed fields
6698     self.required_nodes = None
6699     # init result fields
6700     self.success = self.info = self.nodes = None
6701     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6702       keyset = self._ALLO_KEYS
6703     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6704       keyset = self._RELO_KEYS
6705     else:
6706       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6707                                    " IAllocator" % self.mode)
6708     for key in kwargs:
6709       if key not in keyset:
6710         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6711                                      " IAllocator" % key)
6712       setattr(self, key, kwargs[key])
6713     for key in keyset:
6714       if key not in kwargs:
6715         raise errors.ProgrammerError("Missing input parameter '%s' to"
6716                                      " IAllocator" % key)
6717     self._BuildInputData()
6718
6719   def _ComputeClusterData(self):
6720     """Compute the generic allocator input data.
6721
6722     This is the data that is independent of the actual operation.
6723
6724     """
6725     cfg = self.lu.cfg
6726     cluster_info = cfg.GetClusterInfo()
6727     # cluster data
6728     data = {
6729       "version": constants.IALLOCATOR_VERSION,
6730       "cluster_name": cfg.GetClusterName(),
6731       "cluster_tags": list(cluster_info.GetTags()),
6732       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6733       # we don't have job IDs
6734       }
6735     iinfo = cfg.GetAllInstancesInfo().values()
6736     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6737
6738     # node data
6739     node_results = {}
6740     node_list = cfg.GetNodeList()
6741
6742     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6743       hypervisor_name = self.hypervisor
6744     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6745       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6746
6747     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6748                                            hypervisor_name)
6749     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6750                        cluster_info.enabled_hypervisors)
6751     for nname, nresult in node_data.items():
6752       # first fill in static (config-based) values
6753       ninfo = cfg.GetNodeInfo(nname)
6754       pnr = {
6755         "tags": list(ninfo.GetTags()),
6756         "primary_ip": ninfo.primary_ip,
6757         "secondary_ip": ninfo.secondary_ip,
6758         "offline": ninfo.offline,
6759         "drained": ninfo.drained,
6760         "master_candidate": ninfo.master_candidate,
6761         }
6762
6763       if not ninfo.offline:
6764         nresult.Raise()
6765         if not isinstance(nresult.data, dict):
6766           raise errors.OpExecError("Can't get data for node %s" % nname)
6767         remote_info = nresult.data
6768         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6769                      'vg_size', 'vg_free', 'cpu_total']:
6770           if attr not in remote_info:
6771             raise errors.OpExecError("Node '%s' didn't return attribute"
6772                                      " '%s'" % (nname, attr))
6773           try:
6774             remote_info[attr] = int(remote_info[attr])
6775           except ValueError, err:
6776             raise errors.OpExecError("Node '%s' returned invalid value"
6777                                      " for '%s': %s" % (nname, attr, err))
6778         # compute memory used by primary instances
6779         i_p_mem = i_p_up_mem = 0
6780         for iinfo, beinfo in i_list:
6781           if iinfo.primary_node == nname:
6782             i_p_mem += beinfo[constants.BE_MEMORY]
6783             if iinfo.name not in node_iinfo[nname].data:
6784               i_used_mem = 0
6785             else:
6786               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6787             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6788             remote_info['memory_free'] -= max(0, i_mem_diff)
6789
6790             if iinfo.admin_up:
6791               i_p_up_mem += beinfo[constants.BE_MEMORY]
6792
6793         # compute memory used by instances
6794         pnr_dyn = {
6795           "total_memory": remote_info['memory_total'],
6796           "reserved_memory": remote_info['memory_dom0'],
6797           "free_memory": remote_info['memory_free'],
6798           "total_disk": remote_info['vg_size'],
6799           "free_disk": remote_info['vg_free'],
6800           "total_cpus": remote_info['cpu_total'],
6801           "i_pri_memory": i_p_mem,
6802           "i_pri_up_memory": i_p_up_mem,
6803           }
6804         pnr.update(pnr_dyn)
6805
6806       node_results[nname] = pnr
6807     data["nodes"] = node_results
6808
6809     # instance data
6810     instance_data = {}
6811     for iinfo, beinfo in i_list:
6812       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6813                   for n in iinfo.nics]
6814       pir = {
6815         "tags": list(iinfo.GetTags()),
6816         "admin_up": iinfo.admin_up,
6817         "vcpus": beinfo[constants.BE_VCPUS],
6818         "memory": beinfo[constants.BE_MEMORY],
6819         "os": iinfo.os,
6820         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6821         "nics": nic_data,
6822         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6823         "disk_template": iinfo.disk_template,
6824         "hypervisor": iinfo.hypervisor,
6825         }
6826       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6827                                                  pir["disks"])
6828       instance_data[iinfo.name] = pir
6829
6830     data["instances"] = instance_data
6831
6832     self.in_data = data
6833
6834   def _AddNewInstance(self):
6835     """Add new instance data to allocator structure.
6836
6837     This in combination with _AllocatorGetClusterData will create the
6838     correct structure needed as input for the allocator.
6839
6840     The checks for the completeness of the opcode must have already been
6841     done.
6842
6843     """
6844     data = self.in_data
6845
6846     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6847
6848     if self.disk_template in constants.DTS_NET_MIRROR:
6849       self.required_nodes = 2
6850     else:
6851       self.required_nodes = 1
6852     request = {
6853       "type": "allocate",
6854       "name": self.name,
6855       "disk_template": self.disk_template,
6856       "tags": self.tags,
6857       "os": self.os,
6858       "vcpus": self.vcpus,
6859       "memory": self.mem_size,
6860       "disks": self.disks,
6861       "disk_space_total": disk_space,
6862       "nics": self.nics,
6863       "required_nodes": self.required_nodes,
6864       }
6865     data["request"] = request
6866
6867   def _AddRelocateInstance(self):
6868     """Add relocate instance data to allocator structure.
6869
6870     This in combination with _IAllocatorGetClusterData will create the
6871     correct structure needed as input for the allocator.
6872
6873     The checks for the completeness of the opcode must have already been
6874     done.
6875
6876     """
6877     instance = self.lu.cfg.GetInstanceInfo(self.name)
6878     if instance is None:
6879       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6880                                    " IAllocator" % self.name)
6881
6882     if instance.disk_template not in constants.DTS_NET_MIRROR:
6883       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6884
6885     if len(instance.secondary_nodes) != 1:
6886       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6887
6888     self.required_nodes = 1
6889     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6890     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6891
6892     request = {
6893       "type": "relocate",
6894       "name": self.name,
6895       "disk_space_total": disk_space,
6896       "required_nodes": self.required_nodes,
6897       "relocate_from": self.relocate_from,
6898       }
6899     self.in_data["request"] = request
6900
6901   def _BuildInputData(self):
6902     """Build input data structures.
6903
6904     """
6905     self._ComputeClusterData()
6906
6907     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6908       self._AddNewInstance()
6909     else:
6910       self._AddRelocateInstance()
6911
6912     self.in_text = serializer.Dump(self.in_data)
6913
6914   def Run(self, name, validate=True, call_fn=None):
6915     """Run an instance allocator and return the results.
6916
6917     """
6918     if call_fn is None:
6919       call_fn = self.lu.rpc.call_iallocator_runner
6920     data = self.in_text
6921
6922     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6923     result.Raise()
6924
6925     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6926       raise errors.OpExecError("Invalid result from master iallocator runner")
6927
6928     rcode, stdout, stderr, fail = result.data
6929
6930     if rcode == constants.IARUN_NOTFOUND:
6931       raise errors.OpExecError("Can't find allocator '%s'" % name)
6932     elif rcode == constants.IARUN_FAILURE:
6933       raise errors.OpExecError("Instance allocator call failed: %s,"
6934                                " output: %s" % (fail, stdout+stderr))
6935     self.out_text = stdout
6936     if validate:
6937       self._ValidateResult()
6938
6939   def _ValidateResult(self):
6940     """Process the allocator results.
6941
6942     This will process and if successful save the result in
6943     self.out_data and the other parameters.
6944
6945     """
6946     try:
6947       rdict = serializer.Load(self.out_text)
6948     except Exception, err:
6949       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6950
6951     if not isinstance(rdict, dict):
6952       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6953
6954     for key in "success", "info", "nodes":
6955       if key not in rdict:
6956         raise errors.OpExecError("Can't parse iallocator results:"
6957                                  " missing key '%s'" % key)
6958       setattr(self, key, rdict[key])
6959
6960     if not isinstance(rdict["nodes"], list):
6961       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6962                                " is not a list")
6963     self.out_data = rdict
6964
6965
6966 class LUTestAllocator(NoHooksLU):
6967   """Run allocator tests.
6968
6969   This LU runs the allocator tests
6970
6971   """
6972   _OP_REQP = ["direction", "mode", "name"]
6973
6974   def CheckPrereq(self):
6975     """Check prerequisites.
6976
6977     This checks the opcode parameters depending on the director and mode test.
6978
6979     """
6980     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6981       for attr in ["name", "mem_size", "disks", "disk_template",
6982                    "os", "tags", "nics", "vcpus"]:
6983         if not hasattr(self.op, attr):
6984           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6985                                      attr)
6986       iname = self.cfg.ExpandInstanceName(self.op.name)
6987       if iname is not None:
6988         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6989                                    iname)
6990       if not isinstance(self.op.nics, list):
6991         raise errors.OpPrereqError("Invalid parameter 'nics'")
6992       for row in self.op.nics:
6993         if (not isinstance(row, dict) or
6994             "mac" not in row or
6995             "ip" not in row or
6996             "bridge" not in row):
6997           raise errors.OpPrereqError("Invalid contents of the"
6998                                      " 'nics' parameter")
6999       if not isinstance(self.op.disks, list):
7000         raise errors.OpPrereqError("Invalid parameter 'disks'")
7001       for row in self.op.disks:
7002         if (not isinstance(row, dict) or
7003             "size" not in row or
7004             not isinstance(row["size"], int) or
7005             "mode" not in row or
7006             row["mode"] not in ['r', 'w']):
7007           raise errors.OpPrereqError("Invalid contents of the"
7008                                      " 'disks' parameter")
7009       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7010         self.op.hypervisor = self.cfg.GetHypervisorType()
7011     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7012       if not hasattr(self.op, "name"):
7013         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7014       fname = self.cfg.ExpandInstanceName(self.op.name)
7015       if fname is None:
7016         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7017                                    self.op.name)
7018       self.op.name = fname
7019       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7020     else:
7021       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7022                                  self.op.mode)
7023
7024     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7025       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7026         raise errors.OpPrereqError("Missing allocator name")
7027     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7028       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7029                                  self.op.direction)
7030
7031   def Exec(self, feedback_fn):
7032     """Run the allocator test.
7033
7034     """
7035     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7036       ial = IAllocator(self,
7037                        mode=self.op.mode,
7038                        name=self.op.name,
7039                        mem_size=self.op.mem_size,
7040                        disks=self.op.disks,
7041                        disk_template=self.op.disk_template,
7042                        os=self.op.os,
7043                        tags=self.op.tags,
7044                        nics=self.op.nics,
7045                        vcpus=self.op.vcpus,
7046                        hypervisor=self.op.hypervisor,
7047                        )
7048     else:
7049       ial = IAllocator(self,
7050                        mode=self.op.mode,
7051                        name=self.op.name,
7052                        relocate_from=list(self.relocate_from),
7053                        )
7054
7055     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7056       result = ial.in_text
7057     else:
7058       ial.Run(self.op.allocator, validate=False)
7059       result = ial.out_text
7060     return result