LUDiagnoseOS: change locking and error handling
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics, disk_template, disks):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @rtype: dict
484   @return: the hook environment for this instance
485
486   """
487   if status:
488     str_status = "up"
489   else:
490     str_status = "down"
491   env = {
492     "OP_TARGET": name,
493     "INSTANCE_NAME": name,
494     "INSTANCE_PRIMARY": primary_node,
495     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496     "INSTANCE_OS_TYPE": os_type,
497     "INSTANCE_STATUS": str_status,
498     "INSTANCE_MEMORY": memory,
499     "INSTANCE_VCPUS": vcpus,
500     "INSTANCE_DISK_TEMPLATE": disk_template,
501   }
502
503   if nics:
504     nic_count = len(nics)
505     for idx, (ip, bridge, mac) in enumerate(nics):
506       if ip is None:
507         ip = ""
508       env["INSTANCE_NIC%d_IP" % idx] = ip
509       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510       env["INSTANCE_NIC%d_MAC" % idx] = mac
511   else:
512     nic_count = 0
513
514   env["INSTANCE_NIC_COUNT"] = nic_count
515
516   if disks:
517     disk_count = len(disks)
518     for idx, (size, mode) in enumerate(disks):
519       env["INSTANCE_DISK%d_SIZE" % idx] = size
520       env["INSTANCE_DISK%d_MODE" % idx] = mode
521   else:
522     disk_count = 0
523
524   env["INSTANCE_DISK_COUNT"] = disk_count
525
526   return env
527
528
529 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530   """Builds instance related env variables for hooks from an object.
531
532   @type lu: L{LogicalUnit}
533   @param lu: the logical unit on whose behalf we execute
534   @type instance: L{objects.Instance}
535   @param instance: the instance for which we should build the
536       environment
537   @type override: dict
538   @param override: dictionary with key/values that will override
539       our values
540   @rtype: dict
541   @return: the hook environment dictionary
542
543   """
544   bep = lu.cfg.GetClusterInfo().FillBE(instance)
545   args = {
546     'name': instance.name,
547     'primary_node': instance.primary_node,
548     'secondary_nodes': instance.secondary_nodes,
549     'os_type': instance.os,
550     'status': instance.admin_up,
551     'memory': bep[constants.BE_MEMORY],
552     'vcpus': bep[constants.BE_VCPUS],
553     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554     'disk_template': instance.disk_template,
555     'disks': [(disk.size, disk.mode) for disk in instance.disks],
556   }
557   if override:
558     args.update(override)
559   return _BuildInstanceHookEnv(**args)
560
561
562 def _AdjustCandidatePool(lu):
563   """Adjust the candidate pool after node operations.
564
565   """
566   mod_list = lu.cfg.MaintainCandidatePool()
567   if mod_list:
568     lu.LogInfo("Promoted nodes to master candidate role: %s",
569                ", ".join(node.name for node in mod_list))
570     for name in mod_list:
571       lu.context.ReaddNode(name)
572   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573   if mc_now > mc_max:
574     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575                (mc_now, mc_max))
576
577
578 def _CheckInstanceBridgesExist(lu, instance):
579   """Check that the brigdes needed by an instance exist.
580
581   """
582   # check bridges existance
583   brlist = [nic.bridge for nic in instance.nics]
584   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585   result.Raise()
586   if not result.data:
587     raise errors.OpPrereqError("One or more target bridges %s does not"
588                                " exist on destination node '%s'" %
589                                (brlist, instance.primary_node))
590
591
592 class LUDestroyCluster(NoHooksLU):
593   """Logical unit for destroying the cluster.
594
595   """
596   _OP_REQP = []
597
598   def CheckPrereq(self):
599     """Check prerequisites.
600
601     This checks whether the cluster is empty.
602
603     Any errors are signalled by raising errors.OpPrereqError.
604
605     """
606     master = self.cfg.GetMasterNode()
607
608     nodelist = self.cfg.GetNodeList()
609     if len(nodelist) != 1 or nodelist[0] != master:
610       raise errors.OpPrereqError("There are still %d node(s) in"
611                                  " this cluster." % (len(nodelist) - 1))
612     instancelist = self.cfg.GetInstanceList()
613     if instancelist:
614       raise errors.OpPrereqError("There are still %d instance(s) in"
615                                  " this cluster." % len(instancelist))
616
617   def Exec(self, feedback_fn):
618     """Destroys the cluster.
619
620     """
621     master = self.cfg.GetMasterNode()
622     result = self.rpc.call_node_stop_master(master, False)
623     result.Raise()
624     if not result.data:
625       raise errors.OpExecError("Could not disable the master role")
626     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627     utils.CreateBackup(priv_key)
628     utils.CreateBackup(pub_key)
629     return master
630
631
632 class LUVerifyCluster(LogicalUnit):
633   """Verifies the cluster status.
634
635   """
636   HPATH = "cluster-verify"
637   HTYPE = constants.HTYPE_CLUSTER
638   _OP_REQP = ["skip_checks"]
639   REQ_BGL = False
640
641   def ExpandNames(self):
642     self.needed_locks = {
643       locking.LEVEL_NODE: locking.ALL_SET,
644       locking.LEVEL_INSTANCE: locking.ALL_SET,
645     }
646     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647
648   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649                   node_result, feedback_fn, master_files,
650                   drbd_map, vg_name):
651     """Run multiple tests against a node.
652
653     Test list:
654
655       - compares ganeti version
656       - checks vg existance and size > 20G
657       - checks config file checksum
658       - checks ssh to other nodes
659
660     @type nodeinfo: L{objects.Node}
661     @param nodeinfo: the node to check
662     @param file_list: required list of files
663     @param local_cksum: dictionary of local files and their checksums
664     @param node_result: the results from the node
665     @param feedback_fn: function used to accumulate results
666     @param master_files: list of files that only masters should have
667     @param drbd_map: the useddrbd minors for this node, in
668         form of minor: (instance, must_exist) which correspond to instances
669         and their running status
670     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
671
672     """
673     node = nodeinfo.name
674
675     # main result, node_result should be a non-empty dict
676     if not node_result or not isinstance(node_result, dict):
677       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
678       return True
679
680     # compares ganeti version
681     local_version = constants.PROTOCOL_VERSION
682     remote_version = node_result.get('version', None)
683     if not (remote_version and isinstance(remote_version, (list, tuple)) and
684             len(remote_version) == 2):
685       feedback_fn("  - ERROR: connection to %s failed" % (node))
686       return True
687
688     if local_version != remote_version[0]:
689       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
690                   " node %s %s" % (local_version, node, remote_version[0]))
691       return True
692
693     # node seems compatible, we can actually try to look into its results
694
695     bad = False
696
697     # full package version
698     if constants.RELEASE_VERSION != remote_version[1]:
699       feedback_fn("  - WARNING: software version mismatch: master %s,"
700                   " node %s %s" %
701                   (constants.RELEASE_VERSION, node, remote_version[1]))
702
703     # checks vg existence and size > 20G
704     if vg_name is not None:
705       vglist = node_result.get(constants.NV_VGLIST, None)
706       if not vglist:
707         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
708                         (node,))
709         bad = True
710       else:
711         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712                                               constants.MIN_VG_SIZE)
713         if vgstatus:
714           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
715           bad = True
716
717     # checks config file checksum
718
719     remote_cksum = node_result.get(constants.NV_FILELIST, None)
720     if not isinstance(remote_cksum, dict):
721       bad = True
722       feedback_fn("  - ERROR: node hasn't returned file checksum data")
723     else:
724       for file_name in file_list:
725         node_is_mc = nodeinfo.master_candidate
726         must_have_file = file_name not in master_files
727         if file_name not in remote_cksum:
728           if node_is_mc or must_have_file:
729             bad = True
730             feedback_fn("  - ERROR: file '%s' missing" % file_name)
731         elif remote_cksum[file_name] != local_cksum[file_name]:
732           if node_is_mc or must_have_file:
733             bad = True
734             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
735           else:
736             # not candidate and this is not a must-have file
737             bad = True
738             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
739                         " '%s'" % file_name)
740         else:
741           # all good, except non-master/non-must have combination
742           if not node_is_mc and not must_have_file:
743             feedback_fn("  - ERROR: file '%s' should not exist on non master"
744                         " candidates" % file_name)
745
746     # checks ssh to any
747
748     if constants.NV_NODELIST not in node_result:
749       bad = True
750       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
751     else:
752       if node_result[constants.NV_NODELIST]:
753         bad = True
754         for node in node_result[constants.NV_NODELIST]:
755           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
756                           (node, node_result[constants.NV_NODELIST][node]))
757
758     if constants.NV_NODENETTEST not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
761     else:
762       if node_result[constants.NV_NODENETTEST]:
763         bad = True
764         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765         for node in nlist:
766           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
767                           (node, node_result[constants.NV_NODENETTEST][node]))
768
769     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770     if isinstance(hyp_result, dict):
771       for hv_name, hv_result in hyp_result.iteritems():
772         if hv_result is not None:
773           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
774                       (hv_name, hv_result))
775
776     # check used drbd list
777     if vg_name is not None:
778       used_minors = node_result.get(constants.NV_DRBDLIST, [])
779       if not isinstance(used_minors, (tuple, list)):
780         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
781                     str(used_minors))
782       else:
783         for minor, (iname, must_exist) in drbd_map.items():
784           if minor not in used_minors and must_exist:
785             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
786                         " not active" % (minor, iname))
787             bad = True
788         for minor in used_minors:
789           if minor not in drbd_map:
790             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
791                         minor)
792             bad = True
793
794     return bad
795
796   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
797                       node_instance, feedback_fn, n_offline):
798     """Verify an instance.
799
800     This function checks to see if the required block devices are
801     available on the instance's node.
802
803     """
804     bad = False
805
806     node_current = instanceconfig.primary_node
807
808     node_vol_should = {}
809     instanceconfig.MapLVsByNode(node_vol_should)
810
811     for node in node_vol_should:
812       if node in n_offline:
813         # ignore missing volumes on offline nodes
814         continue
815       for volume in node_vol_should[node]:
816         if node not in node_vol_is or volume not in node_vol_is[node]:
817           feedback_fn("  - ERROR: volume %s missing on node %s" %
818                           (volume, node))
819           bad = True
820
821     if instanceconfig.admin_up:
822       if ((node_current not in node_instance or
823           not instance in node_instance[node_current]) and
824           node_current not in n_offline):
825         feedback_fn("  - ERROR: instance %s not running on node %s" %
826                         (instance, node_current))
827         bad = True
828
829     for node in node_instance:
830       if (not node == node_current):
831         if instance in node_instance[node]:
832           feedback_fn("  - ERROR: instance %s should not run on node %s" %
833                           (instance, node))
834           bad = True
835
836     return bad
837
838   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
839     """Verify if there are any unknown volumes in the cluster.
840
841     The .os, .swap and backup volumes are ignored. All other volumes are
842     reported as unknown.
843
844     """
845     bad = False
846
847     for node in node_vol_is:
848       for volume in node_vol_is[node]:
849         if node not in node_vol_should or volume not in node_vol_should[node]:
850           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
851                       (volume, node))
852           bad = True
853     return bad
854
855   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
856     """Verify the list of running instances.
857
858     This checks what instances are running but unknown to the cluster.
859
860     """
861     bad = False
862     for node in node_instance:
863       for runninginstance in node_instance[node]:
864         if runninginstance not in instancelist:
865           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
866                           (runninginstance, node))
867           bad = True
868     return bad
869
870   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
871     """Verify N+1 Memory Resilience.
872
873     Check that if one single node dies we can still start all the instances it
874     was primary for.
875
876     """
877     bad = False
878
879     for node, nodeinfo in node_info.iteritems():
880       # This code checks that every node which is now listed as secondary has
881       # enough memory to host all instances it is supposed to should a single
882       # other node in the cluster fail.
883       # FIXME: not ready for failover to an arbitrary node
884       # FIXME: does not support file-backed instances
885       # WARNING: we currently take into account down instances as well as up
886       # ones, considering that even if they're down someone might want to start
887       # them even in the event of a node failure.
888       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
889         needed_mem = 0
890         for instance in instances:
891           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
892           if bep[constants.BE_AUTO_BALANCE]:
893             needed_mem += bep[constants.BE_MEMORY]
894         if nodeinfo['mfree'] < needed_mem:
895           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
896                       " failovers should node %s fail" % (node, prinode))
897           bad = True
898     return bad
899
900   def CheckPrereq(self):
901     """Check prerequisites.
902
903     Transform the list of checks we're going to skip into a set and check that
904     all its members are valid.
905
906     """
907     self.skip_set = frozenset(self.op.skip_checks)
908     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
909       raise errors.OpPrereqError("Invalid checks to be skipped specified")
910
911   def BuildHooksEnv(self):
912     """Build hooks env.
913
914     Cluster-Verify hooks just rone in the post phase and their failure makes
915     the output be logged in the verify output and the verification to fail.
916
917     """
918     all_nodes = self.cfg.GetNodeList()
919     env = {
920       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
921       }
922     for node in self.cfg.GetAllNodesInfo().values():
923       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
924
925     return env, [], all_nodes
926
927   def Exec(self, feedback_fn):
928     """Verify integrity of cluster, performing various test on nodes.
929
930     """
931     bad = False
932     feedback_fn("* Verifying global settings")
933     for msg in self.cfg.VerifyConfig():
934       feedback_fn("  - ERROR: %s" % msg)
935
936     vg_name = self.cfg.GetVGName()
937     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
938     nodelist = utils.NiceSort(self.cfg.GetNodeList())
939     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
940     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
941     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
942                         for iname in instancelist)
943     i_non_redundant = [] # Non redundant instances
944     i_non_a_balanced = [] # Non auto-balanced instances
945     n_offline = [] # List of offline nodes
946     n_drained = [] # List of nodes being drained
947     node_volume = {}
948     node_instance = {}
949     node_info = {}
950     instance_cfg = {}
951
952     # FIXME: verify OS list
953     # do local checksums
954     master_files = [constants.CLUSTER_CONF_FILE]
955
956     file_names = ssconf.SimpleStore().GetFileList()
957     file_names.append(constants.SSL_CERT_FILE)
958     file_names.append(constants.RAPI_CERT_FILE)
959     file_names.extend(master_files)
960
961     local_checksums = utils.FingerprintFiles(file_names)
962
963     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
964     node_verify_param = {
965       constants.NV_FILELIST: file_names,
966       constants.NV_NODELIST: [node.name for node in nodeinfo
967                               if not node.offline],
968       constants.NV_HYPERVISOR: hypervisors,
969       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
970                                   node.secondary_ip) for node in nodeinfo
971                                  if not node.offline],
972       constants.NV_INSTANCELIST: hypervisors,
973       constants.NV_VERSION: None,
974       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
975       }
976     if vg_name is not None:
977       node_verify_param[constants.NV_VGLIST] = None
978       node_verify_param[constants.NV_LVLIST] = vg_name
979       node_verify_param[constants.NV_DRBDLIST] = None
980     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
981                                            self.cfg.GetClusterName())
982
983     cluster = self.cfg.GetClusterInfo()
984     master_node = self.cfg.GetMasterNode()
985     all_drbd_map = self.cfg.ComputeDRBDMap()
986
987     for node_i in nodeinfo:
988       node = node_i.name
989       nresult = all_nvinfo[node].data
990
991       if node_i.offline:
992         feedback_fn("* Skipping offline node %s" % (node,))
993         n_offline.append(node)
994         continue
995
996       if node == master_node:
997         ntype = "master"
998       elif node_i.master_candidate:
999         ntype = "master candidate"
1000       elif node_i.drained:
1001         ntype = "drained"
1002         n_drained.append(node)
1003       else:
1004         ntype = "regular"
1005       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1006
1007       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1008         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1009         bad = True
1010         continue
1011
1012       node_drbd = {}
1013       for minor, instance in all_drbd_map[node].items():
1014         if instance not in instanceinfo:
1015           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1016                       instance)
1017           # ghost instance should not be running, but otherwise we
1018           # don't give double warnings (both ghost instance and
1019           # unallocated minor in use)
1020           node_drbd[minor] = (instance, False)
1021         else:
1022           instance = instanceinfo[instance]
1023           node_drbd[minor] = (instance.name, instance.admin_up)
1024       result = self._VerifyNode(node_i, file_names, local_checksums,
1025                                 nresult, feedback_fn, master_files,
1026                                 node_drbd, vg_name)
1027       bad = bad or result
1028
1029       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1030       if vg_name is None:
1031         node_volume[node] = {}
1032       elif isinstance(lvdata, basestring):
1033         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1034                     (node, utils.SafeEncode(lvdata)))
1035         bad = True
1036         node_volume[node] = {}
1037       elif not isinstance(lvdata, dict):
1038         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1039         bad = True
1040         continue
1041       else:
1042         node_volume[node] = lvdata
1043
1044       # node_instance
1045       idata = nresult.get(constants.NV_INSTANCELIST, None)
1046       if not isinstance(idata, list):
1047         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1048                     (node,))
1049         bad = True
1050         continue
1051
1052       node_instance[node] = idata
1053
1054       # node_info
1055       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1056       if not isinstance(nodeinfo, dict):
1057         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1058         bad = True
1059         continue
1060
1061       try:
1062         node_info[node] = {
1063           "mfree": int(nodeinfo['memory_free']),
1064           "pinst": [],
1065           "sinst": [],
1066           # dictionary holding all instances this node is secondary for,
1067           # grouped by their primary node. Each key is a cluster node, and each
1068           # value is a list of instances which have the key as primary and the
1069           # current node as secondary.  this is handy to calculate N+1 memory
1070           # availability if you can only failover from a primary to its
1071           # secondary.
1072           "sinst-by-pnode": {},
1073         }
1074         # FIXME: devise a free space model for file based instances as well
1075         if vg_name is not None:
1076           if (constants.NV_VGLIST not in nresult or
1077               vg_name not in nresult[constants.NV_VGLIST]):
1078             feedback_fn("  - ERROR: node %s didn't return data for the"
1079                         " volume group '%s' - it is either missing or broken" %
1080                         (node, vg_name))
1081             bad = True
1082             continue
1083           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1084       except (ValueError, KeyError):
1085         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1086                     " from node %s" % (node,))
1087         bad = True
1088         continue
1089
1090     node_vol_should = {}
1091
1092     for instance in instancelist:
1093       feedback_fn("* Verifying instance %s" % instance)
1094       inst_config = instanceinfo[instance]
1095       result =  self._VerifyInstance(instance, inst_config, node_volume,
1096                                      node_instance, feedback_fn, n_offline)
1097       bad = bad or result
1098       inst_nodes_offline = []
1099
1100       inst_config.MapLVsByNode(node_vol_should)
1101
1102       instance_cfg[instance] = inst_config
1103
1104       pnode = inst_config.primary_node
1105       if pnode in node_info:
1106         node_info[pnode]['pinst'].append(instance)
1107       elif pnode not in n_offline:
1108         feedback_fn("  - ERROR: instance %s, connection to primary node"
1109                     " %s failed" % (instance, pnode))
1110         bad = True
1111
1112       if pnode in n_offline:
1113         inst_nodes_offline.append(pnode)
1114
1115       # If the instance is non-redundant we cannot survive losing its primary
1116       # node, so we are not N+1 compliant. On the other hand we have no disk
1117       # templates with more than one secondary so that situation is not well
1118       # supported either.
1119       # FIXME: does not support file-backed instances
1120       if len(inst_config.secondary_nodes) == 0:
1121         i_non_redundant.append(instance)
1122       elif len(inst_config.secondary_nodes) > 1:
1123         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1124                     % instance)
1125
1126       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1127         i_non_a_balanced.append(instance)
1128
1129       for snode in inst_config.secondary_nodes:
1130         if snode in node_info:
1131           node_info[snode]['sinst'].append(instance)
1132           if pnode not in node_info[snode]['sinst-by-pnode']:
1133             node_info[snode]['sinst-by-pnode'][pnode] = []
1134           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1135         elif snode not in n_offline:
1136           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1137                       " %s failed" % (instance, snode))
1138           bad = True
1139         if snode in n_offline:
1140           inst_nodes_offline.append(snode)
1141
1142       if inst_nodes_offline:
1143         # warn that the instance lives on offline nodes, and set bad=True
1144         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1145                     ", ".join(inst_nodes_offline))
1146         bad = True
1147
1148     feedback_fn("* Verifying orphan volumes")
1149     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1150                                        feedback_fn)
1151     bad = bad or result
1152
1153     feedback_fn("* Verifying remaining instances")
1154     result = self._VerifyOrphanInstances(instancelist, node_instance,
1155                                          feedback_fn)
1156     bad = bad or result
1157
1158     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1159       feedback_fn("* Verifying N+1 Memory redundancy")
1160       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1161       bad = bad or result
1162
1163     feedback_fn("* Other Notes")
1164     if i_non_redundant:
1165       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1166                   % len(i_non_redundant))
1167
1168     if i_non_a_balanced:
1169       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1170                   % len(i_non_a_balanced))
1171
1172     if n_offline:
1173       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1174
1175     if n_drained:
1176       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1177
1178     return not bad
1179
1180   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1181     """Analize the post-hooks' result
1182
1183     This method analyses the hook result, handles it, and sends some
1184     nicely-formatted feedback back to the user.
1185
1186     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1187         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1188     @param hooks_results: the results of the multi-node hooks rpc call
1189     @param feedback_fn: function used send feedback back to the caller
1190     @param lu_result: previous Exec result
1191     @return: the new Exec result, based on the previous result
1192         and hook results
1193
1194     """
1195     # We only really run POST phase hooks, and are only interested in
1196     # their results
1197     if phase == constants.HOOKS_PHASE_POST:
1198       # Used to change hooks' output to proper indentation
1199       indent_re = re.compile('^', re.M)
1200       feedback_fn("* Hooks Results")
1201       if not hooks_results:
1202         feedback_fn("  - ERROR: general communication failure")
1203         lu_result = 1
1204       else:
1205         for node_name in hooks_results:
1206           show_node_header = True
1207           res = hooks_results[node_name]
1208           if res.failed or res.data is False or not isinstance(res.data, list):
1209             if res.offline:
1210               # no need to warn or set fail return value
1211               continue
1212             feedback_fn("    Communication failure in hooks execution")
1213             lu_result = 1
1214             continue
1215           for script, hkr, output in res.data:
1216             if hkr == constants.HKR_FAIL:
1217               # The node header is only shown once, if there are
1218               # failing hooks on that node
1219               if show_node_header:
1220                 feedback_fn("  Node %s:" % node_name)
1221                 show_node_header = False
1222               feedback_fn("    ERROR: Script %s failed, output:" % script)
1223               output = indent_re.sub('      ', output)
1224               feedback_fn("%s" % output)
1225               lu_result = 1
1226
1227       return lu_result
1228
1229
1230 class LUVerifyDisks(NoHooksLU):
1231   """Verifies the cluster disks status.
1232
1233   """
1234   _OP_REQP = []
1235   REQ_BGL = False
1236
1237   def ExpandNames(self):
1238     self.needed_locks = {
1239       locking.LEVEL_NODE: locking.ALL_SET,
1240       locking.LEVEL_INSTANCE: locking.ALL_SET,
1241     }
1242     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1243
1244   def CheckPrereq(self):
1245     """Check prerequisites.
1246
1247     This has no prerequisites.
1248
1249     """
1250     pass
1251
1252   def Exec(self, feedback_fn):
1253     """Verify integrity of cluster disks.
1254
1255     """
1256     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1257
1258     vg_name = self.cfg.GetVGName()
1259     nodes = utils.NiceSort(self.cfg.GetNodeList())
1260     instances = [self.cfg.GetInstanceInfo(name)
1261                  for name in self.cfg.GetInstanceList()]
1262
1263     nv_dict = {}
1264     for inst in instances:
1265       inst_lvs = {}
1266       if (not inst.admin_up or
1267           inst.disk_template not in constants.DTS_NET_MIRROR):
1268         continue
1269       inst.MapLVsByNode(inst_lvs)
1270       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1271       for node, vol_list in inst_lvs.iteritems():
1272         for vol in vol_list:
1273           nv_dict[(node, vol)] = inst
1274
1275     if not nv_dict:
1276       return result
1277
1278     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1279
1280     to_act = set()
1281     for node in nodes:
1282       # node_volume
1283       lvs = node_lvs[node]
1284       if lvs.failed:
1285         if not lvs.offline:
1286           self.LogWarning("Connection to node %s failed: %s" %
1287                           (node, lvs.data))
1288         continue
1289       lvs = lvs.data
1290       if isinstance(lvs, basestring):
1291         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1292         res_nlvm[node] = lvs
1293         continue
1294       elif not isinstance(lvs, dict):
1295         logging.warning("Connection to node %s failed or invalid data"
1296                         " returned", node)
1297         res_nodes.append(node)
1298         continue
1299
1300       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1301         inst = nv_dict.pop((node, lv_name), None)
1302         if (not lv_online and inst is not None
1303             and inst.name not in res_instances):
1304           res_instances.append(inst.name)
1305
1306     # any leftover items in nv_dict are missing LVs, let's arrange the
1307     # data better
1308     for key, inst in nv_dict.iteritems():
1309       if inst.name not in res_missing:
1310         res_missing[inst.name] = []
1311       res_missing[inst.name].append(key)
1312
1313     return result
1314
1315
1316 class LURenameCluster(LogicalUnit):
1317   """Rename the cluster.
1318
1319   """
1320   HPATH = "cluster-rename"
1321   HTYPE = constants.HTYPE_CLUSTER
1322   _OP_REQP = ["name"]
1323
1324   def BuildHooksEnv(self):
1325     """Build hooks env.
1326
1327     """
1328     env = {
1329       "OP_TARGET": self.cfg.GetClusterName(),
1330       "NEW_NAME": self.op.name,
1331       }
1332     mn = self.cfg.GetMasterNode()
1333     return env, [mn], [mn]
1334
1335   def CheckPrereq(self):
1336     """Verify that the passed name is a valid one.
1337
1338     """
1339     hostname = utils.HostInfo(self.op.name)
1340
1341     new_name = hostname.name
1342     self.ip = new_ip = hostname.ip
1343     old_name = self.cfg.GetClusterName()
1344     old_ip = self.cfg.GetMasterIP()
1345     if new_name == old_name and new_ip == old_ip:
1346       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1347                                  " cluster has changed")
1348     if new_ip != old_ip:
1349       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1350         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1351                                    " reachable on the network. Aborting." %
1352                                    new_ip)
1353
1354     self.op.name = new_name
1355
1356   def Exec(self, feedback_fn):
1357     """Rename the cluster.
1358
1359     """
1360     clustername = self.op.name
1361     ip = self.ip
1362
1363     # shutdown the master IP
1364     master = self.cfg.GetMasterNode()
1365     result = self.rpc.call_node_stop_master(master, False)
1366     if result.failed or not result.data:
1367       raise errors.OpExecError("Could not disable the master role")
1368
1369     try:
1370       cluster = self.cfg.GetClusterInfo()
1371       cluster.cluster_name = clustername
1372       cluster.master_ip = ip
1373       self.cfg.Update(cluster)
1374
1375       # update the known hosts file
1376       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1377       node_list = self.cfg.GetNodeList()
1378       try:
1379         node_list.remove(master)
1380       except ValueError:
1381         pass
1382       result = self.rpc.call_upload_file(node_list,
1383                                          constants.SSH_KNOWN_HOSTS_FILE)
1384       for to_node, to_result in result.iteritems():
1385         if to_result.failed or not to_result.data:
1386           logging.error("Copy of file %s to node %s failed",
1387                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1388
1389     finally:
1390       result = self.rpc.call_node_start_master(master, False)
1391       if result.failed or not result.data:
1392         self.LogWarning("Could not re-enable the master role on"
1393                         " the master, please restart manually.")
1394
1395
1396 def _RecursiveCheckIfLVMBased(disk):
1397   """Check if the given disk or its children are lvm-based.
1398
1399   @type disk: L{objects.Disk}
1400   @param disk: the disk to check
1401   @rtype: booleean
1402   @return: boolean indicating whether a LD_LV dev_type was found or not
1403
1404   """
1405   if disk.children:
1406     for chdisk in disk.children:
1407       if _RecursiveCheckIfLVMBased(chdisk):
1408         return True
1409   return disk.dev_type == constants.LD_LV
1410
1411
1412 class LUSetClusterParams(LogicalUnit):
1413   """Change the parameters of the cluster.
1414
1415   """
1416   HPATH = "cluster-modify"
1417   HTYPE = constants.HTYPE_CLUSTER
1418   _OP_REQP = []
1419   REQ_BGL = False
1420
1421   def CheckParameters(self):
1422     """Check parameters
1423
1424     """
1425     if not hasattr(self.op, "candidate_pool_size"):
1426       self.op.candidate_pool_size = None
1427     if self.op.candidate_pool_size is not None:
1428       try:
1429         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1430       except ValueError, err:
1431         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1432                                    str(err))
1433       if self.op.candidate_pool_size < 1:
1434         raise errors.OpPrereqError("At least one master candidate needed")
1435
1436   def ExpandNames(self):
1437     # FIXME: in the future maybe other cluster params won't require checking on
1438     # all nodes to be modified.
1439     self.needed_locks = {
1440       locking.LEVEL_NODE: locking.ALL_SET,
1441     }
1442     self.share_locks[locking.LEVEL_NODE] = 1
1443
1444   def BuildHooksEnv(self):
1445     """Build hooks env.
1446
1447     """
1448     env = {
1449       "OP_TARGET": self.cfg.GetClusterName(),
1450       "NEW_VG_NAME": self.op.vg_name,
1451       }
1452     mn = self.cfg.GetMasterNode()
1453     return env, [mn], [mn]
1454
1455   def CheckPrereq(self):
1456     """Check prerequisites.
1457
1458     This checks whether the given params don't conflict and
1459     if the given volume group is valid.
1460
1461     """
1462     if self.op.vg_name is not None and not self.op.vg_name:
1463       instances = self.cfg.GetAllInstancesInfo().values()
1464       for inst in instances:
1465         for disk in inst.disks:
1466           if _RecursiveCheckIfLVMBased(disk):
1467             raise errors.OpPrereqError("Cannot disable lvm storage while"
1468                                        " lvm-based instances exist")
1469
1470     node_list = self.acquired_locks[locking.LEVEL_NODE]
1471
1472     # if vg_name not None, checks given volume group on all nodes
1473     if self.op.vg_name:
1474       vglist = self.rpc.call_vg_list(node_list)
1475       for node in node_list:
1476         if vglist[node].failed:
1477           # ignoring down node
1478           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1479           continue
1480         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1481                                               self.op.vg_name,
1482                                               constants.MIN_VG_SIZE)
1483         if vgstatus:
1484           raise errors.OpPrereqError("Error on node '%s': %s" %
1485                                      (node, vgstatus))
1486
1487     self.cluster = cluster = self.cfg.GetClusterInfo()
1488     # validate beparams changes
1489     if self.op.beparams:
1490       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1491       self.new_beparams = cluster.FillDict(
1492         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1493
1494     # hypervisor list/parameters
1495     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1496     if self.op.hvparams:
1497       if not isinstance(self.op.hvparams, dict):
1498         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1499       for hv_name, hv_dict in self.op.hvparams.items():
1500         if hv_name not in self.new_hvparams:
1501           self.new_hvparams[hv_name] = hv_dict
1502         else:
1503           self.new_hvparams[hv_name].update(hv_dict)
1504
1505     if self.op.enabled_hypervisors is not None:
1506       self.hv_list = self.op.enabled_hypervisors
1507     else:
1508       self.hv_list = cluster.enabled_hypervisors
1509
1510     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1511       # either the enabled list has changed, or the parameters have, validate
1512       for hv_name, hv_params in self.new_hvparams.items():
1513         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1514             (self.op.enabled_hypervisors and
1515              hv_name in self.op.enabled_hypervisors)):
1516           # either this is a new hypervisor, or its parameters have changed
1517           hv_class = hypervisor.GetHypervisor(hv_name)
1518           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1519           hv_class.CheckParameterSyntax(hv_params)
1520           _CheckHVParams(self, node_list, hv_name, hv_params)
1521
1522   def Exec(self, feedback_fn):
1523     """Change the parameters of the cluster.
1524
1525     """
1526     if self.op.vg_name is not None:
1527       if self.op.vg_name != self.cfg.GetVGName():
1528         self.cfg.SetVGName(self.op.vg_name)
1529       else:
1530         feedback_fn("Cluster LVM configuration already in desired"
1531                     " state, not changing")
1532     if self.op.hvparams:
1533       self.cluster.hvparams = self.new_hvparams
1534     if self.op.enabled_hypervisors is not None:
1535       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1536     if self.op.beparams:
1537       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1538     if self.op.candidate_pool_size is not None:
1539       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1540
1541     self.cfg.Update(self.cluster)
1542
1543     # we want to update nodes after the cluster so that if any errors
1544     # happen, we have recorded and saved the cluster info
1545     if self.op.candidate_pool_size is not None:
1546       _AdjustCandidatePool(self)
1547
1548
1549 class LURedistributeConfig(NoHooksLU):
1550   """Force the redistribution of cluster configuration.
1551
1552   This is a very simple LU.
1553
1554   """
1555   _OP_REQP = []
1556   REQ_BGL = False
1557
1558   def ExpandNames(self):
1559     self.needed_locks = {
1560       locking.LEVEL_NODE: locking.ALL_SET,
1561     }
1562     self.share_locks[locking.LEVEL_NODE] = 1
1563
1564   def CheckPrereq(self):
1565     """Check prerequisites.
1566
1567     """
1568
1569   def Exec(self, feedback_fn):
1570     """Redistribute the configuration.
1571
1572     """
1573     self.cfg.Update(self.cfg.GetClusterInfo())
1574
1575
1576 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1577   """Sleep and poll for an instance's disk to sync.
1578
1579   """
1580   if not instance.disks:
1581     return True
1582
1583   if not oneshot:
1584     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1585
1586   node = instance.primary_node
1587
1588   for dev in instance.disks:
1589     lu.cfg.SetDiskID(dev, node)
1590
1591   retries = 0
1592   while True:
1593     max_time = 0
1594     done = True
1595     cumul_degraded = False
1596     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1597     if rstats.failed or not rstats.data:
1598       lu.LogWarning("Can't get any data from node %s", node)
1599       retries += 1
1600       if retries >= 10:
1601         raise errors.RemoteError("Can't contact node %s for mirror data,"
1602                                  " aborting." % node)
1603       time.sleep(6)
1604       continue
1605     rstats = rstats.data
1606     retries = 0
1607     for i, mstat in enumerate(rstats):
1608       if mstat is None:
1609         lu.LogWarning("Can't compute data for node %s/%s",
1610                            node, instance.disks[i].iv_name)
1611         continue
1612       # we ignore the ldisk parameter
1613       perc_done, est_time, is_degraded, _ = mstat
1614       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1615       if perc_done is not None:
1616         done = False
1617         if est_time is not None:
1618           rem_time = "%d estimated seconds remaining" % est_time
1619           max_time = est_time
1620         else:
1621           rem_time = "no time estimate"
1622         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1623                         (instance.disks[i].iv_name, perc_done, rem_time))
1624     if done or oneshot:
1625       break
1626
1627     time.sleep(min(60, max_time))
1628
1629   if done:
1630     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1631   return not cumul_degraded
1632
1633
1634 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1635   """Check that mirrors are not degraded.
1636
1637   The ldisk parameter, if True, will change the test from the
1638   is_degraded attribute (which represents overall non-ok status for
1639   the device(s)) to the ldisk (representing the local storage status).
1640
1641   """
1642   lu.cfg.SetDiskID(dev, node)
1643   if ldisk:
1644     idx = 6
1645   else:
1646     idx = 5
1647
1648   result = True
1649   if on_primary or dev.AssembleOnSecondary():
1650     rstats = lu.rpc.call_blockdev_find(node, dev)
1651     msg = rstats.RemoteFailMsg()
1652     if msg:
1653       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1654       result = False
1655     elif not rstats.payload:
1656       lu.LogWarning("Can't find disk on node %s", node)
1657       result = False
1658     else:
1659       result = result and (not rstats.payload[idx])
1660   if dev.children:
1661     for child in dev.children:
1662       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1663
1664   return result
1665
1666
1667 class LUDiagnoseOS(NoHooksLU):
1668   """Logical unit for OS diagnose/query.
1669
1670   """
1671   _OP_REQP = ["output_fields", "names"]
1672   REQ_BGL = False
1673   _FIELDS_STATIC = utils.FieldSet()
1674   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1675
1676   def ExpandNames(self):
1677     if self.op.names:
1678       raise errors.OpPrereqError("Selective OS query not supported")
1679
1680     _CheckOutputFields(static=self._FIELDS_STATIC,
1681                        dynamic=self._FIELDS_DYNAMIC,
1682                        selected=self.op.output_fields)
1683
1684     # Lock all nodes, in shared mode
1685     # Temporary removal of locks, should be reverted later
1686     # TODO: reintroduce locks when they are lighter-weight
1687     self.needed_locks = {}
1688     #self.share_locks[locking.LEVEL_NODE] = 1
1689     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1690
1691   def CheckPrereq(self):
1692     """Check prerequisites.
1693
1694     """
1695
1696   @staticmethod
1697   def _DiagnoseByOS(node_list, rlist):
1698     """Remaps a per-node return list into an a per-os per-node dictionary
1699
1700     @param node_list: a list with the names of all nodes
1701     @param rlist: a map with node names as keys and OS objects as values
1702
1703     @rtype: dict
1704     @return: a dictionary with osnames as keys and as value another map, with
1705         nodes as keys and list of OS objects as values, eg::
1706
1707           {"debian-etch": {"node1": [<object>,...],
1708                            "node2": [<object>,]}
1709           }
1710
1711     """
1712     all_os = {}
1713     # we build here the list of nodes that didn't fail the RPC (at RPC
1714     # level), so that nodes with a non-responding node daemon don't
1715     # make all OSes invalid
1716     good_nodes = [node_name for node_name in rlist
1717                   if not rlist[node_name].failed]
1718     for node_name, nr in rlist.iteritems():
1719       if nr.failed or not nr.data:
1720         continue
1721       for os_obj in nr.data:
1722         if os_obj.name not in all_os:
1723           # build a list of nodes for this os containing empty lists
1724           # for each node in node_list
1725           all_os[os_obj.name] = {}
1726           for nname in good_nodes:
1727             all_os[os_obj.name][nname] = []
1728         all_os[os_obj.name][node_name].append(os_obj)
1729     return all_os
1730
1731   def Exec(self, feedback_fn):
1732     """Compute the list of OSes.
1733
1734     """
1735     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1736     node_data = self.rpc.call_os_diagnose(valid_nodes)
1737     if node_data == False:
1738       raise errors.OpExecError("Can't gather the list of OSes")
1739     pol = self._DiagnoseByOS(valid_nodes, node_data)
1740     output = []
1741     for os_name, os_data in pol.iteritems():
1742       row = []
1743       for field in self.op.output_fields:
1744         if field == "name":
1745           val = os_name
1746         elif field == "valid":
1747           val = utils.all([osl and osl[0] for osl in os_data.values()])
1748         elif field == "node_status":
1749           val = {}
1750           for node_name, nos_list in os_data.iteritems():
1751             val[node_name] = [(v.status, v.path) for v in nos_list]
1752         else:
1753           raise errors.ParameterError(field)
1754         row.append(val)
1755       output.append(row)
1756
1757     return output
1758
1759
1760 class LURemoveNode(LogicalUnit):
1761   """Logical unit for removing a node.
1762
1763   """
1764   HPATH = "node-remove"
1765   HTYPE = constants.HTYPE_NODE
1766   _OP_REQP = ["node_name"]
1767
1768   def BuildHooksEnv(self):
1769     """Build hooks env.
1770
1771     This doesn't run on the target node in the pre phase as a failed
1772     node would then be impossible to remove.
1773
1774     """
1775     env = {
1776       "OP_TARGET": self.op.node_name,
1777       "NODE_NAME": self.op.node_name,
1778       }
1779     all_nodes = self.cfg.GetNodeList()
1780     all_nodes.remove(self.op.node_name)
1781     return env, all_nodes, all_nodes
1782
1783   def CheckPrereq(self):
1784     """Check prerequisites.
1785
1786     This checks:
1787      - the node exists in the configuration
1788      - it does not have primary or secondary instances
1789      - it's not the master
1790
1791     Any errors are signalled by raising errors.OpPrereqError.
1792
1793     """
1794     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1795     if node is None:
1796       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1797
1798     instance_list = self.cfg.GetInstanceList()
1799
1800     masternode = self.cfg.GetMasterNode()
1801     if node.name == masternode:
1802       raise errors.OpPrereqError("Node is the master node,"
1803                                  " you need to failover first.")
1804
1805     for instance_name in instance_list:
1806       instance = self.cfg.GetInstanceInfo(instance_name)
1807       if node.name in instance.all_nodes:
1808         raise errors.OpPrereqError("Instance %s is still running on the node,"
1809                                    " please remove first." % instance_name)
1810     self.op.node_name = node.name
1811     self.node = node
1812
1813   def Exec(self, feedback_fn):
1814     """Removes the node from the cluster.
1815
1816     """
1817     node = self.node
1818     logging.info("Stopping the node daemon and removing configs from node %s",
1819                  node.name)
1820
1821     self.context.RemoveNode(node.name)
1822
1823     self.rpc.call_node_leave_cluster(node.name)
1824
1825     # Promote nodes to master candidate as needed
1826     _AdjustCandidatePool(self)
1827
1828
1829 class LUQueryNodes(NoHooksLU):
1830   """Logical unit for querying nodes.
1831
1832   """
1833   _OP_REQP = ["output_fields", "names", "use_locking"]
1834   REQ_BGL = False
1835   _FIELDS_DYNAMIC = utils.FieldSet(
1836     "dtotal", "dfree",
1837     "mtotal", "mnode", "mfree",
1838     "bootid",
1839     "ctotal", "cnodes", "csockets",
1840     )
1841
1842   _FIELDS_STATIC = utils.FieldSet(
1843     "name", "pinst_cnt", "sinst_cnt",
1844     "pinst_list", "sinst_list",
1845     "pip", "sip", "tags",
1846     "serial_no",
1847     "master_candidate",
1848     "master",
1849     "offline",
1850     "drained",
1851     )
1852
1853   def ExpandNames(self):
1854     _CheckOutputFields(static=self._FIELDS_STATIC,
1855                        dynamic=self._FIELDS_DYNAMIC,
1856                        selected=self.op.output_fields)
1857
1858     self.needed_locks = {}
1859     self.share_locks[locking.LEVEL_NODE] = 1
1860
1861     if self.op.names:
1862       self.wanted = _GetWantedNodes(self, self.op.names)
1863     else:
1864       self.wanted = locking.ALL_SET
1865
1866     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1867     self.do_locking = self.do_node_query and self.op.use_locking
1868     if self.do_locking:
1869       # if we don't request only static fields, we need to lock the nodes
1870       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1871
1872
1873   def CheckPrereq(self):
1874     """Check prerequisites.
1875
1876     """
1877     # The validation of the node list is done in the _GetWantedNodes,
1878     # if non empty, and if empty, there's no validation to do
1879     pass
1880
1881   def Exec(self, feedback_fn):
1882     """Computes the list of nodes and their attributes.
1883
1884     """
1885     all_info = self.cfg.GetAllNodesInfo()
1886     if self.do_locking:
1887       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1888     elif self.wanted != locking.ALL_SET:
1889       nodenames = self.wanted
1890       missing = set(nodenames).difference(all_info.keys())
1891       if missing:
1892         raise errors.OpExecError(
1893           "Some nodes were removed before retrieving their data: %s" % missing)
1894     else:
1895       nodenames = all_info.keys()
1896
1897     nodenames = utils.NiceSort(nodenames)
1898     nodelist = [all_info[name] for name in nodenames]
1899
1900     # begin data gathering
1901
1902     if self.do_node_query:
1903       live_data = {}
1904       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1905                                           self.cfg.GetHypervisorType())
1906       for name in nodenames:
1907         nodeinfo = node_data[name]
1908         if not nodeinfo.failed and nodeinfo.data:
1909           nodeinfo = nodeinfo.data
1910           fn = utils.TryConvert
1911           live_data[name] = {
1912             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1913             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1914             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1915             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1916             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1917             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1918             "bootid": nodeinfo.get('bootid', None),
1919             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1920             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1921             }
1922         else:
1923           live_data[name] = {}
1924     else:
1925       live_data = dict.fromkeys(nodenames, {})
1926
1927     node_to_primary = dict([(name, set()) for name in nodenames])
1928     node_to_secondary = dict([(name, set()) for name in nodenames])
1929
1930     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1931                              "sinst_cnt", "sinst_list"))
1932     if inst_fields & frozenset(self.op.output_fields):
1933       instancelist = self.cfg.GetInstanceList()
1934
1935       for instance_name in instancelist:
1936         inst = self.cfg.GetInstanceInfo(instance_name)
1937         if inst.primary_node in node_to_primary:
1938           node_to_primary[inst.primary_node].add(inst.name)
1939         for secnode in inst.secondary_nodes:
1940           if secnode in node_to_secondary:
1941             node_to_secondary[secnode].add(inst.name)
1942
1943     master_node = self.cfg.GetMasterNode()
1944
1945     # end data gathering
1946
1947     output = []
1948     for node in nodelist:
1949       node_output = []
1950       for field in self.op.output_fields:
1951         if field == "name":
1952           val = node.name
1953         elif field == "pinst_list":
1954           val = list(node_to_primary[node.name])
1955         elif field == "sinst_list":
1956           val = list(node_to_secondary[node.name])
1957         elif field == "pinst_cnt":
1958           val = len(node_to_primary[node.name])
1959         elif field == "sinst_cnt":
1960           val = len(node_to_secondary[node.name])
1961         elif field == "pip":
1962           val = node.primary_ip
1963         elif field == "sip":
1964           val = node.secondary_ip
1965         elif field == "tags":
1966           val = list(node.GetTags())
1967         elif field == "serial_no":
1968           val = node.serial_no
1969         elif field == "master_candidate":
1970           val = node.master_candidate
1971         elif field == "master":
1972           val = node.name == master_node
1973         elif field == "offline":
1974           val = node.offline
1975         elif field == "drained":
1976           val = node.drained
1977         elif self._FIELDS_DYNAMIC.Matches(field):
1978           val = live_data[node.name].get(field, None)
1979         else:
1980           raise errors.ParameterError(field)
1981         node_output.append(val)
1982       output.append(node_output)
1983
1984     return output
1985
1986
1987 class LUQueryNodeVolumes(NoHooksLU):
1988   """Logical unit for getting volumes on node(s).
1989
1990   """
1991   _OP_REQP = ["nodes", "output_fields"]
1992   REQ_BGL = False
1993   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1994   _FIELDS_STATIC = utils.FieldSet("node")
1995
1996   def ExpandNames(self):
1997     _CheckOutputFields(static=self._FIELDS_STATIC,
1998                        dynamic=self._FIELDS_DYNAMIC,
1999                        selected=self.op.output_fields)
2000
2001     self.needed_locks = {}
2002     self.share_locks[locking.LEVEL_NODE] = 1
2003     if not self.op.nodes:
2004       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2005     else:
2006       self.needed_locks[locking.LEVEL_NODE] = \
2007         _GetWantedNodes(self, self.op.nodes)
2008
2009   def CheckPrereq(self):
2010     """Check prerequisites.
2011
2012     This checks that the fields required are valid output fields.
2013
2014     """
2015     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2016
2017   def Exec(self, feedback_fn):
2018     """Computes the list of nodes and their attributes.
2019
2020     """
2021     nodenames = self.nodes
2022     volumes = self.rpc.call_node_volumes(nodenames)
2023
2024     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2025              in self.cfg.GetInstanceList()]
2026
2027     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2028
2029     output = []
2030     for node in nodenames:
2031       if node not in volumes or volumes[node].failed or not volumes[node].data:
2032         continue
2033
2034       node_vols = volumes[node].data[:]
2035       node_vols.sort(key=lambda vol: vol['dev'])
2036
2037       for vol in node_vols:
2038         node_output = []
2039         for field in self.op.output_fields:
2040           if field == "node":
2041             val = node
2042           elif field == "phys":
2043             val = vol['dev']
2044           elif field == "vg":
2045             val = vol['vg']
2046           elif field == "name":
2047             val = vol['name']
2048           elif field == "size":
2049             val = int(float(vol['size']))
2050           elif field == "instance":
2051             for inst in ilist:
2052               if node not in lv_by_node[inst]:
2053                 continue
2054               if vol['name'] in lv_by_node[inst][node]:
2055                 val = inst.name
2056                 break
2057             else:
2058               val = '-'
2059           else:
2060             raise errors.ParameterError(field)
2061           node_output.append(str(val))
2062
2063         output.append(node_output)
2064
2065     return output
2066
2067
2068 class LUAddNode(LogicalUnit):
2069   """Logical unit for adding node to the cluster.
2070
2071   """
2072   HPATH = "node-add"
2073   HTYPE = constants.HTYPE_NODE
2074   _OP_REQP = ["node_name"]
2075
2076   def BuildHooksEnv(self):
2077     """Build hooks env.
2078
2079     This will run on all nodes before, and on all nodes + the new node after.
2080
2081     """
2082     env = {
2083       "OP_TARGET": self.op.node_name,
2084       "NODE_NAME": self.op.node_name,
2085       "NODE_PIP": self.op.primary_ip,
2086       "NODE_SIP": self.op.secondary_ip,
2087       }
2088     nodes_0 = self.cfg.GetNodeList()
2089     nodes_1 = nodes_0 + [self.op.node_name, ]
2090     return env, nodes_0, nodes_1
2091
2092   def CheckPrereq(self):
2093     """Check prerequisites.
2094
2095     This checks:
2096      - the new node is not already in the config
2097      - it is resolvable
2098      - its parameters (single/dual homed) matches the cluster
2099
2100     Any errors are signalled by raising errors.OpPrereqError.
2101
2102     """
2103     node_name = self.op.node_name
2104     cfg = self.cfg
2105
2106     dns_data = utils.HostInfo(node_name)
2107
2108     node = dns_data.name
2109     primary_ip = self.op.primary_ip = dns_data.ip
2110     secondary_ip = getattr(self.op, "secondary_ip", None)
2111     if secondary_ip is None:
2112       secondary_ip = primary_ip
2113     if not utils.IsValidIP(secondary_ip):
2114       raise errors.OpPrereqError("Invalid secondary IP given")
2115     self.op.secondary_ip = secondary_ip
2116
2117     node_list = cfg.GetNodeList()
2118     if not self.op.readd and node in node_list:
2119       raise errors.OpPrereqError("Node %s is already in the configuration" %
2120                                  node)
2121     elif self.op.readd and node not in node_list:
2122       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2123
2124     for existing_node_name in node_list:
2125       existing_node = cfg.GetNodeInfo(existing_node_name)
2126
2127       if self.op.readd and node == existing_node_name:
2128         if (existing_node.primary_ip != primary_ip or
2129             existing_node.secondary_ip != secondary_ip):
2130           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2131                                      " address configuration as before")
2132         continue
2133
2134       if (existing_node.primary_ip == primary_ip or
2135           existing_node.secondary_ip == primary_ip or
2136           existing_node.primary_ip == secondary_ip or
2137           existing_node.secondary_ip == secondary_ip):
2138         raise errors.OpPrereqError("New node ip address(es) conflict with"
2139                                    " existing node %s" % existing_node.name)
2140
2141     # check that the type of the node (single versus dual homed) is the
2142     # same as for the master
2143     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2144     master_singlehomed = myself.secondary_ip == myself.primary_ip
2145     newbie_singlehomed = secondary_ip == primary_ip
2146     if master_singlehomed != newbie_singlehomed:
2147       if master_singlehomed:
2148         raise errors.OpPrereqError("The master has no private ip but the"
2149                                    " new node has one")
2150       else:
2151         raise errors.OpPrereqError("The master has a private ip but the"
2152                                    " new node doesn't have one")
2153
2154     # checks reachablity
2155     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2156       raise errors.OpPrereqError("Node not reachable by ping")
2157
2158     if not newbie_singlehomed:
2159       # check reachability from my secondary ip to newbie's secondary ip
2160       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2161                            source=myself.secondary_ip):
2162         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2163                                    " based ping to noded port")
2164
2165     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2166     mc_now, _ = self.cfg.GetMasterCandidateStats()
2167     master_candidate = mc_now < cp_size
2168
2169     self.new_node = objects.Node(name=node,
2170                                  primary_ip=primary_ip,
2171                                  secondary_ip=secondary_ip,
2172                                  master_candidate=master_candidate,
2173                                  offline=False, drained=False)
2174
2175   def Exec(self, feedback_fn):
2176     """Adds the new node to the cluster.
2177
2178     """
2179     new_node = self.new_node
2180     node = new_node.name
2181
2182     # check connectivity
2183     result = self.rpc.call_version([node])[node]
2184     result.Raise()
2185     if result.data:
2186       if constants.PROTOCOL_VERSION == result.data:
2187         logging.info("Communication to node %s fine, sw version %s match",
2188                      node, result.data)
2189       else:
2190         raise errors.OpExecError("Version mismatch master version %s,"
2191                                  " node version %s" %
2192                                  (constants.PROTOCOL_VERSION, result.data))
2193     else:
2194       raise errors.OpExecError("Cannot get version from the new node")
2195
2196     # setup ssh on node
2197     logging.info("Copy ssh key to node %s", node)
2198     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2199     keyarray = []
2200     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2201                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2202                 priv_key, pub_key]
2203
2204     for i in keyfiles:
2205       f = open(i, 'r')
2206       try:
2207         keyarray.append(f.read())
2208       finally:
2209         f.close()
2210
2211     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2212                                     keyarray[2],
2213                                     keyarray[3], keyarray[4], keyarray[5])
2214
2215     msg = result.RemoteFailMsg()
2216     if msg:
2217       raise errors.OpExecError("Cannot transfer ssh keys to the"
2218                                " new node: %s" % msg)
2219
2220     # Add node to our /etc/hosts, and add key to known_hosts
2221     utils.AddHostToEtcHosts(new_node.name)
2222
2223     if new_node.secondary_ip != new_node.primary_ip:
2224       result = self.rpc.call_node_has_ip_address(new_node.name,
2225                                                  new_node.secondary_ip)
2226       if result.failed or not result.data:
2227         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2228                                  " you gave (%s). Please fix and re-run this"
2229                                  " command." % new_node.secondary_ip)
2230
2231     node_verify_list = [self.cfg.GetMasterNode()]
2232     node_verify_param = {
2233       'nodelist': [node],
2234       # TODO: do a node-net-test as well?
2235     }
2236
2237     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2238                                        self.cfg.GetClusterName())
2239     for verifier in node_verify_list:
2240       if result[verifier].failed or not result[verifier].data:
2241         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2242                                  " for remote verification" % verifier)
2243       if result[verifier].data['nodelist']:
2244         for failed in result[verifier].data['nodelist']:
2245           feedback_fn("ssh/hostname verification failed %s -> %s" %
2246                       (verifier, result[verifier].data['nodelist'][failed]))
2247         raise errors.OpExecError("ssh/hostname verification failed.")
2248
2249     # Distribute updated /etc/hosts and known_hosts to all nodes,
2250     # including the node just added
2251     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2252     dist_nodes = self.cfg.GetNodeList()
2253     if not self.op.readd:
2254       dist_nodes.append(node)
2255     if myself.name in dist_nodes:
2256       dist_nodes.remove(myself.name)
2257
2258     logging.debug("Copying hosts and known_hosts to all nodes")
2259     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2260       result = self.rpc.call_upload_file(dist_nodes, fname)
2261       for to_node, to_result in result.iteritems():
2262         if to_result.failed or not to_result.data:
2263           logging.error("Copy of file %s to node %s failed", fname, to_node)
2264
2265     to_copy = []
2266     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2267     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2268       to_copy.append(constants.VNC_PASSWORD_FILE)
2269
2270     for fname in to_copy:
2271       result = self.rpc.call_upload_file([node], fname)
2272       if result[node].failed or not result[node]:
2273         logging.error("Could not copy file %s to node %s", fname, node)
2274
2275     if self.op.readd:
2276       self.context.ReaddNode(new_node)
2277     else:
2278       self.context.AddNode(new_node)
2279
2280
2281 class LUSetNodeParams(LogicalUnit):
2282   """Modifies the parameters of a node.
2283
2284   """
2285   HPATH = "node-modify"
2286   HTYPE = constants.HTYPE_NODE
2287   _OP_REQP = ["node_name"]
2288   REQ_BGL = False
2289
2290   def CheckArguments(self):
2291     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2292     if node_name is None:
2293       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2294     self.op.node_name = node_name
2295     _CheckBooleanOpField(self.op, 'master_candidate')
2296     _CheckBooleanOpField(self.op, 'offline')
2297     _CheckBooleanOpField(self.op, 'drained')
2298     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2299     if all_mods.count(None) == 3:
2300       raise errors.OpPrereqError("Please pass at least one modification")
2301     if all_mods.count(True) > 1:
2302       raise errors.OpPrereqError("Can't set the node into more than one"
2303                                  " state at the same time")
2304
2305   def ExpandNames(self):
2306     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2307
2308   def BuildHooksEnv(self):
2309     """Build hooks env.
2310
2311     This runs on the master node.
2312
2313     """
2314     env = {
2315       "OP_TARGET": self.op.node_name,
2316       "MASTER_CANDIDATE": str(self.op.master_candidate),
2317       "OFFLINE": str(self.op.offline),
2318       "DRAINED": str(self.op.drained),
2319       }
2320     nl = [self.cfg.GetMasterNode(),
2321           self.op.node_name]
2322     return env, nl, nl
2323
2324   def CheckPrereq(self):
2325     """Check prerequisites.
2326
2327     This only checks the instance list against the existing names.
2328
2329     """
2330     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2331
2332     if ((self.op.master_candidate == False or self.op.offline == True or
2333          self.op.drained == True) and node.master_candidate):
2334       # we will demote the node from master_candidate
2335       if self.op.node_name == self.cfg.GetMasterNode():
2336         raise errors.OpPrereqError("The master node has to be a"
2337                                    " master candidate, online and not drained")
2338       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2339       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2340       if num_candidates <= cp_size:
2341         msg = ("Not enough master candidates (desired"
2342                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2343         if self.op.force:
2344           self.LogWarning(msg)
2345         else:
2346           raise errors.OpPrereqError(msg)
2347
2348     if (self.op.master_candidate == True and
2349         ((node.offline and not self.op.offline == False) or
2350          (node.drained and not self.op.drained == False))):
2351       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2352                                  " to master_candidate" % node.name)
2353
2354     return
2355
2356   def Exec(self, feedback_fn):
2357     """Modifies a node.
2358
2359     """
2360     node = self.node
2361
2362     result = []
2363     changed_mc = False
2364
2365     if self.op.offline is not None:
2366       node.offline = self.op.offline
2367       result.append(("offline", str(self.op.offline)))
2368       if self.op.offline == True:
2369         if node.master_candidate:
2370           node.master_candidate = False
2371           changed_mc = True
2372           result.append(("master_candidate", "auto-demotion due to offline"))
2373         if node.drained:
2374           node.drained = False
2375           result.append(("drained", "clear drained status due to offline"))
2376
2377     if self.op.master_candidate is not None:
2378       node.master_candidate = self.op.master_candidate
2379       changed_mc = True
2380       result.append(("master_candidate", str(self.op.master_candidate)))
2381       if self.op.master_candidate == False:
2382         rrc = self.rpc.call_node_demote_from_mc(node.name)
2383         msg = rrc.RemoteFailMsg()
2384         if msg:
2385           self.LogWarning("Node failed to demote itself: %s" % msg)
2386
2387     if self.op.drained is not None:
2388       node.drained = self.op.drained
2389       result.append(("drained", str(self.op.drained)))
2390       if self.op.drained == True:
2391         if node.master_candidate:
2392           node.master_candidate = False
2393           changed_mc = True
2394           result.append(("master_candidate", "auto-demotion due to drain"))
2395         if node.offline:
2396           node.offline = False
2397           result.append(("offline", "clear offline status due to drain"))
2398
2399     # this will trigger configuration file update, if needed
2400     self.cfg.Update(node)
2401     # this will trigger job queue propagation or cleanup
2402     if changed_mc:
2403       self.context.ReaddNode(node)
2404
2405     return result
2406
2407
2408 class LUQueryClusterInfo(NoHooksLU):
2409   """Query cluster configuration.
2410
2411   """
2412   _OP_REQP = []
2413   REQ_BGL = False
2414
2415   def ExpandNames(self):
2416     self.needed_locks = {}
2417
2418   def CheckPrereq(self):
2419     """No prerequsites needed for this LU.
2420
2421     """
2422     pass
2423
2424   def Exec(self, feedback_fn):
2425     """Return cluster config.
2426
2427     """
2428     cluster = self.cfg.GetClusterInfo()
2429     result = {
2430       "software_version": constants.RELEASE_VERSION,
2431       "protocol_version": constants.PROTOCOL_VERSION,
2432       "config_version": constants.CONFIG_VERSION,
2433       "os_api_version": constants.OS_API_VERSION,
2434       "export_version": constants.EXPORT_VERSION,
2435       "architecture": (platform.architecture()[0], platform.machine()),
2436       "name": cluster.cluster_name,
2437       "master": cluster.master_node,
2438       "default_hypervisor": cluster.default_hypervisor,
2439       "enabled_hypervisors": cluster.enabled_hypervisors,
2440       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2441                         for hypervisor in cluster.enabled_hypervisors]),
2442       "beparams": cluster.beparams,
2443       "candidate_pool_size": cluster.candidate_pool_size,
2444       }
2445
2446     return result
2447
2448
2449 class LUQueryConfigValues(NoHooksLU):
2450   """Return configuration values.
2451
2452   """
2453   _OP_REQP = []
2454   REQ_BGL = False
2455   _FIELDS_DYNAMIC = utils.FieldSet()
2456   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2457
2458   def ExpandNames(self):
2459     self.needed_locks = {}
2460
2461     _CheckOutputFields(static=self._FIELDS_STATIC,
2462                        dynamic=self._FIELDS_DYNAMIC,
2463                        selected=self.op.output_fields)
2464
2465   def CheckPrereq(self):
2466     """No prerequisites.
2467
2468     """
2469     pass
2470
2471   def Exec(self, feedback_fn):
2472     """Dump a representation of the cluster config to the standard output.
2473
2474     """
2475     values = []
2476     for field in self.op.output_fields:
2477       if field == "cluster_name":
2478         entry = self.cfg.GetClusterName()
2479       elif field == "master_node":
2480         entry = self.cfg.GetMasterNode()
2481       elif field == "drain_flag":
2482         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2483       else:
2484         raise errors.ParameterError(field)
2485       values.append(entry)
2486     return values
2487
2488
2489 class LUActivateInstanceDisks(NoHooksLU):
2490   """Bring up an instance's disks.
2491
2492   """
2493   _OP_REQP = ["instance_name"]
2494   REQ_BGL = False
2495
2496   def ExpandNames(self):
2497     self._ExpandAndLockInstance()
2498     self.needed_locks[locking.LEVEL_NODE] = []
2499     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2500
2501   def DeclareLocks(self, level):
2502     if level == locking.LEVEL_NODE:
2503       self._LockInstancesNodes()
2504
2505   def CheckPrereq(self):
2506     """Check prerequisites.
2507
2508     This checks that the instance is in the cluster.
2509
2510     """
2511     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512     assert self.instance is not None, \
2513       "Cannot retrieve locked instance %s" % self.op.instance_name
2514     _CheckNodeOnline(self, self.instance.primary_node)
2515
2516   def Exec(self, feedback_fn):
2517     """Activate the disks.
2518
2519     """
2520     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2521     if not disks_ok:
2522       raise errors.OpExecError("Cannot activate block devices")
2523
2524     return disks_info
2525
2526
2527 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2528   """Prepare the block devices for an instance.
2529
2530   This sets up the block devices on all nodes.
2531
2532   @type lu: L{LogicalUnit}
2533   @param lu: the logical unit on whose behalf we execute
2534   @type instance: L{objects.Instance}
2535   @param instance: the instance for whose disks we assemble
2536   @type ignore_secondaries: boolean
2537   @param ignore_secondaries: if true, errors on secondary nodes
2538       won't result in an error return from the function
2539   @return: False if the operation failed, otherwise a list of
2540       (host, instance_visible_name, node_visible_name)
2541       with the mapping from node devices to instance devices
2542
2543   """
2544   device_info = []
2545   disks_ok = True
2546   iname = instance.name
2547   # With the two passes mechanism we try to reduce the window of
2548   # opportunity for the race condition of switching DRBD to primary
2549   # before handshaking occured, but we do not eliminate it
2550
2551   # The proper fix would be to wait (with some limits) until the
2552   # connection has been made and drbd transitions from WFConnection
2553   # into any other network-connected state (Connected, SyncTarget,
2554   # SyncSource, etc.)
2555
2556   # 1st pass, assemble on all nodes in secondary mode
2557   for inst_disk in instance.disks:
2558     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2559       lu.cfg.SetDiskID(node_disk, node)
2560       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2561       msg = result.RemoteFailMsg()
2562       if msg:
2563         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2564                            " (is_primary=False, pass=1): %s",
2565                            inst_disk.iv_name, node, msg)
2566         if not ignore_secondaries:
2567           disks_ok = False
2568
2569   # FIXME: race condition on drbd migration to primary
2570
2571   # 2nd pass, do only the primary node
2572   for inst_disk in instance.disks:
2573     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2574       if node != instance.primary_node:
2575         continue
2576       lu.cfg.SetDiskID(node_disk, node)
2577       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2578       msg = result.RemoteFailMsg()
2579       if msg:
2580         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2581                            " (is_primary=True, pass=2): %s",
2582                            inst_disk.iv_name, node, msg)
2583         disks_ok = False
2584     device_info.append((instance.primary_node, inst_disk.iv_name,
2585                         result.payload))
2586
2587   # leave the disks configured for the primary node
2588   # this is a workaround that would be fixed better by
2589   # improving the logical/physical id handling
2590   for disk in instance.disks:
2591     lu.cfg.SetDiskID(disk, instance.primary_node)
2592
2593   return disks_ok, device_info
2594
2595
2596 def _StartInstanceDisks(lu, instance, force):
2597   """Start the disks of an instance.
2598
2599   """
2600   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2601                                            ignore_secondaries=force)
2602   if not disks_ok:
2603     _ShutdownInstanceDisks(lu, instance)
2604     if force is not None and not force:
2605       lu.proc.LogWarning("", hint="If the message above refers to a"
2606                          " secondary node,"
2607                          " you can retry the operation using '--force'.")
2608     raise errors.OpExecError("Disk consistency error")
2609
2610
2611 class LUDeactivateInstanceDisks(NoHooksLU):
2612   """Shutdown an instance's disks.
2613
2614   """
2615   _OP_REQP = ["instance_name"]
2616   REQ_BGL = False
2617
2618   def ExpandNames(self):
2619     self._ExpandAndLockInstance()
2620     self.needed_locks[locking.LEVEL_NODE] = []
2621     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2622
2623   def DeclareLocks(self, level):
2624     if level == locking.LEVEL_NODE:
2625       self._LockInstancesNodes()
2626
2627   def CheckPrereq(self):
2628     """Check prerequisites.
2629
2630     This checks that the instance is in the cluster.
2631
2632     """
2633     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2634     assert self.instance is not None, \
2635       "Cannot retrieve locked instance %s" % self.op.instance_name
2636
2637   def Exec(self, feedback_fn):
2638     """Deactivate the disks
2639
2640     """
2641     instance = self.instance
2642     _SafeShutdownInstanceDisks(self, instance)
2643
2644
2645 def _SafeShutdownInstanceDisks(lu, instance):
2646   """Shutdown block devices of an instance.
2647
2648   This function checks if an instance is running, before calling
2649   _ShutdownInstanceDisks.
2650
2651   """
2652   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2653                                       [instance.hypervisor])
2654   ins_l = ins_l[instance.primary_node]
2655   if ins_l.failed or not isinstance(ins_l.data, list):
2656     raise errors.OpExecError("Can't contact node '%s'" %
2657                              instance.primary_node)
2658
2659   if instance.name in ins_l.data:
2660     raise errors.OpExecError("Instance is running, can't shutdown"
2661                              " block devices.")
2662
2663   _ShutdownInstanceDisks(lu, instance)
2664
2665
2666 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2667   """Shutdown block devices of an instance.
2668
2669   This does the shutdown on all nodes of the instance.
2670
2671   If the ignore_primary is false, errors on the primary node are
2672   ignored.
2673
2674   """
2675   all_result = True
2676   for disk in instance.disks:
2677     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2678       lu.cfg.SetDiskID(top_disk, node)
2679       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2680       msg = result.RemoteFailMsg()
2681       if msg:
2682         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2683                       disk.iv_name, node, msg)
2684         if not ignore_primary or node != instance.primary_node:
2685           all_result = False
2686   return all_result
2687
2688
2689 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2690   """Checks if a node has enough free memory.
2691
2692   This function check if a given node has the needed amount of free
2693   memory. In case the node has less memory or we cannot get the
2694   information from the node, this function raise an OpPrereqError
2695   exception.
2696
2697   @type lu: C{LogicalUnit}
2698   @param lu: a logical unit from which we get configuration data
2699   @type node: C{str}
2700   @param node: the node to check
2701   @type reason: C{str}
2702   @param reason: string to use in the error message
2703   @type requested: C{int}
2704   @param requested: the amount of memory in MiB to check for
2705   @type hypervisor_name: C{str}
2706   @param hypervisor_name: the hypervisor to ask for memory stats
2707   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2708       we cannot check the node
2709
2710   """
2711   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2712   nodeinfo[node].Raise()
2713   free_mem = nodeinfo[node].data.get('memory_free')
2714   if not isinstance(free_mem, int):
2715     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2716                              " was '%s'" % (node, free_mem))
2717   if requested > free_mem:
2718     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2719                              " needed %s MiB, available %s MiB" %
2720                              (node, reason, requested, free_mem))
2721
2722
2723 class LUStartupInstance(LogicalUnit):
2724   """Starts an instance.
2725
2726   """
2727   HPATH = "instance-start"
2728   HTYPE = constants.HTYPE_INSTANCE
2729   _OP_REQP = ["instance_name", "force"]
2730   REQ_BGL = False
2731
2732   def ExpandNames(self):
2733     self._ExpandAndLockInstance()
2734
2735   def BuildHooksEnv(self):
2736     """Build hooks env.
2737
2738     This runs on master, primary and secondary nodes of the instance.
2739
2740     """
2741     env = {
2742       "FORCE": self.op.force,
2743       }
2744     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2745     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2746     return env, nl, nl
2747
2748   def CheckPrereq(self):
2749     """Check prerequisites.
2750
2751     This checks that the instance is in the cluster.
2752
2753     """
2754     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2755     assert self.instance is not None, \
2756       "Cannot retrieve locked instance %s" % self.op.instance_name
2757
2758     _CheckNodeOnline(self, instance.primary_node)
2759
2760     bep = self.cfg.GetClusterInfo().FillBE(instance)
2761     # check bridges existance
2762     _CheckInstanceBridgesExist(self, instance)
2763
2764     _CheckNodeFreeMemory(self, instance.primary_node,
2765                          "starting instance %s" % instance.name,
2766                          bep[constants.BE_MEMORY], instance.hypervisor)
2767
2768   def Exec(self, feedback_fn):
2769     """Start the instance.
2770
2771     """
2772     instance = self.instance
2773     force = self.op.force
2774
2775     self.cfg.MarkInstanceUp(instance.name)
2776
2777     node_current = instance.primary_node
2778
2779     _StartInstanceDisks(self, instance, force)
2780
2781     result = self.rpc.call_instance_start(node_current, instance)
2782     msg = result.RemoteFailMsg()
2783     if msg:
2784       _ShutdownInstanceDisks(self, instance)
2785       raise errors.OpExecError("Could not start instance: %s" % msg)
2786
2787
2788 class LURebootInstance(LogicalUnit):
2789   """Reboot an instance.
2790
2791   """
2792   HPATH = "instance-reboot"
2793   HTYPE = constants.HTYPE_INSTANCE
2794   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2795   REQ_BGL = False
2796
2797   def ExpandNames(self):
2798     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2799                                    constants.INSTANCE_REBOOT_HARD,
2800                                    constants.INSTANCE_REBOOT_FULL]:
2801       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2802                                   (constants.INSTANCE_REBOOT_SOFT,
2803                                    constants.INSTANCE_REBOOT_HARD,
2804                                    constants.INSTANCE_REBOOT_FULL))
2805     self._ExpandAndLockInstance()
2806
2807   def BuildHooksEnv(self):
2808     """Build hooks env.
2809
2810     This runs on master, primary and secondary nodes of the instance.
2811
2812     """
2813     env = {
2814       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2815       "REBOOT_TYPE": self.op.reboot_type,
2816       }
2817     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2818     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2819     return env, nl, nl
2820
2821   def CheckPrereq(self):
2822     """Check prerequisites.
2823
2824     This checks that the instance is in the cluster.
2825
2826     """
2827     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2828     assert self.instance is not None, \
2829       "Cannot retrieve locked instance %s" % self.op.instance_name
2830
2831     _CheckNodeOnline(self, instance.primary_node)
2832
2833     # check bridges existance
2834     _CheckInstanceBridgesExist(self, instance)
2835
2836   def Exec(self, feedback_fn):
2837     """Reboot the instance.
2838
2839     """
2840     instance = self.instance
2841     ignore_secondaries = self.op.ignore_secondaries
2842     reboot_type = self.op.reboot_type
2843
2844     node_current = instance.primary_node
2845
2846     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2847                        constants.INSTANCE_REBOOT_HARD]:
2848       for disk in instance.disks:
2849         self.cfg.SetDiskID(disk, node_current)
2850       result = self.rpc.call_instance_reboot(node_current, instance,
2851                                              reboot_type)
2852       msg = result.RemoteFailMsg()
2853       if msg:
2854         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2855     else:
2856       result = self.rpc.call_instance_shutdown(node_current, instance)
2857       msg = result.RemoteFailMsg()
2858       if msg:
2859         raise errors.OpExecError("Could not shutdown instance for"
2860                                  " full reboot: %s" % msg)
2861       _ShutdownInstanceDisks(self, instance)
2862       _StartInstanceDisks(self, instance, ignore_secondaries)
2863       result = self.rpc.call_instance_start(node_current, instance)
2864       msg = result.RemoteFailMsg()
2865       if msg:
2866         _ShutdownInstanceDisks(self, instance)
2867         raise errors.OpExecError("Could not start instance for"
2868                                  " full reboot: %s" % msg)
2869
2870     self.cfg.MarkInstanceUp(instance.name)
2871
2872
2873 class LUShutdownInstance(LogicalUnit):
2874   """Shutdown an instance.
2875
2876   """
2877   HPATH = "instance-stop"
2878   HTYPE = constants.HTYPE_INSTANCE
2879   _OP_REQP = ["instance_name"]
2880   REQ_BGL = False
2881
2882   def ExpandNames(self):
2883     self._ExpandAndLockInstance()
2884
2885   def BuildHooksEnv(self):
2886     """Build hooks env.
2887
2888     This runs on master, primary and secondary nodes of the instance.
2889
2890     """
2891     env = _BuildInstanceHookEnvByObject(self, self.instance)
2892     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2893     return env, nl, nl
2894
2895   def CheckPrereq(self):
2896     """Check prerequisites.
2897
2898     This checks that the instance is in the cluster.
2899
2900     """
2901     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2902     assert self.instance is not None, \
2903       "Cannot retrieve locked instance %s" % self.op.instance_name
2904     _CheckNodeOnline(self, self.instance.primary_node)
2905
2906   def Exec(self, feedback_fn):
2907     """Shutdown the instance.
2908
2909     """
2910     instance = self.instance
2911     node_current = instance.primary_node
2912     self.cfg.MarkInstanceDown(instance.name)
2913     result = self.rpc.call_instance_shutdown(node_current, instance)
2914     msg = result.RemoteFailMsg()
2915     if msg:
2916       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2917
2918     _ShutdownInstanceDisks(self, instance)
2919
2920
2921 class LUReinstallInstance(LogicalUnit):
2922   """Reinstall an instance.
2923
2924   """
2925   HPATH = "instance-reinstall"
2926   HTYPE = constants.HTYPE_INSTANCE
2927   _OP_REQP = ["instance_name"]
2928   REQ_BGL = False
2929
2930   def ExpandNames(self):
2931     self._ExpandAndLockInstance()
2932
2933   def BuildHooksEnv(self):
2934     """Build hooks env.
2935
2936     This runs on master, primary and secondary nodes of the instance.
2937
2938     """
2939     env = _BuildInstanceHookEnvByObject(self, self.instance)
2940     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2941     return env, nl, nl
2942
2943   def CheckPrereq(self):
2944     """Check prerequisites.
2945
2946     This checks that the instance is in the cluster and is not running.
2947
2948     """
2949     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2950     assert instance is not None, \
2951       "Cannot retrieve locked instance %s" % self.op.instance_name
2952     _CheckNodeOnline(self, instance.primary_node)
2953
2954     if instance.disk_template == constants.DT_DISKLESS:
2955       raise errors.OpPrereqError("Instance '%s' has no disks" %
2956                                  self.op.instance_name)
2957     if instance.admin_up:
2958       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2959                                  self.op.instance_name)
2960     remote_info = self.rpc.call_instance_info(instance.primary_node,
2961                                               instance.name,
2962                                               instance.hypervisor)
2963     if remote_info.failed or remote_info.data:
2964       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2965                                  (self.op.instance_name,
2966                                   instance.primary_node))
2967
2968     self.op.os_type = getattr(self.op, "os_type", None)
2969     if self.op.os_type is not None:
2970       # OS verification
2971       pnode = self.cfg.GetNodeInfo(
2972         self.cfg.ExpandNodeName(instance.primary_node))
2973       if pnode is None:
2974         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2975                                    self.op.pnode)
2976       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2977       result.Raise()
2978       if not isinstance(result.data, objects.OS):
2979         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2980                                    " primary node"  % self.op.os_type)
2981
2982     self.instance = instance
2983
2984   def Exec(self, feedback_fn):
2985     """Reinstall the instance.
2986
2987     """
2988     inst = self.instance
2989
2990     if self.op.os_type is not None:
2991       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2992       inst.os = self.op.os_type
2993       self.cfg.Update(inst)
2994
2995     _StartInstanceDisks(self, inst, None)
2996     try:
2997       feedback_fn("Running the instance OS create scripts...")
2998       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2999       msg = result.RemoteFailMsg()
3000       if msg:
3001         raise errors.OpExecError("Could not install OS for instance %s"
3002                                  " on node %s: %s" %
3003                                  (inst.name, inst.primary_node, msg))
3004     finally:
3005       _ShutdownInstanceDisks(self, inst)
3006
3007
3008 class LURenameInstance(LogicalUnit):
3009   """Rename an instance.
3010
3011   """
3012   HPATH = "instance-rename"
3013   HTYPE = constants.HTYPE_INSTANCE
3014   _OP_REQP = ["instance_name", "new_name"]
3015
3016   def BuildHooksEnv(self):
3017     """Build hooks env.
3018
3019     This runs on master, primary and secondary nodes of the instance.
3020
3021     """
3022     env = _BuildInstanceHookEnvByObject(self, self.instance)
3023     env["INSTANCE_NEW_NAME"] = self.op.new_name
3024     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3025     return env, nl, nl
3026
3027   def CheckPrereq(self):
3028     """Check prerequisites.
3029
3030     This checks that the instance is in the cluster and is not running.
3031
3032     """
3033     instance = self.cfg.GetInstanceInfo(
3034       self.cfg.ExpandInstanceName(self.op.instance_name))
3035     if instance is None:
3036       raise errors.OpPrereqError("Instance '%s' not known" %
3037                                  self.op.instance_name)
3038     _CheckNodeOnline(self, instance.primary_node)
3039
3040     if instance.admin_up:
3041       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3042                                  self.op.instance_name)
3043     remote_info = self.rpc.call_instance_info(instance.primary_node,
3044                                               instance.name,
3045                                               instance.hypervisor)
3046     remote_info.Raise()
3047     if remote_info.data:
3048       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3049                                  (self.op.instance_name,
3050                                   instance.primary_node))
3051     self.instance = instance
3052
3053     # new name verification
3054     name_info = utils.HostInfo(self.op.new_name)
3055
3056     self.op.new_name = new_name = name_info.name
3057     instance_list = self.cfg.GetInstanceList()
3058     if new_name in instance_list:
3059       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3060                                  new_name)
3061
3062     if not getattr(self.op, "ignore_ip", False):
3063       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3064         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3065                                    (name_info.ip, new_name))
3066
3067
3068   def Exec(self, feedback_fn):
3069     """Reinstall the instance.
3070
3071     """
3072     inst = self.instance
3073     old_name = inst.name
3074
3075     if inst.disk_template == constants.DT_FILE:
3076       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3077
3078     self.cfg.RenameInstance(inst.name, self.op.new_name)
3079     # Change the instance lock. This is definitely safe while we hold the BGL
3080     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3081     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3082
3083     # re-read the instance from the configuration after rename
3084     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3085
3086     if inst.disk_template == constants.DT_FILE:
3087       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3088       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3089                                                      old_file_storage_dir,
3090                                                      new_file_storage_dir)
3091       result.Raise()
3092       if not result.data:
3093         raise errors.OpExecError("Could not connect to node '%s' to rename"
3094                                  " directory '%s' to '%s' (but the instance"
3095                                  " has been renamed in Ganeti)" % (
3096                                  inst.primary_node, old_file_storage_dir,
3097                                  new_file_storage_dir))
3098
3099       if not result.data[0]:
3100         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3101                                  " (but the instance has been renamed in"
3102                                  " Ganeti)" % (old_file_storage_dir,
3103                                                new_file_storage_dir))
3104
3105     _StartInstanceDisks(self, inst, None)
3106     try:
3107       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3108                                                  old_name)
3109       msg = result.RemoteFailMsg()
3110       if msg:
3111         msg = ("Could not run OS rename script for instance %s on node %s"
3112                " (but the instance has been renamed in Ganeti): %s" %
3113                (inst.name, inst.primary_node, msg))
3114         self.proc.LogWarning(msg)
3115     finally:
3116       _ShutdownInstanceDisks(self, inst)
3117
3118
3119 class LURemoveInstance(LogicalUnit):
3120   """Remove an instance.
3121
3122   """
3123   HPATH = "instance-remove"
3124   HTYPE = constants.HTYPE_INSTANCE
3125   _OP_REQP = ["instance_name", "ignore_failures"]
3126   REQ_BGL = False
3127
3128   def ExpandNames(self):
3129     self._ExpandAndLockInstance()
3130     self.needed_locks[locking.LEVEL_NODE] = []
3131     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3132
3133   def DeclareLocks(self, level):
3134     if level == locking.LEVEL_NODE:
3135       self._LockInstancesNodes()
3136
3137   def BuildHooksEnv(self):
3138     """Build hooks env.
3139
3140     This runs on master, primary and secondary nodes of the instance.
3141
3142     """
3143     env = _BuildInstanceHookEnvByObject(self, self.instance)
3144     nl = [self.cfg.GetMasterNode()]
3145     return env, nl, nl
3146
3147   def CheckPrereq(self):
3148     """Check prerequisites.
3149
3150     This checks that the instance is in the cluster.
3151
3152     """
3153     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3154     assert self.instance is not None, \
3155       "Cannot retrieve locked instance %s" % self.op.instance_name
3156
3157   def Exec(self, feedback_fn):
3158     """Remove the instance.
3159
3160     """
3161     instance = self.instance
3162     logging.info("Shutting down instance %s on node %s",
3163                  instance.name, instance.primary_node)
3164
3165     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3166     msg = result.RemoteFailMsg()
3167     if msg:
3168       if self.op.ignore_failures:
3169         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3170       else:
3171         raise errors.OpExecError("Could not shutdown instance %s on"
3172                                  " node %s: %s" %
3173                                  (instance.name, instance.primary_node, msg))
3174
3175     logging.info("Removing block devices for instance %s", instance.name)
3176
3177     if not _RemoveDisks(self, instance):
3178       if self.op.ignore_failures:
3179         feedback_fn("Warning: can't remove instance's disks")
3180       else:
3181         raise errors.OpExecError("Can't remove instance's disks")
3182
3183     logging.info("Removing instance %s out of cluster config", instance.name)
3184
3185     self.cfg.RemoveInstance(instance.name)
3186     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3187
3188
3189 class LUQueryInstances(NoHooksLU):
3190   """Logical unit for querying instances.
3191
3192   """
3193   _OP_REQP = ["output_fields", "names", "use_locking"]
3194   REQ_BGL = False
3195   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3196                                     "admin_state",
3197                                     "disk_template", "ip", "mac", "bridge",
3198                                     "sda_size", "sdb_size", "vcpus", "tags",
3199                                     "network_port", "beparams",
3200                                     r"(disk)\.(size)/([0-9]+)",
3201                                     r"(disk)\.(sizes)", "disk_usage",
3202                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3203                                     r"(nic)\.(macs|ips|bridges)",
3204                                     r"(disk|nic)\.(count)",
3205                                     "serial_no", "hypervisor", "hvparams",] +
3206                                   ["hv/%s" % name
3207                                    for name in constants.HVS_PARAMETERS] +
3208                                   ["be/%s" % name
3209                                    for name in constants.BES_PARAMETERS])
3210   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3211
3212
3213   def ExpandNames(self):
3214     _CheckOutputFields(static=self._FIELDS_STATIC,
3215                        dynamic=self._FIELDS_DYNAMIC,
3216                        selected=self.op.output_fields)
3217
3218     self.needed_locks = {}
3219     self.share_locks[locking.LEVEL_INSTANCE] = 1
3220     self.share_locks[locking.LEVEL_NODE] = 1
3221
3222     if self.op.names:
3223       self.wanted = _GetWantedInstances(self, self.op.names)
3224     else:
3225       self.wanted = locking.ALL_SET
3226
3227     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3228     self.do_locking = self.do_node_query and self.op.use_locking
3229     if self.do_locking:
3230       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3231       self.needed_locks[locking.LEVEL_NODE] = []
3232       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3233
3234   def DeclareLocks(self, level):
3235     if level == locking.LEVEL_NODE and self.do_locking:
3236       self._LockInstancesNodes()
3237
3238   def CheckPrereq(self):
3239     """Check prerequisites.
3240
3241     """
3242     pass
3243
3244   def Exec(self, feedback_fn):
3245     """Computes the list of nodes and their attributes.
3246
3247     """
3248     all_info = self.cfg.GetAllInstancesInfo()
3249     if self.wanted == locking.ALL_SET:
3250       # caller didn't specify instance names, so ordering is not important
3251       if self.do_locking:
3252         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3253       else:
3254         instance_names = all_info.keys()
3255       instance_names = utils.NiceSort(instance_names)
3256     else:
3257       # caller did specify names, so we must keep the ordering
3258       if self.do_locking:
3259         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3260       else:
3261         tgt_set = all_info.keys()
3262       missing = set(self.wanted).difference(tgt_set)
3263       if missing:
3264         raise errors.OpExecError("Some instances were removed before"
3265                                  " retrieving their data: %s" % missing)
3266       instance_names = self.wanted
3267
3268     instance_list = [all_info[iname] for iname in instance_names]
3269
3270     # begin data gathering
3271
3272     nodes = frozenset([inst.primary_node for inst in instance_list])
3273     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3274
3275     bad_nodes = []
3276     off_nodes = []
3277     if self.do_node_query:
3278       live_data = {}
3279       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3280       for name in nodes:
3281         result = node_data[name]
3282         if result.offline:
3283           # offline nodes will be in both lists
3284           off_nodes.append(name)
3285         if result.failed:
3286           bad_nodes.append(name)
3287         else:
3288           if result.data:
3289             live_data.update(result.data)
3290             # else no instance is alive
3291     else:
3292       live_data = dict([(name, {}) for name in instance_names])
3293
3294     # end data gathering
3295
3296     HVPREFIX = "hv/"
3297     BEPREFIX = "be/"
3298     output = []
3299     for instance in instance_list:
3300       iout = []
3301       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3302       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3303       for field in self.op.output_fields:
3304         st_match = self._FIELDS_STATIC.Matches(field)
3305         if field == "name":
3306           val = instance.name
3307         elif field == "os":
3308           val = instance.os
3309         elif field == "pnode":
3310           val = instance.primary_node
3311         elif field == "snodes":
3312           val = list(instance.secondary_nodes)
3313         elif field == "admin_state":
3314           val = instance.admin_up
3315         elif field == "oper_state":
3316           if instance.primary_node in bad_nodes:
3317             val = None
3318           else:
3319             val = bool(live_data.get(instance.name))
3320         elif field == "status":
3321           if instance.primary_node in off_nodes:
3322             val = "ERROR_nodeoffline"
3323           elif instance.primary_node in bad_nodes:
3324             val = "ERROR_nodedown"
3325           else:
3326             running = bool(live_data.get(instance.name))
3327             if running:
3328               if instance.admin_up:
3329                 val = "running"
3330               else:
3331                 val = "ERROR_up"
3332             else:
3333               if instance.admin_up:
3334                 val = "ERROR_down"
3335               else:
3336                 val = "ADMIN_down"
3337         elif field == "oper_ram":
3338           if instance.primary_node in bad_nodes:
3339             val = None
3340           elif instance.name in live_data:
3341             val = live_data[instance.name].get("memory", "?")
3342           else:
3343             val = "-"
3344         elif field == "disk_template":
3345           val = instance.disk_template
3346         elif field == "ip":
3347           val = instance.nics[0].ip
3348         elif field == "bridge":
3349           val = instance.nics[0].bridge
3350         elif field == "mac":
3351           val = instance.nics[0].mac
3352         elif field == "sda_size" or field == "sdb_size":
3353           idx = ord(field[2]) - ord('a')
3354           try:
3355             val = instance.FindDisk(idx).size
3356           except errors.OpPrereqError:
3357             val = None
3358         elif field == "disk_usage": # total disk usage per node
3359           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3360           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3361         elif field == "tags":
3362           val = list(instance.GetTags())
3363         elif field == "serial_no":
3364           val = instance.serial_no
3365         elif field == "network_port":
3366           val = instance.network_port
3367         elif field == "hypervisor":
3368           val = instance.hypervisor
3369         elif field == "hvparams":
3370           val = i_hv
3371         elif (field.startswith(HVPREFIX) and
3372               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3373           val = i_hv.get(field[len(HVPREFIX):], None)
3374         elif field == "beparams":
3375           val = i_be
3376         elif (field.startswith(BEPREFIX) and
3377               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3378           val = i_be.get(field[len(BEPREFIX):], None)
3379         elif st_match and st_match.groups():
3380           # matches a variable list
3381           st_groups = st_match.groups()
3382           if st_groups and st_groups[0] == "disk":
3383             if st_groups[1] == "count":
3384               val = len(instance.disks)
3385             elif st_groups[1] == "sizes":
3386               val = [disk.size for disk in instance.disks]
3387             elif st_groups[1] == "size":
3388               try:
3389                 val = instance.FindDisk(st_groups[2]).size
3390               except errors.OpPrereqError:
3391                 val = None
3392             else:
3393               assert False, "Unhandled disk parameter"
3394           elif st_groups[0] == "nic":
3395             if st_groups[1] == "count":
3396               val = len(instance.nics)
3397             elif st_groups[1] == "macs":
3398               val = [nic.mac for nic in instance.nics]
3399             elif st_groups[1] == "ips":
3400               val = [nic.ip for nic in instance.nics]
3401             elif st_groups[1] == "bridges":
3402               val = [nic.bridge for nic in instance.nics]
3403             else:
3404               # index-based item
3405               nic_idx = int(st_groups[2])
3406               if nic_idx >= len(instance.nics):
3407                 val = None
3408               else:
3409                 if st_groups[1] == "mac":
3410                   val = instance.nics[nic_idx].mac
3411                 elif st_groups[1] == "ip":
3412                   val = instance.nics[nic_idx].ip
3413                 elif st_groups[1] == "bridge":
3414                   val = instance.nics[nic_idx].bridge
3415                 else:
3416                   assert False, "Unhandled NIC parameter"
3417           else:
3418             assert False, "Unhandled variable parameter"
3419         else:
3420           raise errors.ParameterError(field)
3421         iout.append(val)
3422       output.append(iout)
3423
3424     return output
3425
3426
3427 class LUFailoverInstance(LogicalUnit):
3428   """Failover an instance.
3429
3430   """
3431   HPATH = "instance-failover"
3432   HTYPE = constants.HTYPE_INSTANCE
3433   _OP_REQP = ["instance_name", "ignore_consistency"]
3434   REQ_BGL = False
3435
3436   def ExpandNames(self):
3437     self._ExpandAndLockInstance()
3438     self.needed_locks[locking.LEVEL_NODE] = []
3439     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3440
3441   def DeclareLocks(self, level):
3442     if level == locking.LEVEL_NODE:
3443       self._LockInstancesNodes()
3444
3445   def BuildHooksEnv(self):
3446     """Build hooks env.
3447
3448     This runs on master, primary and secondary nodes of the instance.
3449
3450     """
3451     env = {
3452       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3453       }
3454     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3455     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3456     return env, nl, nl
3457
3458   def CheckPrereq(self):
3459     """Check prerequisites.
3460
3461     This checks that the instance is in the cluster.
3462
3463     """
3464     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3465     assert self.instance is not None, \
3466       "Cannot retrieve locked instance %s" % self.op.instance_name
3467
3468     bep = self.cfg.GetClusterInfo().FillBE(instance)
3469     if instance.disk_template not in constants.DTS_NET_MIRROR:
3470       raise errors.OpPrereqError("Instance's disk layout is not"
3471                                  " network mirrored, cannot failover.")
3472
3473     secondary_nodes = instance.secondary_nodes
3474     if not secondary_nodes:
3475       raise errors.ProgrammerError("no secondary node but using "
3476                                    "a mirrored disk template")
3477
3478     target_node = secondary_nodes[0]
3479     _CheckNodeOnline(self, target_node)
3480     _CheckNodeNotDrained(self, target_node)
3481     # check memory requirements on the secondary node
3482     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3483                          instance.name, bep[constants.BE_MEMORY],
3484                          instance.hypervisor)
3485
3486     # check bridge existance
3487     brlist = [nic.bridge for nic in instance.nics]
3488     result = self.rpc.call_bridges_exist(target_node, brlist)
3489     result.Raise()
3490     if not result.data:
3491       raise errors.OpPrereqError("One or more target bridges %s does not"
3492                                  " exist on destination node '%s'" %
3493                                  (brlist, target_node))
3494
3495   def Exec(self, feedback_fn):
3496     """Failover an instance.
3497
3498     The failover is done by shutting it down on its present node and
3499     starting it on the secondary.
3500
3501     """
3502     instance = self.instance
3503
3504     source_node = instance.primary_node
3505     target_node = instance.secondary_nodes[0]
3506
3507     feedback_fn("* checking disk consistency between source and target")
3508     for dev in instance.disks:
3509       # for drbd, these are drbd over lvm
3510       if not _CheckDiskConsistency(self, dev, target_node, False):
3511         if instance.admin_up and not self.op.ignore_consistency:
3512           raise errors.OpExecError("Disk %s is degraded on target node,"
3513                                    " aborting failover." % dev.iv_name)
3514
3515     feedback_fn("* shutting down instance on source node")
3516     logging.info("Shutting down instance %s on node %s",
3517                  instance.name, source_node)
3518
3519     result = self.rpc.call_instance_shutdown(source_node, instance)
3520     msg = result.RemoteFailMsg()
3521     if msg:
3522       if self.op.ignore_consistency:
3523         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3524                              " Proceeding anyway. Please make sure node"
3525                              " %s is down. Error details: %s",
3526                              instance.name, source_node, source_node, msg)
3527       else:
3528         raise errors.OpExecError("Could not shutdown instance %s on"
3529                                  " node %s: %s" %
3530                                  (instance.name, source_node, msg))
3531
3532     feedback_fn("* deactivating the instance's disks on source node")
3533     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3534       raise errors.OpExecError("Can't shut down the instance's disks.")
3535
3536     instance.primary_node = target_node
3537     # distribute new instance config to the other nodes
3538     self.cfg.Update(instance)
3539
3540     # Only start the instance if it's marked as up
3541     if instance.admin_up:
3542       feedback_fn("* activating the instance's disks on target node")
3543       logging.info("Starting instance %s on node %s",
3544                    instance.name, target_node)
3545
3546       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3547                                                ignore_secondaries=True)
3548       if not disks_ok:
3549         _ShutdownInstanceDisks(self, instance)
3550         raise errors.OpExecError("Can't activate the instance's disks")
3551
3552       feedback_fn("* starting the instance on the target node")
3553       result = self.rpc.call_instance_start(target_node, instance)
3554       msg = result.RemoteFailMsg()
3555       if msg:
3556         _ShutdownInstanceDisks(self, instance)
3557         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3558                                  (instance.name, target_node, msg))
3559
3560
3561 class LUMigrateInstance(LogicalUnit):
3562   """Migrate an instance.
3563
3564   This is migration without shutting down, compared to the failover,
3565   which is done with shutdown.
3566
3567   """
3568   HPATH = "instance-migrate"
3569   HTYPE = constants.HTYPE_INSTANCE
3570   _OP_REQP = ["instance_name", "live", "cleanup"]
3571
3572   REQ_BGL = False
3573
3574   def ExpandNames(self):
3575     self._ExpandAndLockInstance()
3576     self.needed_locks[locking.LEVEL_NODE] = []
3577     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3578
3579   def DeclareLocks(self, level):
3580     if level == locking.LEVEL_NODE:
3581       self._LockInstancesNodes()
3582
3583   def BuildHooksEnv(self):
3584     """Build hooks env.
3585
3586     This runs on master, primary and secondary nodes of the instance.
3587
3588     """
3589     env = _BuildInstanceHookEnvByObject(self, self.instance)
3590     env["MIGRATE_LIVE"] = self.op.live
3591     env["MIGRATE_CLEANUP"] = self.op.cleanup
3592     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3593     return env, nl, nl
3594
3595   def CheckPrereq(self):
3596     """Check prerequisites.
3597
3598     This checks that the instance is in the cluster.
3599
3600     """
3601     instance = self.cfg.GetInstanceInfo(
3602       self.cfg.ExpandInstanceName(self.op.instance_name))
3603     if instance is None:
3604       raise errors.OpPrereqError("Instance '%s' not known" %
3605                                  self.op.instance_name)
3606
3607     if instance.disk_template != constants.DT_DRBD8:
3608       raise errors.OpPrereqError("Instance's disk layout is not"
3609                                  " drbd8, cannot migrate.")
3610
3611     secondary_nodes = instance.secondary_nodes
3612     if not secondary_nodes:
3613       raise errors.ConfigurationError("No secondary node but using"
3614                                       " drbd8 disk template")
3615
3616     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3617
3618     target_node = secondary_nodes[0]
3619     # check memory requirements on the secondary node
3620     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3621                          instance.name, i_be[constants.BE_MEMORY],
3622                          instance.hypervisor)
3623
3624     # check bridge existance
3625     brlist = [nic.bridge for nic in instance.nics]
3626     result = self.rpc.call_bridges_exist(target_node, brlist)
3627     if result.failed or not result.data:
3628       raise errors.OpPrereqError("One or more target bridges %s does not"
3629                                  " exist on destination node '%s'" %
3630                                  (brlist, target_node))
3631
3632     if not self.op.cleanup:
3633       _CheckNodeNotDrained(self, target_node)
3634       result = self.rpc.call_instance_migratable(instance.primary_node,
3635                                                  instance)
3636       msg = result.RemoteFailMsg()
3637       if msg:
3638         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3639                                    msg)
3640
3641     self.instance = instance
3642
3643   def _WaitUntilSync(self):
3644     """Poll with custom rpc for disk sync.
3645
3646     This uses our own step-based rpc call.
3647
3648     """
3649     self.feedback_fn("* wait until resync is done")
3650     all_done = False
3651     while not all_done:
3652       all_done = True
3653       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3654                                             self.nodes_ip,
3655                                             self.instance.disks)
3656       min_percent = 100
3657       for node, nres in result.items():
3658         msg = nres.RemoteFailMsg()
3659         if msg:
3660           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3661                                    (node, msg))
3662         node_done, node_percent = nres.payload
3663         all_done = all_done and node_done
3664         if node_percent is not None:
3665           min_percent = min(min_percent, node_percent)
3666       if not all_done:
3667         if min_percent < 100:
3668           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3669         time.sleep(2)
3670
3671   def _EnsureSecondary(self, node):
3672     """Demote a node to secondary.
3673
3674     """
3675     self.feedback_fn("* switching node %s to secondary mode" % node)
3676
3677     for dev in self.instance.disks:
3678       self.cfg.SetDiskID(dev, node)
3679
3680     result = self.rpc.call_blockdev_close(node, self.instance.name,
3681                                           self.instance.disks)
3682     msg = result.RemoteFailMsg()
3683     if msg:
3684       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3685                                " error %s" % (node, msg))
3686
3687   def _GoStandalone(self):
3688     """Disconnect from the network.
3689
3690     """
3691     self.feedback_fn("* changing into standalone mode")
3692     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3693                                                self.instance.disks)
3694     for node, nres in result.items():
3695       msg = nres.RemoteFailMsg()
3696       if msg:
3697         raise errors.OpExecError("Cannot disconnect disks node %s,"
3698                                  " error %s" % (node, msg))
3699
3700   def _GoReconnect(self, multimaster):
3701     """Reconnect to the network.
3702
3703     """
3704     if multimaster:
3705       msg = "dual-master"
3706     else:
3707       msg = "single-master"
3708     self.feedback_fn("* changing disks into %s mode" % msg)
3709     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3710                                            self.instance.disks,
3711                                            self.instance.name, multimaster)
3712     for node, nres in result.items():
3713       msg = nres.RemoteFailMsg()
3714       if msg:
3715         raise errors.OpExecError("Cannot change disks config on node %s,"
3716                                  " error: %s" % (node, msg))
3717
3718   def _ExecCleanup(self):
3719     """Try to cleanup after a failed migration.
3720
3721     The cleanup is done by:
3722       - check that the instance is running only on one node
3723         (and update the config if needed)
3724       - change disks on its secondary node to secondary
3725       - wait until disks are fully synchronized
3726       - disconnect from the network
3727       - change disks into single-master mode
3728       - wait again until disks are fully synchronized
3729
3730     """
3731     instance = self.instance
3732     target_node = self.target_node
3733     source_node = self.source_node
3734
3735     # check running on only one node
3736     self.feedback_fn("* checking where the instance actually runs"
3737                      " (if this hangs, the hypervisor might be in"
3738                      " a bad state)")
3739     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3740     for node, result in ins_l.items():
3741       result.Raise()
3742       if not isinstance(result.data, list):
3743         raise errors.OpExecError("Can't contact node '%s'" % node)
3744
3745     runningon_source = instance.name in ins_l[source_node].data
3746     runningon_target = instance.name in ins_l[target_node].data
3747
3748     if runningon_source and runningon_target:
3749       raise errors.OpExecError("Instance seems to be running on two nodes,"
3750                                " or the hypervisor is confused. You will have"
3751                                " to ensure manually that it runs only on one"
3752                                " and restart this operation.")
3753
3754     if not (runningon_source or runningon_target):
3755       raise errors.OpExecError("Instance does not seem to be running at all."
3756                                " In this case, it's safer to repair by"
3757                                " running 'gnt-instance stop' to ensure disk"
3758                                " shutdown, and then restarting it.")
3759
3760     if runningon_target:
3761       # the migration has actually succeeded, we need to update the config
3762       self.feedback_fn("* instance running on secondary node (%s),"
3763                        " updating config" % target_node)
3764       instance.primary_node = target_node
3765       self.cfg.Update(instance)
3766       demoted_node = source_node
3767     else:
3768       self.feedback_fn("* instance confirmed to be running on its"
3769                        " primary node (%s)" % source_node)
3770       demoted_node = target_node
3771
3772     self._EnsureSecondary(demoted_node)
3773     try:
3774       self._WaitUntilSync()
3775     except errors.OpExecError:
3776       # we ignore here errors, since if the device is standalone, it
3777       # won't be able to sync
3778       pass
3779     self._GoStandalone()
3780     self._GoReconnect(False)
3781     self._WaitUntilSync()
3782
3783     self.feedback_fn("* done")
3784
3785   def _RevertDiskStatus(self):
3786     """Try to revert the disk status after a failed migration.
3787
3788     """
3789     target_node = self.target_node
3790     try:
3791       self._EnsureSecondary(target_node)
3792       self._GoStandalone()
3793       self._GoReconnect(False)
3794       self._WaitUntilSync()
3795     except errors.OpExecError, err:
3796       self.LogWarning("Migration failed and I can't reconnect the"
3797                       " drives: error '%s'\n"
3798                       "Please look and recover the instance status" %
3799                       str(err))
3800
3801   def _AbortMigration(self):
3802     """Call the hypervisor code to abort a started migration.
3803
3804     """
3805     instance = self.instance
3806     target_node = self.target_node
3807     migration_info = self.migration_info
3808
3809     abort_result = self.rpc.call_finalize_migration(target_node,
3810                                                     instance,
3811                                                     migration_info,
3812                                                     False)
3813     abort_msg = abort_result.RemoteFailMsg()
3814     if abort_msg:
3815       logging.error("Aborting migration failed on target node %s: %s" %
3816                     (target_node, abort_msg))
3817       # Don't raise an exception here, as we stil have to try to revert the
3818       # disk status, even if this step failed.
3819
3820   def _ExecMigration(self):
3821     """Migrate an instance.
3822
3823     The migrate is done by:
3824       - change the disks into dual-master mode
3825       - wait until disks are fully synchronized again
3826       - migrate the instance
3827       - change disks on the new secondary node (the old primary) to secondary
3828       - wait until disks are fully synchronized
3829       - change disks into single-master mode
3830
3831     """
3832     instance = self.instance
3833     target_node = self.target_node
3834     source_node = self.source_node
3835
3836     self.feedback_fn("* checking disk consistency between source and target")
3837     for dev in instance.disks:
3838       if not _CheckDiskConsistency(self, dev, target_node, False):
3839         raise errors.OpExecError("Disk %s is degraded or not fully"
3840                                  " synchronized on target node,"
3841                                  " aborting migrate." % dev.iv_name)
3842
3843     # First get the migration information from the remote node
3844     result = self.rpc.call_migration_info(source_node, instance)
3845     msg = result.RemoteFailMsg()
3846     if msg:
3847       log_err = ("Failed fetching source migration information from %s: %s" %
3848                  (source_node, msg))
3849       logging.error(log_err)
3850       raise errors.OpExecError(log_err)
3851
3852     self.migration_info = migration_info = result.payload
3853
3854     # Then switch the disks to master/master mode
3855     self._EnsureSecondary(target_node)
3856     self._GoStandalone()
3857     self._GoReconnect(True)
3858     self._WaitUntilSync()
3859
3860     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3861     result = self.rpc.call_accept_instance(target_node,
3862                                            instance,
3863                                            migration_info,
3864                                            self.nodes_ip[target_node])
3865
3866     msg = result.RemoteFailMsg()
3867     if msg:
3868       logging.error("Instance pre-migration failed, trying to revert"
3869                     " disk status: %s", msg)
3870       self._AbortMigration()
3871       self._RevertDiskStatus()
3872       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3873                                (instance.name, msg))
3874
3875     self.feedback_fn("* migrating instance to %s" % target_node)
3876     time.sleep(10)
3877     result = self.rpc.call_instance_migrate(source_node, instance,
3878                                             self.nodes_ip[target_node],
3879                                             self.op.live)
3880     msg = result.RemoteFailMsg()
3881     if msg:
3882       logging.error("Instance migration failed, trying to revert"
3883                     " disk status: %s", msg)
3884       self._AbortMigration()
3885       self._RevertDiskStatus()
3886       raise errors.OpExecError("Could not migrate instance %s: %s" %
3887                                (instance.name, msg))
3888     time.sleep(10)
3889
3890     instance.primary_node = target_node
3891     # distribute new instance config to the other nodes
3892     self.cfg.Update(instance)
3893
3894     result = self.rpc.call_finalize_migration(target_node,
3895                                               instance,
3896                                               migration_info,
3897                                               True)
3898     msg = result.RemoteFailMsg()
3899     if msg:
3900       logging.error("Instance migration succeeded, but finalization failed:"
3901                     " %s" % msg)
3902       raise errors.OpExecError("Could not finalize instance migration: %s" %
3903                                msg)
3904
3905     self._EnsureSecondary(source_node)
3906     self._WaitUntilSync()
3907     self._GoStandalone()
3908     self._GoReconnect(False)
3909     self._WaitUntilSync()
3910
3911     self.feedback_fn("* done")
3912
3913   def Exec(self, feedback_fn):
3914     """Perform the migration.
3915
3916     """
3917     self.feedback_fn = feedback_fn
3918
3919     self.source_node = self.instance.primary_node
3920     self.target_node = self.instance.secondary_nodes[0]
3921     self.all_nodes = [self.source_node, self.target_node]
3922     self.nodes_ip = {
3923       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3924       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3925       }
3926     if self.op.cleanup:
3927       return self._ExecCleanup()
3928     else:
3929       return self._ExecMigration()
3930
3931
3932 def _CreateBlockDev(lu, node, instance, device, force_create,
3933                     info, force_open):
3934   """Create a tree of block devices on a given node.
3935
3936   If this device type has to be created on secondaries, create it and
3937   all its children.
3938
3939   If not, just recurse to children keeping the same 'force' value.
3940
3941   @param lu: the lu on whose behalf we execute
3942   @param node: the node on which to create the device
3943   @type instance: L{objects.Instance}
3944   @param instance: the instance which owns the device
3945   @type device: L{objects.Disk}
3946   @param device: the device to create
3947   @type force_create: boolean
3948   @param force_create: whether to force creation of this device; this
3949       will be change to True whenever we find a device which has
3950       CreateOnSecondary() attribute
3951   @param info: the extra 'metadata' we should attach to the device
3952       (this will be represented as a LVM tag)
3953   @type force_open: boolean
3954   @param force_open: this parameter will be passes to the
3955       L{backend.BlockdevCreate} function where it specifies
3956       whether we run on primary or not, and it affects both
3957       the child assembly and the device own Open() execution
3958
3959   """
3960   if device.CreateOnSecondary():
3961     force_create = True
3962
3963   if device.children:
3964     for child in device.children:
3965       _CreateBlockDev(lu, node, instance, child, force_create,
3966                       info, force_open)
3967
3968   if not force_create:
3969     return
3970
3971   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3972
3973
3974 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3975   """Create a single block device on a given node.
3976
3977   This will not recurse over children of the device, so they must be
3978   created in advance.
3979
3980   @param lu: the lu on whose behalf we execute
3981   @param node: the node on which to create the device
3982   @type instance: L{objects.Instance}
3983   @param instance: the instance which owns the device
3984   @type device: L{objects.Disk}
3985   @param device: the device to create
3986   @param info: the extra 'metadata' we should attach to the device
3987       (this will be represented as a LVM tag)
3988   @type force_open: boolean
3989   @param force_open: this parameter will be passes to the
3990       L{backend.BlockdevCreate} function where it specifies
3991       whether we run on primary or not, and it affects both
3992       the child assembly and the device own Open() execution
3993
3994   """
3995   lu.cfg.SetDiskID(device, node)
3996   result = lu.rpc.call_blockdev_create(node, device, device.size,
3997                                        instance.name, force_open, info)
3998   msg = result.RemoteFailMsg()
3999   if msg:
4000     raise errors.OpExecError("Can't create block device %s on"
4001                              " node %s for instance %s: %s" %
4002                              (device, node, instance.name, msg))
4003   if device.physical_id is None:
4004     device.physical_id = result.payload
4005
4006
4007 def _GenerateUniqueNames(lu, exts):
4008   """Generate a suitable LV name.
4009
4010   This will generate a logical volume name for the given instance.
4011
4012   """
4013   results = []
4014   for val in exts:
4015     new_id = lu.cfg.GenerateUniqueID()
4016     results.append("%s%s" % (new_id, val))
4017   return results
4018
4019
4020 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4021                          p_minor, s_minor):
4022   """Generate a drbd8 device complete with its children.
4023
4024   """
4025   port = lu.cfg.AllocatePort()
4026   vgname = lu.cfg.GetVGName()
4027   shared_secret = lu.cfg.GenerateDRBDSecret()
4028   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4029                           logical_id=(vgname, names[0]))
4030   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4031                           logical_id=(vgname, names[1]))
4032   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4033                           logical_id=(primary, secondary, port,
4034                                       p_minor, s_minor,
4035                                       shared_secret),
4036                           children=[dev_data, dev_meta],
4037                           iv_name=iv_name)
4038   return drbd_dev
4039
4040
4041 def _GenerateDiskTemplate(lu, template_name,
4042                           instance_name, primary_node,
4043                           secondary_nodes, disk_info,
4044                           file_storage_dir, file_driver,
4045                           base_index):
4046   """Generate the entire disk layout for a given template type.
4047
4048   """
4049   #TODO: compute space requirements
4050
4051   vgname = lu.cfg.GetVGName()
4052   disk_count = len(disk_info)
4053   disks = []
4054   if template_name == constants.DT_DISKLESS:
4055     pass
4056   elif template_name == constants.DT_PLAIN:
4057     if len(secondary_nodes) != 0:
4058       raise errors.ProgrammerError("Wrong template configuration")
4059
4060     names = _GenerateUniqueNames(lu, [".disk%d" % i
4061                                       for i in range(disk_count)])
4062     for idx, disk in enumerate(disk_info):
4063       disk_index = idx + base_index
4064       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4065                               logical_id=(vgname, names[idx]),
4066                               iv_name="disk/%d" % disk_index,
4067                               mode=disk["mode"])
4068       disks.append(disk_dev)
4069   elif template_name == constants.DT_DRBD8:
4070     if len(secondary_nodes) != 1:
4071       raise errors.ProgrammerError("Wrong template configuration")
4072     remote_node = secondary_nodes[0]
4073     minors = lu.cfg.AllocateDRBDMinor(
4074       [primary_node, remote_node] * len(disk_info), instance_name)
4075
4076     names = []
4077     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4078                                                for i in range(disk_count)]):
4079       names.append(lv_prefix + "_data")
4080       names.append(lv_prefix + "_meta")
4081     for idx, disk in enumerate(disk_info):
4082       disk_index = idx + base_index
4083       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4084                                       disk["size"], names[idx*2:idx*2+2],
4085                                       "disk/%d" % disk_index,
4086                                       minors[idx*2], minors[idx*2+1])
4087       disk_dev.mode = disk["mode"]
4088       disks.append(disk_dev)
4089   elif template_name == constants.DT_FILE:
4090     if len(secondary_nodes) != 0:
4091       raise errors.ProgrammerError("Wrong template configuration")
4092
4093     for idx, disk in enumerate(disk_info):
4094       disk_index = idx + base_index
4095       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4096                               iv_name="disk/%d" % disk_index,
4097                               logical_id=(file_driver,
4098                                           "%s/disk%d" % (file_storage_dir,
4099                                                          disk_index)),
4100                               mode=disk["mode"])
4101       disks.append(disk_dev)
4102   else:
4103     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4104   return disks
4105
4106
4107 def _GetInstanceInfoText(instance):
4108   """Compute that text that should be added to the disk's metadata.
4109
4110   """
4111   return "originstname+%s" % instance.name
4112
4113
4114 def _CreateDisks(lu, instance):
4115   """Create all disks for an instance.
4116
4117   This abstracts away some work from AddInstance.
4118
4119   @type lu: L{LogicalUnit}
4120   @param lu: the logical unit on whose behalf we execute
4121   @type instance: L{objects.Instance}
4122   @param instance: the instance whose disks we should create
4123   @rtype: boolean
4124   @return: the success of the creation
4125
4126   """
4127   info = _GetInstanceInfoText(instance)
4128   pnode = instance.primary_node
4129
4130   if instance.disk_template == constants.DT_FILE:
4131     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4132     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4133
4134     if result.failed or not result.data:
4135       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4136
4137     if not result.data[0]:
4138       raise errors.OpExecError("Failed to create directory '%s'" %
4139                                file_storage_dir)
4140
4141   # Note: this needs to be kept in sync with adding of disks in
4142   # LUSetInstanceParams
4143   for device in instance.disks:
4144     logging.info("Creating volume %s for instance %s",
4145                  device.iv_name, instance.name)
4146     #HARDCODE
4147     for node in instance.all_nodes:
4148       f_create = node == pnode
4149       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4150
4151
4152 def _RemoveDisks(lu, instance):
4153   """Remove all disks for an instance.
4154
4155   This abstracts away some work from `AddInstance()` and
4156   `RemoveInstance()`. Note that in case some of the devices couldn't
4157   be removed, the removal will continue with the other ones (compare
4158   with `_CreateDisks()`).
4159
4160   @type lu: L{LogicalUnit}
4161   @param lu: the logical unit on whose behalf we execute
4162   @type instance: L{objects.Instance}
4163   @param instance: the instance whose disks we should remove
4164   @rtype: boolean
4165   @return: the success of the removal
4166
4167   """
4168   logging.info("Removing block devices for instance %s", instance.name)
4169
4170   all_result = True
4171   for device in instance.disks:
4172     for node, disk in device.ComputeNodeTree(instance.primary_node):
4173       lu.cfg.SetDiskID(disk, node)
4174       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4175       if msg:
4176         lu.LogWarning("Could not remove block device %s on node %s,"
4177                       " continuing anyway: %s", device.iv_name, node, msg)
4178         all_result = False
4179
4180   if instance.disk_template == constants.DT_FILE:
4181     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4182     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4183                                                  file_storage_dir)
4184     if result.failed or not result.data:
4185       logging.error("Could not remove directory '%s'", file_storage_dir)
4186       all_result = False
4187
4188   return all_result
4189
4190
4191 def _ComputeDiskSize(disk_template, disks):
4192   """Compute disk size requirements in the volume group
4193
4194   """
4195   # Required free disk space as a function of disk and swap space
4196   req_size_dict = {
4197     constants.DT_DISKLESS: None,
4198     constants.DT_PLAIN: sum(d["size"] for d in disks),
4199     # 128 MB are added for drbd metadata for each disk
4200     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4201     constants.DT_FILE: None,
4202   }
4203
4204   if disk_template not in req_size_dict:
4205     raise errors.ProgrammerError("Disk template '%s' size requirement"
4206                                  " is unknown" %  disk_template)
4207
4208   return req_size_dict[disk_template]
4209
4210
4211 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4212   """Hypervisor parameter validation.
4213
4214   This function abstract the hypervisor parameter validation to be
4215   used in both instance create and instance modify.
4216
4217   @type lu: L{LogicalUnit}
4218   @param lu: the logical unit for which we check
4219   @type nodenames: list
4220   @param nodenames: the list of nodes on which we should check
4221   @type hvname: string
4222   @param hvname: the name of the hypervisor we should use
4223   @type hvparams: dict
4224   @param hvparams: the parameters which we need to check
4225   @raise errors.OpPrereqError: if the parameters are not valid
4226
4227   """
4228   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4229                                                   hvname,
4230                                                   hvparams)
4231   for node in nodenames:
4232     info = hvinfo[node]
4233     if info.offline:
4234       continue
4235     msg = info.RemoteFailMsg()
4236     if msg:
4237       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4238                                  " %s" % msg)
4239
4240
4241 class LUCreateInstance(LogicalUnit):
4242   """Create an instance.
4243
4244   """
4245   HPATH = "instance-add"
4246   HTYPE = constants.HTYPE_INSTANCE
4247   _OP_REQP = ["instance_name", "disks", "disk_template",
4248               "mode", "start",
4249               "wait_for_sync", "ip_check", "nics",
4250               "hvparams", "beparams"]
4251   REQ_BGL = False
4252
4253   def _ExpandNode(self, node):
4254     """Expands and checks one node name.
4255
4256     """
4257     node_full = self.cfg.ExpandNodeName(node)
4258     if node_full is None:
4259       raise errors.OpPrereqError("Unknown node %s" % node)
4260     return node_full
4261
4262   def ExpandNames(self):
4263     """ExpandNames for CreateInstance.
4264
4265     Figure out the right locks for instance creation.
4266
4267     """
4268     self.needed_locks = {}
4269
4270     # set optional parameters to none if they don't exist
4271     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4272       if not hasattr(self.op, attr):
4273         setattr(self.op, attr, None)
4274
4275     # cheap checks, mostly valid constants given
4276
4277     # verify creation mode
4278     if self.op.mode not in (constants.INSTANCE_CREATE,
4279                             constants.INSTANCE_IMPORT):
4280       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4281                                  self.op.mode)
4282
4283     # disk template and mirror node verification
4284     if self.op.disk_template not in constants.DISK_TEMPLATES:
4285       raise errors.OpPrereqError("Invalid disk template name")
4286
4287     if self.op.hypervisor is None:
4288       self.op.hypervisor = self.cfg.GetHypervisorType()
4289
4290     cluster = self.cfg.GetClusterInfo()
4291     enabled_hvs = cluster.enabled_hypervisors
4292     if self.op.hypervisor not in enabled_hvs:
4293       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4294                                  " cluster (%s)" % (self.op.hypervisor,
4295                                   ",".join(enabled_hvs)))
4296
4297     # check hypervisor parameter syntax (locally)
4298     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4299     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4300                                   self.op.hvparams)
4301     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4302     hv_type.CheckParameterSyntax(filled_hvp)
4303
4304     # fill and remember the beparams dict
4305     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4306     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4307                                     self.op.beparams)
4308
4309     #### instance parameters check
4310
4311     # instance name verification
4312     hostname1 = utils.HostInfo(self.op.instance_name)
4313     self.op.instance_name = instance_name = hostname1.name
4314
4315     # this is just a preventive check, but someone might still add this
4316     # instance in the meantime, and creation will fail at lock-add time
4317     if instance_name in self.cfg.GetInstanceList():
4318       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4319                                  instance_name)
4320
4321     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4322
4323     # NIC buildup
4324     self.nics = []
4325     for nic in self.op.nics:
4326       # ip validity checks
4327       ip = nic.get("ip", None)
4328       if ip is None or ip.lower() == "none":
4329         nic_ip = None
4330       elif ip.lower() == constants.VALUE_AUTO:
4331         nic_ip = hostname1.ip
4332       else:
4333         if not utils.IsValidIP(ip):
4334           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4335                                      " like a valid IP" % ip)
4336         nic_ip = ip
4337
4338       # MAC address verification
4339       mac = nic.get("mac", constants.VALUE_AUTO)
4340       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4341         if not utils.IsValidMac(mac.lower()):
4342           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4343                                      mac)
4344       # bridge verification
4345       bridge = nic.get("bridge", None)
4346       if bridge is None:
4347         bridge = self.cfg.GetDefBridge()
4348       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4349
4350     # disk checks/pre-build
4351     self.disks = []
4352     for disk in self.op.disks:
4353       mode = disk.get("mode", constants.DISK_RDWR)
4354       if mode not in constants.DISK_ACCESS_SET:
4355         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4356                                    mode)
4357       size = disk.get("size", None)
4358       if size is None:
4359         raise errors.OpPrereqError("Missing disk size")
4360       try:
4361         size = int(size)
4362       except ValueError:
4363         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4364       self.disks.append({"size": size, "mode": mode})
4365
4366     # used in CheckPrereq for ip ping check
4367     self.check_ip = hostname1.ip
4368
4369     # file storage checks
4370     if (self.op.file_driver and
4371         not self.op.file_driver in constants.FILE_DRIVER):
4372       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4373                                  self.op.file_driver)
4374
4375     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4376       raise errors.OpPrereqError("File storage directory path not absolute")
4377
4378     ### Node/iallocator related checks
4379     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4380       raise errors.OpPrereqError("One and only one of iallocator and primary"
4381                                  " node must be given")
4382
4383     if self.op.iallocator:
4384       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4385     else:
4386       self.op.pnode = self._ExpandNode(self.op.pnode)
4387       nodelist = [self.op.pnode]
4388       if self.op.snode is not None:
4389         self.op.snode = self._ExpandNode(self.op.snode)
4390         nodelist.append(self.op.snode)
4391       self.needed_locks[locking.LEVEL_NODE] = nodelist
4392
4393     # in case of import lock the source node too
4394     if self.op.mode == constants.INSTANCE_IMPORT:
4395       src_node = getattr(self.op, "src_node", None)
4396       src_path = getattr(self.op, "src_path", None)
4397
4398       if src_path is None:
4399         self.op.src_path = src_path = self.op.instance_name
4400
4401       if src_node is None:
4402         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4403         self.op.src_node = None
4404         if os.path.isabs(src_path):
4405           raise errors.OpPrereqError("Importing an instance from an absolute"
4406                                      " path requires a source node option.")
4407       else:
4408         self.op.src_node = src_node = self._ExpandNode(src_node)
4409         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4410           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4411         if not os.path.isabs(src_path):
4412           self.op.src_path = src_path = \
4413             os.path.join(constants.EXPORT_DIR, src_path)
4414
4415     else: # INSTANCE_CREATE
4416       if getattr(self.op, "os_type", None) is None:
4417         raise errors.OpPrereqError("No guest OS specified")
4418
4419   def _RunAllocator(self):
4420     """Run the allocator based on input opcode.
4421
4422     """
4423     nics = [n.ToDict() for n in self.nics]
4424     ial = IAllocator(self,
4425                      mode=constants.IALLOCATOR_MODE_ALLOC,
4426                      name=self.op.instance_name,
4427                      disk_template=self.op.disk_template,
4428                      tags=[],
4429                      os=self.op.os_type,
4430                      vcpus=self.be_full[constants.BE_VCPUS],
4431                      mem_size=self.be_full[constants.BE_MEMORY],
4432                      disks=self.disks,
4433                      nics=nics,
4434                      hypervisor=self.op.hypervisor,
4435                      )
4436
4437     ial.Run(self.op.iallocator)
4438
4439     if not ial.success:
4440       raise errors.OpPrereqError("Can't compute nodes using"
4441                                  " iallocator '%s': %s" % (self.op.iallocator,
4442                                                            ial.info))
4443     if len(ial.nodes) != ial.required_nodes:
4444       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4445                                  " of nodes (%s), required %s" %
4446                                  (self.op.iallocator, len(ial.nodes),
4447                                   ial.required_nodes))
4448     self.op.pnode = ial.nodes[0]
4449     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4450                  self.op.instance_name, self.op.iallocator,
4451                  ", ".join(ial.nodes))
4452     if ial.required_nodes == 2:
4453       self.op.snode = ial.nodes[1]
4454
4455   def BuildHooksEnv(self):
4456     """Build hooks env.
4457
4458     This runs on master, primary and secondary nodes of the instance.
4459
4460     """
4461     env = {
4462       "ADD_MODE": self.op.mode,
4463       }
4464     if self.op.mode == constants.INSTANCE_IMPORT:
4465       env["SRC_NODE"] = self.op.src_node
4466       env["SRC_PATH"] = self.op.src_path
4467       env["SRC_IMAGES"] = self.src_images
4468
4469     env.update(_BuildInstanceHookEnv(
4470       name=self.op.instance_name,
4471       primary_node=self.op.pnode,
4472       secondary_nodes=self.secondaries,
4473       status=self.op.start,
4474       os_type=self.op.os_type,
4475       memory=self.be_full[constants.BE_MEMORY],
4476       vcpus=self.be_full[constants.BE_VCPUS],
4477       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4478       disk_template=self.op.disk_template,
4479       disks=[(d["size"], d["mode"]) for d in self.disks],
4480     ))
4481
4482     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4483           self.secondaries)
4484     return env, nl, nl
4485
4486
4487   def CheckPrereq(self):
4488     """Check prerequisites.
4489
4490     """
4491     if (not self.cfg.GetVGName() and
4492         self.op.disk_template not in constants.DTS_NOT_LVM):
4493       raise errors.OpPrereqError("Cluster does not support lvm-based"
4494                                  " instances")
4495
4496     if self.op.mode == constants.INSTANCE_IMPORT:
4497       src_node = self.op.src_node
4498       src_path = self.op.src_path
4499
4500       if src_node is None:
4501         exp_list = self.rpc.call_export_list(
4502           self.acquired_locks[locking.LEVEL_NODE])
4503         found = False
4504         for node in exp_list:
4505           if not exp_list[node].failed and src_path in exp_list[node].data:
4506             found = True
4507             self.op.src_node = src_node = node
4508             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4509                                                        src_path)
4510             break
4511         if not found:
4512           raise errors.OpPrereqError("No export found for relative path %s" %
4513                                       src_path)
4514
4515       _CheckNodeOnline(self, src_node)
4516       result = self.rpc.call_export_info(src_node, src_path)
4517       result.Raise()
4518       if not result.data:
4519         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4520
4521       export_info = result.data
4522       if not export_info.has_section(constants.INISECT_EXP):
4523         raise errors.ProgrammerError("Corrupted export config")
4524
4525       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4526       if (int(ei_version) != constants.EXPORT_VERSION):
4527         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4528                                    (ei_version, constants.EXPORT_VERSION))
4529
4530       # Check that the new instance doesn't have less disks than the export
4531       instance_disks = len(self.disks)
4532       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4533       if instance_disks < export_disks:
4534         raise errors.OpPrereqError("Not enough disks to import."
4535                                    " (instance: %d, export: %d)" %
4536                                    (instance_disks, export_disks))
4537
4538       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4539       disk_images = []
4540       for idx in range(export_disks):
4541         option = 'disk%d_dump' % idx
4542         if export_info.has_option(constants.INISECT_INS, option):
4543           # FIXME: are the old os-es, disk sizes, etc. useful?
4544           export_name = export_info.get(constants.INISECT_INS, option)
4545           image = os.path.join(src_path, export_name)
4546           disk_images.append(image)
4547         else:
4548           disk_images.append(False)
4549
4550       self.src_images = disk_images
4551
4552       old_name = export_info.get(constants.INISECT_INS, 'name')
4553       # FIXME: int() here could throw a ValueError on broken exports
4554       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4555       if self.op.instance_name == old_name:
4556         for idx, nic in enumerate(self.nics):
4557           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4558             nic_mac_ini = 'nic%d_mac' % idx
4559             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4560
4561     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4562     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4563     if self.op.start and not self.op.ip_check:
4564       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4565                                  " adding an instance in start mode")
4566
4567     if self.op.ip_check:
4568       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4569         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4570                                    (self.check_ip, self.op.instance_name))
4571
4572     #### mac address generation
4573     # By generating here the mac address both the allocator and the hooks get
4574     # the real final mac address rather than the 'auto' or 'generate' value.
4575     # There is a race condition between the generation and the instance object
4576     # creation, which means that we know the mac is valid now, but we're not
4577     # sure it will be when we actually add the instance. If things go bad
4578     # adding the instance will abort because of a duplicate mac, and the
4579     # creation job will fail.
4580     for nic in self.nics:
4581       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4582         nic.mac = self.cfg.GenerateMAC()
4583
4584     #### allocator run
4585
4586     if self.op.iallocator is not None:
4587       self._RunAllocator()
4588
4589     #### node related checks
4590
4591     # check primary node
4592     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4593     assert self.pnode is not None, \
4594       "Cannot retrieve locked node %s" % self.op.pnode
4595     if pnode.offline:
4596       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4597                                  pnode.name)
4598     if pnode.drained:
4599       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4600                                  pnode.name)
4601
4602     self.secondaries = []
4603
4604     # mirror node verification
4605     if self.op.disk_template in constants.DTS_NET_MIRROR:
4606       if self.op.snode is None:
4607         raise errors.OpPrereqError("The networked disk templates need"
4608                                    " a mirror node")
4609       if self.op.snode == pnode.name:
4610         raise errors.OpPrereqError("The secondary node cannot be"
4611                                    " the primary node.")
4612       _CheckNodeOnline(self, self.op.snode)
4613       _CheckNodeNotDrained(self, self.op.snode)
4614       self.secondaries.append(self.op.snode)
4615
4616     nodenames = [pnode.name] + self.secondaries
4617
4618     req_size = _ComputeDiskSize(self.op.disk_template,
4619                                 self.disks)
4620
4621     # Check lv size requirements
4622     if req_size is not None:
4623       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4624                                          self.op.hypervisor)
4625       for node in nodenames:
4626         info = nodeinfo[node]
4627         info.Raise()
4628         info = info.data
4629         if not info:
4630           raise errors.OpPrereqError("Cannot get current information"
4631                                      " from node '%s'" % node)
4632         vg_free = info.get('vg_free', None)
4633         if not isinstance(vg_free, int):
4634           raise errors.OpPrereqError("Can't compute free disk space on"
4635                                      " node %s" % node)
4636         if req_size > info['vg_free']:
4637           raise errors.OpPrereqError("Not enough disk space on target node %s."
4638                                      " %d MB available, %d MB required" %
4639                                      (node, info['vg_free'], req_size))
4640
4641     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4642
4643     # os verification
4644     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4645     result.Raise()
4646     if not isinstance(result.data, objects.OS):
4647       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4648                                  " primary node"  % self.op.os_type)
4649
4650     # bridge check on primary node
4651     bridges = [n.bridge for n in self.nics]
4652     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4653     result.Raise()
4654     if not result.data:
4655       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4656                                  " exist on destination node '%s'" %
4657                                  (",".join(bridges), pnode.name))
4658
4659     # memory check on primary node
4660     if self.op.start:
4661       _CheckNodeFreeMemory(self, self.pnode.name,
4662                            "creating instance %s" % self.op.instance_name,
4663                            self.be_full[constants.BE_MEMORY],
4664                            self.op.hypervisor)
4665
4666   def Exec(self, feedback_fn):
4667     """Create and add the instance to the cluster.
4668
4669     """
4670     instance = self.op.instance_name
4671     pnode_name = self.pnode.name
4672
4673     ht_kind = self.op.hypervisor
4674     if ht_kind in constants.HTS_REQ_PORT:
4675       network_port = self.cfg.AllocatePort()
4676     else:
4677       network_port = None
4678
4679     ##if self.op.vnc_bind_address is None:
4680     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4681
4682     # this is needed because os.path.join does not accept None arguments
4683     if self.op.file_storage_dir is None:
4684       string_file_storage_dir = ""
4685     else:
4686       string_file_storage_dir = self.op.file_storage_dir
4687
4688     # build the full file storage dir path
4689     file_storage_dir = os.path.normpath(os.path.join(
4690                                         self.cfg.GetFileStorageDir(),
4691                                         string_file_storage_dir, instance))
4692
4693
4694     disks = _GenerateDiskTemplate(self,
4695                                   self.op.disk_template,
4696                                   instance, pnode_name,
4697                                   self.secondaries,
4698                                   self.disks,
4699                                   file_storage_dir,
4700                                   self.op.file_driver,
4701                                   0)
4702
4703     iobj = objects.Instance(name=instance, os=self.op.os_type,
4704                             primary_node=pnode_name,
4705                             nics=self.nics, disks=disks,
4706                             disk_template=self.op.disk_template,
4707                             admin_up=False,
4708                             network_port=network_port,
4709                             beparams=self.op.beparams,
4710                             hvparams=self.op.hvparams,
4711                             hypervisor=self.op.hypervisor,
4712                             )
4713
4714     feedback_fn("* creating instance disks...")
4715     try:
4716       _CreateDisks(self, iobj)
4717     except errors.OpExecError:
4718       self.LogWarning("Device creation failed, reverting...")
4719       try:
4720         _RemoveDisks(self, iobj)
4721       finally:
4722         self.cfg.ReleaseDRBDMinors(instance)
4723         raise
4724
4725     feedback_fn("adding instance %s to cluster config" % instance)
4726
4727     self.cfg.AddInstance(iobj)
4728     # Declare that we don't want to remove the instance lock anymore, as we've
4729     # added the instance to the config
4730     del self.remove_locks[locking.LEVEL_INSTANCE]
4731     # Unlock all the nodes
4732     if self.op.mode == constants.INSTANCE_IMPORT:
4733       nodes_keep = [self.op.src_node]
4734       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4735                        if node != self.op.src_node]
4736       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4737       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4738     else:
4739       self.context.glm.release(locking.LEVEL_NODE)
4740       del self.acquired_locks[locking.LEVEL_NODE]
4741
4742     if self.op.wait_for_sync:
4743       disk_abort = not _WaitForSync(self, iobj)
4744     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4745       # make sure the disks are not degraded (still sync-ing is ok)
4746       time.sleep(15)
4747       feedback_fn("* checking mirrors status")
4748       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4749     else:
4750       disk_abort = False
4751
4752     if disk_abort:
4753       _RemoveDisks(self, iobj)
4754       self.cfg.RemoveInstance(iobj.name)
4755       # Make sure the instance lock gets removed
4756       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4757       raise errors.OpExecError("There are some degraded disks for"
4758                                " this instance")
4759
4760     feedback_fn("creating os for instance %s on node %s" %
4761                 (instance, pnode_name))
4762
4763     if iobj.disk_template != constants.DT_DISKLESS:
4764       if self.op.mode == constants.INSTANCE_CREATE:
4765         feedback_fn("* running the instance OS create scripts...")
4766         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4767         msg = result.RemoteFailMsg()
4768         if msg:
4769           raise errors.OpExecError("Could not add os for instance %s"
4770                                    " on node %s: %s" %
4771                                    (instance, pnode_name, msg))
4772
4773       elif self.op.mode == constants.INSTANCE_IMPORT:
4774         feedback_fn("* running the instance OS import scripts...")
4775         src_node = self.op.src_node
4776         src_images = self.src_images
4777         cluster_name = self.cfg.GetClusterName()
4778         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4779                                                          src_node, src_images,
4780                                                          cluster_name)
4781         import_result.Raise()
4782         for idx, result in enumerate(import_result.data):
4783           if not result:
4784             self.LogWarning("Could not import the image %s for instance"
4785                             " %s, disk %d, on node %s" %
4786                             (src_images[idx], instance, idx, pnode_name))
4787       else:
4788         # also checked in the prereq part
4789         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4790                                      % self.op.mode)
4791
4792     if self.op.start:
4793       iobj.admin_up = True
4794       self.cfg.Update(iobj)
4795       logging.info("Starting instance %s on node %s", instance, pnode_name)
4796       feedback_fn("* starting instance...")
4797       result = self.rpc.call_instance_start(pnode_name, iobj)
4798       msg = result.RemoteFailMsg()
4799       if msg:
4800         raise errors.OpExecError("Could not start instance: %s" % msg)
4801
4802
4803 class LUConnectConsole(NoHooksLU):
4804   """Connect to an instance's console.
4805
4806   This is somewhat special in that it returns the command line that
4807   you need to run on the master node in order to connect to the
4808   console.
4809
4810   """
4811   _OP_REQP = ["instance_name"]
4812   REQ_BGL = False
4813
4814   def ExpandNames(self):
4815     self._ExpandAndLockInstance()
4816
4817   def CheckPrereq(self):
4818     """Check prerequisites.
4819
4820     This checks that the instance is in the cluster.
4821
4822     """
4823     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4824     assert self.instance is not None, \
4825       "Cannot retrieve locked instance %s" % self.op.instance_name
4826     _CheckNodeOnline(self, self.instance.primary_node)
4827
4828   def Exec(self, feedback_fn):
4829     """Connect to the console of an instance
4830
4831     """
4832     instance = self.instance
4833     node = instance.primary_node
4834
4835     node_insts = self.rpc.call_instance_list([node],
4836                                              [instance.hypervisor])[node]
4837     node_insts.Raise()
4838
4839     if instance.name not in node_insts.data:
4840       raise errors.OpExecError("Instance %s is not running." % instance.name)
4841
4842     logging.debug("Connecting to console of %s on %s", instance.name, node)
4843
4844     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4845     cluster = self.cfg.GetClusterInfo()
4846     # beparams and hvparams are passed separately, to avoid editing the
4847     # instance and then saving the defaults in the instance itself.
4848     hvparams = cluster.FillHV(instance)
4849     beparams = cluster.FillBE(instance)
4850     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4851
4852     # build ssh cmdline
4853     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4854
4855
4856 class LUReplaceDisks(LogicalUnit):
4857   """Replace the disks of an instance.
4858
4859   """
4860   HPATH = "mirrors-replace"
4861   HTYPE = constants.HTYPE_INSTANCE
4862   _OP_REQP = ["instance_name", "mode", "disks"]
4863   REQ_BGL = False
4864
4865   def CheckArguments(self):
4866     if not hasattr(self.op, "remote_node"):
4867       self.op.remote_node = None
4868     if not hasattr(self.op, "iallocator"):
4869       self.op.iallocator = None
4870
4871     # check for valid parameter combination
4872     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4873     if self.op.mode == constants.REPLACE_DISK_CHG:
4874       if cnt == 2:
4875         raise errors.OpPrereqError("When changing the secondary either an"
4876                                    " iallocator script must be used or the"
4877                                    " new node given")
4878       elif cnt == 0:
4879         raise errors.OpPrereqError("Give either the iallocator or the new"
4880                                    " secondary, not both")
4881     else: # not replacing the secondary
4882       if cnt != 2:
4883         raise errors.OpPrereqError("The iallocator and new node options can"
4884                                    " be used only when changing the"
4885                                    " secondary node")
4886
4887   def ExpandNames(self):
4888     self._ExpandAndLockInstance()
4889
4890     if self.op.iallocator is not None:
4891       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4892     elif self.op.remote_node is not None:
4893       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4894       if remote_node is None:
4895         raise errors.OpPrereqError("Node '%s' not known" %
4896                                    self.op.remote_node)
4897       self.op.remote_node = remote_node
4898       # Warning: do not remove the locking of the new secondary here
4899       # unless DRBD8.AddChildren is changed to work in parallel;
4900       # currently it doesn't since parallel invocations of
4901       # FindUnusedMinor will conflict
4902       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4903       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4904     else:
4905       self.needed_locks[locking.LEVEL_NODE] = []
4906       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4907
4908   def DeclareLocks(self, level):
4909     # If we're not already locking all nodes in the set we have to declare the
4910     # instance's primary/secondary nodes.
4911     if (level == locking.LEVEL_NODE and
4912         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4913       self._LockInstancesNodes()
4914
4915   def _RunAllocator(self):
4916     """Compute a new secondary node using an IAllocator.
4917
4918     """
4919     ial = IAllocator(self,
4920                      mode=constants.IALLOCATOR_MODE_RELOC,
4921                      name=self.op.instance_name,
4922                      relocate_from=[self.sec_node])
4923
4924     ial.Run(self.op.iallocator)
4925
4926     if not ial.success:
4927       raise errors.OpPrereqError("Can't compute nodes using"
4928                                  " iallocator '%s': %s" % (self.op.iallocator,
4929                                                            ial.info))
4930     if len(ial.nodes) != ial.required_nodes:
4931       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4932                                  " of nodes (%s), required %s" %
4933                                  (len(ial.nodes), ial.required_nodes))
4934     self.op.remote_node = ial.nodes[0]
4935     self.LogInfo("Selected new secondary for the instance: %s",
4936                  self.op.remote_node)
4937
4938   def BuildHooksEnv(self):
4939     """Build hooks env.
4940
4941     This runs on the master, the primary and all the secondaries.
4942
4943     """
4944     env = {
4945       "MODE": self.op.mode,
4946       "NEW_SECONDARY": self.op.remote_node,
4947       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4948       }
4949     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4950     nl = [
4951       self.cfg.GetMasterNode(),
4952       self.instance.primary_node,
4953       ]
4954     if self.op.remote_node is not None:
4955       nl.append(self.op.remote_node)
4956     return env, nl, nl
4957
4958   def CheckPrereq(self):
4959     """Check prerequisites.
4960
4961     This checks that the instance is in the cluster.
4962
4963     """
4964     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4965     assert instance is not None, \
4966       "Cannot retrieve locked instance %s" % self.op.instance_name
4967     self.instance = instance
4968
4969     if instance.disk_template != constants.DT_DRBD8:
4970       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4971                                  " instances")
4972
4973     if len(instance.secondary_nodes) != 1:
4974       raise errors.OpPrereqError("The instance has a strange layout,"
4975                                  " expected one secondary but found %d" %
4976                                  len(instance.secondary_nodes))
4977
4978     self.sec_node = instance.secondary_nodes[0]
4979
4980     if self.op.iallocator is not None:
4981       self._RunAllocator()
4982
4983     remote_node = self.op.remote_node
4984     if remote_node is not None:
4985       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4986       assert self.remote_node_info is not None, \
4987         "Cannot retrieve locked node %s" % remote_node
4988     else:
4989       self.remote_node_info = None
4990     if remote_node == instance.primary_node:
4991       raise errors.OpPrereqError("The specified node is the primary node of"
4992                                  " the instance.")
4993     elif remote_node == self.sec_node:
4994       raise errors.OpPrereqError("The specified node is already the"
4995                                  " secondary node of the instance.")
4996
4997     if self.op.mode == constants.REPLACE_DISK_PRI:
4998       n1 = self.tgt_node = instance.primary_node
4999       n2 = self.oth_node = self.sec_node
5000     elif self.op.mode == constants.REPLACE_DISK_SEC:
5001       n1 = self.tgt_node = self.sec_node
5002       n2 = self.oth_node = instance.primary_node
5003     elif self.op.mode == constants.REPLACE_DISK_CHG:
5004       n1 = self.new_node = remote_node
5005       n2 = self.oth_node = instance.primary_node
5006       self.tgt_node = self.sec_node
5007       _CheckNodeNotDrained(self, remote_node)
5008     else:
5009       raise errors.ProgrammerError("Unhandled disk replace mode")
5010
5011     _CheckNodeOnline(self, n1)
5012     _CheckNodeOnline(self, n2)
5013
5014     if not self.op.disks:
5015       self.op.disks = range(len(instance.disks))
5016
5017     for disk_idx in self.op.disks:
5018       instance.FindDisk(disk_idx)
5019
5020   def _ExecD8DiskOnly(self, feedback_fn):
5021     """Replace a disk on the primary or secondary for dbrd8.
5022
5023     The algorithm for replace is quite complicated:
5024
5025       1. for each disk to be replaced:
5026
5027         1. create new LVs on the target node with unique names
5028         1. detach old LVs from the drbd device
5029         1. rename old LVs to name_replaced.<time_t>
5030         1. rename new LVs to old LVs
5031         1. attach the new LVs (with the old names now) to the drbd device
5032
5033       1. wait for sync across all devices
5034
5035       1. for each modified disk:
5036
5037         1. remove old LVs (which have the name name_replaces.<time_t>)
5038
5039     Failures are not very well handled.
5040
5041     """
5042     steps_total = 6
5043     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5044     instance = self.instance
5045     iv_names = {}
5046     vgname = self.cfg.GetVGName()
5047     # start of work
5048     cfg = self.cfg
5049     tgt_node = self.tgt_node
5050     oth_node = self.oth_node
5051
5052     # Step: check device activation
5053     self.proc.LogStep(1, steps_total, "check device existence")
5054     info("checking volume groups")
5055     my_vg = cfg.GetVGName()
5056     results = self.rpc.call_vg_list([oth_node, tgt_node])
5057     if not results:
5058       raise errors.OpExecError("Can't list volume groups on the nodes")
5059     for node in oth_node, tgt_node:
5060       res = results[node]
5061       if res.failed or not res.data or my_vg not in res.data:
5062         raise errors.OpExecError("Volume group '%s' not found on %s" %
5063                                  (my_vg, node))
5064     for idx, dev in enumerate(instance.disks):
5065       if idx not in self.op.disks:
5066         continue
5067       for node in tgt_node, oth_node:
5068         info("checking disk/%d on %s" % (idx, node))
5069         cfg.SetDiskID(dev, node)
5070         result = self.rpc.call_blockdev_find(node, dev)
5071         msg = result.RemoteFailMsg()
5072         if not msg and not result.payload:
5073           msg = "disk not found"
5074         if msg:
5075           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5076                                    (idx, node, msg))
5077
5078     # Step: check other node consistency
5079     self.proc.LogStep(2, steps_total, "check peer consistency")
5080     for idx, dev in enumerate(instance.disks):
5081       if idx not in self.op.disks:
5082         continue
5083       info("checking disk/%d consistency on %s" % (idx, oth_node))
5084       if not _CheckDiskConsistency(self, dev, oth_node,
5085                                    oth_node==instance.primary_node):
5086         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5087                                  " to replace disks on this node (%s)" %
5088                                  (oth_node, tgt_node))
5089
5090     # Step: create new storage
5091     self.proc.LogStep(3, steps_total, "allocate new storage")
5092     for idx, dev in enumerate(instance.disks):
5093       if idx not in self.op.disks:
5094         continue
5095       size = dev.size
5096       cfg.SetDiskID(dev, tgt_node)
5097       lv_names = [".disk%d_%s" % (idx, suf)
5098                   for suf in ["data", "meta"]]
5099       names = _GenerateUniqueNames(self, lv_names)
5100       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5101                              logical_id=(vgname, names[0]))
5102       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5103                              logical_id=(vgname, names[1]))
5104       new_lvs = [lv_data, lv_meta]
5105       old_lvs = dev.children
5106       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5107       info("creating new local storage on %s for %s" %
5108            (tgt_node, dev.iv_name))
5109       # we pass force_create=True to force the LVM creation
5110       for new_lv in new_lvs:
5111         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5112                         _GetInstanceInfoText(instance), False)
5113
5114     # Step: for each lv, detach+rename*2+attach
5115     self.proc.LogStep(4, steps_total, "change drbd configuration")
5116     for dev, old_lvs, new_lvs in iv_names.itervalues():
5117       info("detaching %s drbd from local storage" % dev.iv_name)
5118       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5119       result.Raise()
5120       if not result.data:
5121         raise errors.OpExecError("Can't detach drbd from local storage on node"
5122                                  " %s for device %s" % (tgt_node, dev.iv_name))
5123       #dev.children = []
5124       #cfg.Update(instance)
5125
5126       # ok, we created the new LVs, so now we know we have the needed
5127       # storage; as such, we proceed on the target node to rename
5128       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5129       # using the assumption that logical_id == physical_id (which in
5130       # turn is the unique_id on that node)
5131
5132       # FIXME(iustin): use a better name for the replaced LVs
5133       temp_suffix = int(time.time())
5134       ren_fn = lambda d, suff: (d.physical_id[0],
5135                                 d.physical_id[1] + "_replaced-%s" % suff)
5136       # build the rename list based on what LVs exist on the node
5137       rlist = []
5138       for to_ren in old_lvs:
5139         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5140         if not result.RemoteFailMsg() and result.payload:
5141           # device exists
5142           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5143
5144       info("renaming the old LVs on the target node")
5145       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5146       result.Raise()
5147       if not result.data:
5148         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5149       # now we rename the new LVs to the old LVs
5150       info("renaming the new LVs on the target node")
5151       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5152       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5153       result.Raise()
5154       if not result.data:
5155         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5156
5157       for old, new in zip(old_lvs, new_lvs):
5158         new.logical_id = old.logical_id
5159         cfg.SetDiskID(new, tgt_node)
5160
5161       for disk in old_lvs:
5162         disk.logical_id = ren_fn(disk, temp_suffix)
5163         cfg.SetDiskID(disk, tgt_node)
5164
5165       # now that the new lvs have the old name, we can add them to the device
5166       info("adding new mirror component on %s" % tgt_node)
5167       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5168       if result.failed or not result.data:
5169         for new_lv in new_lvs:
5170           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5171           if msg:
5172             warning("Can't rollback device %s: %s", dev, msg,
5173                     hint="cleanup manually the unused logical volumes")
5174         raise errors.OpExecError("Can't add local storage to drbd")
5175
5176       dev.children = new_lvs
5177       cfg.Update(instance)
5178
5179     # Step: wait for sync
5180
5181     # this can fail as the old devices are degraded and _WaitForSync
5182     # does a combined result over all disks, so we don't check its
5183     # return value
5184     self.proc.LogStep(5, steps_total, "sync devices")
5185     _WaitForSync(self, instance, unlock=True)
5186
5187     # so check manually all the devices
5188     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5189       cfg.SetDiskID(dev, instance.primary_node)
5190       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5191       msg = result.RemoteFailMsg()
5192       if not msg and not result.payload:
5193         msg = "disk not found"
5194       if msg:
5195         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5196                                  (name, msg))
5197       if result.payload[5]:
5198         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5199
5200     # Step: remove old storage
5201     self.proc.LogStep(6, steps_total, "removing old storage")
5202     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5203       info("remove logical volumes for %s" % name)
5204       for lv in old_lvs:
5205         cfg.SetDiskID(lv, tgt_node)
5206         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5207         if msg:
5208           warning("Can't remove old LV: %s" % msg,
5209                   hint="manually remove unused LVs")
5210           continue
5211
5212   def _ExecD8Secondary(self, feedback_fn):
5213     """Replace the secondary node for drbd8.
5214
5215     The algorithm for replace is quite complicated:
5216       - for all disks of the instance:
5217         - create new LVs on the new node with same names
5218         - shutdown the drbd device on the old secondary
5219         - disconnect the drbd network on the primary
5220         - create the drbd device on the new secondary
5221         - network attach the drbd on the primary, using an artifice:
5222           the drbd code for Attach() will connect to the network if it
5223           finds a device which is connected to the good local disks but
5224           not network enabled
5225       - wait for sync across all devices
5226       - remove all disks from the old secondary
5227
5228     Failures are not very well handled.
5229
5230     """
5231     steps_total = 6
5232     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5233     instance = self.instance
5234     iv_names = {}
5235     # start of work
5236     cfg = self.cfg
5237     old_node = self.tgt_node
5238     new_node = self.new_node
5239     pri_node = instance.primary_node
5240     nodes_ip = {
5241       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5242       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5243       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5244       }
5245
5246     # Step: check device activation
5247     self.proc.LogStep(1, steps_total, "check device existence")
5248     info("checking volume groups")
5249     my_vg = cfg.GetVGName()
5250     results = self.rpc.call_vg_list([pri_node, new_node])
5251     for node in pri_node, new_node:
5252       res = results[node]
5253       if res.failed or not res.data or my_vg not in res.data:
5254         raise errors.OpExecError("Volume group '%s' not found on %s" %
5255                                  (my_vg, node))
5256     for idx, dev in enumerate(instance.disks):
5257       if idx not in self.op.disks:
5258         continue
5259       info("checking disk/%d on %s" % (idx, pri_node))
5260       cfg.SetDiskID(dev, pri_node)
5261       result = self.rpc.call_blockdev_find(pri_node, dev)
5262       msg = result.RemoteFailMsg()
5263       if not msg and not result.payload:
5264         msg = "disk not found"
5265       if msg:
5266         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5267                                  (idx, pri_node, msg))
5268
5269     # Step: check other node consistency
5270     self.proc.LogStep(2, steps_total, "check peer consistency")
5271     for idx, dev in enumerate(instance.disks):
5272       if idx not in self.op.disks:
5273         continue
5274       info("checking disk/%d consistency on %s" % (idx, pri_node))
5275       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5276         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5277                                  " unsafe to replace the secondary" %
5278                                  pri_node)
5279
5280     # Step: create new storage
5281     self.proc.LogStep(3, steps_total, "allocate new storage")
5282     for idx, dev in enumerate(instance.disks):
5283       info("adding new local storage on %s for disk/%d" %
5284            (new_node, idx))
5285       # we pass force_create=True to force LVM creation
5286       for new_lv in dev.children:
5287         _CreateBlockDev(self, new_node, instance, new_lv, True,
5288                         _GetInstanceInfoText(instance), False)
5289
5290     # Step 4: dbrd minors and drbd setups changes
5291     # after this, we must manually remove the drbd minors on both the
5292     # error and the success paths
5293     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5294                                    instance.name)
5295     logging.debug("Allocated minors %s" % (minors,))
5296     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5297     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5298       size = dev.size
5299       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5300       # create new devices on new_node; note that we create two IDs:
5301       # one without port, so the drbd will be activated without
5302       # networking information on the new node at this stage, and one
5303       # with network, for the latter activation in step 4
5304       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5305       if pri_node == o_node1:
5306         p_minor = o_minor1
5307       else:
5308         p_minor = o_minor2
5309
5310       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5311       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5312
5313       iv_names[idx] = (dev, dev.children, new_net_id)
5314       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5315                     new_net_id)
5316       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5317                               logical_id=new_alone_id,
5318                               children=dev.children)
5319       try:
5320         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5321                               _GetInstanceInfoText(instance), False)
5322       except errors.GenericError:
5323         self.cfg.ReleaseDRBDMinors(instance.name)
5324         raise
5325
5326     for idx, dev in enumerate(instance.disks):
5327       # we have new devices, shutdown the drbd on the old secondary
5328       info("shutting down drbd for disk/%d on old node" % idx)
5329       cfg.SetDiskID(dev, old_node)
5330       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5331       if msg:
5332         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5333                 (idx, msg),
5334                 hint="Please cleanup this device manually as soon as possible")
5335
5336     info("detaching primary drbds from the network (=> standalone)")
5337     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5338                                                instance.disks)[pri_node]
5339
5340     msg = result.RemoteFailMsg()
5341     if msg:
5342       # detaches didn't succeed (unlikely)
5343       self.cfg.ReleaseDRBDMinors(instance.name)
5344       raise errors.OpExecError("Can't detach the disks from the network on"
5345                                " old node: %s" % (msg,))
5346
5347     # if we managed to detach at least one, we update all the disks of
5348     # the instance to point to the new secondary
5349     info("updating instance configuration")
5350     for dev, _, new_logical_id in iv_names.itervalues():
5351       dev.logical_id = new_logical_id
5352       cfg.SetDiskID(dev, pri_node)
5353     cfg.Update(instance)
5354
5355     # and now perform the drbd attach
5356     info("attaching primary drbds to new secondary (standalone => connected)")
5357     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5358                                            instance.disks, instance.name,
5359                                            False)
5360     for to_node, to_result in result.items():
5361       msg = to_result.RemoteFailMsg()
5362       if msg:
5363         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5364                 hint="please do a gnt-instance info to see the"
5365                 " status of disks")
5366
5367     # this can fail as the old devices are degraded and _WaitForSync
5368     # does a combined result over all disks, so we don't check its
5369     # return value
5370     self.proc.LogStep(5, steps_total, "sync devices")
5371     _WaitForSync(self, instance, unlock=True)
5372
5373     # so check manually all the devices
5374     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5375       cfg.SetDiskID(dev, pri_node)
5376       result = self.rpc.call_blockdev_find(pri_node, dev)
5377       msg = result.RemoteFailMsg()
5378       if not msg and not result.payload:
5379         msg = "disk not found"
5380       if msg:
5381         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5382                                  (idx, msg))
5383       if result.payload[5]:
5384         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5385
5386     self.proc.LogStep(6, steps_total, "removing old storage")
5387     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5388       info("remove logical volumes for disk/%d" % idx)
5389       for lv in old_lvs:
5390         cfg.SetDiskID(lv, old_node)
5391         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5392         if msg:
5393           warning("Can't remove LV on old secondary: %s", msg,
5394                   hint="Cleanup stale volumes by hand")
5395
5396   def Exec(self, feedback_fn):
5397     """Execute disk replacement.
5398
5399     This dispatches the disk replacement to the appropriate handler.
5400
5401     """
5402     instance = self.instance
5403
5404     # Activate the instance disks if we're replacing them on a down instance
5405     if not instance.admin_up:
5406       _StartInstanceDisks(self, instance, True)
5407
5408     if self.op.mode == constants.REPLACE_DISK_CHG:
5409       fn = self._ExecD8Secondary
5410     else:
5411       fn = self._ExecD8DiskOnly
5412
5413     ret = fn(feedback_fn)
5414
5415     # Deactivate the instance disks if we're replacing them on a down instance
5416     if not instance.admin_up:
5417       _SafeShutdownInstanceDisks(self, instance)
5418
5419     return ret
5420
5421
5422 class LUGrowDisk(LogicalUnit):
5423   """Grow a disk of an instance.
5424
5425   """
5426   HPATH = "disk-grow"
5427   HTYPE = constants.HTYPE_INSTANCE
5428   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5429   REQ_BGL = False
5430
5431   def ExpandNames(self):
5432     self._ExpandAndLockInstance()
5433     self.needed_locks[locking.LEVEL_NODE] = []
5434     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5435
5436   def DeclareLocks(self, level):
5437     if level == locking.LEVEL_NODE:
5438       self._LockInstancesNodes()
5439
5440   def BuildHooksEnv(self):
5441     """Build hooks env.
5442
5443     This runs on the master, the primary and all the secondaries.
5444
5445     """
5446     env = {
5447       "DISK": self.op.disk,
5448       "AMOUNT": self.op.amount,
5449       }
5450     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5451     nl = [
5452       self.cfg.GetMasterNode(),
5453       self.instance.primary_node,
5454       ]
5455     return env, nl, nl
5456
5457   def CheckPrereq(self):
5458     """Check prerequisites.
5459
5460     This checks that the instance is in the cluster.
5461
5462     """
5463     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5464     assert instance is not None, \
5465       "Cannot retrieve locked instance %s" % self.op.instance_name
5466     nodenames = list(instance.all_nodes)
5467     for node in nodenames:
5468       _CheckNodeOnline(self, node)
5469
5470
5471     self.instance = instance
5472
5473     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5474       raise errors.OpPrereqError("Instance's disk layout does not support"
5475                                  " growing.")
5476
5477     self.disk = instance.FindDisk(self.op.disk)
5478
5479     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5480                                        instance.hypervisor)
5481     for node in nodenames:
5482       info = nodeinfo[node]
5483       if info.failed or not info.data:
5484         raise errors.OpPrereqError("Cannot get current information"
5485                                    " from node '%s'" % node)
5486       vg_free = info.data.get('vg_free', None)
5487       if not isinstance(vg_free, int):
5488         raise errors.OpPrereqError("Can't compute free disk space on"
5489                                    " node %s" % node)
5490       if self.op.amount > vg_free:
5491         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5492                                    " %d MiB available, %d MiB required" %
5493                                    (node, vg_free, self.op.amount))
5494
5495   def Exec(self, feedback_fn):
5496     """Execute disk grow.
5497
5498     """
5499     instance = self.instance
5500     disk = self.disk
5501     for node in instance.all_nodes:
5502       self.cfg.SetDiskID(disk, node)
5503       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5504       msg = result.RemoteFailMsg()
5505       if msg:
5506         raise errors.OpExecError("Grow request failed to node %s: %s" %
5507                                  (node, msg))
5508     disk.RecordGrow(self.op.amount)
5509     self.cfg.Update(instance)
5510     if self.op.wait_for_sync:
5511       disk_abort = not _WaitForSync(self, instance)
5512       if disk_abort:
5513         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5514                              " status.\nPlease check the instance.")
5515
5516
5517 class LUQueryInstanceData(NoHooksLU):
5518   """Query runtime instance data.
5519
5520   """
5521   _OP_REQP = ["instances", "static"]
5522   REQ_BGL = False
5523
5524   def ExpandNames(self):
5525     self.needed_locks = {}
5526     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5527
5528     if not isinstance(self.op.instances, list):
5529       raise errors.OpPrereqError("Invalid argument type 'instances'")
5530
5531     if self.op.instances:
5532       self.wanted_names = []
5533       for name in self.op.instances:
5534         full_name = self.cfg.ExpandInstanceName(name)
5535         if full_name is None:
5536           raise errors.OpPrereqError("Instance '%s' not known" % name)
5537         self.wanted_names.append(full_name)
5538       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5539     else:
5540       self.wanted_names = None
5541       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5542
5543     self.needed_locks[locking.LEVEL_NODE] = []
5544     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5545
5546   def DeclareLocks(self, level):
5547     if level == locking.LEVEL_NODE:
5548       self._LockInstancesNodes()
5549
5550   def CheckPrereq(self):
5551     """Check prerequisites.
5552
5553     This only checks the optional instance list against the existing names.
5554
5555     """
5556     if self.wanted_names is None:
5557       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5558
5559     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5560                              in self.wanted_names]
5561     return
5562
5563   def _ComputeDiskStatus(self, instance, snode, dev):
5564     """Compute block device status.
5565
5566     """
5567     static = self.op.static
5568     if not static:
5569       self.cfg.SetDiskID(dev, instance.primary_node)
5570       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5571       if dev_pstatus.offline:
5572         dev_pstatus = None
5573       else:
5574         msg = dev_pstatus.RemoteFailMsg()
5575         if msg:
5576           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5577                                    (instance.name, msg))
5578         dev_pstatus = dev_pstatus.payload
5579     else:
5580       dev_pstatus = None
5581
5582     if dev.dev_type in constants.LDS_DRBD:
5583       # we change the snode then (otherwise we use the one passed in)
5584       if dev.logical_id[0] == instance.primary_node:
5585         snode = dev.logical_id[1]
5586       else:
5587         snode = dev.logical_id[0]
5588
5589     if snode and not static:
5590       self.cfg.SetDiskID(dev, snode)
5591       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5592       if dev_sstatus.offline:
5593         dev_sstatus = None
5594       else:
5595         msg = dev_sstatus.RemoteFailMsg()
5596         if msg:
5597           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5598                                    (instance.name, msg))
5599         dev_sstatus = dev_sstatus.payload
5600     else:
5601       dev_sstatus = None
5602
5603     if dev.children:
5604       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5605                       for child in dev.children]
5606     else:
5607       dev_children = []
5608
5609     data = {
5610       "iv_name": dev.iv_name,
5611       "dev_type": dev.dev_type,
5612       "logical_id": dev.logical_id,
5613       "physical_id": dev.physical_id,
5614       "pstatus": dev_pstatus,
5615       "sstatus": dev_sstatus,
5616       "children": dev_children,
5617       "mode": dev.mode,
5618       }
5619
5620     return data
5621
5622   def Exec(self, feedback_fn):
5623     """Gather and return data"""
5624     result = {}
5625
5626     cluster = self.cfg.GetClusterInfo()
5627
5628     for instance in self.wanted_instances:
5629       if not self.op.static:
5630         remote_info = self.rpc.call_instance_info(instance.primary_node,
5631                                                   instance.name,
5632                                                   instance.hypervisor)
5633         remote_info.Raise()
5634         remote_info = remote_info.data
5635         if remote_info and "state" in remote_info:
5636           remote_state = "up"
5637         else:
5638           remote_state = "down"
5639       else:
5640         remote_state = None
5641       if instance.admin_up:
5642         config_state = "up"
5643       else:
5644         config_state = "down"
5645
5646       disks = [self._ComputeDiskStatus(instance, None, device)
5647                for device in instance.disks]
5648
5649       idict = {
5650         "name": instance.name,
5651         "config_state": config_state,
5652         "run_state": remote_state,
5653         "pnode": instance.primary_node,
5654         "snodes": instance.secondary_nodes,
5655         "os": instance.os,
5656         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5657         "disks": disks,
5658         "hypervisor": instance.hypervisor,
5659         "network_port": instance.network_port,
5660         "hv_instance": instance.hvparams,
5661         "hv_actual": cluster.FillHV(instance),
5662         "be_instance": instance.beparams,
5663         "be_actual": cluster.FillBE(instance),
5664         }
5665
5666       result[instance.name] = idict
5667
5668     return result
5669
5670
5671 class LUSetInstanceParams(LogicalUnit):
5672   """Modifies an instances's parameters.
5673
5674   """
5675   HPATH = "instance-modify"
5676   HTYPE = constants.HTYPE_INSTANCE
5677   _OP_REQP = ["instance_name"]
5678   REQ_BGL = False
5679
5680   def CheckArguments(self):
5681     if not hasattr(self.op, 'nics'):
5682       self.op.nics = []
5683     if not hasattr(self.op, 'disks'):
5684       self.op.disks = []
5685     if not hasattr(self.op, 'beparams'):
5686       self.op.beparams = {}
5687     if not hasattr(self.op, 'hvparams'):
5688       self.op.hvparams = {}
5689     self.op.force = getattr(self.op, "force", False)
5690     if not (self.op.nics or self.op.disks or
5691             self.op.hvparams or self.op.beparams):
5692       raise errors.OpPrereqError("No changes submitted")
5693
5694     # Disk validation
5695     disk_addremove = 0
5696     for disk_op, disk_dict in self.op.disks:
5697       if disk_op == constants.DDM_REMOVE:
5698         disk_addremove += 1
5699         continue
5700       elif disk_op == constants.DDM_ADD:
5701         disk_addremove += 1
5702       else:
5703         if not isinstance(disk_op, int):
5704           raise errors.OpPrereqError("Invalid disk index")
5705       if disk_op == constants.DDM_ADD:
5706         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5707         if mode not in constants.DISK_ACCESS_SET:
5708           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5709         size = disk_dict.get('size', None)
5710         if size is None:
5711           raise errors.OpPrereqError("Required disk parameter size missing")
5712         try:
5713           size = int(size)
5714         except ValueError, err:
5715           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5716                                      str(err))
5717         disk_dict['size'] = size
5718       else:
5719         # modification of disk
5720         if 'size' in disk_dict:
5721           raise errors.OpPrereqError("Disk size change not possible, use"
5722                                      " grow-disk")
5723
5724     if disk_addremove > 1:
5725       raise errors.OpPrereqError("Only one disk add or remove operation"
5726                                  " supported at a time")
5727
5728     # NIC validation
5729     nic_addremove = 0
5730     for nic_op, nic_dict in self.op.nics:
5731       if nic_op == constants.DDM_REMOVE:
5732         nic_addremove += 1
5733         continue
5734       elif nic_op == constants.DDM_ADD:
5735         nic_addremove += 1
5736       else:
5737         if not isinstance(nic_op, int):
5738           raise errors.OpPrereqError("Invalid nic index")
5739
5740       # nic_dict should be a dict
5741       nic_ip = nic_dict.get('ip', None)
5742       if nic_ip is not None:
5743         if nic_ip.lower() == constants.VALUE_NONE:
5744           nic_dict['ip'] = None
5745         else:
5746           if not utils.IsValidIP(nic_ip):
5747             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5748
5749       if nic_op == constants.DDM_ADD:
5750         nic_bridge = nic_dict.get('bridge', None)
5751         if nic_bridge is None:
5752           nic_dict['bridge'] = self.cfg.GetDefBridge()
5753         nic_mac = nic_dict.get('mac', None)
5754         if nic_mac is None:
5755           nic_dict['mac'] = constants.VALUE_AUTO
5756
5757       if 'mac' in nic_dict:
5758         nic_mac = nic_dict['mac']
5759         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5760           if not utils.IsValidMac(nic_mac):
5761             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5762         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5763           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5764                                      " modifying an existing nic")
5765
5766     if nic_addremove > 1:
5767       raise errors.OpPrereqError("Only one NIC add or remove operation"
5768                                  " supported at a time")
5769
5770   def ExpandNames(self):
5771     self._ExpandAndLockInstance()
5772     self.needed_locks[locking.LEVEL_NODE] = []
5773     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5774
5775   def DeclareLocks(self, level):
5776     if level == locking.LEVEL_NODE:
5777       self._LockInstancesNodes()
5778
5779   def BuildHooksEnv(self):
5780     """Build hooks env.
5781
5782     This runs on the master, primary and secondaries.
5783
5784     """
5785     args = dict()
5786     if constants.BE_MEMORY in self.be_new:
5787       args['memory'] = self.be_new[constants.BE_MEMORY]
5788     if constants.BE_VCPUS in self.be_new:
5789       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5790     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5791     # information at all.
5792     if self.op.nics:
5793       args['nics'] = []
5794       nic_override = dict(self.op.nics)
5795       for idx, nic in enumerate(self.instance.nics):
5796         if idx in nic_override:
5797           this_nic_override = nic_override[idx]
5798         else:
5799           this_nic_override = {}
5800         if 'ip' in this_nic_override:
5801           ip = this_nic_override['ip']
5802         else:
5803           ip = nic.ip
5804         if 'bridge' in this_nic_override:
5805           bridge = this_nic_override['bridge']
5806         else:
5807           bridge = nic.bridge
5808         if 'mac' in this_nic_override:
5809           mac = this_nic_override['mac']
5810         else:
5811           mac = nic.mac
5812         args['nics'].append((ip, bridge, mac))
5813       if constants.DDM_ADD in nic_override:
5814         ip = nic_override[constants.DDM_ADD].get('ip', None)
5815         bridge = nic_override[constants.DDM_ADD]['bridge']
5816         mac = nic_override[constants.DDM_ADD]['mac']
5817         args['nics'].append((ip, bridge, mac))
5818       elif constants.DDM_REMOVE in nic_override:
5819         del args['nics'][-1]
5820
5821     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5822     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5823     return env, nl, nl
5824
5825   def CheckPrereq(self):
5826     """Check prerequisites.
5827
5828     This only checks the instance list against the existing names.
5829
5830     """
5831     force = self.force = self.op.force
5832
5833     # checking the new params on the primary/secondary nodes
5834
5835     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5836     assert self.instance is not None, \
5837       "Cannot retrieve locked instance %s" % self.op.instance_name
5838     pnode = instance.primary_node
5839     nodelist = list(instance.all_nodes)
5840
5841     # hvparams processing
5842     if self.op.hvparams:
5843       i_hvdict = copy.deepcopy(instance.hvparams)
5844       for key, val in self.op.hvparams.iteritems():
5845         if val == constants.VALUE_DEFAULT:
5846           try:
5847             del i_hvdict[key]
5848           except KeyError:
5849             pass
5850         else:
5851           i_hvdict[key] = val
5852       cluster = self.cfg.GetClusterInfo()
5853       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5854       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5855                                 i_hvdict)
5856       # local check
5857       hypervisor.GetHypervisor(
5858         instance.hypervisor).CheckParameterSyntax(hv_new)
5859       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5860       self.hv_new = hv_new # the new actual values
5861       self.hv_inst = i_hvdict # the new dict (without defaults)
5862     else:
5863       self.hv_new = self.hv_inst = {}
5864
5865     # beparams processing
5866     if self.op.beparams:
5867       i_bedict = copy.deepcopy(instance.beparams)
5868       for key, val in self.op.beparams.iteritems():
5869         if val == constants.VALUE_DEFAULT:
5870           try:
5871             del i_bedict[key]
5872           except KeyError:
5873             pass
5874         else:
5875           i_bedict[key] = val
5876       cluster = self.cfg.GetClusterInfo()
5877       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5878       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5879                                 i_bedict)
5880       self.be_new = be_new # the new actual values
5881       self.be_inst = i_bedict # the new dict (without defaults)
5882     else:
5883       self.be_new = self.be_inst = {}
5884
5885     self.warn = []
5886
5887     if constants.BE_MEMORY in self.op.beparams and not self.force:
5888       mem_check_list = [pnode]
5889       if be_new[constants.BE_AUTO_BALANCE]:
5890         # either we changed auto_balance to yes or it was from before
5891         mem_check_list.extend(instance.secondary_nodes)
5892       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5893                                                   instance.hypervisor)
5894       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5895                                          instance.hypervisor)
5896       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5897         # Assume the primary node is unreachable and go ahead
5898         self.warn.append("Can't get info from primary node %s" % pnode)
5899       else:
5900         if not instance_info.failed and instance_info.data:
5901           current_mem = instance_info.data['memory']
5902         else:
5903           # Assume instance not running
5904           # (there is a slight race condition here, but it's not very probable,
5905           # and we have no other way to check)
5906           current_mem = 0
5907         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5908                     nodeinfo[pnode].data['memory_free'])
5909         if miss_mem > 0:
5910           raise errors.OpPrereqError("This change will prevent the instance"
5911                                      " from starting, due to %d MB of memory"
5912                                      " missing on its primary node" % miss_mem)
5913
5914       if be_new[constants.BE_AUTO_BALANCE]:
5915         for node, nres in nodeinfo.iteritems():
5916           if node not in instance.secondary_nodes:
5917             continue
5918           if nres.failed or not isinstance(nres.data, dict):
5919             self.warn.append("Can't get info from secondary node %s" % node)
5920           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5921             self.warn.append("Not enough memory to failover instance to"
5922                              " secondary node %s" % node)
5923
5924     # NIC processing
5925     for nic_op, nic_dict in self.op.nics:
5926       if nic_op == constants.DDM_REMOVE:
5927         if not instance.nics:
5928           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5929         continue
5930       if nic_op != constants.DDM_ADD:
5931         # an existing nic
5932         if nic_op < 0 or nic_op >= len(instance.nics):
5933           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5934                                      " are 0 to %d" %
5935                                      (nic_op, len(instance.nics)))
5936       if 'bridge' in nic_dict:
5937         nic_bridge = nic_dict['bridge']
5938         if nic_bridge is None:
5939           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5940         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5941           msg = ("Bridge '%s' doesn't exist on one of"
5942                  " the instance nodes" % nic_bridge)
5943           if self.force:
5944             self.warn.append(msg)
5945           else:
5946             raise errors.OpPrereqError(msg)
5947       if 'mac' in nic_dict:
5948         nic_mac = nic_dict['mac']
5949         if nic_mac is None:
5950           raise errors.OpPrereqError('Cannot set the nic mac to None')
5951         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5952           # otherwise generate the mac
5953           nic_dict['mac'] = self.cfg.GenerateMAC()
5954         else:
5955           # or validate/reserve the current one
5956           if self.cfg.IsMacInUse(nic_mac):
5957             raise errors.OpPrereqError("MAC address %s already in use"
5958                                        " in cluster" % nic_mac)
5959
5960     # DISK processing
5961     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5962       raise errors.OpPrereqError("Disk operations not supported for"
5963                                  " diskless instances")
5964     for disk_op, disk_dict in self.op.disks:
5965       if disk_op == constants.DDM_REMOVE:
5966         if len(instance.disks) == 1:
5967           raise errors.OpPrereqError("Cannot remove the last disk of"
5968                                      " an instance")
5969         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5970         ins_l = ins_l[pnode]
5971         if ins_l.failed or not isinstance(ins_l.data, list):
5972           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5973         if instance.name in ins_l.data:
5974           raise errors.OpPrereqError("Instance is running, can't remove"
5975                                      " disks.")
5976
5977       if (disk_op == constants.DDM_ADD and
5978           len(instance.nics) >= constants.MAX_DISKS):
5979         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5980                                    " add more" % constants.MAX_DISKS)
5981       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5982         # an existing disk
5983         if disk_op < 0 or disk_op >= len(instance.disks):
5984           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5985                                      " are 0 to %d" %
5986                                      (disk_op, len(instance.disks)))
5987
5988     return
5989
5990   def Exec(self, feedback_fn):
5991     """Modifies an instance.
5992
5993     All parameters take effect only at the next restart of the instance.
5994
5995     """
5996     # Process here the warnings from CheckPrereq, as we don't have a
5997     # feedback_fn there.
5998     for warn in self.warn:
5999       feedback_fn("WARNING: %s" % warn)
6000
6001     result = []
6002     instance = self.instance
6003     # disk changes
6004     for disk_op, disk_dict in self.op.disks:
6005       if disk_op == constants.DDM_REMOVE:
6006         # remove the last disk
6007         device = instance.disks.pop()
6008         device_idx = len(instance.disks)
6009         for node, disk in device.ComputeNodeTree(instance.primary_node):
6010           self.cfg.SetDiskID(disk, node)
6011           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6012           if msg:
6013             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6014                             " continuing anyway", device_idx, node, msg)
6015         result.append(("disk/%d" % device_idx, "remove"))
6016       elif disk_op == constants.DDM_ADD:
6017         # add a new disk
6018         if instance.disk_template == constants.DT_FILE:
6019           file_driver, file_path = instance.disks[0].logical_id
6020           file_path = os.path.dirname(file_path)
6021         else:
6022           file_driver = file_path = None
6023         disk_idx_base = len(instance.disks)
6024         new_disk = _GenerateDiskTemplate(self,
6025                                          instance.disk_template,
6026                                          instance.name, instance.primary_node,
6027                                          instance.secondary_nodes,
6028                                          [disk_dict],
6029                                          file_path,
6030                                          file_driver,
6031                                          disk_idx_base)[0]
6032         instance.disks.append(new_disk)
6033         info = _GetInstanceInfoText(instance)
6034
6035         logging.info("Creating volume %s for instance %s",
6036                      new_disk.iv_name, instance.name)
6037         # Note: this needs to be kept in sync with _CreateDisks
6038         #HARDCODE
6039         for node in instance.all_nodes:
6040           f_create = node == instance.primary_node
6041           try:
6042             _CreateBlockDev(self, node, instance, new_disk,
6043                             f_create, info, f_create)
6044           except errors.OpExecError, err:
6045             self.LogWarning("Failed to create volume %s (%s) on"
6046                             " node %s: %s",
6047                             new_disk.iv_name, new_disk, node, err)
6048         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6049                        (new_disk.size, new_disk.mode)))
6050       else:
6051         # change a given disk
6052         instance.disks[disk_op].mode = disk_dict['mode']
6053         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6054     # NIC changes
6055     for nic_op, nic_dict in self.op.nics:
6056       if nic_op == constants.DDM_REMOVE:
6057         # remove the last nic
6058         del instance.nics[-1]
6059         result.append(("nic.%d" % len(instance.nics), "remove"))
6060       elif nic_op == constants.DDM_ADD:
6061         # mac and bridge should be set, by now
6062         mac = nic_dict['mac']
6063         bridge = nic_dict['bridge']
6064         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6065                               bridge=bridge)
6066         instance.nics.append(new_nic)
6067         result.append(("nic.%d" % (len(instance.nics) - 1),
6068                        "add:mac=%s,ip=%s,bridge=%s" %
6069                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6070       else:
6071         # change a given nic
6072         for key in 'mac', 'ip', 'bridge':
6073           if key in nic_dict:
6074             setattr(instance.nics[nic_op], key, nic_dict[key])
6075             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6076
6077     # hvparams changes
6078     if self.op.hvparams:
6079       instance.hvparams = self.hv_inst
6080       for key, val in self.op.hvparams.iteritems():
6081         result.append(("hv/%s" % key, val))
6082
6083     # beparams changes
6084     if self.op.beparams:
6085       instance.beparams = self.be_inst
6086       for key, val in self.op.beparams.iteritems():
6087         result.append(("be/%s" % key, val))
6088
6089     self.cfg.Update(instance)
6090
6091     return result
6092
6093
6094 class LUQueryExports(NoHooksLU):
6095   """Query the exports list
6096
6097   """
6098   _OP_REQP = ['nodes']
6099   REQ_BGL = False
6100
6101   def ExpandNames(self):
6102     self.needed_locks = {}
6103     self.share_locks[locking.LEVEL_NODE] = 1
6104     if not self.op.nodes:
6105       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6106     else:
6107       self.needed_locks[locking.LEVEL_NODE] = \
6108         _GetWantedNodes(self, self.op.nodes)
6109
6110   def CheckPrereq(self):
6111     """Check prerequisites.
6112
6113     """
6114     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6115
6116   def Exec(self, feedback_fn):
6117     """Compute the list of all the exported system images.
6118
6119     @rtype: dict
6120     @return: a dictionary with the structure node->(export-list)
6121         where export-list is a list of the instances exported on
6122         that node.
6123
6124     """
6125     rpcresult = self.rpc.call_export_list(self.nodes)
6126     result = {}
6127     for node in rpcresult:
6128       if rpcresult[node].failed:
6129         result[node] = False
6130       else:
6131         result[node] = rpcresult[node].data
6132
6133     return result
6134
6135
6136 class LUExportInstance(LogicalUnit):
6137   """Export an instance to an image in the cluster.
6138
6139   """
6140   HPATH = "instance-export"
6141   HTYPE = constants.HTYPE_INSTANCE
6142   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6143   REQ_BGL = False
6144
6145   def ExpandNames(self):
6146     self._ExpandAndLockInstance()
6147     # FIXME: lock only instance primary and destination node
6148     #
6149     # Sad but true, for now we have do lock all nodes, as we don't know where
6150     # the previous export might be, and and in this LU we search for it and
6151     # remove it from its current node. In the future we could fix this by:
6152     #  - making a tasklet to search (share-lock all), then create the new one,
6153     #    then one to remove, after
6154     #  - removing the removal operation altoghether
6155     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6156
6157   def DeclareLocks(self, level):
6158     """Last minute lock declaration."""
6159     # All nodes are locked anyway, so nothing to do here.
6160
6161   def BuildHooksEnv(self):
6162     """Build hooks env.
6163
6164     This will run on the master, primary node and target node.
6165
6166     """
6167     env = {
6168       "EXPORT_NODE": self.op.target_node,
6169       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6170       }
6171     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6172     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6173           self.op.target_node]
6174     return env, nl, nl
6175
6176   def CheckPrereq(self):
6177     """Check prerequisites.
6178
6179     This checks that the instance and node names are valid.
6180
6181     """
6182     instance_name = self.op.instance_name
6183     self.instance = self.cfg.GetInstanceInfo(instance_name)
6184     assert self.instance is not None, \
6185           "Cannot retrieve locked instance %s" % self.op.instance_name
6186     _CheckNodeOnline(self, self.instance.primary_node)
6187
6188     self.dst_node = self.cfg.GetNodeInfo(
6189       self.cfg.ExpandNodeName(self.op.target_node))
6190
6191     if self.dst_node is None:
6192       # This is wrong node name, not a non-locked node
6193       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6194     _CheckNodeOnline(self, self.dst_node.name)
6195     _CheckNodeNotDrained(self, self.dst_node.name)
6196
6197     # instance disk type verification
6198     for disk in self.instance.disks:
6199       if disk.dev_type == constants.LD_FILE:
6200         raise errors.OpPrereqError("Export not supported for instances with"
6201                                    " file-based disks")
6202
6203   def Exec(self, feedback_fn):
6204     """Export an instance to an image in the cluster.
6205
6206     """
6207     instance = self.instance
6208     dst_node = self.dst_node
6209     src_node = instance.primary_node
6210     if self.op.shutdown:
6211       # shutdown the instance, but not the disks
6212       result = self.rpc.call_instance_shutdown(src_node, instance)
6213       msg = result.RemoteFailMsg()
6214       if msg:
6215         raise errors.OpExecError("Could not shutdown instance %s on"
6216                                  " node %s: %s" %
6217                                  (instance.name, src_node, msg))
6218
6219     vgname = self.cfg.GetVGName()
6220
6221     snap_disks = []
6222
6223     # set the disks ID correctly since call_instance_start needs the
6224     # correct drbd minor to create the symlinks
6225     for disk in instance.disks:
6226       self.cfg.SetDiskID(disk, src_node)
6227
6228     try:
6229       for disk in instance.disks:
6230         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6231         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6232         if new_dev_name.failed or not new_dev_name.data:
6233           self.LogWarning("Could not snapshot block device %s on node %s",
6234                           disk.logical_id[1], src_node)
6235           snap_disks.append(False)
6236         else:
6237           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6238                                  logical_id=(vgname, new_dev_name.data),
6239                                  physical_id=(vgname, new_dev_name.data),
6240                                  iv_name=disk.iv_name)
6241           snap_disks.append(new_dev)
6242
6243     finally:
6244       if self.op.shutdown and instance.admin_up:
6245         result = self.rpc.call_instance_start(src_node, instance)
6246         msg = result.RemoteFailMsg()
6247         if msg:
6248           _ShutdownInstanceDisks(self, instance)
6249           raise errors.OpExecError("Could not start instance: %s" % msg)
6250
6251     # TODO: check for size
6252
6253     cluster_name = self.cfg.GetClusterName()
6254     for idx, dev in enumerate(snap_disks):
6255       if dev:
6256         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6257                                                instance, cluster_name, idx)
6258         if result.failed or not result.data:
6259           self.LogWarning("Could not export block device %s from node %s to"
6260                           " node %s", dev.logical_id[1], src_node,
6261                           dst_node.name)
6262         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6263         if msg:
6264           self.LogWarning("Could not remove snapshot block device %s from node"
6265                           " %s: %s", dev.logical_id[1], src_node, msg)
6266
6267     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6268     if result.failed or not result.data:
6269       self.LogWarning("Could not finalize export for instance %s on node %s",
6270                       instance.name, dst_node.name)
6271
6272     nodelist = self.cfg.GetNodeList()
6273     nodelist.remove(dst_node.name)
6274
6275     # on one-node clusters nodelist will be empty after the removal
6276     # if we proceed the backup would be removed because OpQueryExports
6277     # substitutes an empty list with the full cluster node list.
6278     if nodelist:
6279       exportlist = self.rpc.call_export_list(nodelist)
6280       for node in exportlist:
6281         if exportlist[node].failed:
6282           continue
6283         if instance.name in exportlist[node].data:
6284           if not self.rpc.call_export_remove(node, instance.name):
6285             self.LogWarning("Could not remove older export for instance %s"
6286                             " on node %s", instance.name, node)
6287
6288
6289 class LURemoveExport(NoHooksLU):
6290   """Remove exports related to the named instance.
6291
6292   """
6293   _OP_REQP = ["instance_name"]
6294   REQ_BGL = False
6295
6296   def ExpandNames(self):
6297     self.needed_locks = {}
6298     # We need all nodes to be locked in order for RemoveExport to work, but we
6299     # don't need to lock the instance itself, as nothing will happen to it (and
6300     # we can remove exports also for a removed instance)
6301     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6302
6303   def CheckPrereq(self):
6304     """Check prerequisites.
6305     """
6306     pass
6307
6308   def Exec(self, feedback_fn):
6309     """Remove any export.
6310
6311     """
6312     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6313     # If the instance was not found we'll try with the name that was passed in.
6314     # This will only work if it was an FQDN, though.
6315     fqdn_warn = False
6316     if not instance_name:
6317       fqdn_warn = True
6318       instance_name = self.op.instance_name
6319
6320     exportlist = self.rpc.call_export_list(self.acquired_locks[
6321       locking.LEVEL_NODE])
6322     found = False
6323     for node in exportlist:
6324       if exportlist[node].failed:
6325         self.LogWarning("Failed to query node %s, continuing" % node)
6326         continue
6327       if instance_name in exportlist[node].data:
6328         found = True
6329         result = self.rpc.call_export_remove(node, instance_name)
6330         if result.failed or not result.data:
6331           logging.error("Could not remove export for instance %s"
6332                         " on node %s", instance_name, node)
6333
6334     if fqdn_warn and not found:
6335       feedback_fn("Export not found. If trying to remove an export belonging"
6336                   " to a deleted instance please use its Fully Qualified"
6337                   " Domain Name.")
6338
6339
6340 class TagsLU(NoHooksLU):
6341   """Generic tags LU.
6342
6343   This is an abstract class which is the parent of all the other tags LUs.
6344
6345   """
6346
6347   def ExpandNames(self):
6348     self.needed_locks = {}
6349     if self.op.kind == constants.TAG_NODE:
6350       name = self.cfg.ExpandNodeName(self.op.name)
6351       if name is None:
6352         raise errors.OpPrereqError("Invalid node name (%s)" %
6353                                    (self.op.name,))
6354       self.op.name = name
6355       self.needed_locks[locking.LEVEL_NODE] = name
6356     elif self.op.kind == constants.TAG_INSTANCE:
6357       name = self.cfg.ExpandInstanceName(self.op.name)
6358       if name is None:
6359         raise errors.OpPrereqError("Invalid instance name (%s)" %
6360                                    (self.op.name,))
6361       self.op.name = name
6362       self.needed_locks[locking.LEVEL_INSTANCE] = name
6363
6364   def CheckPrereq(self):
6365     """Check prerequisites.
6366
6367     """
6368     if self.op.kind == constants.TAG_CLUSTER:
6369       self.target = self.cfg.GetClusterInfo()
6370     elif self.op.kind == constants.TAG_NODE:
6371       self.target = self.cfg.GetNodeInfo(self.op.name)
6372     elif self.op.kind == constants.TAG_INSTANCE:
6373       self.target = self.cfg.GetInstanceInfo(self.op.name)
6374     else:
6375       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6376                                  str(self.op.kind))
6377
6378
6379 class LUGetTags(TagsLU):
6380   """Returns the tags of a given object.
6381
6382   """
6383   _OP_REQP = ["kind", "name"]
6384   REQ_BGL = False
6385
6386   def Exec(self, feedback_fn):
6387     """Returns the tag list.
6388
6389     """
6390     return list(self.target.GetTags())
6391
6392
6393 class LUSearchTags(NoHooksLU):
6394   """Searches the tags for a given pattern.
6395
6396   """
6397   _OP_REQP = ["pattern"]
6398   REQ_BGL = False
6399
6400   def ExpandNames(self):
6401     self.needed_locks = {}
6402
6403   def CheckPrereq(self):
6404     """Check prerequisites.
6405
6406     This checks the pattern passed for validity by compiling it.
6407
6408     """
6409     try:
6410       self.re = re.compile(self.op.pattern)
6411     except re.error, err:
6412       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6413                                  (self.op.pattern, err))
6414
6415   def Exec(self, feedback_fn):
6416     """Returns the tag list.
6417
6418     """
6419     cfg = self.cfg
6420     tgts = [("/cluster", cfg.GetClusterInfo())]
6421     ilist = cfg.GetAllInstancesInfo().values()
6422     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6423     nlist = cfg.GetAllNodesInfo().values()
6424     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6425     results = []
6426     for path, target in tgts:
6427       for tag in target.GetTags():
6428         if self.re.search(tag):
6429           results.append((path, tag))
6430     return results
6431
6432
6433 class LUAddTags(TagsLU):
6434   """Sets a tag on a given object.
6435
6436   """
6437   _OP_REQP = ["kind", "name", "tags"]
6438   REQ_BGL = False
6439
6440   def CheckPrereq(self):
6441     """Check prerequisites.
6442
6443     This checks the type and length of the tag name and value.
6444
6445     """
6446     TagsLU.CheckPrereq(self)
6447     for tag in self.op.tags:
6448       objects.TaggableObject.ValidateTag(tag)
6449
6450   def Exec(self, feedback_fn):
6451     """Sets the tag.
6452
6453     """
6454     try:
6455       for tag in self.op.tags:
6456         self.target.AddTag(tag)
6457     except errors.TagError, err:
6458       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6459     try:
6460       self.cfg.Update(self.target)
6461     except errors.ConfigurationError:
6462       raise errors.OpRetryError("There has been a modification to the"
6463                                 " config file and the operation has been"
6464                                 " aborted. Please retry.")
6465
6466
6467 class LUDelTags(TagsLU):
6468   """Delete a list of tags from a given object.
6469
6470   """
6471   _OP_REQP = ["kind", "name", "tags"]
6472   REQ_BGL = False
6473
6474   def CheckPrereq(self):
6475     """Check prerequisites.
6476
6477     This checks that we have the given tag.
6478
6479     """
6480     TagsLU.CheckPrereq(self)
6481     for tag in self.op.tags:
6482       objects.TaggableObject.ValidateTag(tag)
6483     del_tags = frozenset(self.op.tags)
6484     cur_tags = self.target.GetTags()
6485     if not del_tags <= cur_tags:
6486       diff_tags = del_tags - cur_tags
6487       diff_names = ["'%s'" % tag for tag in diff_tags]
6488       diff_names.sort()
6489       raise errors.OpPrereqError("Tag(s) %s not found" %
6490                                  (",".join(diff_names)))
6491
6492   def Exec(self, feedback_fn):
6493     """Remove the tag from the object.
6494
6495     """
6496     for tag in self.op.tags:
6497       self.target.RemoveTag(tag)
6498     try:
6499       self.cfg.Update(self.target)
6500     except errors.ConfigurationError:
6501       raise errors.OpRetryError("There has been a modification to the"
6502                                 " config file and the operation has been"
6503                                 " aborted. Please retry.")
6504
6505
6506 class LUTestDelay(NoHooksLU):
6507   """Sleep for a specified amount of time.
6508
6509   This LU sleeps on the master and/or nodes for a specified amount of
6510   time.
6511
6512   """
6513   _OP_REQP = ["duration", "on_master", "on_nodes"]
6514   REQ_BGL = False
6515
6516   def ExpandNames(self):
6517     """Expand names and set required locks.
6518
6519     This expands the node list, if any.
6520
6521     """
6522     self.needed_locks = {}
6523     if self.op.on_nodes:
6524       # _GetWantedNodes can be used here, but is not always appropriate to use
6525       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6526       # more information.
6527       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6528       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6529
6530   def CheckPrereq(self):
6531     """Check prerequisites.
6532
6533     """
6534
6535   def Exec(self, feedback_fn):
6536     """Do the actual sleep.
6537
6538     """
6539     if self.op.on_master:
6540       if not utils.TestDelay(self.op.duration):
6541         raise errors.OpExecError("Error during master delay test")
6542     if self.op.on_nodes:
6543       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6544       if not result:
6545         raise errors.OpExecError("Complete failure from rpc call")
6546       for node, node_result in result.items():
6547         node_result.Raise()
6548         if not node_result.data:
6549           raise errors.OpExecError("Failure during rpc call to node %s,"
6550                                    " result: %s" % (node, node_result.data))
6551
6552
6553 class IAllocator(object):
6554   """IAllocator framework.
6555
6556   An IAllocator instance has three sets of attributes:
6557     - cfg that is needed to query the cluster
6558     - input data (all members of the _KEYS class attribute are required)
6559     - four buffer attributes (in|out_data|text), that represent the
6560       input (to the external script) in text and data structure format,
6561       and the output from it, again in two formats
6562     - the result variables from the script (success, info, nodes) for
6563       easy usage
6564
6565   """
6566   _ALLO_KEYS = [
6567     "mem_size", "disks", "disk_template",
6568     "os", "tags", "nics", "vcpus", "hypervisor",
6569     ]
6570   _RELO_KEYS = [
6571     "relocate_from",
6572     ]
6573
6574   def __init__(self, lu, mode, name, **kwargs):
6575     self.lu = lu
6576     # init buffer variables
6577     self.in_text = self.out_text = self.in_data = self.out_data = None
6578     # init all input fields so that pylint is happy
6579     self.mode = mode
6580     self.name = name
6581     self.mem_size = self.disks = self.disk_template = None
6582     self.os = self.tags = self.nics = self.vcpus = None
6583     self.hypervisor = None
6584     self.relocate_from = None
6585     # computed fields
6586     self.required_nodes = None
6587     # init result fields
6588     self.success = self.info = self.nodes = None
6589     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6590       keyset = self._ALLO_KEYS
6591     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6592       keyset = self._RELO_KEYS
6593     else:
6594       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6595                                    " IAllocator" % self.mode)
6596     for key in kwargs:
6597       if key not in keyset:
6598         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6599                                      " IAllocator" % key)
6600       setattr(self, key, kwargs[key])
6601     for key in keyset:
6602       if key not in kwargs:
6603         raise errors.ProgrammerError("Missing input parameter '%s' to"
6604                                      " IAllocator" % key)
6605     self._BuildInputData()
6606
6607   def _ComputeClusterData(self):
6608     """Compute the generic allocator input data.
6609
6610     This is the data that is independent of the actual operation.
6611
6612     """
6613     cfg = self.lu.cfg
6614     cluster_info = cfg.GetClusterInfo()
6615     # cluster data
6616     data = {
6617       "version": constants.IALLOCATOR_VERSION,
6618       "cluster_name": cfg.GetClusterName(),
6619       "cluster_tags": list(cluster_info.GetTags()),
6620       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6621       # we don't have job IDs
6622       }
6623     iinfo = cfg.GetAllInstancesInfo().values()
6624     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6625
6626     # node data
6627     node_results = {}
6628     node_list = cfg.GetNodeList()
6629
6630     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6631       hypervisor_name = self.hypervisor
6632     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6633       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6634
6635     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6636                                            hypervisor_name)
6637     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6638                        cluster_info.enabled_hypervisors)
6639     for nname, nresult in node_data.items():
6640       # first fill in static (config-based) values
6641       ninfo = cfg.GetNodeInfo(nname)
6642       pnr = {
6643         "tags": list(ninfo.GetTags()),
6644         "primary_ip": ninfo.primary_ip,
6645         "secondary_ip": ninfo.secondary_ip,
6646         "offline": ninfo.offline,
6647         "drained": ninfo.drained,
6648         "master_candidate": ninfo.master_candidate,
6649         }
6650
6651       if not ninfo.offline:
6652         nresult.Raise()
6653         if not isinstance(nresult.data, dict):
6654           raise errors.OpExecError("Can't get data for node %s" % nname)
6655         remote_info = nresult.data
6656         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6657                      'vg_size', 'vg_free', 'cpu_total']:
6658           if attr not in remote_info:
6659             raise errors.OpExecError("Node '%s' didn't return attribute"
6660                                      " '%s'" % (nname, attr))
6661           try:
6662             remote_info[attr] = int(remote_info[attr])
6663           except ValueError, err:
6664             raise errors.OpExecError("Node '%s' returned invalid value"
6665                                      " for '%s': %s" % (nname, attr, err))
6666         # compute memory used by primary instances
6667         i_p_mem = i_p_up_mem = 0
6668         for iinfo, beinfo in i_list:
6669           if iinfo.primary_node == nname:
6670             i_p_mem += beinfo[constants.BE_MEMORY]
6671             if iinfo.name not in node_iinfo[nname].data:
6672               i_used_mem = 0
6673             else:
6674               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6675             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6676             remote_info['memory_free'] -= max(0, i_mem_diff)
6677
6678             if iinfo.admin_up:
6679               i_p_up_mem += beinfo[constants.BE_MEMORY]
6680
6681         # compute memory used by instances
6682         pnr_dyn = {
6683           "total_memory": remote_info['memory_total'],
6684           "reserved_memory": remote_info['memory_dom0'],
6685           "free_memory": remote_info['memory_free'],
6686           "total_disk": remote_info['vg_size'],
6687           "free_disk": remote_info['vg_free'],
6688           "total_cpus": remote_info['cpu_total'],
6689           "i_pri_memory": i_p_mem,
6690           "i_pri_up_memory": i_p_up_mem,
6691           }
6692         pnr.update(pnr_dyn)
6693
6694       node_results[nname] = pnr
6695     data["nodes"] = node_results
6696
6697     # instance data
6698     instance_data = {}
6699     for iinfo, beinfo in i_list:
6700       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6701                   for n in iinfo.nics]
6702       pir = {
6703         "tags": list(iinfo.GetTags()),
6704         "admin_up": iinfo.admin_up,
6705         "vcpus": beinfo[constants.BE_VCPUS],
6706         "memory": beinfo[constants.BE_MEMORY],
6707         "os": iinfo.os,
6708         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6709         "nics": nic_data,
6710         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6711         "disk_template": iinfo.disk_template,
6712         "hypervisor": iinfo.hypervisor,
6713         }
6714       instance_data[iinfo.name] = pir
6715
6716     data["instances"] = instance_data
6717
6718     self.in_data = data
6719
6720   def _AddNewInstance(self):
6721     """Add new instance data to allocator structure.
6722
6723     This in combination with _AllocatorGetClusterData will create the
6724     correct structure needed as input for the allocator.
6725
6726     The checks for the completeness of the opcode must have already been
6727     done.
6728
6729     """
6730     data = self.in_data
6731
6732     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6733
6734     if self.disk_template in constants.DTS_NET_MIRROR:
6735       self.required_nodes = 2
6736     else:
6737       self.required_nodes = 1
6738     request = {
6739       "type": "allocate",
6740       "name": self.name,
6741       "disk_template": self.disk_template,
6742       "tags": self.tags,
6743       "os": self.os,
6744       "vcpus": self.vcpus,
6745       "memory": self.mem_size,
6746       "disks": self.disks,
6747       "disk_space_total": disk_space,
6748       "nics": self.nics,
6749       "required_nodes": self.required_nodes,
6750       }
6751     data["request"] = request
6752
6753   def _AddRelocateInstance(self):
6754     """Add relocate instance data to allocator structure.
6755
6756     This in combination with _IAllocatorGetClusterData will create the
6757     correct structure needed as input for the allocator.
6758
6759     The checks for the completeness of the opcode must have already been
6760     done.
6761
6762     """
6763     instance = self.lu.cfg.GetInstanceInfo(self.name)
6764     if instance is None:
6765       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6766                                    " IAllocator" % self.name)
6767
6768     if instance.disk_template not in constants.DTS_NET_MIRROR:
6769       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6770
6771     if len(instance.secondary_nodes) != 1:
6772       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6773
6774     self.required_nodes = 1
6775     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6776     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6777
6778     request = {
6779       "type": "relocate",
6780       "name": self.name,
6781       "disk_space_total": disk_space,
6782       "required_nodes": self.required_nodes,
6783       "relocate_from": self.relocate_from,
6784       }
6785     self.in_data["request"] = request
6786
6787   def _BuildInputData(self):
6788     """Build input data structures.
6789
6790     """
6791     self._ComputeClusterData()
6792
6793     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6794       self._AddNewInstance()
6795     else:
6796       self._AddRelocateInstance()
6797
6798     self.in_text = serializer.Dump(self.in_data)
6799
6800   def Run(self, name, validate=True, call_fn=None):
6801     """Run an instance allocator and return the results.
6802
6803     """
6804     if call_fn is None:
6805       call_fn = self.lu.rpc.call_iallocator_runner
6806     data = self.in_text
6807
6808     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6809     result.Raise()
6810
6811     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6812       raise errors.OpExecError("Invalid result from master iallocator runner")
6813
6814     rcode, stdout, stderr, fail = result.data
6815
6816     if rcode == constants.IARUN_NOTFOUND:
6817       raise errors.OpExecError("Can't find allocator '%s'" % name)
6818     elif rcode == constants.IARUN_FAILURE:
6819       raise errors.OpExecError("Instance allocator call failed: %s,"
6820                                " output: %s" % (fail, stdout+stderr))
6821     self.out_text = stdout
6822     if validate:
6823       self._ValidateResult()
6824
6825   def _ValidateResult(self):
6826     """Process the allocator results.
6827
6828     This will process and if successful save the result in
6829     self.out_data and the other parameters.
6830
6831     """
6832     try:
6833       rdict = serializer.Load(self.out_text)
6834     except Exception, err:
6835       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6836
6837     if not isinstance(rdict, dict):
6838       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6839
6840     for key in "success", "info", "nodes":
6841       if key not in rdict:
6842         raise errors.OpExecError("Can't parse iallocator results:"
6843                                  " missing key '%s'" % key)
6844       setattr(self, key, rdict[key])
6845
6846     if not isinstance(rdict["nodes"], list):
6847       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6848                                " is not a list")
6849     self.out_data = rdict
6850
6851
6852 class LUTestAllocator(NoHooksLU):
6853   """Run allocator tests.
6854
6855   This LU runs the allocator tests
6856
6857   """
6858   _OP_REQP = ["direction", "mode", "name"]
6859
6860   def CheckPrereq(self):
6861     """Check prerequisites.
6862
6863     This checks the opcode parameters depending on the director and mode test.
6864
6865     """
6866     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6867       for attr in ["name", "mem_size", "disks", "disk_template",
6868                    "os", "tags", "nics", "vcpus"]:
6869         if not hasattr(self.op, attr):
6870           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6871                                      attr)
6872       iname = self.cfg.ExpandInstanceName(self.op.name)
6873       if iname is not None:
6874         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6875                                    iname)
6876       if not isinstance(self.op.nics, list):
6877         raise errors.OpPrereqError("Invalid parameter 'nics'")
6878       for row in self.op.nics:
6879         if (not isinstance(row, dict) or
6880             "mac" not in row or
6881             "ip" not in row or
6882             "bridge" not in row):
6883           raise errors.OpPrereqError("Invalid contents of the"
6884                                      " 'nics' parameter")
6885       if not isinstance(self.op.disks, list):
6886         raise errors.OpPrereqError("Invalid parameter 'disks'")
6887       for row in self.op.disks:
6888         if (not isinstance(row, dict) or
6889             "size" not in row or
6890             not isinstance(row["size"], int) or
6891             "mode" not in row or
6892             row["mode"] not in ['r', 'w']):
6893           raise errors.OpPrereqError("Invalid contents of the"
6894                                      " 'disks' parameter")
6895       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6896         self.op.hypervisor = self.cfg.GetHypervisorType()
6897     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6898       if not hasattr(self.op, "name"):
6899         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6900       fname = self.cfg.ExpandInstanceName(self.op.name)
6901       if fname is None:
6902         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6903                                    self.op.name)
6904       self.op.name = fname
6905       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6906     else:
6907       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6908                                  self.op.mode)
6909
6910     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6911       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6912         raise errors.OpPrereqError("Missing allocator name")
6913     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6914       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6915                                  self.op.direction)
6916
6917   def Exec(self, feedback_fn):
6918     """Run the allocator test.
6919
6920     """
6921     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6922       ial = IAllocator(self,
6923                        mode=self.op.mode,
6924                        name=self.op.name,
6925                        mem_size=self.op.mem_size,
6926                        disks=self.op.disks,
6927                        disk_template=self.op.disk_template,
6928                        os=self.op.os,
6929                        tags=self.op.tags,
6930                        nics=self.op.nics,
6931                        vcpus=self.op.vcpus,
6932                        hypervisor=self.op.hypervisor,
6933                        )
6934     else:
6935       ial = IAllocator(self,
6936                        mode=self.op.mode,
6937                        name=self.op.name,
6938                        relocate_from=list(self.relocate_from),
6939                        )
6940
6941     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6942       result = ial.in_text
6943     else:
6944       ial.Run(self.op.allocator, validate=False)
6945       result = ial.out_text
6946     return result