692eb65ffc0ba0d7e086ef47d2b94aaac2333287
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics, disk_template, disks):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @rtype: dict
484   @return: the hook environment for this instance
485
486   """
487   if status:
488     str_status = "up"
489   else:
490     str_status = "down"
491   env = {
492     "OP_TARGET": name,
493     "INSTANCE_NAME": name,
494     "INSTANCE_PRIMARY": primary_node,
495     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496     "INSTANCE_OS_TYPE": os_type,
497     "INSTANCE_STATUS": str_status,
498     "INSTANCE_MEMORY": memory,
499     "INSTANCE_VCPUS": vcpus,
500     "INSTANCE_DISK_TEMPLATE": disk_template,
501   }
502
503   if nics:
504     nic_count = len(nics)
505     for idx, (ip, bridge, mac) in enumerate(nics):
506       if ip is None:
507         ip = ""
508       env["INSTANCE_NIC%d_IP" % idx] = ip
509       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510       env["INSTANCE_NIC%d_MAC" % idx] = mac
511   else:
512     nic_count = 0
513
514   env["INSTANCE_NIC_COUNT"] = nic_count
515
516   if disks:
517     disk_count = len(disks)
518     for idx, (size, mode) in enumerate(disks):
519       env["INSTANCE_DISK%d_SIZE" % idx] = size
520       env["INSTANCE_DISK%d_MODE" % idx] = mode
521   else:
522     disk_count = 0
523
524   env["INSTANCE_DISK_COUNT"] = disk_count
525
526   return env
527
528
529 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530   """Builds instance related env variables for hooks from an object.
531
532   @type lu: L{LogicalUnit}
533   @param lu: the logical unit on whose behalf we execute
534   @type instance: L{objects.Instance}
535   @param instance: the instance for which we should build the
536       environment
537   @type override: dict
538   @param override: dictionary with key/values that will override
539       our values
540   @rtype: dict
541   @return: the hook environment dictionary
542
543   """
544   bep = lu.cfg.GetClusterInfo().FillBE(instance)
545   args = {
546     'name': instance.name,
547     'primary_node': instance.primary_node,
548     'secondary_nodes': instance.secondary_nodes,
549     'os_type': instance.os,
550     'status': instance.admin_up,
551     'memory': bep[constants.BE_MEMORY],
552     'vcpus': bep[constants.BE_VCPUS],
553     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554     'disk_template': instance.disk_template,
555     'disks': [(disk.size, disk.mode) for disk in instance.disks],
556   }
557   if override:
558     args.update(override)
559   return _BuildInstanceHookEnv(**args)
560
561
562 def _AdjustCandidatePool(lu):
563   """Adjust the candidate pool after node operations.
564
565   """
566   mod_list = lu.cfg.MaintainCandidatePool()
567   if mod_list:
568     lu.LogInfo("Promoted nodes to master candidate role: %s",
569                ", ".join(node.name for node in mod_list))
570     for name in mod_list:
571       lu.context.ReaddNode(name)
572   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573   if mc_now > mc_max:
574     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575                (mc_now, mc_max))
576
577
578 def _CheckInstanceBridgesExist(lu, instance):
579   """Check that the brigdes needed by an instance exist.
580
581   """
582   # check bridges existance
583   brlist = [nic.bridge for nic in instance.nics]
584   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585   result.Raise()
586   if not result.data:
587     raise errors.OpPrereqError("One or more target bridges %s does not"
588                                " exist on destination node '%s'" %
589                                (brlist, instance.primary_node))
590
591
592 class LUDestroyCluster(NoHooksLU):
593   """Logical unit for destroying the cluster.
594
595   """
596   _OP_REQP = []
597
598   def CheckPrereq(self):
599     """Check prerequisites.
600
601     This checks whether the cluster is empty.
602
603     Any errors are signalled by raising errors.OpPrereqError.
604
605     """
606     master = self.cfg.GetMasterNode()
607
608     nodelist = self.cfg.GetNodeList()
609     if len(nodelist) != 1 or nodelist[0] != master:
610       raise errors.OpPrereqError("There are still %d node(s) in"
611                                  " this cluster." % (len(nodelist) - 1))
612     instancelist = self.cfg.GetInstanceList()
613     if instancelist:
614       raise errors.OpPrereqError("There are still %d instance(s) in"
615                                  " this cluster." % len(instancelist))
616
617   def Exec(self, feedback_fn):
618     """Destroys the cluster.
619
620     """
621     master = self.cfg.GetMasterNode()
622     result = self.rpc.call_node_stop_master(master, False)
623     result.Raise()
624     if not result.data:
625       raise errors.OpExecError("Could not disable the master role")
626     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627     utils.CreateBackup(priv_key)
628     utils.CreateBackup(pub_key)
629     return master
630
631
632 class LUVerifyCluster(LogicalUnit):
633   """Verifies the cluster status.
634
635   """
636   HPATH = "cluster-verify"
637   HTYPE = constants.HTYPE_CLUSTER
638   _OP_REQP = ["skip_checks"]
639   REQ_BGL = False
640
641   def ExpandNames(self):
642     self.needed_locks = {
643       locking.LEVEL_NODE: locking.ALL_SET,
644       locking.LEVEL_INSTANCE: locking.ALL_SET,
645     }
646     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647
648   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649                   node_result, feedback_fn, master_files,
650                   drbd_map, vg_name):
651     """Run multiple tests against a node.
652
653     Test list:
654
655       - compares ganeti version
656       - checks vg existance and size > 20G
657       - checks config file checksum
658       - checks ssh to other nodes
659
660     @type nodeinfo: L{objects.Node}
661     @param nodeinfo: the node to check
662     @param file_list: required list of files
663     @param local_cksum: dictionary of local files and their checksums
664     @param node_result: the results from the node
665     @param feedback_fn: function used to accumulate results
666     @param master_files: list of files that only masters should have
667     @param drbd_map: the useddrbd minors for this node, in
668         form of minor: (instance, must_exist) which correspond to instances
669         and their running status
670     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
671
672     """
673     node = nodeinfo.name
674
675     # main result, node_result should be a non-empty dict
676     if not node_result or not isinstance(node_result, dict):
677       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
678       return True
679
680     # compares ganeti version
681     local_version = constants.PROTOCOL_VERSION
682     remote_version = node_result.get('version', None)
683     if not (remote_version and isinstance(remote_version, (list, tuple)) and
684             len(remote_version) == 2):
685       feedback_fn("  - ERROR: connection to %s failed" % (node))
686       return True
687
688     if local_version != remote_version[0]:
689       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
690                   " node %s %s" % (local_version, node, remote_version[0]))
691       return True
692
693     # node seems compatible, we can actually try to look into its results
694
695     bad = False
696
697     # full package version
698     if constants.RELEASE_VERSION != remote_version[1]:
699       feedback_fn("  - WARNING: software version mismatch: master %s,"
700                   " node %s %s" %
701                   (constants.RELEASE_VERSION, node, remote_version[1]))
702
703     # checks vg existence and size > 20G
704     if vg_name is not None:
705       vglist = node_result.get(constants.NV_VGLIST, None)
706       if not vglist:
707         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
708                         (node,))
709         bad = True
710       else:
711         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712                                               constants.MIN_VG_SIZE)
713         if vgstatus:
714           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
715           bad = True
716
717     # checks config file checksum
718
719     remote_cksum = node_result.get(constants.NV_FILELIST, None)
720     if not isinstance(remote_cksum, dict):
721       bad = True
722       feedback_fn("  - ERROR: node hasn't returned file checksum data")
723     else:
724       for file_name in file_list:
725         node_is_mc = nodeinfo.master_candidate
726         must_have_file = file_name not in master_files
727         if file_name not in remote_cksum:
728           if node_is_mc or must_have_file:
729             bad = True
730             feedback_fn("  - ERROR: file '%s' missing" % file_name)
731         elif remote_cksum[file_name] != local_cksum[file_name]:
732           if node_is_mc or must_have_file:
733             bad = True
734             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
735           else:
736             # not candidate and this is not a must-have file
737             bad = True
738             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
739                         " '%s'" % file_name)
740         else:
741           # all good, except non-master/non-must have combination
742           if not node_is_mc and not must_have_file:
743             feedback_fn("  - ERROR: file '%s' should not exist on non master"
744                         " candidates" % file_name)
745
746     # checks ssh to any
747
748     if constants.NV_NODELIST not in node_result:
749       bad = True
750       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
751     else:
752       if node_result[constants.NV_NODELIST]:
753         bad = True
754         for node in node_result[constants.NV_NODELIST]:
755           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
756                           (node, node_result[constants.NV_NODELIST][node]))
757
758     if constants.NV_NODENETTEST not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
761     else:
762       if node_result[constants.NV_NODENETTEST]:
763         bad = True
764         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765         for node in nlist:
766           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
767                           (node, node_result[constants.NV_NODENETTEST][node]))
768
769     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770     if isinstance(hyp_result, dict):
771       for hv_name, hv_result in hyp_result.iteritems():
772         if hv_result is not None:
773           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
774                       (hv_name, hv_result))
775
776     # check used drbd list
777     if vg_name is not None:
778       used_minors = node_result.get(constants.NV_DRBDLIST, [])
779       if not isinstance(used_minors, (tuple, list)):
780         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
781                     str(used_minors))
782       else:
783         for minor, (iname, must_exist) in drbd_map.items():
784           if minor not in used_minors and must_exist:
785             feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
786                         (minor, iname))
787             bad = True
788         for minor in used_minors:
789           if minor not in drbd_map:
790             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     # TODO: populate the environment with useful information for verify hooks
919     env = {}
920     return env, [], all_nodes
921
922   def Exec(self, feedback_fn):
923     """Verify integrity of cluster, performing various test on nodes.
924
925     """
926     bad = False
927     feedback_fn("* Verifying global settings")
928     for msg in self.cfg.VerifyConfig():
929       feedback_fn("  - ERROR: %s" % msg)
930
931     vg_name = self.cfg.GetVGName()
932     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
933     nodelist = utils.NiceSort(self.cfg.GetNodeList())
934     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
935     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
936     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
937                         for iname in instancelist)
938     i_non_redundant = [] # Non redundant instances
939     i_non_a_balanced = [] # Non auto-balanced instances
940     n_offline = [] # List of offline nodes
941     n_drained = [] # List of nodes being drained
942     node_volume = {}
943     node_instance = {}
944     node_info = {}
945     instance_cfg = {}
946
947     # FIXME: verify OS list
948     # do local checksums
949     master_files = [constants.CLUSTER_CONF_FILE]
950
951     file_names = ssconf.SimpleStore().GetFileList()
952     file_names.append(constants.SSL_CERT_FILE)
953     file_names.append(constants.RAPI_CERT_FILE)
954     file_names.extend(master_files)
955
956     local_checksums = utils.FingerprintFiles(file_names)
957
958     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
959     node_verify_param = {
960       constants.NV_FILELIST: file_names,
961       constants.NV_NODELIST: [node.name for node in nodeinfo
962                               if not node.offline],
963       constants.NV_HYPERVISOR: hypervisors,
964       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
965                                   node.secondary_ip) for node in nodeinfo
966                                  if not node.offline],
967       constants.NV_INSTANCELIST: hypervisors,
968       constants.NV_VERSION: None,
969       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
970       }
971     if vg_name is not None:
972       node_verify_param[constants.NV_VGLIST] = None
973       node_verify_param[constants.NV_LVLIST] = vg_name
974       node_verify_param[constants.NV_DRBDLIST] = None
975     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
976                                            self.cfg.GetClusterName())
977
978     cluster = self.cfg.GetClusterInfo()
979     master_node = self.cfg.GetMasterNode()
980     all_drbd_map = self.cfg.ComputeDRBDMap()
981
982     for node_i in nodeinfo:
983       node = node_i.name
984       nresult = all_nvinfo[node].data
985
986       if node_i.offline:
987         feedback_fn("* Skipping offline node %s" % (node,))
988         n_offline.append(node)
989         continue
990
991       if node == master_node:
992         ntype = "master"
993       elif node_i.master_candidate:
994         ntype = "master candidate"
995       elif node_i.drained:
996         ntype = "drained"
997         n_drained.append(node)
998       else:
999         ntype = "regular"
1000       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1001
1002       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1003         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1004         bad = True
1005         continue
1006
1007       node_drbd = {}
1008       for minor, instance in all_drbd_map[node].items():
1009         instance = instanceinfo[instance]
1010         node_drbd[minor] = (instance.name, instance.admin_up)
1011       result = self._VerifyNode(node_i, file_names, local_checksums,
1012                                 nresult, feedback_fn, master_files,
1013                                 node_drbd, vg_name)
1014       bad = bad or result
1015
1016       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1017       if vg_name is None:
1018         node_volume[node] = {}
1019       elif isinstance(lvdata, basestring):
1020         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1021                     (node, utils.SafeEncode(lvdata)))
1022         bad = True
1023         node_volume[node] = {}
1024       elif not isinstance(lvdata, dict):
1025         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1026         bad = True
1027         continue
1028       else:
1029         node_volume[node] = lvdata
1030
1031       # node_instance
1032       idata = nresult.get(constants.NV_INSTANCELIST, None)
1033       if not isinstance(idata, list):
1034         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1035                     (node,))
1036         bad = True
1037         continue
1038
1039       node_instance[node] = idata
1040
1041       # node_info
1042       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1043       if not isinstance(nodeinfo, dict):
1044         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1045         bad = True
1046         continue
1047
1048       try:
1049         node_info[node] = {
1050           "mfree": int(nodeinfo['memory_free']),
1051           "pinst": [],
1052           "sinst": [],
1053           # dictionary holding all instances this node is secondary for,
1054           # grouped by their primary node. Each key is a cluster node, and each
1055           # value is a list of instances which have the key as primary and the
1056           # current node as secondary.  this is handy to calculate N+1 memory
1057           # availability if you can only failover from a primary to its
1058           # secondary.
1059           "sinst-by-pnode": {},
1060         }
1061         # FIXME: devise a free space model for file based instances as well
1062         if vg_name is not None:
1063           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1064       except ValueError:
1065         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1066         bad = True
1067         continue
1068
1069     node_vol_should = {}
1070
1071     for instance in instancelist:
1072       feedback_fn("* Verifying instance %s" % instance)
1073       inst_config = instanceinfo[instance]
1074       result =  self._VerifyInstance(instance, inst_config, node_volume,
1075                                      node_instance, feedback_fn, n_offline)
1076       bad = bad or result
1077       inst_nodes_offline = []
1078
1079       inst_config.MapLVsByNode(node_vol_should)
1080
1081       instance_cfg[instance] = inst_config
1082
1083       pnode = inst_config.primary_node
1084       if pnode in node_info:
1085         node_info[pnode]['pinst'].append(instance)
1086       elif pnode not in n_offline:
1087         feedback_fn("  - ERROR: instance %s, connection to primary node"
1088                     " %s failed" % (instance, pnode))
1089         bad = True
1090
1091       if pnode in n_offline:
1092         inst_nodes_offline.append(pnode)
1093
1094       # If the instance is non-redundant we cannot survive losing its primary
1095       # node, so we are not N+1 compliant. On the other hand we have no disk
1096       # templates with more than one secondary so that situation is not well
1097       # supported either.
1098       # FIXME: does not support file-backed instances
1099       if len(inst_config.secondary_nodes) == 0:
1100         i_non_redundant.append(instance)
1101       elif len(inst_config.secondary_nodes) > 1:
1102         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1103                     % instance)
1104
1105       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1106         i_non_a_balanced.append(instance)
1107
1108       for snode in inst_config.secondary_nodes:
1109         if snode in node_info:
1110           node_info[snode]['sinst'].append(instance)
1111           if pnode not in node_info[snode]['sinst-by-pnode']:
1112             node_info[snode]['sinst-by-pnode'][pnode] = []
1113           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1114         elif snode not in n_offline:
1115           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1116                       " %s failed" % (instance, snode))
1117           bad = True
1118         if snode in n_offline:
1119           inst_nodes_offline.append(snode)
1120
1121       if inst_nodes_offline:
1122         # warn that the instance lives on offline nodes, and set bad=True
1123         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1124                     ", ".join(inst_nodes_offline))
1125         bad = True
1126
1127     feedback_fn("* Verifying orphan volumes")
1128     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1129                                        feedback_fn)
1130     bad = bad or result
1131
1132     feedback_fn("* Verifying remaining instances")
1133     result = self._VerifyOrphanInstances(instancelist, node_instance,
1134                                          feedback_fn)
1135     bad = bad or result
1136
1137     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1138       feedback_fn("* Verifying N+1 Memory redundancy")
1139       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1140       bad = bad or result
1141
1142     feedback_fn("* Other Notes")
1143     if i_non_redundant:
1144       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1145                   % len(i_non_redundant))
1146
1147     if i_non_a_balanced:
1148       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1149                   % len(i_non_a_balanced))
1150
1151     if n_offline:
1152       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1153
1154     if n_drained:
1155       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1156
1157     return not bad
1158
1159   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1160     """Analize the post-hooks' result
1161
1162     This method analyses the hook result, handles it, and sends some
1163     nicely-formatted feedback back to the user.
1164
1165     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1166         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1167     @param hooks_results: the results of the multi-node hooks rpc call
1168     @param feedback_fn: function used send feedback back to the caller
1169     @param lu_result: previous Exec result
1170     @return: the new Exec result, based on the previous result
1171         and hook results
1172
1173     """
1174     # We only really run POST phase hooks, and are only interested in
1175     # their results
1176     if phase == constants.HOOKS_PHASE_POST:
1177       # Used to change hooks' output to proper indentation
1178       indent_re = re.compile('^', re.M)
1179       feedback_fn("* Hooks Results")
1180       if not hooks_results:
1181         feedback_fn("  - ERROR: general communication failure")
1182         lu_result = 1
1183       else:
1184         for node_name in hooks_results:
1185           show_node_header = True
1186           res = hooks_results[node_name]
1187           if res.failed or res.data is False or not isinstance(res.data, list):
1188             if res.offline:
1189               # no need to warn or set fail return value
1190               continue
1191             feedback_fn("    Communication failure in hooks execution")
1192             lu_result = 1
1193             continue
1194           for script, hkr, output in res.data:
1195             if hkr == constants.HKR_FAIL:
1196               # The node header is only shown once, if there are
1197               # failing hooks on that node
1198               if show_node_header:
1199                 feedback_fn("  Node %s:" % node_name)
1200                 show_node_header = False
1201               feedback_fn("    ERROR: Script %s failed, output:" % script)
1202               output = indent_re.sub('      ', output)
1203               feedback_fn("%s" % output)
1204               lu_result = 1
1205
1206       return lu_result
1207
1208
1209 class LUVerifyDisks(NoHooksLU):
1210   """Verifies the cluster disks status.
1211
1212   """
1213   _OP_REQP = []
1214   REQ_BGL = False
1215
1216   def ExpandNames(self):
1217     self.needed_locks = {
1218       locking.LEVEL_NODE: locking.ALL_SET,
1219       locking.LEVEL_INSTANCE: locking.ALL_SET,
1220     }
1221     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1222
1223   def CheckPrereq(self):
1224     """Check prerequisites.
1225
1226     This has no prerequisites.
1227
1228     """
1229     pass
1230
1231   def Exec(self, feedback_fn):
1232     """Verify integrity of cluster disks.
1233
1234     """
1235     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1236
1237     vg_name = self.cfg.GetVGName()
1238     nodes = utils.NiceSort(self.cfg.GetNodeList())
1239     instances = [self.cfg.GetInstanceInfo(name)
1240                  for name in self.cfg.GetInstanceList()]
1241
1242     nv_dict = {}
1243     for inst in instances:
1244       inst_lvs = {}
1245       if (not inst.admin_up or
1246           inst.disk_template not in constants.DTS_NET_MIRROR):
1247         continue
1248       inst.MapLVsByNode(inst_lvs)
1249       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1250       for node, vol_list in inst_lvs.iteritems():
1251         for vol in vol_list:
1252           nv_dict[(node, vol)] = inst
1253
1254     if not nv_dict:
1255       return result
1256
1257     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1258
1259     to_act = set()
1260     for node in nodes:
1261       # node_volume
1262       lvs = node_lvs[node]
1263       if lvs.failed:
1264         if not lvs.offline:
1265           self.LogWarning("Connection to node %s failed: %s" %
1266                           (node, lvs.data))
1267         continue
1268       lvs = lvs.data
1269       if isinstance(lvs, basestring):
1270         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1271         res_nlvm[node] = lvs
1272       elif not isinstance(lvs, dict):
1273         logging.warning("Connection to node %s failed or invalid data"
1274                         " returned", node)
1275         res_nodes.append(node)
1276         continue
1277
1278       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1279         inst = nv_dict.pop((node, lv_name), None)
1280         if (not lv_online and inst is not None
1281             and inst.name not in res_instances):
1282           res_instances.append(inst.name)
1283
1284     # any leftover items in nv_dict are missing LVs, let's arrange the
1285     # data better
1286     for key, inst in nv_dict.iteritems():
1287       if inst.name not in res_missing:
1288         res_missing[inst.name] = []
1289       res_missing[inst.name].append(key)
1290
1291     return result
1292
1293
1294 class LURenameCluster(LogicalUnit):
1295   """Rename the cluster.
1296
1297   """
1298   HPATH = "cluster-rename"
1299   HTYPE = constants.HTYPE_CLUSTER
1300   _OP_REQP = ["name"]
1301
1302   def BuildHooksEnv(self):
1303     """Build hooks env.
1304
1305     """
1306     env = {
1307       "OP_TARGET": self.cfg.GetClusterName(),
1308       "NEW_NAME": self.op.name,
1309       }
1310     mn = self.cfg.GetMasterNode()
1311     return env, [mn], [mn]
1312
1313   def CheckPrereq(self):
1314     """Verify that the passed name is a valid one.
1315
1316     """
1317     hostname = utils.HostInfo(self.op.name)
1318
1319     new_name = hostname.name
1320     self.ip = new_ip = hostname.ip
1321     old_name = self.cfg.GetClusterName()
1322     old_ip = self.cfg.GetMasterIP()
1323     if new_name == old_name and new_ip == old_ip:
1324       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1325                                  " cluster has changed")
1326     if new_ip != old_ip:
1327       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1328         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1329                                    " reachable on the network. Aborting." %
1330                                    new_ip)
1331
1332     self.op.name = new_name
1333
1334   def Exec(self, feedback_fn):
1335     """Rename the cluster.
1336
1337     """
1338     clustername = self.op.name
1339     ip = self.ip
1340
1341     # shutdown the master IP
1342     master = self.cfg.GetMasterNode()
1343     result = self.rpc.call_node_stop_master(master, False)
1344     if result.failed or not result.data:
1345       raise errors.OpExecError("Could not disable the master role")
1346
1347     try:
1348       cluster = self.cfg.GetClusterInfo()
1349       cluster.cluster_name = clustername
1350       cluster.master_ip = ip
1351       self.cfg.Update(cluster)
1352
1353       # update the known hosts file
1354       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1355       node_list = self.cfg.GetNodeList()
1356       try:
1357         node_list.remove(master)
1358       except ValueError:
1359         pass
1360       result = self.rpc.call_upload_file(node_list,
1361                                          constants.SSH_KNOWN_HOSTS_FILE)
1362       for to_node, to_result in result.iteritems():
1363         if to_result.failed or not to_result.data:
1364           logging.error("Copy of file %s to node %s failed",
1365                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1366
1367     finally:
1368       result = self.rpc.call_node_start_master(master, False)
1369       if result.failed or not result.data:
1370         self.LogWarning("Could not re-enable the master role on"
1371                         " the master, please restart manually.")
1372
1373
1374 def _RecursiveCheckIfLVMBased(disk):
1375   """Check if the given disk or its children are lvm-based.
1376
1377   @type disk: L{objects.Disk}
1378   @param disk: the disk to check
1379   @rtype: booleean
1380   @return: boolean indicating whether a LD_LV dev_type was found or not
1381
1382   """
1383   if disk.children:
1384     for chdisk in disk.children:
1385       if _RecursiveCheckIfLVMBased(chdisk):
1386         return True
1387   return disk.dev_type == constants.LD_LV
1388
1389
1390 class LUSetClusterParams(LogicalUnit):
1391   """Change the parameters of the cluster.
1392
1393   """
1394   HPATH = "cluster-modify"
1395   HTYPE = constants.HTYPE_CLUSTER
1396   _OP_REQP = []
1397   REQ_BGL = False
1398
1399   def CheckParameters(self):
1400     """Check parameters
1401
1402     """
1403     if not hasattr(self.op, "candidate_pool_size"):
1404       self.op.candidate_pool_size = None
1405     if self.op.candidate_pool_size is not None:
1406       try:
1407         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1408       except ValueError, err:
1409         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1410                                    str(err))
1411       if self.op.candidate_pool_size < 1:
1412         raise errors.OpPrereqError("At least one master candidate needed")
1413
1414   def ExpandNames(self):
1415     # FIXME: in the future maybe other cluster params won't require checking on
1416     # all nodes to be modified.
1417     self.needed_locks = {
1418       locking.LEVEL_NODE: locking.ALL_SET,
1419     }
1420     self.share_locks[locking.LEVEL_NODE] = 1
1421
1422   def BuildHooksEnv(self):
1423     """Build hooks env.
1424
1425     """
1426     env = {
1427       "OP_TARGET": self.cfg.GetClusterName(),
1428       "NEW_VG_NAME": self.op.vg_name,
1429       }
1430     mn = self.cfg.GetMasterNode()
1431     return env, [mn], [mn]
1432
1433   def CheckPrereq(self):
1434     """Check prerequisites.
1435
1436     This checks whether the given params don't conflict and
1437     if the given volume group is valid.
1438
1439     """
1440     if self.op.vg_name is not None and not self.op.vg_name:
1441       instances = self.cfg.GetAllInstancesInfo().values()
1442       for inst in instances:
1443         for disk in inst.disks:
1444           if _RecursiveCheckIfLVMBased(disk):
1445             raise errors.OpPrereqError("Cannot disable lvm storage while"
1446                                        " lvm-based instances exist")
1447
1448     node_list = self.acquired_locks[locking.LEVEL_NODE]
1449
1450     # if vg_name not None, checks given volume group on all nodes
1451     if self.op.vg_name:
1452       vglist = self.rpc.call_vg_list(node_list)
1453       for node in node_list:
1454         if vglist[node].failed:
1455           # ignoring down node
1456           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1457           continue
1458         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1459                                               self.op.vg_name,
1460                                               constants.MIN_VG_SIZE)
1461         if vgstatus:
1462           raise errors.OpPrereqError("Error on node '%s': %s" %
1463                                      (node, vgstatus))
1464
1465     self.cluster = cluster = self.cfg.GetClusterInfo()
1466     # validate beparams changes
1467     if self.op.beparams:
1468       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1469       self.new_beparams = cluster.FillDict(
1470         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1471
1472     # hypervisor list/parameters
1473     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1474     if self.op.hvparams:
1475       if not isinstance(self.op.hvparams, dict):
1476         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1477       for hv_name, hv_dict in self.op.hvparams.items():
1478         if hv_name not in self.new_hvparams:
1479           self.new_hvparams[hv_name] = hv_dict
1480         else:
1481           self.new_hvparams[hv_name].update(hv_dict)
1482
1483     if self.op.enabled_hypervisors is not None:
1484       self.hv_list = self.op.enabled_hypervisors
1485     else:
1486       self.hv_list = cluster.enabled_hypervisors
1487
1488     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1489       # either the enabled list has changed, or the parameters have, validate
1490       for hv_name, hv_params in self.new_hvparams.items():
1491         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1492             (self.op.enabled_hypervisors and
1493              hv_name in self.op.enabled_hypervisors)):
1494           # either this is a new hypervisor, or its parameters have changed
1495           hv_class = hypervisor.GetHypervisor(hv_name)
1496           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1497           hv_class.CheckParameterSyntax(hv_params)
1498           _CheckHVParams(self, node_list, hv_name, hv_params)
1499
1500   def Exec(self, feedback_fn):
1501     """Change the parameters of the cluster.
1502
1503     """
1504     if self.op.vg_name is not None:
1505       if self.op.vg_name != self.cfg.GetVGName():
1506         self.cfg.SetVGName(self.op.vg_name)
1507       else:
1508         feedback_fn("Cluster LVM configuration already in desired"
1509                     " state, not changing")
1510     if self.op.hvparams:
1511       self.cluster.hvparams = self.new_hvparams
1512     if self.op.enabled_hypervisors is not None:
1513       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1514     if self.op.beparams:
1515       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1516     if self.op.candidate_pool_size is not None:
1517       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1518
1519     self.cfg.Update(self.cluster)
1520
1521     # we want to update nodes after the cluster so that if any errors
1522     # happen, we have recorded and saved the cluster info
1523     if self.op.candidate_pool_size is not None:
1524       _AdjustCandidatePool(self)
1525
1526
1527 class LURedistributeConfig(NoHooksLU):
1528   """Force the redistribution of cluster configuration.
1529
1530   This is a very simple LU.
1531
1532   """
1533   _OP_REQP = []
1534   REQ_BGL = False
1535
1536   def ExpandNames(self):
1537     self.needed_locks = {
1538       locking.LEVEL_NODE: locking.ALL_SET,
1539     }
1540     self.share_locks[locking.LEVEL_NODE] = 1
1541
1542   def CheckPrereq(self):
1543     """Check prerequisites.
1544
1545     """
1546
1547   def Exec(self, feedback_fn):
1548     """Redistribute the configuration.
1549
1550     """
1551     self.cfg.Update(self.cfg.GetClusterInfo())
1552
1553
1554 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1555   """Sleep and poll for an instance's disk to sync.
1556
1557   """
1558   if not instance.disks:
1559     return True
1560
1561   if not oneshot:
1562     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1563
1564   node = instance.primary_node
1565
1566   for dev in instance.disks:
1567     lu.cfg.SetDiskID(dev, node)
1568
1569   retries = 0
1570   while True:
1571     max_time = 0
1572     done = True
1573     cumul_degraded = False
1574     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1575     if rstats.failed or not rstats.data:
1576       lu.LogWarning("Can't get any data from node %s", node)
1577       retries += 1
1578       if retries >= 10:
1579         raise errors.RemoteError("Can't contact node %s for mirror data,"
1580                                  " aborting." % node)
1581       time.sleep(6)
1582       continue
1583     rstats = rstats.data
1584     retries = 0
1585     for i, mstat in enumerate(rstats):
1586       if mstat is None:
1587         lu.LogWarning("Can't compute data for node %s/%s",
1588                            node, instance.disks[i].iv_name)
1589         continue
1590       # we ignore the ldisk parameter
1591       perc_done, est_time, is_degraded, _ = mstat
1592       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1593       if perc_done is not None:
1594         done = False
1595         if est_time is not None:
1596           rem_time = "%d estimated seconds remaining" % est_time
1597           max_time = est_time
1598         else:
1599           rem_time = "no time estimate"
1600         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1601                         (instance.disks[i].iv_name, perc_done, rem_time))
1602     if done or oneshot:
1603       break
1604
1605     time.sleep(min(60, max_time))
1606
1607   if done:
1608     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1609   return not cumul_degraded
1610
1611
1612 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1613   """Check that mirrors are not degraded.
1614
1615   The ldisk parameter, if True, will change the test from the
1616   is_degraded attribute (which represents overall non-ok status for
1617   the device(s)) to the ldisk (representing the local storage status).
1618
1619   """
1620   lu.cfg.SetDiskID(dev, node)
1621   if ldisk:
1622     idx = 6
1623   else:
1624     idx = 5
1625
1626   result = True
1627   if on_primary or dev.AssembleOnSecondary():
1628     rstats = lu.rpc.call_blockdev_find(node, dev)
1629     msg = rstats.RemoteFailMsg()
1630     if msg:
1631       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1632       result = False
1633     elif not rstats.payload:
1634       lu.LogWarning("Can't find disk on node %s", node)
1635       result = False
1636     else:
1637       result = result and (not rstats.payload[idx])
1638   if dev.children:
1639     for child in dev.children:
1640       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1641
1642   return result
1643
1644
1645 class LUDiagnoseOS(NoHooksLU):
1646   """Logical unit for OS diagnose/query.
1647
1648   """
1649   _OP_REQP = ["output_fields", "names"]
1650   REQ_BGL = False
1651   _FIELDS_STATIC = utils.FieldSet()
1652   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1653
1654   def ExpandNames(self):
1655     if self.op.names:
1656       raise errors.OpPrereqError("Selective OS query not supported")
1657
1658     _CheckOutputFields(static=self._FIELDS_STATIC,
1659                        dynamic=self._FIELDS_DYNAMIC,
1660                        selected=self.op.output_fields)
1661
1662     # Lock all nodes, in shared mode
1663     self.needed_locks = {}
1664     self.share_locks[locking.LEVEL_NODE] = 1
1665     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1666
1667   def CheckPrereq(self):
1668     """Check prerequisites.
1669
1670     """
1671
1672   @staticmethod
1673   def _DiagnoseByOS(node_list, rlist):
1674     """Remaps a per-node return list into an a per-os per-node dictionary
1675
1676     @param node_list: a list with the names of all nodes
1677     @param rlist: a map with node names as keys and OS objects as values
1678
1679     @rtype: dict
1680     @return: a dictionary with osnames as keys and as value another map, with
1681         nodes as keys and list of OS objects as values, eg::
1682
1683           {"debian-etch": {"node1": [<object>,...],
1684                            "node2": [<object>,]}
1685           }
1686
1687     """
1688     all_os = {}
1689     for node_name, nr in rlist.iteritems():
1690       if nr.failed or not nr.data:
1691         continue
1692       for os_obj in nr.data:
1693         if os_obj.name not in all_os:
1694           # build a list of nodes for this os containing empty lists
1695           # for each node in node_list
1696           all_os[os_obj.name] = {}
1697           for nname in node_list:
1698             all_os[os_obj.name][nname] = []
1699         all_os[os_obj.name][node_name].append(os_obj)
1700     return all_os
1701
1702   def Exec(self, feedback_fn):
1703     """Compute the list of OSes.
1704
1705     """
1706     node_list = self.acquired_locks[locking.LEVEL_NODE]
1707     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1708                    if node in node_list]
1709     node_data = self.rpc.call_os_diagnose(valid_nodes)
1710     if node_data == False:
1711       raise errors.OpExecError("Can't gather the list of OSes")
1712     pol = self._DiagnoseByOS(valid_nodes, node_data)
1713     output = []
1714     for os_name, os_data in pol.iteritems():
1715       row = []
1716       for field in self.op.output_fields:
1717         if field == "name":
1718           val = os_name
1719         elif field == "valid":
1720           val = utils.all([osl and osl[0] for osl in os_data.values()])
1721         elif field == "node_status":
1722           val = {}
1723           for node_name, nos_list in os_data.iteritems():
1724             val[node_name] = [(v.status, v.path) for v in nos_list]
1725         else:
1726           raise errors.ParameterError(field)
1727         row.append(val)
1728       output.append(row)
1729
1730     return output
1731
1732
1733 class LURemoveNode(LogicalUnit):
1734   """Logical unit for removing a node.
1735
1736   """
1737   HPATH = "node-remove"
1738   HTYPE = constants.HTYPE_NODE
1739   _OP_REQP = ["node_name"]
1740
1741   def BuildHooksEnv(self):
1742     """Build hooks env.
1743
1744     This doesn't run on the target node in the pre phase as a failed
1745     node would then be impossible to remove.
1746
1747     """
1748     env = {
1749       "OP_TARGET": self.op.node_name,
1750       "NODE_NAME": self.op.node_name,
1751       }
1752     all_nodes = self.cfg.GetNodeList()
1753     all_nodes.remove(self.op.node_name)
1754     return env, all_nodes, all_nodes
1755
1756   def CheckPrereq(self):
1757     """Check prerequisites.
1758
1759     This checks:
1760      - the node exists in the configuration
1761      - it does not have primary or secondary instances
1762      - it's not the master
1763
1764     Any errors are signalled by raising errors.OpPrereqError.
1765
1766     """
1767     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1768     if node is None:
1769       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1770
1771     instance_list = self.cfg.GetInstanceList()
1772
1773     masternode = self.cfg.GetMasterNode()
1774     if node.name == masternode:
1775       raise errors.OpPrereqError("Node is the master node,"
1776                                  " you need to failover first.")
1777
1778     for instance_name in instance_list:
1779       instance = self.cfg.GetInstanceInfo(instance_name)
1780       if node.name in instance.all_nodes:
1781         raise errors.OpPrereqError("Instance %s is still running on the node,"
1782                                    " please remove first." % instance_name)
1783     self.op.node_name = node.name
1784     self.node = node
1785
1786   def Exec(self, feedback_fn):
1787     """Removes the node from the cluster.
1788
1789     """
1790     node = self.node
1791     logging.info("Stopping the node daemon and removing configs from node %s",
1792                  node.name)
1793
1794     self.context.RemoveNode(node.name)
1795
1796     self.rpc.call_node_leave_cluster(node.name)
1797
1798     # Promote nodes to master candidate as needed
1799     _AdjustCandidatePool(self)
1800
1801
1802 class LUQueryNodes(NoHooksLU):
1803   """Logical unit for querying nodes.
1804
1805   """
1806   _OP_REQP = ["output_fields", "names", "use_locking"]
1807   REQ_BGL = False
1808   _FIELDS_DYNAMIC = utils.FieldSet(
1809     "dtotal", "dfree",
1810     "mtotal", "mnode", "mfree",
1811     "bootid",
1812     "ctotal", "cnodes", "csockets",
1813     )
1814
1815   _FIELDS_STATIC = utils.FieldSet(
1816     "name", "pinst_cnt", "sinst_cnt",
1817     "pinst_list", "sinst_list",
1818     "pip", "sip", "tags",
1819     "serial_no",
1820     "master_candidate",
1821     "master",
1822     "offline",
1823     "drained",
1824     )
1825
1826   def ExpandNames(self):
1827     _CheckOutputFields(static=self._FIELDS_STATIC,
1828                        dynamic=self._FIELDS_DYNAMIC,
1829                        selected=self.op.output_fields)
1830
1831     self.needed_locks = {}
1832     self.share_locks[locking.LEVEL_NODE] = 1
1833
1834     if self.op.names:
1835       self.wanted = _GetWantedNodes(self, self.op.names)
1836     else:
1837       self.wanted = locking.ALL_SET
1838
1839     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1840     self.do_locking = self.do_node_query and self.op.use_locking
1841     if self.do_locking:
1842       # if we don't request only static fields, we need to lock the nodes
1843       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1844
1845
1846   def CheckPrereq(self):
1847     """Check prerequisites.
1848
1849     """
1850     # The validation of the node list is done in the _GetWantedNodes,
1851     # if non empty, and if empty, there's no validation to do
1852     pass
1853
1854   def Exec(self, feedback_fn):
1855     """Computes the list of nodes and their attributes.
1856
1857     """
1858     all_info = self.cfg.GetAllNodesInfo()
1859     if self.do_locking:
1860       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1861     elif self.wanted != locking.ALL_SET:
1862       nodenames = self.wanted
1863       missing = set(nodenames).difference(all_info.keys())
1864       if missing:
1865         raise errors.OpExecError(
1866           "Some nodes were removed before retrieving their data: %s" % missing)
1867     else:
1868       nodenames = all_info.keys()
1869
1870     nodenames = utils.NiceSort(nodenames)
1871     nodelist = [all_info[name] for name in nodenames]
1872
1873     # begin data gathering
1874
1875     if self.do_node_query:
1876       live_data = {}
1877       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1878                                           self.cfg.GetHypervisorType())
1879       for name in nodenames:
1880         nodeinfo = node_data[name]
1881         if not nodeinfo.failed and nodeinfo.data:
1882           nodeinfo = nodeinfo.data
1883           fn = utils.TryConvert
1884           live_data[name] = {
1885             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1886             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1887             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1888             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1889             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1890             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1891             "bootid": nodeinfo.get('bootid', None),
1892             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1893             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1894             }
1895         else:
1896           live_data[name] = {}
1897     else:
1898       live_data = dict.fromkeys(nodenames, {})
1899
1900     node_to_primary = dict([(name, set()) for name in nodenames])
1901     node_to_secondary = dict([(name, set()) for name in nodenames])
1902
1903     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1904                              "sinst_cnt", "sinst_list"))
1905     if inst_fields & frozenset(self.op.output_fields):
1906       instancelist = self.cfg.GetInstanceList()
1907
1908       for instance_name in instancelist:
1909         inst = self.cfg.GetInstanceInfo(instance_name)
1910         if inst.primary_node in node_to_primary:
1911           node_to_primary[inst.primary_node].add(inst.name)
1912         for secnode in inst.secondary_nodes:
1913           if secnode in node_to_secondary:
1914             node_to_secondary[secnode].add(inst.name)
1915
1916     master_node = self.cfg.GetMasterNode()
1917
1918     # end data gathering
1919
1920     output = []
1921     for node in nodelist:
1922       node_output = []
1923       for field in self.op.output_fields:
1924         if field == "name":
1925           val = node.name
1926         elif field == "pinst_list":
1927           val = list(node_to_primary[node.name])
1928         elif field == "sinst_list":
1929           val = list(node_to_secondary[node.name])
1930         elif field == "pinst_cnt":
1931           val = len(node_to_primary[node.name])
1932         elif field == "sinst_cnt":
1933           val = len(node_to_secondary[node.name])
1934         elif field == "pip":
1935           val = node.primary_ip
1936         elif field == "sip":
1937           val = node.secondary_ip
1938         elif field == "tags":
1939           val = list(node.GetTags())
1940         elif field == "serial_no":
1941           val = node.serial_no
1942         elif field == "master_candidate":
1943           val = node.master_candidate
1944         elif field == "master":
1945           val = node.name == master_node
1946         elif field == "offline":
1947           val = node.offline
1948         elif field == "drained":
1949           val = node.drained
1950         elif self._FIELDS_DYNAMIC.Matches(field):
1951           val = live_data[node.name].get(field, None)
1952         else:
1953           raise errors.ParameterError(field)
1954         node_output.append(val)
1955       output.append(node_output)
1956
1957     return output
1958
1959
1960 class LUQueryNodeVolumes(NoHooksLU):
1961   """Logical unit for getting volumes on node(s).
1962
1963   """
1964   _OP_REQP = ["nodes", "output_fields"]
1965   REQ_BGL = False
1966   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1967   _FIELDS_STATIC = utils.FieldSet("node")
1968
1969   def ExpandNames(self):
1970     _CheckOutputFields(static=self._FIELDS_STATIC,
1971                        dynamic=self._FIELDS_DYNAMIC,
1972                        selected=self.op.output_fields)
1973
1974     self.needed_locks = {}
1975     self.share_locks[locking.LEVEL_NODE] = 1
1976     if not self.op.nodes:
1977       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1978     else:
1979       self.needed_locks[locking.LEVEL_NODE] = \
1980         _GetWantedNodes(self, self.op.nodes)
1981
1982   def CheckPrereq(self):
1983     """Check prerequisites.
1984
1985     This checks that the fields required are valid output fields.
1986
1987     """
1988     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1989
1990   def Exec(self, feedback_fn):
1991     """Computes the list of nodes and their attributes.
1992
1993     """
1994     nodenames = self.nodes
1995     volumes = self.rpc.call_node_volumes(nodenames)
1996
1997     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1998              in self.cfg.GetInstanceList()]
1999
2000     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2001
2002     output = []
2003     for node in nodenames:
2004       if node not in volumes or volumes[node].failed or not volumes[node].data:
2005         continue
2006
2007       node_vols = volumes[node].data[:]
2008       node_vols.sort(key=lambda vol: vol['dev'])
2009
2010       for vol in node_vols:
2011         node_output = []
2012         for field in self.op.output_fields:
2013           if field == "node":
2014             val = node
2015           elif field == "phys":
2016             val = vol['dev']
2017           elif field == "vg":
2018             val = vol['vg']
2019           elif field == "name":
2020             val = vol['name']
2021           elif field == "size":
2022             val = int(float(vol['size']))
2023           elif field == "instance":
2024             for inst in ilist:
2025               if node not in lv_by_node[inst]:
2026                 continue
2027               if vol['name'] in lv_by_node[inst][node]:
2028                 val = inst.name
2029                 break
2030             else:
2031               val = '-'
2032           else:
2033             raise errors.ParameterError(field)
2034           node_output.append(str(val))
2035
2036         output.append(node_output)
2037
2038     return output
2039
2040
2041 class LUAddNode(LogicalUnit):
2042   """Logical unit for adding node to the cluster.
2043
2044   """
2045   HPATH = "node-add"
2046   HTYPE = constants.HTYPE_NODE
2047   _OP_REQP = ["node_name"]
2048
2049   def BuildHooksEnv(self):
2050     """Build hooks env.
2051
2052     This will run on all nodes before, and on all nodes + the new node after.
2053
2054     """
2055     env = {
2056       "OP_TARGET": self.op.node_name,
2057       "NODE_NAME": self.op.node_name,
2058       "NODE_PIP": self.op.primary_ip,
2059       "NODE_SIP": self.op.secondary_ip,
2060       }
2061     nodes_0 = self.cfg.GetNodeList()
2062     nodes_1 = nodes_0 + [self.op.node_name, ]
2063     return env, nodes_0, nodes_1
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     This checks:
2069      - the new node is not already in the config
2070      - it is resolvable
2071      - its parameters (single/dual homed) matches the cluster
2072
2073     Any errors are signalled by raising errors.OpPrereqError.
2074
2075     """
2076     node_name = self.op.node_name
2077     cfg = self.cfg
2078
2079     dns_data = utils.HostInfo(node_name)
2080
2081     node = dns_data.name
2082     primary_ip = self.op.primary_ip = dns_data.ip
2083     secondary_ip = getattr(self.op, "secondary_ip", None)
2084     if secondary_ip is None:
2085       secondary_ip = primary_ip
2086     if not utils.IsValidIP(secondary_ip):
2087       raise errors.OpPrereqError("Invalid secondary IP given")
2088     self.op.secondary_ip = secondary_ip
2089
2090     node_list = cfg.GetNodeList()
2091     if not self.op.readd and node in node_list:
2092       raise errors.OpPrereqError("Node %s is already in the configuration" %
2093                                  node)
2094     elif self.op.readd and node not in node_list:
2095       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2096
2097     for existing_node_name in node_list:
2098       existing_node = cfg.GetNodeInfo(existing_node_name)
2099
2100       if self.op.readd and node == existing_node_name:
2101         if (existing_node.primary_ip != primary_ip or
2102             existing_node.secondary_ip != secondary_ip):
2103           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2104                                      " address configuration as before")
2105         continue
2106
2107       if (existing_node.primary_ip == primary_ip or
2108           existing_node.secondary_ip == primary_ip or
2109           existing_node.primary_ip == secondary_ip or
2110           existing_node.secondary_ip == secondary_ip):
2111         raise errors.OpPrereqError("New node ip address(es) conflict with"
2112                                    " existing node %s" % existing_node.name)
2113
2114     # check that the type of the node (single versus dual homed) is the
2115     # same as for the master
2116     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2117     master_singlehomed = myself.secondary_ip == myself.primary_ip
2118     newbie_singlehomed = secondary_ip == primary_ip
2119     if master_singlehomed != newbie_singlehomed:
2120       if master_singlehomed:
2121         raise errors.OpPrereqError("The master has no private ip but the"
2122                                    " new node has one")
2123       else:
2124         raise errors.OpPrereqError("The master has a private ip but the"
2125                                    " new node doesn't have one")
2126
2127     # checks reachablity
2128     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2129       raise errors.OpPrereqError("Node not reachable by ping")
2130
2131     if not newbie_singlehomed:
2132       # check reachability from my secondary ip to newbie's secondary ip
2133       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2134                            source=myself.secondary_ip):
2135         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2136                                    " based ping to noded port")
2137
2138     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2139     mc_now, _ = self.cfg.GetMasterCandidateStats()
2140     master_candidate = mc_now < cp_size
2141
2142     self.new_node = objects.Node(name=node,
2143                                  primary_ip=primary_ip,
2144                                  secondary_ip=secondary_ip,
2145                                  master_candidate=master_candidate,
2146                                  offline=False, drained=False)
2147
2148   def Exec(self, feedback_fn):
2149     """Adds the new node to the cluster.
2150
2151     """
2152     new_node = self.new_node
2153     node = new_node.name
2154
2155     # check connectivity
2156     result = self.rpc.call_version([node])[node]
2157     result.Raise()
2158     if result.data:
2159       if constants.PROTOCOL_VERSION == result.data:
2160         logging.info("Communication to node %s fine, sw version %s match",
2161                      node, result.data)
2162       else:
2163         raise errors.OpExecError("Version mismatch master version %s,"
2164                                  " node version %s" %
2165                                  (constants.PROTOCOL_VERSION, result.data))
2166     else:
2167       raise errors.OpExecError("Cannot get version from the new node")
2168
2169     # setup ssh on node
2170     logging.info("Copy ssh key to node %s", node)
2171     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2172     keyarray = []
2173     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2174                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2175                 priv_key, pub_key]
2176
2177     for i in keyfiles:
2178       f = open(i, 'r')
2179       try:
2180         keyarray.append(f.read())
2181       finally:
2182         f.close()
2183
2184     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2185                                     keyarray[2],
2186                                     keyarray[3], keyarray[4], keyarray[5])
2187
2188     msg = result.RemoteFailMsg()
2189     if msg:
2190       raise errors.OpExecError("Cannot transfer ssh keys to the"
2191                                " new node: %s" % msg)
2192
2193     # Add node to our /etc/hosts, and add key to known_hosts
2194     utils.AddHostToEtcHosts(new_node.name)
2195
2196     if new_node.secondary_ip != new_node.primary_ip:
2197       result = self.rpc.call_node_has_ip_address(new_node.name,
2198                                                  new_node.secondary_ip)
2199       if result.failed or not result.data:
2200         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2201                                  " you gave (%s). Please fix and re-run this"
2202                                  " command." % new_node.secondary_ip)
2203
2204     node_verify_list = [self.cfg.GetMasterNode()]
2205     node_verify_param = {
2206       'nodelist': [node],
2207       # TODO: do a node-net-test as well?
2208     }
2209
2210     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2211                                        self.cfg.GetClusterName())
2212     for verifier in node_verify_list:
2213       if result[verifier].failed or not result[verifier].data:
2214         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2215                                  " for remote verification" % verifier)
2216       if result[verifier].data['nodelist']:
2217         for failed in result[verifier].data['nodelist']:
2218           feedback_fn("ssh/hostname verification failed %s -> %s" %
2219                       (verifier, result[verifier].data['nodelist'][failed]))
2220         raise errors.OpExecError("ssh/hostname verification failed.")
2221
2222     # Distribute updated /etc/hosts and known_hosts to all nodes,
2223     # including the node just added
2224     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2225     dist_nodes = self.cfg.GetNodeList()
2226     if not self.op.readd:
2227       dist_nodes.append(node)
2228     if myself.name in dist_nodes:
2229       dist_nodes.remove(myself.name)
2230
2231     logging.debug("Copying hosts and known_hosts to all nodes")
2232     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2233       result = self.rpc.call_upload_file(dist_nodes, fname)
2234       for to_node, to_result in result.iteritems():
2235         if to_result.failed or not to_result.data:
2236           logging.error("Copy of file %s to node %s failed", fname, to_node)
2237
2238     to_copy = []
2239     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2240     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2241       to_copy.append(constants.VNC_PASSWORD_FILE)
2242
2243     for fname in to_copy:
2244       result = self.rpc.call_upload_file([node], fname)
2245       if result[node].failed or not result[node]:
2246         logging.error("Could not copy file %s to node %s", fname, node)
2247
2248     if self.op.readd:
2249       self.context.ReaddNode(new_node)
2250     else:
2251       self.context.AddNode(new_node)
2252
2253
2254 class LUSetNodeParams(LogicalUnit):
2255   """Modifies the parameters of a node.
2256
2257   """
2258   HPATH = "node-modify"
2259   HTYPE = constants.HTYPE_NODE
2260   _OP_REQP = ["node_name"]
2261   REQ_BGL = False
2262
2263   def CheckArguments(self):
2264     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2265     if node_name is None:
2266       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2267     self.op.node_name = node_name
2268     _CheckBooleanOpField(self.op, 'master_candidate')
2269     _CheckBooleanOpField(self.op, 'offline')
2270     _CheckBooleanOpField(self.op, 'drained')
2271     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2272     if all_mods.count(None) == 3:
2273       raise errors.OpPrereqError("Please pass at least one modification")
2274     if all_mods.count(True) > 1:
2275       raise errors.OpPrereqError("Can't set the node into more than one"
2276                                  " state at the same time")
2277
2278   def ExpandNames(self):
2279     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2280
2281   def BuildHooksEnv(self):
2282     """Build hooks env.
2283
2284     This runs on the master node.
2285
2286     """
2287     env = {
2288       "OP_TARGET": self.op.node_name,
2289       "MASTER_CANDIDATE": str(self.op.master_candidate),
2290       "OFFLINE": str(self.op.offline),
2291       "DRAINED": str(self.op.drained),
2292       }
2293     nl = [self.cfg.GetMasterNode(),
2294           self.op.node_name]
2295     return env, nl, nl
2296
2297   def CheckPrereq(self):
2298     """Check prerequisites.
2299
2300     This only checks the instance list against the existing names.
2301
2302     """
2303     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2304
2305     if ((self.op.master_candidate == False or self.op.offline == True or
2306          self.op.drained == True) and node.master_candidate):
2307       # we will demote the node from master_candidate
2308       if self.op.node_name == self.cfg.GetMasterNode():
2309         raise errors.OpPrereqError("The master node has to be a"
2310                                    " master candidate, online and not drained")
2311       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2312       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2313       if num_candidates <= cp_size:
2314         msg = ("Not enough master candidates (desired"
2315                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2316         if self.op.force:
2317           self.LogWarning(msg)
2318         else:
2319           raise errors.OpPrereqError(msg)
2320
2321     if (self.op.master_candidate == True and
2322         ((node.offline and not self.op.offline == False) or
2323          (node.drained and not self.op.drained == False))):
2324       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2325                                  " to master_candidate")
2326
2327     return
2328
2329   def Exec(self, feedback_fn):
2330     """Modifies a node.
2331
2332     """
2333     node = self.node
2334
2335     result = []
2336     changed_mc = False
2337
2338     if self.op.offline is not None:
2339       node.offline = self.op.offline
2340       result.append(("offline", str(self.op.offline)))
2341       if self.op.offline == True:
2342         if node.master_candidate:
2343           node.master_candidate = False
2344           changed_mc = True
2345           result.append(("master_candidate", "auto-demotion due to offline"))
2346         if node.drained:
2347           node.drained = False
2348           result.append(("drained", "clear drained status due to offline"))
2349
2350     if self.op.master_candidate is not None:
2351       node.master_candidate = self.op.master_candidate
2352       changed_mc = True
2353       result.append(("master_candidate", str(self.op.master_candidate)))
2354       if self.op.master_candidate == False:
2355         rrc = self.rpc.call_node_demote_from_mc(node.name)
2356         msg = rrc.RemoteFailMsg()
2357         if msg:
2358           self.LogWarning("Node failed to demote itself: %s" % msg)
2359
2360     if self.op.drained is not None:
2361       node.drained = self.op.drained
2362       result.append(("drained", str(self.op.drained)))
2363       if self.op.drained == True:
2364         if node.master_candidate:
2365           node.master_candidate = False
2366           changed_mc = True
2367           result.append(("master_candidate", "auto-demotion due to drain"))
2368         if node.offline:
2369           node.offline = False
2370           result.append(("offline", "clear offline status due to drain"))
2371
2372     # this will trigger configuration file update, if needed
2373     self.cfg.Update(node)
2374     # this will trigger job queue propagation or cleanup
2375     if changed_mc:
2376       self.context.ReaddNode(node)
2377
2378     return result
2379
2380
2381 class LUQueryClusterInfo(NoHooksLU):
2382   """Query cluster configuration.
2383
2384   """
2385   _OP_REQP = []
2386   REQ_BGL = False
2387
2388   def ExpandNames(self):
2389     self.needed_locks = {}
2390
2391   def CheckPrereq(self):
2392     """No prerequsites needed for this LU.
2393
2394     """
2395     pass
2396
2397   def Exec(self, feedback_fn):
2398     """Return cluster config.
2399
2400     """
2401     cluster = self.cfg.GetClusterInfo()
2402     result = {
2403       "software_version": constants.RELEASE_VERSION,
2404       "protocol_version": constants.PROTOCOL_VERSION,
2405       "config_version": constants.CONFIG_VERSION,
2406       "os_api_version": constants.OS_API_VERSION,
2407       "export_version": constants.EXPORT_VERSION,
2408       "architecture": (platform.architecture()[0], platform.machine()),
2409       "name": cluster.cluster_name,
2410       "master": cluster.master_node,
2411       "default_hypervisor": cluster.default_hypervisor,
2412       "enabled_hypervisors": cluster.enabled_hypervisors,
2413       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2414                         for hypervisor in cluster.enabled_hypervisors]),
2415       "beparams": cluster.beparams,
2416       "candidate_pool_size": cluster.candidate_pool_size,
2417       }
2418
2419     return result
2420
2421
2422 class LUQueryConfigValues(NoHooksLU):
2423   """Return configuration values.
2424
2425   """
2426   _OP_REQP = []
2427   REQ_BGL = False
2428   _FIELDS_DYNAMIC = utils.FieldSet()
2429   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2430
2431   def ExpandNames(self):
2432     self.needed_locks = {}
2433
2434     _CheckOutputFields(static=self._FIELDS_STATIC,
2435                        dynamic=self._FIELDS_DYNAMIC,
2436                        selected=self.op.output_fields)
2437
2438   def CheckPrereq(self):
2439     """No prerequisites.
2440
2441     """
2442     pass
2443
2444   def Exec(self, feedback_fn):
2445     """Dump a representation of the cluster config to the standard output.
2446
2447     """
2448     values = []
2449     for field in self.op.output_fields:
2450       if field == "cluster_name":
2451         entry = self.cfg.GetClusterName()
2452       elif field == "master_node":
2453         entry = self.cfg.GetMasterNode()
2454       elif field == "drain_flag":
2455         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2456       else:
2457         raise errors.ParameterError(field)
2458       values.append(entry)
2459     return values
2460
2461
2462 class LUActivateInstanceDisks(NoHooksLU):
2463   """Bring up an instance's disks.
2464
2465   """
2466   _OP_REQP = ["instance_name"]
2467   REQ_BGL = False
2468
2469   def ExpandNames(self):
2470     self._ExpandAndLockInstance()
2471     self.needed_locks[locking.LEVEL_NODE] = []
2472     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2473
2474   def DeclareLocks(self, level):
2475     if level == locking.LEVEL_NODE:
2476       self._LockInstancesNodes()
2477
2478   def CheckPrereq(self):
2479     """Check prerequisites.
2480
2481     This checks that the instance is in the cluster.
2482
2483     """
2484     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2485     assert self.instance is not None, \
2486       "Cannot retrieve locked instance %s" % self.op.instance_name
2487     _CheckNodeOnline(self, self.instance.primary_node)
2488
2489   def Exec(self, feedback_fn):
2490     """Activate the disks.
2491
2492     """
2493     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2494     if not disks_ok:
2495       raise errors.OpExecError("Cannot activate block devices")
2496
2497     return disks_info
2498
2499
2500 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2501   """Prepare the block devices for an instance.
2502
2503   This sets up the block devices on all nodes.
2504
2505   @type lu: L{LogicalUnit}
2506   @param lu: the logical unit on whose behalf we execute
2507   @type instance: L{objects.Instance}
2508   @param instance: the instance for whose disks we assemble
2509   @type ignore_secondaries: boolean
2510   @param ignore_secondaries: if true, errors on secondary nodes
2511       won't result in an error return from the function
2512   @return: False if the operation failed, otherwise a list of
2513       (host, instance_visible_name, node_visible_name)
2514       with the mapping from node devices to instance devices
2515
2516   """
2517   device_info = []
2518   disks_ok = True
2519   iname = instance.name
2520   # With the two passes mechanism we try to reduce the window of
2521   # opportunity for the race condition of switching DRBD to primary
2522   # before handshaking occured, but we do not eliminate it
2523
2524   # The proper fix would be to wait (with some limits) until the
2525   # connection has been made and drbd transitions from WFConnection
2526   # into any other network-connected state (Connected, SyncTarget,
2527   # SyncSource, etc.)
2528
2529   # 1st pass, assemble on all nodes in secondary mode
2530   for inst_disk in instance.disks:
2531     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2532       lu.cfg.SetDiskID(node_disk, node)
2533       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2534       msg = result.RemoteFailMsg()
2535       if msg:
2536         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2537                            " (is_primary=False, pass=1): %s",
2538                            inst_disk.iv_name, node, msg)
2539         if not ignore_secondaries:
2540           disks_ok = False
2541
2542   # FIXME: race condition on drbd migration to primary
2543
2544   # 2nd pass, do only the primary node
2545   for inst_disk in instance.disks:
2546     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2547       if node != instance.primary_node:
2548         continue
2549       lu.cfg.SetDiskID(node_disk, node)
2550       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2551       msg = result.RemoteFailMsg()
2552       if msg:
2553         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2554                            " (is_primary=True, pass=2): %s",
2555                            inst_disk.iv_name, node, msg)
2556         disks_ok = False
2557     device_info.append((instance.primary_node, inst_disk.iv_name,
2558                         result.payload))
2559
2560   # leave the disks configured for the primary node
2561   # this is a workaround that would be fixed better by
2562   # improving the logical/physical id handling
2563   for disk in instance.disks:
2564     lu.cfg.SetDiskID(disk, instance.primary_node)
2565
2566   return disks_ok, device_info
2567
2568
2569 def _StartInstanceDisks(lu, instance, force):
2570   """Start the disks of an instance.
2571
2572   """
2573   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2574                                            ignore_secondaries=force)
2575   if not disks_ok:
2576     _ShutdownInstanceDisks(lu, instance)
2577     if force is not None and not force:
2578       lu.proc.LogWarning("", hint="If the message above refers to a"
2579                          " secondary node,"
2580                          " you can retry the operation using '--force'.")
2581     raise errors.OpExecError("Disk consistency error")
2582
2583
2584 class LUDeactivateInstanceDisks(NoHooksLU):
2585   """Shutdown an instance's disks.
2586
2587   """
2588   _OP_REQP = ["instance_name"]
2589   REQ_BGL = False
2590
2591   def ExpandNames(self):
2592     self._ExpandAndLockInstance()
2593     self.needed_locks[locking.LEVEL_NODE] = []
2594     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2595
2596   def DeclareLocks(self, level):
2597     if level == locking.LEVEL_NODE:
2598       self._LockInstancesNodes()
2599
2600   def CheckPrereq(self):
2601     """Check prerequisites.
2602
2603     This checks that the instance is in the cluster.
2604
2605     """
2606     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2607     assert self.instance is not None, \
2608       "Cannot retrieve locked instance %s" % self.op.instance_name
2609
2610   def Exec(self, feedback_fn):
2611     """Deactivate the disks
2612
2613     """
2614     instance = self.instance
2615     _SafeShutdownInstanceDisks(self, instance)
2616
2617
2618 def _SafeShutdownInstanceDisks(lu, instance):
2619   """Shutdown block devices of an instance.
2620
2621   This function checks if an instance is running, before calling
2622   _ShutdownInstanceDisks.
2623
2624   """
2625   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2626                                       [instance.hypervisor])
2627   ins_l = ins_l[instance.primary_node]
2628   if ins_l.failed or not isinstance(ins_l.data, list):
2629     raise errors.OpExecError("Can't contact node '%s'" %
2630                              instance.primary_node)
2631
2632   if instance.name in ins_l.data:
2633     raise errors.OpExecError("Instance is running, can't shutdown"
2634                              " block devices.")
2635
2636   _ShutdownInstanceDisks(lu, instance)
2637
2638
2639 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2640   """Shutdown block devices of an instance.
2641
2642   This does the shutdown on all nodes of the instance.
2643
2644   If the ignore_primary is false, errors on the primary node are
2645   ignored.
2646
2647   """
2648   all_result = True
2649   for disk in instance.disks:
2650     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2651       lu.cfg.SetDiskID(top_disk, node)
2652       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2653       msg = result.RemoteFailMsg()
2654       if msg:
2655         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2656                       disk.iv_name, node, msg)
2657         if not ignore_primary or node != instance.primary_node:
2658           all_result = False
2659   return all_result
2660
2661
2662 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2663   """Checks if a node has enough free memory.
2664
2665   This function check if a given node has the needed amount of free
2666   memory. In case the node has less memory or we cannot get the
2667   information from the node, this function raise an OpPrereqError
2668   exception.
2669
2670   @type lu: C{LogicalUnit}
2671   @param lu: a logical unit from which we get configuration data
2672   @type node: C{str}
2673   @param node: the node to check
2674   @type reason: C{str}
2675   @param reason: string to use in the error message
2676   @type requested: C{int}
2677   @param requested: the amount of memory in MiB to check for
2678   @type hypervisor_name: C{str}
2679   @param hypervisor_name: the hypervisor to ask for memory stats
2680   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2681       we cannot check the node
2682
2683   """
2684   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2685   nodeinfo[node].Raise()
2686   free_mem = nodeinfo[node].data.get('memory_free')
2687   if not isinstance(free_mem, int):
2688     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2689                              " was '%s'" % (node, free_mem))
2690   if requested > free_mem:
2691     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2692                              " needed %s MiB, available %s MiB" %
2693                              (node, reason, requested, free_mem))
2694
2695
2696 class LUStartupInstance(LogicalUnit):
2697   """Starts an instance.
2698
2699   """
2700   HPATH = "instance-start"
2701   HTYPE = constants.HTYPE_INSTANCE
2702   _OP_REQP = ["instance_name", "force"]
2703   REQ_BGL = False
2704
2705   def ExpandNames(self):
2706     self._ExpandAndLockInstance()
2707
2708   def BuildHooksEnv(self):
2709     """Build hooks env.
2710
2711     This runs on master, primary and secondary nodes of the instance.
2712
2713     """
2714     env = {
2715       "FORCE": self.op.force,
2716       }
2717     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2718     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2719     return env, nl, nl
2720
2721   def CheckPrereq(self):
2722     """Check prerequisites.
2723
2724     This checks that the instance is in the cluster.
2725
2726     """
2727     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2728     assert self.instance is not None, \
2729       "Cannot retrieve locked instance %s" % self.op.instance_name
2730
2731     _CheckNodeOnline(self, instance.primary_node)
2732
2733     bep = self.cfg.GetClusterInfo().FillBE(instance)
2734     # check bridges existance
2735     _CheckInstanceBridgesExist(self, instance)
2736
2737     _CheckNodeFreeMemory(self, instance.primary_node,
2738                          "starting instance %s" % instance.name,
2739                          bep[constants.BE_MEMORY], instance.hypervisor)
2740
2741   def Exec(self, feedback_fn):
2742     """Start the instance.
2743
2744     """
2745     instance = self.instance
2746     force = self.op.force
2747
2748     self.cfg.MarkInstanceUp(instance.name)
2749
2750     node_current = instance.primary_node
2751
2752     _StartInstanceDisks(self, instance, force)
2753
2754     result = self.rpc.call_instance_start(node_current, instance)
2755     msg = result.RemoteFailMsg()
2756     if msg:
2757       _ShutdownInstanceDisks(self, instance)
2758       raise errors.OpExecError("Could not start instance: %s" % msg)
2759
2760
2761 class LURebootInstance(LogicalUnit):
2762   """Reboot an instance.
2763
2764   """
2765   HPATH = "instance-reboot"
2766   HTYPE = constants.HTYPE_INSTANCE
2767   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2768   REQ_BGL = False
2769
2770   def ExpandNames(self):
2771     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2772                                    constants.INSTANCE_REBOOT_HARD,
2773                                    constants.INSTANCE_REBOOT_FULL]:
2774       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2775                                   (constants.INSTANCE_REBOOT_SOFT,
2776                                    constants.INSTANCE_REBOOT_HARD,
2777                                    constants.INSTANCE_REBOOT_FULL))
2778     self._ExpandAndLockInstance()
2779
2780   def BuildHooksEnv(self):
2781     """Build hooks env.
2782
2783     This runs on master, primary and secondary nodes of the instance.
2784
2785     """
2786     env = {
2787       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2788       "REBOOT_TYPE": self.op.reboot_type,
2789       }
2790     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2791     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2792     return env, nl, nl
2793
2794   def CheckPrereq(self):
2795     """Check prerequisites.
2796
2797     This checks that the instance is in the cluster.
2798
2799     """
2800     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2801     assert self.instance is not None, \
2802       "Cannot retrieve locked instance %s" % self.op.instance_name
2803
2804     _CheckNodeOnline(self, instance.primary_node)
2805
2806     # check bridges existance
2807     _CheckInstanceBridgesExist(self, instance)
2808
2809   def Exec(self, feedback_fn):
2810     """Reboot the instance.
2811
2812     """
2813     instance = self.instance
2814     ignore_secondaries = self.op.ignore_secondaries
2815     reboot_type = self.op.reboot_type
2816
2817     node_current = instance.primary_node
2818
2819     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2820                        constants.INSTANCE_REBOOT_HARD]:
2821       for disk in instance.disks:
2822         self.cfg.SetDiskID(disk, node_current)
2823       result = self.rpc.call_instance_reboot(node_current, instance,
2824                                              reboot_type)
2825       msg = result.RemoteFailMsg()
2826       if msg:
2827         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2828     else:
2829       result = self.rpc.call_instance_shutdown(node_current, instance)
2830       msg = result.RemoteFailMsg()
2831       if msg:
2832         raise errors.OpExecError("Could not shutdown instance for"
2833                                  " full reboot: %s" % msg)
2834       _ShutdownInstanceDisks(self, instance)
2835       _StartInstanceDisks(self, instance, ignore_secondaries)
2836       result = self.rpc.call_instance_start(node_current, instance)
2837       msg = result.RemoteFailMsg()
2838       if msg:
2839         _ShutdownInstanceDisks(self, instance)
2840         raise errors.OpExecError("Could not start instance for"
2841                                  " full reboot: %s" % msg)
2842
2843     self.cfg.MarkInstanceUp(instance.name)
2844
2845
2846 class LUShutdownInstance(LogicalUnit):
2847   """Shutdown an instance.
2848
2849   """
2850   HPATH = "instance-stop"
2851   HTYPE = constants.HTYPE_INSTANCE
2852   _OP_REQP = ["instance_name"]
2853   REQ_BGL = False
2854
2855   def ExpandNames(self):
2856     self._ExpandAndLockInstance()
2857
2858   def BuildHooksEnv(self):
2859     """Build hooks env.
2860
2861     This runs on master, primary and secondary nodes of the instance.
2862
2863     """
2864     env = _BuildInstanceHookEnvByObject(self, self.instance)
2865     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2866     return env, nl, nl
2867
2868   def CheckPrereq(self):
2869     """Check prerequisites.
2870
2871     This checks that the instance is in the cluster.
2872
2873     """
2874     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875     assert self.instance is not None, \
2876       "Cannot retrieve locked instance %s" % self.op.instance_name
2877     _CheckNodeOnline(self, self.instance.primary_node)
2878
2879   def Exec(self, feedback_fn):
2880     """Shutdown the instance.
2881
2882     """
2883     instance = self.instance
2884     node_current = instance.primary_node
2885     self.cfg.MarkInstanceDown(instance.name)
2886     result = self.rpc.call_instance_shutdown(node_current, instance)
2887     msg = result.RemoteFailMsg()
2888     if msg:
2889       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2890
2891     _ShutdownInstanceDisks(self, instance)
2892
2893
2894 class LUReinstallInstance(LogicalUnit):
2895   """Reinstall an instance.
2896
2897   """
2898   HPATH = "instance-reinstall"
2899   HTYPE = constants.HTYPE_INSTANCE
2900   _OP_REQP = ["instance_name"]
2901   REQ_BGL = False
2902
2903   def ExpandNames(self):
2904     self._ExpandAndLockInstance()
2905
2906   def BuildHooksEnv(self):
2907     """Build hooks env.
2908
2909     This runs on master, primary and secondary nodes of the instance.
2910
2911     """
2912     env = _BuildInstanceHookEnvByObject(self, self.instance)
2913     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2914     return env, nl, nl
2915
2916   def CheckPrereq(self):
2917     """Check prerequisites.
2918
2919     This checks that the instance is in the cluster and is not running.
2920
2921     """
2922     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2923     assert instance is not None, \
2924       "Cannot retrieve locked instance %s" % self.op.instance_name
2925     _CheckNodeOnline(self, instance.primary_node)
2926
2927     if instance.disk_template == constants.DT_DISKLESS:
2928       raise errors.OpPrereqError("Instance '%s' has no disks" %
2929                                  self.op.instance_name)
2930     if instance.admin_up:
2931       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2932                                  self.op.instance_name)
2933     remote_info = self.rpc.call_instance_info(instance.primary_node,
2934                                               instance.name,
2935                                               instance.hypervisor)
2936     if remote_info.failed or remote_info.data:
2937       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2938                                  (self.op.instance_name,
2939                                   instance.primary_node))
2940
2941     self.op.os_type = getattr(self.op, "os_type", None)
2942     if self.op.os_type is not None:
2943       # OS verification
2944       pnode = self.cfg.GetNodeInfo(
2945         self.cfg.ExpandNodeName(instance.primary_node))
2946       if pnode is None:
2947         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2948                                    self.op.pnode)
2949       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2950       result.Raise()
2951       if not isinstance(result.data, objects.OS):
2952         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2953                                    " primary node"  % self.op.os_type)
2954
2955     self.instance = instance
2956
2957   def Exec(self, feedback_fn):
2958     """Reinstall the instance.
2959
2960     """
2961     inst = self.instance
2962
2963     if self.op.os_type is not None:
2964       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2965       inst.os = self.op.os_type
2966       self.cfg.Update(inst)
2967
2968     _StartInstanceDisks(self, inst, None)
2969     try:
2970       feedback_fn("Running the instance OS create scripts...")
2971       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2972       msg = result.RemoteFailMsg()
2973       if msg:
2974         raise errors.OpExecError("Could not install OS for instance %s"
2975                                  " on node %s: %s" %
2976                                  (inst.name, inst.primary_node, msg))
2977     finally:
2978       _ShutdownInstanceDisks(self, inst)
2979
2980
2981 class LURenameInstance(LogicalUnit):
2982   """Rename an instance.
2983
2984   """
2985   HPATH = "instance-rename"
2986   HTYPE = constants.HTYPE_INSTANCE
2987   _OP_REQP = ["instance_name", "new_name"]
2988
2989   def BuildHooksEnv(self):
2990     """Build hooks env.
2991
2992     This runs on master, primary and secondary nodes of the instance.
2993
2994     """
2995     env = _BuildInstanceHookEnvByObject(self, self.instance)
2996     env["INSTANCE_NEW_NAME"] = self.op.new_name
2997     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2998     return env, nl, nl
2999
3000   def CheckPrereq(self):
3001     """Check prerequisites.
3002
3003     This checks that the instance is in the cluster and is not running.
3004
3005     """
3006     instance = self.cfg.GetInstanceInfo(
3007       self.cfg.ExpandInstanceName(self.op.instance_name))
3008     if instance is None:
3009       raise errors.OpPrereqError("Instance '%s' not known" %
3010                                  self.op.instance_name)
3011     _CheckNodeOnline(self, instance.primary_node)
3012
3013     if instance.admin_up:
3014       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3015                                  self.op.instance_name)
3016     remote_info = self.rpc.call_instance_info(instance.primary_node,
3017                                               instance.name,
3018                                               instance.hypervisor)
3019     remote_info.Raise()
3020     if remote_info.data:
3021       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3022                                  (self.op.instance_name,
3023                                   instance.primary_node))
3024     self.instance = instance
3025
3026     # new name verification
3027     name_info = utils.HostInfo(self.op.new_name)
3028
3029     self.op.new_name = new_name = name_info.name
3030     instance_list = self.cfg.GetInstanceList()
3031     if new_name in instance_list:
3032       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3033                                  new_name)
3034
3035     if not getattr(self.op, "ignore_ip", False):
3036       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3037         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3038                                    (name_info.ip, new_name))
3039
3040
3041   def Exec(self, feedback_fn):
3042     """Reinstall the instance.
3043
3044     """
3045     inst = self.instance
3046     old_name = inst.name
3047
3048     if inst.disk_template == constants.DT_FILE:
3049       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3050
3051     self.cfg.RenameInstance(inst.name, self.op.new_name)
3052     # Change the instance lock. This is definitely safe while we hold the BGL
3053     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3054     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3055
3056     # re-read the instance from the configuration after rename
3057     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3058
3059     if inst.disk_template == constants.DT_FILE:
3060       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3061       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3062                                                      old_file_storage_dir,
3063                                                      new_file_storage_dir)
3064       result.Raise()
3065       if not result.data:
3066         raise errors.OpExecError("Could not connect to node '%s' to rename"
3067                                  " directory '%s' to '%s' (but the instance"
3068                                  " has been renamed in Ganeti)" % (
3069                                  inst.primary_node, old_file_storage_dir,
3070                                  new_file_storage_dir))
3071
3072       if not result.data[0]:
3073         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3074                                  " (but the instance has been renamed in"
3075                                  " Ganeti)" % (old_file_storage_dir,
3076                                                new_file_storage_dir))
3077
3078     _StartInstanceDisks(self, inst, None)
3079     try:
3080       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3081                                                  old_name)
3082       msg = result.RemoteFailMsg()
3083       if msg:
3084         msg = ("Could not run OS rename script for instance %s on node %s"
3085                " (but the instance has been renamed in Ganeti): %s" %
3086                (inst.name, inst.primary_node, msg))
3087         self.proc.LogWarning(msg)
3088     finally:
3089       _ShutdownInstanceDisks(self, inst)
3090
3091
3092 class LURemoveInstance(LogicalUnit):
3093   """Remove an instance.
3094
3095   """
3096   HPATH = "instance-remove"
3097   HTYPE = constants.HTYPE_INSTANCE
3098   _OP_REQP = ["instance_name", "ignore_failures"]
3099   REQ_BGL = False
3100
3101   def ExpandNames(self):
3102     self._ExpandAndLockInstance()
3103     self.needed_locks[locking.LEVEL_NODE] = []
3104     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3105
3106   def DeclareLocks(self, level):
3107     if level == locking.LEVEL_NODE:
3108       self._LockInstancesNodes()
3109
3110   def BuildHooksEnv(self):
3111     """Build hooks env.
3112
3113     This runs on master, primary and secondary nodes of the instance.
3114
3115     """
3116     env = _BuildInstanceHookEnvByObject(self, self.instance)
3117     nl = [self.cfg.GetMasterNode()]
3118     return env, nl, nl
3119
3120   def CheckPrereq(self):
3121     """Check prerequisites.
3122
3123     This checks that the instance is in the cluster.
3124
3125     """
3126     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3127     assert self.instance is not None, \
3128       "Cannot retrieve locked instance %s" % self.op.instance_name
3129
3130   def Exec(self, feedback_fn):
3131     """Remove the instance.
3132
3133     """
3134     instance = self.instance
3135     logging.info("Shutting down instance %s on node %s",
3136                  instance.name, instance.primary_node)
3137
3138     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3139     msg = result.RemoteFailMsg()
3140     if msg:
3141       if self.op.ignore_failures:
3142         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3143       else:
3144         raise errors.OpExecError("Could not shutdown instance %s on"
3145                                  " node %s: %s" %
3146                                  (instance.name, instance.primary_node, msg))
3147
3148     logging.info("Removing block devices for instance %s", instance.name)
3149
3150     if not _RemoveDisks(self, instance):
3151       if self.op.ignore_failures:
3152         feedback_fn("Warning: can't remove instance's disks")
3153       else:
3154         raise errors.OpExecError("Can't remove instance's disks")
3155
3156     logging.info("Removing instance %s out of cluster config", instance.name)
3157
3158     self.cfg.RemoveInstance(instance.name)
3159     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3160
3161
3162 class LUQueryInstances(NoHooksLU):
3163   """Logical unit for querying instances.
3164
3165   """
3166   _OP_REQP = ["output_fields", "names", "use_locking"]
3167   REQ_BGL = False
3168   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3169                                     "admin_state",
3170                                     "disk_template", "ip", "mac", "bridge",
3171                                     "sda_size", "sdb_size", "vcpus", "tags",
3172                                     "network_port", "beparams",
3173                                     r"(disk)\.(size)/([0-9]+)",
3174                                     r"(disk)\.(sizes)", "disk_usage",
3175                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3176                                     r"(nic)\.(macs|ips|bridges)",
3177                                     r"(disk|nic)\.(count)",
3178                                     "serial_no", "hypervisor", "hvparams",] +
3179                                   ["hv/%s" % name
3180                                    for name in constants.HVS_PARAMETERS] +
3181                                   ["be/%s" % name
3182                                    for name in constants.BES_PARAMETERS])
3183   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3184
3185
3186   def ExpandNames(self):
3187     _CheckOutputFields(static=self._FIELDS_STATIC,
3188                        dynamic=self._FIELDS_DYNAMIC,
3189                        selected=self.op.output_fields)
3190
3191     self.needed_locks = {}
3192     self.share_locks[locking.LEVEL_INSTANCE] = 1
3193     self.share_locks[locking.LEVEL_NODE] = 1
3194
3195     if self.op.names:
3196       self.wanted = _GetWantedInstances(self, self.op.names)
3197     else:
3198       self.wanted = locking.ALL_SET
3199
3200     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3201     self.do_locking = self.do_node_query and self.op.use_locking
3202     if self.do_locking:
3203       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3204       self.needed_locks[locking.LEVEL_NODE] = []
3205       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3206
3207   def DeclareLocks(self, level):
3208     if level == locking.LEVEL_NODE and self.do_locking:
3209       self._LockInstancesNodes()
3210
3211   def CheckPrereq(self):
3212     """Check prerequisites.
3213
3214     """
3215     pass
3216
3217   def Exec(self, feedback_fn):
3218     """Computes the list of nodes and their attributes.
3219
3220     """
3221     all_info = self.cfg.GetAllInstancesInfo()
3222     if self.wanted == locking.ALL_SET:
3223       # caller didn't specify instance names, so ordering is not important
3224       if self.do_locking:
3225         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3226       else:
3227         instance_names = all_info.keys()
3228       instance_names = utils.NiceSort(instance_names)
3229     else:
3230       # caller did specify names, so we must keep the ordering
3231       if self.do_locking:
3232         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3233       else:
3234         tgt_set = all_info.keys()
3235       missing = set(self.wanted).difference(tgt_set)
3236       if missing:
3237         raise errors.OpExecError("Some instances were removed before"
3238                                  " retrieving their data: %s" % missing)
3239       instance_names = self.wanted
3240
3241     instance_list = [all_info[iname] for iname in instance_names]
3242
3243     # begin data gathering
3244
3245     nodes = frozenset([inst.primary_node for inst in instance_list])
3246     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3247
3248     bad_nodes = []
3249     off_nodes = []
3250     if self.do_node_query:
3251       live_data = {}
3252       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3253       for name in nodes:
3254         result = node_data[name]
3255         if result.offline:
3256           # offline nodes will be in both lists
3257           off_nodes.append(name)
3258         if result.failed:
3259           bad_nodes.append(name)
3260         else:
3261           if result.data:
3262             live_data.update(result.data)
3263             # else no instance is alive
3264     else:
3265       live_data = dict([(name, {}) for name in instance_names])
3266
3267     # end data gathering
3268
3269     HVPREFIX = "hv/"
3270     BEPREFIX = "be/"
3271     output = []
3272     for instance in instance_list:
3273       iout = []
3274       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3275       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3276       for field in self.op.output_fields:
3277         st_match = self._FIELDS_STATIC.Matches(field)
3278         if field == "name":
3279           val = instance.name
3280         elif field == "os":
3281           val = instance.os
3282         elif field == "pnode":
3283           val = instance.primary_node
3284         elif field == "snodes":
3285           val = list(instance.secondary_nodes)
3286         elif field == "admin_state":
3287           val = instance.admin_up
3288         elif field == "oper_state":
3289           if instance.primary_node in bad_nodes:
3290             val = None
3291           else:
3292             val = bool(live_data.get(instance.name))
3293         elif field == "status":
3294           if instance.primary_node in off_nodes:
3295             val = "ERROR_nodeoffline"
3296           elif instance.primary_node in bad_nodes:
3297             val = "ERROR_nodedown"
3298           else:
3299             running = bool(live_data.get(instance.name))
3300             if running:
3301               if instance.admin_up:
3302                 val = "running"
3303               else:
3304                 val = "ERROR_up"
3305             else:
3306               if instance.admin_up:
3307                 val = "ERROR_down"
3308               else:
3309                 val = "ADMIN_down"
3310         elif field == "oper_ram":
3311           if instance.primary_node in bad_nodes:
3312             val = None
3313           elif instance.name in live_data:
3314             val = live_data[instance.name].get("memory", "?")
3315           else:
3316             val = "-"
3317         elif field == "disk_template":
3318           val = instance.disk_template
3319         elif field == "ip":
3320           val = instance.nics[0].ip
3321         elif field == "bridge":
3322           val = instance.nics[0].bridge
3323         elif field == "mac":
3324           val = instance.nics[0].mac
3325         elif field == "sda_size" or field == "sdb_size":
3326           idx = ord(field[2]) - ord('a')
3327           try:
3328             val = instance.FindDisk(idx).size
3329           except errors.OpPrereqError:
3330             val = None
3331         elif field == "disk_usage": # total disk usage per node
3332           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3333           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3334         elif field == "tags":
3335           val = list(instance.GetTags())
3336         elif field == "serial_no":
3337           val = instance.serial_no
3338         elif field == "network_port":
3339           val = instance.network_port
3340         elif field == "hypervisor":
3341           val = instance.hypervisor
3342         elif field == "hvparams":
3343           val = i_hv
3344         elif (field.startswith(HVPREFIX) and
3345               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3346           val = i_hv.get(field[len(HVPREFIX):], None)
3347         elif field == "beparams":
3348           val = i_be
3349         elif (field.startswith(BEPREFIX) and
3350               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3351           val = i_be.get(field[len(BEPREFIX):], None)
3352         elif st_match and st_match.groups():
3353           # matches a variable list
3354           st_groups = st_match.groups()
3355           if st_groups and st_groups[0] == "disk":
3356             if st_groups[1] == "count":
3357               val = len(instance.disks)
3358             elif st_groups[1] == "sizes":
3359               val = [disk.size for disk in instance.disks]
3360             elif st_groups[1] == "size":
3361               try:
3362                 val = instance.FindDisk(st_groups[2]).size
3363               except errors.OpPrereqError:
3364                 val = None
3365             else:
3366               assert False, "Unhandled disk parameter"
3367           elif st_groups[0] == "nic":
3368             if st_groups[1] == "count":
3369               val = len(instance.nics)
3370             elif st_groups[1] == "macs":
3371               val = [nic.mac for nic in instance.nics]
3372             elif st_groups[1] == "ips":
3373               val = [nic.ip for nic in instance.nics]
3374             elif st_groups[1] == "bridges":
3375               val = [nic.bridge for nic in instance.nics]
3376             else:
3377               # index-based item
3378               nic_idx = int(st_groups[2])
3379               if nic_idx >= len(instance.nics):
3380                 val = None
3381               else:
3382                 if st_groups[1] == "mac":
3383                   val = instance.nics[nic_idx].mac
3384                 elif st_groups[1] == "ip":
3385                   val = instance.nics[nic_idx].ip
3386                 elif st_groups[1] == "bridge":
3387                   val = instance.nics[nic_idx].bridge
3388                 else:
3389                   assert False, "Unhandled NIC parameter"
3390           else:
3391             assert False, "Unhandled variable parameter"
3392         else:
3393           raise errors.ParameterError(field)
3394         iout.append(val)
3395       output.append(iout)
3396
3397     return output
3398
3399
3400 class LUFailoverInstance(LogicalUnit):
3401   """Failover an instance.
3402
3403   """
3404   HPATH = "instance-failover"
3405   HTYPE = constants.HTYPE_INSTANCE
3406   _OP_REQP = ["instance_name", "ignore_consistency"]
3407   REQ_BGL = False
3408
3409   def ExpandNames(self):
3410     self._ExpandAndLockInstance()
3411     self.needed_locks[locking.LEVEL_NODE] = []
3412     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3413
3414   def DeclareLocks(self, level):
3415     if level == locking.LEVEL_NODE:
3416       self._LockInstancesNodes()
3417
3418   def BuildHooksEnv(self):
3419     """Build hooks env.
3420
3421     This runs on master, primary and secondary nodes of the instance.
3422
3423     """
3424     env = {
3425       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3426       }
3427     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3428     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3429     return env, nl, nl
3430
3431   def CheckPrereq(self):
3432     """Check prerequisites.
3433
3434     This checks that the instance is in the cluster.
3435
3436     """
3437     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3438     assert self.instance is not None, \
3439       "Cannot retrieve locked instance %s" % self.op.instance_name
3440
3441     bep = self.cfg.GetClusterInfo().FillBE(instance)
3442     if instance.disk_template not in constants.DTS_NET_MIRROR:
3443       raise errors.OpPrereqError("Instance's disk layout is not"
3444                                  " network mirrored, cannot failover.")
3445
3446     secondary_nodes = instance.secondary_nodes
3447     if not secondary_nodes:
3448       raise errors.ProgrammerError("no secondary node but using "
3449                                    "a mirrored disk template")
3450
3451     target_node = secondary_nodes[0]
3452     _CheckNodeOnline(self, target_node)
3453     _CheckNodeNotDrained(self, target_node)
3454     # check memory requirements on the secondary node
3455     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3456                          instance.name, bep[constants.BE_MEMORY],
3457                          instance.hypervisor)
3458
3459     # check bridge existance
3460     brlist = [nic.bridge for nic in instance.nics]
3461     result = self.rpc.call_bridges_exist(target_node, brlist)
3462     result.Raise()
3463     if not result.data:
3464       raise errors.OpPrereqError("One or more target bridges %s does not"
3465                                  " exist on destination node '%s'" %
3466                                  (brlist, target_node))
3467
3468   def Exec(self, feedback_fn):
3469     """Failover an instance.
3470
3471     The failover is done by shutting it down on its present node and
3472     starting it on the secondary.
3473
3474     """
3475     instance = self.instance
3476
3477     source_node = instance.primary_node
3478     target_node = instance.secondary_nodes[0]
3479
3480     feedback_fn("* checking disk consistency between source and target")
3481     for dev in instance.disks:
3482       # for drbd, these are drbd over lvm
3483       if not _CheckDiskConsistency(self, dev, target_node, False):
3484         if instance.admin_up and not self.op.ignore_consistency:
3485           raise errors.OpExecError("Disk %s is degraded on target node,"
3486                                    " aborting failover." % dev.iv_name)
3487
3488     feedback_fn("* shutting down instance on source node")
3489     logging.info("Shutting down instance %s on node %s",
3490                  instance.name, source_node)
3491
3492     result = self.rpc.call_instance_shutdown(source_node, instance)
3493     msg = result.RemoteFailMsg()
3494     if msg:
3495       if self.op.ignore_consistency:
3496         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3497                              " Proceeding anyway. Please make sure node"
3498                              " %s is down. Error details: %s",
3499                              instance.name, source_node, source_node, msg)
3500       else:
3501         raise errors.OpExecError("Could not shutdown instance %s on"
3502                                  " node %s: %s" %
3503                                  (instance.name, source_node, msg))
3504
3505     feedback_fn("* deactivating the instance's disks on source node")
3506     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3507       raise errors.OpExecError("Can't shut down the instance's disks.")
3508
3509     instance.primary_node = target_node
3510     # distribute new instance config to the other nodes
3511     self.cfg.Update(instance)
3512
3513     # Only start the instance if it's marked as up
3514     if instance.admin_up:
3515       feedback_fn("* activating the instance's disks on target node")
3516       logging.info("Starting instance %s on node %s",
3517                    instance.name, target_node)
3518
3519       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3520                                                ignore_secondaries=True)
3521       if not disks_ok:
3522         _ShutdownInstanceDisks(self, instance)
3523         raise errors.OpExecError("Can't activate the instance's disks")
3524
3525       feedback_fn("* starting the instance on the target node")
3526       result = self.rpc.call_instance_start(target_node, instance)
3527       msg = result.RemoteFailMsg()
3528       if msg:
3529         _ShutdownInstanceDisks(self, instance)
3530         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3531                                  (instance.name, target_node, msg))
3532
3533
3534 class LUMigrateInstance(LogicalUnit):
3535   """Migrate an instance.
3536
3537   This is migration without shutting down, compared to the failover,
3538   which is done with shutdown.
3539
3540   """
3541   HPATH = "instance-migrate"
3542   HTYPE = constants.HTYPE_INSTANCE
3543   _OP_REQP = ["instance_name", "live", "cleanup"]
3544
3545   REQ_BGL = False
3546
3547   def ExpandNames(self):
3548     self._ExpandAndLockInstance()
3549     self.needed_locks[locking.LEVEL_NODE] = []
3550     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3551
3552   def DeclareLocks(self, level):
3553     if level == locking.LEVEL_NODE:
3554       self._LockInstancesNodes()
3555
3556   def BuildHooksEnv(self):
3557     """Build hooks env.
3558
3559     This runs on master, primary and secondary nodes of the instance.
3560
3561     """
3562     env = _BuildInstanceHookEnvByObject(self, self.instance)
3563     env["MIGRATE_LIVE"] = self.op.live
3564     env["MIGRATE_CLEANUP"] = self.op.cleanup
3565     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3566     return env, nl, nl
3567
3568   def CheckPrereq(self):
3569     """Check prerequisites.
3570
3571     This checks that the instance is in the cluster.
3572
3573     """
3574     instance = self.cfg.GetInstanceInfo(
3575       self.cfg.ExpandInstanceName(self.op.instance_name))
3576     if instance is None:
3577       raise errors.OpPrereqError("Instance '%s' not known" %
3578                                  self.op.instance_name)
3579
3580     if instance.disk_template != constants.DT_DRBD8:
3581       raise errors.OpPrereqError("Instance's disk layout is not"
3582                                  " drbd8, cannot migrate.")
3583
3584     secondary_nodes = instance.secondary_nodes
3585     if not secondary_nodes:
3586       raise errors.ConfigurationError("No secondary node but using"
3587                                       " drbd8 disk template")
3588
3589     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3590
3591     target_node = secondary_nodes[0]
3592     # check memory requirements on the secondary node
3593     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3594                          instance.name, i_be[constants.BE_MEMORY],
3595                          instance.hypervisor)
3596
3597     # check bridge existance
3598     brlist = [nic.bridge for nic in instance.nics]
3599     result = self.rpc.call_bridges_exist(target_node, brlist)
3600     if result.failed or not result.data:
3601       raise errors.OpPrereqError("One or more target bridges %s does not"
3602                                  " exist on destination node '%s'" %
3603                                  (brlist, target_node))
3604
3605     if not self.op.cleanup:
3606       _CheckNodeNotDrained(self, target_node)
3607       result = self.rpc.call_instance_migratable(instance.primary_node,
3608                                                  instance)
3609       msg = result.RemoteFailMsg()
3610       if msg:
3611         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3612                                    msg)
3613
3614     self.instance = instance
3615
3616   def _WaitUntilSync(self):
3617     """Poll with custom rpc for disk sync.
3618
3619     This uses our own step-based rpc call.
3620
3621     """
3622     self.feedback_fn("* wait until resync is done")
3623     all_done = False
3624     while not all_done:
3625       all_done = True
3626       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3627                                             self.nodes_ip,
3628                                             self.instance.disks)
3629       min_percent = 100
3630       for node, nres in result.items():
3631         msg = nres.RemoteFailMsg()
3632         if msg:
3633           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3634                                    (node, msg))
3635         node_done, node_percent = nres.payload
3636         all_done = all_done and node_done
3637         if node_percent is not None:
3638           min_percent = min(min_percent, node_percent)
3639       if not all_done:
3640         if min_percent < 100:
3641           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3642         time.sleep(2)
3643
3644   def _EnsureSecondary(self, node):
3645     """Demote a node to secondary.
3646
3647     """
3648     self.feedback_fn("* switching node %s to secondary mode" % node)
3649
3650     for dev in self.instance.disks:
3651       self.cfg.SetDiskID(dev, node)
3652
3653     result = self.rpc.call_blockdev_close(node, self.instance.name,
3654                                           self.instance.disks)
3655     msg = result.RemoteFailMsg()
3656     if msg:
3657       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3658                                " error %s" % (node, msg))
3659
3660   def _GoStandalone(self):
3661     """Disconnect from the network.
3662
3663     """
3664     self.feedback_fn("* changing into standalone mode")
3665     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3666                                                self.instance.disks)
3667     for node, nres in result.items():
3668       msg = nres.RemoteFailMsg()
3669       if msg:
3670         raise errors.OpExecError("Cannot disconnect disks node %s,"
3671                                  " error %s" % (node, msg))
3672
3673   def _GoReconnect(self, multimaster):
3674     """Reconnect to the network.
3675
3676     """
3677     if multimaster:
3678       msg = "dual-master"
3679     else:
3680       msg = "single-master"
3681     self.feedback_fn("* changing disks into %s mode" % msg)
3682     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3683                                            self.instance.disks,
3684                                            self.instance.name, multimaster)
3685     for node, nres in result.items():
3686       msg = nres.RemoteFailMsg()
3687       if msg:
3688         raise errors.OpExecError("Cannot change disks config on node %s,"
3689                                  " error: %s" % (node, msg))
3690
3691   def _ExecCleanup(self):
3692     """Try to cleanup after a failed migration.
3693
3694     The cleanup is done by:
3695       - check that the instance is running only on one node
3696         (and update the config if needed)
3697       - change disks on its secondary node to secondary
3698       - wait until disks are fully synchronized
3699       - disconnect from the network
3700       - change disks into single-master mode
3701       - wait again until disks are fully synchronized
3702
3703     """
3704     instance = self.instance
3705     target_node = self.target_node
3706     source_node = self.source_node
3707
3708     # check running on only one node
3709     self.feedback_fn("* checking where the instance actually runs"
3710                      " (if this hangs, the hypervisor might be in"
3711                      " a bad state)")
3712     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3713     for node, result in ins_l.items():
3714       result.Raise()
3715       if not isinstance(result.data, list):
3716         raise errors.OpExecError("Can't contact node '%s'" % node)
3717
3718     runningon_source = instance.name in ins_l[source_node].data
3719     runningon_target = instance.name in ins_l[target_node].data
3720
3721     if runningon_source and runningon_target:
3722       raise errors.OpExecError("Instance seems to be running on two nodes,"
3723                                " or the hypervisor is confused. You will have"
3724                                " to ensure manually that it runs only on one"
3725                                " and restart this operation.")
3726
3727     if not (runningon_source or runningon_target):
3728       raise errors.OpExecError("Instance does not seem to be running at all."
3729                                " In this case, it's safer to repair by"
3730                                " running 'gnt-instance stop' to ensure disk"
3731                                " shutdown, and then restarting it.")
3732
3733     if runningon_target:
3734       # the migration has actually succeeded, we need to update the config
3735       self.feedback_fn("* instance running on secondary node (%s),"
3736                        " updating config" % target_node)
3737       instance.primary_node = target_node
3738       self.cfg.Update(instance)
3739       demoted_node = source_node
3740     else:
3741       self.feedback_fn("* instance confirmed to be running on its"
3742                        " primary node (%s)" % source_node)
3743       demoted_node = target_node
3744
3745     self._EnsureSecondary(demoted_node)
3746     try:
3747       self._WaitUntilSync()
3748     except errors.OpExecError:
3749       # we ignore here errors, since if the device is standalone, it
3750       # won't be able to sync
3751       pass
3752     self._GoStandalone()
3753     self._GoReconnect(False)
3754     self._WaitUntilSync()
3755
3756     self.feedback_fn("* done")
3757
3758   def _RevertDiskStatus(self):
3759     """Try to revert the disk status after a failed migration.
3760
3761     """
3762     target_node = self.target_node
3763     try:
3764       self._EnsureSecondary(target_node)
3765       self._GoStandalone()
3766       self._GoReconnect(False)
3767       self._WaitUntilSync()
3768     except errors.OpExecError, err:
3769       self.LogWarning("Migration failed and I can't reconnect the"
3770                       " drives: error '%s'\n"
3771                       "Please look and recover the instance status" %
3772                       str(err))
3773
3774   def _AbortMigration(self):
3775     """Call the hypervisor code to abort a started migration.
3776
3777     """
3778     instance = self.instance
3779     target_node = self.target_node
3780     migration_info = self.migration_info
3781
3782     abort_result = self.rpc.call_finalize_migration(target_node,
3783                                                     instance,
3784                                                     migration_info,
3785                                                     False)
3786     abort_msg = abort_result.RemoteFailMsg()
3787     if abort_msg:
3788       logging.error("Aborting migration failed on target node %s: %s" %
3789                     (target_node, abort_msg))
3790       # Don't raise an exception here, as we stil have to try to revert the
3791       # disk status, even if this step failed.
3792
3793   def _ExecMigration(self):
3794     """Migrate an instance.
3795
3796     The migrate is done by:
3797       - change the disks into dual-master mode
3798       - wait until disks are fully synchronized again
3799       - migrate the instance
3800       - change disks on the new secondary node (the old primary) to secondary
3801       - wait until disks are fully synchronized
3802       - change disks into single-master mode
3803
3804     """
3805     instance = self.instance
3806     target_node = self.target_node
3807     source_node = self.source_node
3808
3809     self.feedback_fn("* checking disk consistency between source and target")
3810     for dev in instance.disks:
3811       if not _CheckDiskConsistency(self, dev, target_node, False):
3812         raise errors.OpExecError("Disk %s is degraded or not fully"
3813                                  " synchronized on target node,"
3814                                  " aborting migrate." % dev.iv_name)
3815
3816     # First get the migration information from the remote node
3817     result = self.rpc.call_migration_info(source_node, instance)
3818     msg = result.RemoteFailMsg()
3819     if msg:
3820       log_err = ("Failed fetching source migration information from %s: %s" %
3821                  (source_node, msg))
3822       logging.error(log_err)
3823       raise errors.OpExecError(log_err)
3824
3825     self.migration_info = migration_info = result.payload
3826
3827     # Then switch the disks to master/master mode
3828     self._EnsureSecondary(target_node)
3829     self._GoStandalone()
3830     self._GoReconnect(True)
3831     self._WaitUntilSync()
3832
3833     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3834     result = self.rpc.call_accept_instance(target_node,
3835                                            instance,
3836                                            migration_info,
3837                                            self.nodes_ip[target_node])
3838
3839     msg = result.RemoteFailMsg()
3840     if msg:
3841       logging.error("Instance pre-migration failed, trying to revert"
3842                     " disk status: %s", msg)
3843       self._AbortMigration()
3844       self._RevertDiskStatus()
3845       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3846                                (instance.name, msg))
3847
3848     self.feedback_fn("* migrating instance to %s" % target_node)
3849     time.sleep(10)
3850     result = self.rpc.call_instance_migrate(source_node, instance,
3851                                             self.nodes_ip[target_node],
3852                                             self.op.live)
3853     msg = result.RemoteFailMsg()
3854     if msg:
3855       logging.error("Instance migration failed, trying to revert"
3856                     " disk status: %s", msg)
3857       self._AbortMigration()
3858       self._RevertDiskStatus()
3859       raise errors.OpExecError("Could not migrate instance %s: %s" %
3860                                (instance.name, msg))
3861     time.sleep(10)
3862
3863     instance.primary_node = target_node
3864     # distribute new instance config to the other nodes
3865     self.cfg.Update(instance)
3866
3867     result = self.rpc.call_finalize_migration(target_node,
3868                                               instance,
3869                                               migration_info,
3870                                               True)
3871     msg = result.RemoteFailMsg()
3872     if msg:
3873       logging.error("Instance migration succeeded, but finalization failed:"
3874                     " %s" % msg)
3875       raise errors.OpExecError("Could not finalize instance migration: %s" %
3876                                msg)
3877
3878     self._EnsureSecondary(source_node)
3879     self._WaitUntilSync()
3880     self._GoStandalone()
3881     self._GoReconnect(False)
3882     self._WaitUntilSync()
3883
3884     self.feedback_fn("* done")
3885
3886   def Exec(self, feedback_fn):
3887     """Perform the migration.
3888
3889     """
3890     self.feedback_fn = feedback_fn
3891
3892     self.source_node = self.instance.primary_node
3893     self.target_node = self.instance.secondary_nodes[0]
3894     self.all_nodes = [self.source_node, self.target_node]
3895     self.nodes_ip = {
3896       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3897       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3898       }
3899     if self.op.cleanup:
3900       return self._ExecCleanup()
3901     else:
3902       return self._ExecMigration()
3903
3904
3905 def _CreateBlockDev(lu, node, instance, device, force_create,
3906                     info, force_open):
3907   """Create a tree of block devices on a given node.
3908
3909   If this device type has to be created on secondaries, create it and
3910   all its children.
3911
3912   If not, just recurse to children keeping the same 'force' value.
3913
3914   @param lu: the lu on whose behalf we execute
3915   @param node: the node on which to create the device
3916   @type instance: L{objects.Instance}
3917   @param instance: the instance which owns the device
3918   @type device: L{objects.Disk}
3919   @param device: the device to create
3920   @type force_create: boolean
3921   @param force_create: whether to force creation of this device; this
3922       will be change to True whenever we find a device which has
3923       CreateOnSecondary() attribute
3924   @param info: the extra 'metadata' we should attach to the device
3925       (this will be represented as a LVM tag)
3926   @type force_open: boolean
3927   @param force_open: this parameter will be passes to the
3928       L{backend.BlockdevCreate} function where it specifies
3929       whether we run on primary or not, and it affects both
3930       the child assembly and the device own Open() execution
3931
3932   """
3933   if device.CreateOnSecondary():
3934     force_create = True
3935
3936   if device.children:
3937     for child in device.children:
3938       _CreateBlockDev(lu, node, instance, child, force_create,
3939                       info, force_open)
3940
3941   if not force_create:
3942     return
3943
3944   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3945
3946
3947 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3948   """Create a single block device on a given node.
3949
3950   This will not recurse over children of the device, so they must be
3951   created in advance.
3952
3953   @param lu: the lu on whose behalf we execute
3954   @param node: the node on which to create the device
3955   @type instance: L{objects.Instance}
3956   @param instance: the instance which owns the device
3957   @type device: L{objects.Disk}
3958   @param device: the device to create
3959   @param info: the extra 'metadata' we should attach to the device
3960       (this will be represented as a LVM tag)
3961   @type force_open: boolean
3962   @param force_open: this parameter will be passes to the
3963       L{backend.BlockdevCreate} function where it specifies
3964       whether we run on primary or not, and it affects both
3965       the child assembly and the device own Open() execution
3966
3967   """
3968   lu.cfg.SetDiskID(device, node)
3969   result = lu.rpc.call_blockdev_create(node, device, device.size,
3970                                        instance.name, force_open, info)
3971   msg = result.RemoteFailMsg()
3972   if msg:
3973     raise errors.OpExecError("Can't create block device %s on"
3974                              " node %s for instance %s: %s" %
3975                              (device, node, instance.name, msg))
3976   if device.physical_id is None:
3977     device.physical_id = result.payload
3978
3979
3980 def _GenerateUniqueNames(lu, exts):
3981   """Generate a suitable LV name.
3982
3983   This will generate a logical volume name for the given instance.
3984
3985   """
3986   results = []
3987   for val in exts:
3988     new_id = lu.cfg.GenerateUniqueID()
3989     results.append("%s%s" % (new_id, val))
3990   return results
3991
3992
3993 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3994                          p_minor, s_minor):
3995   """Generate a drbd8 device complete with its children.
3996
3997   """
3998   port = lu.cfg.AllocatePort()
3999   vgname = lu.cfg.GetVGName()
4000   shared_secret = lu.cfg.GenerateDRBDSecret()
4001   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4002                           logical_id=(vgname, names[0]))
4003   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4004                           logical_id=(vgname, names[1]))
4005   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4006                           logical_id=(primary, secondary, port,
4007                                       p_minor, s_minor,
4008                                       shared_secret),
4009                           children=[dev_data, dev_meta],
4010                           iv_name=iv_name)
4011   return drbd_dev
4012
4013
4014 def _GenerateDiskTemplate(lu, template_name,
4015                           instance_name, primary_node,
4016                           secondary_nodes, disk_info,
4017                           file_storage_dir, file_driver,
4018                           base_index):
4019   """Generate the entire disk layout for a given template type.
4020
4021   """
4022   #TODO: compute space requirements
4023
4024   vgname = lu.cfg.GetVGName()
4025   disk_count = len(disk_info)
4026   disks = []
4027   if template_name == constants.DT_DISKLESS:
4028     pass
4029   elif template_name == constants.DT_PLAIN:
4030     if len(secondary_nodes) != 0:
4031       raise errors.ProgrammerError("Wrong template configuration")
4032
4033     names = _GenerateUniqueNames(lu, [".disk%d" % i
4034                                       for i in range(disk_count)])
4035     for idx, disk in enumerate(disk_info):
4036       disk_index = idx + base_index
4037       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4038                               logical_id=(vgname, names[idx]),
4039                               iv_name="disk/%d" % disk_index,
4040                               mode=disk["mode"])
4041       disks.append(disk_dev)
4042   elif template_name == constants.DT_DRBD8:
4043     if len(secondary_nodes) != 1:
4044       raise errors.ProgrammerError("Wrong template configuration")
4045     remote_node = secondary_nodes[0]
4046     minors = lu.cfg.AllocateDRBDMinor(
4047       [primary_node, remote_node] * len(disk_info), instance_name)
4048
4049     names = []
4050     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4051                                                for i in range(disk_count)]):
4052       names.append(lv_prefix + "_data")
4053       names.append(lv_prefix + "_meta")
4054     for idx, disk in enumerate(disk_info):
4055       disk_index = idx + base_index
4056       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4057                                       disk["size"], names[idx*2:idx*2+2],
4058                                       "disk/%d" % disk_index,
4059                                       minors[idx*2], minors[idx*2+1])
4060       disk_dev.mode = disk["mode"]
4061       disks.append(disk_dev)
4062   elif template_name == constants.DT_FILE:
4063     if len(secondary_nodes) != 0:
4064       raise errors.ProgrammerError("Wrong template configuration")
4065
4066     for idx, disk in enumerate(disk_info):
4067       disk_index = idx + base_index
4068       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4069                               iv_name="disk/%d" % disk_index,
4070                               logical_id=(file_driver,
4071                                           "%s/disk%d" % (file_storage_dir,
4072                                                          disk_index)),
4073                               mode=disk["mode"])
4074       disks.append(disk_dev)
4075   else:
4076     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4077   return disks
4078
4079
4080 def _GetInstanceInfoText(instance):
4081   """Compute that text that should be added to the disk's metadata.
4082
4083   """
4084   return "originstname+%s" % instance.name
4085
4086
4087 def _CreateDisks(lu, instance):
4088   """Create all disks for an instance.
4089
4090   This abstracts away some work from AddInstance.
4091
4092   @type lu: L{LogicalUnit}
4093   @param lu: the logical unit on whose behalf we execute
4094   @type instance: L{objects.Instance}
4095   @param instance: the instance whose disks we should create
4096   @rtype: boolean
4097   @return: the success of the creation
4098
4099   """
4100   info = _GetInstanceInfoText(instance)
4101   pnode = instance.primary_node
4102
4103   if instance.disk_template == constants.DT_FILE:
4104     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4105     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4106
4107     if result.failed or not result.data:
4108       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4109
4110     if not result.data[0]:
4111       raise errors.OpExecError("Failed to create directory '%s'" %
4112                                file_storage_dir)
4113
4114   # Note: this needs to be kept in sync with adding of disks in
4115   # LUSetInstanceParams
4116   for device in instance.disks:
4117     logging.info("Creating volume %s for instance %s",
4118                  device.iv_name, instance.name)
4119     #HARDCODE
4120     for node in instance.all_nodes:
4121       f_create = node == pnode
4122       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4123
4124
4125 def _RemoveDisks(lu, instance):
4126   """Remove all disks for an instance.
4127
4128   This abstracts away some work from `AddInstance()` and
4129   `RemoveInstance()`. Note that in case some of the devices couldn't
4130   be removed, the removal will continue with the other ones (compare
4131   with `_CreateDisks()`).
4132
4133   @type lu: L{LogicalUnit}
4134   @param lu: the logical unit on whose behalf we execute
4135   @type instance: L{objects.Instance}
4136   @param instance: the instance whose disks we should remove
4137   @rtype: boolean
4138   @return: the success of the removal
4139
4140   """
4141   logging.info("Removing block devices for instance %s", instance.name)
4142
4143   all_result = True
4144   for device in instance.disks:
4145     for node, disk in device.ComputeNodeTree(instance.primary_node):
4146       lu.cfg.SetDiskID(disk, node)
4147       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4148       if msg:
4149         lu.LogWarning("Could not remove block device %s on node %s,"
4150                       " continuing anyway: %s", device.iv_name, node, msg)
4151         all_result = False
4152
4153   if instance.disk_template == constants.DT_FILE:
4154     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4155     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4156                                                  file_storage_dir)
4157     if result.failed or not result.data:
4158       logging.error("Could not remove directory '%s'", file_storage_dir)
4159       all_result = False
4160
4161   return all_result
4162
4163
4164 def _ComputeDiskSize(disk_template, disks):
4165   """Compute disk size requirements in the volume group
4166
4167   """
4168   # Required free disk space as a function of disk and swap space
4169   req_size_dict = {
4170     constants.DT_DISKLESS: None,
4171     constants.DT_PLAIN: sum(d["size"] for d in disks),
4172     # 128 MB are added for drbd metadata for each disk
4173     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4174     constants.DT_FILE: None,
4175   }
4176
4177   if disk_template not in req_size_dict:
4178     raise errors.ProgrammerError("Disk template '%s' size requirement"
4179                                  " is unknown" %  disk_template)
4180
4181   return req_size_dict[disk_template]
4182
4183
4184 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4185   """Hypervisor parameter validation.
4186
4187   This function abstract the hypervisor parameter validation to be
4188   used in both instance create and instance modify.
4189
4190   @type lu: L{LogicalUnit}
4191   @param lu: the logical unit for which we check
4192   @type nodenames: list
4193   @param nodenames: the list of nodes on which we should check
4194   @type hvname: string
4195   @param hvname: the name of the hypervisor we should use
4196   @type hvparams: dict
4197   @param hvparams: the parameters which we need to check
4198   @raise errors.OpPrereqError: if the parameters are not valid
4199
4200   """
4201   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4202                                                   hvname,
4203                                                   hvparams)
4204   for node in nodenames:
4205     info = hvinfo[node]
4206     if info.offline:
4207       continue
4208     msg = info.RemoteFailMsg()
4209     if msg:
4210       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4211                                  " %s" % msg)
4212
4213
4214 class LUCreateInstance(LogicalUnit):
4215   """Create an instance.
4216
4217   """
4218   HPATH = "instance-add"
4219   HTYPE = constants.HTYPE_INSTANCE
4220   _OP_REQP = ["instance_name", "disks", "disk_template",
4221               "mode", "start",
4222               "wait_for_sync", "ip_check", "nics",
4223               "hvparams", "beparams"]
4224   REQ_BGL = False
4225
4226   def _ExpandNode(self, node):
4227     """Expands and checks one node name.
4228
4229     """
4230     node_full = self.cfg.ExpandNodeName(node)
4231     if node_full is None:
4232       raise errors.OpPrereqError("Unknown node %s" % node)
4233     return node_full
4234
4235   def ExpandNames(self):
4236     """ExpandNames for CreateInstance.
4237
4238     Figure out the right locks for instance creation.
4239
4240     """
4241     self.needed_locks = {}
4242
4243     # set optional parameters to none if they don't exist
4244     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4245       if not hasattr(self.op, attr):
4246         setattr(self.op, attr, None)
4247
4248     # cheap checks, mostly valid constants given
4249
4250     # verify creation mode
4251     if self.op.mode not in (constants.INSTANCE_CREATE,
4252                             constants.INSTANCE_IMPORT):
4253       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4254                                  self.op.mode)
4255
4256     # disk template and mirror node verification
4257     if self.op.disk_template not in constants.DISK_TEMPLATES:
4258       raise errors.OpPrereqError("Invalid disk template name")
4259
4260     if self.op.hypervisor is None:
4261       self.op.hypervisor = self.cfg.GetHypervisorType()
4262
4263     cluster = self.cfg.GetClusterInfo()
4264     enabled_hvs = cluster.enabled_hypervisors
4265     if self.op.hypervisor not in enabled_hvs:
4266       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4267                                  " cluster (%s)" % (self.op.hypervisor,
4268                                   ",".join(enabled_hvs)))
4269
4270     # check hypervisor parameter syntax (locally)
4271     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4272     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4273                                   self.op.hvparams)
4274     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4275     hv_type.CheckParameterSyntax(filled_hvp)
4276
4277     # fill and remember the beparams dict
4278     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4279     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4280                                     self.op.beparams)
4281
4282     #### instance parameters check
4283
4284     # instance name verification
4285     hostname1 = utils.HostInfo(self.op.instance_name)
4286     self.op.instance_name = instance_name = hostname1.name
4287
4288     # this is just a preventive check, but someone might still add this
4289     # instance in the meantime, and creation will fail at lock-add time
4290     if instance_name in self.cfg.GetInstanceList():
4291       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4292                                  instance_name)
4293
4294     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4295
4296     # NIC buildup
4297     self.nics = []
4298     for nic in self.op.nics:
4299       # ip validity checks
4300       ip = nic.get("ip", None)
4301       if ip is None or ip.lower() == "none":
4302         nic_ip = None
4303       elif ip.lower() == constants.VALUE_AUTO:
4304         nic_ip = hostname1.ip
4305       else:
4306         if not utils.IsValidIP(ip):
4307           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4308                                      " like a valid IP" % ip)
4309         nic_ip = ip
4310
4311       # MAC address verification
4312       mac = nic.get("mac", constants.VALUE_AUTO)
4313       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4314         if not utils.IsValidMac(mac.lower()):
4315           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4316                                      mac)
4317       # bridge verification
4318       bridge = nic.get("bridge", None)
4319       if bridge is None:
4320         bridge = self.cfg.GetDefBridge()
4321       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4322
4323     # disk checks/pre-build
4324     self.disks = []
4325     for disk in self.op.disks:
4326       mode = disk.get("mode", constants.DISK_RDWR)
4327       if mode not in constants.DISK_ACCESS_SET:
4328         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4329                                    mode)
4330       size = disk.get("size", None)
4331       if size is None:
4332         raise errors.OpPrereqError("Missing disk size")
4333       try:
4334         size = int(size)
4335       except ValueError:
4336         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4337       self.disks.append({"size": size, "mode": mode})
4338
4339     # used in CheckPrereq for ip ping check
4340     self.check_ip = hostname1.ip
4341
4342     # file storage checks
4343     if (self.op.file_driver and
4344         not self.op.file_driver in constants.FILE_DRIVER):
4345       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4346                                  self.op.file_driver)
4347
4348     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4349       raise errors.OpPrereqError("File storage directory path not absolute")
4350
4351     ### Node/iallocator related checks
4352     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4353       raise errors.OpPrereqError("One and only one of iallocator and primary"
4354                                  " node must be given")
4355
4356     if self.op.iallocator:
4357       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4358     else:
4359       self.op.pnode = self._ExpandNode(self.op.pnode)
4360       nodelist = [self.op.pnode]
4361       if self.op.snode is not None:
4362         self.op.snode = self._ExpandNode(self.op.snode)
4363         nodelist.append(self.op.snode)
4364       self.needed_locks[locking.LEVEL_NODE] = nodelist
4365
4366     # in case of import lock the source node too
4367     if self.op.mode == constants.INSTANCE_IMPORT:
4368       src_node = getattr(self.op, "src_node", None)
4369       src_path = getattr(self.op, "src_path", None)
4370
4371       if src_path is None:
4372         self.op.src_path = src_path = self.op.instance_name
4373
4374       if src_node is None:
4375         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4376         self.op.src_node = None
4377         if os.path.isabs(src_path):
4378           raise errors.OpPrereqError("Importing an instance from an absolute"
4379                                      " path requires a source node option.")
4380       else:
4381         self.op.src_node = src_node = self._ExpandNode(src_node)
4382         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4383           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4384         if not os.path.isabs(src_path):
4385           self.op.src_path = src_path = \
4386             os.path.join(constants.EXPORT_DIR, src_path)
4387
4388     else: # INSTANCE_CREATE
4389       if getattr(self.op, "os_type", None) is None:
4390         raise errors.OpPrereqError("No guest OS specified")
4391
4392   def _RunAllocator(self):
4393     """Run the allocator based on input opcode.
4394
4395     """
4396     nics = [n.ToDict() for n in self.nics]
4397     ial = IAllocator(self,
4398                      mode=constants.IALLOCATOR_MODE_ALLOC,
4399                      name=self.op.instance_name,
4400                      disk_template=self.op.disk_template,
4401                      tags=[],
4402                      os=self.op.os_type,
4403                      vcpus=self.be_full[constants.BE_VCPUS],
4404                      mem_size=self.be_full[constants.BE_MEMORY],
4405                      disks=self.disks,
4406                      nics=nics,
4407                      hypervisor=self.op.hypervisor,
4408                      )
4409
4410     ial.Run(self.op.iallocator)
4411
4412     if not ial.success:
4413       raise errors.OpPrereqError("Can't compute nodes using"
4414                                  " iallocator '%s': %s" % (self.op.iallocator,
4415                                                            ial.info))
4416     if len(ial.nodes) != ial.required_nodes:
4417       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4418                                  " of nodes (%s), required %s" %
4419                                  (self.op.iallocator, len(ial.nodes),
4420                                   ial.required_nodes))
4421     self.op.pnode = ial.nodes[0]
4422     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4423                  self.op.instance_name, self.op.iallocator,
4424                  ", ".join(ial.nodes))
4425     if ial.required_nodes == 2:
4426       self.op.snode = ial.nodes[1]
4427
4428   def BuildHooksEnv(self):
4429     """Build hooks env.
4430
4431     This runs on master, primary and secondary nodes of the instance.
4432
4433     """
4434     env = {
4435       "ADD_MODE": self.op.mode,
4436       }
4437     if self.op.mode == constants.INSTANCE_IMPORT:
4438       env["SRC_NODE"] = self.op.src_node
4439       env["SRC_PATH"] = self.op.src_path
4440       env["SRC_IMAGES"] = self.src_images
4441
4442     env.update(_BuildInstanceHookEnv(
4443       name=self.op.instance_name,
4444       primary_node=self.op.pnode,
4445       secondary_nodes=self.secondaries,
4446       status=self.op.start,
4447       os_type=self.op.os_type,
4448       memory=self.be_full[constants.BE_MEMORY],
4449       vcpus=self.be_full[constants.BE_VCPUS],
4450       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4451       disk_template=self.op.disk_template,
4452       disks=[(d["size"], d["mode"]) for d in self.disks],
4453     ))
4454
4455     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4456           self.secondaries)
4457     return env, nl, nl
4458
4459
4460   def CheckPrereq(self):
4461     """Check prerequisites.
4462
4463     """
4464     if (not self.cfg.GetVGName() and
4465         self.op.disk_template not in constants.DTS_NOT_LVM):
4466       raise errors.OpPrereqError("Cluster does not support lvm-based"
4467                                  " instances")
4468
4469     if self.op.mode == constants.INSTANCE_IMPORT:
4470       src_node = self.op.src_node
4471       src_path = self.op.src_path
4472
4473       if src_node is None:
4474         exp_list = self.rpc.call_export_list(
4475           self.acquired_locks[locking.LEVEL_NODE])
4476         found = False
4477         for node in exp_list:
4478           if not exp_list[node].failed and src_path in exp_list[node].data:
4479             found = True
4480             self.op.src_node = src_node = node
4481             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4482                                                        src_path)
4483             break
4484         if not found:
4485           raise errors.OpPrereqError("No export found for relative path %s" %
4486                                       src_path)
4487
4488       _CheckNodeOnline(self, src_node)
4489       result = self.rpc.call_export_info(src_node, src_path)
4490       result.Raise()
4491       if not result.data:
4492         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4493
4494       export_info = result.data
4495       if not export_info.has_section(constants.INISECT_EXP):
4496         raise errors.ProgrammerError("Corrupted export config")
4497
4498       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4499       if (int(ei_version) != constants.EXPORT_VERSION):
4500         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4501                                    (ei_version, constants.EXPORT_VERSION))
4502
4503       # Check that the new instance doesn't have less disks than the export
4504       instance_disks = len(self.disks)
4505       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4506       if instance_disks < export_disks:
4507         raise errors.OpPrereqError("Not enough disks to import."
4508                                    " (instance: %d, export: %d)" %
4509                                    (instance_disks, export_disks))
4510
4511       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4512       disk_images = []
4513       for idx in range(export_disks):
4514         option = 'disk%d_dump' % idx
4515         if export_info.has_option(constants.INISECT_INS, option):
4516           # FIXME: are the old os-es, disk sizes, etc. useful?
4517           export_name = export_info.get(constants.INISECT_INS, option)
4518           image = os.path.join(src_path, export_name)
4519           disk_images.append(image)
4520         else:
4521           disk_images.append(False)
4522
4523       self.src_images = disk_images
4524
4525       old_name = export_info.get(constants.INISECT_INS, 'name')
4526       # FIXME: int() here could throw a ValueError on broken exports
4527       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4528       if self.op.instance_name == old_name:
4529         for idx, nic in enumerate(self.nics):
4530           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4531             nic_mac_ini = 'nic%d_mac' % idx
4532             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4533
4534     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4535     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4536     if self.op.start and not self.op.ip_check:
4537       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4538                                  " adding an instance in start mode")
4539
4540     if self.op.ip_check:
4541       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4542         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4543                                    (self.check_ip, self.op.instance_name))
4544
4545     #### mac address generation
4546     # By generating here the mac address both the allocator and the hooks get
4547     # the real final mac address rather than the 'auto' or 'generate' value.
4548     # There is a race condition between the generation and the instance object
4549     # creation, which means that we know the mac is valid now, but we're not
4550     # sure it will be when we actually add the instance. If things go bad
4551     # adding the instance will abort because of a duplicate mac, and the
4552     # creation job will fail.
4553     for nic in self.nics:
4554       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4555         nic.mac = self.cfg.GenerateMAC()
4556
4557     #### allocator run
4558
4559     if self.op.iallocator is not None:
4560       self._RunAllocator()
4561
4562     #### node related checks
4563
4564     # check primary node
4565     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4566     assert self.pnode is not None, \
4567       "Cannot retrieve locked node %s" % self.op.pnode
4568     if pnode.offline:
4569       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4570                                  pnode.name)
4571     if pnode.drained:
4572       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4573                                  pnode.name)
4574
4575     self.secondaries = []
4576
4577     # mirror node verification
4578     if self.op.disk_template in constants.DTS_NET_MIRROR:
4579       if self.op.snode is None:
4580         raise errors.OpPrereqError("The networked disk templates need"
4581                                    " a mirror node")
4582       if self.op.snode == pnode.name:
4583         raise errors.OpPrereqError("The secondary node cannot be"
4584                                    " the primary node.")
4585       _CheckNodeOnline(self, self.op.snode)
4586       _CheckNodeNotDrained(self, self.op.snode)
4587       self.secondaries.append(self.op.snode)
4588
4589     nodenames = [pnode.name] + self.secondaries
4590
4591     req_size = _ComputeDiskSize(self.op.disk_template,
4592                                 self.disks)
4593
4594     # Check lv size requirements
4595     if req_size is not None:
4596       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4597                                          self.op.hypervisor)
4598       for node in nodenames:
4599         info = nodeinfo[node]
4600         info.Raise()
4601         info = info.data
4602         if not info:
4603           raise errors.OpPrereqError("Cannot get current information"
4604                                      " from node '%s'" % node)
4605         vg_free = info.get('vg_free', None)
4606         if not isinstance(vg_free, int):
4607           raise errors.OpPrereqError("Can't compute free disk space on"
4608                                      " node %s" % node)
4609         if req_size > info['vg_free']:
4610           raise errors.OpPrereqError("Not enough disk space on target node %s."
4611                                      " %d MB available, %d MB required" %
4612                                      (node, info['vg_free'], req_size))
4613
4614     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4615
4616     # os verification
4617     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4618     result.Raise()
4619     if not isinstance(result.data, objects.OS):
4620       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4621                                  " primary node"  % self.op.os_type)
4622
4623     # bridge check on primary node
4624     bridges = [n.bridge for n in self.nics]
4625     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4626     result.Raise()
4627     if not result.data:
4628       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4629                                  " exist on destination node '%s'" %
4630                                  (",".join(bridges), pnode.name))
4631
4632     # memory check on primary node
4633     if self.op.start:
4634       _CheckNodeFreeMemory(self, self.pnode.name,
4635                            "creating instance %s" % self.op.instance_name,
4636                            self.be_full[constants.BE_MEMORY],
4637                            self.op.hypervisor)
4638
4639   def Exec(self, feedback_fn):
4640     """Create and add the instance to the cluster.
4641
4642     """
4643     instance = self.op.instance_name
4644     pnode_name = self.pnode.name
4645
4646     ht_kind = self.op.hypervisor
4647     if ht_kind in constants.HTS_REQ_PORT:
4648       network_port = self.cfg.AllocatePort()
4649     else:
4650       network_port = None
4651
4652     ##if self.op.vnc_bind_address is None:
4653     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4654
4655     # this is needed because os.path.join does not accept None arguments
4656     if self.op.file_storage_dir is None:
4657       string_file_storage_dir = ""
4658     else:
4659       string_file_storage_dir = self.op.file_storage_dir
4660
4661     # build the full file storage dir path
4662     file_storage_dir = os.path.normpath(os.path.join(
4663                                         self.cfg.GetFileStorageDir(),
4664                                         string_file_storage_dir, instance))
4665
4666
4667     disks = _GenerateDiskTemplate(self,
4668                                   self.op.disk_template,
4669                                   instance, pnode_name,
4670                                   self.secondaries,
4671                                   self.disks,
4672                                   file_storage_dir,
4673                                   self.op.file_driver,
4674                                   0)
4675
4676     iobj = objects.Instance(name=instance, os=self.op.os_type,
4677                             primary_node=pnode_name,
4678                             nics=self.nics, disks=disks,
4679                             disk_template=self.op.disk_template,
4680                             admin_up=False,
4681                             network_port=network_port,
4682                             beparams=self.op.beparams,
4683                             hvparams=self.op.hvparams,
4684                             hypervisor=self.op.hypervisor,
4685                             )
4686
4687     feedback_fn("* creating instance disks...")
4688     try:
4689       _CreateDisks(self, iobj)
4690     except errors.OpExecError:
4691       self.LogWarning("Device creation failed, reverting...")
4692       try:
4693         _RemoveDisks(self, iobj)
4694       finally:
4695         self.cfg.ReleaseDRBDMinors(instance)
4696         raise
4697
4698     feedback_fn("adding instance %s to cluster config" % instance)
4699
4700     self.cfg.AddInstance(iobj)
4701     # Declare that we don't want to remove the instance lock anymore, as we've
4702     # added the instance to the config
4703     del self.remove_locks[locking.LEVEL_INSTANCE]
4704     # Unlock all the nodes
4705     if self.op.mode == constants.INSTANCE_IMPORT:
4706       nodes_keep = [self.op.src_node]
4707       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4708                        if node != self.op.src_node]
4709       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4710       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4711     else:
4712       self.context.glm.release(locking.LEVEL_NODE)
4713       del self.acquired_locks[locking.LEVEL_NODE]
4714
4715     if self.op.wait_for_sync:
4716       disk_abort = not _WaitForSync(self, iobj)
4717     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4718       # make sure the disks are not degraded (still sync-ing is ok)
4719       time.sleep(15)
4720       feedback_fn("* checking mirrors status")
4721       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4722     else:
4723       disk_abort = False
4724
4725     if disk_abort:
4726       _RemoveDisks(self, iobj)
4727       self.cfg.RemoveInstance(iobj.name)
4728       # Make sure the instance lock gets removed
4729       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4730       raise errors.OpExecError("There are some degraded disks for"
4731                                " this instance")
4732
4733     feedback_fn("creating os for instance %s on node %s" %
4734                 (instance, pnode_name))
4735
4736     if iobj.disk_template != constants.DT_DISKLESS:
4737       if self.op.mode == constants.INSTANCE_CREATE:
4738         feedback_fn("* running the instance OS create scripts...")
4739         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4740         msg = result.RemoteFailMsg()
4741         if msg:
4742           raise errors.OpExecError("Could not add os for instance %s"
4743                                    " on node %s: %s" %
4744                                    (instance, pnode_name, msg))
4745
4746       elif self.op.mode == constants.INSTANCE_IMPORT:
4747         feedback_fn("* running the instance OS import scripts...")
4748         src_node = self.op.src_node
4749         src_images = self.src_images
4750         cluster_name = self.cfg.GetClusterName()
4751         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4752                                                          src_node, src_images,
4753                                                          cluster_name)
4754         import_result.Raise()
4755         for idx, result in enumerate(import_result.data):
4756           if not result:
4757             self.LogWarning("Could not import the image %s for instance"
4758                             " %s, disk %d, on node %s" %
4759                             (src_images[idx], instance, idx, pnode_name))
4760       else:
4761         # also checked in the prereq part
4762         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4763                                      % self.op.mode)
4764
4765     if self.op.start:
4766       iobj.admin_up = True
4767       self.cfg.Update(iobj)
4768       logging.info("Starting instance %s on node %s", instance, pnode_name)
4769       feedback_fn("* starting instance...")
4770       result = self.rpc.call_instance_start(pnode_name, iobj)
4771       msg = result.RemoteFailMsg()
4772       if msg:
4773         raise errors.OpExecError("Could not start instance: %s" % msg)
4774
4775
4776 class LUConnectConsole(NoHooksLU):
4777   """Connect to an instance's console.
4778
4779   This is somewhat special in that it returns the command line that
4780   you need to run on the master node in order to connect to the
4781   console.
4782
4783   """
4784   _OP_REQP = ["instance_name"]
4785   REQ_BGL = False
4786
4787   def ExpandNames(self):
4788     self._ExpandAndLockInstance()
4789
4790   def CheckPrereq(self):
4791     """Check prerequisites.
4792
4793     This checks that the instance is in the cluster.
4794
4795     """
4796     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4797     assert self.instance is not None, \
4798       "Cannot retrieve locked instance %s" % self.op.instance_name
4799     _CheckNodeOnline(self, self.instance.primary_node)
4800
4801   def Exec(self, feedback_fn):
4802     """Connect to the console of an instance
4803
4804     """
4805     instance = self.instance
4806     node = instance.primary_node
4807
4808     node_insts = self.rpc.call_instance_list([node],
4809                                              [instance.hypervisor])[node]
4810     node_insts.Raise()
4811
4812     if instance.name not in node_insts.data:
4813       raise errors.OpExecError("Instance %s is not running." % instance.name)
4814
4815     logging.debug("Connecting to console of %s on %s", instance.name, node)
4816
4817     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4818     cluster = self.cfg.GetClusterInfo()
4819     # beparams and hvparams are passed separately, to avoid editing the
4820     # instance and then saving the defaults in the instance itself.
4821     hvparams = cluster.FillHV(instance)
4822     beparams = cluster.FillBE(instance)
4823     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4824
4825     # build ssh cmdline
4826     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4827
4828
4829 class LUReplaceDisks(LogicalUnit):
4830   """Replace the disks of an instance.
4831
4832   """
4833   HPATH = "mirrors-replace"
4834   HTYPE = constants.HTYPE_INSTANCE
4835   _OP_REQP = ["instance_name", "mode", "disks"]
4836   REQ_BGL = False
4837
4838   def CheckArguments(self):
4839     if not hasattr(self.op, "remote_node"):
4840       self.op.remote_node = None
4841     if not hasattr(self.op, "iallocator"):
4842       self.op.iallocator = None
4843
4844     # check for valid parameter combination
4845     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4846     if self.op.mode == constants.REPLACE_DISK_CHG:
4847       if cnt == 2:
4848         raise errors.OpPrereqError("When changing the secondary either an"
4849                                    " iallocator script must be used or the"
4850                                    " new node given")
4851       elif cnt == 0:
4852         raise errors.OpPrereqError("Give either the iallocator or the new"
4853                                    " secondary, not both")
4854     else: # not replacing the secondary
4855       if cnt != 2:
4856         raise errors.OpPrereqError("The iallocator and new node options can"
4857                                    " be used only when changing the"
4858                                    " secondary node")
4859
4860   def ExpandNames(self):
4861     self._ExpandAndLockInstance()
4862
4863     if self.op.iallocator is not None:
4864       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4865     elif self.op.remote_node is not None:
4866       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4867       if remote_node is None:
4868         raise errors.OpPrereqError("Node '%s' not known" %
4869                                    self.op.remote_node)
4870       self.op.remote_node = remote_node
4871       # Warning: do not remove the locking of the new secondary here
4872       # unless DRBD8.AddChildren is changed to work in parallel;
4873       # currently it doesn't since parallel invocations of
4874       # FindUnusedMinor will conflict
4875       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4876       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4877     else:
4878       self.needed_locks[locking.LEVEL_NODE] = []
4879       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4880
4881   def DeclareLocks(self, level):
4882     # If we're not already locking all nodes in the set we have to declare the
4883     # instance's primary/secondary nodes.
4884     if (level == locking.LEVEL_NODE and
4885         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4886       self._LockInstancesNodes()
4887
4888   def _RunAllocator(self):
4889     """Compute a new secondary node using an IAllocator.
4890
4891     """
4892     ial = IAllocator(self,
4893                      mode=constants.IALLOCATOR_MODE_RELOC,
4894                      name=self.op.instance_name,
4895                      relocate_from=[self.sec_node])
4896
4897     ial.Run(self.op.iallocator)
4898
4899     if not ial.success:
4900       raise errors.OpPrereqError("Can't compute nodes using"
4901                                  " iallocator '%s': %s" % (self.op.iallocator,
4902                                                            ial.info))
4903     if len(ial.nodes) != ial.required_nodes:
4904       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4905                                  " of nodes (%s), required %s" %
4906                                  (len(ial.nodes), ial.required_nodes))
4907     self.op.remote_node = ial.nodes[0]
4908     self.LogInfo("Selected new secondary for the instance: %s",
4909                  self.op.remote_node)
4910
4911   def BuildHooksEnv(self):
4912     """Build hooks env.
4913
4914     This runs on the master, the primary and all the secondaries.
4915
4916     """
4917     env = {
4918       "MODE": self.op.mode,
4919       "NEW_SECONDARY": self.op.remote_node,
4920       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4921       }
4922     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4923     nl = [
4924       self.cfg.GetMasterNode(),
4925       self.instance.primary_node,
4926       ]
4927     if self.op.remote_node is not None:
4928       nl.append(self.op.remote_node)
4929     return env, nl, nl
4930
4931   def CheckPrereq(self):
4932     """Check prerequisites.
4933
4934     This checks that the instance is in the cluster.
4935
4936     """
4937     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4938     assert instance is not None, \
4939       "Cannot retrieve locked instance %s" % self.op.instance_name
4940     self.instance = instance
4941
4942     if instance.disk_template != constants.DT_DRBD8:
4943       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4944                                  " instances")
4945
4946     if len(instance.secondary_nodes) != 1:
4947       raise errors.OpPrereqError("The instance has a strange layout,"
4948                                  " expected one secondary but found %d" %
4949                                  len(instance.secondary_nodes))
4950
4951     self.sec_node = instance.secondary_nodes[0]
4952
4953     if self.op.iallocator is not None:
4954       self._RunAllocator()
4955
4956     remote_node = self.op.remote_node
4957     if remote_node is not None:
4958       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4959       assert self.remote_node_info is not None, \
4960         "Cannot retrieve locked node %s" % remote_node
4961     else:
4962       self.remote_node_info = None
4963     if remote_node == instance.primary_node:
4964       raise errors.OpPrereqError("The specified node is the primary node of"
4965                                  " the instance.")
4966     elif remote_node == self.sec_node:
4967       raise errors.OpPrereqError("The specified node is already the"
4968                                  " secondary node of the instance.")
4969
4970     if self.op.mode == constants.REPLACE_DISK_PRI:
4971       n1 = self.tgt_node = instance.primary_node
4972       n2 = self.oth_node = self.sec_node
4973     elif self.op.mode == constants.REPLACE_DISK_SEC:
4974       n1 = self.tgt_node = self.sec_node
4975       n2 = self.oth_node = instance.primary_node
4976     elif self.op.mode == constants.REPLACE_DISK_CHG:
4977       n1 = self.new_node = remote_node
4978       n2 = self.oth_node = instance.primary_node
4979       self.tgt_node = self.sec_node
4980       _CheckNodeNotDrained(self, remote_node)
4981     else:
4982       raise errors.ProgrammerError("Unhandled disk replace mode")
4983
4984     _CheckNodeOnline(self, n1)
4985     _CheckNodeOnline(self, n2)
4986
4987     if not self.op.disks:
4988       self.op.disks = range(len(instance.disks))
4989
4990     for disk_idx in self.op.disks:
4991       instance.FindDisk(disk_idx)
4992
4993   def _ExecD8DiskOnly(self, feedback_fn):
4994     """Replace a disk on the primary or secondary for dbrd8.
4995
4996     The algorithm for replace is quite complicated:
4997
4998       1. for each disk to be replaced:
4999
5000         1. create new LVs on the target node with unique names
5001         1. detach old LVs from the drbd device
5002         1. rename old LVs to name_replaced.<time_t>
5003         1. rename new LVs to old LVs
5004         1. attach the new LVs (with the old names now) to the drbd device
5005
5006       1. wait for sync across all devices
5007
5008       1. for each modified disk:
5009
5010         1. remove old LVs (which have the name name_replaces.<time_t>)
5011
5012     Failures are not very well handled.
5013
5014     """
5015     steps_total = 6
5016     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5017     instance = self.instance
5018     iv_names = {}
5019     vgname = self.cfg.GetVGName()
5020     # start of work
5021     cfg = self.cfg
5022     tgt_node = self.tgt_node
5023     oth_node = self.oth_node
5024
5025     # Step: check device activation
5026     self.proc.LogStep(1, steps_total, "check device existence")
5027     info("checking volume groups")
5028     my_vg = cfg.GetVGName()
5029     results = self.rpc.call_vg_list([oth_node, tgt_node])
5030     if not results:
5031       raise errors.OpExecError("Can't list volume groups on the nodes")
5032     for node in oth_node, tgt_node:
5033       res = results[node]
5034       if res.failed or not res.data or my_vg not in res.data:
5035         raise errors.OpExecError("Volume group '%s' not found on %s" %
5036                                  (my_vg, node))
5037     for idx, dev in enumerate(instance.disks):
5038       if idx not in self.op.disks:
5039         continue
5040       for node in tgt_node, oth_node:
5041         info("checking disk/%d on %s" % (idx, node))
5042         cfg.SetDiskID(dev, node)
5043         result = self.rpc.call_blockdev_find(node, dev)
5044         msg = result.RemoteFailMsg()
5045         if not msg and not result.payload:
5046           msg = "disk not found"
5047         if msg:
5048           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5049                                    (idx, node, msg))
5050
5051     # Step: check other node consistency
5052     self.proc.LogStep(2, steps_total, "check peer consistency")
5053     for idx, dev in enumerate(instance.disks):
5054       if idx not in self.op.disks:
5055         continue
5056       info("checking disk/%d consistency on %s" % (idx, oth_node))
5057       if not _CheckDiskConsistency(self, dev, oth_node,
5058                                    oth_node==instance.primary_node):
5059         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5060                                  " to replace disks on this node (%s)" %
5061                                  (oth_node, tgt_node))
5062
5063     # Step: create new storage
5064     self.proc.LogStep(3, steps_total, "allocate new storage")
5065     for idx, dev in enumerate(instance.disks):
5066       if idx not in self.op.disks:
5067         continue
5068       size = dev.size
5069       cfg.SetDiskID(dev, tgt_node)
5070       lv_names = [".disk%d_%s" % (idx, suf)
5071                   for suf in ["data", "meta"]]
5072       names = _GenerateUniqueNames(self, lv_names)
5073       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5074                              logical_id=(vgname, names[0]))
5075       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5076                              logical_id=(vgname, names[1]))
5077       new_lvs = [lv_data, lv_meta]
5078       old_lvs = dev.children
5079       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5080       info("creating new local storage on %s for %s" %
5081            (tgt_node, dev.iv_name))
5082       # we pass force_create=True to force the LVM creation
5083       for new_lv in new_lvs:
5084         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5085                         _GetInstanceInfoText(instance), False)
5086
5087     # Step: for each lv, detach+rename*2+attach
5088     self.proc.LogStep(4, steps_total, "change drbd configuration")
5089     for dev, old_lvs, new_lvs in iv_names.itervalues():
5090       info("detaching %s drbd from local storage" % dev.iv_name)
5091       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5092       result.Raise()
5093       if not result.data:
5094         raise errors.OpExecError("Can't detach drbd from local storage on node"
5095                                  " %s for device %s" % (tgt_node, dev.iv_name))
5096       #dev.children = []
5097       #cfg.Update(instance)
5098
5099       # ok, we created the new LVs, so now we know we have the needed
5100       # storage; as such, we proceed on the target node to rename
5101       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5102       # using the assumption that logical_id == physical_id (which in
5103       # turn is the unique_id on that node)
5104
5105       # FIXME(iustin): use a better name for the replaced LVs
5106       temp_suffix = int(time.time())
5107       ren_fn = lambda d, suff: (d.physical_id[0],
5108                                 d.physical_id[1] + "_replaced-%s" % suff)
5109       # build the rename list based on what LVs exist on the node
5110       rlist = []
5111       for to_ren in old_lvs:
5112         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5113         if not result.RemoteFailMsg() and result.payload:
5114           # device exists
5115           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5116
5117       info("renaming the old LVs on the target node")
5118       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5119       result.Raise()
5120       if not result.data:
5121         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5122       # now we rename the new LVs to the old LVs
5123       info("renaming the new LVs on the target node")
5124       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5125       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5126       result.Raise()
5127       if not result.data:
5128         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5129
5130       for old, new in zip(old_lvs, new_lvs):
5131         new.logical_id = old.logical_id
5132         cfg.SetDiskID(new, tgt_node)
5133
5134       for disk in old_lvs:
5135         disk.logical_id = ren_fn(disk, temp_suffix)
5136         cfg.SetDiskID(disk, tgt_node)
5137
5138       # now that the new lvs have the old name, we can add them to the device
5139       info("adding new mirror component on %s" % tgt_node)
5140       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5141       if result.failed or not result.data:
5142         for new_lv in new_lvs:
5143           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5144           if msg:
5145             warning("Can't rollback device %s: %s", dev, msg,
5146                     hint="cleanup manually the unused logical volumes")
5147         raise errors.OpExecError("Can't add local storage to drbd")
5148
5149       dev.children = new_lvs
5150       cfg.Update(instance)
5151
5152     # Step: wait for sync
5153
5154     # this can fail as the old devices are degraded and _WaitForSync
5155     # does a combined result over all disks, so we don't check its
5156     # return value
5157     self.proc.LogStep(5, steps_total, "sync devices")
5158     _WaitForSync(self, instance, unlock=True)
5159
5160     # so check manually all the devices
5161     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5162       cfg.SetDiskID(dev, instance.primary_node)
5163       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5164       msg = result.RemoteFailMsg()
5165       if not msg and not result.payload:
5166         msg = "disk not found"
5167       if msg:
5168         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5169                                  (name, msg))
5170       if result.payload[5]:
5171         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5172
5173     # Step: remove old storage
5174     self.proc.LogStep(6, steps_total, "removing old storage")
5175     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5176       info("remove logical volumes for %s" % name)
5177       for lv in old_lvs:
5178         cfg.SetDiskID(lv, tgt_node)
5179         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5180         if msg:
5181           warning("Can't remove old LV: %s" % msg,
5182                   hint="manually remove unused LVs")
5183           continue
5184
5185   def _ExecD8Secondary(self, feedback_fn):
5186     """Replace the secondary node for drbd8.
5187
5188     The algorithm for replace is quite complicated:
5189       - for all disks of the instance:
5190         - create new LVs on the new node with same names
5191         - shutdown the drbd device on the old secondary
5192         - disconnect the drbd network on the primary
5193         - create the drbd device on the new secondary
5194         - network attach the drbd on the primary, using an artifice:
5195           the drbd code for Attach() will connect to the network if it
5196           finds a device which is connected to the good local disks but
5197           not network enabled
5198       - wait for sync across all devices
5199       - remove all disks from the old secondary
5200
5201     Failures are not very well handled.
5202
5203     """
5204     steps_total = 6
5205     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5206     instance = self.instance
5207     iv_names = {}
5208     # start of work
5209     cfg = self.cfg
5210     old_node = self.tgt_node
5211     new_node = self.new_node
5212     pri_node = instance.primary_node
5213     nodes_ip = {
5214       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5215       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5216       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5217       }
5218
5219     # Step: check device activation
5220     self.proc.LogStep(1, steps_total, "check device existence")
5221     info("checking volume groups")
5222     my_vg = cfg.GetVGName()
5223     results = self.rpc.call_vg_list([pri_node, new_node])
5224     for node in pri_node, new_node:
5225       res = results[node]
5226       if res.failed or not res.data or my_vg not in res.data:
5227         raise errors.OpExecError("Volume group '%s' not found on %s" %
5228                                  (my_vg, node))
5229     for idx, dev in enumerate(instance.disks):
5230       if idx not in self.op.disks:
5231         continue
5232       info("checking disk/%d on %s" % (idx, pri_node))
5233       cfg.SetDiskID(dev, pri_node)
5234       result = self.rpc.call_blockdev_find(pri_node, dev)
5235       msg = result.RemoteFailMsg()
5236       if not msg and not result.payload:
5237         msg = "disk not found"
5238       if msg:
5239         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5240                                  (idx, pri_node, msg))
5241
5242     # Step: check other node consistency
5243     self.proc.LogStep(2, steps_total, "check peer consistency")
5244     for idx, dev in enumerate(instance.disks):
5245       if idx not in self.op.disks:
5246         continue
5247       info("checking disk/%d consistency on %s" % (idx, pri_node))
5248       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5249         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5250                                  " unsafe to replace the secondary" %
5251                                  pri_node)
5252
5253     # Step: create new storage
5254     self.proc.LogStep(3, steps_total, "allocate new storage")
5255     for idx, dev in enumerate(instance.disks):
5256       info("adding new local storage on %s for disk/%d" %
5257            (new_node, idx))
5258       # we pass force_create=True to force LVM creation
5259       for new_lv in dev.children:
5260         _CreateBlockDev(self, new_node, instance, new_lv, True,
5261                         _GetInstanceInfoText(instance), False)
5262
5263     # Step 4: dbrd minors and drbd setups changes
5264     # after this, we must manually remove the drbd minors on both the
5265     # error and the success paths
5266     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5267                                    instance.name)
5268     logging.debug("Allocated minors %s" % (minors,))
5269     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5270     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5271       size = dev.size
5272       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5273       # create new devices on new_node; note that we create two IDs:
5274       # one without port, so the drbd will be activated without
5275       # networking information on the new node at this stage, and one
5276       # with network, for the latter activation in step 4
5277       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5278       if pri_node == o_node1:
5279         p_minor = o_minor1
5280       else:
5281         p_minor = o_minor2
5282
5283       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5284       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5285
5286       iv_names[idx] = (dev, dev.children, new_net_id)
5287       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5288                     new_net_id)
5289       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5290                               logical_id=new_alone_id,
5291                               children=dev.children)
5292       try:
5293         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5294                               _GetInstanceInfoText(instance), False)
5295       except errors.BlockDeviceError:
5296         self.cfg.ReleaseDRBDMinors(instance.name)
5297         raise
5298
5299     for idx, dev in enumerate(instance.disks):
5300       # we have new devices, shutdown the drbd on the old secondary
5301       info("shutting down drbd for disk/%d on old node" % idx)
5302       cfg.SetDiskID(dev, old_node)
5303       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5304       if msg:
5305         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5306                 (idx, msg),
5307                 hint="Please cleanup this device manually as soon as possible")
5308
5309     info("detaching primary drbds from the network (=> standalone)")
5310     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5311                                                instance.disks)[pri_node]
5312
5313     msg = result.RemoteFailMsg()
5314     if msg:
5315       # detaches didn't succeed (unlikely)
5316       self.cfg.ReleaseDRBDMinors(instance.name)
5317       raise errors.OpExecError("Can't detach the disks from the network on"
5318                                " old node: %s" % (msg,))
5319
5320     # if we managed to detach at least one, we update all the disks of
5321     # the instance to point to the new secondary
5322     info("updating instance configuration")
5323     for dev, _, new_logical_id in iv_names.itervalues():
5324       dev.logical_id = new_logical_id
5325       cfg.SetDiskID(dev, pri_node)
5326     cfg.Update(instance)
5327
5328     # and now perform the drbd attach
5329     info("attaching primary drbds to new secondary (standalone => connected)")
5330     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5331                                            instance.disks, instance.name,
5332                                            False)
5333     for to_node, to_result in result.items():
5334       msg = to_result.RemoteFailMsg()
5335       if msg:
5336         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5337                 hint="please do a gnt-instance info to see the"
5338                 " status of disks")
5339
5340     # this can fail as the old devices are degraded and _WaitForSync
5341     # does a combined result over all disks, so we don't check its
5342     # return value
5343     self.proc.LogStep(5, steps_total, "sync devices")
5344     _WaitForSync(self, instance, unlock=True)
5345
5346     # so check manually all the devices
5347     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5348       cfg.SetDiskID(dev, pri_node)
5349       result = self.rpc.call_blockdev_find(pri_node, dev)
5350       msg = result.RemoteFailMsg()
5351       if not msg and not result.payload:
5352         msg = "disk not found"
5353       if msg:
5354         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5355                                  (idx, msg))
5356       if result.payload[5]:
5357         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5358
5359     self.proc.LogStep(6, steps_total, "removing old storage")
5360     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5361       info("remove logical volumes for disk/%d" % idx)
5362       for lv in old_lvs:
5363         cfg.SetDiskID(lv, old_node)
5364         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5365         if msg:
5366           warning("Can't remove LV on old secondary: %s", msg,
5367                   hint="Cleanup stale volumes by hand")
5368
5369   def Exec(self, feedback_fn):
5370     """Execute disk replacement.
5371
5372     This dispatches the disk replacement to the appropriate handler.
5373
5374     """
5375     instance = self.instance
5376
5377     # Activate the instance disks if we're replacing them on a down instance
5378     if not instance.admin_up:
5379       _StartInstanceDisks(self, instance, True)
5380
5381     if self.op.mode == constants.REPLACE_DISK_CHG:
5382       fn = self._ExecD8Secondary
5383     else:
5384       fn = self._ExecD8DiskOnly
5385
5386     ret = fn(feedback_fn)
5387
5388     # Deactivate the instance disks if we're replacing them on a down instance
5389     if not instance.admin_up:
5390       _SafeShutdownInstanceDisks(self, instance)
5391
5392     return ret
5393
5394
5395 class LUGrowDisk(LogicalUnit):
5396   """Grow a disk of an instance.
5397
5398   """
5399   HPATH = "disk-grow"
5400   HTYPE = constants.HTYPE_INSTANCE
5401   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5402   REQ_BGL = False
5403
5404   def ExpandNames(self):
5405     self._ExpandAndLockInstance()
5406     self.needed_locks[locking.LEVEL_NODE] = []
5407     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5408
5409   def DeclareLocks(self, level):
5410     if level == locking.LEVEL_NODE:
5411       self._LockInstancesNodes()
5412
5413   def BuildHooksEnv(self):
5414     """Build hooks env.
5415
5416     This runs on the master, the primary and all the secondaries.
5417
5418     """
5419     env = {
5420       "DISK": self.op.disk,
5421       "AMOUNT": self.op.amount,
5422       }
5423     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5424     nl = [
5425       self.cfg.GetMasterNode(),
5426       self.instance.primary_node,
5427       ]
5428     return env, nl, nl
5429
5430   def CheckPrereq(self):
5431     """Check prerequisites.
5432
5433     This checks that the instance is in the cluster.
5434
5435     """
5436     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5437     assert instance is not None, \
5438       "Cannot retrieve locked instance %s" % self.op.instance_name
5439     nodenames = list(instance.all_nodes)
5440     for node in nodenames:
5441       _CheckNodeOnline(self, node)
5442
5443
5444     self.instance = instance
5445
5446     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5447       raise errors.OpPrereqError("Instance's disk layout does not support"
5448                                  " growing.")
5449
5450     self.disk = instance.FindDisk(self.op.disk)
5451
5452     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5453                                        instance.hypervisor)
5454     for node in nodenames:
5455       info = nodeinfo[node]
5456       if info.failed or not info.data:
5457         raise errors.OpPrereqError("Cannot get current information"
5458                                    " from node '%s'" % node)
5459       vg_free = info.data.get('vg_free', None)
5460       if not isinstance(vg_free, int):
5461         raise errors.OpPrereqError("Can't compute free disk space on"
5462                                    " node %s" % node)
5463       if self.op.amount > vg_free:
5464         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5465                                    " %d MiB available, %d MiB required" %
5466                                    (node, vg_free, self.op.amount))
5467
5468   def Exec(self, feedback_fn):
5469     """Execute disk grow.
5470
5471     """
5472     instance = self.instance
5473     disk = self.disk
5474     for node in instance.all_nodes:
5475       self.cfg.SetDiskID(disk, node)
5476       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5477       msg = result.RemoteFailMsg()
5478       if msg:
5479         raise errors.OpExecError("Grow request failed to node %s: %s" %
5480                                  (node, msg))
5481     disk.RecordGrow(self.op.amount)
5482     self.cfg.Update(instance)
5483     if self.op.wait_for_sync:
5484       disk_abort = not _WaitForSync(self, instance)
5485       if disk_abort:
5486         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5487                              " status.\nPlease check the instance.")
5488
5489
5490 class LUQueryInstanceData(NoHooksLU):
5491   """Query runtime instance data.
5492
5493   """
5494   _OP_REQP = ["instances", "static"]
5495   REQ_BGL = False
5496
5497   def ExpandNames(self):
5498     self.needed_locks = {}
5499     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5500
5501     if not isinstance(self.op.instances, list):
5502       raise errors.OpPrereqError("Invalid argument type 'instances'")
5503
5504     if self.op.instances:
5505       self.wanted_names = []
5506       for name in self.op.instances:
5507         full_name = self.cfg.ExpandInstanceName(name)
5508         if full_name is None:
5509           raise errors.OpPrereqError("Instance '%s' not known" % name)
5510         self.wanted_names.append(full_name)
5511       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5512     else:
5513       self.wanted_names = None
5514       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5515
5516     self.needed_locks[locking.LEVEL_NODE] = []
5517     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5518
5519   def DeclareLocks(self, level):
5520     if level == locking.LEVEL_NODE:
5521       self._LockInstancesNodes()
5522
5523   def CheckPrereq(self):
5524     """Check prerequisites.
5525
5526     This only checks the optional instance list against the existing names.
5527
5528     """
5529     if self.wanted_names is None:
5530       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5531
5532     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5533                              in self.wanted_names]
5534     return
5535
5536   def _ComputeDiskStatus(self, instance, snode, dev):
5537     """Compute block device status.
5538
5539     """
5540     static = self.op.static
5541     if not static:
5542       self.cfg.SetDiskID(dev, instance.primary_node)
5543       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5544       if dev_pstatus.offline:
5545         dev_pstatus = None
5546       else:
5547         msg = dev_pstatus.RemoteFailMsg()
5548         if msg:
5549           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5550                                    (instance.name, msg))
5551         dev_pstatus = dev_pstatus.payload
5552     else:
5553       dev_pstatus = None
5554
5555     if dev.dev_type in constants.LDS_DRBD:
5556       # we change the snode then (otherwise we use the one passed in)
5557       if dev.logical_id[0] == instance.primary_node:
5558         snode = dev.logical_id[1]
5559       else:
5560         snode = dev.logical_id[0]
5561
5562     if snode and not static:
5563       self.cfg.SetDiskID(dev, snode)
5564       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5565       if dev_sstatus.offline:
5566         dev_sstatus = None
5567       else:
5568         msg = dev_sstatus.RemoteFailMsg()
5569         if msg:
5570           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5571                                    (instance.name, msg))
5572         dev_sstatus = dev_sstatus.payload
5573     else:
5574       dev_sstatus = None
5575
5576     if dev.children:
5577       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5578                       for child in dev.children]
5579     else:
5580       dev_children = []
5581
5582     data = {
5583       "iv_name": dev.iv_name,
5584       "dev_type": dev.dev_type,
5585       "logical_id": dev.logical_id,
5586       "physical_id": dev.physical_id,
5587       "pstatus": dev_pstatus,
5588       "sstatus": dev_sstatus,
5589       "children": dev_children,
5590       "mode": dev.mode,
5591       }
5592
5593     return data
5594
5595   def Exec(self, feedback_fn):
5596     """Gather and return data"""
5597     result = {}
5598
5599     cluster = self.cfg.GetClusterInfo()
5600
5601     for instance in self.wanted_instances:
5602       if not self.op.static:
5603         remote_info = self.rpc.call_instance_info(instance.primary_node,
5604                                                   instance.name,
5605                                                   instance.hypervisor)
5606         remote_info.Raise()
5607         remote_info = remote_info.data
5608         if remote_info and "state" in remote_info:
5609           remote_state = "up"
5610         else:
5611           remote_state = "down"
5612       else:
5613         remote_state = None
5614       if instance.admin_up:
5615         config_state = "up"
5616       else:
5617         config_state = "down"
5618
5619       disks = [self._ComputeDiskStatus(instance, None, device)
5620                for device in instance.disks]
5621
5622       idict = {
5623         "name": instance.name,
5624         "config_state": config_state,
5625         "run_state": remote_state,
5626         "pnode": instance.primary_node,
5627         "snodes": instance.secondary_nodes,
5628         "os": instance.os,
5629         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5630         "disks": disks,
5631         "hypervisor": instance.hypervisor,
5632         "network_port": instance.network_port,
5633         "hv_instance": instance.hvparams,
5634         "hv_actual": cluster.FillHV(instance),
5635         "be_instance": instance.beparams,
5636         "be_actual": cluster.FillBE(instance),
5637         }
5638
5639       result[instance.name] = idict
5640
5641     return result
5642
5643
5644 class LUSetInstanceParams(LogicalUnit):
5645   """Modifies an instances's parameters.
5646
5647   """
5648   HPATH = "instance-modify"
5649   HTYPE = constants.HTYPE_INSTANCE
5650   _OP_REQP = ["instance_name"]
5651   REQ_BGL = False
5652
5653   def CheckArguments(self):
5654     if not hasattr(self.op, 'nics'):
5655       self.op.nics = []
5656     if not hasattr(self.op, 'disks'):
5657       self.op.disks = []
5658     if not hasattr(self.op, 'beparams'):
5659       self.op.beparams = {}
5660     if not hasattr(self.op, 'hvparams'):
5661       self.op.hvparams = {}
5662     self.op.force = getattr(self.op, "force", False)
5663     if not (self.op.nics or self.op.disks or
5664             self.op.hvparams or self.op.beparams):
5665       raise errors.OpPrereqError("No changes submitted")
5666
5667     # Disk validation
5668     disk_addremove = 0
5669     for disk_op, disk_dict in self.op.disks:
5670       if disk_op == constants.DDM_REMOVE:
5671         disk_addremove += 1
5672         continue
5673       elif disk_op == constants.DDM_ADD:
5674         disk_addremove += 1
5675       else:
5676         if not isinstance(disk_op, int):
5677           raise errors.OpPrereqError("Invalid disk index")
5678       if disk_op == constants.DDM_ADD:
5679         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5680         if mode not in constants.DISK_ACCESS_SET:
5681           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5682         size = disk_dict.get('size', None)
5683         if size is None:
5684           raise errors.OpPrereqError("Required disk parameter size missing")
5685         try:
5686           size = int(size)
5687         except ValueError, err:
5688           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5689                                      str(err))
5690         disk_dict['size'] = size
5691       else:
5692         # modification of disk
5693         if 'size' in disk_dict:
5694           raise errors.OpPrereqError("Disk size change not possible, use"
5695                                      " grow-disk")
5696
5697     if disk_addremove > 1:
5698       raise errors.OpPrereqError("Only one disk add or remove operation"
5699                                  " supported at a time")
5700
5701     # NIC validation
5702     nic_addremove = 0
5703     for nic_op, nic_dict in self.op.nics:
5704       if nic_op == constants.DDM_REMOVE:
5705         nic_addremove += 1
5706         continue
5707       elif nic_op == constants.DDM_ADD:
5708         nic_addremove += 1
5709       else:
5710         if not isinstance(nic_op, int):
5711           raise errors.OpPrereqError("Invalid nic index")
5712
5713       # nic_dict should be a dict
5714       nic_ip = nic_dict.get('ip', None)
5715       if nic_ip is not None:
5716         if nic_ip.lower() == constants.VALUE_NONE:
5717           nic_dict['ip'] = None
5718         else:
5719           if not utils.IsValidIP(nic_ip):
5720             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5721
5722       if nic_op == constants.DDM_ADD:
5723         nic_bridge = nic_dict.get('bridge', None)
5724         if nic_bridge is None:
5725           nic_dict['bridge'] = self.cfg.GetDefBridge()
5726         nic_mac = nic_dict.get('mac', None)
5727         if nic_mac is None:
5728           nic_dict['mac'] = constants.VALUE_AUTO
5729
5730       if 'mac' in nic_dict:
5731         nic_mac = nic_dict['mac']
5732         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5733           if not utils.IsValidMac(nic_mac):
5734             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5735         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5736           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5737                                      " modifying an existing nic")
5738
5739     if nic_addremove > 1:
5740       raise errors.OpPrereqError("Only one NIC add or remove operation"
5741                                  " supported at a time")
5742
5743   def ExpandNames(self):
5744     self._ExpandAndLockInstance()
5745     self.needed_locks[locking.LEVEL_NODE] = []
5746     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5747
5748   def DeclareLocks(self, level):
5749     if level == locking.LEVEL_NODE:
5750       self._LockInstancesNodes()
5751
5752   def BuildHooksEnv(self):
5753     """Build hooks env.
5754
5755     This runs on the master, primary and secondaries.
5756
5757     """
5758     args = dict()
5759     if constants.BE_MEMORY in self.be_new:
5760       args['memory'] = self.be_new[constants.BE_MEMORY]
5761     if constants.BE_VCPUS in self.be_new:
5762       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5763     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5764     # information at all.
5765     if self.op.nics:
5766       args['nics'] = []
5767       nic_override = dict(self.op.nics)
5768       for idx, nic in enumerate(self.instance.nics):
5769         if idx in nic_override:
5770           this_nic_override = nic_override[idx]
5771         else:
5772           this_nic_override = {}
5773         if 'ip' in this_nic_override:
5774           ip = this_nic_override['ip']
5775         else:
5776           ip = nic.ip
5777         if 'bridge' in this_nic_override:
5778           bridge = this_nic_override['bridge']
5779         else:
5780           bridge = nic.bridge
5781         if 'mac' in this_nic_override:
5782           mac = this_nic_override['mac']
5783         else:
5784           mac = nic.mac
5785         args['nics'].append((ip, bridge, mac))
5786       if constants.DDM_ADD in nic_override:
5787         ip = nic_override[constants.DDM_ADD].get('ip', None)
5788         bridge = nic_override[constants.DDM_ADD]['bridge']
5789         mac = nic_override[constants.DDM_ADD]['mac']
5790         args['nics'].append((ip, bridge, mac))
5791       elif constants.DDM_REMOVE in nic_override:
5792         del args['nics'][-1]
5793
5794     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5795     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5796     return env, nl, nl
5797
5798   def CheckPrereq(self):
5799     """Check prerequisites.
5800
5801     This only checks the instance list against the existing names.
5802
5803     """
5804     force = self.force = self.op.force
5805
5806     # checking the new params on the primary/secondary nodes
5807
5808     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5809     assert self.instance is not None, \
5810       "Cannot retrieve locked instance %s" % self.op.instance_name
5811     pnode = instance.primary_node
5812     nodelist = list(instance.all_nodes)
5813
5814     # hvparams processing
5815     if self.op.hvparams:
5816       i_hvdict = copy.deepcopy(instance.hvparams)
5817       for key, val in self.op.hvparams.iteritems():
5818         if val == constants.VALUE_DEFAULT:
5819           try:
5820             del i_hvdict[key]
5821           except KeyError:
5822             pass
5823         else:
5824           i_hvdict[key] = val
5825       cluster = self.cfg.GetClusterInfo()
5826       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5827       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5828                                 i_hvdict)
5829       # local check
5830       hypervisor.GetHypervisor(
5831         instance.hypervisor).CheckParameterSyntax(hv_new)
5832       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5833       self.hv_new = hv_new # the new actual values
5834       self.hv_inst = i_hvdict # the new dict (without defaults)
5835     else:
5836       self.hv_new = self.hv_inst = {}
5837
5838     # beparams processing
5839     if self.op.beparams:
5840       i_bedict = copy.deepcopy(instance.beparams)
5841       for key, val in self.op.beparams.iteritems():
5842         if val == constants.VALUE_DEFAULT:
5843           try:
5844             del i_bedict[key]
5845           except KeyError:
5846             pass
5847         else:
5848           i_bedict[key] = val
5849       cluster = self.cfg.GetClusterInfo()
5850       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5851       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5852                                 i_bedict)
5853       self.be_new = be_new # the new actual values
5854       self.be_inst = i_bedict # the new dict (without defaults)
5855     else:
5856       self.be_new = self.be_inst = {}
5857
5858     self.warn = []
5859
5860     if constants.BE_MEMORY in self.op.beparams and not self.force:
5861       mem_check_list = [pnode]
5862       if be_new[constants.BE_AUTO_BALANCE]:
5863         # either we changed auto_balance to yes or it was from before
5864         mem_check_list.extend(instance.secondary_nodes)
5865       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5866                                                   instance.hypervisor)
5867       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5868                                          instance.hypervisor)
5869       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5870         # Assume the primary node is unreachable and go ahead
5871         self.warn.append("Can't get info from primary node %s" % pnode)
5872       else:
5873         if not instance_info.failed and instance_info.data:
5874           current_mem = instance_info.data['memory']
5875         else:
5876           # Assume instance not running
5877           # (there is a slight race condition here, but it's not very probable,
5878           # and we have no other way to check)
5879           current_mem = 0
5880         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5881                     nodeinfo[pnode].data['memory_free'])
5882         if miss_mem > 0:
5883           raise errors.OpPrereqError("This change will prevent the instance"
5884                                      " from starting, due to %d MB of memory"
5885                                      " missing on its primary node" % miss_mem)
5886
5887       if be_new[constants.BE_AUTO_BALANCE]:
5888         for node, nres in nodeinfo.iteritems():
5889           if node not in instance.secondary_nodes:
5890             continue
5891           if nres.failed or not isinstance(nres.data, dict):
5892             self.warn.append("Can't get info from secondary node %s" % node)
5893           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5894             self.warn.append("Not enough memory to failover instance to"
5895                              " secondary node %s" % node)
5896
5897     # NIC processing
5898     for nic_op, nic_dict in self.op.nics:
5899       if nic_op == constants.DDM_REMOVE:
5900         if not instance.nics:
5901           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5902         continue
5903       if nic_op != constants.DDM_ADD:
5904         # an existing nic
5905         if nic_op < 0 or nic_op >= len(instance.nics):
5906           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5907                                      " are 0 to %d" %
5908                                      (nic_op, len(instance.nics)))
5909       if 'bridge' in nic_dict:
5910         nic_bridge = nic_dict['bridge']
5911         if nic_bridge is None:
5912           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5913         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5914           msg = ("Bridge '%s' doesn't exist on one of"
5915                  " the instance nodes" % nic_bridge)
5916           if self.force:
5917             self.warn.append(msg)
5918           else:
5919             raise errors.OpPrereqError(msg)
5920       if 'mac' in nic_dict:
5921         nic_mac = nic_dict['mac']
5922         if nic_mac is None:
5923           raise errors.OpPrereqError('Cannot set the nic mac to None')
5924         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5925           # otherwise generate the mac
5926           nic_dict['mac'] = self.cfg.GenerateMAC()
5927         else:
5928           # or validate/reserve the current one
5929           if self.cfg.IsMacInUse(nic_mac):
5930             raise errors.OpPrereqError("MAC address %s already in use"
5931                                        " in cluster" % nic_mac)
5932
5933     # DISK processing
5934     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5935       raise errors.OpPrereqError("Disk operations not supported for"
5936                                  " diskless instances")
5937     for disk_op, disk_dict in self.op.disks:
5938       if disk_op == constants.DDM_REMOVE:
5939         if len(instance.disks) == 1:
5940           raise errors.OpPrereqError("Cannot remove the last disk of"
5941                                      " an instance")
5942         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5943         ins_l = ins_l[pnode]
5944         if ins_l.failed or not isinstance(ins_l.data, list):
5945           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5946         if instance.name in ins_l.data:
5947           raise errors.OpPrereqError("Instance is running, can't remove"
5948                                      " disks.")
5949
5950       if (disk_op == constants.DDM_ADD and
5951           len(instance.nics) >= constants.MAX_DISKS):
5952         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5953                                    " add more" % constants.MAX_DISKS)
5954       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5955         # an existing disk
5956         if disk_op < 0 or disk_op >= len(instance.disks):
5957           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5958                                      " are 0 to %d" %
5959                                      (disk_op, len(instance.disks)))
5960
5961     return
5962
5963   def Exec(self, feedback_fn):
5964     """Modifies an instance.
5965
5966     All parameters take effect only at the next restart of the instance.
5967
5968     """
5969     # Process here the warnings from CheckPrereq, as we don't have a
5970     # feedback_fn there.
5971     for warn in self.warn:
5972       feedback_fn("WARNING: %s" % warn)
5973
5974     result = []
5975     instance = self.instance
5976     # disk changes
5977     for disk_op, disk_dict in self.op.disks:
5978       if disk_op == constants.DDM_REMOVE:
5979         # remove the last disk
5980         device = instance.disks.pop()
5981         device_idx = len(instance.disks)
5982         for node, disk in device.ComputeNodeTree(instance.primary_node):
5983           self.cfg.SetDiskID(disk, node)
5984           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5985           if msg:
5986             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5987                             " continuing anyway", device_idx, node, msg)
5988         result.append(("disk/%d" % device_idx, "remove"))
5989       elif disk_op == constants.DDM_ADD:
5990         # add a new disk
5991         if instance.disk_template == constants.DT_FILE:
5992           file_driver, file_path = instance.disks[0].logical_id
5993           file_path = os.path.dirname(file_path)
5994         else:
5995           file_driver = file_path = None
5996         disk_idx_base = len(instance.disks)
5997         new_disk = _GenerateDiskTemplate(self,
5998                                          instance.disk_template,
5999                                          instance.name, instance.primary_node,
6000                                          instance.secondary_nodes,
6001                                          [disk_dict],
6002                                          file_path,
6003                                          file_driver,
6004                                          disk_idx_base)[0]
6005         instance.disks.append(new_disk)
6006         info = _GetInstanceInfoText(instance)
6007
6008         logging.info("Creating volume %s for instance %s",
6009                      new_disk.iv_name, instance.name)
6010         # Note: this needs to be kept in sync with _CreateDisks
6011         #HARDCODE
6012         for node in instance.all_nodes:
6013           f_create = node == instance.primary_node
6014           try:
6015             _CreateBlockDev(self, node, instance, new_disk,
6016                             f_create, info, f_create)
6017           except errors.OpExecError, err:
6018             self.LogWarning("Failed to create volume %s (%s) on"
6019                             " node %s: %s",
6020                             new_disk.iv_name, new_disk, node, err)
6021         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6022                        (new_disk.size, new_disk.mode)))
6023       else:
6024         # change a given disk
6025         instance.disks[disk_op].mode = disk_dict['mode']
6026         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6027     # NIC changes
6028     for nic_op, nic_dict in self.op.nics:
6029       if nic_op == constants.DDM_REMOVE:
6030         # remove the last nic
6031         del instance.nics[-1]
6032         result.append(("nic.%d" % len(instance.nics), "remove"))
6033       elif nic_op == constants.DDM_ADD:
6034         # mac and bridge should be set, by now
6035         mac = nic_dict['mac']
6036         bridge = nic_dict['bridge']
6037         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6038                               bridge=bridge)
6039         instance.nics.append(new_nic)
6040         result.append(("nic.%d" % (len(instance.nics) - 1),
6041                        "add:mac=%s,ip=%s,bridge=%s" %
6042                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6043       else:
6044         # change a given nic
6045         for key in 'mac', 'ip', 'bridge':
6046           if key in nic_dict:
6047             setattr(instance.nics[nic_op], key, nic_dict[key])
6048             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6049
6050     # hvparams changes
6051     if self.op.hvparams:
6052       instance.hvparams = self.hv_inst
6053       for key, val in self.op.hvparams.iteritems():
6054         result.append(("hv/%s" % key, val))
6055
6056     # beparams changes
6057     if self.op.beparams:
6058       instance.beparams = self.be_inst
6059       for key, val in self.op.beparams.iteritems():
6060         result.append(("be/%s" % key, val))
6061
6062     self.cfg.Update(instance)
6063
6064     return result
6065
6066
6067 class LUQueryExports(NoHooksLU):
6068   """Query the exports list
6069
6070   """
6071   _OP_REQP = ['nodes']
6072   REQ_BGL = False
6073
6074   def ExpandNames(self):
6075     self.needed_locks = {}
6076     self.share_locks[locking.LEVEL_NODE] = 1
6077     if not self.op.nodes:
6078       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6079     else:
6080       self.needed_locks[locking.LEVEL_NODE] = \
6081         _GetWantedNodes(self, self.op.nodes)
6082
6083   def CheckPrereq(self):
6084     """Check prerequisites.
6085
6086     """
6087     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6088
6089   def Exec(self, feedback_fn):
6090     """Compute the list of all the exported system images.
6091
6092     @rtype: dict
6093     @return: a dictionary with the structure node->(export-list)
6094         where export-list is a list of the instances exported on
6095         that node.
6096
6097     """
6098     rpcresult = self.rpc.call_export_list(self.nodes)
6099     result = {}
6100     for node in rpcresult:
6101       if rpcresult[node].failed:
6102         result[node] = False
6103       else:
6104         result[node] = rpcresult[node].data
6105
6106     return result
6107
6108
6109 class LUExportInstance(LogicalUnit):
6110   """Export an instance to an image in the cluster.
6111
6112   """
6113   HPATH = "instance-export"
6114   HTYPE = constants.HTYPE_INSTANCE
6115   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6116   REQ_BGL = False
6117
6118   def ExpandNames(self):
6119     self._ExpandAndLockInstance()
6120     # FIXME: lock only instance primary and destination node
6121     #
6122     # Sad but true, for now we have do lock all nodes, as we don't know where
6123     # the previous export might be, and and in this LU we search for it and
6124     # remove it from its current node. In the future we could fix this by:
6125     #  - making a tasklet to search (share-lock all), then create the new one,
6126     #    then one to remove, after
6127     #  - removing the removal operation altoghether
6128     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6129
6130   def DeclareLocks(self, level):
6131     """Last minute lock declaration."""
6132     # All nodes are locked anyway, so nothing to do here.
6133
6134   def BuildHooksEnv(self):
6135     """Build hooks env.
6136
6137     This will run on the master, primary node and target node.
6138
6139     """
6140     env = {
6141       "EXPORT_NODE": self.op.target_node,
6142       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6143       }
6144     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6145     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6146           self.op.target_node]
6147     return env, nl, nl
6148
6149   def CheckPrereq(self):
6150     """Check prerequisites.
6151
6152     This checks that the instance and node names are valid.
6153
6154     """
6155     instance_name = self.op.instance_name
6156     self.instance = self.cfg.GetInstanceInfo(instance_name)
6157     assert self.instance is not None, \
6158           "Cannot retrieve locked instance %s" % self.op.instance_name
6159     _CheckNodeOnline(self, self.instance.primary_node)
6160
6161     self.dst_node = self.cfg.GetNodeInfo(
6162       self.cfg.ExpandNodeName(self.op.target_node))
6163
6164     if self.dst_node is None:
6165       # This is wrong node name, not a non-locked node
6166       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6167     _CheckNodeOnline(self, self.dst_node.name)
6168     _CheckNodeNotDrained(self, self.dst_node.name)
6169
6170     # instance disk type verification
6171     for disk in self.instance.disks:
6172       if disk.dev_type == constants.LD_FILE:
6173         raise errors.OpPrereqError("Export not supported for instances with"
6174                                    " file-based disks")
6175
6176   def Exec(self, feedback_fn):
6177     """Export an instance to an image in the cluster.
6178
6179     """
6180     instance = self.instance
6181     dst_node = self.dst_node
6182     src_node = instance.primary_node
6183     if self.op.shutdown:
6184       # shutdown the instance, but not the disks
6185       result = self.rpc.call_instance_shutdown(src_node, instance)
6186       msg = result.RemoteFailMsg()
6187       if msg:
6188         raise errors.OpExecError("Could not shutdown instance %s on"
6189                                  " node %s: %s" %
6190                                  (instance.name, src_node, msg))
6191
6192     vgname = self.cfg.GetVGName()
6193
6194     snap_disks = []
6195
6196     # set the disks ID correctly since call_instance_start needs the
6197     # correct drbd minor to create the symlinks
6198     for disk in instance.disks:
6199       self.cfg.SetDiskID(disk, src_node)
6200
6201     try:
6202       for disk in instance.disks:
6203         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6204         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6205         if new_dev_name.failed or not new_dev_name.data:
6206           self.LogWarning("Could not snapshot block device %s on node %s",
6207                           disk.logical_id[1], src_node)
6208           snap_disks.append(False)
6209         else:
6210           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6211                                  logical_id=(vgname, new_dev_name.data),
6212                                  physical_id=(vgname, new_dev_name.data),
6213                                  iv_name=disk.iv_name)
6214           snap_disks.append(new_dev)
6215
6216     finally:
6217       if self.op.shutdown and instance.admin_up:
6218         result = self.rpc.call_instance_start(src_node, instance)
6219         msg = result.RemoteFailMsg()
6220         if msg:
6221           _ShutdownInstanceDisks(self, instance)
6222           raise errors.OpExecError("Could not start instance: %s" % msg)
6223
6224     # TODO: check for size
6225
6226     cluster_name = self.cfg.GetClusterName()
6227     for idx, dev in enumerate(snap_disks):
6228       if dev:
6229         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6230                                                instance, cluster_name, idx)
6231         if result.failed or not result.data:
6232           self.LogWarning("Could not export block device %s from node %s to"
6233                           " node %s", dev.logical_id[1], src_node,
6234                           dst_node.name)
6235         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6236         if msg:
6237           self.LogWarning("Could not remove snapshot block device %s from node"
6238                           " %s: %s", dev.logical_id[1], src_node, msg)
6239
6240     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6241     if result.failed or not result.data:
6242       self.LogWarning("Could not finalize export for instance %s on node %s",
6243                       instance.name, dst_node.name)
6244
6245     nodelist = self.cfg.GetNodeList()
6246     nodelist.remove(dst_node.name)
6247
6248     # on one-node clusters nodelist will be empty after the removal
6249     # if we proceed the backup would be removed because OpQueryExports
6250     # substitutes an empty list with the full cluster node list.
6251     if nodelist:
6252       exportlist = self.rpc.call_export_list(nodelist)
6253       for node in exportlist:
6254         if exportlist[node].failed:
6255           continue
6256         if instance.name in exportlist[node].data:
6257           if not self.rpc.call_export_remove(node, instance.name):
6258             self.LogWarning("Could not remove older export for instance %s"
6259                             " on node %s", instance.name, node)
6260
6261
6262 class LURemoveExport(NoHooksLU):
6263   """Remove exports related to the named instance.
6264
6265   """
6266   _OP_REQP = ["instance_name"]
6267   REQ_BGL = False
6268
6269   def ExpandNames(self):
6270     self.needed_locks = {}
6271     # We need all nodes to be locked in order for RemoveExport to work, but we
6272     # don't need to lock the instance itself, as nothing will happen to it (and
6273     # we can remove exports also for a removed instance)
6274     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6275
6276   def CheckPrereq(self):
6277     """Check prerequisites.
6278     """
6279     pass
6280
6281   def Exec(self, feedback_fn):
6282     """Remove any export.
6283
6284     """
6285     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6286     # If the instance was not found we'll try with the name that was passed in.
6287     # This will only work if it was an FQDN, though.
6288     fqdn_warn = False
6289     if not instance_name:
6290       fqdn_warn = True
6291       instance_name = self.op.instance_name
6292
6293     exportlist = self.rpc.call_export_list(self.acquired_locks[
6294       locking.LEVEL_NODE])
6295     found = False
6296     for node in exportlist:
6297       if exportlist[node].failed:
6298         self.LogWarning("Failed to query node %s, continuing" % node)
6299         continue
6300       if instance_name in exportlist[node].data:
6301         found = True
6302         result = self.rpc.call_export_remove(node, instance_name)
6303         if result.failed or not result.data:
6304           logging.error("Could not remove export for instance %s"
6305                         " on node %s", instance_name, node)
6306
6307     if fqdn_warn and not found:
6308       feedback_fn("Export not found. If trying to remove an export belonging"
6309                   " to a deleted instance please use its Fully Qualified"
6310                   " Domain Name.")
6311
6312
6313 class TagsLU(NoHooksLU):
6314   """Generic tags LU.
6315
6316   This is an abstract class which is the parent of all the other tags LUs.
6317
6318   """
6319
6320   def ExpandNames(self):
6321     self.needed_locks = {}
6322     if self.op.kind == constants.TAG_NODE:
6323       name = self.cfg.ExpandNodeName(self.op.name)
6324       if name is None:
6325         raise errors.OpPrereqError("Invalid node name (%s)" %
6326                                    (self.op.name,))
6327       self.op.name = name
6328       self.needed_locks[locking.LEVEL_NODE] = name
6329     elif self.op.kind == constants.TAG_INSTANCE:
6330       name = self.cfg.ExpandInstanceName(self.op.name)
6331       if name is None:
6332         raise errors.OpPrereqError("Invalid instance name (%s)" %
6333                                    (self.op.name,))
6334       self.op.name = name
6335       self.needed_locks[locking.LEVEL_INSTANCE] = name
6336
6337   def CheckPrereq(self):
6338     """Check prerequisites.
6339
6340     """
6341     if self.op.kind == constants.TAG_CLUSTER:
6342       self.target = self.cfg.GetClusterInfo()
6343     elif self.op.kind == constants.TAG_NODE:
6344       self.target = self.cfg.GetNodeInfo(self.op.name)
6345     elif self.op.kind == constants.TAG_INSTANCE:
6346       self.target = self.cfg.GetInstanceInfo(self.op.name)
6347     else:
6348       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6349                                  str(self.op.kind))
6350
6351
6352 class LUGetTags(TagsLU):
6353   """Returns the tags of a given object.
6354
6355   """
6356   _OP_REQP = ["kind", "name"]
6357   REQ_BGL = False
6358
6359   def Exec(self, feedback_fn):
6360     """Returns the tag list.
6361
6362     """
6363     return list(self.target.GetTags())
6364
6365
6366 class LUSearchTags(NoHooksLU):
6367   """Searches the tags for a given pattern.
6368
6369   """
6370   _OP_REQP = ["pattern"]
6371   REQ_BGL = False
6372
6373   def ExpandNames(self):
6374     self.needed_locks = {}
6375
6376   def CheckPrereq(self):
6377     """Check prerequisites.
6378
6379     This checks the pattern passed for validity by compiling it.
6380
6381     """
6382     try:
6383       self.re = re.compile(self.op.pattern)
6384     except re.error, err:
6385       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6386                                  (self.op.pattern, err))
6387
6388   def Exec(self, feedback_fn):
6389     """Returns the tag list.
6390
6391     """
6392     cfg = self.cfg
6393     tgts = [("/cluster", cfg.GetClusterInfo())]
6394     ilist = cfg.GetAllInstancesInfo().values()
6395     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6396     nlist = cfg.GetAllNodesInfo().values()
6397     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6398     results = []
6399     for path, target in tgts:
6400       for tag in target.GetTags():
6401         if self.re.search(tag):
6402           results.append((path, tag))
6403     return results
6404
6405
6406 class LUAddTags(TagsLU):
6407   """Sets a tag on a given object.
6408
6409   """
6410   _OP_REQP = ["kind", "name", "tags"]
6411   REQ_BGL = False
6412
6413   def CheckPrereq(self):
6414     """Check prerequisites.
6415
6416     This checks the type and length of the tag name and value.
6417
6418     """
6419     TagsLU.CheckPrereq(self)
6420     for tag in self.op.tags:
6421       objects.TaggableObject.ValidateTag(tag)
6422
6423   def Exec(self, feedback_fn):
6424     """Sets the tag.
6425
6426     """
6427     try:
6428       for tag in self.op.tags:
6429         self.target.AddTag(tag)
6430     except errors.TagError, err:
6431       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6432     try:
6433       self.cfg.Update(self.target)
6434     except errors.ConfigurationError:
6435       raise errors.OpRetryError("There has been a modification to the"
6436                                 " config file and the operation has been"
6437                                 " aborted. Please retry.")
6438
6439
6440 class LUDelTags(TagsLU):
6441   """Delete a list of tags from a given object.
6442
6443   """
6444   _OP_REQP = ["kind", "name", "tags"]
6445   REQ_BGL = False
6446
6447   def CheckPrereq(self):
6448     """Check prerequisites.
6449
6450     This checks that we have the given tag.
6451
6452     """
6453     TagsLU.CheckPrereq(self)
6454     for tag in self.op.tags:
6455       objects.TaggableObject.ValidateTag(tag)
6456     del_tags = frozenset(self.op.tags)
6457     cur_tags = self.target.GetTags()
6458     if not del_tags <= cur_tags:
6459       diff_tags = del_tags - cur_tags
6460       diff_names = ["'%s'" % tag for tag in diff_tags]
6461       diff_names.sort()
6462       raise errors.OpPrereqError("Tag(s) %s not found" %
6463                                  (",".join(diff_names)))
6464
6465   def Exec(self, feedback_fn):
6466     """Remove the tag from the object.
6467
6468     """
6469     for tag in self.op.tags:
6470       self.target.RemoveTag(tag)
6471     try:
6472       self.cfg.Update(self.target)
6473     except errors.ConfigurationError:
6474       raise errors.OpRetryError("There has been a modification to the"
6475                                 " config file and the operation has been"
6476                                 " aborted. Please retry.")
6477
6478
6479 class LUTestDelay(NoHooksLU):
6480   """Sleep for a specified amount of time.
6481
6482   This LU sleeps on the master and/or nodes for a specified amount of
6483   time.
6484
6485   """
6486   _OP_REQP = ["duration", "on_master", "on_nodes"]
6487   REQ_BGL = False
6488
6489   def ExpandNames(self):
6490     """Expand names and set required locks.
6491
6492     This expands the node list, if any.
6493
6494     """
6495     self.needed_locks = {}
6496     if self.op.on_nodes:
6497       # _GetWantedNodes can be used here, but is not always appropriate to use
6498       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6499       # more information.
6500       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6501       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6502
6503   def CheckPrereq(self):
6504     """Check prerequisites.
6505
6506     """
6507
6508   def Exec(self, feedback_fn):
6509     """Do the actual sleep.
6510
6511     """
6512     if self.op.on_master:
6513       if not utils.TestDelay(self.op.duration):
6514         raise errors.OpExecError("Error during master delay test")
6515     if self.op.on_nodes:
6516       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6517       if not result:
6518         raise errors.OpExecError("Complete failure from rpc call")
6519       for node, node_result in result.items():
6520         node_result.Raise()
6521         if not node_result.data:
6522           raise errors.OpExecError("Failure during rpc call to node %s,"
6523                                    " result: %s" % (node, node_result.data))
6524
6525
6526 class IAllocator(object):
6527   """IAllocator framework.
6528
6529   An IAllocator instance has three sets of attributes:
6530     - cfg that is needed to query the cluster
6531     - input data (all members of the _KEYS class attribute are required)
6532     - four buffer attributes (in|out_data|text), that represent the
6533       input (to the external script) in text and data structure format,
6534       and the output from it, again in two formats
6535     - the result variables from the script (success, info, nodes) for
6536       easy usage
6537
6538   """
6539   _ALLO_KEYS = [
6540     "mem_size", "disks", "disk_template",
6541     "os", "tags", "nics", "vcpus", "hypervisor",
6542     ]
6543   _RELO_KEYS = [
6544     "relocate_from",
6545     ]
6546
6547   def __init__(self, lu, mode, name, **kwargs):
6548     self.lu = lu
6549     # init buffer variables
6550     self.in_text = self.out_text = self.in_data = self.out_data = None
6551     # init all input fields so that pylint is happy
6552     self.mode = mode
6553     self.name = name
6554     self.mem_size = self.disks = self.disk_template = None
6555     self.os = self.tags = self.nics = self.vcpus = None
6556     self.hypervisor = None
6557     self.relocate_from = None
6558     # computed fields
6559     self.required_nodes = None
6560     # init result fields
6561     self.success = self.info = self.nodes = None
6562     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6563       keyset = self._ALLO_KEYS
6564     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6565       keyset = self._RELO_KEYS
6566     else:
6567       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6568                                    " IAllocator" % self.mode)
6569     for key in kwargs:
6570       if key not in keyset:
6571         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6572                                      " IAllocator" % key)
6573       setattr(self, key, kwargs[key])
6574     for key in keyset:
6575       if key not in kwargs:
6576         raise errors.ProgrammerError("Missing input parameter '%s' to"
6577                                      " IAllocator" % key)
6578     self._BuildInputData()
6579
6580   def _ComputeClusterData(self):
6581     """Compute the generic allocator input data.
6582
6583     This is the data that is independent of the actual operation.
6584
6585     """
6586     cfg = self.lu.cfg
6587     cluster_info = cfg.GetClusterInfo()
6588     # cluster data
6589     data = {
6590       "version": 1,
6591       "cluster_name": cfg.GetClusterName(),
6592       "cluster_tags": list(cluster_info.GetTags()),
6593       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6594       # we don't have job IDs
6595       }
6596     iinfo = cfg.GetAllInstancesInfo().values()
6597     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6598
6599     # node data
6600     node_results = {}
6601     node_list = cfg.GetNodeList()
6602
6603     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6604       hypervisor_name = self.hypervisor
6605     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6606       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6607
6608     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6609                                            hypervisor_name)
6610     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6611                        cluster_info.enabled_hypervisors)
6612     for nname, nresult in node_data.items():
6613       # first fill in static (config-based) values
6614       ninfo = cfg.GetNodeInfo(nname)
6615       pnr = {
6616         "tags": list(ninfo.GetTags()),
6617         "primary_ip": ninfo.primary_ip,
6618         "secondary_ip": ninfo.secondary_ip,
6619         "offline": ninfo.offline,
6620         "drained": ninfo.drained,
6621         "master_candidate": ninfo.master_candidate,
6622         }
6623
6624       if not ninfo.offline:
6625         nresult.Raise()
6626         if not isinstance(nresult.data, dict):
6627           raise errors.OpExecError("Can't get data for node %s" % nname)
6628         remote_info = nresult.data
6629         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6630                      'vg_size', 'vg_free', 'cpu_total']:
6631           if attr not in remote_info:
6632             raise errors.OpExecError("Node '%s' didn't return attribute"
6633                                      " '%s'" % (nname, attr))
6634           try:
6635             remote_info[attr] = int(remote_info[attr])
6636           except ValueError, err:
6637             raise errors.OpExecError("Node '%s' returned invalid value"
6638                                      " for '%s': %s" % (nname, attr, err))
6639         # compute memory used by primary instances
6640         i_p_mem = i_p_up_mem = 0
6641         for iinfo, beinfo in i_list:
6642           if iinfo.primary_node == nname:
6643             i_p_mem += beinfo[constants.BE_MEMORY]
6644             if iinfo.name not in node_iinfo[nname].data:
6645               i_used_mem = 0
6646             else:
6647               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6648             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6649             remote_info['memory_free'] -= max(0, i_mem_diff)
6650
6651             if iinfo.admin_up:
6652               i_p_up_mem += beinfo[constants.BE_MEMORY]
6653
6654         # compute memory used by instances
6655         pnr_dyn = {
6656           "total_memory": remote_info['memory_total'],
6657           "reserved_memory": remote_info['memory_dom0'],
6658           "free_memory": remote_info['memory_free'],
6659           "total_disk": remote_info['vg_size'],
6660           "free_disk": remote_info['vg_free'],
6661           "total_cpus": remote_info['cpu_total'],
6662           "i_pri_memory": i_p_mem,
6663           "i_pri_up_memory": i_p_up_mem,
6664           }
6665         pnr.update(pnr_dyn)
6666
6667       node_results[nname] = pnr
6668     data["nodes"] = node_results
6669
6670     # instance data
6671     instance_data = {}
6672     for iinfo, beinfo in i_list:
6673       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6674                   for n in iinfo.nics]
6675       pir = {
6676         "tags": list(iinfo.GetTags()),
6677         "admin_up": iinfo.admin_up,
6678         "vcpus": beinfo[constants.BE_VCPUS],
6679         "memory": beinfo[constants.BE_MEMORY],
6680         "os": iinfo.os,
6681         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6682         "nics": nic_data,
6683         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6684         "disk_template": iinfo.disk_template,
6685         "hypervisor": iinfo.hypervisor,
6686         }
6687       instance_data[iinfo.name] = pir
6688
6689     data["instances"] = instance_data
6690
6691     self.in_data = data
6692
6693   def _AddNewInstance(self):
6694     """Add new instance data to allocator structure.
6695
6696     This in combination with _AllocatorGetClusterData will create the
6697     correct structure needed as input for the allocator.
6698
6699     The checks for the completeness of the opcode must have already been
6700     done.
6701
6702     """
6703     data = self.in_data
6704
6705     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6706
6707     if self.disk_template in constants.DTS_NET_MIRROR:
6708       self.required_nodes = 2
6709     else:
6710       self.required_nodes = 1
6711     request = {
6712       "type": "allocate",
6713       "name": self.name,
6714       "disk_template": self.disk_template,
6715       "tags": self.tags,
6716       "os": self.os,
6717       "vcpus": self.vcpus,
6718       "memory": self.mem_size,
6719       "disks": self.disks,
6720       "disk_space_total": disk_space,
6721       "nics": self.nics,
6722       "required_nodes": self.required_nodes,
6723       }
6724     data["request"] = request
6725
6726   def _AddRelocateInstance(self):
6727     """Add relocate instance data to allocator structure.
6728
6729     This in combination with _IAllocatorGetClusterData will create the
6730     correct structure needed as input for the allocator.
6731
6732     The checks for the completeness of the opcode must have already been
6733     done.
6734
6735     """
6736     instance = self.lu.cfg.GetInstanceInfo(self.name)
6737     if instance is None:
6738       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6739                                    " IAllocator" % self.name)
6740
6741     if instance.disk_template not in constants.DTS_NET_MIRROR:
6742       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6743
6744     if len(instance.secondary_nodes) != 1:
6745       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6746
6747     self.required_nodes = 1
6748     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6749     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6750
6751     request = {
6752       "type": "relocate",
6753       "name": self.name,
6754       "disk_space_total": disk_space,
6755       "required_nodes": self.required_nodes,
6756       "relocate_from": self.relocate_from,
6757       }
6758     self.in_data["request"] = request
6759
6760   def _BuildInputData(self):
6761     """Build input data structures.
6762
6763     """
6764     self._ComputeClusterData()
6765
6766     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6767       self._AddNewInstance()
6768     else:
6769       self._AddRelocateInstance()
6770
6771     self.in_text = serializer.Dump(self.in_data)
6772
6773   def Run(self, name, validate=True, call_fn=None):
6774     """Run an instance allocator and return the results.
6775
6776     """
6777     if call_fn is None:
6778       call_fn = self.lu.rpc.call_iallocator_runner
6779     data = self.in_text
6780
6781     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6782     result.Raise()
6783
6784     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6785       raise errors.OpExecError("Invalid result from master iallocator runner")
6786
6787     rcode, stdout, stderr, fail = result.data
6788
6789     if rcode == constants.IARUN_NOTFOUND:
6790       raise errors.OpExecError("Can't find allocator '%s'" % name)
6791     elif rcode == constants.IARUN_FAILURE:
6792       raise errors.OpExecError("Instance allocator call failed: %s,"
6793                                " output: %s" % (fail, stdout+stderr))
6794     self.out_text = stdout
6795     if validate:
6796       self._ValidateResult()
6797
6798   def _ValidateResult(self):
6799     """Process the allocator results.
6800
6801     This will process and if successful save the result in
6802     self.out_data and the other parameters.
6803
6804     """
6805     try:
6806       rdict = serializer.Load(self.out_text)
6807     except Exception, err:
6808       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6809
6810     if not isinstance(rdict, dict):
6811       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6812
6813     for key in "success", "info", "nodes":
6814       if key not in rdict:
6815         raise errors.OpExecError("Can't parse iallocator results:"
6816                                  " missing key '%s'" % key)
6817       setattr(self, key, rdict[key])
6818
6819     if not isinstance(rdict["nodes"], list):
6820       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6821                                " is not a list")
6822     self.out_data = rdict
6823
6824
6825 class LUTestAllocator(NoHooksLU):
6826   """Run allocator tests.
6827
6828   This LU runs the allocator tests
6829
6830   """
6831   _OP_REQP = ["direction", "mode", "name"]
6832
6833   def CheckPrereq(self):
6834     """Check prerequisites.
6835
6836     This checks the opcode parameters depending on the director and mode test.
6837
6838     """
6839     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6840       for attr in ["name", "mem_size", "disks", "disk_template",
6841                    "os", "tags", "nics", "vcpus"]:
6842         if not hasattr(self.op, attr):
6843           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6844                                      attr)
6845       iname = self.cfg.ExpandInstanceName(self.op.name)
6846       if iname is not None:
6847         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6848                                    iname)
6849       if not isinstance(self.op.nics, list):
6850         raise errors.OpPrereqError("Invalid parameter 'nics'")
6851       for row in self.op.nics:
6852         if (not isinstance(row, dict) or
6853             "mac" not in row or
6854             "ip" not in row or
6855             "bridge" not in row):
6856           raise errors.OpPrereqError("Invalid contents of the"
6857                                      " 'nics' parameter")
6858       if not isinstance(self.op.disks, list):
6859         raise errors.OpPrereqError("Invalid parameter 'disks'")
6860       for row in self.op.disks:
6861         if (not isinstance(row, dict) or
6862             "size" not in row or
6863             not isinstance(row["size"], int) or
6864             "mode" not in row or
6865             row["mode"] not in ['r', 'w']):
6866           raise errors.OpPrereqError("Invalid contents of the"
6867                                      " 'disks' parameter")
6868       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6869         self.op.hypervisor = self.cfg.GetHypervisorType()
6870     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6871       if not hasattr(self.op, "name"):
6872         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6873       fname = self.cfg.ExpandInstanceName(self.op.name)
6874       if fname is None:
6875         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6876                                    self.op.name)
6877       self.op.name = fname
6878       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6879     else:
6880       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6881                                  self.op.mode)
6882
6883     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6884       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6885         raise errors.OpPrereqError("Missing allocator name")
6886     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6887       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6888                                  self.op.direction)
6889
6890   def Exec(self, feedback_fn):
6891     """Run the allocator test.
6892
6893     """
6894     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6895       ial = IAllocator(self,
6896                        mode=self.op.mode,
6897                        name=self.op.name,
6898                        mem_size=self.op.mem_size,
6899                        disks=self.op.disks,
6900                        disk_template=self.op.disk_template,
6901                        os=self.op.os,
6902                        tags=self.op.tags,
6903                        nics=self.op.nics,
6904                        vcpus=self.op.vcpus,
6905                        hypervisor=self.op.hypervisor,
6906                        )
6907     else:
6908       ial = IAllocator(self,
6909                        mode=self.op.mode,
6910                        name=self.op.name,
6911                        relocate_from=list(self.relocate_from),
6912                        )
6913
6914     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6915       result = ial.in_text
6916     else:
6917       ial.Run(self.op.allocator, validate=False)
6918       result = ial.out_text
6919     return result