Update some hooks settings
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the node is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _CheckNodeNotDrained(lu, node):
445   """Ensure that a given node is not drained.
446
447   @param lu: the LU on behalf of which we make the check
448   @param node: the node to check
449   @raise errors.OpPrereqError: if the node is drained
450
451   """
452   if lu.cfg.GetNodeInfo(node).drained:
453     raise errors.OpPrereqError("Can't use drained node %s" % node)
454
455
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457                           memory, vcpus, nics, disk_template, disks):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @rtype: dict
484   @return: the hook environment for this instance
485
486   """
487   if status:
488     str_status = "up"
489   else:
490     str_status = "down"
491   env = {
492     "OP_TARGET": name,
493     "INSTANCE_NAME": name,
494     "INSTANCE_PRIMARY": primary_node,
495     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496     "INSTANCE_OS_TYPE": os_type,
497     "INSTANCE_STATUS": str_status,
498     "INSTANCE_MEMORY": memory,
499     "INSTANCE_VCPUS": vcpus,
500     "INSTANCE_DISK_TEMPLATE": disk_template,
501   }
502
503   if nics:
504     nic_count = len(nics)
505     for idx, (ip, bridge, mac) in enumerate(nics):
506       if ip is None:
507         ip = ""
508       env["INSTANCE_NIC%d_IP" % idx] = ip
509       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510       env["INSTANCE_NIC%d_MAC" % idx] = mac
511   else:
512     nic_count = 0
513
514   env["INSTANCE_NIC_COUNT"] = nic_count
515
516   if disks:
517     disk_count = len(disks)
518     for idx, (size, mode) in enumerate(disks):
519       env["INSTANCE_DISK%d_SIZE" % idx] = size
520       env["INSTANCE_DISK%d_MODE" % idx] = mode
521   else:
522     disk_count = 0
523
524   env["INSTANCE_DISK_COUNT"] = disk_count
525
526   return env
527
528
529 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530   """Builds instance related env variables for hooks from an object.
531
532   @type lu: L{LogicalUnit}
533   @param lu: the logical unit on whose behalf we execute
534   @type instance: L{objects.Instance}
535   @param instance: the instance for which we should build the
536       environment
537   @type override: dict
538   @param override: dictionary with key/values that will override
539       our values
540   @rtype: dict
541   @return: the hook environment dictionary
542
543   """
544   bep = lu.cfg.GetClusterInfo().FillBE(instance)
545   args = {
546     'name': instance.name,
547     'primary_node': instance.primary_node,
548     'secondary_nodes': instance.secondary_nodes,
549     'os_type': instance.os,
550     'status': instance.admin_up,
551     'memory': bep[constants.BE_MEMORY],
552     'vcpus': bep[constants.BE_VCPUS],
553     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554     'disk_template': instance.disk_template,
555     'disks': [(disk.size, disk.mode) for disk in instance.disks],
556   }
557   if override:
558     args.update(override)
559   return _BuildInstanceHookEnv(**args)
560
561
562 def _AdjustCandidatePool(lu):
563   """Adjust the candidate pool after node operations.
564
565   """
566   mod_list = lu.cfg.MaintainCandidatePool()
567   if mod_list:
568     lu.LogInfo("Promoted nodes to master candidate role: %s",
569                ", ".join(node.name for node in mod_list))
570     for name in mod_list:
571       lu.context.ReaddNode(name)
572   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573   if mc_now > mc_max:
574     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575                (mc_now, mc_max))
576
577
578 def _CheckInstanceBridgesExist(lu, instance):
579   """Check that the brigdes needed by an instance exist.
580
581   """
582   # check bridges existance
583   brlist = [nic.bridge for nic in instance.nics]
584   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585   result.Raise()
586   if not result.data:
587     raise errors.OpPrereqError("One or more target bridges %s does not"
588                                " exist on destination node '%s'" %
589                                (brlist, instance.primary_node))
590
591
592 class LUDestroyCluster(NoHooksLU):
593   """Logical unit for destroying the cluster.
594
595   """
596   _OP_REQP = []
597
598   def CheckPrereq(self):
599     """Check prerequisites.
600
601     This checks whether the cluster is empty.
602
603     Any errors are signalled by raising errors.OpPrereqError.
604
605     """
606     master = self.cfg.GetMasterNode()
607
608     nodelist = self.cfg.GetNodeList()
609     if len(nodelist) != 1 or nodelist[0] != master:
610       raise errors.OpPrereqError("There are still %d node(s) in"
611                                  " this cluster." % (len(nodelist) - 1))
612     instancelist = self.cfg.GetInstanceList()
613     if instancelist:
614       raise errors.OpPrereqError("There are still %d instance(s) in"
615                                  " this cluster." % len(instancelist))
616
617   def Exec(self, feedback_fn):
618     """Destroys the cluster.
619
620     """
621     master = self.cfg.GetMasterNode()
622     result = self.rpc.call_node_stop_master(master, False)
623     result.Raise()
624     if not result.data:
625       raise errors.OpExecError("Could not disable the master role")
626     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627     utils.CreateBackup(priv_key)
628     utils.CreateBackup(pub_key)
629     return master
630
631
632 class LUVerifyCluster(LogicalUnit):
633   """Verifies the cluster status.
634
635   """
636   HPATH = "cluster-verify"
637   HTYPE = constants.HTYPE_CLUSTER
638   _OP_REQP = ["skip_checks"]
639   REQ_BGL = False
640
641   def ExpandNames(self):
642     self.needed_locks = {
643       locking.LEVEL_NODE: locking.ALL_SET,
644       locking.LEVEL_INSTANCE: locking.ALL_SET,
645     }
646     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647
648   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649                   node_result, feedback_fn, master_files,
650                   drbd_map):
651     """Run multiple tests against a node.
652
653     Test list:
654
655       - compares ganeti version
656       - checks vg existance and size > 20G
657       - checks config file checksum
658       - checks ssh to other nodes
659
660     @type nodeinfo: L{objects.Node}
661     @param nodeinfo: the node to check
662     @param file_list: required list of files
663     @param local_cksum: dictionary of local files and their checksums
664     @param node_result: the results from the node
665     @param feedback_fn: function used to accumulate results
666     @param master_files: list of files that only masters should have
667     @param drbd_map: the useddrbd minors for this node, in
668         form of minor: (instance, must_exist) which correspond to instances
669         and their running status
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703
704     vglist = node_result.get(constants.NV_VGLIST, None)
705     if not vglist:
706       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                       (node,))
708       bad = True
709     else:
710       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
711                                             constants.MIN_VG_SIZE)
712       if vgstatus:
713         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714         bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     used_minors = node_result.get(constants.NV_DRBDLIST, [])
777     if not isinstance(used_minors, (tuple, list)):
778       feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
779                   str(used_minors))
780     else:
781       for minor, (iname, must_exist) in drbd_map.items():
782         if minor not in used_minors and must_exist:
783           feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
784                       (minor, iname))
785           bad = True
786       for minor in used_minors:
787         if minor not in drbd_map:
788           feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
789           bad = True
790
791     return bad
792
793   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
794                       node_instance, feedback_fn, n_offline):
795     """Verify an instance.
796
797     This function checks to see if the required block devices are
798     available on the instance's node.
799
800     """
801     bad = False
802
803     node_current = instanceconfig.primary_node
804
805     node_vol_should = {}
806     instanceconfig.MapLVsByNode(node_vol_should)
807
808     for node in node_vol_should:
809       if node in n_offline:
810         # ignore missing volumes on offline nodes
811         continue
812       for volume in node_vol_should[node]:
813         if node not in node_vol_is or volume not in node_vol_is[node]:
814           feedback_fn("  - ERROR: volume %s missing on node %s" %
815                           (volume, node))
816           bad = True
817
818     if instanceconfig.admin_up:
819       if ((node_current not in node_instance or
820           not instance in node_instance[node_current]) and
821           node_current not in n_offline):
822         feedback_fn("  - ERROR: instance %s not running on node %s" %
823                         (instance, node_current))
824         bad = True
825
826     for node in node_instance:
827       if (not node == node_current):
828         if instance in node_instance[node]:
829           feedback_fn("  - ERROR: instance %s should not run on node %s" %
830                           (instance, node))
831           bad = True
832
833     return bad
834
835   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
836     """Verify if there are any unknown volumes in the cluster.
837
838     The .os, .swap and backup volumes are ignored. All other volumes are
839     reported as unknown.
840
841     """
842     bad = False
843
844     for node in node_vol_is:
845       for volume in node_vol_is[node]:
846         if node not in node_vol_should or volume not in node_vol_should[node]:
847           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
848                       (volume, node))
849           bad = True
850     return bad
851
852   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
853     """Verify the list of running instances.
854
855     This checks what instances are running but unknown to the cluster.
856
857     """
858     bad = False
859     for node in node_instance:
860       for runninginstance in node_instance[node]:
861         if runninginstance not in instancelist:
862           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
863                           (runninginstance, node))
864           bad = True
865     return bad
866
867   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
868     """Verify N+1 Memory Resilience.
869
870     Check that if one single node dies we can still start all the instances it
871     was primary for.
872
873     """
874     bad = False
875
876     for node, nodeinfo in node_info.iteritems():
877       # This code checks that every node which is now listed as secondary has
878       # enough memory to host all instances it is supposed to should a single
879       # other node in the cluster fail.
880       # FIXME: not ready for failover to an arbitrary node
881       # FIXME: does not support file-backed instances
882       # WARNING: we currently take into account down instances as well as up
883       # ones, considering that even if they're down someone might want to start
884       # them even in the event of a node failure.
885       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
886         needed_mem = 0
887         for instance in instances:
888           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
889           if bep[constants.BE_AUTO_BALANCE]:
890             needed_mem += bep[constants.BE_MEMORY]
891         if nodeinfo['mfree'] < needed_mem:
892           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
893                       " failovers should node %s fail" % (node, prinode))
894           bad = True
895     return bad
896
897   def CheckPrereq(self):
898     """Check prerequisites.
899
900     Transform the list of checks we're going to skip into a set and check that
901     all its members are valid.
902
903     """
904     self.skip_set = frozenset(self.op.skip_checks)
905     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
906       raise errors.OpPrereqError("Invalid checks to be skipped specified")
907
908   def BuildHooksEnv(self):
909     """Build hooks env.
910
911     Cluster-Verify hooks just rone in the post phase and their failure makes
912     the output be logged in the verify output and the verification to fail.
913
914     """
915     all_nodes = self.cfg.GetNodeList()
916     # TODO: populate the environment with useful information for verify hooks
917     env = {}
918     return env, [], all_nodes
919
920   def Exec(self, feedback_fn):
921     """Verify integrity of cluster, performing various test on nodes.
922
923     """
924     bad = False
925     feedback_fn("* Verifying global settings")
926     for msg in self.cfg.VerifyConfig():
927       feedback_fn("  - ERROR: %s" % msg)
928
929     vg_name = self.cfg.GetVGName()
930     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
931     nodelist = utils.NiceSort(self.cfg.GetNodeList())
932     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
933     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
934     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
935                         for iname in instancelist)
936     i_non_redundant = [] # Non redundant instances
937     i_non_a_balanced = [] # Non auto-balanced instances
938     n_offline = [] # List of offline nodes
939     n_drained = [] # List of nodes being drained
940     node_volume = {}
941     node_instance = {}
942     node_info = {}
943     instance_cfg = {}
944
945     # FIXME: verify OS list
946     # do local checksums
947     master_files = [constants.CLUSTER_CONF_FILE]
948
949     file_names = ssconf.SimpleStore().GetFileList()
950     file_names.append(constants.SSL_CERT_FILE)
951     file_names.append(constants.RAPI_CERT_FILE)
952     file_names.extend(master_files)
953
954     local_checksums = utils.FingerprintFiles(file_names)
955
956     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
957     node_verify_param = {
958       constants.NV_FILELIST: file_names,
959       constants.NV_NODELIST: [node.name for node in nodeinfo
960                               if not node.offline],
961       constants.NV_HYPERVISOR: hypervisors,
962       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
963                                   node.secondary_ip) for node in nodeinfo
964                                  if not node.offline],
965       constants.NV_LVLIST: vg_name,
966       constants.NV_INSTANCELIST: hypervisors,
967       constants.NV_VGLIST: None,
968       constants.NV_VERSION: None,
969       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
970       constants.NV_DRBDLIST: None,
971       }
972     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
973                                            self.cfg.GetClusterName())
974
975     cluster = self.cfg.GetClusterInfo()
976     master_node = self.cfg.GetMasterNode()
977     all_drbd_map = self.cfg.ComputeDRBDMap()
978
979     for node_i in nodeinfo:
980       node = node_i.name
981       nresult = all_nvinfo[node].data
982
983       if node_i.offline:
984         feedback_fn("* Skipping offline node %s" % (node,))
985         n_offline.append(node)
986         continue
987
988       if node == master_node:
989         ntype = "master"
990       elif node_i.master_candidate:
991         ntype = "master candidate"
992       elif node_i.drained:
993         ntype = "drained"
994         n_drained.append(node)
995       else:
996         ntype = "regular"
997       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
998
999       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1000         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1001         bad = True
1002         continue
1003
1004       node_drbd = {}
1005       for minor, instance in all_drbd_map[node].items():
1006         instance = instanceinfo[instance]
1007         node_drbd[minor] = (instance.name, instance.admin_up)
1008       result = self._VerifyNode(node_i, file_names, local_checksums,
1009                                 nresult, feedback_fn, master_files,
1010                                 node_drbd)
1011       bad = bad or result
1012
1013       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1014       if isinstance(lvdata, basestring):
1015         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1016                     (node, utils.SafeEncode(lvdata)))
1017         bad = True
1018         node_volume[node] = {}
1019       elif not isinstance(lvdata, dict):
1020         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1021         bad = True
1022         continue
1023       else:
1024         node_volume[node] = lvdata
1025
1026       # node_instance
1027       idata = nresult.get(constants.NV_INSTANCELIST, None)
1028       if not isinstance(idata, list):
1029         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1030                     (node,))
1031         bad = True
1032         continue
1033
1034       node_instance[node] = idata
1035
1036       # node_info
1037       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1038       if not isinstance(nodeinfo, dict):
1039         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1040         bad = True
1041         continue
1042
1043       try:
1044         node_info[node] = {
1045           "mfree": int(nodeinfo['memory_free']),
1046           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1047           "pinst": [],
1048           "sinst": [],
1049           # dictionary holding all instances this node is secondary for,
1050           # grouped by their primary node. Each key is a cluster node, and each
1051           # value is a list of instances which have the key as primary and the
1052           # current node as secondary.  this is handy to calculate N+1 memory
1053           # availability if you can only failover from a primary to its
1054           # secondary.
1055           "sinst-by-pnode": {},
1056         }
1057       except ValueError:
1058         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1059         bad = True
1060         continue
1061
1062     node_vol_should = {}
1063
1064     for instance in instancelist:
1065       feedback_fn("* Verifying instance %s" % instance)
1066       inst_config = instanceinfo[instance]
1067       result =  self._VerifyInstance(instance, inst_config, node_volume,
1068                                      node_instance, feedback_fn, n_offline)
1069       bad = bad or result
1070       inst_nodes_offline = []
1071
1072       inst_config.MapLVsByNode(node_vol_should)
1073
1074       instance_cfg[instance] = inst_config
1075
1076       pnode = inst_config.primary_node
1077       if pnode in node_info:
1078         node_info[pnode]['pinst'].append(instance)
1079       elif pnode not in n_offline:
1080         feedback_fn("  - ERROR: instance %s, connection to primary node"
1081                     " %s failed" % (instance, pnode))
1082         bad = True
1083
1084       if pnode in n_offline:
1085         inst_nodes_offline.append(pnode)
1086
1087       # If the instance is non-redundant we cannot survive losing its primary
1088       # node, so we are not N+1 compliant. On the other hand we have no disk
1089       # templates with more than one secondary so that situation is not well
1090       # supported either.
1091       # FIXME: does not support file-backed instances
1092       if len(inst_config.secondary_nodes) == 0:
1093         i_non_redundant.append(instance)
1094       elif len(inst_config.secondary_nodes) > 1:
1095         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1096                     % instance)
1097
1098       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1099         i_non_a_balanced.append(instance)
1100
1101       for snode in inst_config.secondary_nodes:
1102         if snode in node_info:
1103           node_info[snode]['sinst'].append(instance)
1104           if pnode not in node_info[snode]['sinst-by-pnode']:
1105             node_info[snode]['sinst-by-pnode'][pnode] = []
1106           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1107         elif snode not in n_offline:
1108           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1109                       " %s failed" % (instance, snode))
1110           bad = True
1111         if snode in n_offline:
1112           inst_nodes_offline.append(snode)
1113
1114       if inst_nodes_offline:
1115         # warn that the instance lives on offline nodes, and set bad=True
1116         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1117                     ", ".join(inst_nodes_offline))
1118         bad = True
1119
1120     feedback_fn("* Verifying orphan volumes")
1121     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1122                                        feedback_fn)
1123     bad = bad or result
1124
1125     feedback_fn("* Verifying remaining instances")
1126     result = self._VerifyOrphanInstances(instancelist, node_instance,
1127                                          feedback_fn)
1128     bad = bad or result
1129
1130     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1131       feedback_fn("* Verifying N+1 Memory redundancy")
1132       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1133       bad = bad or result
1134
1135     feedback_fn("* Other Notes")
1136     if i_non_redundant:
1137       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1138                   % len(i_non_redundant))
1139
1140     if i_non_a_balanced:
1141       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1142                   % len(i_non_a_balanced))
1143
1144     if n_offline:
1145       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1146
1147     if n_drained:
1148       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1149
1150     return not bad
1151
1152   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1153     """Analize the post-hooks' result
1154
1155     This method analyses the hook result, handles it, and sends some
1156     nicely-formatted feedback back to the user.
1157
1158     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1159         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1160     @param hooks_results: the results of the multi-node hooks rpc call
1161     @param feedback_fn: function used send feedback back to the caller
1162     @param lu_result: previous Exec result
1163     @return: the new Exec result, based on the previous result
1164         and hook results
1165
1166     """
1167     # We only really run POST phase hooks, and are only interested in
1168     # their results
1169     if phase == constants.HOOKS_PHASE_POST:
1170       # Used to change hooks' output to proper indentation
1171       indent_re = re.compile('^', re.M)
1172       feedback_fn("* Hooks Results")
1173       if not hooks_results:
1174         feedback_fn("  - ERROR: general communication failure")
1175         lu_result = 1
1176       else:
1177         for node_name in hooks_results:
1178           show_node_header = True
1179           res = hooks_results[node_name]
1180           if res.failed or res.data is False or not isinstance(res.data, list):
1181             if res.offline:
1182               # no need to warn or set fail return value
1183               continue
1184             feedback_fn("    Communication failure in hooks execution")
1185             lu_result = 1
1186             continue
1187           for script, hkr, output in res.data:
1188             if hkr == constants.HKR_FAIL:
1189               # The node header is only shown once, if there are
1190               # failing hooks on that node
1191               if show_node_header:
1192                 feedback_fn("  Node %s:" % node_name)
1193                 show_node_header = False
1194               feedback_fn("    ERROR: Script %s failed, output:" % script)
1195               output = indent_re.sub('      ', output)
1196               feedback_fn("%s" % output)
1197               lu_result = 1
1198
1199       return lu_result
1200
1201
1202 class LUVerifyDisks(NoHooksLU):
1203   """Verifies the cluster disks status.
1204
1205   """
1206   _OP_REQP = []
1207   REQ_BGL = False
1208
1209   def ExpandNames(self):
1210     self.needed_locks = {
1211       locking.LEVEL_NODE: locking.ALL_SET,
1212       locking.LEVEL_INSTANCE: locking.ALL_SET,
1213     }
1214     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1215
1216   def CheckPrereq(self):
1217     """Check prerequisites.
1218
1219     This has no prerequisites.
1220
1221     """
1222     pass
1223
1224   def Exec(self, feedback_fn):
1225     """Verify integrity of cluster disks.
1226
1227     """
1228     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1229
1230     vg_name = self.cfg.GetVGName()
1231     nodes = utils.NiceSort(self.cfg.GetNodeList())
1232     instances = [self.cfg.GetInstanceInfo(name)
1233                  for name in self.cfg.GetInstanceList()]
1234
1235     nv_dict = {}
1236     for inst in instances:
1237       inst_lvs = {}
1238       if (not inst.admin_up or
1239           inst.disk_template not in constants.DTS_NET_MIRROR):
1240         continue
1241       inst.MapLVsByNode(inst_lvs)
1242       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1243       for node, vol_list in inst_lvs.iteritems():
1244         for vol in vol_list:
1245           nv_dict[(node, vol)] = inst
1246
1247     if not nv_dict:
1248       return result
1249
1250     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1251
1252     to_act = set()
1253     for node in nodes:
1254       # node_volume
1255       lvs = node_lvs[node]
1256       if lvs.failed:
1257         if not lvs.offline:
1258           self.LogWarning("Connection to node %s failed: %s" %
1259                           (node, lvs.data))
1260         continue
1261       lvs = lvs.data
1262       if isinstance(lvs, basestring):
1263         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1264         res_nlvm[node] = lvs
1265       elif not isinstance(lvs, dict):
1266         logging.warning("Connection to node %s failed or invalid data"
1267                         " returned", node)
1268         res_nodes.append(node)
1269         continue
1270
1271       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1272         inst = nv_dict.pop((node, lv_name), None)
1273         if (not lv_online and inst is not None
1274             and inst.name not in res_instances):
1275           res_instances.append(inst.name)
1276
1277     # any leftover items in nv_dict are missing LVs, let's arrange the
1278     # data better
1279     for key, inst in nv_dict.iteritems():
1280       if inst.name not in res_missing:
1281         res_missing[inst.name] = []
1282       res_missing[inst.name].append(key)
1283
1284     return result
1285
1286
1287 class LURenameCluster(LogicalUnit):
1288   """Rename the cluster.
1289
1290   """
1291   HPATH = "cluster-rename"
1292   HTYPE = constants.HTYPE_CLUSTER
1293   _OP_REQP = ["name"]
1294
1295   def BuildHooksEnv(self):
1296     """Build hooks env.
1297
1298     """
1299     env = {
1300       "OP_TARGET": self.cfg.GetClusterName(),
1301       "NEW_NAME": self.op.name,
1302       }
1303     mn = self.cfg.GetMasterNode()
1304     return env, [mn], [mn]
1305
1306   def CheckPrereq(self):
1307     """Verify that the passed name is a valid one.
1308
1309     """
1310     hostname = utils.HostInfo(self.op.name)
1311
1312     new_name = hostname.name
1313     self.ip = new_ip = hostname.ip
1314     old_name = self.cfg.GetClusterName()
1315     old_ip = self.cfg.GetMasterIP()
1316     if new_name == old_name and new_ip == old_ip:
1317       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1318                                  " cluster has changed")
1319     if new_ip != old_ip:
1320       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1321         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1322                                    " reachable on the network. Aborting." %
1323                                    new_ip)
1324
1325     self.op.name = new_name
1326
1327   def Exec(self, feedback_fn):
1328     """Rename the cluster.
1329
1330     """
1331     clustername = self.op.name
1332     ip = self.ip
1333
1334     # shutdown the master IP
1335     master = self.cfg.GetMasterNode()
1336     result = self.rpc.call_node_stop_master(master, False)
1337     if result.failed or not result.data:
1338       raise errors.OpExecError("Could not disable the master role")
1339
1340     try:
1341       cluster = self.cfg.GetClusterInfo()
1342       cluster.cluster_name = clustername
1343       cluster.master_ip = ip
1344       self.cfg.Update(cluster)
1345
1346       # update the known hosts file
1347       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1348       node_list = self.cfg.GetNodeList()
1349       try:
1350         node_list.remove(master)
1351       except ValueError:
1352         pass
1353       result = self.rpc.call_upload_file(node_list,
1354                                          constants.SSH_KNOWN_HOSTS_FILE)
1355       for to_node, to_result in result.iteritems():
1356         if to_result.failed or not to_result.data:
1357           logging.error("Copy of file %s to node %s failed",
1358                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1359
1360     finally:
1361       result = self.rpc.call_node_start_master(master, False)
1362       if result.failed or not result.data:
1363         self.LogWarning("Could not re-enable the master role on"
1364                         " the master, please restart manually.")
1365
1366
1367 def _RecursiveCheckIfLVMBased(disk):
1368   """Check if the given disk or its children are lvm-based.
1369
1370   @type disk: L{objects.Disk}
1371   @param disk: the disk to check
1372   @rtype: booleean
1373   @return: boolean indicating whether a LD_LV dev_type was found or not
1374
1375   """
1376   if disk.children:
1377     for chdisk in disk.children:
1378       if _RecursiveCheckIfLVMBased(chdisk):
1379         return True
1380   return disk.dev_type == constants.LD_LV
1381
1382
1383 class LUSetClusterParams(LogicalUnit):
1384   """Change the parameters of the cluster.
1385
1386   """
1387   HPATH = "cluster-modify"
1388   HTYPE = constants.HTYPE_CLUSTER
1389   _OP_REQP = []
1390   REQ_BGL = False
1391
1392   def CheckParameters(self):
1393     """Check parameters
1394
1395     """
1396     if not hasattr(self.op, "candidate_pool_size"):
1397       self.op.candidate_pool_size = None
1398     if self.op.candidate_pool_size is not None:
1399       try:
1400         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1401       except ValueError, err:
1402         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1403                                    str(err))
1404       if self.op.candidate_pool_size < 1:
1405         raise errors.OpPrereqError("At least one master candidate needed")
1406
1407   def ExpandNames(self):
1408     # FIXME: in the future maybe other cluster params won't require checking on
1409     # all nodes to be modified.
1410     self.needed_locks = {
1411       locking.LEVEL_NODE: locking.ALL_SET,
1412     }
1413     self.share_locks[locking.LEVEL_NODE] = 1
1414
1415   def BuildHooksEnv(self):
1416     """Build hooks env.
1417
1418     """
1419     env = {
1420       "OP_TARGET": self.cfg.GetClusterName(),
1421       "NEW_VG_NAME": self.op.vg_name,
1422       }
1423     mn = self.cfg.GetMasterNode()
1424     return env, [mn], [mn]
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks whether the given params don't conflict and
1430     if the given volume group is valid.
1431
1432     """
1433     if self.op.vg_name is not None and not self.op.vg_name:
1434       instances = self.cfg.GetAllInstancesInfo().values()
1435       for inst in instances:
1436         for disk in inst.disks:
1437           if _RecursiveCheckIfLVMBased(disk):
1438             raise errors.OpPrereqError("Cannot disable lvm storage while"
1439                                        " lvm-based instances exist")
1440
1441     node_list = self.acquired_locks[locking.LEVEL_NODE]
1442
1443     # if vg_name not None, checks given volume group on all nodes
1444     if self.op.vg_name:
1445       vglist = self.rpc.call_vg_list(node_list)
1446       for node in node_list:
1447         if vglist[node].failed:
1448           # ignoring down node
1449           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1450           continue
1451         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1452                                               self.op.vg_name,
1453                                               constants.MIN_VG_SIZE)
1454         if vgstatus:
1455           raise errors.OpPrereqError("Error on node '%s': %s" %
1456                                      (node, vgstatus))
1457
1458     self.cluster = cluster = self.cfg.GetClusterInfo()
1459     # validate beparams changes
1460     if self.op.beparams:
1461       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1462       self.new_beparams = cluster.FillDict(
1463         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1464
1465     # hypervisor list/parameters
1466     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1467     if self.op.hvparams:
1468       if not isinstance(self.op.hvparams, dict):
1469         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1470       for hv_name, hv_dict in self.op.hvparams.items():
1471         if hv_name not in self.new_hvparams:
1472           self.new_hvparams[hv_name] = hv_dict
1473         else:
1474           self.new_hvparams[hv_name].update(hv_dict)
1475
1476     if self.op.enabled_hypervisors is not None:
1477       self.hv_list = self.op.enabled_hypervisors
1478     else:
1479       self.hv_list = cluster.enabled_hypervisors
1480
1481     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1482       # either the enabled list has changed, or the parameters have, validate
1483       for hv_name, hv_params in self.new_hvparams.items():
1484         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1485             (self.op.enabled_hypervisors and
1486              hv_name in self.op.enabled_hypervisors)):
1487           # either this is a new hypervisor, or its parameters have changed
1488           hv_class = hypervisor.GetHypervisor(hv_name)
1489           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1490           hv_class.CheckParameterSyntax(hv_params)
1491           _CheckHVParams(self, node_list, hv_name, hv_params)
1492
1493   def Exec(self, feedback_fn):
1494     """Change the parameters of the cluster.
1495
1496     """
1497     if self.op.vg_name is not None:
1498       if self.op.vg_name != self.cfg.GetVGName():
1499         self.cfg.SetVGName(self.op.vg_name)
1500       else:
1501         feedback_fn("Cluster LVM configuration already in desired"
1502                     " state, not changing")
1503     if self.op.hvparams:
1504       self.cluster.hvparams = self.new_hvparams
1505     if self.op.enabled_hypervisors is not None:
1506       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1507     if self.op.beparams:
1508       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1509     if self.op.candidate_pool_size is not None:
1510       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1511
1512     self.cfg.Update(self.cluster)
1513
1514     # we want to update nodes after the cluster so that if any errors
1515     # happen, we have recorded and saved the cluster info
1516     if self.op.candidate_pool_size is not None:
1517       _AdjustCandidatePool(self)
1518
1519
1520 class LURedistributeConfig(NoHooksLU):
1521   """Force the redistribution of cluster configuration.
1522
1523   This is a very simple LU.
1524
1525   """
1526   _OP_REQP = []
1527   REQ_BGL = False
1528
1529   def ExpandNames(self):
1530     self.needed_locks = {
1531       locking.LEVEL_NODE: locking.ALL_SET,
1532     }
1533     self.share_locks[locking.LEVEL_NODE] = 1
1534
1535   def CheckPrereq(self):
1536     """Check prerequisites.
1537
1538     """
1539
1540   def Exec(self, feedback_fn):
1541     """Redistribute the configuration.
1542
1543     """
1544     self.cfg.Update(self.cfg.GetClusterInfo())
1545
1546
1547 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1548   """Sleep and poll for an instance's disk to sync.
1549
1550   """
1551   if not instance.disks:
1552     return True
1553
1554   if not oneshot:
1555     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1556
1557   node = instance.primary_node
1558
1559   for dev in instance.disks:
1560     lu.cfg.SetDiskID(dev, node)
1561
1562   retries = 0
1563   while True:
1564     max_time = 0
1565     done = True
1566     cumul_degraded = False
1567     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1568     if rstats.failed or not rstats.data:
1569       lu.LogWarning("Can't get any data from node %s", node)
1570       retries += 1
1571       if retries >= 10:
1572         raise errors.RemoteError("Can't contact node %s for mirror data,"
1573                                  " aborting." % node)
1574       time.sleep(6)
1575       continue
1576     rstats = rstats.data
1577     retries = 0
1578     for i, mstat in enumerate(rstats):
1579       if mstat is None:
1580         lu.LogWarning("Can't compute data for node %s/%s",
1581                            node, instance.disks[i].iv_name)
1582         continue
1583       # we ignore the ldisk parameter
1584       perc_done, est_time, is_degraded, _ = mstat
1585       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1586       if perc_done is not None:
1587         done = False
1588         if est_time is not None:
1589           rem_time = "%d estimated seconds remaining" % est_time
1590           max_time = est_time
1591         else:
1592           rem_time = "no time estimate"
1593         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1594                         (instance.disks[i].iv_name, perc_done, rem_time))
1595     if done or oneshot:
1596       break
1597
1598     time.sleep(min(60, max_time))
1599
1600   if done:
1601     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1602   return not cumul_degraded
1603
1604
1605 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1606   """Check that mirrors are not degraded.
1607
1608   The ldisk parameter, if True, will change the test from the
1609   is_degraded attribute (which represents overall non-ok status for
1610   the device(s)) to the ldisk (representing the local storage status).
1611
1612   """
1613   lu.cfg.SetDiskID(dev, node)
1614   if ldisk:
1615     idx = 6
1616   else:
1617     idx = 5
1618
1619   result = True
1620   if on_primary or dev.AssembleOnSecondary():
1621     rstats = lu.rpc.call_blockdev_find(node, dev)
1622     msg = rstats.RemoteFailMsg()
1623     if msg:
1624       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1625       result = False
1626     elif not rstats.payload:
1627       lu.LogWarning("Can't find disk on node %s", node)
1628       result = False
1629     else:
1630       result = result and (not rstats.payload[idx])
1631   if dev.children:
1632     for child in dev.children:
1633       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1634
1635   return result
1636
1637
1638 class LUDiagnoseOS(NoHooksLU):
1639   """Logical unit for OS diagnose/query.
1640
1641   """
1642   _OP_REQP = ["output_fields", "names"]
1643   REQ_BGL = False
1644   _FIELDS_STATIC = utils.FieldSet()
1645   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1646
1647   def ExpandNames(self):
1648     if self.op.names:
1649       raise errors.OpPrereqError("Selective OS query not supported")
1650
1651     _CheckOutputFields(static=self._FIELDS_STATIC,
1652                        dynamic=self._FIELDS_DYNAMIC,
1653                        selected=self.op.output_fields)
1654
1655     # Lock all nodes, in shared mode
1656     self.needed_locks = {}
1657     self.share_locks[locking.LEVEL_NODE] = 1
1658     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1659
1660   def CheckPrereq(self):
1661     """Check prerequisites.
1662
1663     """
1664
1665   @staticmethod
1666   def _DiagnoseByOS(node_list, rlist):
1667     """Remaps a per-node return list into an a per-os per-node dictionary
1668
1669     @param node_list: a list with the names of all nodes
1670     @param rlist: a map with node names as keys and OS objects as values
1671
1672     @rtype: dict
1673     @returns: a dictionary with osnames as keys and as value another map, with
1674         nodes as keys and list of OS objects as values, eg::
1675
1676           {"debian-etch": {"node1": [<object>,...],
1677                            "node2": [<object>,]}
1678           }
1679
1680     """
1681     all_os = {}
1682     for node_name, nr in rlist.iteritems():
1683       if nr.failed or not nr.data:
1684         continue
1685       for os_obj in nr.data:
1686         if os_obj.name not in all_os:
1687           # build a list of nodes for this os containing empty lists
1688           # for each node in node_list
1689           all_os[os_obj.name] = {}
1690           for nname in node_list:
1691             all_os[os_obj.name][nname] = []
1692         all_os[os_obj.name][node_name].append(os_obj)
1693     return all_os
1694
1695   def Exec(self, feedback_fn):
1696     """Compute the list of OSes.
1697
1698     """
1699     node_list = self.acquired_locks[locking.LEVEL_NODE]
1700     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1701                    if node in node_list]
1702     node_data = self.rpc.call_os_diagnose(valid_nodes)
1703     if node_data == False:
1704       raise errors.OpExecError("Can't gather the list of OSes")
1705     pol = self._DiagnoseByOS(valid_nodes, node_data)
1706     output = []
1707     for os_name, os_data in pol.iteritems():
1708       row = []
1709       for field in self.op.output_fields:
1710         if field == "name":
1711           val = os_name
1712         elif field == "valid":
1713           val = utils.all([osl and osl[0] for osl in os_data.values()])
1714         elif field == "node_status":
1715           val = {}
1716           for node_name, nos_list in os_data.iteritems():
1717             val[node_name] = [(v.status, v.path) for v in nos_list]
1718         else:
1719           raise errors.ParameterError(field)
1720         row.append(val)
1721       output.append(row)
1722
1723     return output
1724
1725
1726 class LURemoveNode(LogicalUnit):
1727   """Logical unit for removing a node.
1728
1729   """
1730   HPATH = "node-remove"
1731   HTYPE = constants.HTYPE_NODE
1732   _OP_REQP = ["node_name"]
1733
1734   def BuildHooksEnv(self):
1735     """Build hooks env.
1736
1737     This doesn't run on the target node in the pre phase as a failed
1738     node would then be impossible to remove.
1739
1740     """
1741     env = {
1742       "OP_TARGET": self.op.node_name,
1743       "NODE_NAME": self.op.node_name,
1744       }
1745     all_nodes = self.cfg.GetNodeList()
1746     all_nodes.remove(self.op.node_name)
1747     return env, all_nodes, all_nodes
1748
1749   def CheckPrereq(self):
1750     """Check prerequisites.
1751
1752     This checks:
1753      - the node exists in the configuration
1754      - it does not have primary or secondary instances
1755      - it's not the master
1756
1757     Any errors are signalled by raising errors.OpPrereqError.
1758
1759     """
1760     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1761     if node is None:
1762       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1763
1764     instance_list = self.cfg.GetInstanceList()
1765
1766     masternode = self.cfg.GetMasterNode()
1767     if node.name == masternode:
1768       raise errors.OpPrereqError("Node is the master node,"
1769                                  " you need to failover first.")
1770
1771     for instance_name in instance_list:
1772       instance = self.cfg.GetInstanceInfo(instance_name)
1773       if node.name in instance.all_nodes:
1774         raise errors.OpPrereqError("Instance %s is still running on the node,"
1775                                    " please remove first." % instance_name)
1776     self.op.node_name = node.name
1777     self.node = node
1778
1779   def Exec(self, feedback_fn):
1780     """Removes the node from the cluster.
1781
1782     """
1783     node = self.node
1784     logging.info("Stopping the node daemon and removing configs from node %s",
1785                  node.name)
1786
1787     self.context.RemoveNode(node.name)
1788
1789     self.rpc.call_node_leave_cluster(node.name)
1790
1791     # Promote nodes to master candidate as needed
1792     _AdjustCandidatePool(self)
1793
1794
1795 class LUQueryNodes(NoHooksLU):
1796   """Logical unit for querying nodes.
1797
1798   """
1799   _OP_REQP = ["output_fields", "names", "use_locking"]
1800   REQ_BGL = False
1801   _FIELDS_DYNAMIC = utils.FieldSet(
1802     "dtotal", "dfree",
1803     "mtotal", "mnode", "mfree",
1804     "bootid",
1805     "ctotal", "cnodes", "csockets",
1806     )
1807
1808   _FIELDS_STATIC = utils.FieldSet(
1809     "name", "pinst_cnt", "sinst_cnt",
1810     "pinst_list", "sinst_list",
1811     "pip", "sip", "tags",
1812     "serial_no",
1813     "master_candidate",
1814     "master",
1815     "offline",
1816     "drained",
1817     )
1818
1819   def ExpandNames(self):
1820     _CheckOutputFields(static=self._FIELDS_STATIC,
1821                        dynamic=self._FIELDS_DYNAMIC,
1822                        selected=self.op.output_fields)
1823
1824     self.needed_locks = {}
1825     self.share_locks[locking.LEVEL_NODE] = 1
1826
1827     if self.op.names:
1828       self.wanted = _GetWantedNodes(self, self.op.names)
1829     else:
1830       self.wanted = locking.ALL_SET
1831
1832     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1833     self.do_locking = self.do_node_query and self.op.use_locking
1834     if self.do_locking:
1835       # if we don't request only static fields, we need to lock the nodes
1836       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1837
1838
1839   def CheckPrereq(self):
1840     """Check prerequisites.
1841
1842     """
1843     # The validation of the node list is done in the _GetWantedNodes,
1844     # if non empty, and if empty, there's no validation to do
1845     pass
1846
1847   def Exec(self, feedback_fn):
1848     """Computes the list of nodes and their attributes.
1849
1850     """
1851     all_info = self.cfg.GetAllNodesInfo()
1852     if self.do_locking:
1853       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1854     elif self.wanted != locking.ALL_SET:
1855       nodenames = self.wanted
1856       missing = set(nodenames).difference(all_info.keys())
1857       if missing:
1858         raise errors.OpExecError(
1859           "Some nodes were removed before retrieving their data: %s" % missing)
1860     else:
1861       nodenames = all_info.keys()
1862
1863     nodenames = utils.NiceSort(nodenames)
1864     nodelist = [all_info[name] for name in nodenames]
1865
1866     # begin data gathering
1867
1868     if self.do_node_query:
1869       live_data = {}
1870       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1871                                           self.cfg.GetHypervisorType())
1872       for name in nodenames:
1873         nodeinfo = node_data[name]
1874         if not nodeinfo.failed and nodeinfo.data:
1875           nodeinfo = nodeinfo.data
1876           fn = utils.TryConvert
1877           live_data[name] = {
1878             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1879             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1880             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1881             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1882             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1883             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1884             "bootid": nodeinfo.get('bootid', None),
1885             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1886             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1887             }
1888         else:
1889           live_data[name] = {}
1890     else:
1891       live_data = dict.fromkeys(nodenames, {})
1892
1893     node_to_primary = dict([(name, set()) for name in nodenames])
1894     node_to_secondary = dict([(name, set()) for name in nodenames])
1895
1896     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1897                              "sinst_cnt", "sinst_list"))
1898     if inst_fields & frozenset(self.op.output_fields):
1899       instancelist = self.cfg.GetInstanceList()
1900
1901       for instance_name in instancelist:
1902         inst = self.cfg.GetInstanceInfo(instance_name)
1903         if inst.primary_node in node_to_primary:
1904           node_to_primary[inst.primary_node].add(inst.name)
1905         for secnode in inst.secondary_nodes:
1906           if secnode in node_to_secondary:
1907             node_to_secondary[secnode].add(inst.name)
1908
1909     master_node = self.cfg.GetMasterNode()
1910
1911     # end data gathering
1912
1913     output = []
1914     for node in nodelist:
1915       node_output = []
1916       for field in self.op.output_fields:
1917         if field == "name":
1918           val = node.name
1919         elif field == "pinst_list":
1920           val = list(node_to_primary[node.name])
1921         elif field == "sinst_list":
1922           val = list(node_to_secondary[node.name])
1923         elif field == "pinst_cnt":
1924           val = len(node_to_primary[node.name])
1925         elif field == "sinst_cnt":
1926           val = len(node_to_secondary[node.name])
1927         elif field == "pip":
1928           val = node.primary_ip
1929         elif field == "sip":
1930           val = node.secondary_ip
1931         elif field == "tags":
1932           val = list(node.GetTags())
1933         elif field == "serial_no":
1934           val = node.serial_no
1935         elif field == "master_candidate":
1936           val = node.master_candidate
1937         elif field == "master":
1938           val = node.name == master_node
1939         elif field == "offline":
1940           val = node.offline
1941         elif field == "drained":
1942           val = node.drained
1943         elif self._FIELDS_DYNAMIC.Matches(field):
1944           val = live_data[node.name].get(field, None)
1945         else:
1946           raise errors.ParameterError(field)
1947         node_output.append(val)
1948       output.append(node_output)
1949
1950     return output
1951
1952
1953 class LUQueryNodeVolumes(NoHooksLU):
1954   """Logical unit for getting volumes on node(s).
1955
1956   """
1957   _OP_REQP = ["nodes", "output_fields"]
1958   REQ_BGL = False
1959   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1960   _FIELDS_STATIC = utils.FieldSet("node")
1961
1962   def ExpandNames(self):
1963     _CheckOutputFields(static=self._FIELDS_STATIC,
1964                        dynamic=self._FIELDS_DYNAMIC,
1965                        selected=self.op.output_fields)
1966
1967     self.needed_locks = {}
1968     self.share_locks[locking.LEVEL_NODE] = 1
1969     if not self.op.nodes:
1970       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1971     else:
1972       self.needed_locks[locking.LEVEL_NODE] = \
1973         _GetWantedNodes(self, self.op.nodes)
1974
1975   def CheckPrereq(self):
1976     """Check prerequisites.
1977
1978     This checks that the fields required are valid output fields.
1979
1980     """
1981     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1982
1983   def Exec(self, feedback_fn):
1984     """Computes the list of nodes and their attributes.
1985
1986     """
1987     nodenames = self.nodes
1988     volumes = self.rpc.call_node_volumes(nodenames)
1989
1990     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1991              in self.cfg.GetInstanceList()]
1992
1993     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1994
1995     output = []
1996     for node in nodenames:
1997       if node not in volumes or volumes[node].failed or not volumes[node].data:
1998         continue
1999
2000       node_vols = volumes[node].data[:]
2001       node_vols.sort(key=lambda vol: vol['dev'])
2002
2003       for vol in node_vols:
2004         node_output = []
2005         for field in self.op.output_fields:
2006           if field == "node":
2007             val = node
2008           elif field == "phys":
2009             val = vol['dev']
2010           elif field == "vg":
2011             val = vol['vg']
2012           elif field == "name":
2013             val = vol['name']
2014           elif field == "size":
2015             val = int(float(vol['size']))
2016           elif field == "instance":
2017             for inst in ilist:
2018               if node not in lv_by_node[inst]:
2019                 continue
2020               if vol['name'] in lv_by_node[inst][node]:
2021                 val = inst.name
2022                 break
2023             else:
2024               val = '-'
2025           else:
2026             raise errors.ParameterError(field)
2027           node_output.append(str(val))
2028
2029         output.append(node_output)
2030
2031     return output
2032
2033
2034 class LUAddNode(LogicalUnit):
2035   """Logical unit for adding node to the cluster.
2036
2037   """
2038   HPATH = "node-add"
2039   HTYPE = constants.HTYPE_NODE
2040   _OP_REQP = ["node_name"]
2041
2042   def BuildHooksEnv(self):
2043     """Build hooks env.
2044
2045     This will run on all nodes before, and on all nodes + the new node after.
2046
2047     """
2048     env = {
2049       "OP_TARGET": self.op.node_name,
2050       "NODE_NAME": self.op.node_name,
2051       "NODE_PIP": self.op.primary_ip,
2052       "NODE_SIP": self.op.secondary_ip,
2053       }
2054     nodes_0 = self.cfg.GetNodeList()
2055     nodes_1 = nodes_0 + [self.op.node_name, ]
2056     return env, nodes_0, nodes_1
2057
2058   def CheckPrereq(self):
2059     """Check prerequisites.
2060
2061     This checks:
2062      - the new node is not already in the config
2063      - it is resolvable
2064      - its parameters (single/dual homed) matches the cluster
2065
2066     Any errors are signalled by raising errors.OpPrereqError.
2067
2068     """
2069     node_name = self.op.node_name
2070     cfg = self.cfg
2071
2072     dns_data = utils.HostInfo(node_name)
2073
2074     node = dns_data.name
2075     primary_ip = self.op.primary_ip = dns_data.ip
2076     secondary_ip = getattr(self.op, "secondary_ip", None)
2077     if secondary_ip is None:
2078       secondary_ip = primary_ip
2079     if not utils.IsValidIP(secondary_ip):
2080       raise errors.OpPrereqError("Invalid secondary IP given")
2081     self.op.secondary_ip = secondary_ip
2082
2083     node_list = cfg.GetNodeList()
2084     if not self.op.readd and node in node_list:
2085       raise errors.OpPrereqError("Node %s is already in the configuration" %
2086                                  node)
2087     elif self.op.readd and node not in node_list:
2088       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2089
2090     for existing_node_name in node_list:
2091       existing_node = cfg.GetNodeInfo(existing_node_name)
2092
2093       if self.op.readd and node == existing_node_name:
2094         if (existing_node.primary_ip != primary_ip or
2095             existing_node.secondary_ip != secondary_ip):
2096           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2097                                      " address configuration as before")
2098         continue
2099
2100       if (existing_node.primary_ip == primary_ip or
2101           existing_node.secondary_ip == primary_ip or
2102           existing_node.primary_ip == secondary_ip or
2103           existing_node.secondary_ip == secondary_ip):
2104         raise errors.OpPrereqError("New node ip address(es) conflict with"
2105                                    " existing node %s" % existing_node.name)
2106
2107     # check that the type of the node (single versus dual homed) is the
2108     # same as for the master
2109     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2110     master_singlehomed = myself.secondary_ip == myself.primary_ip
2111     newbie_singlehomed = secondary_ip == primary_ip
2112     if master_singlehomed != newbie_singlehomed:
2113       if master_singlehomed:
2114         raise errors.OpPrereqError("The master has no private ip but the"
2115                                    " new node has one")
2116       else:
2117         raise errors.OpPrereqError("The master has a private ip but the"
2118                                    " new node doesn't have one")
2119
2120     # checks reachablity
2121     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2122       raise errors.OpPrereqError("Node not reachable by ping")
2123
2124     if not newbie_singlehomed:
2125       # check reachability from my secondary ip to newbie's secondary ip
2126       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2127                            source=myself.secondary_ip):
2128         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2129                                    " based ping to noded port")
2130
2131     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2132     mc_now, _ = self.cfg.GetMasterCandidateStats()
2133     master_candidate = mc_now < cp_size
2134
2135     self.new_node = objects.Node(name=node,
2136                                  primary_ip=primary_ip,
2137                                  secondary_ip=secondary_ip,
2138                                  master_candidate=master_candidate,
2139                                  offline=False, drained=False)
2140
2141   def Exec(self, feedback_fn):
2142     """Adds the new node to the cluster.
2143
2144     """
2145     new_node = self.new_node
2146     node = new_node.name
2147
2148     # check connectivity
2149     result = self.rpc.call_version([node])[node]
2150     result.Raise()
2151     if result.data:
2152       if constants.PROTOCOL_VERSION == result.data:
2153         logging.info("Communication to node %s fine, sw version %s match",
2154                      node, result.data)
2155       else:
2156         raise errors.OpExecError("Version mismatch master version %s,"
2157                                  " node version %s" %
2158                                  (constants.PROTOCOL_VERSION, result.data))
2159     else:
2160       raise errors.OpExecError("Cannot get version from the new node")
2161
2162     # setup ssh on node
2163     logging.info("Copy ssh key to node %s", node)
2164     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2165     keyarray = []
2166     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2167                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2168                 priv_key, pub_key]
2169
2170     for i in keyfiles:
2171       f = open(i, 'r')
2172       try:
2173         keyarray.append(f.read())
2174       finally:
2175         f.close()
2176
2177     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2178                                     keyarray[2],
2179                                     keyarray[3], keyarray[4], keyarray[5])
2180
2181     msg = result.RemoteFailMsg()
2182     if msg:
2183       raise errors.OpExecError("Cannot transfer ssh keys to the"
2184                                " new node: %s" % msg)
2185
2186     # Add node to our /etc/hosts, and add key to known_hosts
2187     utils.AddHostToEtcHosts(new_node.name)
2188
2189     if new_node.secondary_ip != new_node.primary_ip:
2190       result = self.rpc.call_node_has_ip_address(new_node.name,
2191                                                  new_node.secondary_ip)
2192       if result.failed or not result.data:
2193         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2194                                  " you gave (%s). Please fix and re-run this"
2195                                  " command." % new_node.secondary_ip)
2196
2197     node_verify_list = [self.cfg.GetMasterNode()]
2198     node_verify_param = {
2199       'nodelist': [node],
2200       # TODO: do a node-net-test as well?
2201     }
2202
2203     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2204                                        self.cfg.GetClusterName())
2205     for verifier in node_verify_list:
2206       if result[verifier].failed or not result[verifier].data:
2207         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2208                                  " for remote verification" % verifier)
2209       if result[verifier].data['nodelist']:
2210         for failed in result[verifier].data['nodelist']:
2211           feedback_fn("ssh/hostname verification failed %s -> %s" %
2212                       (verifier, result[verifier].data['nodelist'][failed]))
2213         raise errors.OpExecError("ssh/hostname verification failed.")
2214
2215     # Distribute updated /etc/hosts and known_hosts to all nodes,
2216     # including the node just added
2217     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2218     dist_nodes = self.cfg.GetNodeList()
2219     if not self.op.readd:
2220       dist_nodes.append(node)
2221     if myself.name in dist_nodes:
2222       dist_nodes.remove(myself.name)
2223
2224     logging.debug("Copying hosts and known_hosts to all nodes")
2225     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2226       result = self.rpc.call_upload_file(dist_nodes, fname)
2227       for to_node, to_result in result.iteritems():
2228         if to_result.failed or not to_result.data:
2229           logging.error("Copy of file %s to node %s failed", fname, to_node)
2230
2231     to_copy = []
2232     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2233     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2234       to_copy.append(constants.VNC_PASSWORD_FILE)
2235
2236     for fname in to_copy:
2237       result = self.rpc.call_upload_file([node], fname)
2238       if result[node].failed or not result[node]:
2239         logging.error("Could not copy file %s to node %s", fname, node)
2240
2241     if self.op.readd:
2242       self.context.ReaddNode(new_node)
2243     else:
2244       self.context.AddNode(new_node)
2245
2246
2247 class LUSetNodeParams(LogicalUnit):
2248   """Modifies the parameters of a node.
2249
2250   """
2251   HPATH = "node-modify"
2252   HTYPE = constants.HTYPE_NODE
2253   _OP_REQP = ["node_name"]
2254   REQ_BGL = False
2255
2256   def CheckArguments(self):
2257     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2258     if node_name is None:
2259       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2260     self.op.node_name = node_name
2261     _CheckBooleanOpField(self.op, 'master_candidate')
2262     _CheckBooleanOpField(self.op, 'offline')
2263     _CheckBooleanOpField(self.op, 'drained')
2264     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2265     if all_mods.count(None) == 3:
2266       raise errors.OpPrereqError("Please pass at least one modification")
2267     if all_mods.count(True) > 1:
2268       raise errors.OpPrereqError("Can't set the node into more than one"
2269                                  " state at the same time")
2270
2271   def ExpandNames(self):
2272     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2273
2274   def BuildHooksEnv(self):
2275     """Build hooks env.
2276
2277     This runs on the master node.
2278
2279     """
2280     env = {
2281       "OP_TARGET": self.op.node_name,
2282       "MASTER_CANDIDATE": str(self.op.master_candidate),
2283       "OFFLINE": str(self.op.offline),
2284       "DRAINED": str(self.op.drained),
2285       }
2286     nl = [self.cfg.GetMasterNode(),
2287           self.op.node_name]
2288     return env, nl, nl
2289
2290   def CheckPrereq(self):
2291     """Check prerequisites.
2292
2293     This only checks the instance list against the existing names.
2294
2295     """
2296     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2297
2298     if ((self.op.master_candidate == False or self.op.offline == True or
2299          self.op.drained == True) and node.master_candidate):
2300       # we will demote the node from master_candidate
2301       if self.op.node_name == self.cfg.GetMasterNode():
2302         raise errors.OpPrereqError("The master node has to be a"
2303                                    " master candidate, online and not drained")
2304       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2305       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2306       if num_candidates <= cp_size:
2307         msg = ("Not enough master candidates (desired"
2308                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2309         if self.op.force:
2310           self.LogWarning(msg)
2311         else:
2312           raise errors.OpPrereqError(msg)
2313
2314     if (self.op.master_candidate == True and
2315         ((node.offline and not self.op.offline == False) or
2316          (node.drained and not self.op.drained == False))):
2317       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2318                                  " to master_candidate")
2319
2320     return
2321
2322   def Exec(self, feedback_fn):
2323     """Modifies a node.
2324
2325     """
2326     node = self.node
2327
2328     result = []
2329     changed_mc = False
2330
2331     if self.op.offline is not None:
2332       node.offline = self.op.offline
2333       result.append(("offline", str(self.op.offline)))
2334       if self.op.offline == True:
2335         if node.master_candidate:
2336           node.master_candidate = False
2337           changed_mc = True
2338           result.append(("master_candidate", "auto-demotion due to offline"))
2339         if node.drained:
2340           node.drained = False
2341           result.append(("drained", "clear drained status due to offline"))
2342
2343     if self.op.master_candidate is not None:
2344       node.master_candidate = self.op.master_candidate
2345       changed_mc = True
2346       result.append(("master_candidate", str(self.op.master_candidate)))
2347       if self.op.master_candidate == False:
2348         rrc = self.rpc.call_node_demote_from_mc(node.name)
2349         msg = rrc.RemoteFailMsg()
2350         if msg:
2351           self.LogWarning("Node failed to demote itself: %s" % msg)
2352
2353     if self.op.drained is not None:
2354       node.drained = self.op.drained
2355       result.append(("drained", str(self.op.drained)))
2356       if self.op.drained == True:
2357         if node.master_candidate:
2358           node.master_candidate = False
2359           changed_mc = True
2360           result.append(("master_candidate", "auto-demotion due to drain"))
2361         if node.offline:
2362           node.offline = False
2363           result.append(("offline", "clear offline status due to drain"))
2364
2365     # this will trigger configuration file update, if needed
2366     self.cfg.Update(node)
2367     # this will trigger job queue propagation or cleanup
2368     if changed_mc:
2369       self.context.ReaddNode(node)
2370
2371     return result
2372
2373
2374 class LUQueryClusterInfo(NoHooksLU):
2375   """Query cluster configuration.
2376
2377   """
2378   _OP_REQP = []
2379   REQ_BGL = False
2380
2381   def ExpandNames(self):
2382     self.needed_locks = {}
2383
2384   def CheckPrereq(self):
2385     """No prerequsites needed for this LU.
2386
2387     """
2388     pass
2389
2390   def Exec(self, feedback_fn):
2391     """Return cluster config.
2392
2393     """
2394     cluster = self.cfg.GetClusterInfo()
2395     result = {
2396       "software_version": constants.RELEASE_VERSION,
2397       "protocol_version": constants.PROTOCOL_VERSION,
2398       "config_version": constants.CONFIG_VERSION,
2399       "os_api_version": constants.OS_API_VERSION,
2400       "export_version": constants.EXPORT_VERSION,
2401       "architecture": (platform.architecture()[0], platform.machine()),
2402       "name": cluster.cluster_name,
2403       "master": cluster.master_node,
2404       "default_hypervisor": cluster.default_hypervisor,
2405       "enabled_hypervisors": cluster.enabled_hypervisors,
2406       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2407                         for hypervisor in cluster.enabled_hypervisors]),
2408       "beparams": cluster.beparams,
2409       "candidate_pool_size": cluster.candidate_pool_size,
2410       }
2411
2412     return result
2413
2414
2415 class LUQueryConfigValues(NoHooksLU):
2416   """Return configuration values.
2417
2418   """
2419   _OP_REQP = []
2420   REQ_BGL = False
2421   _FIELDS_DYNAMIC = utils.FieldSet()
2422   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2423
2424   def ExpandNames(self):
2425     self.needed_locks = {}
2426
2427     _CheckOutputFields(static=self._FIELDS_STATIC,
2428                        dynamic=self._FIELDS_DYNAMIC,
2429                        selected=self.op.output_fields)
2430
2431   def CheckPrereq(self):
2432     """No prerequisites.
2433
2434     """
2435     pass
2436
2437   def Exec(self, feedback_fn):
2438     """Dump a representation of the cluster config to the standard output.
2439
2440     """
2441     values = []
2442     for field in self.op.output_fields:
2443       if field == "cluster_name":
2444         entry = self.cfg.GetClusterName()
2445       elif field == "master_node":
2446         entry = self.cfg.GetMasterNode()
2447       elif field == "drain_flag":
2448         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2449       else:
2450         raise errors.ParameterError(field)
2451       values.append(entry)
2452     return values
2453
2454
2455 class LUActivateInstanceDisks(NoHooksLU):
2456   """Bring up an instance's disks.
2457
2458   """
2459   _OP_REQP = ["instance_name"]
2460   REQ_BGL = False
2461
2462   def ExpandNames(self):
2463     self._ExpandAndLockInstance()
2464     self.needed_locks[locking.LEVEL_NODE] = []
2465     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2466
2467   def DeclareLocks(self, level):
2468     if level == locking.LEVEL_NODE:
2469       self._LockInstancesNodes()
2470
2471   def CheckPrereq(self):
2472     """Check prerequisites.
2473
2474     This checks that the instance is in the cluster.
2475
2476     """
2477     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2478     assert self.instance is not None, \
2479       "Cannot retrieve locked instance %s" % self.op.instance_name
2480     _CheckNodeOnline(self, self.instance.primary_node)
2481
2482   def Exec(self, feedback_fn):
2483     """Activate the disks.
2484
2485     """
2486     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2487     if not disks_ok:
2488       raise errors.OpExecError("Cannot activate block devices")
2489
2490     return disks_info
2491
2492
2493 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2494   """Prepare the block devices for an instance.
2495
2496   This sets up the block devices on all nodes.
2497
2498   @type lu: L{LogicalUnit}
2499   @param lu: the logical unit on whose behalf we execute
2500   @type instance: L{objects.Instance}
2501   @param instance: the instance for whose disks we assemble
2502   @type ignore_secondaries: boolean
2503   @param ignore_secondaries: if true, errors on secondary nodes
2504       won't result in an error return from the function
2505   @return: False if the operation failed, otherwise a list of
2506       (host, instance_visible_name, node_visible_name)
2507       with the mapping from node devices to instance devices
2508
2509   """
2510   device_info = []
2511   disks_ok = True
2512   iname = instance.name
2513   # With the two passes mechanism we try to reduce the window of
2514   # opportunity for the race condition of switching DRBD to primary
2515   # before handshaking occured, but we do not eliminate it
2516
2517   # The proper fix would be to wait (with some limits) until the
2518   # connection has been made and drbd transitions from WFConnection
2519   # into any other network-connected state (Connected, SyncTarget,
2520   # SyncSource, etc.)
2521
2522   # 1st pass, assemble on all nodes in secondary mode
2523   for inst_disk in instance.disks:
2524     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2525       lu.cfg.SetDiskID(node_disk, node)
2526       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2527       msg = result.RemoteFailMsg()
2528       if msg:
2529         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2530                            " (is_primary=False, pass=1): %s",
2531                            inst_disk.iv_name, node, msg)
2532         if not ignore_secondaries:
2533           disks_ok = False
2534
2535   # FIXME: race condition on drbd migration to primary
2536
2537   # 2nd pass, do only the primary node
2538   for inst_disk in instance.disks:
2539     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2540       if node != instance.primary_node:
2541         continue
2542       lu.cfg.SetDiskID(node_disk, node)
2543       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2544       msg = result.RemoteFailMsg()
2545       if msg:
2546         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2547                            " (is_primary=True, pass=2): %s",
2548                            inst_disk.iv_name, node, msg)
2549         disks_ok = False
2550     device_info.append((instance.primary_node, inst_disk.iv_name,
2551                         result.payload))
2552
2553   # leave the disks configured for the primary node
2554   # this is a workaround that would be fixed better by
2555   # improving the logical/physical id handling
2556   for disk in instance.disks:
2557     lu.cfg.SetDiskID(disk, instance.primary_node)
2558
2559   return disks_ok, device_info
2560
2561
2562 def _StartInstanceDisks(lu, instance, force):
2563   """Start the disks of an instance.
2564
2565   """
2566   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2567                                            ignore_secondaries=force)
2568   if not disks_ok:
2569     _ShutdownInstanceDisks(lu, instance)
2570     if force is not None and not force:
2571       lu.proc.LogWarning("", hint="If the message above refers to a"
2572                          " secondary node,"
2573                          " you can retry the operation using '--force'.")
2574     raise errors.OpExecError("Disk consistency error")
2575
2576
2577 class LUDeactivateInstanceDisks(NoHooksLU):
2578   """Shutdown an instance's disks.
2579
2580   """
2581   _OP_REQP = ["instance_name"]
2582   REQ_BGL = False
2583
2584   def ExpandNames(self):
2585     self._ExpandAndLockInstance()
2586     self.needed_locks[locking.LEVEL_NODE] = []
2587     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2588
2589   def DeclareLocks(self, level):
2590     if level == locking.LEVEL_NODE:
2591       self._LockInstancesNodes()
2592
2593   def CheckPrereq(self):
2594     """Check prerequisites.
2595
2596     This checks that the instance is in the cluster.
2597
2598     """
2599     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2600     assert self.instance is not None, \
2601       "Cannot retrieve locked instance %s" % self.op.instance_name
2602
2603   def Exec(self, feedback_fn):
2604     """Deactivate the disks
2605
2606     """
2607     instance = self.instance
2608     _SafeShutdownInstanceDisks(self, instance)
2609
2610
2611 def _SafeShutdownInstanceDisks(lu, instance):
2612   """Shutdown block devices of an instance.
2613
2614   This function checks if an instance is running, before calling
2615   _ShutdownInstanceDisks.
2616
2617   """
2618   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2619                                       [instance.hypervisor])
2620   ins_l = ins_l[instance.primary_node]
2621   if ins_l.failed or not isinstance(ins_l.data, list):
2622     raise errors.OpExecError("Can't contact node '%s'" %
2623                              instance.primary_node)
2624
2625   if instance.name in ins_l.data:
2626     raise errors.OpExecError("Instance is running, can't shutdown"
2627                              " block devices.")
2628
2629   _ShutdownInstanceDisks(lu, instance)
2630
2631
2632 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2633   """Shutdown block devices of an instance.
2634
2635   This does the shutdown on all nodes of the instance.
2636
2637   If the ignore_primary is false, errors on the primary node are
2638   ignored.
2639
2640   """
2641   all_result = True
2642   for disk in instance.disks:
2643     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2644       lu.cfg.SetDiskID(top_disk, node)
2645       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2646       msg = result.RemoteFailMsg()
2647       if msg:
2648         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2649                       disk.iv_name, node, msg)
2650         if not ignore_primary or node != instance.primary_node:
2651           all_result = False
2652   return all_result
2653
2654
2655 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2656   """Checks if a node has enough free memory.
2657
2658   This function check if a given node has the needed amount of free
2659   memory. In case the node has less memory or we cannot get the
2660   information from the node, this function raise an OpPrereqError
2661   exception.
2662
2663   @type lu: C{LogicalUnit}
2664   @param lu: a logical unit from which we get configuration data
2665   @type node: C{str}
2666   @param node: the node to check
2667   @type reason: C{str}
2668   @param reason: string to use in the error message
2669   @type requested: C{int}
2670   @param requested: the amount of memory in MiB to check for
2671   @type hypervisor_name: C{str}
2672   @param hypervisor_name: the hypervisor to ask for memory stats
2673   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2674       we cannot check the node
2675
2676   """
2677   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2678   nodeinfo[node].Raise()
2679   free_mem = nodeinfo[node].data.get('memory_free')
2680   if not isinstance(free_mem, int):
2681     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2682                              " was '%s'" % (node, free_mem))
2683   if requested > free_mem:
2684     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2685                              " needed %s MiB, available %s MiB" %
2686                              (node, reason, requested, free_mem))
2687
2688
2689 class LUStartupInstance(LogicalUnit):
2690   """Starts an instance.
2691
2692   """
2693   HPATH = "instance-start"
2694   HTYPE = constants.HTYPE_INSTANCE
2695   _OP_REQP = ["instance_name", "force"]
2696   REQ_BGL = False
2697
2698   def ExpandNames(self):
2699     self._ExpandAndLockInstance()
2700
2701   def BuildHooksEnv(self):
2702     """Build hooks env.
2703
2704     This runs on master, primary and secondary nodes of the instance.
2705
2706     """
2707     env = {
2708       "FORCE": self.op.force,
2709       }
2710     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2711     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2712     return env, nl, nl
2713
2714   def CheckPrereq(self):
2715     """Check prerequisites.
2716
2717     This checks that the instance is in the cluster.
2718
2719     """
2720     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2721     assert self.instance is not None, \
2722       "Cannot retrieve locked instance %s" % self.op.instance_name
2723
2724     _CheckNodeOnline(self, instance.primary_node)
2725
2726     bep = self.cfg.GetClusterInfo().FillBE(instance)
2727     # check bridges existance
2728     _CheckInstanceBridgesExist(self, instance)
2729
2730     _CheckNodeFreeMemory(self, instance.primary_node,
2731                          "starting instance %s" % instance.name,
2732                          bep[constants.BE_MEMORY], instance.hypervisor)
2733
2734   def Exec(self, feedback_fn):
2735     """Start the instance.
2736
2737     """
2738     instance = self.instance
2739     force = self.op.force
2740
2741     self.cfg.MarkInstanceUp(instance.name)
2742
2743     node_current = instance.primary_node
2744
2745     _StartInstanceDisks(self, instance, force)
2746
2747     result = self.rpc.call_instance_start(node_current, instance)
2748     msg = result.RemoteFailMsg()
2749     if msg:
2750       _ShutdownInstanceDisks(self, instance)
2751       raise errors.OpExecError("Could not start instance: %s" % msg)
2752
2753
2754 class LURebootInstance(LogicalUnit):
2755   """Reboot an instance.
2756
2757   """
2758   HPATH = "instance-reboot"
2759   HTYPE = constants.HTYPE_INSTANCE
2760   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2761   REQ_BGL = False
2762
2763   def ExpandNames(self):
2764     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2765                                    constants.INSTANCE_REBOOT_HARD,
2766                                    constants.INSTANCE_REBOOT_FULL]:
2767       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2768                                   (constants.INSTANCE_REBOOT_SOFT,
2769                                    constants.INSTANCE_REBOOT_HARD,
2770                                    constants.INSTANCE_REBOOT_FULL))
2771     self._ExpandAndLockInstance()
2772
2773   def BuildHooksEnv(self):
2774     """Build hooks env.
2775
2776     This runs on master, primary and secondary nodes of the instance.
2777
2778     """
2779     env = {
2780       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2781       "REBOOT_TYPE": self.op.reboot_type,
2782       }
2783     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2784     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2785     return env, nl, nl
2786
2787   def CheckPrereq(self):
2788     """Check prerequisites.
2789
2790     This checks that the instance is in the cluster.
2791
2792     """
2793     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2794     assert self.instance is not None, \
2795       "Cannot retrieve locked instance %s" % self.op.instance_name
2796
2797     _CheckNodeOnline(self, instance.primary_node)
2798
2799     # check bridges existance
2800     _CheckInstanceBridgesExist(self, instance)
2801
2802   def Exec(self, feedback_fn):
2803     """Reboot the instance.
2804
2805     """
2806     instance = self.instance
2807     ignore_secondaries = self.op.ignore_secondaries
2808     reboot_type = self.op.reboot_type
2809
2810     node_current = instance.primary_node
2811
2812     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2813                        constants.INSTANCE_REBOOT_HARD]:
2814       for disk in instance.disks:
2815         self.cfg.SetDiskID(disk, node_current)
2816       result = self.rpc.call_instance_reboot(node_current, instance,
2817                                              reboot_type)
2818       msg = result.RemoteFailMsg()
2819       if msg:
2820         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2821     else:
2822       result = self.rpc.call_instance_shutdown(node_current, instance)
2823       msg = result.RemoteFailMsg()
2824       if msg:
2825         raise errors.OpExecError("Could not shutdown instance for"
2826                                  " full reboot: %s" % msg)
2827       _ShutdownInstanceDisks(self, instance)
2828       _StartInstanceDisks(self, instance, ignore_secondaries)
2829       result = self.rpc.call_instance_start(node_current, instance)
2830       msg = result.RemoteFailMsg()
2831       if msg:
2832         _ShutdownInstanceDisks(self, instance)
2833         raise errors.OpExecError("Could not start instance for"
2834                                  " full reboot: %s" % msg)
2835
2836     self.cfg.MarkInstanceUp(instance.name)
2837
2838
2839 class LUShutdownInstance(LogicalUnit):
2840   """Shutdown an instance.
2841
2842   """
2843   HPATH = "instance-stop"
2844   HTYPE = constants.HTYPE_INSTANCE
2845   _OP_REQP = ["instance_name"]
2846   REQ_BGL = False
2847
2848   def ExpandNames(self):
2849     self._ExpandAndLockInstance()
2850
2851   def BuildHooksEnv(self):
2852     """Build hooks env.
2853
2854     This runs on master, primary and secondary nodes of the instance.
2855
2856     """
2857     env = _BuildInstanceHookEnvByObject(self, self.instance)
2858     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2859     return env, nl, nl
2860
2861   def CheckPrereq(self):
2862     """Check prerequisites.
2863
2864     This checks that the instance is in the cluster.
2865
2866     """
2867     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868     assert self.instance is not None, \
2869       "Cannot retrieve locked instance %s" % self.op.instance_name
2870     _CheckNodeOnline(self, self.instance.primary_node)
2871
2872   def Exec(self, feedback_fn):
2873     """Shutdown the instance.
2874
2875     """
2876     instance = self.instance
2877     node_current = instance.primary_node
2878     self.cfg.MarkInstanceDown(instance.name)
2879     result = self.rpc.call_instance_shutdown(node_current, instance)
2880     msg = result.RemoteFailMsg()
2881     if msg:
2882       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2883
2884     _ShutdownInstanceDisks(self, instance)
2885
2886
2887 class LUReinstallInstance(LogicalUnit):
2888   """Reinstall an instance.
2889
2890   """
2891   HPATH = "instance-reinstall"
2892   HTYPE = constants.HTYPE_INSTANCE
2893   _OP_REQP = ["instance_name"]
2894   REQ_BGL = False
2895
2896   def ExpandNames(self):
2897     self._ExpandAndLockInstance()
2898
2899   def BuildHooksEnv(self):
2900     """Build hooks env.
2901
2902     This runs on master, primary and secondary nodes of the instance.
2903
2904     """
2905     env = _BuildInstanceHookEnvByObject(self, self.instance)
2906     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2907     return env, nl, nl
2908
2909   def CheckPrereq(self):
2910     """Check prerequisites.
2911
2912     This checks that the instance is in the cluster and is not running.
2913
2914     """
2915     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2916     assert instance is not None, \
2917       "Cannot retrieve locked instance %s" % self.op.instance_name
2918     _CheckNodeOnline(self, instance.primary_node)
2919
2920     if instance.disk_template == constants.DT_DISKLESS:
2921       raise errors.OpPrereqError("Instance '%s' has no disks" %
2922                                  self.op.instance_name)
2923     if instance.admin_up:
2924       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2925                                  self.op.instance_name)
2926     remote_info = self.rpc.call_instance_info(instance.primary_node,
2927                                               instance.name,
2928                                               instance.hypervisor)
2929     if remote_info.failed or remote_info.data:
2930       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2931                                  (self.op.instance_name,
2932                                   instance.primary_node))
2933
2934     self.op.os_type = getattr(self.op, "os_type", None)
2935     if self.op.os_type is not None:
2936       # OS verification
2937       pnode = self.cfg.GetNodeInfo(
2938         self.cfg.ExpandNodeName(instance.primary_node))
2939       if pnode is None:
2940         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2941                                    self.op.pnode)
2942       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2943       result.Raise()
2944       if not isinstance(result.data, objects.OS):
2945         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2946                                    " primary node"  % self.op.os_type)
2947
2948     self.instance = instance
2949
2950   def Exec(self, feedback_fn):
2951     """Reinstall the instance.
2952
2953     """
2954     inst = self.instance
2955
2956     if self.op.os_type is not None:
2957       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2958       inst.os = self.op.os_type
2959       self.cfg.Update(inst)
2960
2961     _StartInstanceDisks(self, inst, None)
2962     try:
2963       feedback_fn("Running the instance OS create scripts...")
2964       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2965       msg = result.RemoteFailMsg()
2966       if msg:
2967         raise errors.OpExecError("Could not install OS for instance %s"
2968                                  " on node %s: %s" %
2969                                  (inst.name, inst.primary_node, msg))
2970     finally:
2971       _ShutdownInstanceDisks(self, inst)
2972
2973
2974 class LURenameInstance(LogicalUnit):
2975   """Rename an instance.
2976
2977   """
2978   HPATH = "instance-rename"
2979   HTYPE = constants.HTYPE_INSTANCE
2980   _OP_REQP = ["instance_name", "new_name"]
2981
2982   def BuildHooksEnv(self):
2983     """Build hooks env.
2984
2985     This runs on master, primary and secondary nodes of the instance.
2986
2987     """
2988     env = _BuildInstanceHookEnvByObject(self, self.instance)
2989     env["INSTANCE_NEW_NAME"] = self.op.new_name
2990     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2991     return env, nl, nl
2992
2993   def CheckPrereq(self):
2994     """Check prerequisites.
2995
2996     This checks that the instance is in the cluster and is not running.
2997
2998     """
2999     instance = self.cfg.GetInstanceInfo(
3000       self.cfg.ExpandInstanceName(self.op.instance_name))
3001     if instance is None:
3002       raise errors.OpPrereqError("Instance '%s' not known" %
3003                                  self.op.instance_name)
3004     _CheckNodeOnline(self, instance.primary_node)
3005
3006     if instance.admin_up:
3007       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3008                                  self.op.instance_name)
3009     remote_info = self.rpc.call_instance_info(instance.primary_node,
3010                                               instance.name,
3011                                               instance.hypervisor)
3012     remote_info.Raise()
3013     if remote_info.data:
3014       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3015                                  (self.op.instance_name,
3016                                   instance.primary_node))
3017     self.instance = instance
3018
3019     # new name verification
3020     name_info = utils.HostInfo(self.op.new_name)
3021
3022     self.op.new_name = new_name = name_info.name
3023     instance_list = self.cfg.GetInstanceList()
3024     if new_name in instance_list:
3025       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3026                                  new_name)
3027
3028     if not getattr(self.op, "ignore_ip", False):
3029       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3030         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3031                                    (name_info.ip, new_name))
3032
3033
3034   def Exec(self, feedback_fn):
3035     """Reinstall the instance.
3036
3037     """
3038     inst = self.instance
3039     old_name = inst.name
3040
3041     if inst.disk_template == constants.DT_FILE:
3042       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3043
3044     self.cfg.RenameInstance(inst.name, self.op.new_name)
3045     # Change the instance lock. This is definitely safe while we hold the BGL
3046     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3047     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3048
3049     # re-read the instance from the configuration after rename
3050     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3051
3052     if inst.disk_template == constants.DT_FILE:
3053       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3054       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3055                                                      old_file_storage_dir,
3056                                                      new_file_storage_dir)
3057       result.Raise()
3058       if not result.data:
3059         raise errors.OpExecError("Could not connect to node '%s' to rename"
3060                                  " directory '%s' to '%s' (but the instance"
3061                                  " has been renamed in Ganeti)" % (
3062                                  inst.primary_node, old_file_storage_dir,
3063                                  new_file_storage_dir))
3064
3065       if not result.data[0]:
3066         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3067                                  " (but the instance has been renamed in"
3068                                  " Ganeti)" % (old_file_storage_dir,
3069                                                new_file_storage_dir))
3070
3071     _StartInstanceDisks(self, inst, None)
3072     try:
3073       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3074                                                  old_name)
3075       msg = result.RemoteFailMsg()
3076       if msg:
3077         msg = ("Could not run OS rename script for instance %s on node %s"
3078                " (but the instance has been renamed in Ganeti): %s" %
3079                (inst.name, inst.primary_node, msg))
3080         self.proc.LogWarning(msg)
3081     finally:
3082       _ShutdownInstanceDisks(self, inst)
3083
3084
3085 class LURemoveInstance(LogicalUnit):
3086   """Remove an instance.
3087
3088   """
3089   HPATH = "instance-remove"
3090   HTYPE = constants.HTYPE_INSTANCE
3091   _OP_REQP = ["instance_name", "ignore_failures"]
3092   REQ_BGL = False
3093
3094   def ExpandNames(self):
3095     self._ExpandAndLockInstance()
3096     self.needed_locks[locking.LEVEL_NODE] = []
3097     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3098
3099   def DeclareLocks(self, level):
3100     if level == locking.LEVEL_NODE:
3101       self._LockInstancesNodes()
3102
3103   def BuildHooksEnv(self):
3104     """Build hooks env.
3105
3106     This runs on master, primary and secondary nodes of the instance.
3107
3108     """
3109     env = _BuildInstanceHookEnvByObject(self, self.instance)
3110     nl = [self.cfg.GetMasterNode()]
3111     return env, nl, nl
3112
3113   def CheckPrereq(self):
3114     """Check prerequisites.
3115
3116     This checks that the instance is in the cluster.
3117
3118     """
3119     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3120     assert self.instance is not None, \
3121       "Cannot retrieve locked instance %s" % self.op.instance_name
3122
3123   def Exec(self, feedback_fn):
3124     """Remove the instance.
3125
3126     """
3127     instance = self.instance
3128     logging.info("Shutting down instance %s on node %s",
3129                  instance.name, instance.primary_node)
3130
3131     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3132     msg = result.RemoteFailMsg()
3133     if msg:
3134       if self.op.ignore_failures:
3135         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3136       else:
3137         raise errors.OpExecError("Could not shutdown instance %s on"
3138                                  " node %s: %s" %
3139                                  (instance.name, instance.primary_node, msg))
3140
3141     logging.info("Removing block devices for instance %s", instance.name)
3142
3143     if not _RemoveDisks(self, instance):
3144       if self.op.ignore_failures:
3145         feedback_fn("Warning: can't remove instance's disks")
3146       else:
3147         raise errors.OpExecError("Can't remove instance's disks")
3148
3149     logging.info("Removing instance %s out of cluster config", instance.name)
3150
3151     self.cfg.RemoveInstance(instance.name)
3152     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3153
3154
3155 class LUQueryInstances(NoHooksLU):
3156   """Logical unit for querying instances.
3157
3158   """
3159   _OP_REQP = ["output_fields", "names", "use_locking"]
3160   REQ_BGL = False
3161   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3162                                     "admin_state",
3163                                     "disk_template", "ip", "mac", "bridge",
3164                                     "sda_size", "sdb_size", "vcpus", "tags",
3165                                     "network_port", "beparams",
3166                                     r"(disk)\.(size)/([0-9]+)",
3167                                     r"(disk)\.(sizes)", "disk_usage",
3168                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3169                                     r"(nic)\.(macs|ips|bridges)",
3170                                     r"(disk|nic)\.(count)",
3171                                     "serial_no", "hypervisor", "hvparams",] +
3172                                   ["hv/%s" % name
3173                                    for name in constants.HVS_PARAMETERS] +
3174                                   ["be/%s" % name
3175                                    for name in constants.BES_PARAMETERS])
3176   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3177
3178
3179   def ExpandNames(self):
3180     _CheckOutputFields(static=self._FIELDS_STATIC,
3181                        dynamic=self._FIELDS_DYNAMIC,
3182                        selected=self.op.output_fields)
3183
3184     self.needed_locks = {}
3185     self.share_locks[locking.LEVEL_INSTANCE] = 1
3186     self.share_locks[locking.LEVEL_NODE] = 1
3187
3188     if self.op.names:
3189       self.wanted = _GetWantedInstances(self, self.op.names)
3190     else:
3191       self.wanted = locking.ALL_SET
3192
3193     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3194     self.do_locking = self.do_node_query and self.op.use_locking
3195     if self.do_locking:
3196       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3197       self.needed_locks[locking.LEVEL_NODE] = []
3198       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3199
3200   def DeclareLocks(self, level):
3201     if level == locking.LEVEL_NODE and self.do_locking:
3202       self._LockInstancesNodes()
3203
3204   def CheckPrereq(self):
3205     """Check prerequisites.
3206
3207     """
3208     pass
3209
3210   def Exec(self, feedback_fn):
3211     """Computes the list of nodes and their attributes.
3212
3213     """
3214     all_info = self.cfg.GetAllInstancesInfo()
3215     if self.wanted == locking.ALL_SET:
3216       # caller didn't specify instance names, so ordering is not important
3217       if self.do_locking:
3218         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3219       else:
3220         instance_names = all_info.keys()
3221       instance_names = utils.NiceSort(instance_names)
3222     else:
3223       # caller did specify names, so we must keep the ordering
3224       if self.do_locking:
3225         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3226       else:
3227         tgt_set = all_info.keys()
3228       missing = set(self.wanted).difference(tgt_set)
3229       if missing:
3230         raise errors.OpExecError("Some instances were removed before"
3231                                  " retrieving their data: %s" % missing)
3232       instance_names = self.wanted
3233
3234     instance_list = [all_info[iname] for iname in instance_names]
3235
3236     # begin data gathering
3237
3238     nodes = frozenset([inst.primary_node for inst in instance_list])
3239     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3240
3241     bad_nodes = []
3242     off_nodes = []
3243     if self.do_node_query:
3244       live_data = {}
3245       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3246       for name in nodes:
3247         result = node_data[name]
3248         if result.offline:
3249           # offline nodes will be in both lists
3250           off_nodes.append(name)
3251         if result.failed:
3252           bad_nodes.append(name)
3253         else:
3254           if result.data:
3255             live_data.update(result.data)
3256             # else no instance is alive
3257     else:
3258       live_data = dict([(name, {}) for name in instance_names])
3259
3260     # end data gathering
3261
3262     HVPREFIX = "hv/"
3263     BEPREFIX = "be/"
3264     output = []
3265     for instance in instance_list:
3266       iout = []
3267       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3268       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3269       for field in self.op.output_fields:
3270         st_match = self._FIELDS_STATIC.Matches(field)
3271         if field == "name":
3272           val = instance.name
3273         elif field == "os":
3274           val = instance.os
3275         elif field == "pnode":
3276           val = instance.primary_node
3277         elif field == "snodes":
3278           val = list(instance.secondary_nodes)
3279         elif field == "admin_state":
3280           val = instance.admin_up
3281         elif field == "oper_state":
3282           if instance.primary_node in bad_nodes:
3283             val = None
3284           else:
3285             val = bool(live_data.get(instance.name))
3286         elif field == "status":
3287           if instance.primary_node in off_nodes:
3288             val = "ERROR_nodeoffline"
3289           elif instance.primary_node in bad_nodes:
3290             val = "ERROR_nodedown"
3291           else:
3292             running = bool(live_data.get(instance.name))
3293             if running:
3294               if instance.admin_up:
3295                 val = "running"
3296               else:
3297                 val = "ERROR_up"
3298             else:
3299               if instance.admin_up:
3300                 val = "ERROR_down"
3301               else:
3302                 val = "ADMIN_down"
3303         elif field == "oper_ram":
3304           if instance.primary_node in bad_nodes:
3305             val = None
3306           elif instance.name in live_data:
3307             val = live_data[instance.name].get("memory", "?")
3308           else:
3309             val = "-"
3310         elif field == "disk_template":
3311           val = instance.disk_template
3312         elif field == "ip":
3313           val = instance.nics[0].ip
3314         elif field == "bridge":
3315           val = instance.nics[0].bridge
3316         elif field == "mac":
3317           val = instance.nics[0].mac
3318         elif field == "sda_size" or field == "sdb_size":
3319           idx = ord(field[2]) - ord('a')
3320           try:
3321             val = instance.FindDisk(idx).size
3322           except errors.OpPrereqError:
3323             val = None
3324         elif field == "disk_usage": # total disk usage per node
3325           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3326           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3327         elif field == "tags":
3328           val = list(instance.GetTags())
3329         elif field == "serial_no":
3330           val = instance.serial_no
3331         elif field == "network_port":
3332           val = instance.network_port
3333         elif field == "hypervisor":
3334           val = instance.hypervisor
3335         elif field == "hvparams":
3336           val = i_hv
3337         elif (field.startswith(HVPREFIX) and
3338               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3339           val = i_hv.get(field[len(HVPREFIX):], None)
3340         elif field == "beparams":
3341           val = i_be
3342         elif (field.startswith(BEPREFIX) and
3343               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3344           val = i_be.get(field[len(BEPREFIX):], None)
3345         elif st_match and st_match.groups():
3346           # matches a variable list
3347           st_groups = st_match.groups()
3348           if st_groups and st_groups[0] == "disk":
3349             if st_groups[1] == "count":
3350               val = len(instance.disks)
3351             elif st_groups[1] == "sizes":
3352               val = [disk.size for disk in instance.disks]
3353             elif st_groups[1] == "size":
3354               try:
3355                 val = instance.FindDisk(st_groups[2]).size
3356               except errors.OpPrereqError:
3357                 val = None
3358             else:
3359               assert False, "Unhandled disk parameter"
3360           elif st_groups[0] == "nic":
3361             if st_groups[1] == "count":
3362               val = len(instance.nics)
3363             elif st_groups[1] == "macs":
3364               val = [nic.mac for nic in instance.nics]
3365             elif st_groups[1] == "ips":
3366               val = [nic.ip for nic in instance.nics]
3367             elif st_groups[1] == "bridges":
3368               val = [nic.bridge for nic in instance.nics]
3369             else:
3370               # index-based item
3371               nic_idx = int(st_groups[2])
3372               if nic_idx >= len(instance.nics):
3373                 val = None
3374               else:
3375                 if st_groups[1] == "mac":
3376                   val = instance.nics[nic_idx].mac
3377                 elif st_groups[1] == "ip":
3378                   val = instance.nics[nic_idx].ip
3379                 elif st_groups[1] == "bridge":
3380                   val = instance.nics[nic_idx].bridge
3381                 else:
3382                   assert False, "Unhandled NIC parameter"
3383           else:
3384             assert False, "Unhandled variable parameter"
3385         else:
3386           raise errors.ParameterError(field)
3387         iout.append(val)
3388       output.append(iout)
3389
3390     return output
3391
3392
3393 class LUFailoverInstance(LogicalUnit):
3394   """Failover an instance.
3395
3396   """
3397   HPATH = "instance-failover"
3398   HTYPE = constants.HTYPE_INSTANCE
3399   _OP_REQP = ["instance_name", "ignore_consistency"]
3400   REQ_BGL = False
3401
3402   def ExpandNames(self):
3403     self._ExpandAndLockInstance()
3404     self.needed_locks[locking.LEVEL_NODE] = []
3405     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3406
3407   def DeclareLocks(self, level):
3408     if level == locking.LEVEL_NODE:
3409       self._LockInstancesNodes()
3410
3411   def BuildHooksEnv(self):
3412     """Build hooks env.
3413
3414     This runs on master, primary and secondary nodes of the instance.
3415
3416     """
3417     env = {
3418       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3419       }
3420     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3421     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3422     return env, nl, nl
3423
3424   def CheckPrereq(self):
3425     """Check prerequisites.
3426
3427     This checks that the instance is in the cluster.
3428
3429     """
3430     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3431     assert self.instance is not None, \
3432       "Cannot retrieve locked instance %s" % self.op.instance_name
3433
3434     bep = self.cfg.GetClusterInfo().FillBE(instance)
3435     if instance.disk_template not in constants.DTS_NET_MIRROR:
3436       raise errors.OpPrereqError("Instance's disk layout is not"
3437                                  " network mirrored, cannot failover.")
3438
3439     secondary_nodes = instance.secondary_nodes
3440     if not secondary_nodes:
3441       raise errors.ProgrammerError("no secondary node but using "
3442                                    "a mirrored disk template")
3443
3444     target_node = secondary_nodes[0]
3445     _CheckNodeOnline(self, target_node)
3446     _CheckNodeNotDrained(self, target_node)
3447     # check memory requirements on the secondary node
3448     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3449                          instance.name, bep[constants.BE_MEMORY],
3450                          instance.hypervisor)
3451
3452     # check bridge existance
3453     brlist = [nic.bridge for nic in instance.nics]
3454     result = self.rpc.call_bridges_exist(target_node, brlist)
3455     result.Raise()
3456     if not result.data:
3457       raise errors.OpPrereqError("One or more target bridges %s does not"
3458                                  " exist on destination node '%s'" %
3459                                  (brlist, target_node))
3460
3461   def Exec(self, feedback_fn):
3462     """Failover an instance.
3463
3464     The failover is done by shutting it down on its present node and
3465     starting it on the secondary.
3466
3467     """
3468     instance = self.instance
3469
3470     source_node = instance.primary_node
3471     target_node = instance.secondary_nodes[0]
3472
3473     feedback_fn("* checking disk consistency between source and target")
3474     for dev in instance.disks:
3475       # for drbd, these are drbd over lvm
3476       if not _CheckDiskConsistency(self, dev, target_node, False):
3477         if instance.admin_up and not self.op.ignore_consistency:
3478           raise errors.OpExecError("Disk %s is degraded on target node,"
3479                                    " aborting failover." % dev.iv_name)
3480
3481     feedback_fn("* shutting down instance on source node")
3482     logging.info("Shutting down instance %s on node %s",
3483                  instance.name, source_node)
3484
3485     result = self.rpc.call_instance_shutdown(source_node, instance)
3486     msg = result.RemoteFailMsg()
3487     if msg:
3488       if self.op.ignore_consistency:
3489         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3490                              " Proceeding anyway. Please make sure node"
3491                              " %s is down. Error details: %s",
3492                              instance.name, source_node, source_node, msg)
3493       else:
3494         raise errors.OpExecError("Could not shutdown instance %s on"
3495                                  " node %s: %s" %
3496                                  (instance.name, source_node, msg))
3497
3498     feedback_fn("* deactivating the instance's disks on source node")
3499     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3500       raise errors.OpExecError("Can't shut down the instance's disks.")
3501
3502     instance.primary_node = target_node
3503     # distribute new instance config to the other nodes
3504     self.cfg.Update(instance)
3505
3506     # Only start the instance if it's marked as up
3507     if instance.admin_up:
3508       feedback_fn("* activating the instance's disks on target node")
3509       logging.info("Starting instance %s on node %s",
3510                    instance.name, target_node)
3511
3512       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3513                                                ignore_secondaries=True)
3514       if not disks_ok:
3515         _ShutdownInstanceDisks(self, instance)
3516         raise errors.OpExecError("Can't activate the instance's disks")
3517
3518       feedback_fn("* starting the instance on the target node")
3519       result = self.rpc.call_instance_start(target_node, instance)
3520       msg = result.RemoteFailMsg()
3521       if msg:
3522         _ShutdownInstanceDisks(self, instance)
3523         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3524                                  (instance.name, target_node, msg))
3525
3526
3527 class LUMigrateInstance(LogicalUnit):
3528   """Migrate an instance.
3529
3530   This is migration without shutting down, compared to the failover,
3531   which is done with shutdown.
3532
3533   """
3534   HPATH = "instance-migrate"
3535   HTYPE = constants.HTYPE_INSTANCE
3536   _OP_REQP = ["instance_name", "live", "cleanup"]
3537
3538   REQ_BGL = False
3539
3540   def ExpandNames(self):
3541     self._ExpandAndLockInstance()
3542     self.needed_locks[locking.LEVEL_NODE] = []
3543     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3544
3545   def DeclareLocks(self, level):
3546     if level == locking.LEVEL_NODE:
3547       self._LockInstancesNodes()
3548
3549   def BuildHooksEnv(self):
3550     """Build hooks env.
3551
3552     This runs on master, primary and secondary nodes of the instance.
3553
3554     """
3555     env = _BuildInstanceHookEnvByObject(self, self.instance)
3556     env["MIGRATE_LIVE"] = self.op.live
3557     env["MIGRATE_CLEANUP"] = self.op.cleanup
3558     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3559     return env, nl, nl
3560
3561   def CheckPrereq(self):
3562     """Check prerequisites.
3563
3564     This checks that the instance is in the cluster.
3565
3566     """
3567     instance = self.cfg.GetInstanceInfo(
3568       self.cfg.ExpandInstanceName(self.op.instance_name))
3569     if instance is None:
3570       raise errors.OpPrereqError("Instance '%s' not known" %
3571                                  self.op.instance_name)
3572
3573     if instance.disk_template != constants.DT_DRBD8:
3574       raise errors.OpPrereqError("Instance's disk layout is not"
3575                                  " drbd8, cannot migrate.")
3576
3577     secondary_nodes = instance.secondary_nodes
3578     if not secondary_nodes:
3579       raise errors.ConfigurationError("No secondary node but using"
3580                                       " drbd8 disk template")
3581
3582     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3583
3584     target_node = secondary_nodes[0]
3585     # check memory requirements on the secondary node
3586     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3587                          instance.name, i_be[constants.BE_MEMORY],
3588                          instance.hypervisor)
3589
3590     # check bridge existance
3591     brlist = [nic.bridge for nic in instance.nics]
3592     result = self.rpc.call_bridges_exist(target_node, brlist)
3593     if result.failed or not result.data:
3594       raise errors.OpPrereqError("One or more target bridges %s does not"
3595                                  " exist on destination node '%s'" %
3596                                  (brlist, target_node))
3597
3598     if not self.op.cleanup:
3599       _CheckNodeNotDrained(self, target_node)
3600       result = self.rpc.call_instance_migratable(instance.primary_node,
3601                                                  instance)
3602       msg = result.RemoteFailMsg()
3603       if msg:
3604         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3605                                    msg)
3606
3607     self.instance = instance
3608
3609   def _WaitUntilSync(self):
3610     """Poll with custom rpc for disk sync.
3611
3612     This uses our own step-based rpc call.
3613
3614     """
3615     self.feedback_fn("* wait until resync is done")
3616     all_done = False
3617     while not all_done:
3618       all_done = True
3619       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3620                                             self.nodes_ip,
3621                                             self.instance.disks)
3622       min_percent = 100
3623       for node, nres in result.items():
3624         msg = nres.RemoteFailMsg()
3625         if msg:
3626           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3627                                    (node, msg))
3628         node_done, node_percent = nres.payload
3629         all_done = all_done and node_done
3630         if node_percent is not None:
3631           min_percent = min(min_percent, node_percent)
3632       if not all_done:
3633         if min_percent < 100:
3634           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3635         time.sleep(2)
3636
3637   def _EnsureSecondary(self, node):
3638     """Demote a node to secondary.
3639
3640     """
3641     self.feedback_fn("* switching node %s to secondary mode" % node)
3642
3643     for dev in self.instance.disks:
3644       self.cfg.SetDiskID(dev, node)
3645
3646     result = self.rpc.call_blockdev_close(node, self.instance.name,
3647                                           self.instance.disks)
3648     msg = result.RemoteFailMsg()
3649     if msg:
3650       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3651                                " error %s" % (node, msg))
3652
3653   def _GoStandalone(self):
3654     """Disconnect from the network.
3655
3656     """
3657     self.feedback_fn("* changing into standalone mode")
3658     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3659                                                self.instance.disks)
3660     for node, nres in result.items():
3661       msg = nres.RemoteFailMsg()
3662       if msg:
3663         raise errors.OpExecError("Cannot disconnect disks node %s,"
3664                                  " error %s" % (node, msg))
3665
3666   def _GoReconnect(self, multimaster):
3667     """Reconnect to the network.
3668
3669     """
3670     if multimaster:
3671       msg = "dual-master"
3672     else:
3673       msg = "single-master"
3674     self.feedback_fn("* changing disks into %s mode" % msg)
3675     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3676                                            self.instance.disks,
3677                                            self.instance.name, multimaster)
3678     for node, nres in result.items():
3679       msg = nres.RemoteFailMsg()
3680       if msg:
3681         raise errors.OpExecError("Cannot change disks config on node %s,"
3682                                  " error: %s" % (node, msg))
3683
3684   def _ExecCleanup(self):
3685     """Try to cleanup after a failed migration.
3686
3687     The cleanup is done by:
3688       - check that the instance is running only on one node
3689         (and update the config if needed)
3690       - change disks on its secondary node to secondary
3691       - wait until disks are fully synchronized
3692       - disconnect from the network
3693       - change disks into single-master mode
3694       - wait again until disks are fully synchronized
3695
3696     """
3697     instance = self.instance
3698     target_node = self.target_node
3699     source_node = self.source_node
3700
3701     # check running on only one node
3702     self.feedback_fn("* checking where the instance actually runs"
3703                      " (if this hangs, the hypervisor might be in"
3704                      " a bad state)")
3705     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3706     for node, result in ins_l.items():
3707       result.Raise()
3708       if not isinstance(result.data, list):
3709         raise errors.OpExecError("Can't contact node '%s'" % node)
3710
3711     runningon_source = instance.name in ins_l[source_node].data
3712     runningon_target = instance.name in ins_l[target_node].data
3713
3714     if runningon_source and runningon_target:
3715       raise errors.OpExecError("Instance seems to be running on two nodes,"
3716                                " or the hypervisor is confused. You will have"
3717                                " to ensure manually that it runs only on one"
3718                                " and restart this operation.")
3719
3720     if not (runningon_source or runningon_target):
3721       raise errors.OpExecError("Instance does not seem to be running at all."
3722                                " In this case, it's safer to repair by"
3723                                " running 'gnt-instance stop' to ensure disk"
3724                                " shutdown, and then restarting it.")
3725
3726     if runningon_target:
3727       # the migration has actually succeeded, we need to update the config
3728       self.feedback_fn("* instance running on secondary node (%s),"
3729                        " updating config" % target_node)
3730       instance.primary_node = target_node
3731       self.cfg.Update(instance)
3732       demoted_node = source_node
3733     else:
3734       self.feedback_fn("* instance confirmed to be running on its"
3735                        " primary node (%s)" % source_node)
3736       demoted_node = target_node
3737
3738     self._EnsureSecondary(demoted_node)
3739     try:
3740       self._WaitUntilSync()
3741     except errors.OpExecError:
3742       # we ignore here errors, since if the device is standalone, it
3743       # won't be able to sync
3744       pass
3745     self._GoStandalone()
3746     self._GoReconnect(False)
3747     self._WaitUntilSync()
3748
3749     self.feedback_fn("* done")
3750
3751   def _RevertDiskStatus(self):
3752     """Try to revert the disk status after a failed migration.
3753
3754     """
3755     target_node = self.target_node
3756     try:
3757       self._EnsureSecondary(target_node)
3758       self._GoStandalone()
3759       self._GoReconnect(False)
3760       self._WaitUntilSync()
3761     except errors.OpExecError, err:
3762       self.LogWarning("Migration failed and I can't reconnect the"
3763                       " drives: error '%s'\n"
3764                       "Please look and recover the instance status" %
3765                       str(err))
3766
3767   def _AbortMigration(self):
3768     """Call the hypervisor code to abort a started migration.
3769
3770     """
3771     instance = self.instance
3772     target_node = self.target_node
3773     migration_info = self.migration_info
3774
3775     abort_result = self.rpc.call_finalize_migration(target_node,
3776                                                     instance,
3777                                                     migration_info,
3778                                                     False)
3779     abort_msg = abort_result.RemoteFailMsg()
3780     if abort_msg:
3781       logging.error("Aborting migration failed on target node %s: %s" %
3782                     (target_node, abort_msg))
3783       # Don't raise an exception here, as we stil have to try to revert the
3784       # disk status, even if this step failed.
3785
3786   def _ExecMigration(self):
3787     """Migrate an instance.
3788
3789     The migrate is done by:
3790       - change the disks into dual-master mode
3791       - wait until disks are fully synchronized again
3792       - migrate the instance
3793       - change disks on the new secondary node (the old primary) to secondary
3794       - wait until disks are fully synchronized
3795       - change disks into single-master mode
3796
3797     """
3798     instance = self.instance
3799     target_node = self.target_node
3800     source_node = self.source_node
3801
3802     self.feedback_fn("* checking disk consistency between source and target")
3803     for dev in instance.disks:
3804       if not _CheckDiskConsistency(self, dev, target_node, False):
3805         raise errors.OpExecError("Disk %s is degraded or not fully"
3806                                  " synchronized on target node,"
3807                                  " aborting migrate." % dev.iv_name)
3808
3809     # First get the migration information from the remote node
3810     result = self.rpc.call_migration_info(source_node, instance)
3811     msg = result.RemoteFailMsg()
3812     if msg:
3813       log_err = ("Failed fetching source migration information from %s: %s" %
3814                  (source_node, msg))
3815       logging.error(log_err)
3816       raise errors.OpExecError(log_err)
3817
3818     self.migration_info = migration_info = result.payload
3819
3820     # Then switch the disks to master/master mode
3821     self._EnsureSecondary(target_node)
3822     self._GoStandalone()
3823     self._GoReconnect(True)
3824     self._WaitUntilSync()
3825
3826     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3827     result = self.rpc.call_accept_instance(target_node,
3828                                            instance,
3829                                            migration_info,
3830                                            self.nodes_ip[target_node])
3831
3832     msg = result.RemoteFailMsg()
3833     if msg:
3834       logging.error("Instance pre-migration failed, trying to revert"
3835                     " disk status: %s", msg)
3836       self._AbortMigration()
3837       self._RevertDiskStatus()
3838       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3839                                (instance.name, msg))
3840
3841     self.feedback_fn("* migrating instance to %s" % target_node)
3842     time.sleep(10)
3843     result = self.rpc.call_instance_migrate(source_node, instance,
3844                                             self.nodes_ip[target_node],
3845                                             self.op.live)
3846     msg = result.RemoteFailMsg()
3847     if msg:
3848       logging.error("Instance migration failed, trying to revert"
3849                     " disk status: %s", msg)
3850       self._AbortMigration()
3851       self._RevertDiskStatus()
3852       raise errors.OpExecError("Could not migrate instance %s: %s" %
3853                                (instance.name, msg))
3854     time.sleep(10)
3855
3856     instance.primary_node = target_node
3857     # distribute new instance config to the other nodes
3858     self.cfg.Update(instance)
3859
3860     result = self.rpc.call_finalize_migration(target_node,
3861                                               instance,
3862                                               migration_info,
3863                                               True)
3864     msg = result.RemoteFailMsg()
3865     if msg:
3866       logging.error("Instance migration succeeded, but finalization failed:"
3867                     " %s" % msg)
3868       raise errors.OpExecError("Could not finalize instance migration: %s" %
3869                                msg)
3870
3871     self._EnsureSecondary(source_node)
3872     self._WaitUntilSync()
3873     self._GoStandalone()
3874     self._GoReconnect(False)
3875     self._WaitUntilSync()
3876
3877     self.feedback_fn("* done")
3878
3879   def Exec(self, feedback_fn):
3880     """Perform the migration.
3881
3882     """
3883     self.feedback_fn = feedback_fn
3884
3885     self.source_node = self.instance.primary_node
3886     self.target_node = self.instance.secondary_nodes[0]
3887     self.all_nodes = [self.source_node, self.target_node]
3888     self.nodes_ip = {
3889       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3890       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3891       }
3892     if self.op.cleanup:
3893       return self._ExecCleanup()
3894     else:
3895       return self._ExecMigration()
3896
3897
3898 def _CreateBlockDev(lu, node, instance, device, force_create,
3899                     info, force_open):
3900   """Create a tree of block devices on a given node.
3901
3902   If this device type has to be created on secondaries, create it and
3903   all its children.
3904
3905   If not, just recurse to children keeping the same 'force' value.
3906
3907   @param lu: the lu on whose behalf we execute
3908   @param node: the node on which to create the device
3909   @type instance: L{objects.Instance}
3910   @param instance: the instance which owns the device
3911   @type device: L{objects.Disk}
3912   @param device: the device to create
3913   @type force_create: boolean
3914   @param force_create: whether to force creation of this device; this
3915       will be change to True whenever we find a device which has
3916       CreateOnSecondary() attribute
3917   @param info: the extra 'metadata' we should attach to the device
3918       (this will be represented as a LVM tag)
3919   @type force_open: boolean
3920   @param force_open: this parameter will be passes to the
3921       L{backend.BlockdevCreate} function where it specifies
3922       whether we run on primary or not, and it affects both
3923       the child assembly and the device own Open() execution
3924
3925   """
3926   if device.CreateOnSecondary():
3927     force_create = True
3928
3929   if device.children:
3930     for child in device.children:
3931       _CreateBlockDev(lu, node, instance, child, force_create,
3932                       info, force_open)
3933
3934   if not force_create:
3935     return
3936
3937   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3938
3939
3940 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3941   """Create a single block device on a given node.
3942
3943   This will not recurse over children of the device, so they must be
3944   created in advance.
3945
3946   @param lu: the lu on whose behalf we execute
3947   @param node: the node on which to create the device
3948   @type instance: L{objects.Instance}
3949   @param instance: the instance which owns the device
3950   @type device: L{objects.Disk}
3951   @param device: the device to create
3952   @param info: the extra 'metadata' we should attach to the device
3953       (this will be represented as a LVM tag)
3954   @type force_open: boolean
3955   @param force_open: this parameter will be passes to the
3956       L{backend.BlockdevCreate} function where it specifies
3957       whether we run on primary or not, and it affects both
3958       the child assembly and the device own Open() execution
3959
3960   """
3961   lu.cfg.SetDiskID(device, node)
3962   result = lu.rpc.call_blockdev_create(node, device, device.size,
3963                                        instance.name, force_open, info)
3964   msg = result.RemoteFailMsg()
3965   if msg:
3966     raise errors.OpExecError("Can't create block device %s on"
3967                              " node %s for instance %s: %s" %
3968                              (device, node, instance.name, msg))
3969   if device.physical_id is None:
3970     device.physical_id = result.payload
3971
3972
3973 def _GenerateUniqueNames(lu, exts):
3974   """Generate a suitable LV name.
3975
3976   This will generate a logical volume name for the given instance.
3977
3978   """
3979   results = []
3980   for val in exts:
3981     new_id = lu.cfg.GenerateUniqueID()
3982     results.append("%s%s" % (new_id, val))
3983   return results
3984
3985
3986 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3987                          p_minor, s_minor):
3988   """Generate a drbd8 device complete with its children.
3989
3990   """
3991   port = lu.cfg.AllocatePort()
3992   vgname = lu.cfg.GetVGName()
3993   shared_secret = lu.cfg.GenerateDRBDSecret()
3994   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3995                           logical_id=(vgname, names[0]))
3996   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3997                           logical_id=(vgname, names[1]))
3998   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3999                           logical_id=(primary, secondary, port,
4000                                       p_minor, s_minor,
4001                                       shared_secret),
4002                           children=[dev_data, dev_meta],
4003                           iv_name=iv_name)
4004   return drbd_dev
4005
4006
4007 def _GenerateDiskTemplate(lu, template_name,
4008                           instance_name, primary_node,
4009                           secondary_nodes, disk_info,
4010                           file_storage_dir, file_driver,
4011                           base_index):
4012   """Generate the entire disk layout for a given template type.
4013
4014   """
4015   #TODO: compute space requirements
4016
4017   vgname = lu.cfg.GetVGName()
4018   disk_count = len(disk_info)
4019   disks = []
4020   if template_name == constants.DT_DISKLESS:
4021     pass
4022   elif template_name == constants.DT_PLAIN:
4023     if len(secondary_nodes) != 0:
4024       raise errors.ProgrammerError("Wrong template configuration")
4025
4026     names = _GenerateUniqueNames(lu, [".disk%d" % i
4027                                       for i in range(disk_count)])
4028     for idx, disk in enumerate(disk_info):
4029       disk_index = idx + base_index
4030       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4031                               logical_id=(vgname, names[idx]),
4032                               iv_name="disk/%d" % disk_index,
4033                               mode=disk["mode"])
4034       disks.append(disk_dev)
4035   elif template_name == constants.DT_DRBD8:
4036     if len(secondary_nodes) != 1:
4037       raise errors.ProgrammerError("Wrong template configuration")
4038     remote_node = secondary_nodes[0]
4039     minors = lu.cfg.AllocateDRBDMinor(
4040       [primary_node, remote_node] * len(disk_info), instance_name)
4041
4042     names = []
4043     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4044                                                for i in range(disk_count)]):
4045       names.append(lv_prefix + "_data")
4046       names.append(lv_prefix + "_meta")
4047     for idx, disk in enumerate(disk_info):
4048       disk_index = idx + base_index
4049       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4050                                       disk["size"], names[idx*2:idx*2+2],
4051                                       "disk/%d" % disk_index,
4052                                       minors[idx*2], minors[idx*2+1])
4053       disk_dev.mode = disk["mode"]
4054       disks.append(disk_dev)
4055   elif template_name == constants.DT_FILE:
4056     if len(secondary_nodes) != 0:
4057       raise errors.ProgrammerError("Wrong template configuration")
4058
4059     for idx, disk in enumerate(disk_info):
4060       disk_index = idx + base_index
4061       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4062                               iv_name="disk/%d" % disk_index,
4063                               logical_id=(file_driver,
4064                                           "%s/disk%d" % (file_storage_dir,
4065                                                          disk_index)),
4066                               mode=disk["mode"])
4067       disks.append(disk_dev)
4068   else:
4069     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4070   return disks
4071
4072
4073 def _GetInstanceInfoText(instance):
4074   """Compute that text that should be added to the disk's metadata.
4075
4076   """
4077   return "originstname+%s" % instance.name
4078
4079
4080 def _CreateDisks(lu, instance):
4081   """Create all disks for an instance.
4082
4083   This abstracts away some work from AddInstance.
4084
4085   @type lu: L{LogicalUnit}
4086   @param lu: the logical unit on whose behalf we execute
4087   @type instance: L{objects.Instance}
4088   @param instance: the instance whose disks we should create
4089   @rtype: boolean
4090   @return: the success of the creation
4091
4092   """
4093   info = _GetInstanceInfoText(instance)
4094   pnode = instance.primary_node
4095
4096   if instance.disk_template == constants.DT_FILE:
4097     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4098     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4099
4100     if result.failed or not result.data:
4101       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4102
4103     if not result.data[0]:
4104       raise errors.OpExecError("Failed to create directory '%s'" %
4105                                file_storage_dir)
4106
4107   # Note: this needs to be kept in sync with adding of disks in
4108   # LUSetInstanceParams
4109   for device in instance.disks:
4110     logging.info("Creating volume %s for instance %s",
4111                  device.iv_name, instance.name)
4112     #HARDCODE
4113     for node in instance.all_nodes:
4114       f_create = node == pnode
4115       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4116
4117
4118 def _RemoveDisks(lu, instance):
4119   """Remove all disks for an instance.
4120
4121   This abstracts away some work from `AddInstance()` and
4122   `RemoveInstance()`. Note that in case some of the devices couldn't
4123   be removed, the removal will continue with the other ones (compare
4124   with `_CreateDisks()`).
4125
4126   @type lu: L{LogicalUnit}
4127   @param lu: the logical unit on whose behalf we execute
4128   @type instance: L{objects.Instance}
4129   @param instance: the instance whose disks we should remove
4130   @rtype: boolean
4131   @return: the success of the removal
4132
4133   """
4134   logging.info("Removing block devices for instance %s", instance.name)
4135
4136   all_result = True
4137   for device in instance.disks:
4138     for node, disk in device.ComputeNodeTree(instance.primary_node):
4139       lu.cfg.SetDiskID(disk, node)
4140       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4141       if msg:
4142         lu.LogWarning("Could not remove block device %s on node %s,"
4143                       " continuing anyway: %s", device.iv_name, node, msg)
4144         all_result = False
4145
4146   if instance.disk_template == constants.DT_FILE:
4147     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4148     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4149                                                  file_storage_dir)
4150     if result.failed or not result.data:
4151       logging.error("Could not remove directory '%s'", file_storage_dir)
4152       all_result = False
4153
4154   return all_result
4155
4156
4157 def _ComputeDiskSize(disk_template, disks):
4158   """Compute disk size requirements in the volume group
4159
4160   """
4161   # Required free disk space as a function of disk and swap space
4162   req_size_dict = {
4163     constants.DT_DISKLESS: None,
4164     constants.DT_PLAIN: sum(d["size"] for d in disks),
4165     # 128 MB are added for drbd metadata for each disk
4166     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4167     constants.DT_FILE: None,
4168   }
4169
4170   if disk_template not in req_size_dict:
4171     raise errors.ProgrammerError("Disk template '%s' size requirement"
4172                                  " is unknown" %  disk_template)
4173
4174   return req_size_dict[disk_template]
4175
4176
4177 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4178   """Hypervisor parameter validation.
4179
4180   This function abstract the hypervisor parameter validation to be
4181   used in both instance create and instance modify.
4182
4183   @type lu: L{LogicalUnit}
4184   @param lu: the logical unit for which we check
4185   @type nodenames: list
4186   @param nodenames: the list of nodes on which we should check
4187   @type hvname: string
4188   @param hvname: the name of the hypervisor we should use
4189   @type hvparams: dict
4190   @param hvparams: the parameters which we need to check
4191   @raise errors.OpPrereqError: if the parameters are not valid
4192
4193   """
4194   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4195                                                   hvname,
4196                                                   hvparams)
4197   for node in nodenames:
4198     info = hvinfo[node]
4199     if info.offline:
4200       continue
4201     msg = info.RemoteFailMsg()
4202     if msg:
4203       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4204                                  " %s" % msg)
4205
4206
4207 class LUCreateInstance(LogicalUnit):
4208   """Create an instance.
4209
4210   """
4211   HPATH = "instance-add"
4212   HTYPE = constants.HTYPE_INSTANCE
4213   _OP_REQP = ["instance_name", "disks", "disk_template",
4214               "mode", "start",
4215               "wait_for_sync", "ip_check", "nics",
4216               "hvparams", "beparams"]
4217   REQ_BGL = False
4218
4219   def _ExpandNode(self, node):
4220     """Expands and checks one node name.
4221
4222     """
4223     node_full = self.cfg.ExpandNodeName(node)
4224     if node_full is None:
4225       raise errors.OpPrereqError("Unknown node %s" % node)
4226     return node_full
4227
4228   def ExpandNames(self):
4229     """ExpandNames for CreateInstance.
4230
4231     Figure out the right locks for instance creation.
4232
4233     """
4234     self.needed_locks = {}
4235
4236     # set optional parameters to none if they don't exist
4237     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4238       if not hasattr(self.op, attr):
4239         setattr(self.op, attr, None)
4240
4241     # cheap checks, mostly valid constants given
4242
4243     # verify creation mode
4244     if self.op.mode not in (constants.INSTANCE_CREATE,
4245                             constants.INSTANCE_IMPORT):
4246       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4247                                  self.op.mode)
4248
4249     # disk template and mirror node verification
4250     if self.op.disk_template not in constants.DISK_TEMPLATES:
4251       raise errors.OpPrereqError("Invalid disk template name")
4252
4253     if self.op.hypervisor is None:
4254       self.op.hypervisor = self.cfg.GetHypervisorType()
4255
4256     cluster = self.cfg.GetClusterInfo()
4257     enabled_hvs = cluster.enabled_hypervisors
4258     if self.op.hypervisor not in enabled_hvs:
4259       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4260                                  " cluster (%s)" % (self.op.hypervisor,
4261                                   ",".join(enabled_hvs)))
4262
4263     # check hypervisor parameter syntax (locally)
4264     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4265     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4266                                   self.op.hvparams)
4267     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4268     hv_type.CheckParameterSyntax(filled_hvp)
4269
4270     # fill and remember the beparams dict
4271     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4272     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4273                                     self.op.beparams)
4274
4275     #### instance parameters check
4276
4277     # instance name verification
4278     hostname1 = utils.HostInfo(self.op.instance_name)
4279     self.op.instance_name = instance_name = hostname1.name
4280
4281     # this is just a preventive check, but someone might still add this
4282     # instance in the meantime, and creation will fail at lock-add time
4283     if instance_name in self.cfg.GetInstanceList():
4284       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4285                                  instance_name)
4286
4287     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4288
4289     # NIC buildup
4290     self.nics = []
4291     for nic in self.op.nics:
4292       # ip validity checks
4293       ip = nic.get("ip", None)
4294       if ip is None or ip.lower() == "none":
4295         nic_ip = None
4296       elif ip.lower() == constants.VALUE_AUTO:
4297         nic_ip = hostname1.ip
4298       else:
4299         if not utils.IsValidIP(ip):
4300           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4301                                      " like a valid IP" % ip)
4302         nic_ip = ip
4303
4304       # MAC address verification
4305       mac = nic.get("mac", constants.VALUE_AUTO)
4306       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4307         if not utils.IsValidMac(mac.lower()):
4308           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4309                                      mac)
4310       # bridge verification
4311       bridge = nic.get("bridge", None)
4312       if bridge is None:
4313         bridge = self.cfg.GetDefBridge()
4314       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4315
4316     # disk checks/pre-build
4317     self.disks = []
4318     for disk in self.op.disks:
4319       mode = disk.get("mode", constants.DISK_RDWR)
4320       if mode not in constants.DISK_ACCESS_SET:
4321         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4322                                    mode)
4323       size = disk.get("size", None)
4324       if size is None:
4325         raise errors.OpPrereqError("Missing disk size")
4326       try:
4327         size = int(size)
4328       except ValueError:
4329         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4330       self.disks.append({"size": size, "mode": mode})
4331
4332     # used in CheckPrereq for ip ping check
4333     self.check_ip = hostname1.ip
4334
4335     # file storage checks
4336     if (self.op.file_driver and
4337         not self.op.file_driver in constants.FILE_DRIVER):
4338       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4339                                  self.op.file_driver)
4340
4341     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4342       raise errors.OpPrereqError("File storage directory path not absolute")
4343
4344     ### Node/iallocator related checks
4345     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4346       raise errors.OpPrereqError("One and only one of iallocator and primary"
4347                                  " node must be given")
4348
4349     if self.op.iallocator:
4350       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4351     else:
4352       self.op.pnode = self._ExpandNode(self.op.pnode)
4353       nodelist = [self.op.pnode]
4354       if self.op.snode is not None:
4355         self.op.snode = self._ExpandNode(self.op.snode)
4356         nodelist.append(self.op.snode)
4357       self.needed_locks[locking.LEVEL_NODE] = nodelist
4358
4359     # in case of import lock the source node too
4360     if self.op.mode == constants.INSTANCE_IMPORT:
4361       src_node = getattr(self.op, "src_node", None)
4362       src_path = getattr(self.op, "src_path", None)
4363
4364       if src_path is None:
4365         self.op.src_path = src_path = self.op.instance_name
4366
4367       if src_node is None:
4368         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4369         self.op.src_node = None
4370         if os.path.isabs(src_path):
4371           raise errors.OpPrereqError("Importing an instance from an absolute"
4372                                      " path requires a source node option.")
4373       else:
4374         self.op.src_node = src_node = self._ExpandNode(src_node)
4375         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4376           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4377         if not os.path.isabs(src_path):
4378           self.op.src_path = src_path = \
4379             os.path.join(constants.EXPORT_DIR, src_path)
4380
4381     else: # INSTANCE_CREATE
4382       if getattr(self.op, "os_type", None) is None:
4383         raise errors.OpPrereqError("No guest OS specified")
4384
4385   def _RunAllocator(self):
4386     """Run the allocator based on input opcode.
4387
4388     """
4389     nics = [n.ToDict() for n in self.nics]
4390     ial = IAllocator(self,
4391                      mode=constants.IALLOCATOR_MODE_ALLOC,
4392                      name=self.op.instance_name,
4393                      disk_template=self.op.disk_template,
4394                      tags=[],
4395                      os=self.op.os_type,
4396                      vcpus=self.be_full[constants.BE_VCPUS],
4397                      mem_size=self.be_full[constants.BE_MEMORY],
4398                      disks=self.disks,
4399                      nics=nics,
4400                      hypervisor=self.op.hypervisor,
4401                      )
4402
4403     ial.Run(self.op.iallocator)
4404
4405     if not ial.success:
4406       raise errors.OpPrereqError("Can't compute nodes using"
4407                                  " iallocator '%s': %s" % (self.op.iallocator,
4408                                                            ial.info))
4409     if len(ial.nodes) != ial.required_nodes:
4410       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4411                                  " of nodes (%s), required %s" %
4412                                  (self.op.iallocator, len(ial.nodes),
4413                                   ial.required_nodes))
4414     self.op.pnode = ial.nodes[0]
4415     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4416                  self.op.instance_name, self.op.iallocator,
4417                  ", ".join(ial.nodes))
4418     if ial.required_nodes == 2:
4419       self.op.snode = ial.nodes[1]
4420
4421   def BuildHooksEnv(self):
4422     """Build hooks env.
4423
4424     This runs on master, primary and secondary nodes of the instance.
4425
4426     """
4427     env = {
4428       "ADD_MODE": self.op.mode,
4429       }
4430     if self.op.mode == constants.INSTANCE_IMPORT:
4431       env["SRC_NODE"] = self.op.src_node
4432       env["SRC_PATH"] = self.op.src_path
4433       env["SRC_IMAGES"] = self.src_images
4434
4435     env.update(_BuildInstanceHookEnv(
4436       name=self.op.instance_name,
4437       primary_node=self.op.pnode,
4438       secondary_nodes=self.secondaries,
4439       status=self.op.start,
4440       os_type=self.op.os_type,
4441       memory=self.be_full[constants.BE_MEMORY],
4442       vcpus=self.be_full[constants.BE_VCPUS],
4443       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4444       disk_template=self.op.disk_template,
4445       disks=[(d["size"], d["mode"]) for d in self.disks],
4446     ))
4447
4448     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4449           self.secondaries)
4450     return env, nl, nl
4451
4452
4453   def CheckPrereq(self):
4454     """Check prerequisites.
4455
4456     """
4457     if (not self.cfg.GetVGName() and
4458         self.op.disk_template not in constants.DTS_NOT_LVM):
4459       raise errors.OpPrereqError("Cluster does not support lvm-based"
4460                                  " instances")
4461
4462     if self.op.mode == constants.INSTANCE_IMPORT:
4463       src_node = self.op.src_node
4464       src_path = self.op.src_path
4465
4466       if src_node is None:
4467         exp_list = self.rpc.call_export_list(
4468           self.acquired_locks[locking.LEVEL_NODE])
4469         found = False
4470         for node in exp_list:
4471           if not exp_list[node].failed and src_path in exp_list[node].data:
4472             found = True
4473             self.op.src_node = src_node = node
4474             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4475                                                        src_path)
4476             break
4477         if not found:
4478           raise errors.OpPrereqError("No export found for relative path %s" %
4479                                       src_path)
4480
4481       _CheckNodeOnline(self, src_node)
4482       result = self.rpc.call_export_info(src_node, src_path)
4483       result.Raise()
4484       if not result.data:
4485         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4486
4487       export_info = result.data
4488       if not export_info.has_section(constants.INISECT_EXP):
4489         raise errors.ProgrammerError("Corrupted export config")
4490
4491       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4492       if (int(ei_version) != constants.EXPORT_VERSION):
4493         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4494                                    (ei_version, constants.EXPORT_VERSION))
4495
4496       # Check that the new instance doesn't have less disks than the export
4497       instance_disks = len(self.disks)
4498       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4499       if instance_disks < export_disks:
4500         raise errors.OpPrereqError("Not enough disks to import."
4501                                    " (instance: %d, export: %d)" %
4502                                    (instance_disks, export_disks))
4503
4504       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4505       disk_images = []
4506       for idx in range(export_disks):
4507         option = 'disk%d_dump' % idx
4508         if export_info.has_option(constants.INISECT_INS, option):
4509           # FIXME: are the old os-es, disk sizes, etc. useful?
4510           export_name = export_info.get(constants.INISECT_INS, option)
4511           image = os.path.join(src_path, export_name)
4512           disk_images.append(image)
4513         else:
4514           disk_images.append(False)
4515
4516       self.src_images = disk_images
4517
4518       old_name = export_info.get(constants.INISECT_INS, 'name')
4519       # FIXME: int() here could throw a ValueError on broken exports
4520       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4521       if self.op.instance_name == old_name:
4522         for idx, nic in enumerate(self.nics):
4523           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4524             nic_mac_ini = 'nic%d_mac' % idx
4525             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4526
4527     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4528     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4529     if self.op.start and not self.op.ip_check:
4530       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4531                                  " adding an instance in start mode")
4532
4533     if self.op.ip_check:
4534       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4535         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4536                                    (self.check_ip, self.op.instance_name))
4537
4538     #### mac address generation
4539     # By generating here the mac address both the allocator and the hooks get
4540     # the real final mac address rather than the 'auto' or 'generate' value.
4541     # There is a race condition between the generation and the instance object
4542     # creation, which means that we know the mac is valid now, but we're not
4543     # sure it will be when we actually add the instance. If things go bad
4544     # adding the instance will abort because of a duplicate mac, and the
4545     # creation job will fail.
4546     for nic in self.nics:
4547       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4548         nic.mac = self.cfg.GenerateMAC()
4549
4550     #### allocator run
4551
4552     if self.op.iallocator is not None:
4553       self._RunAllocator()
4554
4555     #### node related checks
4556
4557     # check primary node
4558     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4559     assert self.pnode is not None, \
4560       "Cannot retrieve locked node %s" % self.op.pnode
4561     if pnode.offline:
4562       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4563                                  pnode.name)
4564     if pnode.drained:
4565       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4566                                  pnode.name)
4567
4568     self.secondaries = []
4569
4570     # mirror node verification
4571     if self.op.disk_template in constants.DTS_NET_MIRROR:
4572       if self.op.snode is None:
4573         raise errors.OpPrereqError("The networked disk templates need"
4574                                    " a mirror node")
4575       if self.op.snode == pnode.name:
4576         raise errors.OpPrereqError("The secondary node cannot be"
4577                                    " the primary node.")
4578       _CheckNodeOnline(self, self.op.snode)
4579       _CheckNodeNotDrained(self, self.op.snode)
4580       self.secondaries.append(self.op.snode)
4581
4582     nodenames = [pnode.name] + self.secondaries
4583
4584     req_size = _ComputeDiskSize(self.op.disk_template,
4585                                 self.disks)
4586
4587     # Check lv size requirements
4588     if req_size is not None:
4589       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4590                                          self.op.hypervisor)
4591       for node in nodenames:
4592         info = nodeinfo[node]
4593         info.Raise()
4594         info = info.data
4595         if not info:
4596           raise errors.OpPrereqError("Cannot get current information"
4597                                      " from node '%s'" % node)
4598         vg_free = info.get('vg_free', None)
4599         if not isinstance(vg_free, int):
4600           raise errors.OpPrereqError("Can't compute free disk space on"
4601                                      " node %s" % node)
4602         if req_size > info['vg_free']:
4603           raise errors.OpPrereqError("Not enough disk space on target node %s."
4604                                      " %d MB available, %d MB required" %
4605                                      (node, info['vg_free'], req_size))
4606
4607     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4608
4609     # os verification
4610     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4611     result.Raise()
4612     if not isinstance(result.data, objects.OS):
4613       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4614                                  " primary node"  % self.op.os_type)
4615
4616     # bridge check on primary node
4617     bridges = [n.bridge for n in self.nics]
4618     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4619     result.Raise()
4620     if not result.data:
4621       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4622                                  " exist on destination node '%s'" %
4623                                  (",".join(bridges), pnode.name))
4624
4625     # memory check on primary node
4626     if self.op.start:
4627       _CheckNodeFreeMemory(self, self.pnode.name,
4628                            "creating instance %s" % self.op.instance_name,
4629                            self.be_full[constants.BE_MEMORY],
4630                            self.op.hypervisor)
4631
4632   def Exec(self, feedback_fn):
4633     """Create and add the instance to the cluster.
4634
4635     """
4636     instance = self.op.instance_name
4637     pnode_name = self.pnode.name
4638
4639     ht_kind = self.op.hypervisor
4640     if ht_kind in constants.HTS_REQ_PORT:
4641       network_port = self.cfg.AllocatePort()
4642     else:
4643       network_port = None
4644
4645     ##if self.op.vnc_bind_address is None:
4646     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4647
4648     # this is needed because os.path.join does not accept None arguments
4649     if self.op.file_storage_dir is None:
4650       string_file_storage_dir = ""
4651     else:
4652       string_file_storage_dir = self.op.file_storage_dir
4653
4654     # build the full file storage dir path
4655     file_storage_dir = os.path.normpath(os.path.join(
4656                                         self.cfg.GetFileStorageDir(),
4657                                         string_file_storage_dir, instance))
4658
4659
4660     disks = _GenerateDiskTemplate(self,
4661                                   self.op.disk_template,
4662                                   instance, pnode_name,
4663                                   self.secondaries,
4664                                   self.disks,
4665                                   file_storage_dir,
4666                                   self.op.file_driver,
4667                                   0)
4668
4669     iobj = objects.Instance(name=instance, os=self.op.os_type,
4670                             primary_node=pnode_name,
4671                             nics=self.nics, disks=disks,
4672                             disk_template=self.op.disk_template,
4673                             admin_up=False,
4674                             network_port=network_port,
4675                             beparams=self.op.beparams,
4676                             hvparams=self.op.hvparams,
4677                             hypervisor=self.op.hypervisor,
4678                             )
4679
4680     feedback_fn("* creating instance disks...")
4681     try:
4682       _CreateDisks(self, iobj)
4683     except errors.OpExecError:
4684       self.LogWarning("Device creation failed, reverting...")
4685       try:
4686         _RemoveDisks(self, iobj)
4687       finally:
4688         self.cfg.ReleaseDRBDMinors(instance)
4689         raise
4690
4691     feedback_fn("adding instance %s to cluster config" % instance)
4692
4693     self.cfg.AddInstance(iobj)
4694     # Declare that we don't want to remove the instance lock anymore, as we've
4695     # added the instance to the config
4696     del self.remove_locks[locking.LEVEL_INSTANCE]
4697     # Unlock all the nodes
4698     if self.op.mode == constants.INSTANCE_IMPORT:
4699       nodes_keep = [self.op.src_node]
4700       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4701                        if node != self.op.src_node]
4702       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4703       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4704     else:
4705       self.context.glm.release(locking.LEVEL_NODE)
4706       del self.acquired_locks[locking.LEVEL_NODE]
4707
4708     if self.op.wait_for_sync:
4709       disk_abort = not _WaitForSync(self, iobj)
4710     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4711       # make sure the disks are not degraded (still sync-ing is ok)
4712       time.sleep(15)
4713       feedback_fn("* checking mirrors status")
4714       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4715     else:
4716       disk_abort = False
4717
4718     if disk_abort:
4719       _RemoveDisks(self, iobj)
4720       self.cfg.RemoveInstance(iobj.name)
4721       # Make sure the instance lock gets removed
4722       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4723       raise errors.OpExecError("There are some degraded disks for"
4724                                " this instance")
4725
4726     feedback_fn("creating os for instance %s on node %s" %
4727                 (instance, pnode_name))
4728
4729     if iobj.disk_template != constants.DT_DISKLESS:
4730       if self.op.mode == constants.INSTANCE_CREATE:
4731         feedback_fn("* running the instance OS create scripts...")
4732         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4733         msg = result.RemoteFailMsg()
4734         if msg:
4735           raise errors.OpExecError("Could not add os for instance %s"
4736                                    " on node %s: %s" %
4737                                    (instance, pnode_name, msg))
4738
4739       elif self.op.mode == constants.INSTANCE_IMPORT:
4740         feedback_fn("* running the instance OS import scripts...")
4741         src_node = self.op.src_node
4742         src_images = self.src_images
4743         cluster_name = self.cfg.GetClusterName()
4744         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4745                                                          src_node, src_images,
4746                                                          cluster_name)
4747         import_result.Raise()
4748         for idx, result in enumerate(import_result.data):
4749           if not result:
4750             self.LogWarning("Could not import the image %s for instance"
4751                             " %s, disk %d, on node %s" %
4752                             (src_images[idx], instance, idx, pnode_name))
4753       else:
4754         # also checked in the prereq part
4755         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4756                                      % self.op.mode)
4757
4758     if self.op.start:
4759       iobj.admin_up = True
4760       self.cfg.Update(iobj)
4761       logging.info("Starting instance %s on node %s", instance, pnode_name)
4762       feedback_fn("* starting instance...")
4763       result = self.rpc.call_instance_start(pnode_name, iobj)
4764       msg = result.RemoteFailMsg()
4765       if msg:
4766         raise errors.OpExecError("Could not start instance: %s" % msg)
4767
4768
4769 class LUConnectConsole(NoHooksLU):
4770   """Connect to an instance's console.
4771
4772   This is somewhat special in that it returns the command line that
4773   you need to run on the master node in order to connect to the
4774   console.
4775
4776   """
4777   _OP_REQP = ["instance_name"]
4778   REQ_BGL = False
4779
4780   def ExpandNames(self):
4781     self._ExpandAndLockInstance()
4782
4783   def CheckPrereq(self):
4784     """Check prerequisites.
4785
4786     This checks that the instance is in the cluster.
4787
4788     """
4789     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4790     assert self.instance is not None, \
4791       "Cannot retrieve locked instance %s" % self.op.instance_name
4792     _CheckNodeOnline(self, self.instance.primary_node)
4793
4794   def Exec(self, feedback_fn):
4795     """Connect to the console of an instance
4796
4797     """
4798     instance = self.instance
4799     node = instance.primary_node
4800
4801     node_insts = self.rpc.call_instance_list([node],
4802                                              [instance.hypervisor])[node]
4803     node_insts.Raise()
4804
4805     if instance.name not in node_insts.data:
4806       raise errors.OpExecError("Instance %s is not running." % instance.name)
4807
4808     logging.debug("Connecting to console of %s on %s", instance.name, node)
4809
4810     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4811     cluster = self.cfg.GetClusterInfo()
4812     # beparams and hvparams are passed separately, to avoid editing the
4813     # instance and then saving the defaults in the instance itself.
4814     hvparams = cluster.FillHV(instance)
4815     beparams = cluster.FillBE(instance)
4816     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4817
4818     # build ssh cmdline
4819     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4820
4821
4822 class LUReplaceDisks(LogicalUnit):
4823   """Replace the disks of an instance.
4824
4825   """
4826   HPATH = "mirrors-replace"
4827   HTYPE = constants.HTYPE_INSTANCE
4828   _OP_REQP = ["instance_name", "mode", "disks"]
4829   REQ_BGL = False
4830
4831   def CheckArguments(self):
4832     if not hasattr(self.op, "remote_node"):
4833       self.op.remote_node = None
4834     if not hasattr(self.op, "iallocator"):
4835       self.op.iallocator = None
4836
4837     # check for valid parameter combination
4838     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4839     if self.op.mode == constants.REPLACE_DISK_CHG:
4840       if cnt == 2:
4841         raise errors.OpPrereqError("When changing the secondary either an"
4842                                    " iallocator script must be used or the"
4843                                    " new node given")
4844       elif cnt == 0:
4845         raise errors.OpPrereqError("Give either the iallocator or the new"
4846                                    " secondary, not both")
4847     else: # not replacing the secondary
4848       if cnt != 2:
4849         raise errors.OpPrereqError("The iallocator and new node options can"
4850                                    " be used only when changing the"
4851                                    " secondary node")
4852
4853   def ExpandNames(self):
4854     self._ExpandAndLockInstance()
4855
4856     if self.op.iallocator is not None:
4857       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4858     elif self.op.remote_node is not None:
4859       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4860       if remote_node is None:
4861         raise errors.OpPrereqError("Node '%s' not known" %
4862                                    self.op.remote_node)
4863       self.op.remote_node = remote_node
4864       # Warning: do not remove the locking of the new secondary here
4865       # unless DRBD8.AddChildren is changed to work in parallel;
4866       # currently it doesn't since parallel invocations of
4867       # FindUnusedMinor will conflict
4868       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4869       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4870     else:
4871       self.needed_locks[locking.LEVEL_NODE] = []
4872       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4873
4874   def DeclareLocks(self, level):
4875     # If we're not already locking all nodes in the set we have to declare the
4876     # instance's primary/secondary nodes.
4877     if (level == locking.LEVEL_NODE and
4878         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4879       self._LockInstancesNodes()
4880
4881   def _RunAllocator(self):
4882     """Compute a new secondary node using an IAllocator.
4883
4884     """
4885     ial = IAllocator(self,
4886                      mode=constants.IALLOCATOR_MODE_RELOC,
4887                      name=self.op.instance_name,
4888                      relocate_from=[self.sec_node])
4889
4890     ial.Run(self.op.iallocator)
4891
4892     if not ial.success:
4893       raise errors.OpPrereqError("Can't compute nodes using"
4894                                  " iallocator '%s': %s" % (self.op.iallocator,
4895                                                            ial.info))
4896     if len(ial.nodes) != ial.required_nodes:
4897       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4898                                  " of nodes (%s), required %s" %
4899                                  (len(ial.nodes), ial.required_nodes))
4900     self.op.remote_node = ial.nodes[0]
4901     self.LogInfo("Selected new secondary for the instance: %s",
4902                  self.op.remote_node)
4903
4904   def BuildHooksEnv(self):
4905     """Build hooks env.
4906
4907     This runs on the master, the primary and all the secondaries.
4908
4909     """
4910     env = {
4911       "MODE": self.op.mode,
4912       "NEW_SECONDARY": self.op.remote_node,
4913       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4914       }
4915     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4916     nl = [
4917       self.cfg.GetMasterNode(),
4918       self.instance.primary_node,
4919       ]
4920     if self.op.remote_node is not None:
4921       nl.append(self.op.remote_node)
4922     return env, nl, nl
4923
4924   def CheckPrereq(self):
4925     """Check prerequisites.
4926
4927     This checks that the instance is in the cluster.
4928
4929     """
4930     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4931     assert instance is not None, \
4932       "Cannot retrieve locked instance %s" % self.op.instance_name
4933     self.instance = instance
4934
4935     if instance.disk_template != constants.DT_DRBD8:
4936       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4937                                  " instances")
4938
4939     if len(instance.secondary_nodes) != 1:
4940       raise errors.OpPrereqError("The instance has a strange layout,"
4941                                  " expected one secondary but found %d" %
4942                                  len(instance.secondary_nodes))
4943
4944     self.sec_node = instance.secondary_nodes[0]
4945
4946     if self.op.iallocator is not None:
4947       self._RunAllocator()
4948
4949     remote_node = self.op.remote_node
4950     if remote_node is not None:
4951       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4952       assert self.remote_node_info is not None, \
4953         "Cannot retrieve locked node %s" % remote_node
4954     else:
4955       self.remote_node_info = None
4956     if remote_node == instance.primary_node:
4957       raise errors.OpPrereqError("The specified node is the primary node of"
4958                                  " the instance.")
4959     elif remote_node == self.sec_node:
4960       raise errors.OpPrereqError("The specified node is already the"
4961                                  " secondary node of the instance.")
4962
4963     if self.op.mode == constants.REPLACE_DISK_PRI:
4964       n1 = self.tgt_node = instance.primary_node
4965       n2 = self.oth_node = self.sec_node
4966     elif self.op.mode == constants.REPLACE_DISK_SEC:
4967       n1 = self.tgt_node = self.sec_node
4968       n2 = self.oth_node = instance.primary_node
4969     elif self.op.mode == constants.REPLACE_DISK_CHG:
4970       n1 = self.new_node = remote_node
4971       n2 = self.oth_node = instance.primary_node
4972       self.tgt_node = self.sec_node
4973       _CheckNodeNotDrained(self, remote_node)
4974     else:
4975       raise errors.ProgrammerError("Unhandled disk replace mode")
4976
4977     _CheckNodeOnline(self, n1)
4978     _CheckNodeOnline(self, n2)
4979
4980     if not self.op.disks:
4981       self.op.disks = range(len(instance.disks))
4982
4983     for disk_idx in self.op.disks:
4984       instance.FindDisk(disk_idx)
4985
4986   def _ExecD8DiskOnly(self, feedback_fn):
4987     """Replace a disk on the primary or secondary for dbrd8.
4988
4989     The algorithm for replace is quite complicated:
4990
4991       1. for each disk to be replaced:
4992
4993         1. create new LVs on the target node with unique names
4994         1. detach old LVs from the drbd device
4995         1. rename old LVs to name_replaced.<time_t>
4996         1. rename new LVs to old LVs
4997         1. attach the new LVs (with the old names now) to the drbd device
4998
4999       1. wait for sync across all devices
5000
5001       1. for each modified disk:
5002
5003         1. remove old LVs (which have the name name_replaces.<time_t>)
5004
5005     Failures are not very well handled.
5006
5007     """
5008     steps_total = 6
5009     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5010     instance = self.instance
5011     iv_names = {}
5012     vgname = self.cfg.GetVGName()
5013     # start of work
5014     cfg = self.cfg
5015     tgt_node = self.tgt_node
5016     oth_node = self.oth_node
5017
5018     # Step: check device activation
5019     self.proc.LogStep(1, steps_total, "check device existence")
5020     info("checking volume groups")
5021     my_vg = cfg.GetVGName()
5022     results = self.rpc.call_vg_list([oth_node, tgt_node])
5023     if not results:
5024       raise errors.OpExecError("Can't list volume groups on the nodes")
5025     for node in oth_node, tgt_node:
5026       res = results[node]
5027       if res.failed or not res.data or my_vg not in res.data:
5028         raise errors.OpExecError("Volume group '%s' not found on %s" %
5029                                  (my_vg, node))
5030     for idx, dev in enumerate(instance.disks):
5031       if idx not in self.op.disks:
5032         continue
5033       for node in tgt_node, oth_node:
5034         info("checking disk/%d on %s" % (idx, node))
5035         cfg.SetDiskID(dev, node)
5036         result = self.rpc.call_blockdev_find(node, dev)
5037         msg = result.RemoteFailMsg()
5038         if not msg and not result.payload:
5039           msg = "disk not found"
5040         if msg:
5041           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5042                                    (idx, node, msg))
5043
5044     # Step: check other node consistency
5045     self.proc.LogStep(2, steps_total, "check peer consistency")
5046     for idx, dev in enumerate(instance.disks):
5047       if idx not in self.op.disks:
5048         continue
5049       info("checking disk/%d consistency on %s" % (idx, oth_node))
5050       if not _CheckDiskConsistency(self, dev, oth_node,
5051                                    oth_node==instance.primary_node):
5052         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5053                                  " to replace disks on this node (%s)" %
5054                                  (oth_node, tgt_node))
5055
5056     # Step: create new storage
5057     self.proc.LogStep(3, steps_total, "allocate new storage")
5058     for idx, dev in enumerate(instance.disks):
5059       if idx not in self.op.disks:
5060         continue
5061       size = dev.size
5062       cfg.SetDiskID(dev, tgt_node)
5063       lv_names = [".disk%d_%s" % (idx, suf)
5064                   for suf in ["data", "meta"]]
5065       names = _GenerateUniqueNames(self, lv_names)
5066       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5067                              logical_id=(vgname, names[0]))
5068       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5069                              logical_id=(vgname, names[1]))
5070       new_lvs = [lv_data, lv_meta]
5071       old_lvs = dev.children
5072       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5073       info("creating new local storage on %s for %s" %
5074            (tgt_node, dev.iv_name))
5075       # we pass force_create=True to force the LVM creation
5076       for new_lv in new_lvs:
5077         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5078                         _GetInstanceInfoText(instance), False)
5079
5080     # Step: for each lv, detach+rename*2+attach
5081     self.proc.LogStep(4, steps_total, "change drbd configuration")
5082     for dev, old_lvs, new_lvs in iv_names.itervalues():
5083       info("detaching %s drbd from local storage" % dev.iv_name)
5084       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5085       result.Raise()
5086       if not result.data:
5087         raise errors.OpExecError("Can't detach drbd from local storage on node"
5088                                  " %s for device %s" % (tgt_node, dev.iv_name))
5089       #dev.children = []
5090       #cfg.Update(instance)
5091
5092       # ok, we created the new LVs, so now we know we have the needed
5093       # storage; as such, we proceed on the target node to rename
5094       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5095       # using the assumption that logical_id == physical_id (which in
5096       # turn is the unique_id on that node)
5097
5098       # FIXME(iustin): use a better name for the replaced LVs
5099       temp_suffix = int(time.time())
5100       ren_fn = lambda d, suff: (d.physical_id[0],
5101                                 d.physical_id[1] + "_replaced-%s" % suff)
5102       # build the rename list based on what LVs exist on the node
5103       rlist = []
5104       for to_ren in old_lvs:
5105         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5106         if not result.RemoteFailMsg() and result.payload:
5107           # device exists
5108           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5109
5110       info("renaming the old LVs on the target node")
5111       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5112       result.Raise()
5113       if not result.data:
5114         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5115       # now we rename the new LVs to the old LVs
5116       info("renaming the new LVs on the target node")
5117       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5118       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5119       result.Raise()
5120       if not result.data:
5121         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5122
5123       for old, new in zip(old_lvs, new_lvs):
5124         new.logical_id = old.logical_id
5125         cfg.SetDiskID(new, tgt_node)
5126
5127       for disk in old_lvs:
5128         disk.logical_id = ren_fn(disk, temp_suffix)
5129         cfg.SetDiskID(disk, tgt_node)
5130
5131       # now that the new lvs have the old name, we can add them to the device
5132       info("adding new mirror component on %s" % tgt_node)
5133       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5134       if result.failed or not result.data:
5135         for new_lv in new_lvs:
5136           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5137           if msg:
5138             warning("Can't rollback device %s: %s", dev, msg,
5139                     hint="cleanup manually the unused logical volumes")
5140         raise errors.OpExecError("Can't add local storage to drbd")
5141
5142       dev.children = new_lvs
5143       cfg.Update(instance)
5144
5145     # Step: wait for sync
5146
5147     # this can fail as the old devices are degraded and _WaitForSync
5148     # does a combined result over all disks, so we don't check its
5149     # return value
5150     self.proc.LogStep(5, steps_total, "sync devices")
5151     _WaitForSync(self, instance, unlock=True)
5152
5153     # so check manually all the devices
5154     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5155       cfg.SetDiskID(dev, instance.primary_node)
5156       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5157       msg = result.RemoteFailMsg()
5158       if not msg and not result.payload:
5159         msg = "disk not found"
5160       if msg:
5161         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5162                                  (name, msg))
5163       if result.payload[5]:
5164         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5165
5166     # Step: remove old storage
5167     self.proc.LogStep(6, steps_total, "removing old storage")
5168     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5169       info("remove logical volumes for %s" % name)
5170       for lv in old_lvs:
5171         cfg.SetDiskID(lv, tgt_node)
5172         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5173         if msg:
5174           warning("Can't remove old LV: %s" % msg,
5175                   hint="manually remove unused LVs")
5176           continue
5177
5178   def _ExecD8Secondary(self, feedback_fn):
5179     """Replace the secondary node for drbd8.
5180
5181     The algorithm for replace is quite complicated:
5182       - for all disks of the instance:
5183         - create new LVs on the new node with same names
5184         - shutdown the drbd device on the old secondary
5185         - disconnect the drbd network on the primary
5186         - create the drbd device on the new secondary
5187         - network attach the drbd on the primary, using an artifice:
5188           the drbd code for Attach() will connect to the network if it
5189           finds a device which is connected to the good local disks but
5190           not network enabled
5191       - wait for sync across all devices
5192       - remove all disks from the old secondary
5193
5194     Failures are not very well handled.
5195
5196     """
5197     steps_total = 6
5198     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5199     instance = self.instance
5200     iv_names = {}
5201     # start of work
5202     cfg = self.cfg
5203     old_node = self.tgt_node
5204     new_node = self.new_node
5205     pri_node = instance.primary_node
5206     nodes_ip = {
5207       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5208       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5209       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5210       }
5211
5212     # Step: check device activation
5213     self.proc.LogStep(1, steps_total, "check device existence")
5214     info("checking volume groups")
5215     my_vg = cfg.GetVGName()
5216     results = self.rpc.call_vg_list([pri_node, new_node])
5217     for node in pri_node, new_node:
5218       res = results[node]
5219       if res.failed or not res.data or my_vg not in res.data:
5220         raise errors.OpExecError("Volume group '%s' not found on %s" %
5221                                  (my_vg, node))
5222     for idx, dev in enumerate(instance.disks):
5223       if idx not in self.op.disks:
5224         continue
5225       info("checking disk/%d on %s" % (idx, pri_node))
5226       cfg.SetDiskID(dev, pri_node)
5227       result = self.rpc.call_blockdev_find(pri_node, dev)
5228       msg = result.RemoteFailMsg()
5229       if not msg and not result.payload:
5230         msg = "disk not found"
5231       if msg:
5232         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5233                                  (idx, pri_node, msg))
5234
5235     # Step: check other node consistency
5236     self.proc.LogStep(2, steps_total, "check peer consistency")
5237     for idx, dev in enumerate(instance.disks):
5238       if idx not in self.op.disks:
5239         continue
5240       info("checking disk/%d consistency on %s" % (idx, pri_node))
5241       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5242         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5243                                  " unsafe to replace the secondary" %
5244                                  pri_node)
5245
5246     # Step: create new storage
5247     self.proc.LogStep(3, steps_total, "allocate new storage")
5248     for idx, dev in enumerate(instance.disks):
5249       info("adding new local storage on %s for disk/%d" %
5250            (new_node, idx))
5251       # we pass force_create=True to force LVM creation
5252       for new_lv in dev.children:
5253         _CreateBlockDev(self, new_node, instance, new_lv, True,
5254                         _GetInstanceInfoText(instance), False)
5255
5256     # Step 4: dbrd minors and drbd setups changes
5257     # after this, we must manually remove the drbd minors on both the
5258     # error and the success paths
5259     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5260                                    instance.name)
5261     logging.debug("Allocated minors %s" % (minors,))
5262     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5263     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5264       size = dev.size
5265       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5266       # create new devices on new_node; note that we create two IDs:
5267       # one without port, so the drbd will be activated without
5268       # networking information on the new node at this stage, and one
5269       # with network, for the latter activation in step 4
5270       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5271       if pri_node == o_node1:
5272         p_minor = o_minor1
5273       else:
5274         p_minor = o_minor2
5275
5276       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5277       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5278
5279       iv_names[idx] = (dev, dev.children, new_net_id)
5280       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5281                     new_net_id)
5282       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5283                               logical_id=new_alone_id,
5284                               children=dev.children)
5285       try:
5286         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5287                               _GetInstanceInfoText(instance), False)
5288       except errors.BlockDeviceError:
5289         self.cfg.ReleaseDRBDMinors(instance.name)
5290         raise
5291
5292     for idx, dev in enumerate(instance.disks):
5293       # we have new devices, shutdown the drbd on the old secondary
5294       info("shutting down drbd for disk/%d on old node" % idx)
5295       cfg.SetDiskID(dev, old_node)
5296       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5297       if msg:
5298         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5299                 (idx, msg),
5300                 hint="Please cleanup this device manually as soon as possible")
5301
5302     info("detaching primary drbds from the network (=> standalone)")
5303     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5304                                                instance.disks)[pri_node]
5305
5306     msg = result.RemoteFailMsg()
5307     if msg:
5308       # detaches didn't succeed (unlikely)
5309       self.cfg.ReleaseDRBDMinors(instance.name)
5310       raise errors.OpExecError("Can't detach the disks from the network on"
5311                                " old node: %s" % (msg,))
5312
5313     # if we managed to detach at least one, we update all the disks of
5314     # the instance to point to the new secondary
5315     info("updating instance configuration")
5316     for dev, _, new_logical_id in iv_names.itervalues():
5317       dev.logical_id = new_logical_id
5318       cfg.SetDiskID(dev, pri_node)
5319     cfg.Update(instance)
5320
5321     # and now perform the drbd attach
5322     info("attaching primary drbds to new secondary (standalone => connected)")
5323     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5324                                            instance.disks, instance.name,
5325                                            False)
5326     for to_node, to_result in result.items():
5327       msg = to_result.RemoteFailMsg()
5328       if msg:
5329         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5330                 hint="please do a gnt-instance info to see the"
5331                 " status of disks")
5332
5333     # this can fail as the old devices are degraded and _WaitForSync
5334     # does a combined result over all disks, so we don't check its
5335     # return value
5336     self.proc.LogStep(5, steps_total, "sync devices")
5337     _WaitForSync(self, instance, unlock=True)
5338
5339     # so check manually all the devices
5340     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5341       cfg.SetDiskID(dev, pri_node)
5342       result = self.rpc.call_blockdev_find(pri_node, dev)
5343       msg = result.RemoteFailMsg()
5344       if not msg and not result.payload:
5345         msg = "disk not found"
5346       if msg:
5347         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5348                                  (idx, msg))
5349       if result.payload[5]:
5350         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5351
5352     self.proc.LogStep(6, steps_total, "removing old storage")
5353     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5354       info("remove logical volumes for disk/%d" % idx)
5355       for lv in old_lvs:
5356         cfg.SetDiskID(lv, old_node)
5357         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5358         if msg:
5359           warning("Can't remove LV on old secondary: %s", msg,
5360                   hint="Cleanup stale volumes by hand")
5361
5362   def Exec(self, feedback_fn):
5363     """Execute disk replacement.
5364
5365     This dispatches the disk replacement to the appropriate handler.
5366
5367     """
5368     instance = self.instance
5369
5370     # Activate the instance disks if we're replacing them on a down instance
5371     if not instance.admin_up:
5372       _StartInstanceDisks(self, instance, True)
5373
5374     if self.op.mode == constants.REPLACE_DISK_CHG:
5375       fn = self._ExecD8Secondary
5376     else:
5377       fn = self._ExecD8DiskOnly
5378
5379     ret = fn(feedback_fn)
5380
5381     # Deactivate the instance disks if we're replacing them on a down instance
5382     if not instance.admin_up:
5383       _SafeShutdownInstanceDisks(self, instance)
5384
5385     return ret
5386
5387
5388 class LUGrowDisk(LogicalUnit):
5389   """Grow a disk of an instance.
5390
5391   """
5392   HPATH = "disk-grow"
5393   HTYPE = constants.HTYPE_INSTANCE
5394   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5395   REQ_BGL = False
5396
5397   def ExpandNames(self):
5398     self._ExpandAndLockInstance()
5399     self.needed_locks[locking.LEVEL_NODE] = []
5400     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5401
5402   def DeclareLocks(self, level):
5403     if level == locking.LEVEL_NODE:
5404       self._LockInstancesNodes()
5405
5406   def BuildHooksEnv(self):
5407     """Build hooks env.
5408
5409     This runs on the master, the primary and all the secondaries.
5410
5411     """
5412     env = {
5413       "DISK": self.op.disk,
5414       "AMOUNT": self.op.amount,
5415       }
5416     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5417     nl = [
5418       self.cfg.GetMasterNode(),
5419       self.instance.primary_node,
5420       ]
5421     return env, nl, nl
5422
5423   def CheckPrereq(self):
5424     """Check prerequisites.
5425
5426     This checks that the instance is in the cluster.
5427
5428     """
5429     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5430     assert instance is not None, \
5431       "Cannot retrieve locked instance %s" % self.op.instance_name
5432     nodenames = list(instance.all_nodes)
5433     for node in nodenames:
5434       _CheckNodeOnline(self, node)
5435
5436
5437     self.instance = instance
5438
5439     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5440       raise errors.OpPrereqError("Instance's disk layout does not support"
5441                                  " growing.")
5442
5443     self.disk = instance.FindDisk(self.op.disk)
5444
5445     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5446                                        instance.hypervisor)
5447     for node in nodenames:
5448       info = nodeinfo[node]
5449       if info.failed or not info.data:
5450         raise errors.OpPrereqError("Cannot get current information"
5451                                    " from node '%s'" % node)
5452       vg_free = info.data.get('vg_free', None)
5453       if not isinstance(vg_free, int):
5454         raise errors.OpPrereqError("Can't compute free disk space on"
5455                                    " node %s" % node)
5456       if self.op.amount > vg_free:
5457         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5458                                    " %d MiB available, %d MiB required" %
5459                                    (node, vg_free, self.op.amount))
5460
5461   def Exec(self, feedback_fn):
5462     """Execute disk grow.
5463
5464     """
5465     instance = self.instance
5466     disk = self.disk
5467     for node in instance.all_nodes:
5468       self.cfg.SetDiskID(disk, node)
5469       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5470       msg = result.RemoteFailMsg()
5471       if msg:
5472         raise errors.OpExecError("Grow request failed to node %s: %s" %
5473                                  (node, msg))
5474     disk.RecordGrow(self.op.amount)
5475     self.cfg.Update(instance)
5476     if self.op.wait_for_sync:
5477       disk_abort = not _WaitForSync(self, instance)
5478       if disk_abort:
5479         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5480                              " status.\nPlease check the instance.")
5481
5482
5483 class LUQueryInstanceData(NoHooksLU):
5484   """Query runtime instance data.
5485
5486   """
5487   _OP_REQP = ["instances", "static"]
5488   REQ_BGL = False
5489
5490   def ExpandNames(self):
5491     self.needed_locks = {}
5492     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5493
5494     if not isinstance(self.op.instances, list):
5495       raise errors.OpPrereqError("Invalid argument type 'instances'")
5496
5497     if self.op.instances:
5498       self.wanted_names = []
5499       for name in self.op.instances:
5500         full_name = self.cfg.ExpandInstanceName(name)
5501         if full_name is None:
5502           raise errors.OpPrereqError("Instance '%s' not known" % name)
5503         self.wanted_names.append(full_name)
5504       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5505     else:
5506       self.wanted_names = None
5507       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5508
5509     self.needed_locks[locking.LEVEL_NODE] = []
5510     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5511
5512   def DeclareLocks(self, level):
5513     if level == locking.LEVEL_NODE:
5514       self._LockInstancesNodes()
5515
5516   def CheckPrereq(self):
5517     """Check prerequisites.
5518
5519     This only checks the optional instance list against the existing names.
5520
5521     """
5522     if self.wanted_names is None:
5523       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5524
5525     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5526                              in self.wanted_names]
5527     return
5528
5529   def _ComputeDiskStatus(self, instance, snode, dev):
5530     """Compute block device status.
5531
5532     """
5533     static = self.op.static
5534     if not static:
5535       self.cfg.SetDiskID(dev, instance.primary_node)
5536       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5537       if dev_pstatus.offline:
5538         dev_pstatus = None
5539       else:
5540         msg = dev_pstatus.RemoteFailMsg()
5541         if msg:
5542           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5543                                    (instance.name, msg))
5544         dev_pstatus = dev_pstatus.payload
5545     else:
5546       dev_pstatus = None
5547
5548     if dev.dev_type in constants.LDS_DRBD:
5549       # we change the snode then (otherwise we use the one passed in)
5550       if dev.logical_id[0] == instance.primary_node:
5551         snode = dev.logical_id[1]
5552       else:
5553         snode = dev.logical_id[0]
5554
5555     if snode and not static:
5556       self.cfg.SetDiskID(dev, snode)
5557       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5558       if dev_sstatus.offline:
5559         dev_sstatus = None
5560       else:
5561         msg = dev_sstatus.RemoteFailMsg()
5562         if msg:
5563           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5564                                    (instance.name, msg))
5565         dev_sstatus = dev_sstatus.payload
5566     else:
5567       dev_sstatus = None
5568
5569     if dev.children:
5570       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5571                       for child in dev.children]
5572     else:
5573       dev_children = []
5574
5575     data = {
5576       "iv_name": dev.iv_name,
5577       "dev_type": dev.dev_type,
5578       "logical_id": dev.logical_id,
5579       "physical_id": dev.physical_id,
5580       "pstatus": dev_pstatus,
5581       "sstatus": dev_sstatus,
5582       "children": dev_children,
5583       "mode": dev.mode,
5584       }
5585
5586     return data
5587
5588   def Exec(self, feedback_fn):
5589     """Gather and return data"""
5590     result = {}
5591
5592     cluster = self.cfg.GetClusterInfo()
5593
5594     for instance in self.wanted_instances:
5595       if not self.op.static:
5596         remote_info = self.rpc.call_instance_info(instance.primary_node,
5597                                                   instance.name,
5598                                                   instance.hypervisor)
5599         remote_info.Raise()
5600         remote_info = remote_info.data
5601         if remote_info and "state" in remote_info:
5602           remote_state = "up"
5603         else:
5604           remote_state = "down"
5605       else:
5606         remote_state = None
5607       if instance.admin_up:
5608         config_state = "up"
5609       else:
5610         config_state = "down"
5611
5612       disks = [self._ComputeDiskStatus(instance, None, device)
5613                for device in instance.disks]
5614
5615       idict = {
5616         "name": instance.name,
5617         "config_state": config_state,
5618         "run_state": remote_state,
5619         "pnode": instance.primary_node,
5620         "snodes": instance.secondary_nodes,
5621         "os": instance.os,
5622         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5623         "disks": disks,
5624         "hypervisor": instance.hypervisor,
5625         "network_port": instance.network_port,
5626         "hv_instance": instance.hvparams,
5627         "hv_actual": cluster.FillHV(instance),
5628         "be_instance": instance.beparams,
5629         "be_actual": cluster.FillBE(instance),
5630         }
5631
5632       result[instance.name] = idict
5633
5634     return result
5635
5636
5637 class LUSetInstanceParams(LogicalUnit):
5638   """Modifies an instances's parameters.
5639
5640   """
5641   HPATH = "instance-modify"
5642   HTYPE = constants.HTYPE_INSTANCE
5643   _OP_REQP = ["instance_name"]
5644   REQ_BGL = False
5645
5646   def CheckArguments(self):
5647     if not hasattr(self.op, 'nics'):
5648       self.op.nics = []
5649     if not hasattr(self.op, 'disks'):
5650       self.op.disks = []
5651     if not hasattr(self.op, 'beparams'):
5652       self.op.beparams = {}
5653     if not hasattr(self.op, 'hvparams'):
5654       self.op.hvparams = {}
5655     self.op.force = getattr(self.op, "force", False)
5656     if not (self.op.nics or self.op.disks or
5657             self.op.hvparams or self.op.beparams):
5658       raise errors.OpPrereqError("No changes submitted")
5659
5660     # Disk validation
5661     disk_addremove = 0
5662     for disk_op, disk_dict in self.op.disks:
5663       if disk_op == constants.DDM_REMOVE:
5664         disk_addremove += 1
5665         continue
5666       elif disk_op == constants.DDM_ADD:
5667         disk_addremove += 1
5668       else:
5669         if not isinstance(disk_op, int):
5670           raise errors.OpPrereqError("Invalid disk index")
5671       if disk_op == constants.DDM_ADD:
5672         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5673         if mode not in constants.DISK_ACCESS_SET:
5674           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5675         size = disk_dict.get('size', None)
5676         if size is None:
5677           raise errors.OpPrereqError("Required disk parameter size missing")
5678         try:
5679           size = int(size)
5680         except ValueError, err:
5681           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5682                                      str(err))
5683         disk_dict['size'] = size
5684       else:
5685         # modification of disk
5686         if 'size' in disk_dict:
5687           raise errors.OpPrereqError("Disk size change not possible, use"
5688                                      " grow-disk")
5689
5690     if disk_addremove > 1:
5691       raise errors.OpPrereqError("Only one disk add or remove operation"
5692                                  " supported at a time")
5693
5694     # NIC validation
5695     nic_addremove = 0
5696     for nic_op, nic_dict in self.op.nics:
5697       if nic_op == constants.DDM_REMOVE:
5698         nic_addremove += 1
5699         continue
5700       elif nic_op == constants.DDM_ADD:
5701         nic_addremove += 1
5702       else:
5703         if not isinstance(nic_op, int):
5704           raise errors.OpPrereqError("Invalid nic index")
5705
5706       # nic_dict should be a dict
5707       nic_ip = nic_dict.get('ip', None)
5708       if nic_ip is not None:
5709         if nic_ip.lower() == constants.VALUE_NONE:
5710           nic_dict['ip'] = None
5711         else:
5712           if not utils.IsValidIP(nic_ip):
5713             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5714
5715       if nic_op == constants.DDM_ADD:
5716         nic_bridge = nic_dict.get('bridge', None)
5717         if nic_bridge is None:
5718           nic_dict['bridge'] = self.cfg.GetDefBridge()
5719         nic_mac = nic_dict.get('mac', None)
5720         if nic_mac is None:
5721           nic_dict['mac'] = constants.VALUE_AUTO
5722
5723       if 'mac' in nic_dict:
5724         nic_mac = nic_dict['mac']
5725         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5726           if not utils.IsValidMac(nic_mac):
5727             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5728         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5729           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5730                                      " modifying an existing nic")
5731
5732     if nic_addremove > 1:
5733       raise errors.OpPrereqError("Only one NIC add or remove operation"
5734                                  " supported at a time")
5735
5736   def ExpandNames(self):
5737     self._ExpandAndLockInstance()
5738     self.needed_locks[locking.LEVEL_NODE] = []
5739     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5740
5741   def DeclareLocks(self, level):
5742     if level == locking.LEVEL_NODE:
5743       self._LockInstancesNodes()
5744
5745   def BuildHooksEnv(self):
5746     """Build hooks env.
5747
5748     This runs on the master, primary and secondaries.
5749
5750     """
5751     args = dict()
5752     if constants.BE_MEMORY in self.be_new:
5753       args['memory'] = self.be_new[constants.BE_MEMORY]
5754     if constants.BE_VCPUS in self.be_new:
5755       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5756     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5757     # information at all.
5758     if self.op.nics:
5759       args['nics'] = []
5760       nic_override = dict(self.op.nics)
5761       for idx, nic in enumerate(self.instance.nics):
5762         if idx in nic_override:
5763           this_nic_override = nic_override[idx]
5764         else:
5765           this_nic_override = {}
5766         if 'ip' in this_nic_override:
5767           ip = this_nic_override['ip']
5768         else:
5769           ip = nic.ip
5770         if 'bridge' in this_nic_override:
5771           bridge = this_nic_override['bridge']
5772         else:
5773           bridge = nic.bridge
5774         if 'mac' in this_nic_override:
5775           mac = this_nic_override['mac']
5776         else:
5777           mac = nic.mac
5778         args['nics'].append((ip, bridge, mac))
5779       if constants.DDM_ADD in nic_override:
5780         ip = nic_override[constants.DDM_ADD].get('ip', None)
5781         bridge = nic_override[constants.DDM_ADD]['bridge']
5782         mac = nic_override[constants.DDM_ADD]['mac']
5783         args['nics'].append((ip, bridge, mac))
5784       elif constants.DDM_REMOVE in nic_override:
5785         del args['nics'][-1]
5786
5787     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5788     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5789     return env, nl, nl
5790
5791   def CheckPrereq(self):
5792     """Check prerequisites.
5793
5794     This only checks the instance list against the existing names.
5795
5796     """
5797     force = self.force = self.op.force
5798
5799     # checking the new params on the primary/secondary nodes
5800
5801     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5802     assert self.instance is not None, \
5803       "Cannot retrieve locked instance %s" % self.op.instance_name
5804     pnode = instance.primary_node
5805     nodelist = list(instance.all_nodes)
5806
5807     # hvparams processing
5808     if self.op.hvparams:
5809       i_hvdict = copy.deepcopy(instance.hvparams)
5810       for key, val in self.op.hvparams.iteritems():
5811         if val == constants.VALUE_DEFAULT:
5812           try:
5813             del i_hvdict[key]
5814           except KeyError:
5815             pass
5816         else:
5817           i_hvdict[key] = val
5818       cluster = self.cfg.GetClusterInfo()
5819       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5820       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5821                                 i_hvdict)
5822       # local check
5823       hypervisor.GetHypervisor(
5824         instance.hypervisor).CheckParameterSyntax(hv_new)
5825       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5826       self.hv_new = hv_new # the new actual values
5827       self.hv_inst = i_hvdict # the new dict (without defaults)
5828     else:
5829       self.hv_new = self.hv_inst = {}
5830
5831     # beparams processing
5832     if self.op.beparams:
5833       i_bedict = copy.deepcopy(instance.beparams)
5834       for key, val in self.op.beparams.iteritems():
5835         if val == constants.VALUE_DEFAULT:
5836           try:
5837             del i_bedict[key]
5838           except KeyError:
5839             pass
5840         else:
5841           i_bedict[key] = val
5842       cluster = self.cfg.GetClusterInfo()
5843       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5844       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5845                                 i_bedict)
5846       self.be_new = be_new # the new actual values
5847       self.be_inst = i_bedict # the new dict (without defaults)
5848     else:
5849       self.be_new = self.be_inst = {}
5850
5851     self.warn = []
5852
5853     if constants.BE_MEMORY in self.op.beparams and not self.force:
5854       mem_check_list = [pnode]
5855       if be_new[constants.BE_AUTO_BALANCE]:
5856         # either we changed auto_balance to yes or it was from before
5857         mem_check_list.extend(instance.secondary_nodes)
5858       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5859                                                   instance.hypervisor)
5860       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5861                                          instance.hypervisor)
5862       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5863         # Assume the primary node is unreachable and go ahead
5864         self.warn.append("Can't get info from primary node %s" % pnode)
5865       else:
5866         if not instance_info.failed and instance_info.data:
5867           current_mem = instance_info.data['memory']
5868         else:
5869           # Assume instance not running
5870           # (there is a slight race condition here, but it's not very probable,
5871           # and we have no other way to check)
5872           current_mem = 0
5873         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5874                     nodeinfo[pnode].data['memory_free'])
5875         if miss_mem > 0:
5876           raise errors.OpPrereqError("This change will prevent the instance"
5877                                      " from starting, due to %d MB of memory"
5878                                      " missing on its primary node" % miss_mem)
5879
5880       if be_new[constants.BE_AUTO_BALANCE]:
5881         for node, nres in nodeinfo.iteritems():
5882           if node not in instance.secondary_nodes:
5883             continue
5884           if nres.failed or not isinstance(nres.data, dict):
5885             self.warn.append("Can't get info from secondary node %s" % node)
5886           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5887             self.warn.append("Not enough memory to failover instance to"
5888                              " secondary node %s" % node)
5889
5890     # NIC processing
5891     for nic_op, nic_dict in self.op.nics:
5892       if nic_op == constants.DDM_REMOVE:
5893         if not instance.nics:
5894           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5895         continue
5896       if nic_op != constants.DDM_ADD:
5897         # an existing nic
5898         if nic_op < 0 or nic_op >= len(instance.nics):
5899           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5900                                      " are 0 to %d" %
5901                                      (nic_op, len(instance.nics)))
5902       if 'bridge' in nic_dict:
5903         nic_bridge = nic_dict['bridge']
5904         if nic_bridge is None:
5905           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5906         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5907           msg = ("Bridge '%s' doesn't exist on one of"
5908                  " the instance nodes" % nic_bridge)
5909           if self.force:
5910             self.warn.append(msg)
5911           else:
5912             raise errors.OpPrereqError(msg)
5913       if 'mac' in nic_dict:
5914         nic_mac = nic_dict['mac']
5915         if nic_mac is None:
5916           raise errors.OpPrereqError('Cannot set the nic mac to None')
5917         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5918           # otherwise generate the mac
5919           nic_dict['mac'] = self.cfg.GenerateMAC()
5920         else:
5921           # or validate/reserve the current one
5922           if self.cfg.IsMacInUse(nic_mac):
5923             raise errors.OpPrereqError("MAC address %s already in use"
5924                                        " in cluster" % nic_mac)
5925
5926     # DISK processing
5927     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5928       raise errors.OpPrereqError("Disk operations not supported for"
5929                                  " diskless instances")
5930     for disk_op, disk_dict in self.op.disks:
5931       if disk_op == constants.DDM_REMOVE:
5932         if len(instance.disks) == 1:
5933           raise errors.OpPrereqError("Cannot remove the last disk of"
5934                                      " an instance")
5935         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5936         ins_l = ins_l[pnode]
5937         if ins_l.failed or not isinstance(ins_l.data, list):
5938           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5939         if instance.name in ins_l.data:
5940           raise errors.OpPrereqError("Instance is running, can't remove"
5941                                      " disks.")
5942
5943       if (disk_op == constants.DDM_ADD and
5944           len(instance.nics) >= constants.MAX_DISKS):
5945         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5946                                    " add more" % constants.MAX_DISKS)
5947       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5948         # an existing disk
5949         if disk_op < 0 or disk_op >= len(instance.disks):
5950           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5951                                      " are 0 to %d" %
5952                                      (disk_op, len(instance.disks)))
5953
5954     return
5955
5956   def Exec(self, feedback_fn):
5957     """Modifies an instance.
5958
5959     All parameters take effect only at the next restart of the instance.
5960
5961     """
5962     # Process here the warnings from CheckPrereq, as we don't have a
5963     # feedback_fn there.
5964     for warn in self.warn:
5965       feedback_fn("WARNING: %s" % warn)
5966
5967     result = []
5968     instance = self.instance
5969     # disk changes
5970     for disk_op, disk_dict in self.op.disks:
5971       if disk_op == constants.DDM_REMOVE:
5972         # remove the last disk
5973         device = instance.disks.pop()
5974         device_idx = len(instance.disks)
5975         for node, disk in device.ComputeNodeTree(instance.primary_node):
5976           self.cfg.SetDiskID(disk, node)
5977           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5978           if msg:
5979             self.LogWarning("Could not remove disk/%d on node %s: %s,"
5980                             " continuing anyway", device_idx, node, msg)
5981         result.append(("disk/%d" % device_idx, "remove"))
5982       elif disk_op == constants.DDM_ADD:
5983         # add a new disk
5984         if instance.disk_template == constants.DT_FILE:
5985           file_driver, file_path = instance.disks[0].logical_id
5986           file_path = os.path.dirname(file_path)
5987         else:
5988           file_driver = file_path = None
5989         disk_idx_base = len(instance.disks)
5990         new_disk = _GenerateDiskTemplate(self,
5991                                          instance.disk_template,
5992                                          instance.name, instance.primary_node,
5993                                          instance.secondary_nodes,
5994                                          [disk_dict],
5995                                          file_path,
5996                                          file_driver,
5997                                          disk_idx_base)[0]
5998         instance.disks.append(new_disk)
5999         info = _GetInstanceInfoText(instance)
6000
6001         logging.info("Creating volume %s for instance %s",
6002                      new_disk.iv_name, instance.name)
6003         # Note: this needs to be kept in sync with _CreateDisks
6004         #HARDCODE
6005         for node in instance.all_nodes:
6006           f_create = node == instance.primary_node
6007           try:
6008             _CreateBlockDev(self, node, instance, new_disk,
6009                             f_create, info, f_create)
6010           except errors.OpExecError, err:
6011             self.LogWarning("Failed to create volume %s (%s) on"
6012                             " node %s: %s",
6013                             new_disk.iv_name, new_disk, node, err)
6014         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6015                        (new_disk.size, new_disk.mode)))
6016       else:
6017         # change a given disk
6018         instance.disks[disk_op].mode = disk_dict['mode']
6019         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6020     # NIC changes
6021     for nic_op, nic_dict in self.op.nics:
6022       if nic_op == constants.DDM_REMOVE:
6023         # remove the last nic
6024         del instance.nics[-1]
6025         result.append(("nic.%d" % len(instance.nics), "remove"))
6026       elif nic_op == constants.DDM_ADD:
6027         # mac and bridge should be set, by now
6028         mac = nic_dict['mac']
6029         bridge = nic_dict['bridge']
6030         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6031                               bridge=bridge)
6032         instance.nics.append(new_nic)
6033         result.append(("nic.%d" % (len(instance.nics) - 1),
6034                        "add:mac=%s,ip=%s,bridge=%s" %
6035                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6036       else:
6037         # change a given nic
6038         for key in 'mac', 'ip', 'bridge':
6039           if key in nic_dict:
6040             setattr(instance.nics[nic_op], key, nic_dict[key])
6041             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6042
6043     # hvparams changes
6044     if self.op.hvparams:
6045       instance.hvparams = self.hv_inst
6046       for key, val in self.op.hvparams.iteritems():
6047         result.append(("hv/%s" % key, val))
6048
6049     # beparams changes
6050     if self.op.beparams:
6051       instance.beparams = self.be_inst
6052       for key, val in self.op.beparams.iteritems():
6053         result.append(("be/%s" % key, val))
6054
6055     self.cfg.Update(instance)
6056
6057     return result
6058
6059
6060 class LUQueryExports(NoHooksLU):
6061   """Query the exports list
6062
6063   """
6064   _OP_REQP = ['nodes']
6065   REQ_BGL = False
6066
6067   def ExpandNames(self):
6068     self.needed_locks = {}
6069     self.share_locks[locking.LEVEL_NODE] = 1
6070     if not self.op.nodes:
6071       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6072     else:
6073       self.needed_locks[locking.LEVEL_NODE] = \
6074         _GetWantedNodes(self, self.op.nodes)
6075
6076   def CheckPrereq(self):
6077     """Check prerequisites.
6078
6079     """
6080     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6081
6082   def Exec(self, feedback_fn):
6083     """Compute the list of all the exported system images.
6084
6085     @rtype: dict
6086     @return: a dictionary with the structure node->(export-list)
6087         where export-list is a list of the instances exported on
6088         that node.
6089
6090     """
6091     rpcresult = self.rpc.call_export_list(self.nodes)
6092     result = {}
6093     for node in rpcresult:
6094       if rpcresult[node].failed:
6095         result[node] = False
6096       else:
6097         result[node] = rpcresult[node].data
6098
6099     return result
6100
6101
6102 class LUExportInstance(LogicalUnit):
6103   """Export an instance to an image in the cluster.
6104
6105   """
6106   HPATH = "instance-export"
6107   HTYPE = constants.HTYPE_INSTANCE
6108   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6109   REQ_BGL = False
6110
6111   def ExpandNames(self):
6112     self._ExpandAndLockInstance()
6113     # FIXME: lock only instance primary and destination node
6114     #
6115     # Sad but true, for now we have do lock all nodes, as we don't know where
6116     # the previous export might be, and and in this LU we search for it and
6117     # remove it from its current node. In the future we could fix this by:
6118     #  - making a tasklet to search (share-lock all), then create the new one,
6119     #    then one to remove, after
6120     #  - removing the removal operation altoghether
6121     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6122
6123   def DeclareLocks(self, level):
6124     """Last minute lock declaration."""
6125     # All nodes are locked anyway, so nothing to do here.
6126
6127   def BuildHooksEnv(self):
6128     """Build hooks env.
6129
6130     This will run on the master, primary node and target node.
6131
6132     """
6133     env = {
6134       "EXPORT_NODE": self.op.target_node,
6135       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6136       }
6137     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6138     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6139           self.op.target_node]
6140     return env, nl, nl
6141
6142   def CheckPrereq(self):
6143     """Check prerequisites.
6144
6145     This checks that the instance and node names are valid.
6146
6147     """
6148     instance_name = self.op.instance_name
6149     self.instance = self.cfg.GetInstanceInfo(instance_name)
6150     assert self.instance is not None, \
6151           "Cannot retrieve locked instance %s" % self.op.instance_name
6152     _CheckNodeOnline(self, self.instance.primary_node)
6153
6154     self.dst_node = self.cfg.GetNodeInfo(
6155       self.cfg.ExpandNodeName(self.op.target_node))
6156
6157     if self.dst_node is None:
6158       # This is wrong node name, not a non-locked node
6159       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6160     _CheckNodeOnline(self, self.dst_node.name)
6161     _CheckNodeNotDrained(self, self.dst_node.name)
6162
6163     # instance disk type verification
6164     for disk in self.instance.disks:
6165       if disk.dev_type == constants.LD_FILE:
6166         raise errors.OpPrereqError("Export not supported for instances with"
6167                                    " file-based disks")
6168
6169   def Exec(self, feedback_fn):
6170     """Export an instance to an image in the cluster.
6171
6172     """
6173     instance = self.instance
6174     dst_node = self.dst_node
6175     src_node = instance.primary_node
6176     if self.op.shutdown:
6177       # shutdown the instance, but not the disks
6178       result = self.rpc.call_instance_shutdown(src_node, instance)
6179       msg = result.RemoteFailMsg()
6180       if msg:
6181         raise errors.OpExecError("Could not shutdown instance %s on"
6182                                  " node %s: %s" %
6183                                  (instance.name, src_node, msg))
6184
6185     vgname = self.cfg.GetVGName()
6186
6187     snap_disks = []
6188
6189     # set the disks ID correctly since call_instance_start needs the
6190     # correct drbd minor to create the symlinks
6191     for disk in instance.disks:
6192       self.cfg.SetDiskID(disk, src_node)
6193
6194     try:
6195       for disk in instance.disks:
6196         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6197         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6198         if new_dev_name.failed or not new_dev_name.data:
6199           self.LogWarning("Could not snapshot block device %s on node %s",
6200                           disk.logical_id[1], src_node)
6201           snap_disks.append(False)
6202         else:
6203           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6204                                  logical_id=(vgname, new_dev_name.data),
6205                                  physical_id=(vgname, new_dev_name.data),
6206                                  iv_name=disk.iv_name)
6207           snap_disks.append(new_dev)
6208
6209     finally:
6210       if self.op.shutdown and instance.admin_up:
6211         result = self.rpc.call_instance_start(src_node, instance)
6212         msg = result.RemoteFailMsg()
6213         if msg:
6214           _ShutdownInstanceDisks(self, instance)
6215           raise errors.OpExecError("Could not start instance: %s" % msg)
6216
6217     # TODO: check for size
6218
6219     cluster_name = self.cfg.GetClusterName()
6220     for idx, dev in enumerate(snap_disks):
6221       if dev:
6222         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6223                                                instance, cluster_name, idx)
6224         if result.failed or not result.data:
6225           self.LogWarning("Could not export block device %s from node %s to"
6226                           " node %s", dev.logical_id[1], src_node,
6227                           dst_node.name)
6228         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6229         if msg:
6230           self.LogWarning("Could not remove snapshot block device %s from node"
6231                           " %s: %s", dev.logical_id[1], src_node, msg)
6232
6233     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6234     if result.failed or not result.data:
6235       self.LogWarning("Could not finalize export for instance %s on node %s",
6236                       instance.name, dst_node.name)
6237
6238     nodelist = self.cfg.GetNodeList()
6239     nodelist.remove(dst_node.name)
6240
6241     # on one-node clusters nodelist will be empty after the removal
6242     # if we proceed the backup would be removed because OpQueryExports
6243     # substitutes an empty list with the full cluster node list.
6244     if nodelist:
6245       exportlist = self.rpc.call_export_list(nodelist)
6246       for node in exportlist:
6247         if exportlist[node].failed:
6248           continue
6249         if instance.name in exportlist[node].data:
6250           if not self.rpc.call_export_remove(node, instance.name):
6251             self.LogWarning("Could not remove older export for instance %s"
6252                             " on node %s", instance.name, node)
6253
6254
6255 class LURemoveExport(NoHooksLU):
6256   """Remove exports related to the named instance.
6257
6258   """
6259   _OP_REQP = ["instance_name"]
6260   REQ_BGL = False
6261
6262   def ExpandNames(self):
6263     self.needed_locks = {}
6264     # We need all nodes to be locked in order for RemoveExport to work, but we
6265     # don't need to lock the instance itself, as nothing will happen to it (and
6266     # we can remove exports also for a removed instance)
6267     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6268
6269   def CheckPrereq(self):
6270     """Check prerequisites.
6271     """
6272     pass
6273
6274   def Exec(self, feedback_fn):
6275     """Remove any export.
6276
6277     """
6278     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6279     # If the instance was not found we'll try with the name that was passed in.
6280     # This will only work if it was an FQDN, though.
6281     fqdn_warn = False
6282     if not instance_name:
6283       fqdn_warn = True
6284       instance_name = self.op.instance_name
6285
6286     exportlist = self.rpc.call_export_list(self.acquired_locks[
6287       locking.LEVEL_NODE])
6288     found = False
6289     for node in exportlist:
6290       if exportlist[node].failed:
6291         self.LogWarning("Failed to query node %s, continuing" % node)
6292         continue
6293       if instance_name in exportlist[node].data:
6294         found = True
6295         result = self.rpc.call_export_remove(node, instance_name)
6296         if result.failed or not result.data:
6297           logging.error("Could not remove export for instance %s"
6298                         " on node %s", instance_name, node)
6299
6300     if fqdn_warn and not found:
6301       feedback_fn("Export not found. If trying to remove an export belonging"
6302                   " to a deleted instance please use its Fully Qualified"
6303                   " Domain Name.")
6304
6305
6306 class TagsLU(NoHooksLU):
6307   """Generic tags LU.
6308
6309   This is an abstract class which is the parent of all the other tags LUs.
6310
6311   """
6312
6313   def ExpandNames(self):
6314     self.needed_locks = {}
6315     if self.op.kind == constants.TAG_NODE:
6316       name = self.cfg.ExpandNodeName(self.op.name)
6317       if name is None:
6318         raise errors.OpPrereqError("Invalid node name (%s)" %
6319                                    (self.op.name,))
6320       self.op.name = name
6321       self.needed_locks[locking.LEVEL_NODE] = name
6322     elif self.op.kind == constants.TAG_INSTANCE:
6323       name = self.cfg.ExpandInstanceName(self.op.name)
6324       if name is None:
6325         raise errors.OpPrereqError("Invalid instance name (%s)" %
6326                                    (self.op.name,))
6327       self.op.name = name
6328       self.needed_locks[locking.LEVEL_INSTANCE] = name
6329
6330   def CheckPrereq(self):
6331     """Check prerequisites.
6332
6333     """
6334     if self.op.kind == constants.TAG_CLUSTER:
6335       self.target = self.cfg.GetClusterInfo()
6336     elif self.op.kind == constants.TAG_NODE:
6337       self.target = self.cfg.GetNodeInfo(self.op.name)
6338     elif self.op.kind == constants.TAG_INSTANCE:
6339       self.target = self.cfg.GetInstanceInfo(self.op.name)
6340     else:
6341       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6342                                  str(self.op.kind))
6343
6344
6345 class LUGetTags(TagsLU):
6346   """Returns the tags of a given object.
6347
6348   """
6349   _OP_REQP = ["kind", "name"]
6350   REQ_BGL = False
6351
6352   def Exec(self, feedback_fn):
6353     """Returns the tag list.
6354
6355     """
6356     return list(self.target.GetTags())
6357
6358
6359 class LUSearchTags(NoHooksLU):
6360   """Searches the tags for a given pattern.
6361
6362   """
6363   _OP_REQP = ["pattern"]
6364   REQ_BGL = False
6365
6366   def ExpandNames(self):
6367     self.needed_locks = {}
6368
6369   def CheckPrereq(self):
6370     """Check prerequisites.
6371
6372     This checks the pattern passed for validity by compiling it.
6373
6374     """
6375     try:
6376       self.re = re.compile(self.op.pattern)
6377     except re.error, err:
6378       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6379                                  (self.op.pattern, err))
6380
6381   def Exec(self, feedback_fn):
6382     """Returns the tag list.
6383
6384     """
6385     cfg = self.cfg
6386     tgts = [("/cluster", cfg.GetClusterInfo())]
6387     ilist = cfg.GetAllInstancesInfo().values()
6388     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6389     nlist = cfg.GetAllNodesInfo().values()
6390     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6391     results = []
6392     for path, target in tgts:
6393       for tag in target.GetTags():
6394         if self.re.search(tag):
6395           results.append((path, tag))
6396     return results
6397
6398
6399 class LUAddTags(TagsLU):
6400   """Sets a tag on a given object.
6401
6402   """
6403   _OP_REQP = ["kind", "name", "tags"]
6404   REQ_BGL = False
6405
6406   def CheckPrereq(self):
6407     """Check prerequisites.
6408
6409     This checks the type and length of the tag name and value.
6410
6411     """
6412     TagsLU.CheckPrereq(self)
6413     for tag in self.op.tags:
6414       objects.TaggableObject.ValidateTag(tag)
6415
6416   def Exec(self, feedback_fn):
6417     """Sets the tag.
6418
6419     """
6420     try:
6421       for tag in self.op.tags:
6422         self.target.AddTag(tag)
6423     except errors.TagError, err:
6424       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6425     try:
6426       self.cfg.Update(self.target)
6427     except errors.ConfigurationError:
6428       raise errors.OpRetryError("There has been a modification to the"
6429                                 " config file and the operation has been"
6430                                 " aborted. Please retry.")
6431
6432
6433 class LUDelTags(TagsLU):
6434   """Delete a list of tags from a given object.
6435
6436   """
6437   _OP_REQP = ["kind", "name", "tags"]
6438   REQ_BGL = False
6439
6440   def CheckPrereq(self):
6441     """Check prerequisites.
6442
6443     This checks that we have the given tag.
6444
6445     """
6446     TagsLU.CheckPrereq(self)
6447     for tag in self.op.tags:
6448       objects.TaggableObject.ValidateTag(tag)
6449     del_tags = frozenset(self.op.tags)
6450     cur_tags = self.target.GetTags()
6451     if not del_tags <= cur_tags:
6452       diff_tags = del_tags - cur_tags
6453       diff_names = ["'%s'" % tag for tag in diff_tags]
6454       diff_names.sort()
6455       raise errors.OpPrereqError("Tag(s) %s not found" %
6456                                  (",".join(diff_names)))
6457
6458   def Exec(self, feedback_fn):
6459     """Remove the tag from the object.
6460
6461     """
6462     for tag in self.op.tags:
6463       self.target.RemoveTag(tag)
6464     try:
6465       self.cfg.Update(self.target)
6466     except errors.ConfigurationError:
6467       raise errors.OpRetryError("There has been a modification to the"
6468                                 " config file and the operation has been"
6469                                 " aborted. Please retry.")
6470
6471
6472 class LUTestDelay(NoHooksLU):
6473   """Sleep for a specified amount of time.
6474
6475   This LU sleeps on the master and/or nodes for a specified amount of
6476   time.
6477
6478   """
6479   _OP_REQP = ["duration", "on_master", "on_nodes"]
6480   REQ_BGL = False
6481
6482   def ExpandNames(self):
6483     """Expand names and set required locks.
6484
6485     This expands the node list, if any.
6486
6487     """
6488     self.needed_locks = {}
6489     if self.op.on_nodes:
6490       # _GetWantedNodes can be used here, but is not always appropriate to use
6491       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6492       # more information.
6493       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6494       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6495
6496   def CheckPrereq(self):
6497     """Check prerequisites.
6498
6499     """
6500
6501   def Exec(self, feedback_fn):
6502     """Do the actual sleep.
6503
6504     """
6505     if self.op.on_master:
6506       if not utils.TestDelay(self.op.duration):
6507         raise errors.OpExecError("Error during master delay test")
6508     if self.op.on_nodes:
6509       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6510       if not result:
6511         raise errors.OpExecError("Complete failure from rpc call")
6512       for node, node_result in result.items():
6513         node_result.Raise()
6514         if not node_result.data:
6515           raise errors.OpExecError("Failure during rpc call to node %s,"
6516                                    " result: %s" % (node, node_result.data))
6517
6518
6519 class IAllocator(object):
6520   """IAllocator framework.
6521
6522   An IAllocator instance has three sets of attributes:
6523     - cfg that is needed to query the cluster
6524     - input data (all members of the _KEYS class attribute are required)
6525     - four buffer attributes (in|out_data|text), that represent the
6526       input (to the external script) in text and data structure format,
6527       and the output from it, again in two formats
6528     - the result variables from the script (success, info, nodes) for
6529       easy usage
6530
6531   """
6532   _ALLO_KEYS = [
6533     "mem_size", "disks", "disk_template",
6534     "os", "tags", "nics", "vcpus", "hypervisor",
6535     ]
6536   _RELO_KEYS = [
6537     "relocate_from",
6538     ]
6539
6540   def __init__(self, lu, mode, name, **kwargs):
6541     self.lu = lu
6542     # init buffer variables
6543     self.in_text = self.out_text = self.in_data = self.out_data = None
6544     # init all input fields so that pylint is happy
6545     self.mode = mode
6546     self.name = name
6547     self.mem_size = self.disks = self.disk_template = None
6548     self.os = self.tags = self.nics = self.vcpus = None
6549     self.hypervisor = None
6550     self.relocate_from = None
6551     # computed fields
6552     self.required_nodes = None
6553     # init result fields
6554     self.success = self.info = self.nodes = None
6555     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6556       keyset = self._ALLO_KEYS
6557     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6558       keyset = self._RELO_KEYS
6559     else:
6560       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6561                                    " IAllocator" % self.mode)
6562     for key in kwargs:
6563       if key not in keyset:
6564         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6565                                      " IAllocator" % key)
6566       setattr(self, key, kwargs[key])
6567     for key in keyset:
6568       if key not in kwargs:
6569         raise errors.ProgrammerError("Missing input parameter '%s' to"
6570                                      " IAllocator" % key)
6571     self._BuildInputData()
6572
6573   def _ComputeClusterData(self):
6574     """Compute the generic allocator input data.
6575
6576     This is the data that is independent of the actual operation.
6577
6578     """
6579     cfg = self.lu.cfg
6580     cluster_info = cfg.GetClusterInfo()
6581     # cluster data
6582     data = {
6583       "version": 1,
6584       "cluster_name": cfg.GetClusterName(),
6585       "cluster_tags": list(cluster_info.GetTags()),
6586       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6587       # we don't have job IDs
6588       }
6589     iinfo = cfg.GetAllInstancesInfo().values()
6590     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6591
6592     # node data
6593     node_results = {}
6594     node_list = cfg.GetNodeList()
6595
6596     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6597       hypervisor_name = self.hypervisor
6598     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6599       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6600
6601     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6602                                            hypervisor_name)
6603     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6604                        cluster_info.enabled_hypervisors)
6605     for nname, nresult in node_data.items():
6606       # first fill in static (config-based) values
6607       ninfo = cfg.GetNodeInfo(nname)
6608       pnr = {
6609         "tags": list(ninfo.GetTags()),
6610         "primary_ip": ninfo.primary_ip,
6611         "secondary_ip": ninfo.secondary_ip,
6612         "offline": ninfo.offline,
6613         "drained": ninfo.drained,
6614         "master_candidate": ninfo.master_candidate,
6615         }
6616
6617       if not ninfo.offline:
6618         nresult.Raise()
6619         if not isinstance(nresult.data, dict):
6620           raise errors.OpExecError("Can't get data for node %s" % nname)
6621         remote_info = nresult.data
6622         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6623                      'vg_size', 'vg_free', 'cpu_total']:
6624           if attr not in remote_info:
6625             raise errors.OpExecError("Node '%s' didn't return attribute"
6626                                      " '%s'" % (nname, attr))
6627           try:
6628             remote_info[attr] = int(remote_info[attr])
6629           except ValueError, err:
6630             raise errors.OpExecError("Node '%s' returned invalid value"
6631                                      " for '%s': %s" % (nname, attr, err))
6632         # compute memory used by primary instances
6633         i_p_mem = i_p_up_mem = 0
6634         for iinfo, beinfo in i_list:
6635           if iinfo.primary_node == nname:
6636             i_p_mem += beinfo[constants.BE_MEMORY]
6637             if iinfo.name not in node_iinfo[nname].data:
6638               i_used_mem = 0
6639             else:
6640               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6641             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6642             remote_info['memory_free'] -= max(0, i_mem_diff)
6643
6644             if iinfo.admin_up:
6645               i_p_up_mem += beinfo[constants.BE_MEMORY]
6646
6647         # compute memory used by instances
6648         pnr_dyn = {
6649           "total_memory": remote_info['memory_total'],
6650           "reserved_memory": remote_info['memory_dom0'],
6651           "free_memory": remote_info['memory_free'],
6652           "total_disk": remote_info['vg_size'],
6653           "free_disk": remote_info['vg_free'],
6654           "total_cpus": remote_info['cpu_total'],
6655           "i_pri_memory": i_p_mem,
6656           "i_pri_up_memory": i_p_up_mem,
6657           }
6658         pnr.update(pnr_dyn)
6659
6660       node_results[nname] = pnr
6661     data["nodes"] = node_results
6662
6663     # instance data
6664     instance_data = {}
6665     for iinfo, beinfo in i_list:
6666       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6667                   for n in iinfo.nics]
6668       pir = {
6669         "tags": list(iinfo.GetTags()),
6670         "admin_up": iinfo.admin_up,
6671         "vcpus": beinfo[constants.BE_VCPUS],
6672         "memory": beinfo[constants.BE_MEMORY],
6673         "os": iinfo.os,
6674         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6675         "nics": nic_data,
6676         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6677         "disk_template": iinfo.disk_template,
6678         "hypervisor": iinfo.hypervisor,
6679         }
6680       instance_data[iinfo.name] = pir
6681
6682     data["instances"] = instance_data
6683
6684     self.in_data = data
6685
6686   def _AddNewInstance(self):
6687     """Add new instance data to allocator structure.
6688
6689     This in combination with _AllocatorGetClusterData will create the
6690     correct structure needed as input for the allocator.
6691
6692     The checks for the completeness of the opcode must have already been
6693     done.
6694
6695     """
6696     data = self.in_data
6697
6698     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6699
6700     if self.disk_template in constants.DTS_NET_MIRROR:
6701       self.required_nodes = 2
6702     else:
6703       self.required_nodes = 1
6704     request = {
6705       "type": "allocate",
6706       "name": self.name,
6707       "disk_template": self.disk_template,
6708       "tags": self.tags,
6709       "os": self.os,
6710       "vcpus": self.vcpus,
6711       "memory": self.mem_size,
6712       "disks": self.disks,
6713       "disk_space_total": disk_space,
6714       "nics": self.nics,
6715       "required_nodes": self.required_nodes,
6716       }
6717     data["request"] = request
6718
6719   def _AddRelocateInstance(self):
6720     """Add relocate instance data to allocator structure.
6721
6722     This in combination with _IAllocatorGetClusterData will create the
6723     correct structure needed as input for the allocator.
6724
6725     The checks for the completeness of the opcode must have already been
6726     done.
6727
6728     """
6729     instance = self.lu.cfg.GetInstanceInfo(self.name)
6730     if instance is None:
6731       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6732                                    " IAllocator" % self.name)
6733
6734     if instance.disk_template not in constants.DTS_NET_MIRROR:
6735       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6736
6737     if len(instance.secondary_nodes) != 1:
6738       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6739
6740     self.required_nodes = 1
6741     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6742     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6743
6744     request = {
6745       "type": "relocate",
6746       "name": self.name,
6747       "disk_space_total": disk_space,
6748       "required_nodes": self.required_nodes,
6749       "relocate_from": self.relocate_from,
6750       }
6751     self.in_data["request"] = request
6752
6753   def _BuildInputData(self):
6754     """Build input data structures.
6755
6756     """
6757     self._ComputeClusterData()
6758
6759     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6760       self._AddNewInstance()
6761     else:
6762       self._AddRelocateInstance()
6763
6764     self.in_text = serializer.Dump(self.in_data)
6765
6766   def Run(self, name, validate=True, call_fn=None):
6767     """Run an instance allocator and return the results.
6768
6769     """
6770     if call_fn is None:
6771       call_fn = self.lu.rpc.call_iallocator_runner
6772     data = self.in_text
6773
6774     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6775     result.Raise()
6776
6777     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6778       raise errors.OpExecError("Invalid result from master iallocator runner")
6779
6780     rcode, stdout, stderr, fail = result.data
6781
6782     if rcode == constants.IARUN_NOTFOUND:
6783       raise errors.OpExecError("Can't find allocator '%s'" % name)
6784     elif rcode == constants.IARUN_FAILURE:
6785       raise errors.OpExecError("Instance allocator call failed: %s,"
6786                                " output: %s" % (fail, stdout+stderr))
6787     self.out_text = stdout
6788     if validate:
6789       self._ValidateResult()
6790
6791   def _ValidateResult(self):
6792     """Process the allocator results.
6793
6794     This will process and if successful save the result in
6795     self.out_data and the other parameters.
6796
6797     """
6798     try:
6799       rdict = serializer.Load(self.out_text)
6800     except Exception, err:
6801       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6802
6803     if not isinstance(rdict, dict):
6804       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6805
6806     for key in "success", "info", "nodes":
6807       if key not in rdict:
6808         raise errors.OpExecError("Can't parse iallocator results:"
6809                                  " missing key '%s'" % key)
6810       setattr(self, key, rdict[key])
6811
6812     if not isinstance(rdict["nodes"], list):
6813       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6814                                " is not a list")
6815     self.out_data = rdict
6816
6817
6818 class LUTestAllocator(NoHooksLU):
6819   """Run allocator tests.
6820
6821   This LU runs the allocator tests
6822
6823   """
6824   _OP_REQP = ["direction", "mode", "name"]
6825
6826   def CheckPrereq(self):
6827     """Check prerequisites.
6828
6829     This checks the opcode parameters depending on the director and mode test.
6830
6831     """
6832     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6833       for attr in ["name", "mem_size", "disks", "disk_template",
6834                    "os", "tags", "nics", "vcpus"]:
6835         if not hasattr(self.op, attr):
6836           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6837                                      attr)
6838       iname = self.cfg.ExpandInstanceName(self.op.name)
6839       if iname is not None:
6840         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6841                                    iname)
6842       if not isinstance(self.op.nics, list):
6843         raise errors.OpPrereqError("Invalid parameter 'nics'")
6844       for row in self.op.nics:
6845         if (not isinstance(row, dict) or
6846             "mac" not in row or
6847             "ip" not in row or
6848             "bridge" not in row):
6849           raise errors.OpPrereqError("Invalid contents of the"
6850                                      " 'nics' parameter")
6851       if not isinstance(self.op.disks, list):
6852         raise errors.OpPrereqError("Invalid parameter 'disks'")
6853       for row in self.op.disks:
6854         if (not isinstance(row, dict) or
6855             "size" not in row or
6856             not isinstance(row["size"], int) or
6857             "mode" not in row or
6858             row["mode"] not in ['r', 'w']):
6859           raise errors.OpPrereqError("Invalid contents of the"
6860                                      " 'disks' parameter")
6861       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6862         self.op.hypervisor = self.cfg.GetHypervisorType()
6863     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6864       if not hasattr(self.op, "name"):
6865         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6866       fname = self.cfg.ExpandInstanceName(self.op.name)
6867       if fname is None:
6868         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6869                                    self.op.name)
6870       self.op.name = fname
6871       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6872     else:
6873       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6874                                  self.op.mode)
6875
6876     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6877       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6878         raise errors.OpPrereqError("Missing allocator name")
6879     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6880       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6881                                  self.op.direction)
6882
6883   def Exec(self, feedback_fn):
6884     """Run the allocator test.
6885
6886     """
6887     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6888       ial = IAllocator(self,
6889                        mode=self.op.mode,
6890                        name=self.op.name,
6891                        mem_size=self.op.mem_size,
6892                        disks=self.op.disks,
6893                        disk_template=self.op.disk_template,
6894                        os=self.op.os,
6895                        tags=self.op.tags,
6896                        nics=self.op.nics,
6897                        vcpus=self.op.vcpus,
6898                        hypervisor=self.op.hypervisor,
6899                        )
6900     else:
6901       ial = IAllocator(self,
6902                        mode=self.op.mode,
6903                        name=self.op.name,
6904                        relocate_from=list(self.relocate_from),
6905                        )
6906
6907     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6908       result = ial.in_text
6909     else:
6910       ial.Run(self.op.allocator, validate=False)
6911       result = ial.out_text
6912     return result