Remove some superfluous imports
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384         if to_result.failed or not to_result.data:
1385           logging.error("Copy of file %s to node %s failed",
1386                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1387
1388     finally:
1389       result = self.rpc.call_node_start_master(master, False)
1390       if result.failed or not result.data:
1391         self.LogWarning("Could not re-enable the master role on"
1392                         " the master, please restart manually.")
1393
1394
1395 def _RecursiveCheckIfLVMBased(disk):
1396   """Check if the given disk or its children are lvm-based.
1397
1398   @type disk: L{objects.Disk}
1399   @param disk: the disk to check
1400   @rtype: booleean
1401   @return: boolean indicating whether a LD_LV dev_type was found or not
1402
1403   """
1404   if disk.children:
1405     for chdisk in disk.children:
1406       if _RecursiveCheckIfLVMBased(chdisk):
1407         return True
1408   return disk.dev_type == constants.LD_LV
1409
1410
1411 class LUSetClusterParams(LogicalUnit):
1412   """Change the parameters of the cluster.
1413
1414   """
1415   HPATH = "cluster-modify"
1416   HTYPE = constants.HTYPE_CLUSTER
1417   _OP_REQP = []
1418   REQ_BGL = False
1419
1420   def CheckParameters(self):
1421     """Check parameters
1422
1423     """
1424     if not hasattr(self.op, "candidate_pool_size"):
1425       self.op.candidate_pool_size = None
1426     if self.op.candidate_pool_size is not None:
1427       try:
1428         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1429       except ValueError, err:
1430         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1431                                    str(err))
1432       if self.op.candidate_pool_size < 1:
1433         raise errors.OpPrereqError("At least one master candidate needed")
1434
1435   def ExpandNames(self):
1436     # FIXME: in the future maybe other cluster params won't require checking on
1437     # all nodes to be modified.
1438     self.needed_locks = {
1439       locking.LEVEL_NODE: locking.ALL_SET,
1440     }
1441     self.share_locks[locking.LEVEL_NODE] = 1
1442
1443   def BuildHooksEnv(self):
1444     """Build hooks env.
1445
1446     """
1447     env = {
1448       "OP_TARGET": self.cfg.GetClusterName(),
1449       "NEW_VG_NAME": self.op.vg_name,
1450       }
1451     mn = self.cfg.GetMasterNode()
1452     return env, [mn], [mn]
1453
1454   def CheckPrereq(self):
1455     """Check prerequisites.
1456
1457     This checks whether the given params don't conflict and
1458     if the given volume group is valid.
1459
1460     """
1461     if self.op.vg_name is not None and not self.op.vg_name:
1462       instances = self.cfg.GetAllInstancesInfo().values()
1463       for inst in instances:
1464         for disk in inst.disks:
1465           if _RecursiveCheckIfLVMBased(disk):
1466             raise errors.OpPrereqError("Cannot disable lvm storage while"
1467                                        " lvm-based instances exist")
1468
1469     node_list = self.acquired_locks[locking.LEVEL_NODE]
1470
1471     # if vg_name not None, checks given volume group on all nodes
1472     if self.op.vg_name:
1473       vglist = self.rpc.call_vg_list(node_list)
1474       for node in node_list:
1475         if vglist[node].failed:
1476           # ignoring down node
1477           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1478           continue
1479         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1480                                               self.op.vg_name,
1481                                               constants.MIN_VG_SIZE)
1482         if vgstatus:
1483           raise errors.OpPrereqError("Error on node '%s': %s" %
1484                                      (node, vgstatus))
1485
1486     self.cluster = cluster = self.cfg.GetClusterInfo()
1487     # validate beparams changes
1488     if self.op.beparams:
1489       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1490       self.new_beparams = cluster.FillDict(
1491         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1492
1493     # hypervisor list/parameters
1494     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1495     if self.op.hvparams:
1496       if not isinstance(self.op.hvparams, dict):
1497         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1498       for hv_name, hv_dict in self.op.hvparams.items():
1499         if hv_name not in self.new_hvparams:
1500           self.new_hvparams[hv_name] = hv_dict
1501         else:
1502           self.new_hvparams[hv_name].update(hv_dict)
1503
1504     if self.op.enabled_hypervisors is not None:
1505       self.hv_list = self.op.enabled_hypervisors
1506     else:
1507       self.hv_list = cluster.enabled_hypervisors
1508
1509     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1510       # either the enabled list has changed, or the parameters have, validate
1511       for hv_name, hv_params in self.new_hvparams.items():
1512         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1513             (self.op.enabled_hypervisors and
1514              hv_name in self.op.enabled_hypervisors)):
1515           # either this is a new hypervisor, or its parameters have changed
1516           hv_class = hypervisor.GetHypervisor(hv_name)
1517           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1518           hv_class.CheckParameterSyntax(hv_params)
1519           _CheckHVParams(self, node_list, hv_name, hv_params)
1520
1521   def Exec(self, feedback_fn):
1522     """Change the parameters of the cluster.
1523
1524     """
1525     if self.op.vg_name is not None:
1526       if self.op.vg_name != self.cfg.GetVGName():
1527         self.cfg.SetVGName(self.op.vg_name)
1528       else:
1529         feedback_fn("Cluster LVM configuration already in desired"
1530                     " state, not changing")
1531     if self.op.hvparams:
1532       self.cluster.hvparams = self.new_hvparams
1533     if self.op.enabled_hypervisors is not None:
1534       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1535     if self.op.beparams:
1536       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1537     if self.op.candidate_pool_size is not None:
1538       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1539
1540     self.cfg.Update(self.cluster)
1541
1542     # we want to update nodes after the cluster so that if any errors
1543     # happen, we have recorded and saved the cluster info
1544     if self.op.candidate_pool_size is not None:
1545       _AdjustCandidatePool(self)
1546
1547
1548 class LURedistributeConfig(NoHooksLU):
1549   """Force the redistribution of cluster configuration.
1550
1551   This is a very simple LU.
1552
1553   """
1554   _OP_REQP = []
1555   REQ_BGL = False
1556
1557   def ExpandNames(self):
1558     self.needed_locks = {
1559       locking.LEVEL_NODE: locking.ALL_SET,
1560     }
1561     self.share_locks[locking.LEVEL_NODE] = 1
1562
1563   def CheckPrereq(self):
1564     """Check prerequisites.
1565
1566     """
1567
1568   def Exec(self, feedback_fn):
1569     """Redistribute the configuration.
1570
1571     """
1572     self.cfg.Update(self.cfg.GetClusterInfo())
1573
1574
1575 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1576   """Sleep and poll for an instance's disk to sync.
1577
1578   """
1579   if not instance.disks:
1580     return True
1581
1582   if not oneshot:
1583     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1584
1585   node = instance.primary_node
1586
1587   for dev in instance.disks:
1588     lu.cfg.SetDiskID(dev, node)
1589
1590   retries = 0
1591   while True:
1592     max_time = 0
1593     done = True
1594     cumul_degraded = False
1595     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1596     if rstats.failed or not rstats.data:
1597       lu.LogWarning("Can't get any data from node %s", node)
1598       retries += 1
1599       if retries >= 10:
1600         raise errors.RemoteError("Can't contact node %s for mirror data,"
1601                                  " aborting." % node)
1602       time.sleep(6)
1603       continue
1604     rstats = rstats.data
1605     retries = 0
1606     for i, mstat in enumerate(rstats):
1607       if mstat is None:
1608         lu.LogWarning("Can't compute data for node %s/%s",
1609                            node, instance.disks[i].iv_name)
1610         continue
1611       # we ignore the ldisk parameter
1612       perc_done, est_time, is_degraded, _ = mstat
1613       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1614       if perc_done is not None:
1615         done = False
1616         if est_time is not None:
1617           rem_time = "%d estimated seconds remaining" % est_time
1618           max_time = est_time
1619         else:
1620           rem_time = "no time estimate"
1621         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1622                         (instance.disks[i].iv_name, perc_done, rem_time))
1623     if done or oneshot:
1624       break
1625
1626     time.sleep(min(60, max_time))
1627
1628   if done:
1629     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1630   return not cumul_degraded
1631
1632
1633 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1634   """Check that mirrors are not degraded.
1635
1636   The ldisk parameter, if True, will change the test from the
1637   is_degraded attribute (which represents overall non-ok status for
1638   the device(s)) to the ldisk (representing the local storage status).
1639
1640   """
1641   lu.cfg.SetDiskID(dev, node)
1642   if ldisk:
1643     idx = 6
1644   else:
1645     idx = 5
1646
1647   result = True
1648   if on_primary or dev.AssembleOnSecondary():
1649     rstats = lu.rpc.call_blockdev_find(node, dev)
1650     msg = rstats.RemoteFailMsg()
1651     if msg:
1652       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1653       result = False
1654     elif not rstats.payload:
1655       lu.LogWarning("Can't find disk on node %s", node)
1656       result = False
1657     else:
1658       result = result and (not rstats.payload[idx])
1659   if dev.children:
1660     for child in dev.children:
1661       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1662
1663   return result
1664
1665
1666 class LUDiagnoseOS(NoHooksLU):
1667   """Logical unit for OS diagnose/query.
1668
1669   """
1670   _OP_REQP = ["output_fields", "names"]
1671   REQ_BGL = False
1672   _FIELDS_STATIC = utils.FieldSet()
1673   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1674
1675   def ExpandNames(self):
1676     if self.op.names:
1677       raise errors.OpPrereqError("Selective OS query not supported")
1678
1679     _CheckOutputFields(static=self._FIELDS_STATIC,
1680                        dynamic=self._FIELDS_DYNAMIC,
1681                        selected=self.op.output_fields)
1682
1683     # Lock all nodes, in shared mode
1684     # Temporary removal of locks, should be reverted later
1685     # TODO: reintroduce locks when they are lighter-weight
1686     self.needed_locks = {}
1687     #self.share_locks[locking.LEVEL_NODE] = 1
1688     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689
1690   def CheckPrereq(self):
1691     """Check prerequisites.
1692
1693     """
1694
1695   @staticmethod
1696   def _DiagnoseByOS(node_list, rlist):
1697     """Remaps a per-node return list into an a per-os per-node dictionary
1698
1699     @param node_list: a list with the names of all nodes
1700     @param rlist: a map with node names as keys and OS objects as values
1701
1702     @rtype: dict
1703     @return: a dictionary with osnames as keys and as value another map, with
1704         nodes as keys and list of OS objects as values, eg::
1705
1706           {"debian-etch": {"node1": [<object>,...],
1707                            "node2": [<object>,]}
1708           }
1709
1710     """
1711     all_os = {}
1712     # we build here the list of nodes that didn't fail the RPC (at RPC
1713     # level), so that nodes with a non-responding node daemon don't
1714     # make all OSes invalid
1715     good_nodes = [node_name for node_name in rlist
1716                   if not rlist[node_name].failed]
1717     for node_name, nr in rlist.iteritems():
1718       if nr.failed or not nr.data:
1719         continue
1720       for os_obj in nr.data:
1721         if os_obj.name not in all_os:
1722           # build a list of nodes for this os containing empty lists
1723           # for each node in node_list
1724           all_os[os_obj.name] = {}
1725           for nname in good_nodes:
1726             all_os[os_obj.name][nname] = []
1727         all_os[os_obj.name][node_name].append(os_obj)
1728     return all_os
1729
1730   def Exec(self, feedback_fn):
1731     """Compute the list of OSes.
1732
1733     """
1734     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1735     node_data = self.rpc.call_os_diagnose(valid_nodes)
1736     if node_data == False:
1737       raise errors.OpExecError("Can't gather the list of OSes")
1738     pol = self._DiagnoseByOS(valid_nodes, node_data)
1739     output = []
1740     for os_name, os_data in pol.iteritems():
1741       row = []
1742       for field in self.op.output_fields:
1743         if field == "name":
1744           val = os_name
1745         elif field == "valid":
1746           val = utils.all([osl and osl[0] for osl in os_data.values()])
1747         elif field == "node_status":
1748           val = {}
1749           for node_name, nos_list in os_data.iteritems():
1750             val[node_name] = [(v.status, v.path) for v in nos_list]
1751         else:
1752           raise errors.ParameterError(field)
1753         row.append(val)
1754       output.append(row)
1755
1756     return output
1757
1758
1759 class LURemoveNode(LogicalUnit):
1760   """Logical unit for removing a node.
1761
1762   """
1763   HPATH = "node-remove"
1764   HTYPE = constants.HTYPE_NODE
1765   _OP_REQP = ["node_name"]
1766
1767   def BuildHooksEnv(self):
1768     """Build hooks env.
1769
1770     This doesn't run on the target node in the pre phase as a failed
1771     node would then be impossible to remove.
1772
1773     """
1774     env = {
1775       "OP_TARGET": self.op.node_name,
1776       "NODE_NAME": self.op.node_name,
1777       }
1778     all_nodes = self.cfg.GetNodeList()
1779     all_nodes.remove(self.op.node_name)
1780     return env, all_nodes, all_nodes
1781
1782   def CheckPrereq(self):
1783     """Check prerequisites.
1784
1785     This checks:
1786      - the node exists in the configuration
1787      - it does not have primary or secondary instances
1788      - it's not the master
1789
1790     Any errors are signalled by raising errors.OpPrereqError.
1791
1792     """
1793     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1794     if node is None:
1795       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1796
1797     instance_list = self.cfg.GetInstanceList()
1798
1799     masternode = self.cfg.GetMasterNode()
1800     if node.name == masternode:
1801       raise errors.OpPrereqError("Node is the master node,"
1802                                  " you need to failover first.")
1803
1804     for instance_name in instance_list:
1805       instance = self.cfg.GetInstanceInfo(instance_name)
1806       if node.name in instance.all_nodes:
1807         raise errors.OpPrereqError("Instance %s is still running on the node,"
1808                                    " please remove first." % instance_name)
1809     self.op.node_name = node.name
1810     self.node = node
1811
1812   def Exec(self, feedback_fn):
1813     """Removes the node from the cluster.
1814
1815     """
1816     node = self.node
1817     logging.info("Stopping the node daemon and removing configs from node %s",
1818                  node.name)
1819
1820     self.context.RemoveNode(node.name)
1821
1822     self.rpc.call_node_leave_cluster(node.name)
1823
1824     # Promote nodes to master candidate as needed
1825     _AdjustCandidatePool(self)
1826
1827
1828 class LUQueryNodes(NoHooksLU):
1829   """Logical unit for querying nodes.
1830
1831   """
1832   _OP_REQP = ["output_fields", "names", "use_locking"]
1833   REQ_BGL = False
1834   _FIELDS_DYNAMIC = utils.FieldSet(
1835     "dtotal", "dfree",
1836     "mtotal", "mnode", "mfree",
1837     "bootid",
1838     "ctotal", "cnodes", "csockets",
1839     )
1840
1841   _FIELDS_STATIC = utils.FieldSet(
1842     "name", "pinst_cnt", "sinst_cnt",
1843     "pinst_list", "sinst_list",
1844     "pip", "sip", "tags",
1845     "serial_no",
1846     "master_candidate",
1847     "master",
1848     "offline",
1849     "drained",
1850     )
1851
1852   def ExpandNames(self):
1853     _CheckOutputFields(static=self._FIELDS_STATIC,
1854                        dynamic=self._FIELDS_DYNAMIC,
1855                        selected=self.op.output_fields)
1856
1857     self.needed_locks = {}
1858     self.share_locks[locking.LEVEL_NODE] = 1
1859
1860     if self.op.names:
1861       self.wanted = _GetWantedNodes(self, self.op.names)
1862     else:
1863       self.wanted = locking.ALL_SET
1864
1865     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1866     self.do_locking = self.do_node_query and self.op.use_locking
1867     if self.do_locking:
1868       # if we don't request only static fields, we need to lock the nodes
1869       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1870
1871
1872   def CheckPrereq(self):
1873     """Check prerequisites.
1874
1875     """
1876     # The validation of the node list is done in the _GetWantedNodes,
1877     # if non empty, and if empty, there's no validation to do
1878     pass
1879
1880   def Exec(self, feedback_fn):
1881     """Computes the list of nodes and their attributes.
1882
1883     """
1884     all_info = self.cfg.GetAllNodesInfo()
1885     if self.do_locking:
1886       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1887     elif self.wanted != locking.ALL_SET:
1888       nodenames = self.wanted
1889       missing = set(nodenames).difference(all_info.keys())
1890       if missing:
1891         raise errors.OpExecError(
1892           "Some nodes were removed before retrieving their data: %s" % missing)
1893     else:
1894       nodenames = all_info.keys()
1895
1896     nodenames = utils.NiceSort(nodenames)
1897     nodelist = [all_info[name] for name in nodenames]
1898
1899     # begin data gathering
1900
1901     if self.do_node_query:
1902       live_data = {}
1903       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1904                                           self.cfg.GetHypervisorType())
1905       for name in nodenames:
1906         nodeinfo = node_data[name]
1907         if not nodeinfo.failed and nodeinfo.data:
1908           nodeinfo = nodeinfo.data
1909           fn = utils.TryConvert
1910           live_data[name] = {
1911             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1912             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1913             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1914             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1915             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1916             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1917             "bootid": nodeinfo.get('bootid', None),
1918             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1919             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1920             }
1921         else:
1922           live_data[name] = {}
1923     else:
1924       live_data = dict.fromkeys(nodenames, {})
1925
1926     node_to_primary = dict([(name, set()) for name in nodenames])
1927     node_to_secondary = dict([(name, set()) for name in nodenames])
1928
1929     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1930                              "sinst_cnt", "sinst_list"))
1931     if inst_fields & frozenset(self.op.output_fields):
1932       instancelist = self.cfg.GetInstanceList()
1933
1934       for instance_name in instancelist:
1935         inst = self.cfg.GetInstanceInfo(instance_name)
1936         if inst.primary_node in node_to_primary:
1937           node_to_primary[inst.primary_node].add(inst.name)
1938         for secnode in inst.secondary_nodes:
1939           if secnode in node_to_secondary:
1940             node_to_secondary[secnode].add(inst.name)
1941
1942     master_node = self.cfg.GetMasterNode()
1943
1944     # end data gathering
1945
1946     output = []
1947     for node in nodelist:
1948       node_output = []
1949       for field in self.op.output_fields:
1950         if field == "name":
1951           val = node.name
1952         elif field == "pinst_list":
1953           val = list(node_to_primary[node.name])
1954         elif field == "sinst_list":
1955           val = list(node_to_secondary[node.name])
1956         elif field == "pinst_cnt":
1957           val = len(node_to_primary[node.name])
1958         elif field == "sinst_cnt":
1959           val = len(node_to_secondary[node.name])
1960         elif field == "pip":
1961           val = node.primary_ip
1962         elif field == "sip":
1963           val = node.secondary_ip
1964         elif field == "tags":
1965           val = list(node.GetTags())
1966         elif field == "serial_no":
1967           val = node.serial_no
1968         elif field == "master_candidate":
1969           val = node.master_candidate
1970         elif field == "master":
1971           val = node.name == master_node
1972         elif field == "offline":
1973           val = node.offline
1974         elif field == "drained":
1975           val = node.drained
1976         elif self._FIELDS_DYNAMIC.Matches(field):
1977           val = live_data[node.name].get(field, None)
1978         else:
1979           raise errors.ParameterError(field)
1980         node_output.append(val)
1981       output.append(node_output)
1982
1983     return output
1984
1985
1986 class LUQueryNodeVolumes(NoHooksLU):
1987   """Logical unit for getting volumes on node(s).
1988
1989   """
1990   _OP_REQP = ["nodes", "output_fields"]
1991   REQ_BGL = False
1992   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1993   _FIELDS_STATIC = utils.FieldSet("node")
1994
1995   def ExpandNames(self):
1996     _CheckOutputFields(static=self._FIELDS_STATIC,
1997                        dynamic=self._FIELDS_DYNAMIC,
1998                        selected=self.op.output_fields)
1999
2000     self.needed_locks = {}
2001     self.share_locks[locking.LEVEL_NODE] = 1
2002     if not self.op.nodes:
2003       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2004     else:
2005       self.needed_locks[locking.LEVEL_NODE] = \
2006         _GetWantedNodes(self, self.op.nodes)
2007
2008   def CheckPrereq(self):
2009     """Check prerequisites.
2010
2011     This checks that the fields required are valid output fields.
2012
2013     """
2014     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2015
2016   def Exec(self, feedback_fn):
2017     """Computes the list of nodes and their attributes.
2018
2019     """
2020     nodenames = self.nodes
2021     volumes = self.rpc.call_node_volumes(nodenames)
2022
2023     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2024              in self.cfg.GetInstanceList()]
2025
2026     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2027
2028     output = []
2029     for node in nodenames:
2030       if node not in volumes or volumes[node].failed or not volumes[node].data:
2031         continue
2032
2033       node_vols = volumes[node].data[:]
2034       node_vols.sort(key=lambda vol: vol['dev'])
2035
2036       for vol in node_vols:
2037         node_output = []
2038         for field in self.op.output_fields:
2039           if field == "node":
2040             val = node
2041           elif field == "phys":
2042             val = vol['dev']
2043           elif field == "vg":
2044             val = vol['vg']
2045           elif field == "name":
2046             val = vol['name']
2047           elif field == "size":
2048             val = int(float(vol['size']))
2049           elif field == "instance":
2050             for inst in ilist:
2051               if node not in lv_by_node[inst]:
2052                 continue
2053               if vol['name'] in lv_by_node[inst][node]:
2054                 val = inst.name
2055                 break
2056             else:
2057               val = '-'
2058           else:
2059             raise errors.ParameterError(field)
2060           node_output.append(str(val))
2061
2062         output.append(node_output)
2063
2064     return output
2065
2066
2067 class LUAddNode(LogicalUnit):
2068   """Logical unit for adding node to the cluster.
2069
2070   """
2071   HPATH = "node-add"
2072   HTYPE = constants.HTYPE_NODE
2073   _OP_REQP = ["node_name"]
2074
2075   def BuildHooksEnv(self):
2076     """Build hooks env.
2077
2078     This will run on all nodes before, and on all nodes + the new node after.
2079
2080     """
2081     env = {
2082       "OP_TARGET": self.op.node_name,
2083       "NODE_NAME": self.op.node_name,
2084       "NODE_PIP": self.op.primary_ip,
2085       "NODE_SIP": self.op.secondary_ip,
2086       }
2087     nodes_0 = self.cfg.GetNodeList()
2088     nodes_1 = nodes_0 + [self.op.node_name, ]
2089     return env, nodes_0, nodes_1
2090
2091   def CheckPrereq(self):
2092     """Check prerequisites.
2093
2094     This checks:
2095      - the new node is not already in the config
2096      - it is resolvable
2097      - its parameters (single/dual homed) matches the cluster
2098
2099     Any errors are signalled by raising errors.OpPrereqError.
2100
2101     """
2102     node_name = self.op.node_name
2103     cfg = self.cfg
2104
2105     dns_data = utils.HostInfo(node_name)
2106
2107     node = dns_data.name
2108     primary_ip = self.op.primary_ip = dns_data.ip
2109     secondary_ip = getattr(self.op, "secondary_ip", None)
2110     if secondary_ip is None:
2111       secondary_ip = primary_ip
2112     if not utils.IsValidIP(secondary_ip):
2113       raise errors.OpPrereqError("Invalid secondary IP given")
2114     self.op.secondary_ip = secondary_ip
2115
2116     node_list = cfg.GetNodeList()
2117     if not self.op.readd and node in node_list:
2118       raise errors.OpPrereqError("Node %s is already in the configuration" %
2119                                  node)
2120     elif self.op.readd and node not in node_list:
2121       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2122
2123     for existing_node_name in node_list:
2124       existing_node = cfg.GetNodeInfo(existing_node_name)
2125
2126       if self.op.readd and node == existing_node_name:
2127         if (existing_node.primary_ip != primary_ip or
2128             existing_node.secondary_ip != secondary_ip):
2129           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2130                                      " address configuration as before")
2131         continue
2132
2133       if (existing_node.primary_ip == primary_ip or
2134           existing_node.secondary_ip == primary_ip or
2135           existing_node.primary_ip == secondary_ip or
2136           existing_node.secondary_ip == secondary_ip):
2137         raise errors.OpPrereqError("New node ip address(es) conflict with"
2138                                    " existing node %s" % existing_node.name)
2139
2140     # check that the type of the node (single versus dual homed) is the
2141     # same as for the master
2142     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2143     master_singlehomed = myself.secondary_ip == myself.primary_ip
2144     newbie_singlehomed = secondary_ip == primary_ip
2145     if master_singlehomed != newbie_singlehomed:
2146       if master_singlehomed:
2147         raise errors.OpPrereqError("The master has no private ip but the"
2148                                    " new node has one")
2149       else:
2150         raise errors.OpPrereqError("The master has a private ip but the"
2151                                    " new node doesn't have one")
2152
2153     # checks reachablity
2154     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2155       raise errors.OpPrereqError("Node not reachable by ping")
2156
2157     if not newbie_singlehomed:
2158       # check reachability from my secondary ip to newbie's secondary ip
2159       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2160                            source=myself.secondary_ip):
2161         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2162                                    " based ping to noded port")
2163
2164     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2165     mc_now, _ = self.cfg.GetMasterCandidateStats()
2166     master_candidate = mc_now < cp_size
2167
2168     self.new_node = objects.Node(name=node,
2169                                  primary_ip=primary_ip,
2170                                  secondary_ip=secondary_ip,
2171                                  master_candidate=master_candidate,
2172                                  offline=False, drained=False)
2173
2174   def Exec(self, feedback_fn):
2175     """Adds the new node to the cluster.
2176
2177     """
2178     new_node = self.new_node
2179     node = new_node.name
2180
2181     # check connectivity
2182     result = self.rpc.call_version([node])[node]
2183     result.Raise()
2184     if result.data:
2185       if constants.PROTOCOL_VERSION == result.data:
2186         logging.info("Communication to node %s fine, sw version %s match",
2187                      node, result.data)
2188       else:
2189         raise errors.OpExecError("Version mismatch master version %s,"
2190                                  " node version %s" %
2191                                  (constants.PROTOCOL_VERSION, result.data))
2192     else:
2193       raise errors.OpExecError("Cannot get version from the new node")
2194
2195     # setup ssh on node
2196     logging.info("Copy ssh key to node %s", node)
2197     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2198     keyarray = []
2199     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2200                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2201                 priv_key, pub_key]
2202
2203     for i in keyfiles:
2204       f = open(i, 'r')
2205       try:
2206         keyarray.append(f.read())
2207       finally:
2208         f.close()
2209
2210     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2211                                     keyarray[2],
2212                                     keyarray[3], keyarray[4], keyarray[5])
2213
2214     msg = result.RemoteFailMsg()
2215     if msg:
2216       raise errors.OpExecError("Cannot transfer ssh keys to the"
2217                                " new node: %s" % msg)
2218
2219     # Add node to our /etc/hosts, and add key to known_hosts
2220     utils.AddHostToEtcHosts(new_node.name)
2221
2222     if new_node.secondary_ip != new_node.primary_ip:
2223       result = self.rpc.call_node_has_ip_address(new_node.name,
2224                                                  new_node.secondary_ip)
2225       if result.failed or not result.data:
2226         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2227                                  " you gave (%s). Please fix and re-run this"
2228                                  " command." % new_node.secondary_ip)
2229
2230     node_verify_list = [self.cfg.GetMasterNode()]
2231     node_verify_param = {
2232       'nodelist': [node],
2233       # TODO: do a node-net-test as well?
2234     }
2235
2236     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2237                                        self.cfg.GetClusterName())
2238     for verifier in node_verify_list:
2239       if result[verifier].failed or not result[verifier].data:
2240         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2241                                  " for remote verification" % verifier)
2242       if result[verifier].data['nodelist']:
2243         for failed in result[verifier].data['nodelist']:
2244           feedback_fn("ssh/hostname verification failed %s -> %s" %
2245                       (verifier, result[verifier].data['nodelist'][failed]))
2246         raise errors.OpExecError("ssh/hostname verification failed.")
2247
2248     # Distribute updated /etc/hosts and known_hosts to all nodes,
2249     # including the node just added
2250     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2251     dist_nodes = self.cfg.GetNodeList()
2252     if not self.op.readd:
2253       dist_nodes.append(node)
2254     if myself.name in dist_nodes:
2255       dist_nodes.remove(myself.name)
2256
2257     logging.debug("Copying hosts and known_hosts to all nodes")
2258     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2259       result = self.rpc.call_upload_file(dist_nodes, fname)
2260       for to_node, to_result in result.iteritems():
2261         if to_result.failed or not to_result.data:
2262           logging.error("Copy of file %s to node %s failed", fname, to_node)
2263
2264     to_copy = []
2265     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2266     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2267       to_copy.append(constants.VNC_PASSWORD_FILE)
2268
2269     for fname in to_copy:
2270       result = self.rpc.call_upload_file([node], fname)
2271       if result[node].failed or not result[node]:
2272         logging.error("Could not copy file %s to node %s", fname, node)
2273
2274     if self.op.readd:
2275       self.context.ReaddNode(new_node)
2276     else:
2277       self.context.AddNode(new_node)
2278
2279
2280 class LUSetNodeParams(LogicalUnit):
2281   """Modifies the parameters of a node.
2282
2283   """
2284   HPATH = "node-modify"
2285   HTYPE = constants.HTYPE_NODE
2286   _OP_REQP = ["node_name"]
2287   REQ_BGL = False
2288
2289   def CheckArguments(self):
2290     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2291     if node_name is None:
2292       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2293     self.op.node_name = node_name
2294     _CheckBooleanOpField(self.op, 'master_candidate')
2295     _CheckBooleanOpField(self.op, 'offline')
2296     _CheckBooleanOpField(self.op, 'drained')
2297     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2298     if all_mods.count(None) == 3:
2299       raise errors.OpPrereqError("Please pass at least one modification")
2300     if all_mods.count(True) > 1:
2301       raise errors.OpPrereqError("Can't set the node into more than one"
2302                                  " state at the same time")
2303
2304   def ExpandNames(self):
2305     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2306
2307   def BuildHooksEnv(self):
2308     """Build hooks env.
2309
2310     This runs on the master node.
2311
2312     """
2313     env = {
2314       "OP_TARGET": self.op.node_name,
2315       "MASTER_CANDIDATE": str(self.op.master_candidate),
2316       "OFFLINE": str(self.op.offline),
2317       "DRAINED": str(self.op.drained),
2318       }
2319     nl = [self.cfg.GetMasterNode(),
2320           self.op.node_name]
2321     return env, nl, nl
2322
2323   def CheckPrereq(self):
2324     """Check prerequisites.
2325
2326     This only checks the instance list against the existing names.
2327
2328     """
2329     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2330
2331     if ((self.op.master_candidate == False or self.op.offline == True or
2332          self.op.drained == True) and node.master_candidate):
2333       # we will demote the node from master_candidate
2334       if self.op.node_name == self.cfg.GetMasterNode():
2335         raise errors.OpPrereqError("The master node has to be a"
2336                                    " master candidate, online and not drained")
2337       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2338       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2339       if num_candidates <= cp_size:
2340         msg = ("Not enough master candidates (desired"
2341                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2342         if self.op.force:
2343           self.LogWarning(msg)
2344         else:
2345           raise errors.OpPrereqError(msg)
2346
2347     if (self.op.master_candidate == True and
2348         ((node.offline and not self.op.offline == False) or
2349          (node.drained and not self.op.drained == False))):
2350       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2351                                  " to master_candidate" % node.name)
2352
2353     return
2354
2355   def Exec(self, feedback_fn):
2356     """Modifies a node.
2357
2358     """
2359     node = self.node
2360
2361     result = []
2362     changed_mc = False
2363
2364     if self.op.offline is not None:
2365       node.offline = self.op.offline
2366       result.append(("offline", str(self.op.offline)))
2367       if self.op.offline == True:
2368         if node.master_candidate:
2369           node.master_candidate = False
2370           changed_mc = True
2371           result.append(("master_candidate", "auto-demotion due to offline"))
2372         if node.drained:
2373           node.drained = False
2374           result.append(("drained", "clear drained status due to offline"))
2375
2376     if self.op.master_candidate is not None:
2377       node.master_candidate = self.op.master_candidate
2378       changed_mc = True
2379       result.append(("master_candidate", str(self.op.master_candidate)))
2380       if self.op.master_candidate == False:
2381         rrc = self.rpc.call_node_demote_from_mc(node.name)
2382         msg = rrc.RemoteFailMsg()
2383         if msg:
2384           self.LogWarning("Node failed to demote itself: %s" % msg)
2385
2386     if self.op.drained is not None:
2387       node.drained = self.op.drained
2388       result.append(("drained", str(self.op.drained)))
2389       if self.op.drained == True:
2390         if node.master_candidate:
2391           node.master_candidate = False
2392           changed_mc = True
2393           result.append(("master_candidate", "auto-demotion due to drain"))
2394         if node.offline:
2395           node.offline = False
2396           result.append(("offline", "clear offline status due to drain"))
2397
2398     # this will trigger configuration file update, if needed
2399     self.cfg.Update(node)
2400     # this will trigger job queue propagation or cleanup
2401     if changed_mc:
2402       self.context.ReaddNode(node)
2403
2404     return result
2405
2406
2407 class LUQueryClusterInfo(NoHooksLU):
2408   """Query cluster configuration.
2409
2410   """
2411   _OP_REQP = []
2412   REQ_BGL = False
2413
2414   def ExpandNames(self):
2415     self.needed_locks = {}
2416
2417   def CheckPrereq(self):
2418     """No prerequsites needed for this LU.
2419
2420     """
2421     pass
2422
2423   def Exec(self, feedback_fn):
2424     """Return cluster config.
2425
2426     """
2427     cluster = self.cfg.GetClusterInfo()
2428     result = {
2429       "software_version": constants.RELEASE_VERSION,
2430       "protocol_version": constants.PROTOCOL_VERSION,
2431       "config_version": constants.CONFIG_VERSION,
2432       "os_api_version": constants.OS_API_VERSION,
2433       "export_version": constants.EXPORT_VERSION,
2434       "architecture": (platform.architecture()[0], platform.machine()),
2435       "name": cluster.cluster_name,
2436       "master": cluster.master_node,
2437       "default_hypervisor": cluster.default_hypervisor,
2438       "enabled_hypervisors": cluster.enabled_hypervisors,
2439       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2440                         for hypervisor in cluster.enabled_hypervisors]),
2441       "beparams": cluster.beparams,
2442       "candidate_pool_size": cluster.candidate_pool_size,
2443       }
2444
2445     return result
2446
2447
2448 class LUQueryConfigValues(NoHooksLU):
2449   """Return configuration values.
2450
2451   """
2452   _OP_REQP = []
2453   REQ_BGL = False
2454   _FIELDS_DYNAMIC = utils.FieldSet()
2455   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2456
2457   def ExpandNames(self):
2458     self.needed_locks = {}
2459
2460     _CheckOutputFields(static=self._FIELDS_STATIC,
2461                        dynamic=self._FIELDS_DYNAMIC,
2462                        selected=self.op.output_fields)
2463
2464   def CheckPrereq(self):
2465     """No prerequisites.
2466
2467     """
2468     pass
2469
2470   def Exec(self, feedback_fn):
2471     """Dump a representation of the cluster config to the standard output.
2472
2473     """
2474     values = []
2475     for field in self.op.output_fields:
2476       if field == "cluster_name":
2477         entry = self.cfg.GetClusterName()
2478       elif field == "master_node":
2479         entry = self.cfg.GetMasterNode()
2480       elif field == "drain_flag":
2481         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2482       else:
2483         raise errors.ParameterError(field)
2484       values.append(entry)
2485     return values
2486
2487
2488 class LUActivateInstanceDisks(NoHooksLU):
2489   """Bring up an instance's disks.
2490
2491   """
2492   _OP_REQP = ["instance_name"]
2493   REQ_BGL = False
2494
2495   def ExpandNames(self):
2496     self._ExpandAndLockInstance()
2497     self.needed_locks[locking.LEVEL_NODE] = []
2498     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2499
2500   def DeclareLocks(self, level):
2501     if level == locking.LEVEL_NODE:
2502       self._LockInstancesNodes()
2503
2504   def CheckPrereq(self):
2505     """Check prerequisites.
2506
2507     This checks that the instance is in the cluster.
2508
2509     """
2510     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2511     assert self.instance is not None, \
2512       "Cannot retrieve locked instance %s" % self.op.instance_name
2513     _CheckNodeOnline(self, self.instance.primary_node)
2514
2515   def Exec(self, feedback_fn):
2516     """Activate the disks.
2517
2518     """
2519     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2520     if not disks_ok:
2521       raise errors.OpExecError("Cannot activate block devices")
2522
2523     return disks_info
2524
2525
2526 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2527   """Prepare the block devices for an instance.
2528
2529   This sets up the block devices on all nodes.
2530
2531   @type lu: L{LogicalUnit}
2532   @param lu: the logical unit on whose behalf we execute
2533   @type instance: L{objects.Instance}
2534   @param instance: the instance for whose disks we assemble
2535   @type ignore_secondaries: boolean
2536   @param ignore_secondaries: if true, errors on secondary nodes
2537       won't result in an error return from the function
2538   @return: False if the operation failed, otherwise a list of
2539       (host, instance_visible_name, node_visible_name)
2540       with the mapping from node devices to instance devices
2541
2542   """
2543   device_info = []
2544   disks_ok = True
2545   iname = instance.name
2546   # With the two passes mechanism we try to reduce the window of
2547   # opportunity for the race condition of switching DRBD to primary
2548   # before handshaking occured, but we do not eliminate it
2549
2550   # The proper fix would be to wait (with some limits) until the
2551   # connection has been made and drbd transitions from WFConnection
2552   # into any other network-connected state (Connected, SyncTarget,
2553   # SyncSource, etc.)
2554
2555   # 1st pass, assemble on all nodes in secondary mode
2556   for inst_disk in instance.disks:
2557     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2558       lu.cfg.SetDiskID(node_disk, node)
2559       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2560       msg = result.RemoteFailMsg()
2561       if msg:
2562         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2563                            " (is_primary=False, pass=1): %s",
2564                            inst_disk.iv_name, node, msg)
2565         if not ignore_secondaries:
2566           disks_ok = False
2567
2568   # FIXME: race condition on drbd migration to primary
2569
2570   # 2nd pass, do only the primary node
2571   for inst_disk in instance.disks:
2572     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2573       if node != instance.primary_node:
2574         continue
2575       lu.cfg.SetDiskID(node_disk, node)
2576       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2577       msg = result.RemoteFailMsg()
2578       if msg:
2579         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2580                            " (is_primary=True, pass=2): %s",
2581                            inst_disk.iv_name, node, msg)
2582         disks_ok = False
2583     device_info.append((instance.primary_node, inst_disk.iv_name,
2584                         result.payload))
2585
2586   # leave the disks configured for the primary node
2587   # this is a workaround that would be fixed better by
2588   # improving the logical/physical id handling
2589   for disk in instance.disks:
2590     lu.cfg.SetDiskID(disk, instance.primary_node)
2591
2592   return disks_ok, device_info
2593
2594
2595 def _StartInstanceDisks(lu, instance, force):
2596   """Start the disks of an instance.
2597
2598   """
2599   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2600                                            ignore_secondaries=force)
2601   if not disks_ok:
2602     _ShutdownInstanceDisks(lu, instance)
2603     if force is not None and not force:
2604       lu.proc.LogWarning("", hint="If the message above refers to a"
2605                          " secondary node,"
2606                          " you can retry the operation using '--force'.")
2607     raise errors.OpExecError("Disk consistency error")
2608
2609
2610 class LUDeactivateInstanceDisks(NoHooksLU):
2611   """Shutdown an instance's disks.
2612
2613   """
2614   _OP_REQP = ["instance_name"]
2615   REQ_BGL = False
2616
2617   def ExpandNames(self):
2618     self._ExpandAndLockInstance()
2619     self.needed_locks[locking.LEVEL_NODE] = []
2620     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2621
2622   def DeclareLocks(self, level):
2623     if level == locking.LEVEL_NODE:
2624       self._LockInstancesNodes()
2625
2626   def CheckPrereq(self):
2627     """Check prerequisites.
2628
2629     This checks that the instance is in the cluster.
2630
2631     """
2632     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2633     assert self.instance is not None, \
2634       "Cannot retrieve locked instance %s" % self.op.instance_name
2635
2636   def Exec(self, feedback_fn):
2637     """Deactivate the disks
2638
2639     """
2640     instance = self.instance
2641     _SafeShutdownInstanceDisks(self, instance)
2642
2643
2644 def _SafeShutdownInstanceDisks(lu, instance):
2645   """Shutdown block devices of an instance.
2646
2647   This function checks if an instance is running, before calling
2648   _ShutdownInstanceDisks.
2649
2650   """
2651   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2652                                       [instance.hypervisor])
2653   ins_l = ins_l[instance.primary_node]
2654   if ins_l.failed or not isinstance(ins_l.data, list):
2655     raise errors.OpExecError("Can't contact node '%s'" %
2656                              instance.primary_node)
2657
2658   if instance.name in ins_l.data:
2659     raise errors.OpExecError("Instance is running, can't shutdown"
2660                              " block devices.")
2661
2662   _ShutdownInstanceDisks(lu, instance)
2663
2664
2665 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2666   """Shutdown block devices of an instance.
2667
2668   This does the shutdown on all nodes of the instance.
2669
2670   If the ignore_primary is false, errors on the primary node are
2671   ignored.
2672
2673   """
2674   all_result = True
2675   for disk in instance.disks:
2676     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2677       lu.cfg.SetDiskID(top_disk, node)
2678       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2679       msg = result.RemoteFailMsg()
2680       if msg:
2681         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2682                       disk.iv_name, node, msg)
2683         if not ignore_primary or node != instance.primary_node:
2684           all_result = False
2685   return all_result
2686
2687
2688 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2689   """Checks if a node has enough free memory.
2690
2691   This function check if a given node has the needed amount of free
2692   memory. In case the node has less memory or we cannot get the
2693   information from the node, this function raise an OpPrereqError
2694   exception.
2695
2696   @type lu: C{LogicalUnit}
2697   @param lu: a logical unit from which we get configuration data
2698   @type node: C{str}
2699   @param node: the node to check
2700   @type reason: C{str}
2701   @param reason: string to use in the error message
2702   @type requested: C{int}
2703   @param requested: the amount of memory in MiB to check for
2704   @type hypervisor_name: C{str}
2705   @param hypervisor_name: the hypervisor to ask for memory stats
2706   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2707       we cannot check the node
2708
2709   """
2710   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2711   nodeinfo[node].Raise()
2712   free_mem = nodeinfo[node].data.get('memory_free')
2713   if not isinstance(free_mem, int):
2714     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2715                              " was '%s'" % (node, free_mem))
2716   if requested > free_mem:
2717     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2718                              " needed %s MiB, available %s MiB" %
2719                              (node, reason, requested, free_mem))
2720
2721
2722 class LUStartupInstance(LogicalUnit):
2723   """Starts an instance.
2724
2725   """
2726   HPATH = "instance-start"
2727   HTYPE = constants.HTYPE_INSTANCE
2728   _OP_REQP = ["instance_name", "force"]
2729   REQ_BGL = False
2730
2731   def ExpandNames(self):
2732     self._ExpandAndLockInstance()
2733
2734   def BuildHooksEnv(self):
2735     """Build hooks env.
2736
2737     This runs on master, primary and secondary nodes of the instance.
2738
2739     """
2740     env = {
2741       "FORCE": self.op.force,
2742       }
2743     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2744     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2745     return env, nl, nl
2746
2747   def CheckPrereq(self):
2748     """Check prerequisites.
2749
2750     This checks that the instance is in the cluster.
2751
2752     """
2753     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754     assert self.instance is not None, \
2755       "Cannot retrieve locked instance %s" % self.op.instance_name
2756
2757     _CheckNodeOnline(self, instance.primary_node)
2758
2759     bep = self.cfg.GetClusterInfo().FillBE(instance)
2760     # check bridges existance
2761     _CheckInstanceBridgesExist(self, instance)
2762
2763     _CheckNodeFreeMemory(self, instance.primary_node,
2764                          "starting instance %s" % instance.name,
2765                          bep[constants.BE_MEMORY], instance.hypervisor)
2766
2767   def Exec(self, feedback_fn):
2768     """Start the instance.
2769
2770     """
2771     instance = self.instance
2772     force = self.op.force
2773
2774     self.cfg.MarkInstanceUp(instance.name)
2775
2776     node_current = instance.primary_node
2777
2778     _StartInstanceDisks(self, instance, force)
2779
2780     result = self.rpc.call_instance_start(node_current, instance)
2781     msg = result.RemoteFailMsg()
2782     if msg:
2783       _ShutdownInstanceDisks(self, instance)
2784       raise errors.OpExecError("Could not start instance: %s" % msg)
2785
2786
2787 class LURebootInstance(LogicalUnit):
2788   """Reboot an instance.
2789
2790   """
2791   HPATH = "instance-reboot"
2792   HTYPE = constants.HTYPE_INSTANCE
2793   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2794   REQ_BGL = False
2795
2796   def ExpandNames(self):
2797     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2798                                    constants.INSTANCE_REBOOT_HARD,
2799                                    constants.INSTANCE_REBOOT_FULL]:
2800       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2801                                   (constants.INSTANCE_REBOOT_SOFT,
2802                                    constants.INSTANCE_REBOOT_HARD,
2803                                    constants.INSTANCE_REBOOT_FULL))
2804     self._ExpandAndLockInstance()
2805
2806   def BuildHooksEnv(self):
2807     """Build hooks env.
2808
2809     This runs on master, primary and secondary nodes of the instance.
2810
2811     """
2812     env = {
2813       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2814       "REBOOT_TYPE": self.op.reboot_type,
2815       }
2816     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2817     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2818     return env, nl, nl
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2827     assert self.instance is not None, \
2828       "Cannot retrieve locked instance %s" % self.op.instance_name
2829
2830     _CheckNodeOnline(self, instance.primary_node)
2831
2832     # check bridges existance
2833     _CheckInstanceBridgesExist(self, instance)
2834
2835   def Exec(self, feedback_fn):
2836     """Reboot the instance.
2837
2838     """
2839     instance = self.instance
2840     ignore_secondaries = self.op.ignore_secondaries
2841     reboot_type = self.op.reboot_type
2842
2843     node_current = instance.primary_node
2844
2845     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2846                        constants.INSTANCE_REBOOT_HARD]:
2847       for disk in instance.disks:
2848         self.cfg.SetDiskID(disk, node_current)
2849       result = self.rpc.call_instance_reboot(node_current, instance,
2850                                              reboot_type)
2851       msg = result.RemoteFailMsg()
2852       if msg:
2853         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2854     else:
2855       result = self.rpc.call_instance_shutdown(node_current, instance)
2856       msg = result.RemoteFailMsg()
2857       if msg:
2858         raise errors.OpExecError("Could not shutdown instance for"
2859                                  " full reboot: %s" % msg)
2860       _ShutdownInstanceDisks(self, instance)
2861       _StartInstanceDisks(self, instance, ignore_secondaries)
2862       result = self.rpc.call_instance_start(node_current, instance)
2863       msg = result.RemoteFailMsg()
2864       if msg:
2865         _ShutdownInstanceDisks(self, instance)
2866         raise errors.OpExecError("Could not start instance for"
2867                                  " full reboot: %s" % msg)
2868
2869     self.cfg.MarkInstanceUp(instance.name)
2870
2871
2872 class LUShutdownInstance(LogicalUnit):
2873   """Shutdown an instance.
2874
2875   """
2876   HPATH = "instance-stop"
2877   HTYPE = constants.HTYPE_INSTANCE
2878   _OP_REQP = ["instance_name"]
2879   REQ_BGL = False
2880
2881   def ExpandNames(self):
2882     self._ExpandAndLockInstance()
2883
2884   def BuildHooksEnv(self):
2885     """Build hooks env.
2886
2887     This runs on master, primary and secondary nodes of the instance.
2888
2889     """
2890     env = _BuildInstanceHookEnvByObject(self, self.instance)
2891     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2892     return env, nl, nl
2893
2894   def CheckPrereq(self):
2895     """Check prerequisites.
2896
2897     This checks that the instance is in the cluster.
2898
2899     """
2900     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2901     assert self.instance is not None, \
2902       "Cannot retrieve locked instance %s" % self.op.instance_name
2903     _CheckNodeOnline(self, self.instance.primary_node)
2904
2905   def Exec(self, feedback_fn):
2906     """Shutdown the instance.
2907
2908     """
2909     instance = self.instance
2910     node_current = instance.primary_node
2911     self.cfg.MarkInstanceDown(instance.name)
2912     result = self.rpc.call_instance_shutdown(node_current, instance)
2913     msg = result.RemoteFailMsg()
2914     if msg:
2915       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2916
2917     _ShutdownInstanceDisks(self, instance)
2918
2919
2920 class LUReinstallInstance(LogicalUnit):
2921   """Reinstall an instance.
2922
2923   """
2924   HPATH = "instance-reinstall"
2925   HTYPE = constants.HTYPE_INSTANCE
2926   _OP_REQP = ["instance_name"]
2927   REQ_BGL = False
2928
2929   def ExpandNames(self):
2930     self._ExpandAndLockInstance()
2931
2932   def BuildHooksEnv(self):
2933     """Build hooks env.
2934
2935     This runs on master, primary and secondary nodes of the instance.
2936
2937     """
2938     env = _BuildInstanceHookEnvByObject(self, self.instance)
2939     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2940     return env, nl, nl
2941
2942   def CheckPrereq(self):
2943     """Check prerequisites.
2944
2945     This checks that the instance is in the cluster and is not running.
2946
2947     """
2948     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2949     assert instance is not None, \
2950       "Cannot retrieve locked instance %s" % self.op.instance_name
2951     _CheckNodeOnline(self, instance.primary_node)
2952
2953     if instance.disk_template == constants.DT_DISKLESS:
2954       raise errors.OpPrereqError("Instance '%s' has no disks" %
2955                                  self.op.instance_name)
2956     if instance.admin_up:
2957       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2958                                  self.op.instance_name)
2959     remote_info = self.rpc.call_instance_info(instance.primary_node,
2960                                               instance.name,
2961                                               instance.hypervisor)
2962     if remote_info.failed or remote_info.data:
2963       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2964                                  (self.op.instance_name,
2965                                   instance.primary_node))
2966
2967     self.op.os_type = getattr(self.op, "os_type", None)
2968     if self.op.os_type is not None:
2969       # OS verification
2970       pnode = self.cfg.GetNodeInfo(
2971         self.cfg.ExpandNodeName(instance.primary_node))
2972       if pnode is None:
2973         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2974                                    self.op.pnode)
2975       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2976       result.Raise()
2977       if not isinstance(result.data, objects.OS):
2978         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2979                                    " primary node"  % self.op.os_type)
2980
2981     self.instance = instance
2982
2983   def Exec(self, feedback_fn):
2984     """Reinstall the instance.
2985
2986     """
2987     inst = self.instance
2988
2989     if self.op.os_type is not None:
2990       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2991       inst.os = self.op.os_type
2992       self.cfg.Update(inst)
2993
2994     _StartInstanceDisks(self, inst, None)
2995     try:
2996       feedback_fn("Running the instance OS create scripts...")
2997       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2998       msg = result.RemoteFailMsg()
2999       if msg:
3000         raise errors.OpExecError("Could not install OS for instance %s"
3001                                  " on node %s: %s" %
3002                                  (inst.name, inst.primary_node, msg))
3003     finally:
3004       _ShutdownInstanceDisks(self, inst)
3005
3006
3007 class LURenameInstance(LogicalUnit):
3008   """Rename an instance.
3009
3010   """
3011   HPATH = "instance-rename"
3012   HTYPE = constants.HTYPE_INSTANCE
3013   _OP_REQP = ["instance_name", "new_name"]
3014
3015   def BuildHooksEnv(self):
3016     """Build hooks env.
3017
3018     This runs on master, primary and secondary nodes of the instance.
3019
3020     """
3021     env = _BuildInstanceHookEnvByObject(self, self.instance)
3022     env["INSTANCE_NEW_NAME"] = self.op.new_name
3023     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3024     return env, nl, nl
3025
3026   def CheckPrereq(self):
3027     """Check prerequisites.
3028
3029     This checks that the instance is in the cluster and is not running.
3030
3031     """
3032     instance = self.cfg.GetInstanceInfo(
3033       self.cfg.ExpandInstanceName(self.op.instance_name))
3034     if instance is None:
3035       raise errors.OpPrereqError("Instance '%s' not known" %
3036                                  self.op.instance_name)
3037     _CheckNodeOnline(self, instance.primary_node)
3038
3039     if instance.admin_up:
3040       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3041                                  self.op.instance_name)
3042     remote_info = self.rpc.call_instance_info(instance.primary_node,
3043                                               instance.name,
3044                                               instance.hypervisor)
3045     remote_info.Raise()
3046     if remote_info.data:
3047       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3048                                  (self.op.instance_name,
3049                                   instance.primary_node))
3050     self.instance = instance
3051
3052     # new name verification
3053     name_info = utils.HostInfo(self.op.new_name)
3054
3055     self.op.new_name = new_name = name_info.name
3056     instance_list = self.cfg.GetInstanceList()
3057     if new_name in instance_list:
3058       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3059                                  new_name)
3060
3061     if not getattr(self.op, "ignore_ip", False):
3062       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3063         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3064                                    (name_info.ip, new_name))
3065
3066
3067   def Exec(self, feedback_fn):
3068     """Reinstall the instance.
3069
3070     """
3071     inst = self.instance
3072     old_name = inst.name
3073
3074     if inst.disk_template == constants.DT_FILE:
3075       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3076
3077     self.cfg.RenameInstance(inst.name, self.op.new_name)
3078     # Change the instance lock. This is definitely safe while we hold the BGL
3079     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3080     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3081
3082     # re-read the instance from the configuration after rename
3083     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3084
3085     if inst.disk_template == constants.DT_FILE:
3086       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3087       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3088                                                      old_file_storage_dir,
3089                                                      new_file_storage_dir)
3090       result.Raise()
3091       if not result.data:
3092         raise errors.OpExecError("Could not connect to node '%s' to rename"
3093                                  " directory '%s' to '%s' (but the instance"
3094                                  " has been renamed in Ganeti)" % (
3095                                  inst.primary_node, old_file_storage_dir,
3096                                  new_file_storage_dir))
3097
3098       if not result.data[0]:
3099         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3100                                  " (but the instance has been renamed in"
3101                                  " Ganeti)" % (old_file_storage_dir,
3102                                                new_file_storage_dir))
3103
3104     _StartInstanceDisks(self, inst, None)
3105     try:
3106       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3107                                                  old_name)
3108       msg = result.RemoteFailMsg()
3109       if msg:
3110         msg = ("Could not run OS rename script for instance %s on node %s"
3111                " (but the instance has been renamed in Ganeti): %s" %
3112                (inst.name, inst.primary_node, msg))
3113         self.proc.LogWarning(msg)
3114     finally:
3115       _ShutdownInstanceDisks(self, inst)
3116
3117
3118 class LURemoveInstance(LogicalUnit):
3119   """Remove an instance.
3120
3121   """
3122   HPATH = "instance-remove"
3123   HTYPE = constants.HTYPE_INSTANCE
3124   _OP_REQP = ["instance_name", "ignore_failures"]
3125   REQ_BGL = False
3126
3127   def ExpandNames(self):
3128     self._ExpandAndLockInstance()
3129     self.needed_locks[locking.LEVEL_NODE] = []
3130     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3131
3132   def DeclareLocks(self, level):
3133     if level == locking.LEVEL_NODE:
3134       self._LockInstancesNodes()
3135
3136   def BuildHooksEnv(self):
3137     """Build hooks env.
3138
3139     This runs on master, primary and secondary nodes of the instance.
3140
3141     """
3142     env = _BuildInstanceHookEnvByObject(self, self.instance)
3143     nl = [self.cfg.GetMasterNode()]
3144     return env, nl, nl
3145
3146   def CheckPrereq(self):
3147     """Check prerequisites.
3148
3149     This checks that the instance is in the cluster.
3150
3151     """
3152     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3153     assert self.instance is not None, \
3154       "Cannot retrieve locked instance %s" % self.op.instance_name
3155
3156   def Exec(self, feedback_fn):
3157     """Remove the instance.
3158
3159     """
3160     instance = self.instance
3161     logging.info("Shutting down instance %s on node %s",
3162                  instance.name, instance.primary_node)
3163
3164     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3165     msg = result.RemoteFailMsg()
3166     if msg:
3167       if self.op.ignore_failures:
3168         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3169       else:
3170         raise errors.OpExecError("Could not shutdown instance %s on"
3171                                  " node %s: %s" %
3172                                  (instance.name, instance.primary_node, msg))
3173
3174     logging.info("Removing block devices for instance %s", instance.name)
3175
3176     if not _RemoveDisks(self, instance):
3177       if self.op.ignore_failures:
3178         feedback_fn("Warning: can't remove instance's disks")
3179       else:
3180         raise errors.OpExecError("Can't remove instance's disks")
3181
3182     logging.info("Removing instance %s out of cluster config", instance.name)
3183
3184     self.cfg.RemoveInstance(instance.name)
3185     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3186
3187
3188 class LUQueryInstances(NoHooksLU):
3189   """Logical unit for querying instances.
3190
3191   """
3192   _OP_REQP = ["output_fields", "names", "use_locking"]
3193   REQ_BGL = False
3194   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3195                                     "admin_state",
3196                                     "disk_template", "ip", "mac", "bridge",
3197                                     "sda_size", "sdb_size", "vcpus", "tags",
3198                                     "network_port", "beparams",
3199                                     r"(disk)\.(size)/([0-9]+)",
3200                                     r"(disk)\.(sizes)", "disk_usage",
3201                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3202                                     r"(nic)\.(macs|ips|bridges)",
3203                                     r"(disk|nic)\.(count)",
3204                                     "serial_no", "hypervisor", "hvparams",] +
3205                                   ["hv/%s" % name
3206                                    for name in constants.HVS_PARAMETERS] +
3207                                   ["be/%s" % name
3208                                    for name in constants.BES_PARAMETERS])
3209   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3210
3211
3212   def ExpandNames(self):
3213     _CheckOutputFields(static=self._FIELDS_STATIC,
3214                        dynamic=self._FIELDS_DYNAMIC,
3215                        selected=self.op.output_fields)
3216
3217     self.needed_locks = {}
3218     self.share_locks[locking.LEVEL_INSTANCE] = 1
3219     self.share_locks[locking.LEVEL_NODE] = 1
3220
3221     if self.op.names:
3222       self.wanted = _GetWantedInstances(self, self.op.names)
3223     else:
3224       self.wanted = locking.ALL_SET
3225
3226     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3227     self.do_locking = self.do_node_query and self.op.use_locking
3228     if self.do_locking:
3229       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3230       self.needed_locks[locking.LEVEL_NODE] = []
3231       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3232
3233   def DeclareLocks(self, level):
3234     if level == locking.LEVEL_NODE and self.do_locking:
3235       self._LockInstancesNodes()
3236
3237   def CheckPrereq(self):
3238     """Check prerequisites.
3239
3240     """
3241     pass
3242
3243   def Exec(self, feedback_fn):
3244     """Computes the list of nodes and their attributes.
3245
3246     """
3247     all_info = self.cfg.GetAllInstancesInfo()
3248     if self.wanted == locking.ALL_SET:
3249       # caller didn't specify instance names, so ordering is not important
3250       if self.do_locking:
3251         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3252       else:
3253         instance_names = all_info.keys()
3254       instance_names = utils.NiceSort(instance_names)
3255     else:
3256       # caller did specify names, so we must keep the ordering
3257       if self.do_locking:
3258         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3259       else:
3260         tgt_set = all_info.keys()
3261       missing = set(self.wanted).difference(tgt_set)
3262       if missing:
3263         raise errors.OpExecError("Some instances were removed before"
3264                                  " retrieving their data: %s" % missing)
3265       instance_names = self.wanted
3266
3267     instance_list = [all_info[iname] for iname in instance_names]
3268
3269     # begin data gathering
3270
3271     nodes = frozenset([inst.primary_node for inst in instance_list])
3272     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3273
3274     bad_nodes = []
3275     off_nodes = []
3276     if self.do_node_query:
3277       live_data = {}
3278       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3279       for name in nodes:
3280         result = node_data[name]
3281         if result.offline:
3282           # offline nodes will be in both lists
3283           off_nodes.append(name)
3284         if result.failed:
3285           bad_nodes.append(name)
3286         else:
3287           if result.data:
3288             live_data.update(result.data)
3289             # else no instance is alive
3290     else:
3291       live_data = dict([(name, {}) for name in instance_names])
3292
3293     # end data gathering
3294
3295     HVPREFIX = "hv/"
3296     BEPREFIX = "be/"
3297     output = []
3298     for instance in instance_list:
3299       iout = []
3300       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3301       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3302       for field in self.op.output_fields:
3303         st_match = self._FIELDS_STATIC.Matches(field)
3304         if field == "name":
3305           val = instance.name
3306         elif field == "os":
3307           val = instance.os
3308         elif field == "pnode":
3309           val = instance.primary_node
3310         elif field == "snodes":
3311           val = list(instance.secondary_nodes)
3312         elif field == "admin_state":
3313           val = instance.admin_up
3314         elif field == "oper_state":
3315           if instance.primary_node in bad_nodes:
3316             val = None
3317           else:
3318             val = bool(live_data.get(instance.name))
3319         elif field == "status":
3320           if instance.primary_node in off_nodes:
3321             val = "ERROR_nodeoffline"
3322           elif instance.primary_node in bad_nodes:
3323             val = "ERROR_nodedown"
3324           else:
3325             running = bool(live_data.get(instance.name))
3326             if running:
3327               if instance.admin_up:
3328                 val = "running"
3329               else:
3330                 val = "ERROR_up"
3331             else:
3332               if instance.admin_up:
3333                 val = "ERROR_down"
3334               else:
3335                 val = "ADMIN_down"
3336         elif field == "oper_ram":
3337           if instance.primary_node in bad_nodes:
3338             val = None
3339           elif instance.name in live_data:
3340             val = live_data[instance.name].get("memory", "?")
3341           else:
3342             val = "-"
3343         elif field == "disk_template":
3344           val = instance.disk_template
3345         elif field == "ip":
3346           val = instance.nics[0].ip
3347         elif field == "bridge":
3348           val = instance.nics[0].bridge
3349         elif field == "mac":
3350           val = instance.nics[0].mac
3351         elif field == "sda_size" or field == "sdb_size":
3352           idx = ord(field[2]) - ord('a')
3353           try:
3354             val = instance.FindDisk(idx).size
3355           except errors.OpPrereqError:
3356             val = None
3357         elif field == "disk_usage": # total disk usage per node
3358           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3359           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3360         elif field == "tags":
3361           val = list(instance.GetTags())
3362         elif field == "serial_no":
3363           val = instance.serial_no
3364         elif field == "network_port":
3365           val = instance.network_port
3366         elif field == "hypervisor":
3367           val = instance.hypervisor
3368         elif field == "hvparams":
3369           val = i_hv
3370         elif (field.startswith(HVPREFIX) and
3371               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3372           val = i_hv.get(field[len(HVPREFIX):], None)
3373         elif field == "beparams":
3374           val = i_be
3375         elif (field.startswith(BEPREFIX) and
3376               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3377           val = i_be.get(field[len(BEPREFIX):], None)
3378         elif st_match and st_match.groups():
3379           # matches a variable list
3380           st_groups = st_match.groups()
3381           if st_groups and st_groups[0] == "disk":
3382             if st_groups[1] == "count":
3383               val = len(instance.disks)
3384             elif st_groups[1] == "sizes":
3385               val = [disk.size for disk in instance.disks]
3386             elif st_groups[1] == "size":
3387               try:
3388                 val = instance.FindDisk(st_groups[2]).size
3389               except errors.OpPrereqError:
3390                 val = None
3391             else:
3392               assert False, "Unhandled disk parameter"
3393           elif st_groups[0] == "nic":
3394             if st_groups[1] == "count":
3395               val = len(instance.nics)
3396             elif st_groups[1] == "macs":
3397               val = [nic.mac for nic in instance.nics]
3398             elif st_groups[1] == "ips":
3399               val = [nic.ip for nic in instance.nics]
3400             elif st_groups[1] == "bridges":
3401               val = [nic.bridge for nic in instance.nics]
3402             else:
3403               # index-based item
3404               nic_idx = int(st_groups[2])
3405               if nic_idx >= len(instance.nics):
3406                 val = None
3407               else:
3408                 if st_groups[1] == "mac":
3409                   val = instance.nics[nic_idx].mac
3410                 elif st_groups[1] == "ip":
3411                   val = instance.nics[nic_idx].ip
3412                 elif st_groups[1] == "bridge":
3413                   val = instance.nics[nic_idx].bridge
3414                 else:
3415                   assert False, "Unhandled NIC parameter"
3416           else:
3417             assert False, "Unhandled variable parameter"
3418         else:
3419           raise errors.ParameterError(field)
3420         iout.append(val)
3421       output.append(iout)
3422
3423     return output
3424
3425
3426 class LUFailoverInstance(LogicalUnit):
3427   """Failover an instance.
3428
3429   """
3430   HPATH = "instance-failover"
3431   HTYPE = constants.HTYPE_INSTANCE
3432   _OP_REQP = ["instance_name", "ignore_consistency"]
3433   REQ_BGL = False
3434
3435   def ExpandNames(self):
3436     self._ExpandAndLockInstance()
3437     self.needed_locks[locking.LEVEL_NODE] = []
3438     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3439
3440   def DeclareLocks(self, level):
3441     if level == locking.LEVEL_NODE:
3442       self._LockInstancesNodes()
3443
3444   def BuildHooksEnv(self):
3445     """Build hooks env.
3446
3447     This runs on master, primary and secondary nodes of the instance.
3448
3449     """
3450     env = {
3451       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3452       }
3453     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3454     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3455     return env, nl, nl
3456
3457   def CheckPrereq(self):
3458     """Check prerequisites.
3459
3460     This checks that the instance is in the cluster.
3461
3462     """
3463     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3464     assert self.instance is not None, \
3465       "Cannot retrieve locked instance %s" % self.op.instance_name
3466
3467     bep = self.cfg.GetClusterInfo().FillBE(instance)
3468     if instance.disk_template not in constants.DTS_NET_MIRROR:
3469       raise errors.OpPrereqError("Instance's disk layout is not"
3470                                  " network mirrored, cannot failover.")
3471
3472     secondary_nodes = instance.secondary_nodes
3473     if not secondary_nodes:
3474       raise errors.ProgrammerError("no secondary node but using "
3475                                    "a mirrored disk template")
3476
3477     target_node = secondary_nodes[0]
3478     _CheckNodeOnline(self, target_node)
3479     _CheckNodeNotDrained(self, target_node)
3480     # check memory requirements on the secondary node
3481     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3482                          instance.name, bep[constants.BE_MEMORY],
3483                          instance.hypervisor)
3484
3485     # check bridge existance
3486     brlist = [nic.bridge for nic in instance.nics]
3487     result = self.rpc.call_bridges_exist(target_node, brlist)
3488     result.Raise()
3489     if not result.data:
3490       raise errors.OpPrereqError("One or more target bridges %s does not"
3491                                  " exist on destination node '%s'" %
3492                                  (brlist, target_node))
3493
3494   def Exec(self, feedback_fn):
3495     """Failover an instance.
3496
3497     The failover is done by shutting it down on its present node and
3498     starting it on the secondary.
3499
3500     """
3501     instance = self.instance
3502
3503     source_node = instance.primary_node
3504     target_node = instance.secondary_nodes[0]
3505
3506     feedback_fn("* checking disk consistency between source and target")
3507     for dev in instance.disks:
3508       # for drbd, these are drbd over lvm
3509       if not _CheckDiskConsistency(self, dev, target_node, False):
3510         if instance.admin_up and not self.op.ignore_consistency:
3511           raise errors.OpExecError("Disk %s is degraded on target node,"
3512                                    " aborting failover." % dev.iv_name)
3513
3514     feedback_fn("* shutting down instance on source node")
3515     logging.info("Shutting down instance %s on node %s",
3516                  instance.name, source_node)
3517
3518     result = self.rpc.call_instance_shutdown(source_node, instance)
3519     msg = result.RemoteFailMsg()
3520     if msg:
3521       if self.op.ignore_consistency:
3522         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3523                              " Proceeding anyway. Please make sure node"
3524                              " %s is down. Error details: %s",
3525                              instance.name, source_node, source_node, msg)
3526       else:
3527         raise errors.OpExecError("Could not shutdown instance %s on"
3528                                  " node %s: %s" %
3529                                  (instance.name, source_node, msg))
3530
3531     feedback_fn("* deactivating the instance's disks on source node")
3532     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3533       raise errors.OpExecError("Can't shut down the instance's disks.")
3534
3535     instance.primary_node = target_node
3536     # distribute new instance config to the other nodes
3537     self.cfg.Update(instance)
3538
3539     # Only start the instance if it's marked as up
3540     if instance.admin_up:
3541       feedback_fn("* activating the instance's disks on target node")
3542       logging.info("Starting instance %s on node %s",
3543                    instance.name, target_node)
3544
3545       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3546                                                ignore_secondaries=True)
3547       if not disks_ok:
3548         _ShutdownInstanceDisks(self, instance)
3549         raise errors.OpExecError("Can't activate the instance's disks")
3550
3551       feedback_fn("* starting the instance on the target node")
3552       result = self.rpc.call_instance_start(target_node, instance)
3553       msg = result.RemoteFailMsg()
3554       if msg:
3555         _ShutdownInstanceDisks(self, instance)
3556         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3557                                  (instance.name, target_node, msg))
3558
3559
3560 class LUMigrateInstance(LogicalUnit):
3561   """Migrate an instance.
3562
3563   This is migration without shutting down, compared to the failover,
3564   which is done with shutdown.
3565
3566   """
3567   HPATH = "instance-migrate"
3568   HTYPE = constants.HTYPE_INSTANCE
3569   _OP_REQP = ["instance_name", "live", "cleanup"]
3570
3571   REQ_BGL = False
3572
3573   def ExpandNames(self):
3574     self._ExpandAndLockInstance()
3575     self.needed_locks[locking.LEVEL_NODE] = []
3576     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3577
3578   def DeclareLocks(self, level):
3579     if level == locking.LEVEL_NODE:
3580       self._LockInstancesNodes()
3581
3582   def BuildHooksEnv(self):
3583     """Build hooks env.
3584
3585     This runs on master, primary and secondary nodes of the instance.
3586
3587     """
3588     env = _BuildInstanceHookEnvByObject(self, self.instance)
3589     env["MIGRATE_LIVE"] = self.op.live
3590     env["MIGRATE_CLEANUP"] = self.op.cleanup
3591     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3592     return env, nl, nl
3593
3594   def CheckPrereq(self):
3595     """Check prerequisites.
3596
3597     This checks that the instance is in the cluster.
3598
3599     """
3600     instance = self.cfg.GetInstanceInfo(
3601       self.cfg.ExpandInstanceName(self.op.instance_name))
3602     if instance is None:
3603       raise errors.OpPrereqError("Instance '%s' not known" %
3604                                  self.op.instance_name)
3605
3606     if instance.disk_template != constants.DT_DRBD8:
3607       raise errors.OpPrereqError("Instance's disk layout is not"
3608                                  " drbd8, cannot migrate.")
3609
3610     secondary_nodes = instance.secondary_nodes
3611     if not secondary_nodes:
3612       raise errors.ConfigurationError("No secondary node but using"
3613                                       " drbd8 disk template")
3614
3615     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3616
3617     target_node = secondary_nodes[0]
3618     # check memory requirements on the secondary node
3619     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3620                          instance.name, i_be[constants.BE_MEMORY],
3621                          instance.hypervisor)
3622
3623     # check bridge existance
3624     brlist = [nic.bridge for nic in instance.nics]
3625     result = self.rpc.call_bridges_exist(target_node, brlist)
3626     if result.failed or not result.data:
3627       raise errors.OpPrereqError("One or more target bridges %s does not"
3628                                  " exist on destination node '%s'" %
3629                                  (brlist, target_node))
3630
3631     if not self.op.cleanup:
3632       _CheckNodeNotDrained(self, target_node)
3633       result = self.rpc.call_instance_migratable(instance.primary_node,
3634                                                  instance)
3635       msg = result.RemoteFailMsg()
3636       if msg:
3637         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3638                                    msg)
3639
3640     self.instance = instance
3641
3642   def _WaitUntilSync(self):
3643     """Poll with custom rpc for disk sync.
3644
3645     This uses our own step-based rpc call.
3646
3647     """
3648     self.feedback_fn("* wait until resync is done")
3649     all_done = False
3650     while not all_done:
3651       all_done = True
3652       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3653                                             self.nodes_ip,
3654                                             self.instance.disks)
3655       min_percent = 100
3656       for node, nres in result.items():
3657         msg = nres.RemoteFailMsg()
3658         if msg:
3659           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3660                                    (node, msg))
3661         node_done, node_percent = nres.payload
3662         all_done = all_done and node_done
3663         if node_percent is not None:
3664           min_percent = min(min_percent, node_percent)
3665       if not all_done:
3666         if min_percent < 100:
3667           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3668         time.sleep(2)
3669
3670   def _EnsureSecondary(self, node):
3671     """Demote a node to secondary.
3672
3673     """
3674     self.feedback_fn("* switching node %s to secondary mode" % node)
3675
3676     for dev in self.instance.disks:
3677       self.cfg.SetDiskID(dev, node)
3678
3679     result = self.rpc.call_blockdev_close(node, self.instance.name,
3680                                           self.instance.disks)
3681     msg = result.RemoteFailMsg()
3682     if msg:
3683       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3684                                " error %s" % (node, msg))
3685
3686   def _GoStandalone(self):
3687     """Disconnect from the network.
3688
3689     """
3690     self.feedback_fn("* changing into standalone mode")
3691     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3692                                                self.instance.disks)
3693     for node, nres in result.items():
3694       msg = nres.RemoteFailMsg()
3695       if msg:
3696         raise errors.OpExecError("Cannot disconnect disks node %s,"
3697                                  " error %s" % (node, msg))
3698
3699   def _GoReconnect(self, multimaster):
3700     """Reconnect to the network.
3701
3702     """
3703     if multimaster:
3704       msg = "dual-master"
3705     else:
3706       msg = "single-master"
3707     self.feedback_fn("* changing disks into %s mode" % msg)
3708     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3709                                            self.instance.disks,
3710                                            self.instance.name, multimaster)
3711     for node, nres in result.items():
3712       msg = nres.RemoteFailMsg()
3713       if msg:
3714         raise errors.OpExecError("Cannot change disks config on node %s,"
3715                                  " error: %s" % (node, msg))
3716
3717   def _ExecCleanup(self):
3718     """Try to cleanup after a failed migration.
3719
3720     The cleanup is done by:
3721       - check that the instance is running only on one node
3722         (and update the config if needed)
3723       - change disks on its secondary node to secondary
3724       - wait until disks are fully synchronized
3725       - disconnect from the network
3726       - change disks into single-master mode
3727       - wait again until disks are fully synchronized
3728
3729     """
3730     instance = self.instance
3731     target_node = self.target_node
3732     source_node = self.source_node
3733
3734     # check running on only one node
3735     self.feedback_fn("* checking where the instance actually runs"
3736                      " (if this hangs, the hypervisor might be in"
3737                      " a bad state)")
3738     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3739     for node, result in ins_l.items():
3740       result.Raise()
3741       if not isinstance(result.data, list):
3742         raise errors.OpExecError("Can't contact node '%s'" % node)
3743
3744     runningon_source = instance.name in ins_l[source_node].data
3745     runningon_target = instance.name in ins_l[target_node].data
3746
3747     if runningon_source and runningon_target:
3748       raise errors.OpExecError("Instance seems to be running on two nodes,"
3749                                " or the hypervisor is confused. You will have"
3750                                " to ensure manually that it runs only on one"
3751                                " and restart this operation.")
3752
3753     if not (runningon_source or runningon_target):
3754       raise errors.OpExecError("Instance does not seem to be running at all."
3755                                " In this case, it's safer to repair by"
3756                                " running 'gnt-instance stop' to ensure disk"
3757                                " shutdown, and then restarting it.")
3758
3759     if runningon_target:
3760       # the migration has actually succeeded, we need to update the config
3761       self.feedback_fn("* instance running on secondary node (%s),"
3762                        " updating config" % target_node)
3763       instance.primary_node = target_node
3764       self.cfg.Update(instance)
3765       demoted_node = source_node
3766     else:
3767       self.feedback_fn("* instance confirmed to be running on its"
3768                        " primary node (%s)" % source_node)
3769       demoted_node = target_node
3770
3771     self._EnsureSecondary(demoted_node)
3772     try:
3773       self._WaitUntilSync()
3774     except errors.OpExecError:
3775       # we ignore here errors, since if the device is standalone, it
3776       # won't be able to sync
3777       pass
3778     self._GoStandalone()
3779     self._GoReconnect(False)
3780     self._WaitUntilSync()
3781
3782     self.feedback_fn("* done")
3783
3784   def _RevertDiskStatus(self):
3785     """Try to revert the disk status after a failed migration.
3786
3787     """
3788     target_node = self.target_node
3789     try:
3790       self._EnsureSecondary(target_node)
3791       self._GoStandalone()
3792       self._GoReconnect(False)
3793       self._WaitUntilSync()
3794     except errors.OpExecError, err:
3795       self.LogWarning("Migration failed and I can't reconnect the"
3796                       " drives: error '%s'\n"
3797                       "Please look and recover the instance status" %
3798                       str(err))
3799
3800   def _AbortMigration(self):
3801     """Call the hypervisor code to abort a started migration.
3802
3803     """
3804     instance = self.instance
3805     target_node = self.target_node
3806     migration_info = self.migration_info
3807
3808     abort_result = self.rpc.call_finalize_migration(target_node,
3809                                                     instance,
3810                                                     migration_info,
3811                                                     False)
3812     abort_msg = abort_result.RemoteFailMsg()
3813     if abort_msg:
3814       logging.error("Aborting migration failed on target node %s: %s" %
3815                     (target_node, abort_msg))
3816       # Don't raise an exception here, as we stil have to try to revert the
3817       # disk status, even if this step failed.
3818
3819   def _ExecMigration(self):
3820     """Migrate an instance.
3821
3822     The migrate is done by:
3823       - change the disks into dual-master mode
3824       - wait until disks are fully synchronized again
3825       - migrate the instance
3826       - change disks on the new secondary node (the old primary) to secondary
3827       - wait until disks are fully synchronized
3828       - change disks into single-master mode
3829
3830     """
3831     instance = self.instance
3832     target_node = self.target_node
3833     source_node = self.source_node
3834
3835     self.feedback_fn("* checking disk consistency between source and target")
3836     for dev in instance.disks:
3837       if not _CheckDiskConsistency(self, dev, target_node, False):
3838         raise errors.OpExecError("Disk %s is degraded or not fully"
3839                                  " synchronized on target node,"
3840                                  " aborting migrate." % dev.iv_name)
3841
3842     # First get the migration information from the remote node
3843     result = self.rpc.call_migration_info(source_node, instance)
3844     msg = result.RemoteFailMsg()
3845     if msg:
3846       log_err = ("Failed fetching source migration information from %s: %s" %
3847                  (source_node, msg))
3848       logging.error(log_err)
3849       raise errors.OpExecError(log_err)
3850
3851     self.migration_info = migration_info = result.payload
3852
3853     # Then switch the disks to master/master mode
3854     self._EnsureSecondary(target_node)
3855     self._GoStandalone()
3856     self._GoReconnect(True)
3857     self._WaitUntilSync()
3858
3859     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3860     result = self.rpc.call_accept_instance(target_node,
3861                                            instance,
3862                                            migration_info,
3863                                            self.nodes_ip[target_node])
3864
3865     msg = result.RemoteFailMsg()
3866     if msg:
3867       logging.error("Instance pre-migration failed, trying to revert"
3868                     " disk status: %s", msg)
3869       self._AbortMigration()
3870       self._RevertDiskStatus()
3871       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3872                                (instance.name, msg))
3873
3874     self.feedback_fn("* migrating instance to %s" % target_node)
3875     time.sleep(10)
3876     result = self.rpc.call_instance_migrate(source_node, instance,
3877                                             self.nodes_ip[target_node],
3878                                             self.op.live)
3879     msg = result.RemoteFailMsg()
3880     if msg:
3881       logging.error("Instance migration failed, trying to revert"
3882                     " disk status: %s", msg)
3883       self._AbortMigration()
3884       self._RevertDiskStatus()
3885       raise errors.OpExecError("Could not migrate instance %s: %s" %
3886                                (instance.name, msg))
3887     time.sleep(10)
3888
3889     instance.primary_node = target_node
3890     # distribute new instance config to the other nodes
3891     self.cfg.Update(instance)
3892
3893     result = self.rpc.call_finalize_migration(target_node,
3894                                               instance,
3895                                               migration_info,
3896                                               True)
3897     msg = result.RemoteFailMsg()
3898     if msg:
3899       logging.error("Instance migration succeeded, but finalization failed:"
3900                     " %s" % msg)
3901       raise errors.OpExecError("Could not finalize instance migration: %s" %
3902                                msg)
3903
3904     self._EnsureSecondary(source_node)
3905     self._WaitUntilSync()
3906     self._GoStandalone()
3907     self._GoReconnect(False)
3908     self._WaitUntilSync()
3909
3910     self.feedback_fn("* done")
3911
3912   def Exec(self, feedback_fn):
3913     """Perform the migration.
3914
3915     """
3916     self.feedback_fn = feedback_fn
3917
3918     self.source_node = self.instance.primary_node
3919     self.target_node = self.instance.secondary_nodes[0]
3920     self.all_nodes = [self.source_node, self.target_node]
3921     self.nodes_ip = {
3922       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3923       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3924       }
3925     if self.op.cleanup:
3926       return self._ExecCleanup()
3927     else:
3928       return self._ExecMigration()
3929
3930
3931 def _CreateBlockDev(lu, node, instance, device, force_create,
3932                     info, force_open):
3933   """Create a tree of block devices on a given node.
3934
3935   If this device type has to be created on secondaries, create it and
3936   all its children.
3937
3938   If not, just recurse to children keeping the same 'force' value.
3939
3940   @param lu: the lu on whose behalf we execute
3941   @param node: the node on which to create the device
3942   @type instance: L{objects.Instance}
3943   @param instance: the instance which owns the device
3944   @type device: L{objects.Disk}
3945   @param device: the device to create
3946   @type force_create: boolean
3947   @param force_create: whether to force creation of this device; this
3948       will be change to True whenever we find a device which has
3949       CreateOnSecondary() attribute
3950   @param info: the extra 'metadata' we should attach to the device
3951       (this will be represented as a LVM tag)
3952   @type force_open: boolean
3953   @param force_open: this parameter will be passes to the
3954       L{backend.BlockdevCreate} function where it specifies
3955       whether we run on primary or not, and it affects both
3956       the child assembly and the device own Open() execution
3957
3958   """
3959   if device.CreateOnSecondary():
3960     force_create = True
3961
3962   if device.children:
3963     for child in device.children:
3964       _CreateBlockDev(lu, node, instance, child, force_create,
3965                       info, force_open)
3966
3967   if not force_create:
3968     return
3969
3970   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3971
3972
3973 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3974   """Create a single block device on a given node.
3975
3976   This will not recurse over children of the device, so they must be
3977   created in advance.
3978
3979   @param lu: the lu on whose behalf we execute
3980   @param node: the node on which to create the device
3981   @type instance: L{objects.Instance}
3982   @param instance: the instance which owns the device
3983   @type device: L{objects.Disk}
3984   @param device: the device to create
3985   @param info: the extra 'metadata' we should attach to the device
3986       (this will be represented as a LVM tag)
3987   @type force_open: boolean
3988   @param force_open: this parameter will be passes to the
3989       L{backend.BlockdevCreate} function where it specifies
3990       whether we run on primary or not, and it affects both
3991       the child assembly and the device own Open() execution
3992
3993   """
3994   lu.cfg.SetDiskID(device, node)
3995   result = lu.rpc.call_blockdev_create(node, device, device.size,
3996                                        instance.name, force_open, info)
3997   msg = result.RemoteFailMsg()
3998   if msg:
3999     raise errors.OpExecError("Can't create block device %s on"
4000                              " node %s for instance %s: %s" %
4001                              (device, node, instance.name, msg))
4002   if device.physical_id is None:
4003     device.physical_id = result.payload
4004
4005
4006 def _GenerateUniqueNames(lu, exts):
4007   """Generate a suitable LV name.
4008
4009   This will generate a logical volume name for the given instance.
4010
4011   """
4012   results = []
4013   for val in exts:
4014     new_id = lu.cfg.GenerateUniqueID()
4015     results.append("%s%s" % (new_id, val))
4016   return results
4017
4018
4019 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4020                          p_minor, s_minor):
4021   """Generate a drbd8 device complete with its children.
4022
4023   """
4024   port = lu.cfg.AllocatePort()
4025   vgname = lu.cfg.GetVGName()
4026   shared_secret = lu.cfg.GenerateDRBDSecret()
4027   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4028                           logical_id=(vgname, names[0]))
4029   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4030                           logical_id=(vgname, names[1]))
4031   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4032                           logical_id=(primary, secondary, port,
4033                                       p_minor, s_minor,
4034                                       shared_secret),
4035                           children=[dev_data, dev_meta],
4036                           iv_name=iv_name)
4037   return drbd_dev
4038
4039
4040 def _GenerateDiskTemplate(lu, template_name,
4041                           instance_name, primary_node,
4042                           secondary_nodes, disk_info,
4043                           file_storage_dir, file_driver,
4044                           base_index):
4045   """Generate the entire disk layout for a given template type.
4046
4047   """
4048   #TODO: compute space requirements
4049
4050   vgname = lu.cfg.GetVGName()
4051   disk_count = len(disk_info)
4052   disks = []
4053   if template_name == constants.DT_DISKLESS:
4054     pass
4055   elif template_name == constants.DT_PLAIN:
4056     if len(secondary_nodes) != 0:
4057       raise errors.ProgrammerError("Wrong template configuration")
4058
4059     names = _GenerateUniqueNames(lu, [".disk%d" % i
4060                                       for i in range(disk_count)])
4061     for idx, disk in enumerate(disk_info):
4062       disk_index = idx + base_index
4063       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4064                               logical_id=(vgname, names[idx]),
4065                               iv_name="disk/%d" % disk_index,
4066                               mode=disk["mode"])
4067       disks.append(disk_dev)
4068   elif template_name == constants.DT_DRBD8:
4069     if len(secondary_nodes) != 1:
4070       raise errors.ProgrammerError("Wrong template configuration")
4071     remote_node = secondary_nodes[0]
4072     minors = lu.cfg.AllocateDRBDMinor(
4073       [primary_node, remote_node] * len(disk_info), instance_name)
4074
4075     names = []
4076     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4077                                                for i in range(disk_count)]):
4078       names.append(lv_prefix + "_data")
4079       names.append(lv_prefix + "_meta")
4080     for idx, disk in enumerate(disk_info):
4081       disk_index = idx + base_index
4082       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4083                                       disk["size"], names[idx*2:idx*2+2],
4084                                       "disk/%d" % disk_index,
4085                                       minors[idx*2], minors[idx*2+1])
4086       disk_dev.mode = disk["mode"]
4087       disks.append(disk_dev)
4088   elif template_name == constants.DT_FILE:
4089     if len(secondary_nodes) != 0:
4090       raise errors.ProgrammerError("Wrong template configuration")
4091
4092     for idx, disk in enumerate(disk_info):
4093       disk_index = idx + base_index
4094       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4095                               iv_name="disk/%d" % disk_index,
4096                               logical_id=(file_driver,
4097                                           "%s/disk%d" % (file_storage_dir,
4098                                                          disk_index)),
4099                               mode=disk["mode"])
4100       disks.append(disk_dev)
4101   else:
4102     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4103   return disks
4104
4105
4106 def _GetInstanceInfoText(instance):
4107   """Compute that text that should be added to the disk's metadata.
4108
4109   """
4110   return "originstname+%s" % instance.name
4111
4112
4113 def _CreateDisks(lu, instance):
4114   """Create all disks for an instance.
4115
4116   This abstracts away some work from AddInstance.
4117
4118   @type lu: L{LogicalUnit}
4119   @param lu: the logical unit on whose behalf we execute
4120   @type instance: L{objects.Instance}
4121   @param instance: the instance whose disks we should create
4122   @rtype: boolean
4123   @return: the success of the creation
4124
4125   """
4126   info = _GetInstanceInfoText(instance)
4127   pnode = instance.primary_node
4128
4129   if instance.disk_template == constants.DT_FILE:
4130     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4131     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4132
4133     if result.failed or not result.data:
4134       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4135
4136     if not result.data[0]:
4137       raise errors.OpExecError("Failed to create directory '%s'" %
4138                                file_storage_dir)
4139
4140   # Note: this needs to be kept in sync with adding of disks in
4141   # LUSetInstanceParams
4142   for device in instance.disks:
4143     logging.info("Creating volume %s for instance %s",
4144                  device.iv_name, instance.name)
4145     #HARDCODE
4146     for node in instance.all_nodes:
4147       f_create = node == pnode
4148       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4149
4150
4151 def _RemoveDisks(lu, instance):
4152   """Remove all disks for an instance.
4153
4154   This abstracts away some work from `AddInstance()` and
4155   `RemoveInstance()`. Note that in case some of the devices couldn't
4156   be removed, the removal will continue with the other ones (compare
4157   with `_CreateDisks()`).
4158
4159   @type lu: L{LogicalUnit}
4160   @param lu: the logical unit on whose behalf we execute
4161   @type instance: L{objects.Instance}
4162   @param instance: the instance whose disks we should remove
4163   @rtype: boolean
4164   @return: the success of the removal
4165
4166   """
4167   logging.info("Removing block devices for instance %s", instance.name)
4168
4169   all_result = True
4170   for device in instance.disks:
4171     for node, disk in device.ComputeNodeTree(instance.primary_node):
4172       lu.cfg.SetDiskID(disk, node)
4173       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4174       if msg:
4175         lu.LogWarning("Could not remove block device %s on node %s,"
4176                       " continuing anyway: %s", device.iv_name, node, msg)
4177         all_result = False
4178
4179   if instance.disk_template == constants.DT_FILE:
4180     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4181     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4182                                                  file_storage_dir)
4183     if result.failed or not result.data:
4184       logging.error("Could not remove directory '%s'", file_storage_dir)
4185       all_result = False
4186
4187   return all_result
4188
4189
4190 def _ComputeDiskSize(disk_template, disks):
4191   """Compute disk size requirements in the volume group
4192
4193   """
4194   # Required free disk space as a function of disk and swap space
4195   req_size_dict = {
4196     constants.DT_DISKLESS: None,
4197     constants.DT_PLAIN: sum(d["size"] for d in disks),
4198     # 128 MB are added for drbd metadata for each disk
4199     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4200     constants.DT_FILE: None,
4201   }
4202
4203   if disk_template not in req_size_dict:
4204     raise errors.ProgrammerError("Disk template '%s' size requirement"
4205                                  " is unknown" %  disk_template)
4206
4207   return req_size_dict[disk_template]
4208
4209
4210 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4211   """Hypervisor parameter validation.
4212
4213   This function abstract the hypervisor parameter validation to be
4214   used in both instance create and instance modify.
4215
4216   @type lu: L{LogicalUnit}
4217   @param lu: the logical unit for which we check
4218   @type nodenames: list
4219   @param nodenames: the list of nodes on which we should check
4220   @type hvname: string
4221   @param hvname: the name of the hypervisor we should use
4222   @type hvparams: dict
4223   @param hvparams: the parameters which we need to check
4224   @raise errors.OpPrereqError: if the parameters are not valid
4225
4226   """
4227   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4228                                                   hvname,
4229                                                   hvparams)
4230   for node in nodenames:
4231     info = hvinfo[node]
4232     if info.offline:
4233       continue
4234     msg = info.RemoteFailMsg()
4235     if msg:
4236       raise errors.OpPrereqError("Hypervisor parameter validation"
4237                                  " failed on node %s: %s" % (node, msg))
4238
4239
4240 class LUCreateInstance(LogicalUnit):
4241   """Create an instance.
4242
4243   """
4244   HPATH = "instance-add"
4245   HTYPE = constants.HTYPE_INSTANCE
4246   _OP_REQP = ["instance_name", "disks", "disk_template",
4247               "mode", "start",
4248               "wait_for_sync", "ip_check", "nics",
4249               "hvparams", "beparams"]
4250   REQ_BGL = False
4251
4252   def _ExpandNode(self, node):
4253     """Expands and checks one node name.
4254
4255     """
4256     node_full = self.cfg.ExpandNodeName(node)
4257     if node_full is None:
4258       raise errors.OpPrereqError("Unknown node %s" % node)
4259     return node_full
4260
4261   def ExpandNames(self):
4262     """ExpandNames for CreateInstance.
4263
4264     Figure out the right locks for instance creation.
4265
4266     """
4267     self.needed_locks = {}
4268
4269     # set optional parameters to none if they don't exist
4270     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4271       if not hasattr(self.op, attr):
4272         setattr(self.op, attr, None)
4273
4274     # cheap checks, mostly valid constants given
4275
4276     # verify creation mode
4277     if self.op.mode not in (constants.INSTANCE_CREATE,
4278                             constants.INSTANCE_IMPORT):
4279       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4280                                  self.op.mode)
4281
4282     # disk template and mirror node verification
4283     if self.op.disk_template not in constants.DISK_TEMPLATES:
4284       raise errors.OpPrereqError("Invalid disk template name")
4285
4286     if self.op.hypervisor is None:
4287       self.op.hypervisor = self.cfg.GetHypervisorType()
4288
4289     cluster = self.cfg.GetClusterInfo()
4290     enabled_hvs = cluster.enabled_hypervisors
4291     if self.op.hypervisor not in enabled_hvs:
4292       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4293                                  " cluster (%s)" % (self.op.hypervisor,
4294                                   ",".join(enabled_hvs)))
4295
4296     # check hypervisor parameter syntax (locally)
4297     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4298     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4299                                   self.op.hvparams)
4300     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4301     hv_type.CheckParameterSyntax(filled_hvp)
4302
4303     # fill and remember the beparams dict
4304     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4305     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4306                                     self.op.beparams)
4307
4308     #### instance parameters check
4309
4310     # instance name verification
4311     hostname1 = utils.HostInfo(self.op.instance_name)
4312     self.op.instance_name = instance_name = hostname1.name
4313
4314     # this is just a preventive check, but someone might still add this
4315     # instance in the meantime, and creation will fail at lock-add time
4316     if instance_name in self.cfg.GetInstanceList():
4317       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4318                                  instance_name)
4319
4320     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4321
4322     # NIC buildup
4323     self.nics = []
4324     for nic in self.op.nics:
4325       # ip validity checks
4326       ip = nic.get("ip", None)
4327       if ip is None or ip.lower() == "none":
4328         nic_ip = None
4329       elif ip.lower() == constants.VALUE_AUTO:
4330         nic_ip = hostname1.ip
4331       else:
4332         if not utils.IsValidIP(ip):
4333           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4334                                      " like a valid IP" % ip)
4335         nic_ip = ip
4336
4337       # MAC address verification
4338       mac = nic.get("mac", constants.VALUE_AUTO)
4339       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4340         if not utils.IsValidMac(mac.lower()):
4341           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4342                                      mac)
4343       # bridge verification
4344       bridge = nic.get("bridge", None)
4345       if bridge is None:
4346         bridge = self.cfg.GetDefBridge()
4347       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4348
4349     # disk checks/pre-build
4350     self.disks = []
4351     for disk in self.op.disks:
4352       mode = disk.get("mode", constants.DISK_RDWR)
4353       if mode not in constants.DISK_ACCESS_SET:
4354         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4355                                    mode)
4356       size = disk.get("size", None)
4357       if size is None:
4358         raise errors.OpPrereqError("Missing disk size")
4359       try:
4360         size = int(size)
4361       except ValueError:
4362         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4363       self.disks.append({"size": size, "mode": mode})
4364
4365     # used in CheckPrereq for ip ping check
4366     self.check_ip = hostname1.ip
4367
4368     # file storage checks
4369     if (self.op.file_driver and
4370         not self.op.file_driver in constants.FILE_DRIVER):
4371       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4372                                  self.op.file_driver)
4373
4374     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4375       raise errors.OpPrereqError("File storage directory path not absolute")
4376
4377     ### Node/iallocator related checks
4378     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4379       raise errors.OpPrereqError("One and only one of iallocator and primary"
4380                                  " node must be given")
4381
4382     if self.op.iallocator:
4383       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4384     else:
4385       self.op.pnode = self._ExpandNode(self.op.pnode)
4386       nodelist = [self.op.pnode]
4387       if self.op.snode is not None:
4388         self.op.snode = self._ExpandNode(self.op.snode)
4389         nodelist.append(self.op.snode)
4390       self.needed_locks[locking.LEVEL_NODE] = nodelist
4391
4392     # in case of import lock the source node too
4393     if self.op.mode == constants.INSTANCE_IMPORT:
4394       src_node = getattr(self.op, "src_node", None)
4395       src_path = getattr(self.op, "src_path", None)
4396
4397       if src_path is None:
4398         self.op.src_path = src_path = self.op.instance_name
4399
4400       if src_node is None:
4401         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4402         self.op.src_node = None
4403         if os.path.isabs(src_path):
4404           raise errors.OpPrereqError("Importing an instance from an absolute"
4405                                      " path requires a source node option.")
4406       else:
4407         self.op.src_node = src_node = self._ExpandNode(src_node)
4408         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4409           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4410         if not os.path.isabs(src_path):
4411           self.op.src_path = src_path = \
4412             os.path.join(constants.EXPORT_DIR, src_path)
4413
4414     else: # INSTANCE_CREATE
4415       if getattr(self.op, "os_type", None) is None:
4416         raise errors.OpPrereqError("No guest OS specified")
4417
4418   def _RunAllocator(self):
4419     """Run the allocator based on input opcode.
4420
4421     """
4422     nics = [n.ToDict() for n in self.nics]
4423     ial = IAllocator(self,
4424                      mode=constants.IALLOCATOR_MODE_ALLOC,
4425                      name=self.op.instance_name,
4426                      disk_template=self.op.disk_template,
4427                      tags=[],
4428                      os=self.op.os_type,
4429                      vcpus=self.be_full[constants.BE_VCPUS],
4430                      mem_size=self.be_full[constants.BE_MEMORY],
4431                      disks=self.disks,
4432                      nics=nics,
4433                      hypervisor=self.op.hypervisor,
4434                      )
4435
4436     ial.Run(self.op.iallocator)
4437
4438     if not ial.success:
4439       raise errors.OpPrereqError("Can't compute nodes using"
4440                                  " iallocator '%s': %s" % (self.op.iallocator,
4441                                                            ial.info))
4442     if len(ial.nodes) != ial.required_nodes:
4443       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4444                                  " of nodes (%s), required %s" %
4445                                  (self.op.iallocator, len(ial.nodes),
4446                                   ial.required_nodes))
4447     self.op.pnode = ial.nodes[0]
4448     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4449                  self.op.instance_name, self.op.iallocator,
4450                  ", ".join(ial.nodes))
4451     if ial.required_nodes == 2:
4452       self.op.snode = ial.nodes[1]
4453
4454   def BuildHooksEnv(self):
4455     """Build hooks env.
4456
4457     This runs on master, primary and secondary nodes of the instance.
4458
4459     """
4460     env = {
4461       "ADD_MODE": self.op.mode,
4462       }
4463     if self.op.mode == constants.INSTANCE_IMPORT:
4464       env["SRC_NODE"] = self.op.src_node
4465       env["SRC_PATH"] = self.op.src_path
4466       env["SRC_IMAGES"] = self.src_images
4467
4468     env.update(_BuildInstanceHookEnv(
4469       name=self.op.instance_name,
4470       primary_node=self.op.pnode,
4471       secondary_nodes=self.secondaries,
4472       status=self.op.start,
4473       os_type=self.op.os_type,
4474       memory=self.be_full[constants.BE_MEMORY],
4475       vcpus=self.be_full[constants.BE_VCPUS],
4476       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4477       disk_template=self.op.disk_template,
4478       disks=[(d["size"], d["mode"]) for d in self.disks],
4479     ))
4480
4481     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4482           self.secondaries)
4483     return env, nl, nl
4484
4485
4486   def CheckPrereq(self):
4487     """Check prerequisites.
4488
4489     """
4490     if (not self.cfg.GetVGName() and
4491         self.op.disk_template not in constants.DTS_NOT_LVM):
4492       raise errors.OpPrereqError("Cluster does not support lvm-based"
4493                                  " instances")
4494
4495     if self.op.mode == constants.INSTANCE_IMPORT:
4496       src_node = self.op.src_node
4497       src_path = self.op.src_path
4498
4499       if src_node is None:
4500         exp_list = self.rpc.call_export_list(
4501           self.acquired_locks[locking.LEVEL_NODE])
4502         found = False
4503         for node in exp_list:
4504           if not exp_list[node].failed and src_path in exp_list[node].data:
4505             found = True
4506             self.op.src_node = src_node = node
4507             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4508                                                        src_path)
4509             break
4510         if not found:
4511           raise errors.OpPrereqError("No export found for relative path %s" %
4512                                       src_path)
4513
4514       _CheckNodeOnline(self, src_node)
4515       result = self.rpc.call_export_info(src_node, src_path)
4516       result.Raise()
4517       if not result.data:
4518         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4519
4520       export_info = result.data
4521       if not export_info.has_section(constants.INISECT_EXP):
4522         raise errors.ProgrammerError("Corrupted export config")
4523
4524       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4525       if (int(ei_version) != constants.EXPORT_VERSION):
4526         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4527                                    (ei_version, constants.EXPORT_VERSION))
4528
4529       # Check that the new instance doesn't have less disks than the export
4530       instance_disks = len(self.disks)
4531       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4532       if instance_disks < export_disks:
4533         raise errors.OpPrereqError("Not enough disks to import."
4534                                    " (instance: %d, export: %d)" %
4535                                    (instance_disks, export_disks))
4536
4537       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4538       disk_images = []
4539       for idx in range(export_disks):
4540         option = 'disk%d_dump' % idx
4541         if export_info.has_option(constants.INISECT_INS, option):
4542           # FIXME: are the old os-es, disk sizes, etc. useful?
4543           export_name = export_info.get(constants.INISECT_INS, option)
4544           image = os.path.join(src_path, export_name)
4545           disk_images.append(image)
4546         else:
4547           disk_images.append(False)
4548
4549       self.src_images = disk_images
4550
4551       old_name = export_info.get(constants.INISECT_INS, 'name')
4552       # FIXME: int() here could throw a ValueError on broken exports
4553       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4554       if self.op.instance_name == old_name:
4555         for idx, nic in enumerate(self.nics):
4556           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4557             nic_mac_ini = 'nic%d_mac' % idx
4558             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4559
4560     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4561     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4562     if self.op.start and not self.op.ip_check:
4563       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4564                                  " adding an instance in start mode")
4565
4566     if self.op.ip_check:
4567       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4568         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4569                                    (self.check_ip, self.op.instance_name))
4570
4571     #### mac address generation
4572     # By generating here the mac address both the allocator and the hooks get
4573     # the real final mac address rather than the 'auto' or 'generate' value.
4574     # There is a race condition between the generation and the instance object
4575     # creation, which means that we know the mac is valid now, but we're not
4576     # sure it will be when we actually add the instance. If things go bad
4577     # adding the instance will abort because of a duplicate mac, and the
4578     # creation job will fail.
4579     for nic in self.nics:
4580       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4581         nic.mac = self.cfg.GenerateMAC()
4582
4583     #### allocator run
4584
4585     if self.op.iallocator is not None:
4586       self._RunAllocator()
4587
4588     #### node related checks
4589
4590     # check primary node
4591     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4592     assert self.pnode is not None, \
4593       "Cannot retrieve locked node %s" % self.op.pnode
4594     if pnode.offline:
4595       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4596                                  pnode.name)
4597     if pnode.drained:
4598       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4599                                  pnode.name)
4600
4601     self.secondaries = []
4602
4603     # mirror node verification
4604     if self.op.disk_template in constants.DTS_NET_MIRROR:
4605       if self.op.snode is None:
4606         raise errors.OpPrereqError("The networked disk templates need"
4607                                    " a mirror node")
4608       if self.op.snode == pnode.name:
4609         raise errors.OpPrereqError("The secondary node cannot be"
4610                                    " the primary node.")
4611       _CheckNodeOnline(self, self.op.snode)
4612       _CheckNodeNotDrained(self, self.op.snode)
4613       self.secondaries.append(self.op.snode)
4614
4615     nodenames = [pnode.name] + self.secondaries
4616
4617     req_size = _ComputeDiskSize(self.op.disk_template,
4618                                 self.disks)
4619
4620     # Check lv size requirements
4621     if req_size is not None:
4622       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4623                                          self.op.hypervisor)
4624       for node in nodenames:
4625         info = nodeinfo[node]
4626         info.Raise()
4627         info = info.data
4628         if not info:
4629           raise errors.OpPrereqError("Cannot get current information"
4630                                      " from node '%s'" % node)
4631         vg_free = info.get('vg_free', None)
4632         if not isinstance(vg_free, int):
4633           raise errors.OpPrereqError("Can't compute free disk space on"
4634                                      " node %s" % node)
4635         if req_size > info['vg_free']:
4636           raise errors.OpPrereqError("Not enough disk space on target node %s."
4637                                      " %d MB available, %d MB required" %
4638                                      (node, info['vg_free'], req_size))
4639
4640     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4641
4642     # os verification
4643     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4644     result.Raise()
4645     if not isinstance(result.data, objects.OS):
4646       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4647                                  " primary node"  % self.op.os_type)
4648
4649     # bridge check on primary node
4650     bridges = [n.bridge for n in self.nics]
4651     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4652     result.Raise()
4653     if not result.data:
4654       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4655                                  " exist on destination node '%s'" %
4656                                  (",".join(bridges), pnode.name))
4657
4658     # memory check on primary node
4659     if self.op.start:
4660       _CheckNodeFreeMemory(self, self.pnode.name,
4661                            "creating instance %s" % self.op.instance_name,
4662                            self.be_full[constants.BE_MEMORY],
4663                            self.op.hypervisor)
4664
4665   def Exec(self, feedback_fn):
4666     """Create and add the instance to the cluster.
4667
4668     """
4669     instance = self.op.instance_name
4670     pnode_name = self.pnode.name
4671
4672     ht_kind = self.op.hypervisor
4673     if ht_kind in constants.HTS_REQ_PORT:
4674       network_port = self.cfg.AllocatePort()
4675     else:
4676       network_port = None
4677
4678     ##if self.op.vnc_bind_address is None:
4679     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4680
4681     # this is needed because os.path.join does not accept None arguments
4682     if self.op.file_storage_dir is None:
4683       string_file_storage_dir = ""
4684     else:
4685       string_file_storage_dir = self.op.file_storage_dir
4686
4687     # build the full file storage dir path
4688     file_storage_dir = os.path.normpath(os.path.join(
4689                                         self.cfg.GetFileStorageDir(),
4690                                         string_file_storage_dir, instance))
4691
4692
4693     disks = _GenerateDiskTemplate(self,
4694                                   self.op.disk_template,
4695                                   instance, pnode_name,
4696                                   self.secondaries,
4697                                   self.disks,
4698                                   file_storage_dir,
4699                                   self.op.file_driver,
4700                                   0)
4701
4702     iobj = objects.Instance(name=instance, os=self.op.os_type,
4703                             primary_node=pnode_name,
4704                             nics=self.nics, disks=disks,
4705                             disk_template=self.op.disk_template,
4706                             admin_up=False,
4707                             network_port=network_port,
4708                             beparams=self.op.beparams,
4709                             hvparams=self.op.hvparams,
4710                             hypervisor=self.op.hypervisor,
4711                             )
4712
4713     feedback_fn("* creating instance disks...")
4714     try:
4715       _CreateDisks(self, iobj)
4716     except errors.OpExecError:
4717       self.LogWarning("Device creation failed, reverting...")
4718       try:
4719         _RemoveDisks(self, iobj)
4720       finally:
4721         self.cfg.ReleaseDRBDMinors(instance)
4722         raise
4723
4724     feedback_fn("adding instance %s to cluster config" % instance)
4725
4726     self.cfg.AddInstance(iobj)
4727     # Declare that we don't want to remove the instance lock anymore, as we've
4728     # added the instance to the config
4729     del self.remove_locks[locking.LEVEL_INSTANCE]
4730     # Unlock all the nodes
4731     if self.op.mode == constants.INSTANCE_IMPORT:
4732       nodes_keep = [self.op.src_node]
4733       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4734                        if node != self.op.src_node]
4735       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4736       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4737     else:
4738       self.context.glm.release(locking.LEVEL_NODE)
4739       del self.acquired_locks[locking.LEVEL_NODE]
4740
4741     if self.op.wait_for_sync:
4742       disk_abort = not _WaitForSync(self, iobj)
4743     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4744       # make sure the disks are not degraded (still sync-ing is ok)
4745       time.sleep(15)
4746       feedback_fn("* checking mirrors status")
4747       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4748     else:
4749       disk_abort = False
4750
4751     if disk_abort:
4752       _RemoveDisks(self, iobj)
4753       self.cfg.RemoveInstance(iobj.name)
4754       # Make sure the instance lock gets removed
4755       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4756       raise errors.OpExecError("There are some degraded disks for"
4757                                " this instance")
4758
4759     feedback_fn("creating os for instance %s on node %s" %
4760                 (instance, pnode_name))
4761
4762     if iobj.disk_template != constants.DT_DISKLESS:
4763       if self.op.mode == constants.INSTANCE_CREATE:
4764         feedback_fn("* running the instance OS create scripts...")
4765         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4766         msg = result.RemoteFailMsg()
4767         if msg:
4768           raise errors.OpExecError("Could not add os for instance %s"
4769                                    " on node %s: %s" %
4770                                    (instance, pnode_name, msg))
4771
4772       elif self.op.mode == constants.INSTANCE_IMPORT:
4773         feedback_fn("* running the instance OS import scripts...")
4774         src_node = self.op.src_node
4775         src_images = self.src_images
4776         cluster_name = self.cfg.GetClusterName()
4777         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4778                                                          src_node, src_images,
4779                                                          cluster_name)
4780         import_result.Raise()
4781         for idx, result in enumerate(import_result.data):
4782           if not result:
4783             self.LogWarning("Could not import the image %s for instance"
4784                             " %s, disk %d, on node %s" %
4785                             (src_images[idx], instance, idx, pnode_name))
4786       else:
4787         # also checked in the prereq part
4788         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4789                                      % self.op.mode)
4790
4791     if self.op.start:
4792       iobj.admin_up = True
4793       self.cfg.Update(iobj)
4794       logging.info("Starting instance %s on node %s", instance, pnode_name)
4795       feedback_fn("* starting instance...")
4796       result = self.rpc.call_instance_start(pnode_name, iobj)
4797       msg = result.RemoteFailMsg()
4798       if msg:
4799         raise errors.OpExecError("Could not start instance: %s" % msg)
4800
4801
4802 class LUConnectConsole(NoHooksLU):
4803   """Connect to an instance's console.
4804
4805   This is somewhat special in that it returns the command line that
4806   you need to run on the master node in order to connect to the
4807   console.
4808
4809   """
4810   _OP_REQP = ["instance_name"]
4811   REQ_BGL = False
4812
4813   def ExpandNames(self):
4814     self._ExpandAndLockInstance()
4815
4816   def CheckPrereq(self):
4817     """Check prerequisites.
4818
4819     This checks that the instance is in the cluster.
4820
4821     """
4822     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4823     assert self.instance is not None, \
4824       "Cannot retrieve locked instance %s" % self.op.instance_name
4825     _CheckNodeOnline(self, self.instance.primary_node)
4826
4827   def Exec(self, feedback_fn):
4828     """Connect to the console of an instance
4829
4830     """
4831     instance = self.instance
4832     node = instance.primary_node
4833
4834     node_insts = self.rpc.call_instance_list([node],
4835                                              [instance.hypervisor])[node]
4836     node_insts.Raise()
4837
4838     if instance.name not in node_insts.data:
4839       raise errors.OpExecError("Instance %s is not running." % instance.name)
4840
4841     logging.debug("Connecting to console of %s on %s", instance.name, node)
4842
4843     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4844     cluster = self.cfg.GetClusterInfo()
4845     # beparams and hvparams are passed separately, to avoid editing the
4846     # instance and then saving the defaults in the instance itself.
4847     hvparams = cluster.FillHV(instance)
4848     beparams = cluster.FillBE(instance)
4849     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4850
4851     # build ssh cmdline
4852     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4853
4854
4855 class LUReplaceDisks(LogicalUnit):
4856   """Replace the disks of an instance.
4857
4858   """
4859   HPATH = "mirrors-replace"
4860   HTYPE = constants.HTYPE_INSTANCE
4861   _OP_REQP = ["instance_name", "mode", "disks"]
4862   REQ_BGL = False
4863
4864   def CheckArguments(self):
4865     if not hasattr(self.op, "remote_node"):
4866       self.op.remote_node = None
4867     if not hasattr(self.op, "iallocator"):
4868       self.op.iallocator = None
4869
4870     # check for valid parameter combination
4871     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4872     if self.op.mode == constants.REPLACE_DISK_CHG:
4873       if cnt == 2:
4874         raise errors.OpPrereqError("When changing the secondary either an"
4875                                    " iallocator script must be used or the"
4876                                    " new node given")
4877       elif cnt == 0:
4878         raise errors.OpPrereqError("Give either the iallocator or the new"
4879                                    " secondary, not both")
4880     else: # not replacing the secondary
4881       if cnt != 2:
4882         raise errors.OpPrereqError("The iallocator and new node options can"
4883                                    " be used only when changing the"
4884                                    " secondary node")
4885
4886   def ExpandNames(self):
4887     self._ExpandAndLockInstance()
4888
4889     if self.op.iallocator is not None:
4890       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4891     elif self.op.remote_node is not None:
4892       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4893       if remote_node is None:
4894         raise errors.OpPrereqError("Node '%s' not known" %
4895                                    self.op.remote_node)
4896       self.op.remote_node = remote_node
4897       # Warning: do not remove the locking of the new secondary here
4898       # unless DRBD8.AddChildren is changed to work in parallel;
4899       # currently it doesn't since parallel invocations of
4900       # FindUnusedMinor will conflict
4901       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4902       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4903     else:
4904       self.needed_locks[locking.LEVEL_NODE] = []
4905       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4906
4907   def DeclareLocks(self, level):
4908     # If we're not already locking all nodes in the set we have to declare the
4909     # instance's primary/secondary nodes.
4910     if (level == locking.LEVEL_NODE and
4911         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4912       self._LockInstancesNodes()
4913
4914   def _RunAllocator(self):
4915     """Compute a new secondary node using an IAllocator.
4916
4917     """
4918     ial = IAllocator(self,
4919                      mode=constants.IALLOCATOR_MODE_RELOC,
4920                      name=self.op.instance_name,
4921                      relocate_from=[self.sec_node])
4922
4923     ial.Run(self.op.iallocator)
4924
4925     if not ial.success:
4926       raise errors.OpPrereqError("Can't compute nodes using"
4927                                  " iallocator '%s': %s" % (self.op.iallocator,
4928                                                            ial.info))
4929     if len(ial.nodes) != ial.required_nodes:
4930       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4931                                  " of nodes (%s), required %s" %
4932                                  (len(ial.nodes), ial.required_nodes))
4933     self.op.remote_node = ial.nodes[0]
4934     self.LogInfo("Selected new secondary for the instance: %s",
4935                  self.op.remote_node)
4936
4937   def BuildHooksEnv(self):
4938     """Build hooks env.
4939
4940     This runs on the master, the primary and all the secondaries.
4941
4942     """
4943     env = {
4944       "MODE": self.op.mode,
4945       "NEW_SECONDARY": self.op.remote_node,
4946       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4947       }
4948     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4949     nl = [
4950       self.cfg.GetMasterNode(),
4951       self.instance.primary_node,
4952       ]
4953     if self.op.remote_node is not None:
4954       nl.append(self.op.remote_node)
4955     return env, nl, nl
4956
4957   def CheckPrereq(self):
4958     """Check prerequisites.
4959
4960     This checks that the instance is in the cluster.
4961
4962     """
4963     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4964     assert instance is not None, \
4965       "Cannot retrieve locked instance %s" % self.op.instance_name
4966     self.instance = instance
4967
4968     if instance.disk_template != constants.DT_DRBD8:
4969       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4970                                  " instances")
4971
4972     if len(instance.secondary_nodes) != 1:
4973       raise errors.OpPrereqError("The instance has a strange layout,"
4974                                  " expected one secondary but found %d" %
4975                                  len(instance.secondary_nodes))
4976
4977     self.sec_node = instance.secondary_nodes[0]
4978
4979     if self.op.iallocator is not None:
4980       self._RunAllocator()
4981
4982     remote_node = self.op.remote_node
4983     if remote_node is not None:
4984       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4985       assert self.remote_node_info is not None, \
4986         "Cannot retrieve locked node %s" % remote_node
4987     else:
4988       self.remote_node_info = None
4989     if remote_node == instance.primary_node:
4990       raise errors.OpPrereqError("The specified node is the primary node of"
4991                                  " the instance.")
4992     elif remote_node == self.sec_node:
4993       raise errors.OpPrereqError("The specified node is already the"
4994                                  " secondary node of the instance.")
4995
4996     if self.op.mode == constants.REPLACE_DISK_PRI:
4997       n1 = self.tgt_node = instance.primary_node
4998       n2 = self.oth_node = self.sec_node
4999     elif self.op.mode == constants.REPLACE_DISK_SEC:
5000       n1 = self.tgt_node = self.sec_node
5001       n2 = self.oth_node = instance.primary_node
5002     elif self.op.mode == constants.REPLACE_DISK_CHG:
5003       n1 = self.new_node = remote_node
5004       n2 = self.oth_node = instance.primary_node
5005       self.tgt_node = self.sec_node
5006       _CheckNodeNotDrained(self, remote_node)
5007     else:
5008       raise errors.ProgrammerError("Unhandled disk replace mode")
5009
5010     _CheckNodeOnline(self, n1)
5011     _CheckNodeOnline(self, n2)
5012
5013     if not self.op.disks:
5014       self.op.disks = range(len(instance.disks))
5015
5016     for disk_idx in self.op.disks:
5017       instance.FindDisk(disk_idx)
5018
5019   def _ExecD8DiskOnly(self, feedback_fn):
5020     """Replace a disk on the primary or secondary for dbrd8.
5021
5022     The algorithm for replace is quite complicated:
5023
5024       1. for each disk to be replaced:
5025
5026         1. create new LVs on the target node with unique names
5027         1. detach old LVs from the drbd device
5028         1. rename old LVs to name_replaced.<time_t>
5029         1. rename new LVs to old LVs
5030         1. attach the new LVs (with the old names now) to the drbd device
5031
5032       1. wait for sync across all devices
5033
5034       1. for each modified disk:
5035
5036         1. remove old LVs (which have the name name_replaces.<time_t>)
5037
5038     Failures are not very well handled.
5039
5040     """
5041     steps_total = 6
5042     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5043     instance = self.instance
5044     iv_names = {}
5045     vgname = self.cfg.GetVGName()
5046     # start of work
5047     cfg = self.cfg
5048     tgt_node = self.tgt_node
5049     oth_node = self.oth_node
5050
5051     # Step: check device activation
5052     self.proc.LogStep(1, steps_total, "check device existence")
5053     info("checking volume groups")
5054     my_vg = cfg.GetVGName()
5055     results = self.rpc.call_vg_list([oth_node, tgt_node])
5056     if not results:
5057       raise errors.OpExecError("Can't list volume groups on the nodes")
5058     for node in oth_node, tgt_node:
5059       res = results[node]
5060       if res.failed or not res.data or my_vg not in res.data:
5061         raise errors.OpExecError("Volume group '%s' not found on %s" %
5062                                  (my_vg, node))
5063     for idx, dev in enumerate(instance.disks):
5064       if idx not in self.op.disks:
5065         continue
5066       for node in tgt_node, oth_node:
5067         info("checking disk/%d on %s" % (idx, node))
5068         cfg.SetDiskID(dev, node)
5069         result = self.rpc.call_blockdev_find(node, dev)
5070         msg = result.RemoteFailMsg()
5071         if not msg and not result.payload:
5072           msg = "disk not found"
5073         if msg:
5074           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5075                                    (idx, node, msg))
5076
5077     # Step: check other node consistency
5078     self.proc.LogStep(2, steps_total, "check peer consistency")
5079     for idx, dev in enumerate(instance.disks):
5080       if idx not in self.op.disks:
5081         continue
5082       info("checking disk/%d consistency on %s" % (idx, oth_node))
5083       if not _CheckDiskConsistency(self, dev, oth_node,
5084                                    oth_node==instance.primary_node):
5085         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5086                                  " to replace disks on this node (%s)" %
5087                                  (oth_node, tgt_node))
5088
5089     # Step: create new storage
5090     self.proc.LogStep(3, steps_total, "allocate new storage")
5091     for idx, dev in enumerate(instance.disks):
5092       if idx not in self.op.disks:
5093         continue
5094       size = dev.size
5095       cfg.SetDiskID(dev, tgt_node)
5096       lv_names = [".disk%d_%s" % (idx, suf)
5097                   for suf in ["data", "meta"]]
5098       names = _GenerateUniqueNames(self, lv_names)
5099       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5100                              logical_id=(vgname, names[0]))
5101       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5102                              logical_id=(vgname, names[1]))
5103       new_lvs = [lv_data, lv_meta]
5104       old_lvs = dev.children
5105       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5106       info("creating new local storage on %s for %s" %
5107            (tgt_node, dev.iv_name))
5108       # we pass force_create=True to force the LVM creation
5109       for new_lv in new_lvs:
5110         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5111                         _GetInstanceInfoText(instance), False)
5112
5113     # Step: for each lv, detach+rename*2+attach
5114     self.proc.LogStep(4, steps_total, "change drbd configuration")
5115     for dev, old_lvs, new_lvs in iv_names.itervalues():
5116       info("detaching %s drbd from local storage" % dev.iv_name)
5117       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5118       result.Raise()
5119       if not result.data:
5120         raise errors.OpExecError("Can't detach drbd from local storage on node"
5121                                  " %s for device %s" % (tgt_node, dev.iv_name))
5122       #dev.children = []
5123       #cfg.Update(instance)
5124
5125       # ok, we created the new LVs, so now we know we have the needed
5126       # storage; as such, we proceed on the target node to rename
5127       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5128       # using the assumption that logical_id == physical_id (which in
5129       # turn is the unique_id on that node)
5130
5131       # FIXME(iustin): use a better name for the replaced LVs
5132       temp_suffix = int(time.time())
5133       ren_fn = lambda d, suff: (d.physical_id[0],
5134                                 d.physical_id[1] + "_replaced-%s" % suff)
5135       # build the rename list based on what LVs exist on the node
5136       rlist = []
5137       for to_ren in old_lvs:
5138         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5139         if not result.RemoteFailMsg() and result.payload:
5140           # device exists
5141           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5142
5143       info("renaming the old LVs on the target node")
5144       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5145       result.Raise()
5146       if not result.data:
5147         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5148       # now we rename the new LVs to the old LVs
5149       info("renaming the new LVs on the target node")
5150       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5151       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5152       result.Raise()
5153       if not result.data:
5154         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5155
5156       for old, new in zip(old_lvs, new_lvs):
5157         new.logical_id = old.logical_id
5158         cfg.SetDiskID(new, tgt_node)
5159
5160       for disk in old_lvs:
5161         disk.logical_id = ren_fn(disk, temp_suffix)
5162         cfg.SetDiskID(disk, tgt_node)
5163
5164       # now that the new lvs have the old name, we can add them to the device
5165       info("adding new mirror component on %s" % tgt_node)
5166       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5167       if result.failed or not result.data:
5168         for new_lv in new_lvs:
5169           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5170           if msg:
5171             warning("Can't rollback device %s: %s", dev, msg,
5172                     hint="cleanup manually the unused logical volumes")
5173         raise errors.OpExecError("Can't add local storage to drbd")
5174
5175       dev.children = new_lvs
5176       cfg.Update(instance)
5177
5178     # Step: wait for sync
5179
5180     # this can fail as the old devices are degraded and _WaitForSync
5181     # does a combined result over all disks, so we don't check its
5182     # return value
5183     self.proc.LogStep(5, steps_total, "sync devices")
5184     _WaitForSync(self, instance, unlock=True)
5185
5186     # so check manually all the devices
5187     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5188       cfg.SetDiskID(dev, instance.primary_node)
5189       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5190       msg = result.RemoteFailMsg()
5191       if not msg and not result.payload:
5192         msg = "disk not found"
5193       if msg:
5194         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5195                                  (name, msg))
5196       if result.payload[5]:
5197         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5198
5199     # Step: remove old storage
5200     self.proc.LogStep(6, steps_total, "removing old storage")
5201     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5202       info("remove logical volumes for %s" % name)
5203       for lv in old_lvs:
5204         cfg.SetDiskID(lv, tgt_node)
5205         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5206         if msg:
5207           warning("Can't remove old LV: %s" % msg,
5208                   hint="manually remove unused LVs")
5209           continue
5210
5211   def _ExecD8Secondary(self, feedback_fn):
5212     """Replace the secondary node for drbd8.
5213
5214     The algorithm for replace is quite complicated:
5215       - for all disks of the instance:
5216         - create new LVs on the new node with same names
5217         - shutdown the drbd device on the old secondary
5218         - disconnect the drbd network on the primary
5219         - create the drbd device on the new secondary
5220         - network attach the drbd on the primary, using an artifice:
5221           the drbd code for Attach() will connect to the network if it
5222           finds a device which is connected to the good local disks but
5223           not network enabled
5224       - wait for sync across all devices
5225       - remove all disks from the old secondary
5226
5227     Failures are not very well handled.
5228
5229     """
5230     steps_total = 6
5231     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5232     instance = self.instance
5233     iv_names = {}
5234     # start of work
5235     cfg = self.cfg
5236     old_node = self.tgt_node
5237     new_node = self.new_node
5238     pri_node = instance.primary_node
5239     nodes_ip = {
5240       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5241       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5242       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5243       }
5244
5245     # Step: check device activation
5246     self.proc.LogStep(1, steps_total, "check device existence")
5247     info("checking volume groups")
5248     my_vg = cfg.GetVGName()
5249     results = self.rpc.call_vg_list([pri_node, new_node])
5250     for node in pri_node, new_node:
5251       res = results[node]
5252       if res.failed or not res.data or my_vg not in res.data:
5253         raise errors.OpExecError("Volume group '%s' not found on %s" %
5254                                  (my_vg, node))
5255     for idx, dev in enumerate(instance.disks):
5256       if idx not in self.op.disks:
5257         continue
5258       info("checking disk/%d on %s" % (idx, pri_node))
5259       cfg.SetDiskID(dev, pri_node)
5260       result = self.rpc.call_blockdev_find(pri_node, dev)
5261       msg = result.RemoteFailMsg()
5262       if not msg and not result.payload:
5263         msg = "disk not found"
5264       if msg:
5265         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5266                                  (idx, pri_node, msg))
5267
5268     # Step: check other node consistency
5269     self.proc.LogStep(2, steps_total, "check peer consistency")
5270     for idx, dev in enumerate(instance.disks):
5271       if idx not in self.op.disks:
5272         continue
5273       info("checking disk/%d consistency on %s" % (idx, pri_node))
5274       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5275         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5276                                  " unsafe to replace the secondary" %
5277                                  pri_node)
5278
5279     # Step: create new storage
5280     self.proc.LogStep(3, steps_total, "allocate new storage")
5281     for idx, dev in enumerate(instance.disks):
5282       info("adding new local storage on %s for disk/%d" %
5283            (new_node, idx))
5284       # we pass force_create=True to force LVM creation
5285       for new_lv in dev.children:
5286         _CreateBlockDev(self, new_node, instance, new_lv, True,
5287                         _GetInstanceInfoText(instance), False)
5288
5289     # Step 4: dbrd minors and drbd setups changes
5290     # after this, we must manually remove the drbd minors on both the
5291     # error and the success paths
5292     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5293                                    instance.name)
5294     logging.debug("Allocated minors %s" % (minors,))
5295     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5296     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5297       size = dev.size
5298       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5299       # create new devices on new_node; note that we create two IDs:
5300       # one without port, so the drbd will be activated without
5301       # networking information on the new node at this stage, and one
5302       # with network, for the latter activation in step 4
5303       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5304       if pri_node == o_node1:
5305         p_minor = o_minor1
5306       else:
5307         p_minor = o_minor2
5308
5309       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5310       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5311
5312       iv_names[idx] = (dev, dev.children, new_net_id)
5313       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5314                     new_net_id)
5315       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5316                               logical_id=new_alone_id,
5317                               children=dev.children)
5318       try:
5319         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5320                               _GetInstanceInfoText(instance), False)
5321       except errors.GenericError:
5322         self.cfg.ReleaseDRBDMinors(instance.name)
5323         raise
5324
5325     for idx, dev in enumerate(instance.disks):
5326       # we have new devices, shutdown the drbd on the old secondary
5327       info("shutting down drbd for disk/%d on old node" % idx)
5328       cfg.SetDiskID(dev, old_node)
5329       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5330       if msg:
5331         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5332                 (idx, msg),
5333                 hint="Please cleanup this device manually as soon as possible")
5334
5335     info("detaching primary drbds from the network (=> standalone)")
5336     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5337                                                instance.disks)[pri_node]
5338
5339     msg = result.RemoteFailMsg()
5340     if msg:
5341       # detaches didn't succeed (unlikely)
5342       self.cfg.ReleaseDRBDMinors(instance.name)
5343       raise errors.OpExecError("Can't detach the disks from the network on"
5344                                " old node: %s" % (msg,))
5345
5346     # if we managed to detach at least one, we update all the disks of
5347     # the instance to point to the new secondary
5348     info("updating instance configuration")
5349     for dev, _, new_logical_id in iv_names.itervalues():
5350       dev.logical_id = new_logical_id
5351       cfg.SetDiskID(dev, pri_node)
5352     cfg.Update(instance)
5353
5354     # and now perform the drbd attach
5355     info("attaching primary drbds to new secondary (standalone => connected)")
5356     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5357                                            instance.disks, instance.name,
5358                                            False)
5359     for to_node, to_result in result.items():
5360       msg = to_result.RemoteFailMsg()
5361       if msg:
5362         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5363                 hint="please do a gnt-instance info to see the"
5364                 " status of disks")
5365
5366     # this can fail as the old devices are degraded and _WaitForSync
5367     # does a combined result over all disks, so we don't check its
5368     # return value
5369     self.proc.LogStep(5, steps_total, "sync devices")
5370     _WaitForSync(self, instance, unlock=True)
5371
5372     # so check manually all the devices
5373     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5374       cfg.SetDiskID(dev, pri_node)
5375       result = self.rpc.call_blockdev_find(pri_node, dev)
5376       msg = result.RemoteFailMsg()
5377       if not msg and not result.payload:
5378         msg = "disk not found"
5379       if msg:
5380         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5381                                  (idx, msg))
5382       if result.payload[5]:
5383         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5384
5385     self.proc.LogStep(6, steps_total, "removing old storage")
5386     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5387       info("remove logical volumes for disk/%d" % idx)
5388       for lv in old_lvs:
5389         cfg.SetDiskID(lv, old_node)
5390         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5391         if msg:
5392           warning("Can't remove LV on old secondary: %s", msg,
5393                   hint="Cleanup stale volumes by hand")
5394
5395   def Exec(self, feedback_fn):
5396     """Execute disk replacement.
5397
5398     This dispatches the disk replacement to the appropriate handler.
5399
5400     """
5401     instance = self.instance
5402
5403     # Activate the instance disks if we're replacing them on a down instance
5404     if not instance.admin_up:
5405       _StartInstanceDisks(self, instance, True)
5406
5407     if self.op.mode == constants.REPLACE_DISK_CHG:
5408       fn = self._ExecD8Secondary
5409     else:
5410       fn = self._ExecD8DiskOnly
5411
5412     ret = fn(feedback_fn)
5413
5414     # Deactivate the instance disks if we're replacing them on a down instance
5415     if not instance.admin_up:
5416       _SafeShutdownInstanceDisks(self, instance)
5417
5418     return ret
5419
5420
5421 class LUGrowDisk(LogicalUnit):
5422   """Grow a disk of an instance.
5423
5424   """
5425   HPATH = "disk-grow"
5426   HTYPE = constants.HTYPE_INSTANCE
5427   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5428   REQ_BGL = False
5429
5430   def ExpandNames(self):
5431     self._ExpandAndLockInstance()
5432     self.needed_locks[locking.LEVEL_NODE] = []
5433     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5434
5435   def DeclareLocks(self, level):
5436     if level == locking.LEVEL_NODE:
5437       self._LockInstancesNodes()
5438
5439   def BuildHooksEnv(self):
5440     """Build hooks env.
5441
5442     This runs on the master, the primary and all the secondaries.
5443
5444     """
5445     env = {
5446       "DISK": self.op.disk,
5447       "AMOUNT": self.op.amount,
5448       }
5449     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5450     nl = [
5451       self.cfg.GetMasterNode(),
5452       self.instance.primary_node,
5453       ]
5454     return env, nl, nl
5455
5456   def CheckPrereq(self):
5457     """Check prerequisites.
5458
5459     This checks that the instance is in the cluster.
5460
5461     """
5462     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5463     assert instance is not None, \
5464       "Cannot retrieve locked instance %s" % self.op.instance_name
5465     nodenames = list(instance.all_nodes)
5466     for node in nodenames:
5467       _CheckNodeOnline(self, node)
5468
5469
5470     self.instance = instance
5471
5472     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5473       raise errors.OpPrereqError("Instance's disk layout does not support"
5474                                  " growing.")
5475
5476     self.disk = instance.FindDisk(self.op.disk)
5477
5478     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5479                                        instance.hypervisor)
5480     for node in nodenames:
5481       info = nodeinfo[node]
5482       if info.failed or not info.data:
5483         raise errors.OpPrereqError("Cannot get current information"
5484                                    " from node '%s'" % node)
5485       vg_free = info.data.get('vg_free', None)
5486       if not isinstance(vg_free, int):
5487         raise errors.OpPrereqError("Can't compute free disk space on"
5488                                    " node %s" % node)
5489       if self.op.amount > vg_free:
5490         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5491                                    " %d MiB available, %d MiB required" %
5492                                    (node, vg_free, self.op.amount))
5493
5494   def Exec(self, feedback_fn):
5495     """Execute disk grow.
5496
5497     """
5498     instance = self.instance
5499     disk = self.disk
5500     for node in instance.all_nodes:
5501       self.cfg.SetDiskID(disk, node)
5502       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5503       msg = result.RemoteFailMsg()
5504       if msg:
5505         raise errors.OpExecError("Grow request failed to node %s: %s" %
5506                                  (node, msg))
5507     disk.RecordGrow(self.op.amount)
5508     self.cfg.Update(instance)
5509     if self.op.wait_for_sync:
5510       disk_abort = not _WaitForSync(self, instance)
5511       if disk_abort:
5512         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5513                              " status.\nPlease check the instance.")
5514
5515
5516 class LUQueryInstanceData(NoHooksLU):
5517   """Query runtime instance data.
5518
5519   """
5520   _OP_REQP = ["instances", "static"]
5521   REQ_BGL = False
5522
5523   def ExpandNames(self):
5524     self.needed_locks = {}
5525     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5526
5527     if not isinstance(self.op.instances, list):
5528       raise errors.OpPrereqError("Invalid argument type 'instances'")
5529
5530     if self.op.instances:
5531       self.wanted_names = []
5532       for name in self.op.instances:
5533         full_name = self.cfg.ExpandInstanceName(name)
5534         if full_name is None:
5535           raise errors.OpPrereqError("Instance '%s' not known" % name)
5536         self.wanted_names.append(full_name)
5537       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5538     else:
5539       self.wanted_names = None
5540       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5541
5542     self.needed_locks[locking.LEVEL_NODE] = []
5543     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5544
5545   def DeclareLocks(self, level):
5546     if level == locking.LEVEL_NODE:
5547       self._LockInstancesNodes()
5548
5549   def CheckPrereq(self):
5550     """Check prerequisites.
5551
5552     This only checks the optional instance list against the existing names.
5553
5554     """
5555     if self.wanted_names is None:
5556       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5557
5558     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5559                              in self.wanted_names]
5560     return
5561
5562   def _ComputeDiskStatus(self, instance, snode, dev):
5563     """Compute block device status.
5564
5565     """
5566     static = self.op.static
5567     if not static:
5568       self.cfg.SetDiskID(dev, instance.primary_node)
5569       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5570       if dev_pstatus.offline:
5571         dev_pstatus = None
5572       else:
5573         msg = dev_pstatus.RemoteFailMsg()
5574         if msg:
5575           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5576                                    (instance.name, msg))
5577         dev_pstatus = dev_pstatus.payload
5578     else:
5579       dev_pstatus = None
5580
5581     if dev.dev_type in constants.LDS_DRBD:
5582       # we change the snode then (otherwise we use the one passed in)
5583       if dev.logical_id[0] == instance.primary_node:
5584         snode = dev.logical_id[1]
5585       else:
5586         snode = dev.logical_id[0]
5587
5588     if snode and not static:
5589       self.cfg.SetDiskID(dev, snode)
5590       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5591       if dev_sstatus.offline:
5592         dev_sstatus = None
5593       else:
5594         msg = dev_sstatus.RemoteFailMsg()
5595         if msg:
5596           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5597                                    (instance.name, msg))
5598         dev_sstatus = dev_sstatus.payload
5599     else:
5600       dev_sstatus = None
5601
5602     if dev.children:
5603       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5604                       for child in dev.children]
5605     else:
5606       dev_children = []
5607
5608     data = {
5609       "iv_name": dev.iv_name,
5610       "dev_type": dev.dev_type,
5611       "logical_id": dev.logical_id,
5612       "physical_id": dev.physical_id,
5613       "pstatus": dev_pstatus,
5614       "sstatus": dev_sstatus,
5615       "children": dev_children,
5616       "mode": dev.mode,
5617       }
5618
5619     return data
5620
5621   def Exec(self, feedback_fn):
5622     """Gather and return data"""
5623     result = {}
5624
5625     cluster = self.cfg.GetClusterInfo()
5626
5627     for instance in self.wanted_instances:
5628       if not self.op.static:
5629         remote_info = self.rpc.call_instance_info(instance.primary_node,
5630                                                   instance.name,
5631                                                   instance.hypervisor)
5632         remote_info.Raise()
5633         remote_info = remote_info.data
5634         if remote_info and "state" in remote_info:
5635           remote_state = "up"
5636         else:
5637           remote_state = "down"
5638       else:
5639         remote_state = None
5640       if instance.admin_up:
5641         config_state = "up"
5642       else:
5643         config_state = "down"
5644
5645       disks = [self._ComputeDiskStatus(instance, None, device)
5646                for device in instance.disks]
5647
5648       idict = {
5649         "name": instance.name,
5650         "config_state": config_state,
5651         "run_state": remote_state,
5652         "pnode": instance.primary_node,
5653         "snodes": instance.secondary_nodes,
5654         "os": instance.os,
5655         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5656         "disks": disks,
5657         "hypervisor": instance.hypervisor,
5658         "network_port": instance.network_port,
5659         "hv_instance": instance.hvparams,
5660         "hv_actual": cluster.FillHV(instance),
5661         "be_instance": instance.beparams,
5662         "be_actual": cluster.FillBE(instance),
5663         }
5664
5665       result[instance.name] = idict
5666
5667     return result
5668
5669
5670 class LUSetInstanceParams(LogicalUnit):
5671   """Modifies an instances's parameters.
5672
5673   """
5674   HPATH = "instance-modify"
5675   HTYPE = constants.HTYPE_INSTANCE
5676   _OP_REQP = ["instance_name"]
5677   REQ_BGL = False
5678
5679   def CheckArguments(self):
5680     if not hasattr(self.op, 'nics'):
5681       self.op.nics = []
5682     if not hasattr(self.op, 'disks'):
5683       self.op.disks = []
5684     if not hasattr(self.op, 'beparams'):
5685       self.op.beparams = {}
5686     if not hasattr(self.op, 'hvparams'):
5687       self.op.hvparams = {}
5688     self.op.force = getattr(self.op, "force", False)
5689     if not (self.op.nics or self.op.disks or
5690             self.op.hvparams or self.op.beparams):
5691       raise errors.OpPrereqError("No changes submitted")
5692
5693     # Disk validation
5694     disk_addremove = 0
5695     for disk_op, disk_dict in self.op.disks:
5696       if disk_op == constants.DDM_REMOVE:
5697         disk_addremove += 1
5698         continue
5699       elif disk_op == constants.DDM_ADD:
5700         disk_addremove += 1
5701       else:
5702         if not isinstance(disk_op, int):
5703           raise errors.OpPrereqError("Invalid disk index")
5704       if disk_op == constants.DDM_ADD:
5705         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5706         if mode not in constants.DISK_ACCESS_SET:
5707           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5708         size = disk_dict.get('size', None)
5709         if size is None:
5710           raise errors.OpPrereqError("Required disk parameter size missing")
5711         try:
5712           size = int(size)
5713         except ValueError, err:
5714           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5715                                      str(err))
5716         disk_dict['size'] = size
5717       else:
5718         # modification of disk
5719         if 'size' in disk_dict:
5720           raise errors.OpPrereqError("Disk size change not possible, use"
5721                                      " grow-disk")
5722
5723     if disk_addremove > 1:
5724       raise errors.OpPrereqError("Only one disk add or remove operation"
5725                                  " supported at a time")
5726
5727     # NIC validation
5728     nic_addremove = 0
5729     for nic_op, nic_dict in self.op.nics:
5730       if nic_op == constants.DDM_REMOVE:
5731         nic_addremove += 1
5732         continue
5733       elif nic_op == constants.DDM_ADD:
5734         nic_addremove += 1
5735       else:
5736         if not isinstance(nic_op, int):
5737           raise errors.OpPrereqError("Invalid nic index")
5738
5739       # nic_dict should be a dict
5740       nic_ip = nic_dict.get('ip', None)
5741       if nic_ip is not None:
5742         if nic_ip.lower() == constants.VALUE_NONE:
5743           nic_dict['ip'] = None
5744         else:
5745           if not utils.IsValidIP(nic_ip):
5746             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5747
5748       if nic_op == constants.DDM_ADD:
5749         nic_bridge = nic_dict.get('bridge', None)
5750         if nic_bridge is None:
5751           nic_dict['bridge'] = self.cfg.GetDefBridge()
5752         nic_mac = nic_dict.get('mac', None)
5753         if nic_mac is None:
5754           nic_dict['mac'] = constants.VALUE_AUTO
5755
5756       if 'mac' in nic_dict:
5757         nic_mac = nic_dict['mac']
5758         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5759           if not utils.IsValidMac(nic_mac):
5760             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5761         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5762           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5763                                      " modifying an existing nic")
5764
5765     if nic_addremove > 1:
5766       raise errors.OpPrereqError("Only one NIC add or remove operation"
5767                                  " supported at a time")
5768
5769   def ExpandNames(self):
5770     self._ExpandAndLockInstance()
5771     self.needed_locks[locking.LEVEL_NODE] = []
5772     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5773
5774   def DeclareLocks(self, level):
5775     if level == locking.LEVEL_NODE:
5776       self._LockInstancesNodes()
5777
5778   def BuildHooksEnv(self):
5779     """Build hooks env.
5780
5781     This runs on the master, primary and secondaries.
5782
5783     """
5784     args = dict()
5785     if constants.BE_MEMORY in self.be_new:
5786       args['memory'] = self.be_new[constants.BE_MEMORY]
5787     if constants.BE_VCPUS in self.be_new:
5788       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5789     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5790     # information at all.
5791     if self.op.nics:
5792       args['nics'] = []
5793       nic_override = dict(self.op.nics)
5794       for idx, nic in enumerate(self.instance.nics):
5795         if idx in nic_override:
5796           this_nic_override = nic_override[idx]
5797         else:
5798           this_nic_override = {}
5799         if 'ip' in this_nic_override:
5800           ip = this_nic_override['ip']
5801         else:
5802           ip = nic.ip
5803         if 'bridge' in this_nic_override:
5804           bridge = this_nic_override['bridge']
5805         else:
5806           bridge = nic.bridge
5807         if 'mac' in this_nic_override:
5808           mac = this_nic_override['mac']
5809         else:
5810           mac = nic.mac
5811         args['nics'].append((ip, bridge, mac))
5812       if constants.DDM_ADD in nic_override:
5813         ip = nic_override[constants.DDM_ADD].get('ip', None)
5814         bridge = nic_override[constants.DDM_ADD]['bridge']
5815         mac = nic_override[constants.DDM_ADD]['mac']
5816         args['nics'].append((ip, bridge, mac))
5817       elif constants.DDM_REMOVE in nic_override:
5818         del args['nics'][-1]
5819
5820     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5821     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5822     return env, nl, nl
5823
5824   def CheckPrereq(self):
5825     """Check prerequisites.
5826
5827     This only checks the instance list against the existing names.
5828
5829     """
5830     force = self.force = self.op.force
5831
5832     # checking the new params on the primary/secondary nodes
5833
5834     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5835     assert self.instance is not None, \
5836       "Cannot retrieve locked instance %s" % self.op.instance_name
5837     pnode = instance.primary_node
5838     nodelist = list(instance.all_nodes)
5839
5840     # hvparams processing
5841     if self.op.hvparams:
5842       i_hvdict = copy.deepcopy(instance.hvparams)
5843       for key, val in self.op.hvparams.iteritems():
5844         if val == constants.VALUE_DEFAULT:
5845           try:
5846             del i_hvdict[key]
5847           except KeyError:
5848             pass
5849         else:
5850           i_hvdict[key] = val
5851       cluster = self.cfg.GetClusterInfo()
5852       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5853       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5854                                 i_hvdict)
5855       # local check
5856       hypervisor.GetHypervisor(
5857         instance.hypervisor).CheckParameterSyntax(hv_new)
5858       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5859       self.hv_new = hv_new # the new actual values
5860       self.hv_inst = i_hvdict # the new dict (without defaults)
5861     else:
5862       self.hv_new = self.hv_inst = {}
5863
5864     # beparams processing
5865     if self.op.beparams:
5866       i_bedict = copy.deepcopy(instance.beparams)
5867       for key, val in self.op.beparams.iteritems():
5868         if val == constants.VALUE_DEFAULT:
5869           try:
5870             del i_bedict[key]
5871           except KeyError:
5872             pass
5873         else:
5874           i_bedict[key] = val
5875       cluster = self.cfg.GetClusterInfo()
5876       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5877       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5878                                 i_bedict)
5879       self.be_new = be_new # the new actual values
5880       self.be_inst = i_bedict # the new dict (without defaults)
5881     else:
5882       self.be_new = self.be_inst = {}
5883
5884     self.warn = []
5885
5886     if constants.BE_MEMORY in self.op.beparams and not self.force:
5887       mem_check_list = [pnode]
5888       if be_new[constants.BE_AUTO_BALANCE]:
5889         # either we changed auto_balance to yes or it was from before
5890         mem_check_list.extend(instance.secondary_nodes)
5891       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5892                                                   instance.hypervisor)
5893       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5894                                          instance.hypervisor)
5895       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5896         # Assume the primary node is unreachable and go ahead
5897         self.warn.append("Can't get info from primary node %s" % pnode)
5898       else:
5899         if not instance_info.failed and instance_info.data:
5900           current_mem = instance_info.data['memory']
5901         else:
5902           # Assume instance not running
5903           # (there is a slight race condition here, but it's not very probable,
5904           # and we have no other way to check)
5905           current_mem = 0
5906         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5907                     nodeinfo[pnode].data['memory_free'])
5908         if miss_mem > 0:
5909           raise errors.OpPrereqError("This change will prevent the instance"
5910                                      " from starting, due to %d MB of memory"
5911                                      " missing on its primary node" % miss_mem)
5912
5913       if be_new[constants.BE_AUTO_BALANCE]:
5914         for node, nres in nodeinfo.iteritems():
5915           if node not in instance.secondary_nodes:
5916             continue
5917           if nres.failed or not isinstance(nres.data, dict):
5918             self.warn.append("Can't get info from secondary node %s" % node)
5919           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5920             self.warn.append("Not enough memory to failover instance to"
5921                              " secondary node %s" % node)
5922
5923     # NIC processing
5924     for nic_op, nic_dict in self.op.nics:
5925       if nic_op == constants.DDM_REMOVE:
5926         if not instance.nics:
5927           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5928         continue
5929       if nic_op != constants.DDM_ADD:
5930         # an existing nic
5931         if nic_op < 0 or nic_op >= len(instance.nics):
5932           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5933                                      " are 0 to %d" %
5934                                      (nic_op, len(instance.nics)))
5935       if 'bridge' in nic_dict:
5936         nic_bridge = nic_dict['bridge']
5937         if nic_bridge is None:
5938           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5939         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5940           msg = ("Bridge '%s' doesn't exist on one of"
5941                  " the instance nodes" % nic_bridge)
5942           if self.force:
5943             self.warn.append(msg)
5944           else:
5945             raise errors.OpPrereqError(msg)
5946       if 'mac' in nic_dict:
5947         nic_mac = nic_dict['mac']
5948         if nic_mac is None:
5949           raise errors.OpPrereqError('Cannot set the nic mac to None')
5950         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5951           # otherwise generate the mac
5952           nic_dict['mac'] = self.cfg.GenerateMAC()
5953         else:
5954           # or validate/reserve the current one
5955           if self.cfg.IsMacInUse(nic_mac):
5956             raise errors.OpPrereqError("MAC address %s already in use"
5957                                        " in cluster" % nic_mac)
5958
5959     # DISK processing
5960     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5961       raise errors.OpPrereqError("Disk operations not supported for"
5962                                  " diskless instances")
5963     for disk_op, disk_dict in self.op.disks:
5964       if disk_op == constants.DDM_REMOVE:
5965         if len(instance.disks) == 1:
5966           raise errors.OpPrereqError("Cannot remove the last disk of"
5967                                      " an instance")
5968         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5969         ins_l = ins_l[pnode]
5970         if ins_l.failed or not isinstance(ins_l.data, list):
5971           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5972         if instance.name in ins_l.data:
5973           raise errors.OpPrereqError("Instance is running, can't remove"
5974                                      " disks.")
5975
5976       if (disk_op == constants.DDM_ADD and
5977           len(instance.nics) >= constants.MAX_DISKS):
5978         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5979                                    " add more" % constants.MAX_DISKS)
5980       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5981         # an existing disk
5982         if disk_op < 0 or disk_op >= len(instance.disks):
5983           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5984                                      " are 0 to %d" %
5985                                      (disk_op, len(instance.disks)))
5986
5987     return
5988
5989   def Exec(self, feedback_fn):
5990     """Modifies an instance.
5991
5992     All parameters take effect only at the next restart of the instance.
5993
5994     """
5995     # Process here the warnings from CheckPrereq, as we don't have a
5996     # feedback_fn there.
5997     for warn in self.warn:
5998       feedback_fn("WARNING: %s" % warn)
5999
6000     result = []
6001     instance = self.instance
6002     # disk changes
6003     for disk_op, disk_dict in self.op.disks:
6004       if disk_op == constants.DDM_REMOVE:
6005         # remove the last disk
6006         device = instance.disks.pop()
6007         device_idx = len(instance.disks)
6008         for node, disk in device.ComputeNodeTree(instance.primary_node):
6009           self.cfg.SetDiskID(disk, node)
6010           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6011           if msg:
6012             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6013                             " continuing anyway", device_idx, node, msg)
6014         result.append(("disk/%d" % device_idx, "remove"))
6015       elif disk_op == constants.DDM_ADD:
6016         # add a new disk
6017         if instance.disk_template == constants.DT_FILE:
6018           file_driver, file_path = instance.disks[0].logical_id
6019           file_path = os.path.dirname(file_path)
6020         else:
6021           file_driver = file_path = None
6022         disk_idx_base = len(instance.disks)
6023         new_disk = _GenerateDiskTemplate(self,
6024                                          instance.disk_template,
6025                                          instance.name, instance.primary_node,
6026                                          instance.secondary_nodes,
6027                                          [disk_dict],
6028                                          file_path,
6029                                          file_driver,
6030                                          disk_idx_base)[0]
6031         instance.disks.append(new_disk)
6032         info = _GetInstanceInfoText(instance)
6033
6034         logging.info("Creating volume %s for instance %s",
6035                      new_disk.iv_name, instance.name)
6036         # Note: this needs to be kept in sync with _CreateDisks
6037         #HARDCODE
6038         for node in instance.all_nodes:
6039           f_create = node == instance.primary_node
6040           try:
6041             _CreateBlockDev(self, node, instance, new_disk,
6042                             f_create, info, f_create)
6043           except errors.OpExecError, err:
6044             self.LogWarning("Failed to create volume %s (%s) on"
6045                             " node %s: %s",
6046                             new_disk.iv_name, new_disk, node, err)
6047         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6048                        (new_disk.size, new_disk.mode)))
6049       else:
6050         # change a given disk
6051         instance.disks[disk_op].mode = disk_dict['mode']
6052         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6053     # NIC changes
6054     for nic_op, nic_dict in self.op.nics:
6055       if nic_op == constants.DDM_REMOVE:
6056         # remove the last nic
6057         del instance.nics[-1]
6058         result.append(("nic.%d" % len(instance.nics), "remove"))
6059       elif nic_op == constants.DDM_ADD:
6060         # mac and bridge should be set, by now
6061         mac = nic_dict['mac']
6062         bridge = nic_dict['bridge']
6063         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6064                               bridge=bridge)
6065         instance.nics.append(new_nic)
6066         result.append(("nic.%d" % (len(instance.nics) - 1),
6067                        "add:mac=%s,ip=%s,bridge=%s" %
6068                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6069       else:
6070         # change a given nic
6071         for key in 'mac', 'ip', 'bridge':
6072           if key in nic_dict:
6073             setattr(instance.nics[nic_op], key, nic_dict[key])
6074             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6075
6076     # hvparams changes
6077     if self.op.hvparams:
6078       instance.hvparams = self.hv_inst
6079       for key, val in self.op.hvparams.iteritems():
6080         result.append(("hv/%s" % key, val))
6081
6082     # beparams changes
6083     if self.op.beparams:
6084       instance.beparams = self.be_inst
6085       for key, val in self.op.beparams.iteritems():
6086         result.append(("be/%s" % key, val))
6087
6088     self.cfg.Update(instance)
6089
6090     return result
6091
6092
6093 class LUQueryExports(NoHooksLU):
6094   """Query the exports list
6095
6096   """
6097   _OP_REQP = ['nodes']
6098   REQ_BGL = False
6099
6100   def ExpandNames(self):
6101     self.needed_locks = {}
6102     self.share_locks[locking.LEVEL_NODE] = 1
6103     if not self.op.nodes:
6104       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6105     else:
6106       self.needed_locks[locking.LEVEL_NODE] = \
6107         _GetWantedNodes(self, self.op.nodes)
6108
6109   def CheckPrereq(self):
6110     """Check prerequisites.
6111
6112     """
6113     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6114
6115   def Exec(self, feedback_fn):
6116     """Compute the list of all the exported system images.
6117
6118     @rtype: dict
6119     @return: a dictionary with the structure node->(export-list)
6120         where export-list is a list of the instances exported on
6121         that node.
6122
6123     """
6124     rpcresult = self.rpc.call_export_list(self.nodes)
6125     result = {}
6126     for node in rpcresult:
6127       if rpcresult[node].failed:
6128         result[node] = False
6129       else:
6130         result[node] = rpcresult[node].data
6131
6132     return result
6133
6134
6135 class LUExportInstance(LogicalUnit):
6136   """Export an instance to an image in the cluster.
6137
6138   """
6139   HPATH = "instance-export"
6140   HTYPE = constants.HTYPE_INSTANCE
6141   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6142   REQ_BGL = False
6143
6144   def ExpandNames(self):
6145     self._ExpandAndLockInstance()
6146     # FIXME: lock only instance primary and destination node
6147     #
6148     # Sad but true, for now we have do lock all nodes, as we don't know where
6149     # the previous export might be, and and in this LU we search for it and
6150     # remove it from its current node. In the future we could fix this by:
6151     #  - making a tasklet to search (share-lock all), then create the new one,
6152     #    then one to remove, after
6153     #  - removing the removal operation altoghether
6154     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6155
6156   def DeclareLocks(self, level):
6157     """Last minute lock declaration."""
6158     # All nodes are locked anyway, so nothing to do here.
6159
6160   def BuildHooksEnv(self):
6161     """Build hooks env.
6162
6163     This will run on the master, primary node and target node.
6164
6165     """
6166     env = {
6167       "EXPORT_NODE": self.op.target_node,
6168       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6169       }
6170     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6171     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6172           self.op.target_node]
6173     return env, nl, nl
6174
6175   def CheckPrereq(self):
6176     """Check prerequisites.
6177
6178     This checks that the instance and node names are valid.
6179
6180     """
6181     instance_name = self.op.instance_name
6182     self.instance = self.cfg.GetInstanceInfo(instance_name)
6183     assert self.instance is not None, \
6184           "Cannot retrieve locked instance %s" % self.op.instance_name
6185     _CheckNodeOnline(self, self.instance.primary_node)
6186
6187     self.dst_node = self.cfg.GetNodeInfo(
6188       self.cfg.ExpandNodeName(self.op.target_node))
6189
6190     if self.dst_node is None:
6191       # This is wrong node name, not a non-locked node
6192       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6193     _CheckNodeOnline(self, self.dst_node.name)
6194     _CheckNodeNotDrained(self, self.dst_node.name)
6195
6196     # instance disk type verification
6197     for disk in self.instance.disks:
6198       if disk.dev_type == constants.LD_FILE:
6199         raise errors.OpPrereqError("Export not supported for instances with"
6200                                    " file-based disks")
6201
6202   def Exec(self, feedback_fn):
6203     """Export an instance to an image in the cluster.
6204
6205     """
6206     instance = self.instance
6207     dst_node = self.dst_node
6208     src_node = instance.primary_node
6209     if self.op.shutdown:
6210       # shutdown the instance, but not the disks
6211       result = self.rpc.call_instance_shutdown(src_node, instance)
6212       msg = result.RemoteFailMsg()
6213       if msg:
6214         raise errors.OpExecError("Could not shutdown instance %s on"
6215                                  " node %s: %s" %
6216                                  (instance.name, src_node, msg))
6217
6218     vgname = self.cfg.GetVGName()
6219
6220     snap_disks = []
6221
6222     # set the disks ID correctly since call_instance_start needs the
6223     # correct drbd minor to create the symlinks
6224     for disk in instance.disks:
6225       self.cfg.SetDiskID(disk, src_node)
6226
6227     try:
6228       for disk in instance.disks:
6229         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6230         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6231         if new_dev_name.failed or not new_dev_name.data:
6232           self.LogWarning("Could not snapshot block device %s on node %s",
6233                           disk.logical_id[1], src_node)
6234           snap_disks.append(False)
6235         else:
6236           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6237                                  logical_id=(vgname, new_dev_name.data),
6238                                  physical_id=(vgname, new_dev_name.data),
6239                                  iv_name=disk.iv_name)
6240           snap_disks.append(new_dev)
6241
6242     finally:
6243       if self.op.shutdown and instance.admin_up:
6244         result = self.rpc.call_instance_start(src_node, instance)
6245         msg = result.RemoteFailMsg()
6246         if msg:
6247           _ShutdownInstanceDisks(self, instance)
6248           raise errors.OpExecError("Could not start instance: %s" % msg)
6249
6250     # TODO: check for size
6251
6252     cluster_name = self.cfg.GetClusterName()
6253     for idx, dev in enumerate(snap_disks):
6254       if dev:
6255         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6256                                                instance, cluster_name, idx)
6257         if result.failed or not result.data:
6258           self.LogWarning("Could not export block device %s from node %s to"
6259                           " node %s", dev.logical_id[1], src_node,
6260                           dst_node.name)
6261         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6262         if msg:
6263           self.LogWarning("Could not remove snapshot block device %s from node"
6264                           " %s: %s", dev.logical_id[1], src_node, msg)
6265
6266     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6267     if result.failed or not result.data:
6268       self.LogWarning("Could not finalize export for instance %s on node %s",
6269                       instance.name, dst_node.name)
6270
6271     nodelist = self.cfg.GetNodeList()
6272     nodelist.remove(dst_node.name)
6273
6274     # on one-node clusters nodelist will be empty after the removal
6275     # if we proceed the backup would be removed because OpQueryExports
6276     # substitutes an empty list with the full cluster node list.
6277     if nodelist:
6278       exportlist = self.rpc.call_export_list(nodelist)
6279       for node in exportlist:
6280         if exportlist[node].failed:
6281           continue
6282         if instance.name in exportlist[node].data:
6283           if not self.rpc.call_export_remove(node, instance.name):
6284             self.LogWarning("Could not remove older export for instance %s"
6285                             " on node %s", instance.name, node)
6286
6287
6288 class LURemoveExport(NoHooksLU):
6289   """Remove exports related to the named instance.
6290
6291   """
6292   _OP_REQP = ["instance_name"]
6293   REQ_BGL = False
6294
6295   def ExpandNames(self):
6296     self.needed_locks = {}
6297     # We need all nodes to be locked in order for RemoveExport to work, but we
6298     # don't need to lock the instance itself, as nothing will happen to it (and
6299     # we can remove exports also for a removed instance)
6300     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6301
6302   def CheckPrereq(self):
6303     """Check prerequisites.
6304     """
6305     pass
6306
6307   def Exec(self, feedback_fn):
6308     """Remove any export.
6309
6310     """
6311     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6312     # If the instance was not found we'll try with the name that was passed in.
6313     # This will only work if it was an FQDN, though.
6314     fqdn_warn = False
6315     if not instance_name:
6316       fqdn_warn = True
6317       instance_name = self.op.instance_name
6318
6319     exportlist = self.rpc.call_export_list(self.acquired_locks[
6320       locking.LEVEL_NODE])
6321     found = False
6322     for node in exportlist:
6323       if exportlist[node].failed:
6324         self.LogWarning("Failed to query node %s, continuing" % node)
6325         continue
6326       if instance_name in exportlist[node].data:
6327         found = True
6328         result = self.rpc.call_export_remove(node, instance_name)
6329         if result.failed or not result.data:
6330           logging.error("Could not remove export for instance %s"
6331                         " on node %s", instance_name, node)
6332
6333     if fqdn_warn and not found:
6334       feedback_fn("Export not found. If trying to remove an export belonging"
6335                   " to a deleted instance please use its Fully Qualified"
6336                   " Domain Name.")
6337
6338
6339 class TagsLU(NoHooksLU):
6340   """Generic tags LU.
6341
6342   This is an abstract class which is the parent of all the other tags LUs.
6343
6344   """
6345
6346   def ExpandNames(self):
6347     self.needed_locks = {}
6348     if self.op.kind == constants.TAG_NODE:
6349       name = self.cfg.ExpandNodeName(self.op.name)
6350       if name is None:
6351         raise errors.OpPrereqError("Invalid node name (%s)" %
6352                                    (self.op.name,))
6353       self.op.name = name
6354       self.needed_locks[locking.LEVEL_NODE] = name
6355     elif self.op.kind == constants.TAG_INSTANCE:
6356       name = self.cfg.ExpandInstanceName(self.op.name)
6357       if name is None:
6358         raise errors.OpPrereqError("Invalid instance name (%s)" %
6359                                    (self.op.name,))
6360       self.op.name = name
6361       self.needed_locks[locking.LEVEL_INSTANCE] = name
6362
6363   def CheckPrereq(self):
6364     """Check prerequisites.
6365
6366     """
6367     if self.op.kind == constants.TAG_CLUSTER:
6368       self.target = self.cfg.GetClusterInfo()
6369     elif self.op.kind == constants.TAG_NODE:
6370       self.target = self.cfg.GetNodeInfo(self.op.name)
6371     elif self.op.kind == constants.TAG_INSTANCE:
6372       self.target = self.cfg.GetInstanceInfo(self.op.name)
6373     else:
6374       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6375                                  str(self.op.kind))
6376
6377
6378 class LUGetTags(TagsLU):
6379   """Returns the tags of a given object.
6380
6381   """
6382   _OP_REQP = ["kind", "name"]
6383   REQ_BGL = False
6384
6385   def Exec(self, feedback_fn):
6386     """Returns the tag list.
6387
6388     """
6389     return list(self.target.GetTags())
6390
6391
6392 class LUSearchTags(NoHooksLU):
6393   """Searches the tags for a given pattern.
6394
6395   """
6396   _OP_REQP = ["pattern"]
6397   REQ_BGL = False
6398
6399   def ExpandNames(self):
6400     self.needed_locks = {}
6401
6402   def CheckPrereq(self):
6403     """Check prerequisites.
6404
6405     This checks the pattern passed for validity by compiling it.
6406
6407     """
6408     try:
6409       self.re = re.compile(self.op.pattern)
6410     except re.error, err:
6411       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6412                                  (self.op.pattern, err))
6413
6414   def Exec(self, feedback_fn):
6415     """Returns the tag list.
6416
6417     """
6418     cfg = self.cfg
6419     tgts = [("/cluster", cfg.GetClusterInfo())]
6420     ilist = cfg.GetAllInstancesInfo().values()
6421     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6422     nlist = cfg.GetAllNodesInfo().values()
6423     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6424     results = []
6425     for path, target in tgts:
6426       for tag in target.GetTags():
6427         if self.re.search(tag):
6428           results.append((path, tag))
6429     return results
6430
6431
6432 class LUAddTags(TagsLU):
6433   """Sets a tag on a given object.
6434
6435   """
6436   _OP_REQP = ["kind", "name", "tags"]
6437   REQ_BGL = False
6438
6439   def CheckPrereq(self):
6440     """Check prerequisites.
6441
6442     This checks the type and length of the tag name and value.
6443
6444     """
6445     TagsLU.CheckPrereq(self)
6446     for tag in self.op.tags:
6447       objects.TaggableObject.ValidateTag(tag)
6448
6449   def Exec(self, feedback_fn):
6450     """Sets the tag.
6451
6452     """
6453     try:
6454       for tag in self.op.tags:
6455         self.target.AddTag(tag)
6456     except errors.TagError, err:
6457       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6458     try:
6459       self.cfg.Update(self.target)
6460     except errors.ConfigurationError:
6461       raise errors.OpRetryError("There has been a modification to the"
6462                                 " config file and the operation has been"
6463                                 " aborted. Please retry.")
6464
6465
6466 class LUDelTags(TagsLU):
6467   """Delete a list of tags from a given object.
6468
6469   """
6470   _OP_REQP = ["kind", "name", "tags"]
6471   REQ_BGL = False
6472
6473   def CheckPrereq(self):
6474     """Check prerequisites.
6475
6476     This checks that we have the given tag.
6477
6478     """
6479     TagsLU.CheckPrereq(self)
6480     for tag in self.op.tags:
6481       objects.TaggableObject.ValidateTag(tag)
6482     del_tags = frozenset(self.op.tags)
6483     cur_tags = self.target.GetTags()
6484     if not del_tags <= cur_tags:
6485       diff_tags = del_tags - cur_tags
6486       diff_names = ["'%s'" % tag for tag in diff_tags]
6487       diff_names.sort()
6488       raise errors.OpPrereqError("Tag(s) %s not found" %
6489                                  (",".join(diff_names)))
6490
6491   def Exec(self, feedback_fn):
6492     """Remove the tag from the object.
6493
6494     """
6495     for tag in self.op.tags:
6496       self.target.RemoveTag(tag)
6497     try:
6498       self.cfg.Update(self.target)
6499     except errors.ConfigurationError:
6500       raise errors.OpRetryError("There has been a modification to the"
6501                                 " config file and the operation has been"
6502                                 " aborted. Please retry.")
6503
6504
6505 class LUTestDelay(NoHooksLU):
6506   """Sleep for a specified amount of time.
6507
6508   This LU sleeps on the master and/or nodes for a specified amount of
6509   time.
6510
6511   """
6512   _OP_REQP = ["duration", "on_master", "on_nodes"]
6513   REQ_BGL = False
6514
6515   def ExpandNames(self):
6516     """Expand names and set required locks.
6517
6518     This expands the node list, if any.
6519
6520     """
6521     self.needed_locks = {}
6522     if self.op.on_nodes:
6523       # _GetWantedNodes can be used here, but is not always appropriate to use
6524       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6525       # more information.
6526       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6527       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6528
6529   def CheckPrereq(self):
6530     """Check prerequisites.
6531
6532     """
6533
6534   def Exec(self, feedback_fn):
6535     """Do the actual sleep.
6536
6537     """
6538     if self.op.on_master:
6539       if not utils.TestDelay(self.op.duration):
6540         raise errors.OpExecError("Error during master delay test")
6541     if self.op.on_nodes:
6542       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6543       if not result:
6544         raise errors.OpExecError("Complete failure from rpc call")
6545       for node, node_result in result.items():
6546         node_result.Raise()
6547         if not node_result.data:
6548           raise errors.OpExecError("Failure during rpc call to node %s,"
6549                                    " result: %s" % (node, node_result.data))
6550
6551
6552 class IAllocator(object):
6553   """IAllocator framework.
6554
6555   An IAllocator instance has three sets of attributes:
6556     - cfg that is needed to query the cluster
6557     - input data (all members of the _KEYS class attribute are required)
6558     - four buffer attributes (in|out_data|text), that represent the
6559       input (to the external script) in text and data structure format,
6560       and the output from it, again in two formats
6561     - the result variables from the script (success, info, nodes) for
6562       easy usage
6563
6564   """
6565   _ALLO_KEYS = [
6566     "mem_size", "disks", "disk_template",
6567     "os", "tags", "nics", "vcpus", "hypervisor",
6568     ]
6569   _RELO_KEYS = [
6570     "relocate_from",
6571     ]
6572
6573   def __init__(self, lu, mode, name, **kwargs):
6574     self.lu = lu
6575     # init buffer variables
6576     self.in_text = self.out_text = self.in_data = self.out_data = None
6577     # init all input fields so that pylint is happy
6578     self.mode = mode
6579     self.name = name
6580     self.mem_size = self.disks = self.disk_template = None
6581     self.os = self.tags = self.nics = self.vcpus = None
6582     self.hypervisor = None
6583     self.relocate_from = None
6584     # computed fields
6585     self.required_nodes = None
6586     # init result fields
6587     self.success = self.info = self.nodes = None
6588     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6589       keyset = self._ALLO_KEYS
6590     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6591       keyset = self._RELO_KEYS
6592     else:
6593       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6594                                    " IAllocator" % self.mode)
6595     for key in kwargs:
6596       if key not in keyset:
6597         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6598                                      " IAllocator" % key)
6599       setattr(self, key, kwargs[key])
6600     for key in keyset:
6601       if key not in kwargs:
6602         raise errors.ProgrammerError("Missing input parameter '%s' to"
6603                                      " IAllocator" % key)
6604     self._BuildInputData()
6605
6606   def _ComputeClusterData(self):
6607     """Compute the generic allocator input data.
6608
6609     This is the data that is independent of the actual operation.
6610
6611     """
6612     cfg = self.lu.cfg
6613     cluster_info = cfg.GetClusterInfo()
6614     # cluster data
6615     data = {
6616       "version": constants.IALLOCATOR_VERSION,
6617       "cluster_name": cfg.GetClusterName(),
6618       "cluster_tags": list(cluster_info.GetTags()),
6619       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6620       # we don't have job IDs
6621       }
6622     iinfo = cfg.GetAllInstancesInfo().values()
6623     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6624
6625     # node data
6626     node_results = {}
6627     node_list = cfg.GetNodeList()
6628
6629     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6630       hypervisor_name = self.hypervisor
6631     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6632       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6633
6634     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6635                                            hypervisor_name)
6636     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6637                        cluster_info.enabled_hypervisors)
6638     for nname, nresult in node_data.items():
6639       # first fill in static (config-based) values
6640       ninfo = cfg.GetNodeInfo(nname)
6641       pnr = {
6642         "tags": list(ninfo.GetTags()),
6643         "primary_ip": ninfo.primary_ip,
6644         "secondary_ip": ninfo.secondary_ip,
6645         "offline": ninfo.offline,
6646         "drained": ninfo.drained,
6647         "master_candidate": ninfo.master_candidate,
6648         }
6649
6650       if not ninfo.offline:
6651         nresult.Raise()
6652         if not isinstance(nresult.data, dict):
6653           raise errors.OpExecError("Can't get data for node %s" % nname)
6654         remote_info = nresult.data
6655         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6656                      'vg_size', 'vg_free', 'cpu_total']:
6657           if attr not in remote_info:
6658             raise errors.OpExecError("Node '%s' didn't return attribute"
6659                                      " '%s'" % (nname, attr))
6660           try:
6661             remote_info[attr] = int(remote_info[attr])
6662           except ValueError, err:
6663             raise errors.OpExecError("Node '%s' returned invalid value"
6664                                      " for '%s': %s" % (nname, attr, err))
6665         # compute memory used by primary instances
6666         i_p_mem = i_p_up_mem = 0
6667         for iinfo, beinfo in i_list:
6668           if iinfo.primary_node == nname:
6669             i_p_mem += beinfo[constants.BE_MEMORY]
6670             if iinfo.name not in node_iinfo[nname].data:
6671               i_used_mem = 0
6672             else:
6673               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6674             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6675             remote_info['memory_free'] -= max(0, i_mem_diff)
6676
6677             if iinfo.admin_up:
6678               i_p_up_mem += beinfo[constants.BE_MEMORY]
6679
6680         # compute memory used by instances
6681         pnr_dyn = {
6682           "total_memory": remote_info['memory_total'],
6683           "reserved_memory": remote_info['memory_dom0'],
6684           "free_memory": remote_info['memory_free'],
6685           "total_disk": remote_info['vg_size'],
6686           "free_disk": remote_info['vg_free'],
6687           "total_cpus": remote_info['cpu_total'],
6688           "i_pri_memory": i_p_mem,
6689           "i_pri_up_memory": i_p_up_mem,
6690           }
6691         pnr.update(pnr_dyn)
6692
6693       node_results[nname] = pnr
6694     data["nodes"] = node_results
6695
6696     # instance data
6697     instance_data = {}
6698     for iinfo, beinfo in i_list:
6699       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6700                   for n in iinfo.nics]
6701       pir = {
6702         "tags": list(iinfo.GetTags()),
6703         "admin_up": iinfo.admin_up,
6704         "vcpus": beinfo[constants.BE_VCPUS],
6705         "memory": beinfo[constants.BE_MEMORY],
6706         "os": iinfo.os,
6707         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6708         "nics": nic_data,
6709         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6710         "disk_template": iinfo.disk_template,
6711         "hypervisor": iinfo.hypervisor,
6712         }
6713       instance_data[iinfo.name] = pir
6714
6715     data["instances"] = instance_data
6716
6717     self.in_data = data
6718
6719   def _AddNewInstance(self):
6720     """Add new instance data to allocator structure.
6721
6722     This in combination with _AllocatorGetClusterData will create the
6723     correct structure needed as input for the allocator.
6724
6725     The checks for the completeness of the opcode must have already been
6726     done.
6727
6728     """
6729     data = self.in_data
6730
6731     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6732
6733     if self.disk_template in constants.DTS_NET_MIRROR:
6734       self.required_nodes = 2
6735     else:
6736       self.required_nodes = 1
6737     request = {
6738       "type": "allocate",
6739       "name": self.name,
6740       "disk_template": self.disk_template,
6741       "tags": self.tags,
6742       "os": self.os,
6743       "vcpus": self.vcpus,
6744       "memory": self.mem_size,
6745       "disks": self.disks,
6746       "disk_space_total": disk_space,
6747       "nics": self.nics,
6748       "required_nodes": self.required_nodes,
6749       }
6750     data["request"] = request
6751
6752   def _AddRelocateInstance(self):
6753     """Add relocate instance data to allocator structure.
6754
6755     This in combination with _IAllocatorGetClusterData will create the
6756     correct structure needed as input for the allocator.
6757
6758     The checks for the completeness of the opcode must have already been
6759     done.
6760
6761     """
6762     instance = self.lu.cfg.GetInstanceInfo(self.name)
6763     if instance is None:
6764       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6765                                    " IAllocator" % self.name)
6766
6767     if instance.disk_template not in constants.DTS_NET_MIRROR:
6768       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6769
6770     if len(instance.secondary_nodes) != 1:
6771       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6772
6773     self.required_nodes = 1
6774     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6775     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6776
6777     request = {
6778       "type": "relocate",
6779       "name": self.name,
6780       "disk_space_total": disk_space,
6781       "required_nodes": self.required_nodes,
6782       "relocate_from": self.relocate_from,
6783       }
6784     self.in_data["request"] = request
6785
6786   def _BuildInputData(self):
6787     """Build input data structures.
6788
6789     """
6790     self._ComputeClusterData()
6791
6792     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6793       self._AddNewInstance()
6794     else:
6795       self._AddRelocateInstance()
6796
6797     self.in_text = serializer.Dump(self.in_data)
6798
6799   def Run(self, name, validate=True, call_fn=None):
6800     """Run an instance allocator and return the results.
6801
6802     """
6803     if call_fn is None:
6804       call_fn = self.lu.rpc.call_iallocator_runner
6805     data = self.in_text
6806
6807     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6808     result.Raise()
6809
6810     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6811       raise errors.OpExecError("Invalid result from master iallocator runner")
6812
6813     rcode, stdout, stderr, fail = result.data
6814
6815     if rcode == constants.IARUN_NOTFOUND:
6816       raise errors.OpExecError("Can't find allocator '%s'" % name)
6817     elif rcode == constants.IARUN_FAILURE:
6818       raise errors.OpExecError("Instance allocator call failed: %s,"
6819                                " output: %s" % (fail, stdout+stderr))
6820     self.out_text = stdout
6821     if validate:
6822       self._ValidateResult()
6823
6824   def _ValidateResult(self):
6825     """Process the allocator results.
6826
6827     This will process and if successful save the result in
6828     self.out_data and the other parameters.
6829
6830     """
6831     try:
6832       rdict = serializer.Load(self.out_text)
6833     except Exception, err:
6834       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6835
6836     if not isinstance(rdict, dict):
6837       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6838
6839     for key in "success", "info", "nodes":
6840       if key not in rdict:
6841         raise errors.OpExecError("Can't parse iallocator results:"
6842                                  " missing key '%s'" % key)
6843       setattr(self, key, rdict[key])
6844
6845     if not isinstance(rdict["nodes"], list):
6846       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6847                                " is not a list")
6848     self.out_data = rdict
6849
6850
6851 class LUTestAllocator(NoHooksLU):
6852   """Run allocator tests.
6853
6854   This LU runs the allocator tests
6855
6856   """
6857   _OP_REQP = ["direction", "mode", "name"]
6858
6859   def CheckPrereq(self):
6860     """Check prerequisites.
6861
6862     This checks the opcode parameters depending on the director and mode test.
6863
6864     """
6865     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6866       for attr in ["name", "mem_size", "disks", "disk_template",
6867                    "os", "tags", "nics", "vcpus"]:
6868         if not hasattr(self.op, attr):
6869           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6870                                      attr)
6871       iname = self.cfg.ExpandInstanceName(self.op.name)
6872       if iname is not None:
6873         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6874                                    iname)
6875       if not isinstance(self.op.nics, list):
6876         raise errors.OpPrereqError("Invalid parameter 'nics'")
6877       for row in self.op.nics:
6878         if (not isinstance(row, dict) or
6879             "mac" not in row or
6880             "ip" not in row or
6881             "bridge" not in row):
6882           raise errors.OpPrereqError("Invalid contents of the"
6883                                      " 'nics' parameter")
6884       if not isinstance(self.op.disks, list):
6885         raise errors.OpPrereqError("Invalid parameter 'disks'")
6886       for row in self.op.disks:
6887         if (not isinstance(row, dict) or
6888             "size" not in row or
6889             not isinstance(row["size"], int) or
6890             "mode" not in row or
6891             row["mode"] not in ['r', 'w']):
6892           raise errors.OpPrereqError("Invalid contents of the"
6893                                      " 'disks' parameter")
6894       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6895         self.op.hypervisor = self.cfg.GetHypervisorType()
6896     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6897       if not hasattr(self.op, "name"):
6898         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6899       fname = self.cfg.ExpandInstanceName(self.op.name)
6900       if fname is None:
6901         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6902                                    self.op.name)
6903       self.op.name = fname
6904       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6905     else:
6906       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6907                                  self.op.mode)
6908
6909     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6910       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6911         raise errors.OpPrereqError("Missing allocator name")
6912     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6913       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6914                                  self.op.direction)
6915
6916   def Exec(self, feedback_fn):
6917     """Run the allocator test.
6918
6919     """
6920     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6921       ial = IAllocator(self,
6922                        mode=self.op.mode,
6923                        name=self.op.name,
6924                        mem_size=self.op.mem_size,
6925                        disks=self.op.disks,
6926                        disk_template=self.op.disk_template,
6927                        os=self.op.os,
6928                        tags=self.op.tags,
6929                        nics=self.op.nics,
6930                        vcpus=self.op.vcpus,
6931                        hypervisor=self.op.hypervisor,
6932                        )
6933     else:
6934       ial = IAllocator(self,
6935                        mode=self.op.mode,
6936                        name=self.op.name,
6937                        relocate_from=list(self.relocate_from),
6938                        )
6939
6940     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6941       result = ial.in_text
6942     else:
6943       ial.Run(self.op.allocator, validate=False)
6944       result = ial.out_text
6945     return result