Small simplification in MapLVsByNode
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: string
459   @param status: the desired status of the instances
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   env = {
472     "OP_TARGET": name,
473     "INSTANCE_NAME": name,
474     "INSTANCE_PRIMARY": primary_node,
475     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476     "INSTANCE_OS_TYPE": os_type,
477     "INSTANCE_STATUS": status,
478     "INSTANCE_MEMORY": memory,
479     "INSTANCE_VCPUS": vcpus,
480   }
481
482   if nics:
483     nic_count = len(nics)
484     for idx, (ip, bridge, mac) in enumerate(nics):
485       if ip is None:
486         ip = ""
487       env["INSTANCE_NIC%d_IP" % idx] = ip
488       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490   else:
491     nic_count = 0
492
493   env["INSTANCE_NIC_COUNT"] = nic_count
494
495   return env
496
497
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499   """Builds instance related env variables for hooks from an object.
500
501   @type lu: L{LogicalUnit}
502   @param lu: the logical unit on whose behalf we execute
503   @type instance: L{objects.Instance}
504   @param instance: the instance for which we should build the
505       environment
506   @type override: dict
507   @param override: dictionary with key/values that will override
508       our values
509   @rtype: dict
510   @return: the hook environment dictionary
511
512   """
513   bep = lu.cfg.GetClusterInfo().FillBE(instance)
514   args = {
515     'name': instance.name,
516     'primary_node': instance.primary_node,
517     'secondary_nodes': instance.secondary_nodes,
518     'os_type': instance.os,
519     'status': instance.os,
520     'memory': bep[constants.BE_MEMORY],
521     'vcpus': bep[constants.BE_VCPUS],
522     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523   }
524   if override:
525     args.update(override)
526   return _BuildInstanceHookEnv(**args)
527
528
529 def _AdjustCandidatePool(lu):
530   """Adjust the candidate pool after node operations.
531
532   """
533   mod_list = lu.cfg.MaintainCandidatePool()
534   if mod_list:
535     lu.LogInfo("Promoted nodes to master candidate role: %s",
536                ", ".join(node.name for node in mod_list))
537     for name in mod_list:
538       lu.context.ReaddNode(name)
539   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540   if mc_now > mc_max:
541     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542                (mc_now, mc_max))
543
544
545 def _CheckInstanceBridgesExist(lu, instance):
546   """Check that the brigdes needed by an instance exist.
547
548   """
549   # check bridges existance
550   brlist = [nic.bridge for nic in instance.nics]
551   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552   result.Raise()
553   if not result.data:
554     raise errors.OpPrereqError("One or more target bridges %s does not"
555                                " exist on destination node '%s'" %
556                                (brlist, instance.primary_node))
557
558
559 class LUDestroyCluster(NoHooksLU):
560   """Logical unit for destroying the cluster.
561
562   """
563   _OP_REQP = []
564
565   def CheckPrereq(self):
566     """Check prerequisites.
567
568     This checks whether the cluster is empty.
569
570     Any errors are signalled by raising errors.OpPrereqError.
571
572     """
573     master = self.cfg.GetMasterNode()
574
575     nodelist = self.cfg.GetNodeList()
576     if len(nodelist) != 1 or nodelist[0] != master:
577       raise errors.OpPrereqError("There are still %d node(s) in"
578                                  " this cluster." % (len(nodelist) - 1))
579     instancelist = self.cfg.GetInstanceList()
580     if instancelist:
581       raise errors.OpPrereqError("There are still %d instance(s) in"
582                                  " this cluster." % len(instancelist))
583
584   def Exec(self, feedback_fn):
585     """Destroys the cluster.
586
587     """
588     master = self.cfg.GetMasterNode()
589     result = self.rpc.call_node_stop_master(master, False)
590     result.Raise()
591     if not result.data:
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     return master
597
598
599 class LUVerifyCluster(LogicalUnit):
600   """Verifies the cluster status.
601
602   """
603   HPATH = "cluster-verify"
604   HTYPE = constants.HTYPE_CLUSTER
605   _OP_REQP = ["skip_checks"]
606   REQ_BGL = False
607
608   def ExpandNames(self):
609     self.needed_locks = {
610       locking.LEVEL_NODE: locking.ALL_SET,
611       locking.LEVEL_INSTANCE: locking.ALL_SET,
612     }
613     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614
615   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616                   node_result, feedback_fn, master_files):
617     """Run multiple tests against a node.
618
619     Test list:
620
621       - compares ganeti version
622       - checks vg existance and size > 20G
623       - checks config file checksum
624       - checks ssh to other nodes
625
626     @type nodeinfo: L{objects.Node}
627     @param nodeinfo: the node to check
628     @param file_list: required list of files
629     @param local_cksum: dictionary of local files and their checksums
630     @param node_result: the results from the node
631     @param feedback_fn: function used to accumulate results
632     @param master_files: list of files that only masters should have
633
634     """
635     node = nodeinfo.name
636
637     # main result, node_result should be a non-empty dict
638     if not node_result or not isinstance(node_result, dict):
639       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640       return True
641
642     # compares ganeti version
643     local_version = constants.PROTOCOL_VERSION
644     remote_version = node_result.get('version', None)
645     if not remote_version:
646       feedback_fn("  - ERROR: connection to %s failed" % (node))
647       return True
648
649     if local_version != remote_version:
650       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651                       (local_version, node, remote_version))
652       return True
653
654     # checks vg existance and size > 20G
655
656     bad = False
657     vglist = node_result.get(constants.NV_VGLIST, None)
658     if not vglist:
659       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660                       (node,))
661       bad = True
662     else:
663       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664                                             constants.MIN_VG_SIZE)
665       if vgstatus:
666         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667         bad = True
668
669     # checks config file checksum
670
671     remote_cksum = node_result.get(constants.NV_FILELIST, None)
672     if not isinstance(remote_cksum, dict):
673       bad = True
674       feedback_fn("  - ERROR: node hasn't returned file checksum data")
675     else:
676       for file_name in file_list:
677         node_is_mc = nodeinfo.master_candidate
678         must_have_file = file_name not in master_files
679         if file_name not in remote_cksum:
680           if node_is_mc or must_have_file:
681             bad = True
682             feedback_fn("  - ERROR: file '%s' missing" % file_name)
683         elif remote_cksum[file_name] != local_cksum[file_name]:
684           if node_is_mc or must_have_file:
685             bad = True
686             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687           else:
688             # not candidate and this is not a must-have file
689             bad = True
690             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691                         " '%s'" % file_name)
692         else:
693           # all good, except non-master/non-must have combination
694           if not node_is_mc and not must_have_file:
695             feedback_fn("  - ERROR: file '%s' should not exist on non master"
696                         " candidates" % file_name)
697
698     # checks ssh to any
699
700     if constants.NV_NODELIST not in node_result:
701       bad = True
702       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703     else:
704       if node_result[constants.NV_NODELIST]:
705         bad = True
706         for node in node_result[constants.NV_NODELIST]:
707           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708                           (node, node_result[constants.NV_NODELIST][node]))
709
710     if constants.NV_NODENETTEST not in node_result:
711       bad = True
712       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713     else:
714       if node_result[constants.NV_NODENETTEST]:
715         bad = True
716         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717         for node in nlist:
718           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719                           (node, node_result[constants.NV_NODENETTEST][node]))
720
721     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722     if isinstance(hyp_result, dict):
723       for hv_name, hv_result in hyp_result.iteritems():
724         if hv_result is not None:
725           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726                       (hv_name, hv_result))
727     return bad
728
729   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730                       node_instance, feedback_fn, n_offline):
731     """Verify an instance.
732
733     This function checks to see if the required block devices are
734     available on the instance's node.
735
736     """
737     bad = False
738
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       if node in n_offline:
746         # ignore missing volumes on offline nodes
747         continue
748       for volume in node_vol_should[node]:
749         if node not in node_vol_is or volume not in node_vol_is[node]:
750           feedback_fn("  - ERROR: volume %s missing on node %s" %
751                           (volume, node))
752           bad = True
753
754     if not instanceconfig.status == 'down':
755       if ((node_current not in node_instance or
756           not instance in node_instance[node_current]) and
757           node_current not in n_offline):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804     """Verify N+1 Memory Resilience.
805
806     Check that if one single node dies we can still start all the instances it
807     was primary for.
808
809     """
810     bad = False
811
812     for node, nodeinfo in node_info.iteritems():
813       # This code checks that every node which is now listed as secondary has
814       # enough memory to host all instances it is supposed to should a single
815       # other node in the cluster fail.
816       # FIXME: not ready for failover to an arbitrary node
817       # FIXME: does not support file-backed instances
818       # WARNING: we currently take into account down instances as well as up
819       # ones, considering that even if they're down someone might want to start
820       # them even in the event of a node failure.
821       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822         needed_mem = 0
823         for instance in instances:
824           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825           if bep[constants.BE_AUTO_BALANCE]:
826             needed_mem += bep[constants.BE_MEMORY]
827         if nodeinfo['mfree'] < needed_mem:
828           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829                       " failovers should node %s fail" % (node, prinode))
830           bad = True
831     return bad
832
833   def CheckPrereq(self):
834     """Check prerequisites.
835
836     Transform the list of checks we're going to skip into a set and check that
837     all its members are valid.
838
839     """
840     self.skip_set = frozenset(self.op.skip_checks)
841     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842       raise errors.OpPrereqError("Invalid checks to be skipped specified")
843
844   def BuildHooksEnv(self):
845     """Build hooks env.
846
847     Cluster-Verify hooks just rone in the post phase and their failure makes
848     the output be logged in the verify output and the verification to fail.
849
850     """
851     all_nodes = self.cfg.GetNodeList()
852     # TODO: populate the environment with useful information for verify hooks
853     env = {}
854     return env, [], all_nodes
855
856   def Exec(self, feedback_fn):
857     """Verify integrity of cluster, performing various test on nodes.
858
859     """
860     bad = False
861     feedback_fn("* Verifying global settings")
862     for msg in self.cfg.VerifyConfig():
863       feedback_fn("  - ERROR: %s" % msg)
864
865     vg_name = self.cfg.GetVGName()
866     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867     nodelist = utils.NiceSort(self.cfg.GetNodeList())
868     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870     i_non_redundant = [] # Non redundant instances
871     i_non_a_balanced = [] # Non auto-balanced instances
872     n_offline = [] # List of offline nodes
873     node_volume = {}
874     node_instance = {}
875     node_info = {}
876     instance_cfg = {}
877
878     # FIXME: verify OS list
879     # do local checksums
880     master_files = [constants.CLUSTER_CONF_FILE]
881
882     file_names = ssconf.SimpleStore().GetFileList()
883     file_names.append(constants.SSL_CERT_FILE)
884     file_names.append(constants.RAPI_CERT_FILE)
885     file_names.extend(master_files)
886
887     local_checksums = utils.FingerprintFiles(file_names)
888
889     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
890     node_verify_param = {
891       constants.NV_FILELIST: file_names,
892       constants.NV_NODELIST: [node.name for node in nodeinfo
893                               if not node.offline],
894       constants.NV_HYPERVISOR: hypervisors,
895       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
896                                   node.secondary_ip) for node in nodeinfo
897                                  if not node.offline],
898       constants.NV_LVLIST: vg_name,
899       constants.NV_INSTANCELIST: hypervisors,
900       constants.NV_VGLIST: None,
901       constants.NV_VERSION: None,
902       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
903       }
904     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
905                                            self.cfg.GetClusterName())
906
907     cluster = self.cfg.GetClusterInfo()
908     master_node = self.cfg.GetMasterNode()
909     for node_i in nodeinfo:
910       node = node_i.name
911       nresult = all_nvinfo[node].data
912
913       if node_i.offline:
914         feedback_fn("* Skipping offline node %s" % (node,))
915         n_offline.append(node)
916         continue
917
918       if node == master_node:
919         ntype = "master"
920       elif node_i.master_candidate:
921         ntype = "master candidate"
922       else:
923         ntype = "regular"
924       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
925
926       if all_nvinfo[node].failed or not isinstance(nresult, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         continue
930
931       result = self._VerifyNode(node_i, file_names, local_checksums,
932                                 nresult, feedback_fn, master_files)
933       bad = bad or result
934
935       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
936       if isinstance(lvdata, basestring):
937         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
938                     (node, lvdata.encode('string_escape')))
939         bad = True
940         node_volume[node] = {}
941       elif not isinstance(lvdata, dict):
942         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
943         bad = True
944         continue
945       else:
946         node_volume[node] = lvdata
947
948       # node_instance
949       idata = nresult.get(constants.NV_INSTANCELIST, None)
950       if not isinstance(idata, list):
951         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
952                     (node,))
953         bad = True
954         continue
955
956       node_instance[node] = idata
957
958       # node_info
959       nodeinfo = nresult.get(constants.NV_HVINFO, None)
960       if not isinstance(nodeinfo, dict):
961         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
962         bad = True
963         continue
964
965       try:
966         node_info[node] = {
967           "mfree": int(nodeinfo['memory_free']),
968           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
969           "pinst": [],
970           "sinst": [],
971           # dictionary holding all instances this node is secondary for,
972           # grouped by their primary node. Each key is a cluster node, and each
973           # value is a list of instances which have the key as primary and the
974           # current node as secondary.  this is handy to calculate N+1 memory
975           # availability if you can only failover from a primary to its
976           # secondary.
977           "sinst-by-pnode": {},
978         }
979       except ValueError:
980         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
981         bad = True
982         continue
983
984     node_vol_should = {}
985
986     for instance in instancelist:
987       feedback_fn("* Verifying instance %s" % instance)
988       inst_config = self.cfg.GetInstanceInfo(instance)
989       result =  self._VerifyInstance(instance, inst_config, node_volume,
990                                      node_instance, feedback_fn, n_offline)
991       bad = bad or result
992       inst_nodes_offline = []
993
994       inst_config.MapLVsByNode(node_vol_should)
995
996       instance_cfg[instance] = inst_config
997
998       pnode = inst_config.primary_node
999       if pnode in node_info:
1000         node_info[pnode]['pinst'].append(instance)
1001       elif pnode not in n_offline:
1002         feedback_fn("  - ERROR: instance %s, connection to primary node"
1003                     " %s failed" % (instance, pnode))
1004         bad = True
1005
1006       if pnode in n_offline:
1007         inst_nodes_offline.append(pnode)
1008
1009       # If the instance is non-redundant we cannot survive losing its primary
1010       # node, so we are not N+1 compliant. On the other hand we have no disk
1011       # templates with more than one secondary so that situation is not well
1012       # supported either.
1013       # FIXME: does not support file-backed instances
1014       if len(inst_config.secondary_nodes) == 0:
1015         i_non_redundant.append(instance)
1016       elif len(inst_config.secondary_nodes) > 1:
1017         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1018                     % instance)
1019
1020       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1021         i_non_a_balanced.append(instance)
1022
1023       for snode in inst_config.secondary_nodes:
1024         if snode in node_info:
1025           node_info[snode]['sinst'].append(instance)
1026           if pnode not in node_info[snode]['sinst-by-pnode']:
1027             node_info[snode]['sinst-by-pnode'][pnode] = []
1028           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1029         elif snode not in n_offline:
1030           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1031                       " %s failed" % (instance, snode))
1032           bad = True
1033         if snode in n_offline:
1034           inst_nodes_offline.append(snode)
1035
1036       if inst_nodes_offline:
1037         # warn that the instance lives on offline nodes, and set bad=True
1038         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1039                     ", ".join(inst_nodes_offline))
1040         bad = True
1041
1042     feedback_fn("* Verifying orphan volumes")
1043     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1044                                        feedback_fn)
1045     bad = bad or result
1046
1047     feedback_fn("* Verifying remaining instances")
1048     result = self._VerifyOrphanInstances(instancelist, node_instance,
1049                                          feedback_fn)
1050     bad = bad or result
1051
1052     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1053       feedback_fn("* Verifying N+1 Memory redundancy")
1054       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1055       bad = bad or result
1056
1057     feedback_fn("* Other Notes")
1058     if i_non_redundant:
1059       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1060                   % len(i_non_redundant))
1061
1062     if i_non_a_balanced:
1063       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1064                   % len(i_non_a_balanced))
1065
1066     if n_offline:
1067       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1068
1069     return not bad
1070
1071   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1072     """Analize the post-hooks' result
1073
1074     This method analyses the hook result, handles it, and sends some
1075     nicely-formatted feedback back to the user.
1076
1077     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1078         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1079     @param hooks_results: the results of the multi-node hooks rpc call
1080     @param feedback_fn: function used send feedback back to the caller
1081     @param lu_result: previous Exec result
1082     @return: the new Exec result, based on the previous result
1083         and hook results
1084
1085     """
1086     # We only really run POST phase hooks, and are only interested in
1087     # their results
1088     if phase == constants.HOOKS_PHASE_POST:
1089       # Used to change hooks' output to proper indentation
1090       indent_re = re.compile('^', re.M)
1091       feedback_fn("* Hooks Results")
1092       if not hooks_results:
1093         feedback_fn("  - ERROR: general communication failure")
1094         lu_result = 1
1095       else:
1096         for node_name in hooks_results:
1097           show_node_header = True
1098           res = hooks_results[node_name]
1099           if res.failed or res.data is False or not isinstance(res.data, list):
1100             if res.offline:
1101               # no need to warn or set fail return value
1102               continue
1103             feedback_fn("    Communication failure in hooks execution")
1104             lu_result = 1
1105             continue
1106           for script, hkr, output in res.data:
1107             if hkr == constants.HKR_FAIL:
1108               # The node header is only shown once, if there are
1109               # failing hooks on that node
1110               if show_node_header:
1111                 feedback_fn("  Node %s:" % node_name)
1112                 show_node_header = False
1113               feedback_fn("    ERROR: Script %s failed, output:" % script)
1114               output = indent_re.sub('      ', output)
1115               feedback_fn("%s" % output)
1116               lu_result = 1
1117
1118       return lu_result
1119
1120
1121 class LUVerifyDisks(NoHooksLU):
1122   """Verifies the cluster disks status.
1123
1124   """
1125   _OP_REQP = []
1126   REQ_BGL = False
1127
1128   def ExpandNames(self):
1129     self.needed_locks = {
1130       locking.LEVEL_NODE: locking.ALL_SET,
1131       locking.LEVEL_INSTANCE: locking.ALL_SET,
1132     }
1133     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1134
1135   def CheckPrereq(self):
1136     """Check prerequisites.
1137
1138     This has no prerequisites.
1139
1140     """
1141     pass
1142
1143   def Exec(self, feedback_fn):
1144     """Verify integrity of cluster disks.
1145
1146     """
1147     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1148
1149     vg_name = self.cfg.GetVGName()
1150     nodes = utils.NiceSort(self.cfg.GetNodeList())
1151     instances = [self.cfg.GetInstanceInfo(name)
1152                  for name in self.cfg.GetInstanceList()]
1153
1154     nv_dict = {}
1155     for inst in instances:
1156       inst_lvs = {}
1157       if (inst.status != "up" or
1158           inst.disk_template not in constants.DTS_NET_MIRROR):
1159         continue
1160       inst.MapLVsByNode(inst_lvs)
1161       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1162       for node, vol_list in inst_lvs.iteritems():
1163         for vol in vol_list:
1164           nv_dict[(node, vol)] = inst
1165
1166     if not nv_dict:
1167       return result
1168
1169     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1170
1171     to_act = set()
1172     for node in nodes:
1173       # node_volume
1174       lvs = node_lvs[node]
1175       if lvs.failed:
1176         if not lvs.offline:
1177           self.LogWarning("Connection to node %s failed: %s" %
1178                           (node, lvs.data))
1179         continue
1180       lvs = lvs.data
1181       if isinstance(lvs, basestring):
1182         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1183         res_nlvm[node] = lvs
1184       elif not isinstance(lvs, dict):
1185         logging.warning("Connection to node %s failed or invalid data"
1186                         " returned", node)
1187         res_nodes.append(node)
1188         continue
1189
1190       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1191         inst = nv_dict.pop((node, lv_name), None)
1192         if (not lv_online and inst is not None
1193             and inst.name not in res_instances):
1194           res_instances.append(inst.name)
1195
1196     # any leftover items in nv_dict are missing LVs, let's arrange the
1197     # data better
1198     for key, inst in nv_dict.iteritems():
1199       if inst.name not in res_missing:
1200         res_missing[inst.name] = []
1201       res_missing[inst.name].append(key)
1202
1203     return result
1204
1205
1206 class LURenameCluster(LogicalUnit):
1207   """Rename the cluster.
1208
1209   """
1210   HPATH = "cluster-rename"
1211   HTYPE = constants.HTYPE_CLUSTER
1212   _OP_REQP = ["name"]
1213
1214   def BuildHooksEnv(self):
1215     """Build hooks env.
1216
1217     """
1218     env = {
1219       "OP_TARGET": self.cfg.GetClusterName(),
1220       "NEW_NAME": self.op.name,
1221       }
1222     mn = self.cfg.GetMasterNode()
1223     return env, [mn], [mn]
1224
1225   def CheckPrereq(self):
1226     """Verify that the passed name is a valid one.
1227
1228     """
1229     hostname = utils.HostInfo(self.op.name)
1230
1231     new_name = hostname.name
1232     self.ip = new_ip = hostname.ip
1233     old_name = self.cfg.GetClusterName()
1234     old_ip = self.cfg.GetMasterIP()
1235     if new_name == old_name and new_ip == old_ip:
1236       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1237                                  " cluster has changed")
1238     if new_ip != old_ip:
1239       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1240         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1241                                    " reachable on the network. Aborting." %
1242                                    new_ip)
1243
1244     self.op.name = new_name
1245
1246   def Exec(self, feedback_fn):
1247     """Rename the cluster.
1248
1249     """
1250     clustername = self.op.name
1251     ip = self.ip
1252
1253     # shutdown the master IP
1254     master = self.cfg.GetMasterNode()
1255     result = self.rpc.call_node_stop_master(master, False)
1256     if result.failed or not result.data:
1257       raise errors.OpExecError("Could not disable the master role")
1258
1259     try:
1260       cluster = self.cfg.GetClusterInfo()
1261       cluster.cluster_name = clustername
1262       cluster.master_ip = ip
1263       self.cfg.Update(cluster)
1264
1265       # update the known hosts file
1266       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1267       node_list = self.cfg.GetNodeList()
1268       try:
1269         node_list.remove(master)
1270       except ValueError:
1271         pass
1272       result = self.rpc.call_upload_file(node_list,
1273                                          constants.SSH_KNOWN_HOSTS_FILE)
1274       for to_node, to_result in result.iteritems():
1275         if to_result.failed or not to_result.data:
1276           logging.error("Copy of file %s to node %s failed",
1277                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1278
1279     finally:
1280       result = self.rpc.call_node_start_master(master, False)
1281       if result.failed or not result.data:
1282         self.LogWarning("Could not re-enable the master role on"
1283                         " the master, please restart manually.")
1284
1285
1286 def _RecursiveCheckIfLVMBased(disk):
1287   """Check if the given disk or its children are lvm-based.
1288
1289   @type disk: L{objects.Disk}
1290   @param disk: the disk to check
1291   @rtype: booleean
1292   @return: boolean indicating whether a LD_LV dev_type was found or not
1293
1294   """
1295   if disk.children:
1296     for chdisk in disk.children:
1297       if _RecursiveCheckIfLVMBased(chdisk):
1298         return True
1299   return disk.dev_type == constants.LD_LV
1300
1301
1302 class LUSetClusterParams(LogicalUnit):
1303   """Change the parameters of the cluster.
1304
1305   """
1306   HPATH = "cluster-modify"
1307   HTYPE = constants.HTYPE_CLUSTER
1308   _OP_REQP = []
1309   REQ_BGL = False
1310
1311   def CheckParameters(self):
1312     """Check parameters
1313
1314     """
1315     if not hasattr(self.op, "candidate_pool_size"):
1316       self.op.candidate_pool_size = None
1317     if self.op.candidate_pool_size is not None:
1318       try:
1319         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1320       except ValueError, err:
1321         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1322                                    str(err))
1323       if self.op.candidate_pool_size < 1:
1324         raise errors.OpPrereqError("At least one master candidate needed")
1325
1326   def ExpandNames(self):
1327     # FIXME: in the future maybe other cluster params won't require checking on
1328     # all nodes to be modified.
1329     self.needed_locks = {
1330       locking.LEVEL_NODE: locking.ALL_SET,
1331     }
1332     self.share_locks[locking.LEVEL_NODE] = 1
1333
1334   def BuildHooksEnv(self):
1335     """Build hooks env.
1336
1337     """
1338     env = {
1339       "OP_TARGET": self.cfg.GetClusterName(),
1340       "NEW_VG_NAME": self.op.vg_name,
1341       }
1342     mn = self.cfg.GetMasterNode()
1343     return env, [mn], [mn]
1344
1345   def CheckPrereq(self):
1346     """Check prerequisites.
1347
1348     This checks whether the given params don't conflict and
1349     if the given volume group is valid.
1350
1351     """
1352     # FIXME: This only works because there is only one parameter that can be
1353     # changed or removed.
1354     if self.op.vg_name is not None and not self.op.vg_name:
1355       instances = self.cfg.GetAllInstancesInfo().values()
1356       for inst in instances:
1357         for disk in inst.disks:
1358           if _RecursiveCheckIfLVMBased(disk):
1359             raise errors.OpPrereqError("Cannot disable lvm storage while"
1360                                        " lvm-based instances exist")
1361
1362     node_list = self.acquired_locks[locking.LEVEL_NODE]
1363
1364     # if vg_name not None, checks given volume group on all nodes
1365     if self.op.vg_name:
1366       vglist = self.rpc.call_vg_list(node_list)
1367       for node in node_list:
1368         if vglist[node].failed:
1369           # ignoring down node
1370           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1371           continue
1372         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1373                                               self.op.vg_name,
1374                                               constants.MIN_VG_SIZE)
1375         if vgstatus:
1376           raise errors.OpPrereqError("Error on node '%s': %s" %
1377                                      (node, vgstatus))
1378
1379     self.cluster = cluster = self.cfg.GetClusterInfo()
1380     # validate beparams changes
1381     if self.op.beparams:
1382       utils.CheckBEParams(self.op.beparams)
1383       self.new_beparams = cluster.FillDict(
1384         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1385
1386     # hypervisor list/parameters
1387     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1388     if self.op.hvparams:
1389       if not isinstance(self.op.hvparams, dict):
1390         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1391       for hv_name, hv_dict in self.op.hvparams.items():
1392         if hv_name not in self.new_hvparams:
1393           self.new_hvparams[hv_name] = hv_dict
1394         else:
1395           self.new_hvparams[hv_name].update(hv_dict)
1396
1397     if self.op.enabled_hypervisors is not None:
1398       self.hv_list = self.op.enabled_hypervisors
1399     else:
1400       self.hv_list = cluster.enabled_hypervisors
1401
1402     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1403       # either the enabled list has changed, or the parameters have, validate
1404       for hv_name, hv_params in self.new_hvparams.items():
1405         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1406             (self.op.enabled_hypervisors and
1407              hv_name in self.op.enabled_hypervisors)):
1408           # either this is a new hypervisor, or its parameters have changed
1409           hv_class = hypervisor.GetHypervisor(hv_name)
1410           hv_class.CheckParameterSyntax(hv_params)
1411           _CheckHVParams(self, node_list, hv_name, hv_params)
1412
1413   def Exec(self, feedback_fn):
1414     """Change the parameters of the cluster.
1415
1416     """
1417     if self.op.vg_name is not None:
1418       if self.op.vg_name != self.cfg.GetVGName():
1419         self.cfg.SetVGName(self.op.vg_name)
1420       else:
1421         feedback_fn("Cluster LVM configuration already in desired"
1422                     " state, not changing")
1423     if self.op.hvparams:
1424       self.cluster.hvparams = self.new_hvparams
1425     if self.op.enabled_hypervisors is not None:
1426       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1427     if self.op.beparams:
1428       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1429     if self.op.candidate_pool_size is not None:
1430       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1431
1432     self.cfg.Update(self.cluster)
1433
1434     # we want to update nodes after the cluster so that if any errors
1435     # happen, we have recorded and saved the cluster info
1436     if self.op.candidate_pool_size is not None:
1437       _AdjustCandidatePool(self)
1438
1439
1440 class LURedistributeConfig(NoHooksLU):
1441   """Force the redistribution of cluster configuration.
1442
1443   This is a very simple LU.
1444
1445   """
1446   _OP_REQP = []
1447   REQ_BGL = False
1448
1449   def ExpandNames(self):
1450     self.needed_locks = {
1451       locking.LEVEL_NODE: locking.ALL_SET,
1452     }
1453     self.share_locks[locking.LEVEL_NODE] = 1
1454
1455   def CheckPrereq(self):
1456     """Check prerequisites.
1457
1458     """
1459
1460   def Exec(self, feedback_fn):
1461     """Redistribute the configuration.
1462
1463     """
1464     self.cfg.Update(self.cfg.GetClusterInfo())
1465
1466
1467 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1468   """Sleep and poll for an instance's disk to sync.
1469
1470   """
1471   if not instance.disks:
1472     return True
1473
1474   if not oneshot:
1475     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1476
1477   node = instance.primary_node
1478
1479   for dev in instance.disks:
1480     lu.cfg.SetDiskID(dev, node)
1481
1482   retries = 0
1483   while True:
1484     max_time = 0
1485     done = True
1486     cumul_degraded = False
1487     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1488     if rstats.failed or not rstats.data:
1489       lu.LogWarning("Can't get any data from node %s", node)
1490       retries += 1
1491       if retries >= 10:
1492         raise errors.RemoteError("Can't contact node %s for mirror data,"
1493                                  " aborting." % node)
1494       time.sleep(6)
1495       continue
1496     rstats = rstats.data
1497     retries = 0
1498     for i in range(len(rstats)):
1499       mstat = rstats[i]
1500       if mstat is None:
1501         lu.LogWarning("Can't compute data for node %s/%s",
1502                            node, instance.disks[i].iv_name)
1503         continue
1504       # we ignore the ldisk parameter
1505       perc_done, est_time, is_degraded, _ = mstat
1506       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1507       if perc_done is not None:
1508         done = False
1509         if est_time is not None:
1510           rem_time = "%d estimated seconds remaining" % est_time
1511           max_time = est_time
1512         else:
1513           rem_time = "no time estimate"
1514         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1515                         (instance.disks[i].iv_name, perc_done, rem_time))
1516     if done or oneshot:
1517       break
1518
1519     time.sleep(min(60, max_time))
1520
1521   if done:
1522     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1523   return not cumul_degraded
1524
1525
1526 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1527   """Check that mirrors are not degraded.
1528
1529   The ldisk parameter, if True, will change the test from the
1530   is_degraded attribute (which represents overall non-ok status for
1531   the device(s)) to the ldisk (representing the local storage status).
1532
1533   """
1534   lu.cfg.SetDiskID(dev, node)
1535   if ldisk:
1536     idx = 6
1537   else:
1538     idx = 5
1539
1540   result = True
1541   if on_primary or dev.AssembleOnSecondary():
1542     rstats = lu.rpc.call_blockdev_find(node, dev)
1543     if rstats.failed or not rstats.data:
1544       logging.warning("Node %s: disk degraded, not found or node down", node)
1545       result = False
1546     else:
1547       result = result and (not rstats.data[idx])
1548   if dev.children:
1549     for child in dev.children:
1550       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1551
1552   return result
1553
1554
1555 class LUDiagnoseOS(NoHooksLU):
1556   """Logical unit for OS diagnose/query.
1557
1558   """
1559   _OP_REQP = ["output_fields", "names"]
1560   REQ_BGL = False
1561   _FIELDS_STATIC = utils.FieldSet()
1562   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1563
1564   def ExpandNames(self):
1565     if self.op.names:
1566       raise errors.OpPrereqError("Selective OS query not supported")
1567
1568     _CheckOutputFields(static=self._FIELDS_STATIC,
1569                        dynamic=self._FIELDS_DYNAMIC,
1570                        selected=self.op.output_fields)
1571
1572     # Lock all nodes, in shared mode
1573     self.needed_locks = {}
1574     self.share_locks[locking.LEVEL_NODE] = 1
1575     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1576
1577   def CheckPrereq(self):
1578     """Check prerequisites.
1579
1580     """
1581
1582   @staticmethod
1583   def _DiagnoseByOS(node_list, rlist):
1584     """Remaps a per-node return list into an a per-os per-node dictionary
1585
1586     @param node_list: a list with the names of all nodes
1587     @param rlist: a map with node names as keys and OS objects as values
1588
1589     @rtype: dict
1590     @returns: a dictionary with osnames as keys and as value another map, with
1591         nodes as keys and list of OS objects as values, eg::
1592
1593           {"debian-etch": {"node1": [<object>,...],
1594                            "node2": [<object>,]}
1595           }
1596
1597     """
1598     all_os = {}
1599     for node_name, nr in rlist.iteritems():
1600       if nr.failed or not nr.data:
1601         continue
1602       for os_obj in nr.data:
1603         if os_obj.name not in all_os:
1604           # build a list of nodes for this os containing empty lists
1605           # for each node in node_list
1606           all_os[os_obj.name] = {}
1607           for nname in node_list:
1608             all_os[os_obj.name][nname] = []
1609         all_os[os_obj.name][node_name].append(os_obj)
1610     return all_os
1611
1612   def Exec(self, feedback_fn):
1613     """Compute the list of OSes.
1614
1615     """
1616     node_list = self.acquired_locks[locking.LEVEL_NODE]
1617     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1618                    if node in node_list]
1619     node_data = self.rpc.call_os_diagnose(valid_nodes)
1620     if node_data == False:
1621       raise errors.OpExecError("Can't gather the list of OSes")
1622     pol = self._DiagnoseByOS(valid_nodes, node_data)
1623     output = []
1624     for os_name, os_data in pol.iteritems():
1625       row = []
1626       for field in self.op.output_fields:
1627         if field == "name":
1628           val = os_name
1629         elif field == "valid":
1630           val = utils.all([osl and osl[0] for osl in os_data.values()])
1631         elif field == "node_status":
1632           val = {}
1633           for node_name, nos_list in os_data.iteritems():
1634             val[node_name] = [(v.status, v.path) for v in nos_list]
1635         else:
1636           raise errors.ParameterError(field)
1637         row.append(val)
1638       output.append(row)
1639
1640     return output
1641
1642
1643 class LURemoveNode(LogicalUnit):
1644   """Logical unit for removing a node.
1645
1646   """
1647   HPATH = "node-remove"
1648   HTYPE = constants.HTYPE_NODE
1649   _OP_REQP = ["node_name"]
1650
1651   def BuildHooksEnv(self):
1652     """Build hooks env.
1653
1654     This doesn't run on the target node in the pre phase as a failed
1655     node would then be impossible to remove.
1656
1657     """
1658     env = {
1659       "OP_TARGET": self.op.node_name,
1660       "NODE_NAME": self.op.node_name,
1661       }
1662     all_nodes = self.cfg.GetNodeList()
1663     all_nodes.remove(self.op.node_name)
1664     return env, all_nodes, all_nodes
1665
1666   def CheckPrereq(self):
1667     """Check prerequisites.
1668
1669     This checks:
1670      - the node exists in the configuration
1671      - it does not have primary or secondary instances
1672      - it's not the master
1673
1674     Any errors are signalled by raising errors.OpPrereqError.
1675
1676     """
1677     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1678     if node is None:
1679       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1680
1681     instance_list = self.cfg.GetInstanceList()
1682
1683     masternode = self.cfg.GetMasterNode()
1684     if node.name == masternode:
1685       raise errors.OpPrereqError("Node is the master node,"
1686                                  " you need to failover first.")
1687
1688     for instance_name in instance_list:
1689       instance = self.cfg.GetInstanceInfo(instance_name)
1690       if node.name == instance.primary_node:
1691         raise errors.OpPrereqError("Instance %s still running on the node,"
1692                                    " please remove first." % instance_name)
1693       if node.name in instance.secondary_nodes:
1694         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1695                                    " please remove first." % instance_name)
1696     self.op.node_name = node.name
1697     self.node = node
1698
1699   def Exec(self, feedback_fn):
1700     """Removes the node from the cluster.
1701
1702     """
1703     node = self.node
1704     logging.info("Stopping the node daemon and removing configs from node %s",
1705                  node.name)
1706
1707     self.context.RemoveNode(node.name)
1708
1709     self.rpc.call_node_leave_cluster(node.name)
1710
1711     # Promote nodes to master candidate as needed
1712     _AdjustCandidatePool(self)
1713
1714
1715 class LUQueryNodes(NoHooksLU):
1716   """Logical unit for querying nodes.
1717
1718   """
1719   _OP_REQP = ["output_fields", "names"]
1720   REQ_BGL = False
1721   _FIELDS_DYNAMIC = utils.FieldSet(
1722     "dtotal", "dfree",
1723     "mtotal", "mnode", "mfree",
1724     "bootid",
1725     "ctotal",
1726     )
1727
1728   _FIELDS_STATIC = utils.FieldSet(
1729     "name", "pinst_cnt", "sinst_cnt",
1730     "pinst_list", "sinst_list",
1731     "pip", "sip", "tags",
1732     "serial_no",
1733     "master_candidate",
1734     "master",
1735     "offline",
1736     )
1737
1738   def ExpandNames(self):
1739     _CheckOutputFields(static=self._FIELDS_STATIC,
1740                        dynamic=self._FIELDS_DYNAMIC,
1741                        selected=self.op.output_fields)
1742
1743     self.needed_locks = {}
1744     self.share_locks[locking.LEVEL_NODE] = 1
1745
1746     if self.op.names:
1747       self.wanted = _GetWantedNodes(self, self.op.names)
1748     else:
1749       self.wanted = locking.ALL_SET
1750
1751     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1752     if self.do_locking:
1753       # if we don't request only static fields, we need to lock the nodes
1754       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1755
1756
1757   def CheckPrereq(self):
1758     """Check prerequisites.
1759
1760     """
1761     # The validation of the node list is done in the _GetWantedNodes,
1762     # if non empty, and if empty, there's no validation to do
1763     pass
1764
1765   def Exec(self, feedback_fn):
1766     """Computes the list of nodes and their attributes.
1767
1768     """
1769     all_info = self.cfg.GetAllNodesInfo()
1770     if self.do_locking:
1771       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1772     elif self.wanted != locking.ALL_SET:
1773       nodenames = self.wanted
1774       missing = set(nodenames).difference(all_info.keys())
1775       if missing:
1776         raise errors.OpExecError(
1777           "Some nodes were removed before retrieving their data: %s" % missing)
1778     else:
1779       nodenames = all_info.keys()
1780
1781     nodenames = utils.NiceSort(nodenames)
1782     nodelist = [all_info[name] for name in nodenames]
1783
1784     # begin data gathering
1785
1786     if self.do_locking:
1787       live_data = {}
1788       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1789                                           self.cfg.GetHypervisorType())
1790       for name in nodenames:
1791         nodeinfo = node_data[name]
1792         if not nodeinfo.failed and nodeinfo.data:
1793           nodeinfo = nodeinfo.data
1794           fn = utils.TryConvert
1795           live_data[name] = {
1796             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1797             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1798             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1799             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1800             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1801             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1802             "bootid": nodeinfo.get('bootid', None),
1803             }
1804         else:
1805           live_data[name] = {}
1806     else:
1807       live_data = dict.fromkeys(nodenames, {})
1808
1809     node_to_primary = dict([(name, set()) for name in nodenames])
1810     node_to_secondary = dict([(name, set()) for name in nodenames])
1811
1812     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1813                              "sinst_cnt", "sinst_list"))
1814     if inst_fields & frozenset(self.op.output_fields):
1815       instancelist = self.cfg.GetInstanceList()
1816
1817       for instance_name in instancelist:
1818         inst = self.cfg.GetInstanceInfo(instance_name)
1819         if inst.primary_node in node_to_primary:
1820           node_to_primary[inst.primary_node].add(inst.name)
1821         for secnode in inst.secondary_nodes:
1822           if secnode in node_to_secondary:
1823             node_to_secondary[secnode].add(inst.name)
1824
1825     master_node = self.cfg.GetMasterNode()
1826
1827     # end data gathering
1828
1829     output = []
1830     for node in nodelist:
1831       node_output = []
1832       for field in self.op.output_fields:
1833         if field == "name":
1834           val = node.name
1835         elif field == "pinst_list":
1836           val = list(node_to_primary[node.name])
1837         elif field == "sinst_list":
1838           val = list(node_to_secondary[node.name])
1839         elif field == "pinst_cnt":
1840           val = len(node_to_primary[node.name])
1841         elif field == "sinst_cnt":
1842           val = len(node_to_secondary[node.name])
1843         elif field == "pip":
1844           val = node.primary_ip
1845         elif field == "sip":
1846           val = node.secondary_ip
1847         elif field == "tags":
1848           val = list(node.GetTags())
1849         elif field == "serial_no":
1850           val = node.serial_no
1851         elif field == "master_candidate":
1852           val = node.master_candidate
1853         elif field == "master":
1854           val = node.name == master_node
1855         elif field == "offline":
1856           val = node.offline
1857         elif self._FIELDS_DYNAMIC.Matches(field):
1858           val = live_data[node.name].get(field, None)
1859         else:
1860           raise errors.ParameterError(field)
1861         node_output.append(val)
1862       output.append(node_output)
1863
1864     return output
1865
1866
1867 class LUQueryNodeVolumes(NoHooksLU):
1868   """Logical unit for getting volumes on node(s).
1869
1870   """
1871   _OP_REQP = ["nodes", "output_fields"]
1872   REQ_BGL = False
1873   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1874   _FIELDS_STATIC = utils.FieldSet("node")
1875
1876   def ExpandNames(self):
1877     _CheckOutputFields(static=self._FIELDS_STATIC,
1878                        dynamic=self._FIELDS_DYNAMIC,
1879                        selected=self.op.output_fields)
1880
1881     self.needed_locks = {}
1882     self.share_locks[locking.LEVEL_NODE] = 1
1883     if not self.op.nodes:
1884       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1885     else:
1886       self.needed_locks[locking.LEVEL_NODE] = \
1887         _GetWantedNodes(self, self.op.nodes)
1888
1889   def CheckPrereq(self):
1890     """Check prerequisites.
1891
1892     This checks that the fields required are valid output fields.
1893
1894     """
1895     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1896
1897   def Exec(self, feedback_fn):
1898     """Computes the list of nodes and their attributes.
1899
1900     """
1901     nodenames = self.nodes
1902     volumes = self.rpc.call_node_volumes(nodenames)
1903
1904     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1905              in self.cfg.GetInstanceList()]
1906
1907     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1908
1909     output = []
1910     for node in nodenames:
1911       if node not in volumes or volumes[node].failed or not volumes[node].data:
1912         continue
1913
1914       node_vols = volumes[node].data[:]
1915       node_vols.sort(key=lambda vol: vol['dev'])
1916
1917       for vol in node_vols:
1918         node_output = []
1919         for field in self.op.output_fields:
1920           if field == "node":
1921             val = node
1922           elif field == "phys":
1923             val = vol['dev']
1924           elif field == "vg":
1925             val = vol['vg']
1926           elif field == "name":
1927             val = vol['name']
1928           elif field == "size":
1929             val = int(float(vol['size']))
1930           elif field == "instance":
1931             for inst in ilist:
1932               if node not in lv_by_node[inst]:
1933                 continue
1934               if vol['name'] in lv_by_node[inst][node]:
1935                 val = inst.name
1936                 break
1937             else:
1938               val = '-'
1939           else:
1940             raise errors.ParameterError(field)
1941           node_output.append(str(val))
1942
1943         output.append(node_output)
1944
1945     return output
1946
1947
1948 class LUAddNode(LogicalUnit):
1949   """Logical unit for adding node to the cluster.
1950
1951   """
1952   HPATH = "node-add"
1953   HTYPE = constants.HTYPE_NODE
1954   _OP_REQP = ["node_name"]
1955
1956   def BuildHooksEnv(self):
1957     """Build hooks env.
1958
1959     This will run on all nodes before, and on all nodes + the new node after.
1960
1961     """
1962     env = {
1963       "OP_TARGET": self.op.node_name,
1964       "NODE_NAME": self.op.node_name,
1965       "NODE_PIP": self.op.primary_ip,
1966       "NODE_SIP": self.op.secondary_ip,
1967       }
1968     nodes_0 = self.cfg.GetNodeList()
1969     nodes_1 = nodes_0 + [self.op.node_name, ]
1970     return env, nodes_0, nodes_1
1971
1972   def CheckPrereq(self):
1973     """Check prerequisites.
1974
1975     This checks:
1976      - the new node is not already in the config
1977      - it is resolvable
1978      - its parameters (single/dual homed) matches the cluster
1979
1980     Any errors are signalled by raising errors.OpPrereqError.
1981
1982     """
1983     node_name = self.op.node_name
1984     cfg = self.cfg
1985
1986     dns_data = utils.HostInfo(node_name)
1987
1988     node = dns_data.name
1989     primary_ip = self.op.primary_ip = dns_data.ip
1990     secondary_ip = getattr(self.op, "secondary_ip", None)
1991     if secondary_ip is None:
1992       secondary_ip = primary_ip
1993     if not utils.IsValidIP(secondary_ip):
1994       raise errors.OpPrereqError("Invalid secondary IP given")
1995     self.op.secondary_ip = secondary_ip
1996
1997     node_list = cfg.GetNodeList()
1998     if not self.op.readd and node in node_list:
1999       raise errors.OpPrereqError("Node %s is already in the configuration" %
2000                                  node)
2001     elif self.op.readd and node not in node_list:
2002       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2003
2004     for existing_node_name in node_list:
2005       existing_node = cfg.GetNodeInfo(existing_node_name)
2006
2007       if self.op.readd and node == existing_node_name:
2008         if (existing_node.primary_ip != primary_ip or
2009             existing_node.secondary_ip != secondary_ip):
2010           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2011                                      " address configuration as before")
2012         continue
2013
2014       if (existing_node.primary_ip == primary_ip or
2015           existing_node.secondary_ip == primary_ip or
2016           existing_node.primary_ip == secondary_ip or
2017           existing_node.secondary_ip == secondary_ip):
2018         raise errors.OpPrereqError("New node ip address(es) conflict with"
2019                                    " existing node %s" % existing_node.name)
2020
2021     # check that the type of the node (single versus dual homed) is the
2022     # same as for the master
2023     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2024     master_singlehomed = myself.secondary_ip == myself.primary_ip
2025     newbie_singlehomed = secondary_ip == primary_ip
2026     if master_singlehomed != newbie_singlehomed:
2027       if master_singlehomed:
2028         raise errors.OpPrereqError("The master has no private ip but the"
2029                                    " new node has one")
2030       else:
2031         raise errors.OpPrereqError("The master has a private ip but the"
2032                                    " new node doesn't have one")
2033
2034     # checks reachablity
2035     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2036       raise errors.OpPrereqError("Node not reachable by ping")
2037
2038     if not newbie_singlehomed:
2039       # check reachability from my secondary ip to newbie's secondary ip
2040       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2041                            source=myself.secondary_ip):
2042         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2043                                    " based ping to noded port")
2044
2045     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2046     mc_now, _ = self.cfg.GetMasterCandidateStats()
2047     master_candidate = mc_now < cp_size
2048
2049     self.new_node = objects.Node(name=node,
2050                                  primary_ip=primary_ip,
2051                                  secondary_ip=secondary_ip,
2052                                  master_candidate=master_candidate,
2053                                  offline=False)
2054
2055   def Exec(self, feedback_fn):
2056     """Adds the new node to the cluster.
2057
2058     """
2059     new_node = self.new_node
2060     node = new_node.name
2061
2062     # check connectivity
2063     result = self.rpc.call_version([node])[node]
2064     result.Raise()
2065     if result.data:
2066       if constants.PROTOCOL_VERSION == result.data:
2067         logging.info("Communication to node %s fine, sw version %s match",
2068                      node, result.data)
2069       else:
2070         raise errors.OpExecError("Version mismatch master version %s,"
2071                                  " node version %s" %
2072                                  (constants.PROTOCOL_VERSION, result.data))
2073     else:
2074       raise errors.OpExecError("Cannot get version from the new node")
2075
2076     # setup ssh on node
2077     logging.info("Copy ssh key to node %s", node)
2078     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2079     keyarray = []
2080     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2081                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2082                 priv_key, pub_key]
2083
2084     for i in keyfiles:
2085       f = open(i, 'r')
2086       try:
2087         keyarray.append(f.read())
2088       finally:
2089         f.close()
2090
2091     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2092                                     keyarray[2],
2093                                     keyarray[3], keyarray[4], keyarray[5])
2094
2095     if result.failed or not result.data:
2096       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2097
2098     # Add node to our /etc/hosts, and add key to known_hosts
2099     utils.AddHostToEtcHosts(new_node.name)
2100
2101     if new_node.secondary_ip != new_node.primary_ip:
2102       result = self.rpc.call_node_has_ip_address(new_node.name,
2103                                                  new_node.secondary_ip)
2104       if result.failed or not result.data:
2105         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2106                                  " you gave (%s). Please fix and re-run this"
2107                                  " command." % new_node.secondary_ip)
2108
2109     node_verify_list = [self.cfg.GetMasterNode()]
2110     node_verify_param = {
2111       'nodelist': [node],
2112       # TODO: do a node-net-test as well?
2113     }
2114
2115     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2116                                        self.cfg.GetClusterName())
2117     for verifier in node_verify_list:
2118       if result[verifier].failed or not result[verifier].data:
2119         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2120                                  " for remote verification" % verifier)
2121       if result[verifier].data['nodelist']:
2122         for failed in result[verifier].data['nodelist']:
2123           feedback_fn("ssh/hostname verification failed %s -> %s" %
2124                       (verifier, result[verifier]['nodelist'][failed]))
2125         raise errors.OpExecError("ssh/hostname verification failed.")
2126
2127     # Distribute updated /etc/hosts and known_hosts to all nodes,
2128     # including the node just added
2129     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2130     dist_nodes = self.cfg.GetNodeList()
2131     if not self.op.readd:
2132       dist_nodes.append(node)
2133     if myself.name in dist_nodes:
2134       dist_nodes.remove(myself.name)
2135
2136     logging.debug("Copying hosts and known_hosts to all nodes")
2137     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2138       result = self.rpc.call_upload_file(dist_nodes, fname)
2139       for to_node, to_result in result.iteritems():
2140         if to_result.failed or not to_result.data:
2141           logging.error("Copy of file %s to node %s failed", fname, to_node)
2142
2143     to_copy = []
2144     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2145       to_copy.append(constants.VNC_PASSWORD_FILE)
2146     for fname in to_copy:
2147       result = self.rpc.call_upload_file([node], fname)
2148       if result[node].failed or not result[node]:
2149         logging.error("Could not copy file %s to node %s", fname, node)
2150
2151     if self.op.readd:
2152       self.context.ReaddNode(new_node)
2153     else:
2154       self.context.AddNode(new_node)
2155
2156
2157 class LUSetNodeParams(LogicalUnit):
2158   """Modifies the parameters of a node.
2159
2160   """
2161   HPATH = "node-modify"
2162   HTYPE = constants.HTYPE_NODE
2163   _OP_REQP = ["node_name"]
2164   REQ_BGL = False
2165
2166   def CheckArguments(self):
2167     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2168     if node_name is None:
2169       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2170     self.op.node_name = node_name
2171     _CheckBooleanOpField(self.op, 'master_candidate')
2172     _CheckBooleanOpField(self.op, 'offline')
2173     if self.op.master_candidate is None and self.op.offline is None:
2174       raise errors.OpPrereqError("Please pass at least one modification")
2175     if self.op.offline == True and self.op.master_candidate == True:
2176       raise errors.OpPrereqError("Can't set the node into offline and"
2177                                  " master_candidate at the same time")
2178
2179   def ExpandNames(self):
2180     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2181
2182   def BuildHooksEnv(self):
2183     """Build hooks env.
2184
2185     This runs on the master node.
2186
2187     """
2188     env = {
2189       "OP_TARGET": self.op.node_name,
2190       "MASTER_CANDIDATE": str(self.op.master_candidate),
2191       "OFFLINE": str(self.op.offline),
2192       }
2193     nl = [self.cfg.GetMasterNode(),
2194           self.op.node_name]
2195     return env, nl, nl
2196
2197   def CheckPrereq(self):
2198     """Check prerequisites.
2199
2200     This only checks the instance list against the existing names.
2201
2202     """
2203     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2204
2205     if ((self.op.master_candidate == False or self.op.offline == True)
2206         and node.master_candidate):
2207       # we will demote the node from master_candidate
2208       if self.op.node_name == self.cfg.GetMasterNode():
2209         raise errors.OpPrereqError("The master node has to be a"
2210                                    " master candidate and online")
2211       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2212       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2213       if num_candidates <= cp_size:
2214         msg = ("Not enough master candidates (desired"
2215                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2216         if self.op.force:
2217           self.LogWarning(msg)
2218         else:
2219           raise errors.OpPrereqError(msg)
2220
2221     if (self.op.master_candidate == True and node.offline and
2222         not self.op.offline == False):
2223       raise errors.OpPrereqError("Can't set an offline node to"
2224                                  " master_candidate")
2225
2226     return
2227
2228   def Exec(self, feedback_fn):
2229     """Modifies a node.
2230
2231     """
2232     node = self.node
2233
2234     result = []
2235
2236     if self.op.offline is not None:
2237       node.offline = self.op.offline
2238       result.append(("offline", str(self.op.offline)))
2239       if self.op.offline == True and node.master_candidate:
2240         node.master_candidate = False
2241         result.append(("master_candidate", "auto-demotion due to offline"))
2242
2243     if self.op.master_candidate is not None:
2244       node.master_candidate = self.op.master_candidate
2245       result.append(("master_candidate", str(self.op.master_candidate)))
2246       if self.op.master_candidate == False:
2247         rrc = self.rpc.call_node_demote_from_mc(node.name)
2248         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2249             or len(rrc.data) != 2):
2250           self.LogWarning("Node rpc error: %s" % rrc.error)
2251         elif not rrc.data[0]:
2252           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2253
2254     # this will trigger configuration file update, if needed
2255     self.cfg.Update(node)
2256     # this will trigger job queue propagation or cleanup
2257     if self.op.node_name != self.cfg.GetMasterNode():
2258       self.context.ReaddNode(node)
2259
2260     return result
2261
2262
2263 class LUQueryClusterInfo(NoHooksLU):
2264   """Query cluster configuration.
2265
2266   """
2267   _OP_REQP = []
2268   REQ_BGL = False
2269
2270   def ExpandNames(self):
2271     self.needed_locks = {}
2272
2273   def CheckPrereq(self):
2274     """No prerequsites needed for this LU.
2275
2276     """
2277     pass
2278
2279   def Exec(self, feedback_fn):
2280     """Return cluster config.
2281
2282     """
2283     cluster = self.cfg.GetClusterInfo()
2284     result = {
2285       "software_version": constants.RELEASE_VERSION,
2286       "protocol_version": constants.PROTOCOL_VERSION,
2287       "config_version": constants.CONFIG_VERSION,
2288       "os_api_version": constants.OS_API_VERSION,
2289       "export_version": constants.EXPORT_VERSION,
2290       "architecture": (platform.architecture()[0], platform.machine()),
2291       "name": cluster.cluster_name,
2292       "master": cluster.master_node,
2293       "default_hypervisor": cluster.default_hypervisor,
2294       "enabled_hypervisors": cluster.enabled_hypervisors,
2295       "hvparams": cluster.hvparams,
2296       "beparams": cluster.beparams,
2297       "candidate_pool_size": cluster.candidate_pool_size,
2298       }
2299
2300     return result
2301
2302
2303 class LUQueryConfigValues(NoHooksLU):
2304   """Return configuration values.
2305
2306   """
2307   _OP_REQP = []
2308   REQ_BGL = False
2309   _FIELDS_DYNAMIC = utils.FieldSet()
2310   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2311
2312   def ExpandNames(self):
2313     self.needed_locks = {}
2314
2315     _CheckOutputFields(static=self._FIELDS_STATIC,
2316                        dynamic=self._FIELDS_DYNAMIC,
2317                        selected=self.op.output_fields)
2318
2319   def CheckPrereq(self):
2320     """No prerequisites.
2321
2322     """
2323     pass
2324
2325   def Exec(self, feedback_fn):
2326     """Dump a representation of the cluster config to the standard output.
2327
2328     """
2329     values = []
2330     for field in self.op.output_fields:
2331       if field == "cluster_name":
2332         entry = self.cfg.GetClusterName()
2333       elif field == "master_node":
2334         entry = self.cfg.GetMasterNode()
2335       elif field == "drain_flag":
2336         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2337       else:
2338         raise errors.ParameterError(field)
2339       values.append(entry)
2340     return values
2341
2342
2343 class LUActivateInstanceDisks(NoHooksLU):
2344   """Bring up an instance's disks.
2345
2346   """
2347   _OP_REQP = ["instance_name"]
2348   REQ_BGL = False
2349
2350   def ExpandNames(self):
2351     self._ExpandAndLockInstance()
2352     self.needed_locks[locking.LEVEL_NODE] = []
2353     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2354
2355   def DeclareLocks(self, level):
2356     if level == locking.LEVEL_NODE:
2357       self._LockInstancesNodes()
2358
2359   def CheckPrereq(self):
2360     """Check prerequisites.
2361
2362     This checks that the instance is in the cluster.
2363
2364     """
2365     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366     assert self.instance is not None, \
2367       "Cannot retrieve locked instance %s" % self.op.instance_name
2368     _CheckNodeOnline(self, self.instance.primary_node)
2369
2370   def Exec(self, feedback_fn):
2371     """Activate the disks.
2372
2373     """
2374     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2375     if not disks_ok:
2376       raise errors.OpExecError("Cannot activate block devices")
2377
2378     return disks_info
2379
2380
2381 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2382   """Prepare the block devices for an instance.
2383
2384   This sets up the block devices on all nodes.
2385
2386   @type lu: L{LogicalUnit}
2387   @param lu: the logical unit on whose behalf we execute
2388   @type instance: L{objects.Instance}
2389   @param instance: the instance for whose disks we assemble
2390   @type ignore_secondaries: boolean
2391   @param ignore_secondaries: if true, errors on secondary nodes
2392       won't result in an error return from the function
2393   @return: False if the operation failed, otherwise a list of
2394       (host, instance_visible_name, node_visible_name)
2395       with the mapping from node devices to instance devices
2396
2397   """
2398   device_info = []
2399   disks_ok = True
2400   iname = instance.name
2401   # With the two passes mechanism we try to reduce the window of
2402   # opportunity for the race condition of switching DRBD to primary
2403   # before handshaking occured, but we do not eliminate it
2404
2405   # The proper fix would be to wait (with some limits) until the
2406   # connection has been made and drbd transitions from WFConnection
2407   # into any other network-connected state (Connected, SyncTarget,
2408   # SyncSource, etc.)
2409
2410   # 1st pass, assemble on all nodes in secondary mode
2411   for inst_disk in instance.disks:
2412     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2413       lu.cfg.SetDiskID(node_disk, node)
2414       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2415       if result.failed or not result:
2416         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2417                            " (is_primary=False, pass=1)",
2418                            inst_disk.iv_name, node)
2419         if not ignore_secondaries:
2420           disks_ok = False
2421
2422   # FIXME: race condition on drbd migration to primary
2423
2424   # 2nd pass, do only the primary node
2425   for inst_disk in instance.disks:
2426     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2427       if node != instance.primary_node:
2428         continue
2429       lu.cfg.SetDiskID(node_disk, node)
2430       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2431       if result.failed or not result:
2432         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2433                            " (is_primary=True, pass=2)",
2434                            inst_disk.iv_name, node)
2435         disks_ok = False
2436     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2437
2438   # leave the disks configured for the primary node
2439   # this is a workaround that would be fixed better by
2440   # improving the logical/physical id handling
2441   for disk in instance.disks:
2442     lu.cfg.SetDiskID(disk, instance.primary_node)
2443
2444   return disks_ok, device_info
2445
2446
2447 def _StartInstanceDisks(lu, instance, force):
2448   """Start the disks of an instance.
2449
2450   """
2451   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2452                                            ignore_secondaries=force)
2453   if not disks_ok:
2454     _ShutdownInstanceDisks(lu, instance)
2455     if force is not None and not force:
2456       lu.proc.LogWarning("", hint="If the message above refers to a"
2457                          " secondary node,"
2458                          " you can retry the operation using '--force'.")
2459     raise errors.OpExecError("Disk consistency error")
2460
2461
2462 class LUDeactivateInstanceDisks(NoHooksLU):
2463   """Shutdown an instance's disks.
2464
2465   """
2466   _OP_REQP = ["instance_name"]
2467   REQ_BGL = False
2468
2469   def ExpandNames(self):
2470     self._ExpandAndLockInstance()
2471     self.needed_locks[locking.LEVEL_NODE] = []
2472     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2473
2474   def DeclareLocks(self, level):
2475     if level == locking.LEVEL_NODE:
2476       self._LockInstancesNodes()
2477
2478   def CheckPrereq(self):
2479     """Check prerequisites.
2480
2481     This checks that the instance is in the cluster.
2482
2483     """
2484     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2485     assert self.instance is not None, \
2486       "Cannot retrieve locked instance %s" % self.op.instance_name
2487
2488   def Exec(self, feedback_fn):
2489     """Deactivate the disks
2490
2491     """
2492     instance = self.instance
2493     _SafeShutdownInstanceDisks(self, instance)
2494
2495
2496 def _SafeShutdownInstanceDisks(lu, instance):
2497   """Shutdown block devices of an instance.
2498
2499   This function checks if an instance is running, before calling
2500   _ShutdownInstanceDisks.
2501
2502   """
2503   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2504                                       [instance.hypervisor])
2505   ins_l = ins_l[instance.primary_node]
2506   if ins_l.failed or not isinstance(ins_l.data, list):
2507     raise errors.OpExecError("Can't contact node '%s'" %
2508                              instance.primary_node)
2509
2510   if instance.name in ins_l.data:
2511     raise errors.OpExecError("Instance is running, can't shutdown"
2512                              " block devices.")
2513
2514   _ShutdownInstanceDisks(lu, instance)
2515
2516
2517 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2518   """Shutdown block devices of an instance.
2519
2520   This does the shutdown on all nodes of the instance.
2521
2522   If the ignore_primary is false, errors on the primary node are
2523   ignored.
2524
2525   """
2526   result = True
2527   for disk in instance.disks:
2528     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2529       lu.cfg.SetDiskID(top_disk, node)
2530       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2531       if result.failed or not result.data:
2532         logging.error("Could not shutdown block device %s on node %s",
2533                       disk.iv_name, node)
2534         if not ignore_primary or node != instance.primary_node:
2535           result = False
2536   return result
2537
2538
2539 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2540   """Checks if a node has enough free memory.
2541
2542   This function check if a given node has the needed amount of free
2543   memory. In case the node has less memory or we cannot get the
2544   information from the node, this function raise an OpPrereqError
2545   exception.
2546
2547   @type lu: C{LogicalUnit}
2548   @param lu: a logical unit from which we get configuration data
2549   @type node: C{str}
2550   @param node: the node to check
2551   @type reason: C{str}
2552   @param reason: string to use in the error message
2553   @type requested: C{int}
2554   @param requested: the amount of memory in MiB to check for
2555   @type hypervisor_name: C{str}
2556   @param hypervisor_name: the hypervisor to ask for memory stats
2557   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2558       we cannot check the node
2559
2560   """
2561   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2562   nodeinfo[node].Raise()
2563   free_mem = nodeinfo[node].data.get('memory_free')
2564   if not isinstance(free_mem, int):
2565     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2566                              " was '%s'" % (node, free_mem))
2567   if requested > free_mem:
2568     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2569                              " needed %s MiB, available %s MiB" %
2570                              (node, reason, requested, free_mem))
2571
2572
2573 class LUStartupInstance(LogicalUnit):
2574   """Starts an instance.
2575
2576   """
2577   HPATH = "instance-start"
2578   HTYPE = constants.HTYPE_INSTANCE
2579   _OP_REQP = ["instance_name", "force"]
2580   REQ_BGL = False
2581
2582   def ExpandNames(self):
2583     self._ExpandAndLockInstance()
2584
2585   def BuildHooksEnv(self):
2586     """Build hooks env.
2587
2588     This runs on master, primary and secondary nodes of the instance.
2589
2590     """
2591     env = {
2592       "FORCE": self.op.force,
2593       }
2594     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2595     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2596           list(self.instance.secondary_nodes))
2597     return env, nl, nl
2598
2599   def CheckPrereq(self):
2600     """Check prerequisites.
2601
2602     This checks that the instance is in the cluster.
2603
2604     """
2605     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2606     assert self.instance is not None, \
2607       "Cannot retrieve locked instance %s" % self.op.instance_name
2608
2609     _CheckNodeOnline(self, instance.primary_node)
2610
2611     bep = self.cfg.GetClusterInfo().FillBE(instance)
2612     # check bridges existance
2613     _CheckInstanceBridgesExist(self, instance)
2614
2615     _CheckNodeFreeMemory(self, instance.primary_node,
2616                          "starting instance %s" % instance.name,
2617                          bep[constants.BE_MEMORY], instance.hypervisor)
2618
2619   def Exec(self, feedback_fn):
2620     """Start the instance.
2621
2622     """
2623     instance = self.instance
2624     force = self.op.force
2625     extra_args = getattr(self.op, "extra_args", "")
2626
2627     self.cfg.MarkInstanceUp(instance.name)
2628
2629     node_current = instance.primary_node
2630
2631     _StartInstanceDisks(self, instance, force)
2632
2633     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2634     if result.failed or not result.data:
2635       _ShutdownInstanceDisks(self, instance)
2636       raise errors.OpExecError("Could not start instance")
2637
2638
2639 class LURebootInstance(LogicalUnit):
2640   """Reboot an instance.
2641
2642   """
2643   HPATH = "instance-reboot"
2644   HTYPE = constants.HTYPE_INSTANCE
2645   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2646   REQ_BGL = False
2647
2648   def ExpandNames(self):
2649     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2650                                    constants.INSTANCE_REBOOT_HARD,
2651                                    constants.INSTANCE_REBOOT_FULL]:
2652       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2653                                   (constants.INSTANCE_REBOOT_SOFT,
2654                                    constants.INSTANCE_REBOOT_HARD,
2655                                    constants.INSTANCE_REBOOT_FULL))
2656     self._ExpandAndLockInstance()
2657
2658   def BuildHooksEnv(self):
2659     """Build hooks env.
2660
2661     This runs on master, primary and secondary nodes of the instance.
2662
2663     """
2664     env = {
2665       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2666       }
2667     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2668     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2669           list(self.instance.secondary_nodes))
2670     return env, nl, nl
2671
2672   def CheckPrereq(self):
2673     """Check prerequisites.
2674
2675     This checks that the instance is in the cluster.
2676
2677     """
2678     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2679     assert self.instance is not None, \
2680       "Cannot retrieve locked instance %s" % self.op.instance_name
2681
2682     _CheckNodeOnline(self, instance.primary_node)
2683
2684     # check bridges existance
2685     _CheckInstanceBridgesExist(self, instance)
2686
2687   def Exec(self, feedback_fn):
2688     """Reboot the instance.
2689
2690     """
2691     instance = self.instance
2692     ignore_secondaries = self.op.ignore_secondaries
2693     reboot_type = self.op.reboot_type
2694     extra_args = getattr(self.op, "extra_args", "")
2695
2696     node_current = instance.primary_node
2697
2698     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2699                        constants.INSTANCE_REBOOT_HARD]:
2700       result = self.rpc.call_instance_reboot(node_current, instance,
2701                                              reboot_type, extra_args)
2702       if result.failed or not result.data:
2703         raise errors.OpExecError("Could not reboot instance")
2704     else:
2705       if not self.rpc.call_instance_shutdown(node_current, instance):
2706         raise errors.OpExecError("could not shutdown instance for full reboot")
2707       _ShutdownInstanceDisks(self, instance)
2708       _StartInstanceDisks(self, instance, ignore_secondaries)
2709       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2710       if result.failed or not result.data:
2711         _ShutdownInstanceDisks(self, instance)
2712         raise errors.OpExecError("Could not start instance for full reboot")
2713
2714     self.cfg.MarkInstanceUp(instance.name)
2715
2716
2717 class LUShutdownInstance(LogicalUnit):
2718   """Shutdown an instance.
2719
2720   """
2721   HPATH = "instance-stop"
2722   HTYPE = constants.HTYPE_INSTANCE
2723   _OP_REQP = ["instance_name"]
2724   REQ_BGL = False
2725
2726   def ExpandNames(self):
2727     self._ExpandAndLockInstance()
2728
2729   def BuildHooksEnv(self):
2730     """Build hooks env.
2731
2732     This runs on master, primary and secondary nodes of the instance.
2733
2734     """
2735     env = _BuildInstanceHookEnvByObject(self, self.instance)
2736     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2737           list(self.instance.secondary_nodes))
2738     return env, nl, nl
2739
2740   def CheckPrereq(self):
2741     """Check prerequisites.
2742
2743     This checks that the instance is in the cluster.
2744
2745     """
2746     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2747     assert self.instance is not None, \
2748       "Cannot retrieve locked instance %s" % self.op.instance_name
2749     _CheckNodeOnline(self, self.instance.primary_node)
2750
2751   def Exec(self, feedback_fn):
2752     """Shutdown the instance.
2753
2754     """
2755     instance = self.instance
2756     node_current = instance.primary_node
2757     self.cfg.MarkInstanceDown(instance.name)
2758     result = self.rpc.call_instance_shutdown(node_current, instance)
2759     if result.failed or not result.data:
2760       self.proc.LogWarning("Could not shutdown instance")
2761
2762     _ShutdownInstanceDisks(self, instance)
2763
2764
2765 class LUReinstallInstance(LogicalUnit):
2766   """Reinstall an instance.
2767
2768   """
2769   HPATH = "instance-reinstall"
2770   HTYPE = constants.HTYPE_INSTANCE
2771   _OP_REQP = ["instance_name"]
2772   REQ_BGL = False
2773
2774   def ExpandNames(self):
2775     self._ExpandAndLockInstance()
2776
2777   def BuildHooksEnv(self):
2778     """Build hooks env.
2779
2780     This runs on master, primary and secondary nodes of the instance.
2781
2782     """
2783     env = _BuildInstanceHookEnvByObject(self, self.instance)
2784     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2785           list(self.instance.secondary_nodes))
2786     return env, nl, nl
2787
2788   def CheckPrereq(self):
2789     """Check prerequisites.
2790
2791     This checks that the instance is in the cluster and is not running.
2792
2793     """
2794     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2795     assert instance is not None, \
2796       "Cannot retrieve locked instance %s" % self.op.instance_name
2797     _CheckNodeOnline(self, instance.primary_node)
2798
2799     if instance.disk_template == constants.DT_DISKLESS:
2800       raise errors.OpPrereqError("Instance '%s' has no disks" %
2801                                  self.op.instance_name)
2802     if instance.status != "down":
2803       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2804                                  self.op.instance_name)
2805     remote_info = self.rpc.call_instance_info(instance.primary_node,
2806                                               instance.name,
2807                                               instance.hypervisor)
2808     if remote_info.failed or remote_info.data:
2809       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2810                                  (self.op.instance_name,
2811                                   instance.primary_node))
2812
2813     self.op.os_type = getattr(self.op, "os_type", None)
2814     if self.op.os_type is not None:
2815       # OS verification
2816       pnode = self.cfg.GetNodeInfo(
2817         self.cfg.ExpandNodeName(instance.primary_node))
2818       if pnode is None:
2819         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2820                                    self.op.pnode)
2821       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2822       result.Raise()
2823       if not isinstance(result.data, objects.OS):
2824         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2825                                    " primary node"  % self.op.os_type)
2826
2827     self.instance = instance
2828
2829   def Exec(self, feedback_fn):
2830     """Reinstall the instance.
2831
2832     """
2833     inst = self.instance
2834
2835     if self.op.os_type is not None:
2836       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2837       inst.os = self.op.os_type
2838       self.cfg.Update(inst)
2839
2840     _StartInstanceDisks(self, inst, None)
2841     try:
2842       feedback_fn("Running the instance OS create scripts...")
2843       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2844       result.Raise()
2845       if not result.data:
2846         raise errors.OpExecError("Could not install OS for instance %s"
2847                                  " on node %s" %
2848                                  (inst.name, inst.primary_node))
2849     finally:
2850       _ShutdownInstanceDisks(self, inst)
2851
2852
2853 class LURenameInstance(LogicalUnit):
2854   """Rename an instance.
2855
2856   """
2857   HPATH = "instance-rename"
2858   HTYPE = constants.HTYPE_INSTANCE
2859   _OP_REQP = ["instance_name", "new_name"]
2860
2861   def BuildHooksEnv(self):
2862     """Build hooks env.
2863
2864     This runs on master, primary and secondary nodes of the instance.
2865
2866     """
2867     env = _BuildInstanceHookEnvByObject(self, self.instance)
2868     env["INSTANCE_NEW_NAME"] = self.op.new_name
2869     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2870           list(self.instance.secondary_nodes))
2871     return env, nl, nl
2872
2873   def CheckPrereq(self):
2874     """Check prerequisites.
2875
2876     This checks that the instance is in the cluster and is not running.
2877
2878     """
2879     instance = self.cfg.GetInstanceInfo(
2880       self.cfg.ExpandInstanceName(self.op.instance_name))
2881     if instance is None:
2882       raise errors.OpPrereqError("Instance '%s' not known" %
2883                                  self.op.instance_name)
2884     _CheckNodeOnline(self, instance.primary_node)
2885
2886     if instance.status != "down":
2887       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2888                                  self.op.instance_name)
2889     remote_info = self.rpc.call_instance_info(instance.primary_node,
2890                                               instance.name,
2891                                               instance.hypervisor)
2892     remote_info.Raise()
2893     if remote_info.data:
2894       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2895                                  (self.op.instance_name,
2896                                   instance.primary_node))
2897     self.instance = instance
2898
2899     # new name verification
2900     name_info = utils.HostInfo(self.op.new_name)
2901
2902     self.op.new_name = new_name = name_info.name
2903     instance_list = self.cfg.GetInstanceList()
2904     if new_name in instance_list:
2905       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2906                                  new_name)
2907
2908     if not getattr(self.op, "ignore_ip", False):
2909       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2910         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2911                                    (name_info.ip, new_name))
2912
2913
2914   def Exec(self, feedback_fn):
2915     """Reinstall the instance.
2916
2917     """
2918     inst = self.instance
2919     old_name = inst.name
2920
2921     if inst.disk_template == constants.DT_FILE:
2922       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2923
2924     self.cfg.RenameInstance(inst.name, self.op.new_name)
2925     # Change the instance lock. This is definitely safe while we hold the BGL
2926     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2927     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2928
2929     # re-read the instance from the configuration after rename
2930     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2931
2932     if inst.disk_template == constants.DT_FILE:
2933       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2934       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2935                                                      old_file_storage_dir,
2936                                                      new_file_storage_dir)
2937       result.Raise()
2938       if not result.data:
2939         raise errors.OpExecError("Could not connect to node '%s' to rename"
2940                                  " directory '%s' to '%s' (but the instance"
2941                                  " has been renamed in Ganeti)" % (
2942                                  inst.primary_node, old_file_storage_dir,
2943                                  new_file_storage_dir))
2944
2945       if not result.data[0]:
2946         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2947                                  " (but the instance has been renamed in"
2948                                  " Ganeti)" % (old_file_storage_dir,
2949                                                new_file_storage_dir))
2950
2951     _StartInstanceDisks(self, inst, None)
2952     try:
2953       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2954                                                  old_name)
2955       if result.failed or not result.data:
2956         msg = ("Could not run OS rename script for instance %s on node %s"
2957                " (but the instance has been renamed in Ganeti)" %
2958                (inst.name, inst.primary_node))
2959         self.proc.LogWarning(msg)
2960     finally:
2961       _ShutdownInstanceDisks(self, inst)
2962
2963
2964 class LURemoveInstance(LogicalUnit):
2965   """Remove an instance.
2966
2967   """
2968   HPATH = "instance-remove"
2969   HTYPE = constants.HTYPE_INSTANCE
2970   _OP_REQP = ["instance_name", "ignore_failures"]
2971   REQ_BGL = False
2972
2973   def ExpandNames(self):
2974     self._ExpandAndLockInstance()
2975     self.needed_locks[locking.LEVEL_NODE] = []
2976     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2977
2978   def DeclareLocks(self, level):
2979     if level == locking.LEVEL_NODE:
2980       self._LockInstancesNodes()
2981
2982   def BuildHooksEnv(self):
2983     """Build hooks env.
2984
2985     This runs on master, primary and secondary nodes of the instance.
2986
2987     """
2988     env = _BuildInstanceHookEnvByObject(self, self.instance)
2989     nl = [self.cfg.GetMasterNode()]
2990     return env, nl, nl
2991
2992   def CheckPrereq(self):
2993     """Check prerequisites.
2994
2995     This checks that the instance is in the cluster.
2996
2997     """
2998     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2999     assert self.instance is not None, \
3000       "Cannot retrieve locked instance %s" % self.op.instance_name
3001
3002   def Exec(self, feedback_fn):
3003     """Remove the instance.
3004
3005     """
3006     instance = self.instance
3007     logging.info("Shutting down instance %s on node %s",
3008                  instance.name, instance.primary_node)
3009
3010     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3011     if result.failed or not result.data:
3012       if self.op.ignore_failures:
3013         feedback_fn("Warning: can't shutdown instance")
3014       else:
3015         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3016                                  (instance.name, instance.primary_node))
3017
3018     logging.info("Removing block devices for instance %s", instance.name)
3019
3020     if not _RemoveDisks(self, instance):
3021       if self.op.ignore_failures:
3022         feedback_fn("Warning: can't remove instance's disks")
3023       else:
3024         raise errors.OpExecError("Can't remove instance's disks")
3025
3026     logging.info("Removing instance %s out of cluster config", instance.name)
3027
3028     self.cfg.RemoveInstance(instance.name)
3029     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3030
3031
3032 class LUQueryInstances(NoHooksLU):
3033   """Logical unit for querying instances.
3034
3035   """
3036   _OP_REQP = ["output_fields", "names"]
3037   REQ_BGL = False
3038   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3039                                     "admin_state", "admin_ram",
3040                                     "disk_template", "ip", "mac", "bridge",
3041                                     "sda_size", "sdb_size", "vcpus", "tags",
3042                                     "network_port", "beparams",
3043                                     "(disk).(size)/([0-9]+)",
3044                                     "(disk).(sizes)",
3045                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3046                                     "(nic).(macs|ips|bridges)",
3047                                     "(disk|nic).(count)",
3048                                     "serial_no", "hypervisor", "hvparams",] +
3049                                   ["hv/%s" % name
3050                                    for name in constants.HVS_PARAMETERS] +
3051                                   ["be/%s" % name
3052                                    for name in constants.BES_PARAMETERS])
3053   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3054
3055
3056   def ExpandNames(self):
3057     _CheckOutputFields(static=self._FIELDS_STATIC,
3058                        dynamic=self._FIELDS_DYNAMIC,
3059                        selected=self.op.output_fields)
3060
3061     self.needed_locks = {}
3062     self.share_locks[locking.LEVEL_INSTANCE] = 1
3063     self.share_locks[locking.LEVEL_NODE] = 1
3064
3065     if self.op.names:
3066       self.wanted = _GetWantedInstances(self, self.op.names)
3067     else:
3068       self.wanted = locking.ALL_SET
3069
3070     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3071     if self.do_locking:
3072       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3073       self.needed_locks[locking.LEVEL_NODE] = []
3074       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3075
3076   def DeclareLocks(self, level):
3077     if level == locking.LEVEL_NODE and self.do_locking:
3078       self._LockInstancesNodes()
3079
3080   def CheckPrereq(self):
3081     """Check prerequisites.
3082
3083     """
3084     pass
3085
3086   def Exec(self, feedback_fn):
3087     """Computes the list of nodes and their attributes.
3088
3089     """
3090     all_info = self.cfg.GetAllInstancesInfo()
3091     if self.do_locking:
3092       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3093     elif self.wanted != locking.ALL_SET:
3094       instance_names = self.wanted
3095       missing = set(instance_names).difference(all_info.keys())
3096       if missing:
3097         raise errors.OpExecError(
3098           "Some instances were removed before retrieving their data: %s"
3099           % missing)
3100     else:
3101       instance_names = all_info.keys()
3102
3103     instance_names = utils.NiceSort(instance_names)
3104     instance_list = [all_info[iname] for iname in instance_names]
3105
3106     # begin data gathering
3107
3108     nodes = frozenset([inst.primary_node for inst in instance_list])
3109     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3110
3111     bad_nodes = []
3112     off_nodes = []
3113     if self.do_locking:
3114       live_data = {}
3115       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3116       for name in nodes:
3117         result = node_data[name]
3118         if result.offline:
3119           # offline nodes will be in both lists
3120           off_nodes.append(name)
3121         if result.failed:
3122           bad_nodes.append(name)
3123         else:
3124           if result.data:
3125             live_data.update(result.data)
3126             # else no instance is alive
3127     else:
3128       live_data = dict([(name, {}) for name in instance_names])
3129
3130     # end data gathering
3131
3132     HVPREFIX = "hv/"
3133     BEPREFIX = "be/"
3134     output = []
3135     for instance in instance_list:
3136       iout = []
3137       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3138       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3139       for field in self.op.output_fields:
3140         st_match = self._FIELDS_STATIC.Matches(field)
3141         if field == "name":
3142           val = instance.name
3143         elif field == "os":
3144           val = instance.os
3145         elif field == "pnode":
3146           val = instance.primary_node
3147         elif field == "snodes":
3148           val = list(instance.secondary_nodes)
3149         elif field == "admin_state":
3150           val = (instance.status != "down")
3151         elif field == "oper_state":
3152           if instance.primary_node in bad_nodes:
3153             val = None
3154           else:
3155             val = bool(live_data.get(instance.name))
3156         elif field == "status":
3157           if instance.primary_node in off_nodes:
3158             val = "ERROR_nodeoffline"
3159           elif instance.primary_node in bad_nodes:
3160             val = "ERROR_nodedown"
3161           else:
3162             running = bool(live_data.get(instance.name))
3163             if running:
3164               if instance.status != "down":
3165                 val = "running"
3166               else:
3167                 val = "ERROR_up"
3168             else:
3169               if instance.status != "down":
3170                 val = "ERROR_down"
3171               else:
3172                 val = "ADMIN_down"
3173         elif field == "oper_ram":
3174           if instance.primary_node in bad_nodes:
3175             val = None
3176           elif instance.name in live_data:
3177             val = live_data[instance.name].get("memory", "?")
3178           else:
3179             val = "-"
3180         elif field == "disk_template":
3181           val = instance.disk_template
3182         elif field == "ip":
3183           val = instance.nics[0].ip
3184         elif field == "bridge":
3185           val = instance.nics[0].bridge
3186         elif field == "mac":
3187           val = instance.nics[0].mac
3188         elif field == "sda_size" or field == "sdb_size":
3189           idx = ord(field[2]) - ord('a')
3190           try:
3191             val = instance.FindDisk(idx).size
3192           except errors.OpPrereqError:
3193             val = None
3194         elif field == "tags":
3195           val = list(instance.GetTags())
3196         elif field == "serial_no":
3197           val = instance.serial_no
3198         elif field == "network_port":
3199           val = instance.network_port
3200         elif field == "hypervisor":
3201           val = instance.hypervisor
3202         elif field == "hvparams":
3203           val = i_hv
3204         elif (field.startswith(HVPREFIX) and
3205               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3206           val = i_hv.get(field[len(HVPREFIX):], None)
3207         elif field == "beparams":
3208           val = i_be
3209         elif (field.startswith(BEPREFIX) and
3210               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3211           val = i_be.get(field[len(BEPREFIX):], None)
3212         elif st_match and st_match.groups():
3213           # matches a variable list
3214           st_groups = st_match.groups()
3215           if st_groups and st_groups[0] == "disk":
3216             if st_groups[1] == "count":
3217               val = len(instance.disks)
3218             elif st_groups[1] == "sizes":
3219               val = [disk.size for disk in instance.disks]
3220             elif st_groups[1] == "size":
3221               try:
3222                 val = instance.FindDisk(st_groups[2]).size
3223               except errors.OpPrereqError:
3224                 val = None
3225             else:
3226               assert False, "Unhandled disk parameter"
3227           elif st_groups[0] == "nic":
3228             if st_groups[1] == "count":
3229               val = len(instance.nics)
3230             elif st_groups[1] == "macs":
3231               val = [nic.mac for nic in instance.nics]
3232             elif st_groups[1] == "ips":
3233               val = [nic.ip for nic in instance.nics]
3234             elif st_groups[1] == "bridges":
3235               val = [nic.bridge for nic in instance.nics]
3236             else:
3237               # index-based item
3238               nic_idx = int(st_groups[2])
3239               if nic_idx >= len(instance.nics):
3240                 val = None
3241               else:
3242                 if st_groups[1] == "mac":
3243                   val = instance.nics[nic_idx].mac
3244                 elif st_groups[1] == "ip":
3245                   val = instance.nics[nic_idx].ip
3246                 elif st_groups[1] == "bridge":
3247                   val = instance.nics[nic_idx].bridge
3248                 else:
3249                   assert False, "Unhandled NIC parameter"
3250           else:
3251             assert False, "Unhandled variable parameter"
3252         else:
3253           raise errors.ParameterError(field)
3254         iout.append(val)
3255       output.append(iout)
3256
3257     return output
3258
3259
3260 class LUFailoverInstance(LogicalUnit):
3261   """Failover an instance.
3262
3263   """
3264   HPATH = "instance-failover"
3265   HTYPE = constants.HTYPE_INSTANCE
3266   _OP_REQP = ["instance_name", "ignore_consistency"]
3267   REQ_BGL = False
3268
3269   def ExpandNames(self):
3270     self._ExpandAndLockInstance()
3271     self.needed_locks[locking.LEVEL_NODE] = []
3272     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3273
3274   def DeclareLocks(self, level):
3275     if level == locking.LEVEL_NODE:
3276       self._LockInstancesNodes()
3277
3278   def BuildHooksEnv(self):
3279     """Build hooks env.
3280
3281     This runs on master, primary and secondary nodes of the instance.
3282
3283     """
3284     env = {
3285       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3286       }
3287     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3288     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3289     return env, nl, nl
3290
3291   def CheckPrereq(self):
3292     """Check prerequisites.
3293
3294     This checks that the instance is in the cluster.
3295
3296     """
3297     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3298     assert self.instance is not None, \
3299       "Cannot retrieve locked instance %s" % self.op.instance_name
3300
3301     bep = self.cfg.GetClusterInfo().FillBE(instance)
3302     if instance.disk_template not in constants.DTS_NET_MIRROR:
3303       raise errors.OpPrereqError("Instance's disk layout is not"
3304                                  " network mirrored, cannot failover.")
3305
3306     secondary_nodes = instance.secondary_nodes
3307     if not secondary_nodes:
3308       raise errors.ProgrammerError("no secondary node but using "
3309                                    "a mirrored disk template")
3310
3311     target_node = secondary_nodes[0]
3312     _CheckNodeOnline(self, target_node)
3313     # check memory requirements on the secondary node
3314     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3315                          instance.name, bep[constants.BE_MEMORY],
3316                          instance.hypervisor)
3317
3318     # check bridge existance
3319     brlist = [nic.bridge for nic in instance.nics]
3320     result = self.rpc.call_bridges_exist(target_node, brlist)
3321     result.Raise()
3322     if not result.data:
3323       raise errors.OpPrereqError("One or more target bridges %s does not"
3324                                  " exist on destination node '%s'" %
3325                                  (brlist, target_node))
3326
3327   def Exec(self, feedback_fn):
3328     """Failover an instance.
3329
3330     The failover is done by shutting it down on its present node and
3331     starting it on the secondary.
3332
3333     """
3334     instance = self.instance
3335
3336     source_node = instance.primary_node
3337     target_node = instance.secondary_nodes[0]
3338
3339     feedback_fn("* checking disk consistency between source and target")
3340     for dev in instance.disks:
3341       # for drbd, these are drbd over lvm
3342       if not _CheckDiskConsistency(self, dev, target_node, False):
3343         if instance.status == "up" and not self.op.ignore_consistency:
3344           raise errors.OpExecError("Disk %s is degraded on target node,"
3345                                    " aborting failover." % dev.iv_name)
3346
3347     feedback_fn("* shutting down instance on source node")
3348     logging.info("Shutting down instance %s on node %s",
3349                  instance.name, source_node)
3350
3351     result = self.rpc.call_instance_shutdown(source_node, instance)
3352     if result.failed or not result.data:
3353       if self.op.ignore_consistency:
3354         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3355                              " Proceeding"
3356                              " anyway. Please make sure node %s is down",
3357                              instance.name, source_node, source_node)
3358       else:
3359         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3360                                  (instance.name, source_node))
3361
3362     feedback_fn("* deactivating the instance's disks on source node")
3363     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3364       raise errors.OpExecError("Can't shut down the instance's disks.")
3365
3366     instance.primary_node = target_node
3367     # distribute new instance config to the other nodes
3368     self.cfg.Update(instance)
3369
3370     # Only start the instance if it's marked as up
3371     if instance.status == "up":
3372       feedback_fn("* activating the instance's disks on target node")
3373       logging.info("Starting instance %s on node %s",
3374                    instance.name, target_node)
3375
3376       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3377                                                ignore_secondaries=True)
3378       if not disks_ok:
3379         _ShutdownInstanceDisks(self, instance)
3380         raise errors.OpExecError("Can't activate the instance's disks")
3381
3382       feedback_fn("* starting the instance on the target node")
3383       result = self.rpc.call_instance_start(target_node, instance, None)
3384       if result.failed or not result.data:
3385         _ShutdownInstanceDisks(self, instance)
3386         raise errors.OpExecError("Could not start instance %s on node %s." %
3387                                  (instance.name, target_node))
3388
3389
3390 class LUMigrateInstance(LogicalUnit):
3391   """Migrate an instance.
3392
3393   This is migration without shutting down, compared to the failover,
3394   which is done with shutdown.
3395
3396   """
3397   HPATH = "instance-migrate"
3398   HTYPE = constants.HTYPE_INSTANCE
3399   _OP_REQP = ["instance_name", "live", "cleanup"]
3400
3401   REQ_BGL = False
3402
3403   def ExpandNames(self):
3404     self._ExpandAndLockInstance()
3405     self.needed_locks[locking.LEVEL_NODE] = []
3406     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3407
3408   def DeclareLocks(self, level):
3409     if level == locking.LEVEL_NODE:
3410       self._LockInstancesNodes()
3411
3412   def BuildHooksEnv(self):
3413     """Build hooks env.
3414
3415     This runs on master, primary and secondary nodes of the instance.
3416
3417     """
3418     env = _BuildInstanceHookEnvByObject(self, self.instance)
3419     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3420     return env, nl, nl
3421
3422   def CheckPrereq(self):
3423     """Check prerequisites.
3424
3425     This checks that the instance is in the cluster.
3426
3427     """
3428     instance = self.cfg.GetInstanceInfo(
3429       self.cfg.ExpandInstanceName(self.op.instance_name))
3430     if instance is None:
3431       raise errors.OpPrereqError("Instance '%s' not known" %
3432                                  self.op.instance_name)
3433
3434     if instance.disk_template != constants.DT_DRBD8:
3435       raise errors.OpPrereqError("Instance's disk layout is not"
3436                                  " drbd8, cannot migrate.")
3437
3438     secondary_nodes = instance.secondary_nodes
3439     if not secondary_nodes:
3440       raise errors.ProgrammerError("no secondary node but using "
3441                                    "drbd8 disk template")
3442
3443     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3444
3445     target_node = secondary_nodes[0]
3446     # check memory requirements on the secondary node
3447     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3448                          instance.name, i_be[constants.BE_MEMORY],
3449                          instance.hypervisor)
3450
3451     # check bridge existance
3452     brlist = [nic.bridge for nic in instance.nics]
3453     result = self.rpc.call_bridges_exist(target_node, brlist)
3454     if result.failed or not result.data:
3455       raise errors.OpPrereqError("One or more target bridges %s does not"
3456                                  " exist on destination node '%s'" %
3457                                  (brlist, target_node))
3458
3459     if not self.op.cleanup:
3460       result = self.rpc.call_instance_migratable(instance.primary_node,
3461                                                  instance)
3462       msg = result.RemoteFailMsg()
3463       if msg:
3464         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3465                                    msg)
3466
3467     self.instance = instance
3468
3469   def _WaitUntilSync(self):
3470     """Poll with custom rpc for disk sync.
3471
3472     This uses our own step-based rpc call.
3473
3474     """
3475     self.feedback_fn("* wait until resync is done")
3476     all_done = False
3477     while not all_done:
3478       all_done = True
3479       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3480                                             self.nodes_ip,
3481                                             self.instance.disks)
3482       min_percent = 100
3483       for node, nres in result.items():
3484         msg = nres.RemoteFailMsg()
3485         if msg:
3486           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3487                                    (node, msg))
3488         node_done, node_percent = nres.data[1]
3489         all_done = all_done and node_done
3490         if node_percent is not None:
3491           min_percent = min(min_percent, node_percent)
3492       if not all_done:
3493         if min_percent < 100:
3494           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3495         time.sleep(2)
3496
3497   def _EnsureSecondary(self, node):
3498     """Demote a node to secondary.
3499
3500     """
3501     self.feedback_fn("* switching node %s to secondary mode" % node)
3502
3503     for dev in self.instance.disks:
3504       self.cfg.SetDiskID(dev, node)
3505
3506     result = self.rpc.call_blockdev_close(node, self.instance.name,
3507                                           self.instance.disks)
3508     msg = result.RemoteFailMsg()
3509     if msg:
3510       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3511                                " error %s" % (node, msg))
3512
3513   def _GoStandalone(self):
3514     """Disconnect from the network.
3515
3516     """
3517     self.feedback_fn("* changing into standalone mode")
3518     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3519                                                self.instance.disks)
3520     for node, nres in result.items():
3521       msg = nres.RemoteFailMsg()
3522       if msg:
3523         raise errors.OpExecError("Cannot disconnect disks node %s,"
3524                                  " error %s" % (node, msg))
3525
3526   def _GoReconnect(self, multimaster):
3527     """Reconnect to the network.
3528
3529     """
3530     if multimaster:
3531       msg = "dual-master"
3532     else:
3533       msg = "single-master"
3534     self.feedback_fn("* changing disks into %s mode" % msg)
3535     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3536                                            self.instance.disks,
3537                                            self.instance.name, multimaster)
3538     for node, nres in result.items():
3539       msg = nres.RemoteFailMsg()
3540       if msg:
3541         raise errors.OpExecError("Cannot change disks config on node %s,"
3542                                  " error: %s" % (node, msg))
3543
3544   def _ExecCleanup(self):
3545     """Try to cleanup after a failed migration.
3546
3547     The cleanup is done by:
3548       - check that the instance is running only on one node
3549         (and update the config if needed)
3550       - change disks on its secondary node to secondary
3551       - wait until disks are fully synchronized
3552       - disconnect from the network
3553       - change disks into single-master mode
3554       - wait again until disks are fully synchronized
3555
3556     """
3557     instance = self.instance
3558     target_node = self.target_node
3559     source_node = self.source_node
3560
3561     # check running on only one node
3562     self.feedback_fn("* checking where the instance actually runs"
3563                      " (if this hangs, the hypervisor might be in"
3564                      " a bad state)")
3565     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3566     for node, result in ins_l.items():
3567       result.Raise()
3568       if not isinstance(result.data, list):
3569         raise errors.OpExecError("Can't contact node '%s'" % node)
3570
3571     runningon_source = instance.name in ins_l[source_node].data
3572     runningon_target = instance.name in ins_l[target_node].data
3573
3574     if runningon_source and runningon_target:
3575       raise errors.OpExecError("Instance seems to be running on two nodes,"
3576                                " or the hypervisor is confused. You will have"
3577                                " to ensure manually that it runs only on one"
3578                                " and restart this operation.")
3579
3580     if not (runningon_source or runningon_target):
3581       raise errors.OpExecError("Instance does not seem to be running at all."
3582                                " In this case, it's safer to repair by"
3583                                " running 'gnt-instance stop' to ensure disk"
3584                                " shutdown, and then restarting it.")
3585
3586     if runningon_target:
3587       # the migration has actually succeeded, we need to update the config
3588       self.feedback_fn("* instance running on secondary node (%s),"
3589                        " updating config" % target_node)
3590       instance.primary_node = target_node
3591       self.cfg.Update(instance)
3592       demoted_node = source_node
3593     else:
3594       self.feedback_fn("* instance confirmed to be running on its"
3595                        " primary node (%s)" % source_node)
3596       demoted_node = target_node
3597
3598     self._EnsureSecondary(demoted_node)
3599     try:
3600       self._WaitUntilSync()
3601     except errors.OpExecError:
3602       # we ignore here errors, since if the device is standalone, it
3603       # won't be able to sync
3604       pass
3605     self._GoStandalone()
3606     self._GoReconnect(False)
3607     self._WaitUntilSync()
3608
3609     self.feedback_fn("* done")
3610
3611   def _ExecMigration(self):
3612     """Migrate an instance.
3613
3614     The migrate is done by:
3615       - change the disks into dual-master mode
3616       - wait until disks are fully synchronized again
3617       - migrate the instance
3618       - change disks on the new secondary node (the old primary) to secondary
3619       - wait until disks are fully synchronized
3620       - change disks into single-master mode
3621
3622     """
3623     instance = self.instance
3624     target_node = self.target_node
3625     source_node = self.source_node
3626
3627     self.feedback_fn("* checking disk consistency between source and target")
3628     for dev in instance.disks:
3629       if not _CheckDiskConsistency(self, dev, target_node, False):
3630         raise errors.OpExecError("Disk %s is degraded or not fully"
3631                                  " synchronized on target node,"
3632                                  " aborting migrate." % dev.iv_name)
3633
3634     self._EnsureSecondary(target_node)
3635     self._GoStandalone()
3636     self._GoReconnect(True)
3637     self._WaitUntilSync()
3638
3639     self.feedback_fn("* migrating instance to %s" % target_node)
3640     time.sleep(10)
3641     result = self.rpc.call_instance_migrate(source_node, instance,
3642                                             self.nodes_ip[target_node],
3643                                             self.op.live)
3644     msg = result.RemoteFailMsg()
3645     if msg:
3646       logging.error("Instance migration failed, trying to revert"
3647                     " disk status: %s", msg)
3648       try:
3649         self._EnsureSecondary(target_node)
3650         self._GoStandalone()
3651         self._GoReconnect(False)
3652         self._WaitUntilSync()
3653       except errors.OpExecError, err:
3654         self.LogWarning("Migration failed and I can't reconnect the"
3655                         " drives: error '%s'\n"
3656                         "Please look and recover the instance status" %
3657                         str(err))
3658
3659       raise errors.OpExecError("Could not migrate instance %s: %s" %
3660                                (instance.name, msg))
3661     time.sleep(10)
3662
3663     instance.primary_node = target_node
3664     # distribute new instance config to the other nodes
3665     self.cfg.Update(instance)
3666
3667     self._EnsureSecondary(source_node)
3668     self._WaitUntilSync()
3669     self._GoStandalone()
3670     self._GoReconnect(False)
3671     self._WaitUntilSync()
3672
3673     self.feedback_fn("* done")
3674
3675   def Exec(self, feedback_fn):
3676     """Perform the migration.
3677
3678     """
3679     self.feedback_fn = feedback_fn
3680
3681     self.source_node = self.instance.primary_node
3682     self.target_node = self.instance.secondary_nodes[0]
3683     self.all_nodes = [self.source_node, self.target_node]
3684     self.nodes_ip = {
3685       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3686       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3687       }
3688     if self.op.cleanup:
3689       return self._ExecCleanup()
3690     else:
3691       return self._ExecMigration()
3692
3693
3694 def _CreateBlockDev(lu, node, instance, device, force_create,
3695                     info, force_open):
3696   """Create a tree of block devices on a given node.
3697
3698   If this device type has to be created on secondaries, create it and
3699   all its children.
3700
3701   If not, just recurse to children keeping the same 'force' value.
3702
3703   @param lu: the lu on whose behalf we execute
3704   @param node: the node on which to create the device
3705   @type instance: L{objects.Instance}
3706   @param instance: the instance which owns the device
3707   @type device: L{objects.Disk}
3708   @param device: the device to create
3709   @type force_create: boolean
3710   @param force_create: whether to force creation of this device; this
3711       will be change to True whenever we find a device which has
3712       CreateOnSecondary() attribute
3713   @param info: the extra 'metadata' we should attach to the device
3714       (this will be represented as a LVM tag)
3715   @type force_open: boolean
3716   @param force_open: this parameter will be passes to the
3717       L{backend.CreateBlockDevice} function where it specifies
3718       whether we run on primary or not, and it affects both
3719       the child assembly and the device own Open() execution
3720
3721   """
3722   if device.CreateOnSecondary():
3723     force_create = True
3724
3725   if device.children:
3726     for child in device.children:
3727       _CreateBlockDev(lu, node, instance, child, force_create,
3728                       info, force_open)
3729
3730   if not force_create:
3731     return
3732
3733   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3734
3735
3736 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3737   """Create a single block device on a given node.
3738
3739   This will not recurse over children of the device, so they must be
3740   created in advance.
3741
3742   @param lu: the lu on whose behalf we execute
3743   @param node: the node on which to create the device
3744   @type instance: L{objects.Instance}
3745   @param instance: the instance which owns the device
3746   @type device: L{objects.Disk}
3747   @param device: the device to create
3748   @param info: the extra 'metadata' we should attach to the device
3749       (this will be represented as a LVM tag)
3750   @type force_open: boolean
3751   @param force_open: this parameter will be passes to the
3752       L{backend.CreateBlockDevice} function where it specifies
3753       whether we run on primary or not, and it affects both
3754       the child assembly and the device own Open() execution
3755
3756   """
3757   lu.cfg.SetDiskID(device, node)
3758   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3759                                        instance.name, force_open, info)
3760   if new_id.failed or not new_id.data:
3761     raise errors.OpExecError("Can't create block device %s on"
3762                              " node %s for instance %s" %
3763                              (device, node, instance.name))
3764   if device.physical_id is None:
3765     device.physical_id = new_id
3766
3767
3768 def _GenerateUniqueNames(lu, exts):
3769   """Generate a suitable LV name.
3770
3771   This will generate a logical volume name for the given instance.
3772
3773   """
3774   results = []
3775   for val in exts:
3776     new_id = lu.cfg.GenerateUniqueID()
3777     results.append("%s%s" % (new_id, val))
3778   return results
3779
3780
3781 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3782                          p_minor, s_minor):
3783   """Generate a drbd8 device complete with its children.
3784
3785   """
3786   port = lu.cfg.AllocatePort()
3787   vgname = lu.cfg.GetVGName()
3788   shared_secret = lu.cfg.GenerateDRBDSecret()
3789   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3790                           logical_id=(vgname, names[0]))
3791   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3792                           logical_id=(vgname, names[1]))
3793   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3794                           logical_id=(primary, secondary, port,
3795                                       p_minor, s_minor,
3796                                       shared_secret),
3797                           children=[dev_data, dev_meta],
3798                           iv_name=iv_name)
3799   return drbd_dev
3800
3801
3802 def _GenerateDiskTemplate(lu, template_name,
3803                           instance_name, primary_node,
3804                           secondary_nodes, disk_info,
3805                           file_storage_dir, file_driver,
3806                           base_index):
3807   """Generate the entire disk layout for a given template type.
3808
3809   """
3810   #TODO: compute space requirements
3811
3812   vgname = lu.cfg.GetVGName()
3813   disk_count = len(disk_info)
3814   disks = []
3815   if template_name == constants.DT_DISKLESS:
3816     pass
3817   elif template_name == constants.DT_PLAIN:
3818     if len(secondary_nodes) != 0:
3819       raise errors.ProgrammerError("Wrong template configuration")
3820
3821     names = _GenerateUniqueNames(lu, [".disk%d" % i
3822                                       for i in range(disk_count)])
3823     for idx, disk in enumerate(disk_info):
3824       disk_index = idx + base_index
3825       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3826                               logical_id=(vgname, names[idx]),
3827                               iv_name="disk/%d" % disk_index)
3828       disks.append(disk_dev)
3829   elif template_name == constants.DT_DRBD8:
3830     if len(secondary_nodes) != 1:
3831       raise errors.ProgrammerError("Wrong template configuration")
3832     remote_node = secondary_nodes[0]
3833     minors = lu.cfg.AllocateDRBDMinor(
3834       [primary_node, remote_node] * len(disk_info), instance_name)
3835
3836     names = []
3837     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3838                                                for i in range(disk_count)]):
3839       names.append(lv_prefix + "_data")
3840       names.append(lv_prefix + "_meta")
3841     for idx, disk in enumerate(disk_info):
3842       disk_index = idx + base_index
3843       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3844                                       disk["size"], names[idx*2:idx*2+2],
3845                                       "disk/%d" % disk_index,
3846                                       minors[idx*2], minors[idx*2+1])
3847       disks.append(disk_dev)
3848   elif template_name == constants.DT_FILE:
3849     if len(secondary_nodes) != 0:
3850       raise errors.ProgrammerError("Wrong template configuration")
3851
3852     for idx, disk in enumerate(disk_info):
3853       disk_index = idx + base_index
3854       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3855                               iv_name="disk/%d" % disk_index,
3856                               logical_id=(file_driver,
3857                                           "%s/disk%d" % (file_storage_dir,
3858                                                          idx)))
3859       disks.append(disk_dev)
3860   else:
3861     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3862   return disks
3863
3864
3865 def _GetInstanceInfoText(instance):
3866   """Compute that text that should be added to the disk's metadata.
3867
3868   """
3869   return "originstname+%s" % instance.name
3870
3871
3872 def _CreateDisks(lu, instance):
3873   """Create all disks for an instance.
3874
3875   This abstracts away some work from AddInstance.
3876
3877   @type lu: L{LogicalUnit}
3878   @param lu: the logical unit on whose behalf we execute
3879   @type instance: L{objects.Instance}
3880   @param instance: the instance whose disks we should create
3881   @rtype: boolean
3882   @return: the success of the creation
3883
3884   """
3885   info = _GetInstanceInfoText(instance)
3886   pnode = instance.primary_node
3887
3888   if instance.disk_template == constants.DT_FILE:
3889     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3890     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3891
3892     if result.failed or not result.data:
3893       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3894
3895     if not result.data[0]:
3896       raise errors.OpExecError("Failed to create directory '%s'" %
3897                                file_storage_dir)
3898
3899   # Note: this needs to be kept in sync with adding of disks in
3900   # LUSetInstanceParams
3901   for device in instance.disks:
3902     logging.info("Creating volume %s for instance %s",
3903                  device.iv_name, instance.name)
3904     #HARDCODE
3905     for node in instance.all_nodes:
3906       f_create = node == pnode
3907       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
3908
3909
3910 def _RemoveDisks(lu, instance):
3911   """Remove all disks for an instance.
3912
3913   This abstracts away some work from `AddInstance()` and
3914   `RemoveInstance()`. Note that in case some of the devices couldn't
3915   be removed, the removal will continue with the other ones (compare
3916   with `_CreateDisks()`).
3917
3918   @type lu: L{LogicalUnit}
3919   @param lu: the logical unit on whose behalf we execute
3920   @type instance: L{objects.Instance}
3921   @param instance: the instance whose disks we should remove
3922   @rtype: boolean
3923   @return: the success of the removal
3924
3925   """
3926   logging.info("Removing block devices for instance %s", instance.name)
3927
3928   result = True
3929   for device in instance.disks:
3930     for node, disk in device.ComputeNodeTree(instance.primary_node):
3931       lu.cfg.SetDiskID(disk, node)
3932       result = lu.rpc.call_blockdev_remove(node, disk)
3933       if result.failed or not result.data:
3934         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3935                            " continuing anyway", device.iv_name, node)
3936         result = False
3937
3938   if instance.disk_template == constants.DT_FILE:
3939     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3940     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3941                                                  file_storage_dir)
3942     if result.failed or not result.data:
3943       logging.error("Could not remove directory '%s'", file_storage_dir)
3944       result = False
3945
3946   return result
3947
3948
3949 def _ComputeDiskSize(disk_template, disks):
3950   """Compute disk size requirements in the volume group
3951
3952   """
3953   # Required free disk space as a function of disk and swap space
3954   req_size_dict = {
3955     constants.DT_DISKLESS: None,
3956     constants.DT_PLAIN: sum(d["size"] for d in disks),
3957     # 128 MB are added for drbd metadata for each disk
3958     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3959     constants.DT_FILE: None,
3960   }
3961
3962   if disk_template not in req_size_dict:
3963     raise errors.ProgrammerError("Disk template '%s' size requirement"
3964                                  " is unknown" %  disk_template)
3965
3966   return req_size_dict[disk_template]
3967
3968
3969 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3970   """Hypervisor parameter validation.
3971
3972   This function abstract the hypervisor parameter validation to be
3973   used in both instance create and instance modify.
3974
3975   @type lu: L{LogicalUnit}
3976   @param lu: the logical unit for which we check
3977   @type nodenames: list
3978   @param nodenames: the list of nodes on which we should check
3979   @type hvname: string
3980   @param hvname: the name of the hypervisor we should use
3981   @type hvparams: dict
3982   @param hvparams: the parameters which we need to check
3983   @raise errors.OpPrereqError: if the parameters are not valid
3984
3985   """
3986   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3987                                                   hvname,
3988                                                   hvparams)
3989   for node in nodenames:
3990     info = hvinfo[node]
3991     info.Raise()
3992     if not info.data or not isinstance(info.data, (tuple, list)):
3993       raise errors.OpPrereqError("Cannot get current information"
3994                                  " from node '%s' (%s)" % (node, info.data))
3995     if not info.data[0]:
3996       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3997                                  " %s" % info.data[1])
3998
3999
4000 class LUCreateInstance(LogicalUnit):
4001   """Create an instance.
4002
4003   """
4004   HPATH = "instance-add"
4005   HTYPE = constants.HTYPE_INSTANCE
4006   _OP_REQP = ["instance_name", "disks", "disk_template",
4007               "mode", "start",
4008               "wait_for_sync", "ip_check", "nics",
4009               "hvparams", "beparams"]
4010   REQ_BGL = False
4011
4012   def _ExpandNode(self, node):
4013     """Expands and checks one node name.
4014
4015     """
4016     node_full = self.cfg.ExpandNodeName(node)
4017     if node_full is None:
4018       raise errors.OpPrereqError("Unknown node %s" % node)
4019     return node_full
4020
4021   def ExpandNames(self):
4022     """ExpandNames for CreateInstance.
4023
4024     Figure out the right locks for instance creation.
4025
4026     """
4027     self.needed_locks = {}
4028
4029     # set optional parameters to none if they don't exist
4030     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4031       if not hasattr(self.op, attr):
4032         setattr(self.op, attr, None)
4033
4034     # cheap checks, mostly valid constants given
4035
4036     # verify creation mode
4037     if self.op.mode not in (constants.INSTANCE_CREATE,
4038                             constants.INSTANCE_IMPORT):
4039       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4040                                  self.op.mode)
4041
4042     # disk template and mirror node verification
4043     if self.op.disk_template not in constants.DISK_TEMPLATES:
4044       raise errors.OpPrereqError("Invalid disk template name")
4045
4046     if self.op.hypervisor is None:
4047       self.op.hypervisor = self.cfg.GetHypervisorType()
4048
4049     cluster = self.cfg.GetClusterInfo()
4050     enabled_hvs = cluster.enabled_hypervisors
4051     if self.op.hypervisor not in enabled_hvs:
4052       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4053                                  " cluster (%s)" % (self.op.hypervisor,
4054                                   ",".join(enabled_hvs)))
4055
4056     # check hypervisor parameter syntax (locally)
4057
4058     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4059                                   self.op.hvparams)
4060     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4061     hv_type.CheckParameterSyntax(filled_hvp)
4062
4063     # fill and remember the beparams dict
4064     utils.CheckBEParams(self.op.beparams)
4065     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4066                                     self.op.beparams)
4067
4068     #### instance parameters check
4069
4070     # instance name verification
4071     hostname1 = utils.HostInfo(self.op.instance_name)
4072     self.op.instance_name = instance_name = hostname1.name
4073
4074     # this is just a preventive check, but someone might still add this
4075     # instance in the meantime, and creation will fail at lock-add time
4076     if instance_name in self.cfg.GetInstanceList():
4077       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4078                                  instance_name)
4079
4080     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4081
4082     # NIC buildup
4083     self.nics = []
4084     for nic in self.op.nics:
4085       # ip validity checks
4086       ip = nic.get("ip", None)
4087       if ip is None or ip.lower() == "none":
4088         nic_ip = None
4089       elif ip.lower() == constants.VALUE_AUTO:
4090         nic_ip = hostname1.ip
4091       else:
4092         if not utils.IsValidIP(ip):
4093           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4094                                      " like a valid IP" % ip)
4095         nic_ip = ip
4096
4097       # MAC address verification
4098       mac = nic.get("mac", constants.VALUE_AUTO)
4099       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4100         if not utils.IsValidMac(mac.lower()):
4101           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4102                                      mac)
4103       # bridge verification
4104       bridge = nic.get("bridge", self.cfg.GetDefBridge())
4105       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4106
4107     # disk checks/pre-build
4108     self.disks = []
4109     for disk in self.op.disks:
4110       mode = disk.get("mode", constants.DISK_RDWR)
4111       if mode not in constants.DISK_ACCESS_SET:
4112         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4113                                    mode)
4114       size = disk.get("size", None)
4115       if size is None:
4116         raise errors.OpPrereqError("Missing disk size")
4117       try:
4118         size = int(size)
4119       except ValueError:
4120         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4121       self.disks.append({"size": size, "mode": mode})
4122
4123     # used in CheckPrereq for ip ping check
4124     self.check_ip = hostname1.ip
4125
4126     # file storage checks
4127     if (self.op.file_driver and
4128         not self.op.file_driver in constants.FILE_DRIVER):
4129       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4130                                  self.op.file_driver)
4131
4132     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4133       raise errors.OpPrereqError("File storage directory path not absolute")
4134
4135     ### Node/iallocator related checks
4136     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4137       raise errors.OpPrereqError("One and only one of iallocator and primary"
4138                                  " node must be given")
4139
4140     if self.op.iallocator:
4141       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4142     else:
4143       self.op.pnode = self._ExpandNode(self.op.pnode)
4144       nodelist = [self.op.pnode]
4145       if self.op.snode is not None:
4146         self.op.snode = self._ExpandNode(self.op.snode)
4147         nodelist.append(self.op.snode)
4148       self.needed_locks[locking.LEVEL_NODE] = nodelist
4149
4150     # in case of import lock the source node too
4151     if self.op.mode == constants.INSTANCE_IMPORT:
4152       src_node = getattr(self.op, "src_node", None)
4153       src_path = getattr(self.op, "src_path", None)
4154
4155       if src_path is None:
4156         self.op.src_path = src_path = self.op.instance_name
4157
4158       if src_node is None:
4159         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4160         self.op.src_node = None
4161         if os.path.isabs(src_path):
4162           raise errors.OpPrereqError("Importing an instance from an absolute"
4163                                      " path requires a source node option.")
4164       else:
4165         self.op.src_node = src_node = self._ExpandNode(src_node)
4166         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4167           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4168         if not os.path.isabs(src_path):
4169           self.op.src_path = src_path = \
4170             os.path.join(constants.EXPORT_DIR, src_path)
4171
4172     else: # INSTANCE_CREATE
4173       if getattr(self.op, "os_type", None) is None:
4174         raise errors.OpPrereqError("No guest OS specified")
4175
4176   def _RunAllocator(self):
4177     """Run the allocator based on input opcode.
4178
4179     """
4180     nics = [n.ToDict() for n in self.nics]
4181     ial = IAllocator(self,
4182                      mode=constants.IALLOCATOR_MODE_ALLOC,
4183                      name=self.op.instance_name,
4184                      disk_template=self.op.disk_template,
4185                      tags=[],
4186                      os=self.op.os_type,
4187                      vcpus=self.be_full[constants.BE_VCPUS],
4188                      mem_size=self.be_full[constants.BE_MEMORY],
4189                      disks=self.disks,
4190                      nics=nics,
4191                      hypervisor=self.op.hypervisor,
4192                      )
4193
4194     ial.Run(self.op.iallocator)
4195
4196     if not ial.success:
4197       raise errors.OpPrereqError("Can't compute nodes using"
4198                                  " iallocator '%s': %s" % (self.op.iallocator,
4199                                                            ial.info))
4200     if len(ial.nodes) != ial.required_nodes:
4201       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4202                                  " of nodes (%s), required %s" %
4203                                  (self.op.iallocator, len(ial.nodes),
4204                                   ial.required_nodes))
4205     self.op.pnode = ial.nodes[0]
4206     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4207                  self.op.instance_name, self.op.iallocator,
4208                  ", ".join(ial.nodes))
4209     if ial.required_nodes == 2:
4210       self.op.snode = ial.nodes[1]
4211
4212   def BuildHooksEnv(self):
4213     """Build hooks env.
4214
4215     This runs on master, primary and secondary nodes of the instance.
4216
4217     """
4218     env = {
4219       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4220       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4221       "INSTANCE_ADD_MODE": self.op.mode,
4222       }
4223     if self.op.mode == constants.INSTANCE_IMPORT:
4224       env["INSTANCE_SRC_NODE"] = self.op.src_node
4225       env["INSTANCE_SRC_PATH"] = self.op.src_path
4226       env["INSTANCE_SRC_IMAGES"] = self.src_images
4227
4228     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4229       primary_node=self.op.pnode,
4230       secondary_nodes=self.secondaries,
4231       status=self.instance_status,
4232       os_type=self.op.os_type,
4233       memory=self.be_full[constants.BE_MEMORY],
4234       vcpus=self.be_full[constants.BE_VCPUS],
4235       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4236     ))
4237
4238     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4239           self.secondaries)
4240     return env, nl, nl
4241
4242
4243   def CheckPrereq(self):
4244     """Check prerequisites.
4245
4246     """
4247     if (not self.cfg.GetVGName() and
4248         self.op.disk_template not in constants.DTS_NOT_LVM):
4249       raise errors.OpPrereqError("Cluster does not support lvm-based"
4250                                  " instances")
4251
4252
4253     if self.op.mode == constants.INSTANCE_IMPORT:
4254       src_node = self.op.src_node
4255       src_path = self.op.src_path
4256
4257       if src_node is None:
4258         exp_list = self.rpc.call_export_list(
4259           self.acquired_locks[locking.LEVEL_NODE])
4260         found = False
4261         for node in exp_list:
4262           if not exp_list[node].failed and src_path in exp_list[node].data:
4263             found = True
4264             self.op.src_node = src_node = node
4265             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4266                                                        src_path)
4267             break
4268         if not found:
4269           raise errors.OpPrereqError("No export found for relative path %s" %
4270                                       src_path)
4271
4272       _CheckNodeOnline(self, src_node)
4273       result = self.rpc.call_export_info(src_node, src_path)
4274       result.Raise()
4275       if not result.data:
4276         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4277
4278       export_info = result.data
4279       if not export_info.has_section(constants.INISECT_EXP):
4280         raise errors.ProgrammerError("Corrupted export config")
4281
4282       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4283       if (int(ei_version) != constants.EXPORT_VERSION):
4284         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4285                                    (ei_version, constants.EXPORT_VERSION))
4286
4287       # Check that the new instance doesn't have less disks than the export
4288       instance_disks = len(self.disks)
4289       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4290       if instance_disks < export_disks:
4291         raise errors.OpPrereqError("Not enough disks to import."
4292                                    " (instance: %d, export: %d)" %
4293                                    (instance_disks, export_disks))
4294
4295       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4296       disk_images = []
4297       for idx in range(export_disks):
4298         option = 'disk%d_dump' % idx
4299         if export_info.has_option(constants.INISECT_INS, option):
4300           # FIXME: are the old os-es, disk sizes, etc. useful?
4301           export_name = export_info.get(constants.INISECT_INS, option)
4302           image = os.path.join(src_path, export_name)
4303           disk_images.append(image)
4304         else:
4305           disk_images.append(False)
4306
4307       self.src_images = disk_images
4308
4309       old_name = export_info.get(constants.INISECT_INS, 'name')
4310       # FIXME: int() here could throw a ValueError on broken exports
4311       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4312       if self.op.instance_name == old_name:
4313         for idx, nic in enumerate(self.nics):
4314           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4315             nic_mac_ini = 'nic%d_mac' % idx
4316             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4317
4318     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4319     if self.op.start and not self.op.ip_check:
4320       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4321                                  " adding an instance in start mode")
4322
4323     if self.op.ip_check:
4324       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4325         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4326                                    (self.check_ip, self.op.instance_name))
4327
4328     #### allocator run
4329
4330     if self.op.iallocator is not None:
4331       self._RunAllocator()
4332
4333     #### node related checks
4334
4335     # check primary node
4336     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4337     assert self.pnode is not None, \
4338       "Cannot retrieve locked node %s" % self.op.pnode
4339     if pnode.offline:
4340       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4341                                  pnode.name)
4342
4343     self.secondaries = []
4344
4345     # mirror node verification
4346     if self.op.disk_template in constants.DTS_NET_MIRROR:
4347       if self.op.snode is None:
4348         raise errors.OpPrereqError("The networked disk templates need"
4349                                    " a mirror node")
4350       if self.op.snode == pnode.name:
4351         raise errors.OpPrereqError("The secondary node cannot be"
4352                                    " the primary node.")
4353       self.secondaries.append(self.op.snode)
4354       _CheckNodeOnline(self, self.op.snode)
4355
4356     nodenames = [pnode.name] + self.secondaries
4357
4358     req_size = _ComputeDiskSize(self.op.disk_template,
4359                                 self.disks)
4360
4361     # Check lv size requirements
4362     if req_size is not None:
4363       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4364                                          self.op.hypervisor)
4365       for node in nodenames:
4366         info = nodeinfo[node]
4367         info.Raise()
4368         info = info.data
4369         if not info:
4370           raise errors.OpPrereqError("Cannot get current information"
4371                                      " from node '%s'" % node)
4372         vg_free = info.get('vg_free', None)
4373         if not isinstance(vg_free, int):
4374           raise errors.OpPrereqError("Can't compute free disk space on"
4375                                      " node %s" % node)
4376         if req_size > info['vg_free']:
4377           raise errors.OpPrereqError("Not enough disk space on target node %s."
4378                                      " %d MB available, %d MB required" %
4379                                      (node, info['vg_free'], req_size))
4380
4381     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4382
4383     # os verification
4384     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4385     result.Raise()
4386     if not isinstance(result.data, objects.OS):
4387       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4388                                  " primary node"  % self.op.os_type)
4389
4390     # bridge check on primary node
4391     bridges = [n.bridge for n in self.nics]
4392     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4393     result.Raise()
4394     if not result.data:
4395       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4396                                  " exist on destination node '%s'" %
4397                                  (",".join(bridges), pnode.name))
4398
4399     # memory check on primary node
4400     if self.op.start:
4401       _CheckNodeFreeMemory(self, self.pnode.name,
4402                            "creating instance %s" % self.op.instance_name,
4403                            self.be_full[constants.BE_MEMORY],
4404                            self.op.hypervisor)
4405
4406     if self.op.start:
4407       self.instance_status = 'up'
4408     else:
4409       self.instance_status = 'down'
4410
4411   def Exec(self, feedback_fn):
4412     """Create and add the instance to the cluster.
4413
4414     """
4415     instance = self.op.instance_name
4416     pnode_name = self.pnode.name
4417
4418     for nic in self.nics:
4419       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4420         nic.mac = self.cfg.GenerateMAC()
4421
4422     ht_kind = self.op.hypervisor
4423     if ht_kind in constants.HTS_REQ_PORT:
4424       network_port = self.cfg.AllocatePort()
4425     else:
4426       network_port = None
4427
4428     ##if self.op.vnc_bind_address is None:
4429     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4430
4431     # this is needed because os.path.join does not accept None arguments
4432     if self.op.file_storage_dir is None:
4433       string_file_storage_dir = ""
4434     else:
4435       string_file_storage_dir = self.op.file_storage_dir
4436
4437     # build the full file storage dir path
4438     file_storage_dir = os.path.normpath(os.path.join(
4439                                         self.cfg.GetFileStorageDir(),
4440                                         string_file_storage_dir, instance))
4441
4442
4443     disks = _GenerateDiskTemplate(self,
4444                                   self.op.disk_template,
4445                                   instance, pnode_name,
4446                                   self.secondaries,
4447                                   self.disks,
4448                                   file_storage_dir,
4449                                   self.op.file_driver,
4450                                   0)
4451
4452     iobj = objects.Instance(name=instance, os=self.op.os_type,
4453                             primary_node=pnode_name,
4454                             nics=self.nics, disks=disks,
4455                             disk_template=self.op.disk_template,
4456                             status=self.instance_status,
4457                             network_port=network_port,
4458                             beparams=self.op.beparams,
4459                             hvparams=self.op.hvparams,
4460                             hypervisor=self.op.hypervisor,
4461                             )
4462
4463     feedback_fn("* creating instance disks...")
4464     try:
4465       _CreateDisks(self, iobj)
4466     except errors.OpExecError:
4467       self.LogWarning("Device creation failed, reverting...")
4468       try:
4469         _RemoveDisks(self, iobj)
4470       finally:
4471         self.cfg.ReleaseDRBDMinors(instance)
4472         raise
4473
4474     feedback_fn("adding instance %s to cluster config" % instance)
4475
4476     self.cfg.AddInstance(iobj)
4477     # Declare that we don't want to remove the instance lock anymore, as we've
4478     # added the instance to the config
4479     del self.remove_locks[locking.LEVEL_INSTANCE]
4480     # Remove the temp. assignements for the instance's drbds
4481     self.cfg.ReleaseDRBDMinors(instance)
4482     # Unlock all the nodes
4483     if self.op.mode == constants.INSTANCE_IMPORT:
4484       nodes_keep = [self.op.src_node]
4485       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4486                        if node != self.op.src_node]
4487       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4488       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4489     else:
4490       self.context.glm.release(locking.LEVEL_NODE)
4491       del self.acquired_locks[locking.LEVEL_NODE]
4492
4493     if self.op.wait_for_sync:
4494       disk_abort = not _WaitForSync(self, iobj)
4495     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4496       # make sure the disks are not degraded (still sync-ing is ok)
4497       time.sleep(15)
4498       feedback_fn("* checking mirrors status")
4499       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4500     else:
4501       disk_abort = False
4502
4503     if disk_abort:
4504       _RemoveDisks(self, iobj)
4505       self.cfg.RemoveInstance(iobj.name)
4506       # Make sure the instance lock gets removed
4507       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4508       raise errors.OpExecError("There are some degraded disks for"
4509                                " this instance")
4510
4511     feedback_fn("creating os for instance %s on node %s" %
4512                 (instance, pnode_name))
4513
4514     if iobj.disk_template != constants.DT_DISKLESS:
4515       if self.op.mode == constants.INSTANCE_CREATE:
4516         feedback_fn("* running the instance OS create scripts...")
4517         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4518         result.Raise()
4519         if not result.data:
4520           raise errors.OpExecError("Could not add os for instance %s"
4521                                    " on node %s" %
4522                                    (instance, pnode_name))
4523
4524       elif self.op.mode == constants.INSTANCE_IMPORT:
4525         feedback_fn("* running the instance OS import scripts...")
4526         src_node = self.op.src_node
4527         src_images = self.src_images
4528         cluster_name = self.cfg.GetClusterName()
4529         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4530                                                          src_node, src_images,
4531                                                          cluster_name)
4532         import_result.Raise()
4533         for idx, result in enumerate(import_result.data):
4534           if not result:
4535             self.LogWarning("Could not import the image %s for instance"
4536                             " %s, disk %d, on node %s" %
4537                             (src_images[idx], instance, idx, pnode_name))
4538       else:
4539         # also checked in the prereq part
4540         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4541                                      % self.op.mode)
4542
4543     if self.op.start:
4544       logging.info("Starting instance %s on node %s", instance, pnode_name)
4545       feedback_fn("* starting instance...")
4546       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4547       result.Raise()
4548       if not result.data:
4549         raise errors.OpExecError("Could not start instance")
4550
4551
4552 class LUConnectConsole(NoHooksLU):
4553   """Connect to an instance's console.
4554
4555   This is somewhat special in that it returns the command line that
4556   you need to run on the master node in order to connect to the
4557   console.
4558
4559   """
4560   _OP_REQP = ["instance_name"]
4561   REQ_BGL = False
4562
4563   def ExpandNames(self):
4564     self._ExpandAndLockInstance()
4565
4566   def CheckPrereq(self):
4567     """Check prerequisites.
4568
4569     This checks that the instance is in the cluster.
4570
4571     """
4572     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4573     assert self.instance is not None, \
4574       "Cannot retrieve locked instance %s" % self.op.instance_name
4575     _CheckNodeOnline(self, self.instance.primary_node)
4576
4577   def Exec(self, feedback_fn):
4578     """Connect to the console of an instance
4579
4580     """
4581     instance = self.instance
4582     node = instance.primary_node
4583
4584     node_insts = self.rpc.call_instance_list([node],
4585                                              [instance.hypervisor])[node]
4586     node_insts.Raise()
4587
4588     if instance.name not in node_insts.data:
4589       raise errors.OpExecError("Instance %s is not running." % instance.name)
4590
4591     logging.debug("Connecting to console of %s on %s", instance.name, node)
4592
4593     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4594     console_cmd = hyper.GetShellCommandForConsole(instance)
4595
4596     # build ssh cmdline
4597     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4598
4599
4600 class LUReplaceDisks(LogicalUnit):
4601   """Replace the disks of an instance.
4602
4603   """
4604   HPATH = "mirrors-replace"
4605   HTYPE = constants.HTYPE_INSTANCE
4606   _OP_REQP = ["instance_name", "mode", "disks"]
4607   REQ_BGL = False
4608
4609   def CheckArguments(self):
4610     if not hasattr(self.op, "remote_node"):
4611       self.op.remote_node = None
4612     if not hasattr(self.op, "iallocator"):
4613       self.op.iallocator = None
4614
4615     # check for valid parameter combination
4616     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4617     if self.op.mode == constants.REPLACE_DISK_CHG:
4618       if cnt == 2:
4619         raise errors.OpPrereqError("When changing the secondary either an"
4620                                    " iallocator script must be used or the"
4621                                    " new node given")
4622       elif cnt == 0:
4623         raise errors.OpPrereqError("Give either the iallocator or the new"
4624                                    " secondary, not both")
4625     else: # not replacing the secondary
4626       if cnt != 2:
4627         raise errors.OpPrereqError("The iallocator and new node options can"
4628                                    " be used only when changing the"
4629                                    " secondary node")
4630
4631   def ExpandNames(self):
4632     self._ExpandAndLockInstance()
4633
4634     if self.op.iallocator is not None:
4635       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4636     elif self.op.remote_node is not None:
4637       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4638       if remote_node is None:
4639         raise errors.OpPrereqError("Node '%s' not known" %
4640                                    self.op.remote_node)
4641       self.op.remote_node = remote_node
4642       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4643       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4644     else:
4645       self.needed_locks[locking.LEVEL_NODE] = []
4646       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4647
4648   def DeclareLocks(self, level):
4649     # If we're not already locking all nodes in the set we have to declare the
4650     # instance's primary/secondary nodes.
4651     if (level == locking.LEVEL_NODE and
4652         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4653       self._LockInstancesNodes()
4654
4655   def _RunAllocator(self):
4656     """Compute a new secondary node using an IAllocator.
4657
4658     """
4659     ial = IAllocator(self,
4660                      mode=constants.IALLOCATOR_MODE_RELOC,
4661                      name=self.op.instance_name,
4662                      relocate_from=[self.sec_node])
4663
4664     ial.Run(self.op.iallocator)
4665
4666     if not ial.success:
4667       raise errors.OpPrereqError("Can't compute nodes using"
4668                                  " iallocator '%s': %s" % (self.op.iallocator,
4669                                                            ial.info))
4670     if len(ial.nodes) != ial.required_nodes:
4671       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4672                                  " of nodes (%s), required %s" %
4673                                  (len(ial.nodes), ial.required_nodes))
4674     self.op.remote_node = ial.nodes[0]
4675     self.LogInfo("Selected new secondary for the instance: %s",
4676                  self.op.remote_node)
4677
4678   def BuildHooksEnv(self):
4679     """Build hooks env.
4680
4681     This runs on the master, the primary and all the secondaries.
4682
4683     """
4684     env = {
4685       "MODE": self.op.mode,
4686       "NEW_SECONDARY": self.op.remote_node,
4687       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4688       }
4689     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4690     nl = [
4691       self.cfg.GetMasterNode(),
4692       self.instance.primary_node,
4693       ]
4694     if self.op.remote_node is not None:
4695       nl.append(self.op.remote_node)
4696     return env, nl, nl
4697
4698   def CheckPrereq(self):
4699     """Check prerequisites.
4700
4701     This checks that the instance is in the cluster.
4702
4703     """
4704     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4705     assert instance is not None, \
4706       "Cannot retrieve locked instance %s" % self.op.instance_name
4707     self.instance = instance
4708
4709     if instance.disk_template != constants.DT_DRBD8:
4710       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4711                                  " instances")
4712
4713     if len(instance.secondary_nodes) != 1:
4714       raise errors.OpPrereqError("The instance has a strange layout,"
4715                                  " expected one secondary but found %d" %
4716                                  len(instance.secondary_nodes))
4717
4718     self.sec_node = instance.secondary_nodes[0]
4719
4720     if self.op.iallocator is not None:
4721       self._RunAllocator()
4722
4723     remote_node = self.op.remote_node
4724     if remote_node is not None:
4725       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4726       assert self.remote_node_info is not None, \
4727         "Cannot retrieve locked node %s" % remote_node
4728     else:
4729       self.remote_node_info = None
4730     if remote_node == instance.primary_node:
4731       raise errors.OpPrereqError("The specified node is the primary node of"
4732                                  " the instance.")
4733     elif remote_node == self.sec_node:
4734       raise errors.OpPrereqError("The specified node is already the"
4735                                  " secondary node of the instance.")
4736
4737     if self.op.mode == constants.REPLACE_DISK_PRI:
4738       n1 = self.tgt_node = instance.primary_node
4739       n2 = self.oth_node = self.sec_node
4740     elif self.op.mode == constants.REPLACE_DISK_SEC:
4741       n1 = self.tgt_node = self.sec_node
4742       n2 = self.oth_node = instance.primary_node
4743     elif self.op.mode == constants.REPLACE_DISK_CHG:
4744       n1 = self.new_node = remote_node
4745       n2 = self.oth_node = instance.primary_node
4746       self.tgt_node = self.sec_node
4747     else:
4748       raise errors.ProgrammerError("Unhandled disk replace mode")
4749
4750     _CheckNodeOnline(self, n1)
4751     _CheckNodeOnline(self, n2)
4752
4753     if not self.op.disks:
4754       self.op.disks = range(len(instance.disks))
4755
4756     for disk_idx in self.op.disks:
4757       instance.FindDisk(disk_idx)
4758
4759   def _ExecD8DiskOnly(self, feedback_fn):
4760     """Replace a disk on the primary or secondary for dbrd8.
4761
4762     The algorithm for replace is quite complicated:
4763
4764       1. for each disk to be replaced:
4765
4766         1. create new LVs on the target node with unique names
4767         1. detach old LVs from the drbd device
4768         1. rename old LVs to name_replaced.<time_t>
4769         1. rename new LVs to old LVs
4770         1. attach the new LVs (with the old names now) to the drbd device
4771
4772       1. wait for sync across all devices
4773
4774       1. for each modified disk:
4775
4776         1. remove old LVs (which have the name name_replaces.<time_t>)
4777
4778     Failures are not very well handled.
4779
4780     """
4781     steps_total = 6
4782     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4783     instance = self.instance
4784     iv_names = {}
4785     vgname = self.cfg.GetVGName()
4786     # start of work
4787     cfg = self.cfg
4788     tgt_node = self.tgt_node
4789     oth_node = self.oth_node
4790
4791     # Step: check device activation
4792     self.proc.LogStep(1, steps_total, "check device existence")
4793     info("checking volume groups")
4794     my_vg = cfg.GetVGName()
4795     results = self.rpc.call_vg_list([oth_node, tgt_node])
4796     if not results:
4797       raise errors.OpExecError("Can't list volume groups on the nodes")
4798     for node in oth_node, tgt_node:
4799       res = results[node]
4800       if res.failed or not res.data or my_vg not in res.data:
4801         raise errors.OpExecError("Volume group '%s' not found on %s" %
4802                                  (my_vg, node))
4803     for idx, dev in enumerate(instance.disks):
4804       if idx not in self.op.disks:
4805         continue
4806       for node in tgt_node, oth_node:
4807         info("checking disk/%d on %s" % (idx, node))
4808         cfg.SetDiskID(dev, node)
4809         if not self.rpc.call_blockdev_find(node, dev):
4810           raise errors.OpExecError("Can't find disk/%d on node %s" %
4811                                    (idx, node))
4812
4813     # Step: check other node consistency
4814     self.proc.LogStep(2, steps_total, "check peer consistency")
4815     for idx, dev in enumerate(instance.disks):
4816       if idx not in self.op.disks:
4817         continue
4818       info("checking disk/%d consistency on %s" % (idx, oth_node))
4819       if not _CheckDiskConsistency(self, dev, oth_node,
4820                                    oth_node==instance.primary_node):
4821         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4822                                  " to replace disks on this node (%s)" %
4823                                  (oth_node, tgt_node))
4824
4825     # Step: create new storage
4826     self.proc.LogStep(3, steps_total, "allocate new storage")
4827     for idx, dev in enumerate(instance.disks):
4828       if idx not in self.op.disks:
4829         continue
4830       size = dev.size
4831       cfg.SetDiskID(dev, tgt_node)
4832       lv_names = [".disk%d_%s" % (idx, suf)
4833                   for suf in ["data", "meta"]]
4834       names = _GenerateUniqueNames(self, lv_names)
4835       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4836                              logical_id=(vgname, names[0]))
4837       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4838                              logical_id=(vgname, names[1]))
4839       new_lvs = [lv_data, lv_meta]
4840       old_lvs = dev.children
4841       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4842       info("creating new local storage on %s for %s" %
4843            (tgt_node, dev.iv_name))
4844       # we pass force_create=True to force the LVM creation
4845       for new_lv in new_lvs:
4846         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4847                         _GetInstanceInfoText(instance), False)
4848
4849     # Step: for each lv, detach+rename*2+attach
4850     self.proc.LogStep(4, steps_total, "change drbd configuration")
4851     for dev, old_lvs, new_lvs in iv_names.itervalues():
4852       info("detaching %s drbd from local storage" % dev.iv_name)
4853       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4854       result.Raise()
4855       if not result.data:
4856         raise errors.OpExecError("Can't detach drbd from local storage on node"
4857                                  " %s for device %s" % (tgt_node, dev.iv_name))
4858       #dev.children = []
4859       #cfg.Update(instance)
4860
4861       # ok, we created the new LVs, so now we know we have the needed
4862       # storage; as such, we proceed on the target node to rename
4863       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4864       # using the assumption that logical_id == physical_id (which in
4865       # turn is the unique_id on that node)
4866
4867       # FIXME(iustin): use a better name for the replaced LVs
4868       temp_suffix = int(time.time())
4869       ren_fn = lambda d, suff: (d.physical_id[0],
4870                                 d.physical_id[1] + "_replaced-%s" % suff)
4871       # build the rename list based on what LVs exist on the node
4872       rlist = []
4873       for to_ren in old_lvs:
4874         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4875         if not find_res.failed and find_res.data is not None: # device exists
4876           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4877
4878       info("renaming the old LVs on the target node")
4879       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4880       result.Raise()
4881       if not result.data:
4882         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4883       # now we rename the new LVs to the old LVs
4884       info("renaming the new LVs on the target node")
4885       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4886       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4887       result.Raise()
4888       if not result.data:
4889         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4890
4891       for old, new in zip(old_lvs, new_lvs):
4892         new.logical_id = old.logical_id
4893         cfg.SetDiskID(new, tgt_node)
4894
4895       for disk in old_lvs:
4896         disk.logical_id = ren_fn(disk, temp_suffix)
4897         cfg.SetDiskID(disk, tgt_node)
4898
4899       # now that the new lvs have the old name, we can add them to the device
4900       info("adding new mirror component on %s" % tgt_node)
4901       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4902       if result.failed or not result.data:
4903         for new_lv in new_lvs:
4904           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4905           if result.failed or not result.data:
4906             warning("Can't rollback device %s", hint="manually cleanup unused"
4907                     " logical volumes")
4908         raise errors.OpExecError("Can't add local storage to drbd")
4909
4910       dev.children = new_lvs
4911       cfg.Update(instance)
4912
4913     # Step: wait for sync
4914
4915     # this can fail as the old devices are degraded and _WaitForSync
4916     # does a combined result over all disks, so we don't check its
4917     # return value
4918     self.proc.LogStep(5, steps_total, "sync devices")
4919     _WaitForSync(self, instance, unlock=True)
4920
4921     # so check manually all the devices
4922     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4923       cfg.SetDiskID(dev, instance.primary_node)
4924       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4925       if result.failed or result.data[5]:
4926         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4927
4928     # Step: remove old storage
4929     self.proc.LogStep(6, steps_total, "removing old storage")
4930     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4931       info("remove logical volumes for %s" % name)
4932       for lv in old_lvs:
4933         cfg.SetDiskID(lv, tgt_node)
4934         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4935         if result.failed or not result.data:
4936           warning("Can't remove old LV", hint="manually remove unused LVs")
4937           continue
4938
4939   def _ExecD8Secondary(self, feedback_fn):
4940     """Replace the secondary node for drbd8.
4941
4942     The algorithm for replace is quite complicated:
4943       - for all disks of the instance:
4944         - create new LVs on the new node with same names
4945         - shutdown the drbd device on the old secondary
4946         - disconnect the drbd network on the primary
4947         - create the drbd device on the new secondary
4948         - network attach the drbd on the primary, using an artifice:
4949           the drbd code for Attach() will connect to the network if it
4950           finds a device which is connected to the good local disks but
4951           not network enabled
4952       - wait for sync across all devices
4953       - remove all disks from the old secondary
4954
4955     Failures are not very well handled.
4956
4957     """
4958     steps_total = 6
4959     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4960     instance = self.instance
4961     iv_names = {}
4962     # start of work
4963     cfg = self.cfg
4964     old_node = self.tgt_node
4965     new_node = self.new_node
4966     pri_node = instance.primary_node
4967     nodes_ip = {
4968       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
4969       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
4970       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
4971       }
4972
4973     # Step: check device activation
4974     self.proc.LogStep(1, steps_total, "check device existence")
4975     info("checking volume groups")
4976     my_vg = cfg.GetVGName()
4977     results = self.rpc.call_vg_list([pri_node, new_node])
4978     for node in pri_node, new_node:
4979       res = results[node]
4980       if res.failed or not res.data or my_vg not in res.data:
4981         raise errors.OpExecError("Volume group '%s' not found on %s" %
4982                                  (my_vg, node))
4983     for idx, dev in enumerate(instance.disks):
4984       if idx not in self.op.disks:
4985         continue
4986       info("checking disk/%d on %s" % (idx, pri_node))
4987       cfg.SetDiskID(dev, pri_node)
4988       result = self.rpc.call_blockdev_find(pri_node, dev)
4989       result.Raise()
4990       if not result.data:
4991         raise errors.OpExecError("Can't find disk/%d on node %s" %
4992                                  (idx, pri_node))
4993
4994     # Step: check other node consistency
4995     self.proc.LogStep(2, steps_total, "check peer consistency")
4996     for idx, dev in enumerate(instance.disks):
4997       if idx not in self.op.disks:
4998         continue
4999       info("checking disk/%d consistency on %s" % (idx, pri_node))
5000       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5001         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5002                                  " unsafe to replace the secondary" %
5003                                  pri_node)
5004
5005     # Step: create new storage
5006     self.proc.LogStep(3, steps_total, "allocate new storage")
5007     for idx, dev in enumerate(instance.disks):
5008       info("adding new local storage on %s for disk/%d" %
5009            (new_node, idx))
5010       # we pass force_create=True to force LVM creation
5011       for new_lv in dev.children:
5012         _CreateBlockDev(self, new_node, instance, new_lv, True,
5013                         _GetInstanceInfoText(instance), False)
5014
5015     # Step 4: dbrd minors and drbd setups changes
5016     # after this, we must manually remove the drbd minors on both the
5017     # error and the success paths
5018     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5019                                    instance.name)
5020     logging.debug("Allocated minors %s" % (minors,))
5021     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5022     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5023       size = dev.size
5024       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5025       # create new devices on new_node; note that we create two IDs:
5026       # one without port, so the drbd will be activated without
5027       # networking information on the new node at this stage, and one
5028       # with network, for the latter activation in step 4
5029       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5030       if pri_node == o_node1:
5031         p_minor = o_minor1
5032       else:
5033         p_minor = o_minor2
5034
5035       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5036       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5037
5038       iv_names[idx] = (dev, dev.children, new_net_id)
5039       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5040                     new_net_id)
5041       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5042                               logical_id=new_alone_id,
5043                               children=dev.children)
5044       try:
5045         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5046                               _GetInstanceInfoText(instance), False)
5047       except error.BlockDeviceError:
5048         self.cfg.ReleaseDRBDMinors(instance.name)
5049         raise
5050
5051     for idx, dev in enumerate(instance.disks):
5052       # we have new devices, shutdown the drbd on the old secondary
5053       info("shutting down drbd for disk/%d on old node" % idx)
5054       cfg.SetDiskID(dev, old_node)
5055       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5056       if result.failed or not result.data:
5057         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5058                 hint="Please cleanup this device manually as soon as possible")
5059
5060     info("detaching primary drbds from the network (=> standalone)")
5061     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5062                                                instance.disks)[pri_node]
5063
5064     msg = result.RemoteFailMsg()
5065     if msg:
5066       # detaches didn't succeed (unlikely)
5067       self.cfg.ReleaseDRBDMinors(instance.name)
5068       raise errors.OpExecError("Can't detach the disks from the network on"
5069                                " old node: %s" % (msg,))
5070
5071     # if we managed to detach at least one, we update all the disks of
5072     # the instance to point to the new secondary
5073     info("updating instance configuration")
5074     for dev, _, new_logical_id in iv_names.itervalues():
5075       dev.logical_id = new_logical_id
5076       cfg.SetDiskID(dev, pri_node)
5077     cfg.Update(instance)
5078     # we can remove now the temp minors as now the new values are
5079     # written to the config file (and therefore stable)
5080     self.cfg.ReleaseDRBDMinors(instance.name)
5081
5082     # and now perform the drbd attach
5083     info("attaching primary drbds to new secondary (standalone => connected)")
5084     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5085                                            instance.disks, instance.name,
5086                                            False)
5087     for to_node, to_result in result.items():
5088       msg = to_result.RemoteFailMsg()
5089       if msg:
5090         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5091                 hint="please do a gnt-instance info to see the"
5092                 " status of disks")
5093
5094     # this can fail as the old devices are degraded and _WaitForSync
5095     # does a combined result over all disks, so we don't check its
5096     # return value
5097     self.proc.LogStep(5, steps_total, "sync devices")
5098     _WaitForSync(self, instance, unlock=True)
5099
5100     # so check manually all the devices
5101     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5102       cfg.SetDiskID(dev, pri_node)
5103       result = self.rpc.call_blockdev_find(pri_node, dev)
5104       result.Raise()
5105       if result.data[5]:
5106         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5107
5108     self.proc.LogStep(6, steps_total, "removing old storage")
5109     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5110       info("remove logical volumes for disk/%d" % idx)
5111       for lv in old_lvs:
5112         cfg.SetDiskID(lv, old_node)
5113         result = self.rpc.call_blockdev_remove(old_node, lv)
5114         if result.failed or not result.data:
5115           warning("Can't remove LV on old secondary",
5116                   hint="Cleanup stale volumes by hand")
5117
5118   def Exec(self, feedback_fn):
5119     """Execute disk replacement.
5120
5121     This dispatches the disk replacement to the appropriate handler.
5122
5123     """
5124     instance = self.instance
5125
5126     # Activate the instance disks if we're replacing them on a down instance
5127     if instance.status == "down":
5128       _StartInstanceDisks(self, instance, True)
5129
5130     if self.op.mode == constants.REPLACE_DISK_CHG:
5131       fn = self._ExecD8Secondary
5132     else:
5133       fn = self._ExecD8DiskOnly
5134
5135     ret = fn(feedback_fn)
5136
5137     # Deactivate the instance disks if we're replacing them on a down instance
5138     if instance.status == "down":
5139       _SafeShutdownInstanceDisks(self, instance)
5140
5141     return ret
5142
5143
5144 class LUGrowDisk(LogicalUnit):
5145   """Grow a disk of an instance.
5146
5147   """
5148   HPATH = "disk-grow"
5149   HTYPE = constants.HTYPE_INSTANCE
5150   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5151   REQ_BGL = False
5152
5153   def ExpandNames(self):
5154     self._ExpandAndLockInstance()
5155     self.needed_locks[locking.LEVEL_NODE] = []
5156     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5157
5158   def DeclareLocks(self, level):
5159     if level == locking.LEVEL_NODE:
5160       self._LockInstancesNodes()
5161
5162   def BuildHooksEnv(self):
5163     """Build hooks env.
5164
5165     This runs on the master, the primary and all the secondaries.
5166
5167     """
5168     env = {
5169       "DISK": self.op.disk,
5170       "AMOUNT": self.op.amount,
5171       }
5172     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5173     nl = [
5174       self.cfg.GetMasterNode(),
5175       self.instance.primary_node,
5176       ]
5177     return env, nl, nl
5178
5179   def CheckPrereq(self):
5180     """Check prerequisites.
5181
5182     This checks that the instance is in the cluster.
5183
5184     """
5185     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5186     assert instance is not None, \
5187       "Cannot retrieve locked instance %s" % self.op.instance_name
5188     _CheckNodeOnline(self, instance.primary_node)
5189     for node in instance.secondary_nodes:
5190       _CheckNodeOnline(self, node)
5191
5192
5193     self.instance = instance
5194
5195     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5196       raise errors.OpPrereqError("Instance's disk layout does not support"
5197                                  " growing.")
5198
5199     self.disk = instance.FindDisk(self.op.disk)
5200
5201     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
5202     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5203                                        instance.hypervisor)
5204     for node in nodenames:
5205       info = nodeinfo[node]
5206       if info.failed or not info.data:
5207         raise errors.OpPrereqError("Cannot get current information"
5208                                    " from node '%s'" % node)
5209       vg_free = info.data.get('vg_free', None)
5210       if not isinstance(vg_free, int):
5211         raise errors.OpPrereqError("Can't compute free disk space on"
5212                                    " node %s" % node)
5213       if self.op.amount > vg_free:
5214         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5215                                    " %d MiB available, %d MiB required" %
5216                                    (node, vg_free, self.op.amount))
5217
5218   def Exec(self, feedback_fn):
5219     """Execute disk grow.
5220
5221     """
5222     instance = self.instance
5223     disk = self.disk
5224     for node in (instance.secondary_nodes + (instance.primary_node,)):
5225       self.cfg.SetDiskID(disk, node)
5226       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5227       result.Raise()
5228       if (not result.data or not isinstance(result.data, (list, tuple)) or
5229           len(result.data) != 2):
5230         raise errors.OpExecError("Grow request failed to node %s" % node)
5231       elif not result.data[0]:
5232         raise errors.OpExecError("Grow request failed to node %s: %s" %
5233                                  (node, result.data[1]))
5234     disk.RecordGrow(self.op.amount)
5235     self.cfg.Update(instance)
5236     if self.op.wait_for_sync:
5237       disk_abort = not _WaitForSync(self, instance)
5238       if disk_abort:
5239         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5240                              " status.\nPlease check the instance.")
5241
5242
5243 class LUQueryInstanceData(NoHooksLU):
5244   """Query runtime instance data.
5245
5246   """
5247   _OP_REQP = ["instances", "static"]
5248   REQ_BGL = False
5249
5250   def ExpandNames(self):
5251     self.needed_locks = {}
5252     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5253
5254     if not isinstance(self.op.instances, list):
5255       raise errors.OpPrereqError("Invalid argument type 'instances'")
5256
5257     if self.op.instances:
5258       self.wanted_names = []
5259       for name in self.op.instances:
5260         full_name = self.cfg.ExpandInstanceName(name)
5261         if full_name is None:
5262           raise errors.OpPrereqError("Instance '%s' not known" % name)
5263         self.wanted_names.append(full_name)
5264       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5265     else:
5266       self.wanted_names = None
5267       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5268
5269     self.needed_locks[locking.LEVEL_NODE] = []
5270     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5271
5272   def DeclareLocks(self, level):
5273     if level == locking.LEVEL_NODE:
5274       self._LockInstancesNodes()
5275
5276   def CheckPrereq(self):
5277     """Check prerequisites.
5278
5279     This only checks the optional instance list against the existing names.
5280
5281     """
5282     if self.wanted_names is None:
5283       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5284
5285     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5286                              in self.wanted_names]
5287     return
5288
5289   def _ComputeDiskStatus(self, instance, snode, dev):
5290     """Compute block device status.
5291
5292     """
5293     static = self.op.static
5294     if not static:
5295       self.cfg.SetDiskID(dev, instance.primary_node)
5296       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5297       dev_pstatus.Raise()
5298       dev_pstatus = dev_pstatus.data
5299     else:
5300       dev_pstatus = None
5301
5302     if dev.dev_type in constants.LDS_DRBD:
5303       # we change the snode then (otherwise we use the one passed in)
5304       if dev.logical_id[0] == instance.primary_node:
5305         snode = dev.logical_id[1]
5306       else:
5307         snode = dev.logical_id[0]
5308
5309     if snode and not static:
5310       self.cfg.SetDiskID(dev, snode)
5311       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5312       dev_sstatus.Raise()
5313       dev_sstatus = dev_sstatus.data
5314     else:
5315       dev_sstatus = None
5316
5317     if dev.children:
5318       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5319                       for child in dev.children]
5320     else:
5321       dev_children = []
5322
5323     data = {
5324       "iv_name": dev.iv_name,
5325       "dev_type": dev.dev_type,
5326       "logical_id": dev.logical_id,
5327       "physical_id": dev.physical_id,
5328       "pstatus": dev_pstatus,
5329       "sstatus": dev_sstatus,
5330       "children": dev_children,
5331       "mode": dev.mode,
5332       }
5333
5334     return data
5335
5336   def Exec(self, feedback_fn):
5337     """Gather and return data"""
5338     result = {}
5339
5340     cluster = self.cfg.GetClusterInfo()
5341
5342     for instance in self.wanted_instances:
5343       if not self.op.static:
5344         remote_info = self.rpc.call_instance_info(instance.primary_node,
5345                                                   instance.name,
5346                                                   instance.hypervisor)
5347         remote_info.Raise()
5348         remote_info = remote_info.data
5349         if remote_info and "state" in remote_info:
5350           remote_state = "up"
5351         else:
5352           remote_state = "down"
5353       else:
5354         remote_state = None
5355       if instance.status == "down":
5356         config_state = "down"
5357       else:
5358         config_state = "up"
5359
5360       disks = [self._ComputeDiskStatus(instance, None, device)
5361                for device in instance.disks]
5362
5363       idict = {
5364         "name": instance.name,
5365         "config_state": config_state,
5366         "run_state": remote_state,
5367         "pnode": instance.primary_node,
5368         "snodes": instance.secondary_nodes,
5369         "os": instance.os,
5370         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5371         "disks": disks,
5372         "hypervisor": instance.hypervisor,
5373         "network_port": instance.network_port,
5374         "hv_instance": instance.hvparams,
5375         "hv_actual": cluster.FillHV(instance),
5376         "be_instance": instance.beparams,
5377         "be_actual": cluster.FillBE(instance),
5378         }
5379
5380       result[instance.name] = idict
5381
5382     return result
5383
5384
5385 class LUSetInstanceParams(LogicalUnit):
5386   """Modifies an instances's parameters.
5387
5388   """
5389   HPATH = "instance-modify"
5390   HTYPE = constants.HTYPE_INSTANCE
5391   _OP_REQP = ["instance_name"]
5392   REQ_BGL = False
5393
5394   def CheckArguments(self):
5395     if not hasattr(self.op, 'nics'):
5396       self.op.nics = []
5397     if not hasattr(self.op, 'disks'):
5398       self.op.disks = []
5399     if not hasattr(self.op, 'beparams'):
5400       self.op.beparams = {}
5401     if not hasattr(self.op, 'hvparams'):
5402       self.op.hvparams = {}
5403     self.op.force = getattr(self.op, "force", False)
5404     if not (self.op.nics or self.op.disks or
5405             self.op.hvparams or self.op.beparams):
5406       raise errors.OpPrereqError("No changes submitted")
5407
5408     utils.CheckBEParams(self.op.beparams)
5409
5410     # Disk validation
5411     disk_addremove = 0
5412     for disk_op, disk_dict in self.op.disks:
5413       if disk_op == constants.DDM_REMOVE:
5414         disk_addremove += 1
5415         continue
5416       elif disk_op == constants.DDM_ADD:
5417         disk_addremove += 1
5418       else:
5419         if not isinstance(disk_op, int):
5420           raise errors.OpPrereqError("Invalid disk index")
5421       if disk_op == constants.DDM_ADD:
5422         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5423         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5424           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5425         size = disk_dict.get('size', None)
5426         if size is None:
5427           raise errors.OpPrereqError("Required disk parameter size missing")
5428         try:
5429           size = int(size)
5430         except ValueError, err:
5431           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5432                                      str(err))
5433         disk_dict['size'] = size
5434       else:
5435         # modification of disk
5436         if 'size' in disk_dict:
5437           raise errors.OpPrereqError("Disk size change not possible, use"
5438                                      " grow-disk")
5439
5440     if disk_addremove > 1:
5441       raise errors.OpPrereqError("Only one disk add or remove operation"
5442                                  " supported at a time")
5443
5444     # NIC validation
5445     nic_addremove = 0
5446     for nic_op, nic_dict in self.op.nics:
5447       if nic_op == constants.DDM_REMOVE:
5448         nic_addremove += 1
5449         continue
5450       elif nic_op == constants.DDM_ADD:
5451         nic_addremove += 1
5452       else:
5453         if not isinstance(nic_op, int):
5454           raise errors.OpPrereqError("Invalid nic index")
5455
5456       # nic_dict should be a dict
5457       nic_ip = nic_dict.get('ip', None)
5458       if nic_ip is not None:
5459         if nic_ip.lower() == "none":
5460           nic_dict['ip'] = None
5461         else:
5462           if not utils.IsValidIP(nic_ip):
5463             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5464       # we can only check None bridges and assign the default one
5465       nic_bridge = nic_dict.get('bridge', None)
5466       if nic_bridge is None:
5467         nic_dict['bridge'] = self.cfg.GetDefBridge()
5468       # but we can validate MACs
5469       nic_mac = nic_dict.get('mac', None)
5470       if nic_mac is not None:
5471         if self.cfg.IsMacInUse(nic_mac):
5472           raise errors.OpPrereqError("MAC address %s already in use"
5473                                      " in cluster" % nic_mac)
5474         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5475           if not utils.IsValidMac(nic_mac):
5476             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5477     if nic_addremove > 1:
5478       raise errors.OpPrereqError("Only one NIC add or remove operation"
5479                                  " supported at a time")
5480
5481   def ExpandNames(self):
5482     self._ExpandAndLockInstance()
5483     self.needed_locks[locking.LEVEL_NODE] = []
5484     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5485
5486   def DeclareLocks(self, level):
5487     if level == locking.LEVEL_NODE:
5488       self._LockInstancesNodes()
5489
5490   def BuildHooksEnv(self):
5491     """Build hooks env.
5492
5493     This runs on the master, primary and secondaries.
5494
5495     """
5496     args = dict()
5497     if constants.BE_MEMORY in self.be_new:
5498       args['memory'] = self.be_new[constants.BE_MEMORY]
5499     if constants.BE_VCPUS in self.be_new:
5500       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5501     # FIXME: readd disk/nic changes
5502     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5503     nl = [self.cfg.GetMasterNode(),
5504           self.instance.primary_node] + list(self.instance.secondary_nodes)
5505     return env, nl, nl
5506
5507   def CheckPrereq(self):
5508     """Check prerequisites.
5509
5510     This only checks the instance list against the existing names.
5511
5512     """
5513     force = self.force = self.op.force
5514
5515     # checking the new params on the primary/secondary nodes
5516
5517     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5518     assert self.instance is not None, \
5519       "Cannot retrieve locked instance %s" % self.op.instance_name
5520     pnode = self.instance.primary_node
5521     nodelist = [pnode]
5522     nodelist.extend(instance.secondary_nodes)
5523
5524     # hvparams processing
5525     if self.op.hvparams:
5526       i_hvdict = copy.deepcopy(instance.hvparams)
5527       for key, val in self.op.hvparams.iteritems():
5528         if val == constants.VALUE_DEFAULT:
5529           try:
5530             del i_hvdict[key]
5531           except KeyError:
5532             pass
5533         elif val == constants.VALUE_NONE:
5534           i_hvdict[key] = None
5535         else:
5536           i_hvdict[key] = val
5537       cluster = self.cfg.GetClusterInfo()
5538       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5539                                 i_hvdict)
5540       # local check
5541       hypervisor.GetHypervisor(
5542         instance.hypervisor).CheckParameterSyntax(hv_new)
5543       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5544       self.hv_new = hv_new # the new actual values
5545       self.hv_inst = i_hvdict # the new dict (without defaults)
5546     else:
5547       self.hv_new = self.hv_inst = {}
5548
5549     # beparams processing
5550     if self.op.beparams:
5551       i_bedict = copy.deepcopy(instance.beparams)
5552       for key, val in self.op.beparams.iteritems():
5553         if val == constants.VALUE_DEFAULT:
5554           try:
5555             del i_bedict[key]
5556           except KeyError:
5557             pass
5558         else:
5559           i_bedict[key] = val
5560       cluster = self.cfg.GetClusterInfo()
5561       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5562                                 i_bedict)
5563       self.be_new = be_new # the new actual values
5564       self.be_inst = i_bedict # the new dict (without defaults)
5565     else:
5566       self.be_new = self.be_inst = {}
5567
5568     self.warn = []
5569
5570     if constants.BE_MEMORY in self.op.beparams and not self.force:
5571       mem_check_list = [pnode]
5572       if be_new[constants.BE_AUTO_BALANCE]:
5573         # either we changed auto_balance to yes or it was from before
5574         mem_check_list.extend(instance.secondary_nodes)
5575       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5576                                                   instance.hypervisor)
5577       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5578                                          instance.hypervisor)
5579       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5580         # Assume the primary node is unreachable and go ahead
5581         self.warn.append("Can't get info from primary node %s" % pnode)
5582       else:
5583         if not instance_info.failed and instance_info.data:
5584           current_mem = instance_info.data['memory']
5585         else:
5586           # Assume instance not running
5587           # (there is a slight race condition here, but it's not very probable,
5588           # and we have no other way to check)
5589           current_mem = 0
5590         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5591                     nodeinfo[pnode].data['memory_free'])
5592         if miss_mem > 0:
5593           raise errors.OpPrereqError("This change will prevent the instance"
5594                                      " from starting, due to %d MB of memory"
5595                                      " missing on its primary node" % miss_mem)
5596
5597       if be_new[constants.BE_AUTO_BALANCE]:
5598         for node, nres in nodeinfo.iteritems():
5599           if node not in instance.secondary_nodes:
5600             continue
5601           if nres.failed or not isinstance(nres.data, dict):
5602             self.warn.append("Can't get info from secondary node %s" % node)
5603           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5604             self.warn.append("Not enough memory to failover instance to"
5605                              " secondary node %s" % node)
5606
5607     # NIC processing
5608     for nic_op, nic_dict in self.op.nics:
5609       if nic_op == constants.DDM_REMOVE:
5610         if not instance.nics:
5611           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5612         continue
5613       if nic_op != constants.DDM_ADD:
5614         # an existing nic
5615         if nic_op < 0 or nic_op >= len(instance.nics):
5616           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5617                                      " are 0 to %d" %
5618                                      (nic_op, len(instance.nics)))
5619       nic_bridge = nic_dict.get('bridge', None)
5620       if nic_bridge is not None:
5621         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5622           msg = ("Bridge '%s' doesn't exist on one of"
5623                  " the instance nodes" % nic_bridge)
5624           if self.force:
5625             self.warn.append(msg)
5626           else:
5627             raise errors.OpPrereqError(msg)
5628
5629     # DISK processing
5630     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5631       raise errors.OpPrereqError("Disk operations not supported for"
5632                                  " diskless instances")
5633     for disk_op, disk_dict in self.op.disks:
5634       if disk_op == constants.DDM_REMOVE:
5635         if len(instance.disks) == 1:
5636           raise errors.OpPrereqError("Cannot remove the last disk of"
5637                                      " an instance")
5638         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5639         ins_l = ins_l[pnode]
5640         if ins_l.failed or not isinstance(ins_l.data, list):
5641           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5642         if instance.name in ins_l.data:
5643           raise errors.OpPrereqError("Instance is running, can't remove"
5644                                      " disks.")
5645
5646       if (disk_op == constants.DDM_ADD and
5647           len(instance.nics) >= constants.MAX_DISKS):
5648         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5649                                    " add more" % constants.MAX_DISKS)
5650       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5651         # an existing disk
5652         if disk_op < 0 or disk_op >= len(instance.disks):
5653           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5654                                      " are 0 to %d" %
5655                                      (disk_op, len(instance.disks)))
5656
5657     return
5658
5659   def Exec(self, feedback_fn):
5660     """Modifies an instance.
5661
5662     All parameters take effect only at the next restart of the instance.
5663
5664     """
5665     # Process here the warnings from CheckPrereq, as we don't have a
5666     # feedback_fn there.
5667     for warn in self.warn:
5668       feedback_fn("WARNING: %s" % warn)
5669
5670     result = []
5671     instance = self.instance
5672     # disk changes
5673     for disk_op, disk_dict in self.op.disks:
5674       if disk_op == constants.DDM_REMOVE:
5675         # remove the last disk
5676         device = instance.disks.pop()
5677         device_idx = len(instance.disks)
5678         for node, disk in device.ComputeNodeTree(instance.primary_node):
5679           self.cfg.SetDiskID(disk, node)
5680           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5681           if rpc_result.failed or not rpc_result.data:
5682             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5683                                  " continuing anyway", device_idx, node)
5684         result.append(("disk/%d" % device_idx, "remove"))
5685       elif disk_op == constants.DDM_ADD:
5686         # add a new disk
5687         if instance.disk_template == constants.DT_FILE:
5688           file_driver, file_path = instance.disks[0].logical_id
5689           file_path = os.path.dirname(file_path)
5690         else:
5691           file_driver = file_path = None
5692         disk_idx_base = len(instance.disks)
5693         new_disk = _GenerateDiskTemplate(self,
5694                                          instance.disk_template,
5695                                          instance, instance.primary_node,
5696                                          instance.secondary_nodes,
5697                                          [disk_dict],
5698                                          file_path,
5699                                          file_driver,
5700                                          disk_idx_base)[0]
5701         new_disk.mode = disk_dict['mode']
5702         instance.disks.append(new_disk)
5703         info = _GetInstanceInfoText(instance)
5704
5705         logging.info("Creating volume %s for instance %s",
5706                      new_disk.iv_name, instance.name)
5707         # Note: this needs to be kept in sync with _CreateDisks
5708         #HARDCODE
5709         for node in instance.all_nodes:
5710           f_create = node == instance.primary_node
5711           try:
5712             _CreateBlockDev(self, node, instance, new_disk,
5713                             f_create, info, f_create)
5714           except error.OpExecError, err:
5715             self.LogWarning("Failed to create volume %s (%s) on"
5716                             " node %s: %s",
5717                             new_disk.iv_name, new_disk, node, err)
5718         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5719                        (new_disk.size, new_disk.mode)))
5720       else:
5721         # change a given disk
5722         instance.disks[disk_op].mode = disk_dict['mode']
5723         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5724     # NIC changes
5725     for nic_op, nic_dict in self.op.nics:
5726       if nic_op == constants.DDM_REMOVE:
5727         # remove the last nic
5728         del instance.nics[-1]
5729         result.append(("nic.%d" % len(instance.nics), "remove"))
5730       elif nic_op == constants.DDM_ADD:
5731         # add a new nic
5732         if 'mac' not in nic_dict:
5733           mac = constants.VALUE_GENERATE
5734         else:
5735           mac = nic_dict['mac']
5736         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5737           mac = self.cfg.GenerateMAC()
5738         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5739                               bridge=nic_dict.get('bridge', None))
5740         instance.nics.append(new_nic)
5741         result.append(("nic.%d" % (len(instance.nics) - 1),
5742                        "add:mac=%s,ip=%s,bridge=%s" %
5743                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5744       else:
5745         # change a given nic
5746         for key in 'mac', 'ip', 'bridge':
5747           if key in nic_dict:
5748             setattr(instance.nics[nic_op], key, nic_dict[key])
5749             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5750
5751     # hvparams changes
5752     if self.op.hvparams:
5753       instance.hvparams = self.hv_new
5754       for key, val in self.op.hvparams.iteritems():
5755         result.append(("hv/%s" % key, val))
5756
5757     # beparams changes
5758     if self.op.beparams:
5759       instance.beparams = self.be_inst
5760       for key, val in self.op.beparams.iteritems():
5761         result.append(("be/%s" % key, val))
5762
5763     self.cfg.Update(instance)
5764
5765     return result
5766
5767
5768 class LUQueryExports(NoHooksLU):
5769   """Query the exports list
5770
5771   """
5772   _OP_REQP = ['nodes']
5773   REQ_BGL = False
5774
5775   def ExpandNames(self):
5776     self.needed_locks = {}
5777     self.share_locks[locking.LEVEL_NODE] = 1
5778     if not self.op.nodes:
5779       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5780     else:
5781       self.needed_locks[locking.LEVEL_NODE] = \
5782         _GetWantedNodes(self, self.op.nodes)
5783
5784   def CheckPrereq(self):
5785     """Check prerequisites.
5786
5787     """
5788     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5789
5790   def Exec(self, feedback_fn):
5791     """Compute the list of all the exported system images.
5792
5793     @rtype: dict
5794     @return: a dictionary with the structure node->(export-list)
5795         where export-list is a list of the instances exported on
5796         that node.
5797
5798     """
5799     rpcresult = self.rpc.call_export_list(self.nodes)
5800     result = {}
5801     for node in rpcresult:
5802       if rpcresult[node].failed:
5803         result[node] = False
5804       else:
5805         result[node] = rpcresult[node].data
5806
5807     return result
5808
5809
5810 class LUExportInstance(LogicalUnit):
5811   """Export an instance to an image in the cluster.
5812
5813   """
5814   HPATH = "instance-export"
5815   HTYPE = constants.HTYPE_INSTANCE
5816   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5817   REQ_BGL = False
5818
5819   def ExpandNames(self):
5820     self._ExpandAndLockInstance()
5821     # FIXME: lock only instance primary and destination node
5822     #
5823     # Sad but true, for now we have do lock all nodes, as we don't know where
5824     # the previous export might be, and and in this LU we search for it and
5825     # remove it from its current node. In the future we could fix this by:
5826     #  - making a tasklet to search (share-lock all), then create the new one,
5827     #    then one to remove, after
5828     #  - removing the removal operation altoghether
5829     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5830
5831   def DeclareLocks(self, level):
5832     """Last minute lock declaration."""
5833     # All nodes are locked anyway, so nothing to do here.
5834
5835   def BuildHooksEnv(self):
5836     """Build hooks env.
5837
5838     This will run on the master, primary node and target node.
5839
5840     """
5841     env = {
5842       "EXPORT_NODE": self.op.target_node,
5843       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5844       }
5845     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5846     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5847           self.op.target_node]
5848     return env, nl, nl
5849
5850   def CheckPrereq(self):
5851     """Check prerequisites.
5852
5853     This checks that the instance and node names are valid.
5854
5855     """
5856     instance_name = self.op.instance_name
5857     self.instance = self.cfg.GetInstanceInfo(instance_name)
5858     assert self.instance is not None, \
5859           "Cannot retrieve locked instance %s" % self.op.instance_name
5860     _CheckNodeOnline(self, self.instance.primary_node)
5861
5862     self.dst_node = self.cfg.GetNodeInfo(
5863       self.cfg.ExpandNodeName(self.op.target_node))
5864
5865     if self.dst_node is None:
5866       # This is wrong node name, not a non-locked node
5867       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5868     _CheckNodeOnline(self, self.dst_node.name)
5869
5870     # instance disk type verification
5871     for disk in self.instance.disks:
5872       if disk.dev_type == constants.LD_FILE:
5873         raise errors.OpPrereqError("Export not supported for instances with"
5874                                    " file-based disks")
5875
5876   def Exec(self, feedback_fn):
5877     """Export an instance to an image in the cluster.
5878
5879     """
5880     instance = self.instance
5881     dst_node = self.dst_node
5882     src_node = instance.primary_node
5883     if self.op.shutdown:
5884       # shutdown the instance, but not the disks
5885       result = self.rpc.call_instance_shutdown(src_node, instance)
5886       result.Raise()
5887       if not result.data:
5888         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5889                                  (instance.name, src_node))
5890
5891     vgname = self.cfg.GetVGName()
5892
5893     snap_disks = []
5894
5895     # set the disks ID correctly since call_instance_start needs the
5896     # correct drbd minor to create the symlinks
5897     for disk in instance.disks:
5898       self.cfg.SetDiskID(disk, src_node)
5899
5900     try:
5901       for disk in instance.disks:
5902         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5903         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5904         if new_dev_name.failed or not new_dev_name.data:
5905           self.LogWarning("Could not snapshot block device %s on node %s",
5906                           disk.logical_id[1], src_node)
5907           snap_disks.append(False)
5908         else:
5909           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5910                                  logical_id=(vgname, new_dev_name.data),
5911                                  physical_id=(vgname, new_dev_name.data),
5912                                  iv_name=disk.iv_name)
5913           snap_disks.append(new_dev)
5914
5915     finally:
5916       if self.op.shutdown and instance.status == "up":
5917         result = self.rpc.call_instance_start(src_node, instance, None)
5918         if result.failed or not result.data:
5919           _ShutdownInstanceDisks(self, instance)
5920           raise errors.OpExecError("Could not start instance")
5921
5922     # TODO: check for size
5923
5924     cluster_name = self.cfg.GetClusterName()
5925     for idx, dev in enumerate(snap_disks):
5926       if dev:
5927         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5928                                                instance, cluster_name, idx)
5929         if result.failed or not result.data:
5930           self.LogWarning("Could not export block device %s from node %s to"
5931                           " node %s", dev.logical_id[1], src_node,
5932                           dst_node.name)
5933         result = self.rpc.call_blockdev_remove(src_node, dev)
5934         if result.failed or not result.data:
5935           self.LogWarning("Could not remove snapshot block device %s from node"
5936                           " %s", dev.logical_id[1], src_node)
5937
5938     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5939     if result.failed or not result.data:
5940       self.LogWarning("Could not finalize export for instance %s on node %s",
5941                       instance.name, dst_node.name)
5942
5943     nodelist = self.cfg.GetNodeList()
5944     nodelist.remove(dst_node.name)
5945
5946     # on one-node clusters nodelist will be empty after the removal
5947     # if we proceed the backup would be removed because OpQueryExports
5948     # substitutes an empty list with the full cluster node list.
5949     if nodelist:
5950       exportlist = self.rpc.call_export_list(nodelist)
5951       for node in exportlist:
5952         if exportlist[node].failed:
5953           continue
5954         if instance.name in exportlist[node].data:
5955           if not self.rpc.call_export_remove(node, instance.name):
5956             self.LogWarning("Could not remove older export for instance %s"
5957                             " on node %s", instance.name, node)
5958
5959
5960 class LURemoveExport(NoHooksLU):
5961   """Remove exports related to the named instance.
5962
5963   """
5964   _OP_REQP = ["instance_name"]
5965   REQ_BGL = False
5966
5967   def ExpandNames(self):
5968     self.needed_locks = {}
5969     # We need all nodes to be locked in order for RemoveExport to work, but we
5970     # don't need to lock the instance itself, as nothing will happen to it (and
5971     # we can remove exports also for a removed instance)
5972     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5973
5974   def CheckPrereq(self):
5975     """Check prerequisites.
5976     """
5977     pass
5978
5979   def Exec(self, feedback_fn):
5980     """Remove any export.
5981
5982     """
5983     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5984     # If the instance was not found we'll try with the name that was passed in.
5985     # This will only work if it was an FQDN, though.
5986     fqdn_warn = False
5987     if not instance_name:
5988       fqdn_warn = True
5989       instance_name = self.op.instance_name
5990
5991     exportlist = self.rpc.call_export_list(self.acquired_locks[
5992       locking.LEVEL_NODE])
5993     found = False
5994     for node in exportlist:
5995       if exportlist[node].failed:
5996         self.LogWarning("Failed to query node %s, continuing" % node)
5997         continue
5998       if instance_name in exportlist[node].data:
5999         found = True
6000         result = self.rpc.call_export_remove(node, instance_name)
6001         if result.failed or not result.data:
6002           logging.error("Could not remove export for instance %s"
6003                         " on node %s", instance_name, node)
6004
6005     if fqdn_warn and not found:
6006       feedback_fn("Export not found. If trying to remove an export belonging"
6007                   " to a deleted instance please use its Fully Qualified"
6008                   " Domain Name.")
6009
6010
6011 class TagsLU(NoHooksLU):
6012   """Generic tags LU.
6013
6014   This is an abstract class which is the parent of all the other tags LUs.
6015
6016   """
6017
6018   def ExpandNames(self):
6019     self.needed_locks = {}
6020     if self.op.kind == constants.TAG_NODE:
6021       name = self.cfg.ExpandNodeName(self.op.name)
6022       if name is None:
6023         raise errors.OpPrereqError("Invalid node name (%s)" %
6024                                    (self.op.name,))
6025       self.op.name = name
6026       self.needed_locks[locking.LEVEL_NODE] = name
6027     elif self.op.kind == constants.TAG_INSTANCE:
6028       name = self.cfg.ExpandInstanceName(self.op.name)
6029       if name is None:
6030         raise errors.OpPrereqError("Invalid instance name (%s)" %
6031                                    (self.op.name,))
6032       self.op.name = name
6033       self.needed_locks[locking.LEVEL_INSTANCE] = name
6034
6035   def CheckPrereq(self):
6036     """Check prerequisites.
6037
6038     """
6039     if self.op.kind == constants.TAG_CLUSTER:
6040       self.target = self.cfg.GetClusterInfo()
6041     elif self.op.kind == constants.TAG_NODE:
6042       self.target = self.cfg.GetNodeInfo(self.op.name)
6043     elif self.op.kind == constants.TAG_INSTANCE:
6044       self.target = self.cfg.GetInstanceInfo(self.op.name)
6045     else:
6046       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6047                                  str(self.op.kind))
6048
6049
6050 class LUGetTags(TagsLU):
6051   """Returns the tags of a given object.
6052
6053   """
6054   _OP_REQP = ["kind", "name"]
6055   REQ_BGL = False
6056
6057   def Exec(self, feedback_fn):
6058     """Returns the tag list.
6059
6060     """
6061     return list(self.target.GetTags())
6062
6063
6064 class LUSearchTags(NoHooksLU):
6065   """Searches the tags for a given pattern.
6066
6067   """
6068   _OP_REQP = ["pattern"]
6069   REQ_BGL = False
6070
6071   def ExpandNames(self):
6072     self.needed_locks = {}
6073
6074   def CheckPrereq(self):
6075     """Check prerequisites.
6076
6077     This checks the pattern passed for validity by compiling it.
6078
6079     """
6080     try:
6081       self.re = re.compile(self.op.pattern)
6082     except re.error, err:
6083       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6084                                  (self.op.pattern, err))
6085
6086   def Exec(self, feedback_fn):
6087     """Returns the tag list.
6088
6089     """
6090     cfg = self.cfg
6091     tgts = [("/cluster", cfg.GetClusterInfo())]
6092     ilist = cfg.GetAllInstancesInfo().values()
6093     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6094     nlist = cfg.GetAllNodesInfo().values()
6095     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6096     results = []
6097     for path, target in tgts:
6098       for tag in target.GetTags():
6099         if self.re.search(tag):
6100           results.append((path, tag))
6101     return results
6102
6103
6104 class LUAddTags(TagsLU):
6105   """Sets a tag on a given object.
6106
6107   """
6108   _OP_REQP = ["kind", "name", "tags"]
6109   REQ_BGL = False
6110
6111   def CheckPrereq(self):
6112     """Check prerequisites.
6113
6114     This checks the type and length of the tag name and value.
6115
6116     """
6117     TagsLU.CheckPrereq(self)
6118     for tag in self.op.tags:
6119       objects.TaggableObject.ValidateTag(tag)
6120
6121   def Exec(self, feedback_fn):
6122     """Sets the tag.
6123
6124     """
6125     try:
6126       for tag in self.op.tags:
6127         self.target.AddTag(tag)
6128     except errors.TagError, err:
6129       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6130     try:
6131       self.cfg.Update(self.target)
6132     except errors.ConfigurationError:
6133       raise errors.OpRetryError("There has been a modification to the"
6134                                 " config file and the operation has been"
6135                                 " aborted. Please retry.")
6136
6137
6138 class LUDelTags(TagsLU):
6139   """Delete a list of tags from a given object.
6140
6141   """
6142   _OP_REQP = ["kind", "name", "tags"]
6143   REQ_BGL = False
6144
6145   def CheckPrereq(self):
6146     """Check prerequisites.
6147
6148     This checks that we have the given tag.
6149
6150     """
6151     TagsLU.CheckPrereq(self)
6152     for tag in self.op.tags:
6153       objects.TaggableObject.ValidateTag(tag)
6154     del_tags = frozenset(self.op.tags)
6155     cur_tags = self.target.GetTags()
6156     if not del_tags <= cur_tags:
6157       diff_tags = del_tags - cur_tags
6158       diff_names = ["'%s'" % tag for tag in diff_tags]
6159       diff_names.sort()
6160       raise errors.OpPrereqError("Tag(s) %s not found" %
6161                                  (",".join(diff_names)))
6162
6163   def Exec(self, feedback_fn):
6164     """Remove the tag from the object.
6165
6166     """
6167     for tag in self.op.tags:
6168       self.target.RemoveTag(tag)
6169     try:
6170       self.cfg.Update(self.target)
6171     except errors.ConfigurationError:
6172       raise errors.OpRetryError("There has been a modification to the"
6173                                 " config file and the operation has been"
6174                                 " aborted. Please retry.")
6175
6176
6177 class LUTestDelay(NoHooksLU):
6178   """Sleep for a specified amount of time.
6179
6180   This LU sleeps on the master and/or nodes for a specified amount of
6181   time.
6182
6183   """
6184   _OP_REQP = ["duration", "on_master", "on_nodes"]
6185   REQ_BGL = False
6186
6187   def ExpandNames(self):
6188     """Expand names and set required locks.
6189
6190     This expands the node list, if any.
6191
6192     """
6193     self.needed_locks = {}
6194     if self.op.on_nodes:
6195       # _GetWantedNodes can be used here, but is not always appropriate to use
6196       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6197       # more information.
6198       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6199       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6200
6201   def CheckPrereq(self):
6202     """Check prerequisites.
6203
6204     """
6205
6206   def Exec(self, feedback_fn):
6207     """Do the actual sleep.
6208
6209     """
6210     if self.op.on_master:
6211       if not utils.TestDelay(self.op.duration):
6212         raise errors.OpExecError("Error during master delay test")
6213     if self.op.on_nodes:
6214       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6215       if not result:
6216         raise errors.OpExecError("Complete failure from rpc call")
6217       for node, node_result in result.items():
6218         node_result.Raise()
6219         if not node_result.data:
6220           raise errors.OpExecError("Failure during rpc call to node %s,"
6221                                    " result: %s" % (node, node_result.data))
6222
6223
6224 class IAllocator(object):
6225   """IAllocator framework.
6226
6227   An IAllocator instance has three sets of attributes:
6228     - cfg that is needed to query the cluster
6229     - input data (all members of the _KEYS class attribute are required)
6230     - four buffer attributes (in|out_data|text), that represent the
6231       input (to the external script) in text and data structure format,
6232       and the output from it, again in two formats
6233     - the result variables from the script (success, info, nodes) for
6234       easy usage
6235
6236   """
6237   _ALLO_KEYS = [
6238     "mem_size", "disks", "disk_template",
6239     "os", "tags", "nics", "vcpus", "hypervisor",
6240     ]
6241   _RELO_KEYS = [
6242     "relocate_from",
6243     ]
6244
6245   def __init__(self, lu, mode, name, **kwargs):
6246     self.lu = lu
6247     # init buffer variables
6248     self.in_text = self.out_text = self.in_data = self.out_data = None
6249     # init all input fields so that pylint is happy
6250     self.mode = mode
6251     self.name = name
6252     self.mem_size = self.disks = self.disk_template = None
6253     self.os = self.tags = self.nics = self.vcpus = None
6254     self.hypervisor = None
6255     self.relocate_from = None
6256     # computed fields
6257     self.required_nodes = None
6258     # init result fields
6259     self.success = self.info = self.nodes = None
6260     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6261       keyset = self._ALLO_KEYS
6262     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6263       keyset = self._RELO_KEYS
6264     else:
6265       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6266                                    " IAllocator" % self.mode)
6267     for key in kwargs:
6268       if key not in keyset:
6269         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6270                                      " IAllocator" % key)
6271       setattr(self, key, kwargs[key])
6272     for key in keyset:
6273       if key not in kwargs:
6274         raise errors.ProgrammerError("Missing input parameter '%s' to"
6275                                      " IAllocator" % key)
6276     self._BuildInputData()
6277
6278   def _ComputeClusterData(self):
6279     """Compute the generic allocator input data.
6280
6281     This is the data that is independent of the actual operation.
6282
6283     """
6284     cfg = self.lu.cfg
6285     cluster_info = cfg.GetClusterInfo()
6286     # cluster data
6287     data = {
6288       "version": 1,
6289       "cluster_name": cfg.GetClusterName(),
6290       "cluster_tags": list(cluster_info.GetTags()),
6291       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
6292       # we don't have job IDs
6293       }
6294     iinfo = cfg.GetAllInstancesInfo().values()
6295     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6296
6297     # node data
6298     node_results = {}
6299     node_list = cfg.GetNodeList()
6300
6301     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6302       hypervisor_name = self.hypervisor
6303     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6304       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6305
6306     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6307                                            hypervisor_name)
6308     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6309                        cluster_info.enabled_hypervisors)
6310     for nname in node_list:
6311       ninfo = cfg.GetNodeInfo(nname)
6312       node_data[nname].Raise()
6313       if not isinstance(node_data[nname].data, dict):
6314         raise errors.OpExecError("Can't get data for node %s" % nname)
6315       remote_info = node_data[nname].data
6316       for attr in ['memory_total', 'memory_free', 'memory_dom0',
6317                    'vg_size', 'vg_free', 'cpu_total']:
6318         if attr not in remote_info:
6319           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
6320                                    (nname, attr))
6321         try:
6322           remote_info[attr] = int(remote_info[attr])
6323         except ValueError, err:
6324           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
6325                                    " %s" % (nname, attr, str(err)))
6326       # compute memory used by primary instances
6327       i_p_mem = i_p_up_mem = 0
6328       for iinfo, beinfo in i_list:
6329         if iinfo.primary_node == nname:
6330           i_p_mem += beinfo[constants.BE_MEMORY]
6331           if iinfo.name not in node_iinfo[nname]:
6332             i_used_mem = 0
6333           else:
6334             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
6335           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6336           remote_info['memory_free'] -= max(0, i_mem_diff)
6337
6338           if iinfo.status == "up":
6339             i_p_up_mem += beinfo[constants.BE_MEMORY]
6340
6341       # compute memory used by instances
6342       pnr = {
6343         "tags": list(ninfo.GetTags()),
6344         "total_memory": remote_info['memory_total'],
6345         "reserved_memory": remote_info['memory_dom0'],
6346         "free_memory": remote_info['memory_free'],
6347         "i_pri_memory": i_p_mem,
6348         "i_pri_up_memory": i_p_up_mem,
6349         "total_disk": remote_info['vg_size'],
6350         "free_disk": remote_info['vg_free'],
6351         "primary_ip": ninfo.primary_ip,
6352         "secondary_ip": ninfo.secondary_ip,
6353         "total_cpus": remote_info['cpu_total'],
6354         "offline": ninfo.offline,
6355         }
6356       node_results[nname] = pnr
6357     data["nodes"] = node_results
6358
6359     # instance data
6360     instance_data = {}
6361     for iinfo, beinfo in i_list:
6362       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6363                   for n in iinfo.nics]
6364       pir = {
6365         "tags": list(iinfo.GetTags()),
6366         "should_run": iinfo.status == "up",
6367         "vcpus": beinfo[constants.BE_VCPUS],
6368         "memory": beinfo[constants.BE_MEMORY],
6369         "os": iinfo.os,
6370         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6371         "nics": nic_data,
6372         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6373         "disk_template": iinfo.disk_template,
6374         "hypervisor": iinfo.hypervisor,
6375         }
6376       instance_data[iinfo.name] = pir
6377
6378     data["instances"] = instance_data
6379
6380     self.in_data = data
6381
6382   def _AddNewInstance(self):
6383     """Add new instance data to allocator structure.
6384
6385     This in combination with _AllocatorGetClusterData will create the
6386     correct structure needed as input for the allocator.
6387
6388     The checks for the completeness of the opcode must have already been
6389     done.
6390
6391     """
6392     data = self.in_data
6393     if len(self.disks) != 2:
6394       raise errors.OpExecError("Only two-disk configurations supported")
6395
6396     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6397
6398     if self.disk_template in constants.DTS_NET_MIRROR:
6399       self.required_nodes = 2
6400     else:
6401       self.required_nodes = 1
6402     request = {
6403       "type": "allocate",
6404       "name": self.name,
6405       "disk_template": self.disk_template,
6406       "tags": self.tags,
6407       "os": self.os,
6408       "vcpus": self.vcpus,
6409       "memory": self.mem_size,
6410       "disks": self.disks,
6411       "disk_space_total": disk_space,
6412       "nics": self.nics,
6413       "required_nodes": self.required_nodes,
6414       }
6415     data["request"] = request
6416
6417   def _AddRelocateInstance(self):
6418     """Add relocate instance data to allocator structure.
6419
6420     This in combination with _IAllocatorGetClusterData will create the
6421     correct structure needed as input for the allocator.
6422
6423     The checks for the completeness of the opcode must have already been
6424     done.
6425
6426     """
6427     instance = self.lu.cfg.GetInstanceInfo(self.name)
6428     if instance is None:
6429       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6430                                    " IAllocator" % self.name)
6431
6432     if instance.disk_template not in constants.DTS_NET_MIRROR:
6433       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6434
6435     if len(instance.secondary_nodes) != 1:
6436       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6437
6438     self.required_nodes = 1
6439     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6440     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6441
6442     request = {
6443       "type": "relocate",
6444       "name": self.name,
6445       "disk_space_total": disk_space,
6446       "required_nodes": self.required_nodes,
6447       "relocate_from": self.relocate_from,
6448       }
6449     self.in_data["request"] = request
6450
6451   def _BuildInputData(self):
6452     """Build input data structures.
6453
6454     """
6455     self._ComputeClusterData()
6456
6457     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6458       self._AddNewInstance()
6459     else:
6460       self._AddRelocateInstance()
6461
6462     self.in_text = serializer.Dump(self.in_data)
6463
6464   def Run(self, name, validate=True, call_fn=None):
6465     """Run an instance allocator and return the results.
6466
6467     """
6468     if call_fn is None:
6469       call_fn = self.lu.rpc.call_iallocator_runner
6470     data = self.in_text
6471
6472     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6473     result.Raise()
6474
6475     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6476       raise errors.OpExecError("Invalid result from master iallocator runner")
6477
6478     rcode, stdout, stderr, fail = result.data
6479
6480     if rcode == constants.IARUN_NOTFOUND:
6481       raise errors.OpExecError("Can't find allocator '%s'" % name)
6482     elif rcode == constants.IARUN_FAILURE:
6483       raise errors.OpExecError("Instance allocator call failed: %s,"
6484                                " output: %s" % (fail, stdout+stderr))
6485     self.out_text = stdout
6486     if validate:
6487       self._ValidateResult()
6488
6489   def _ValidateResult(self):
6490     """Process the allocator results.
6491
6492     This will process and if successful save the result in
6493     self.out_data and the other parameters.
6494
6495     """
6496     try:
6497       rdict = serializer.Load(self.out_text)
6498     except Exception, err:
6499       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6500
6501     if not isinstance(rdict, dict):
6502       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6503
6504     for key in "success", "info", "nodes":
6505       if key not in rdict:
6506         raise errors.OpExecError("Can't parse iallocator results:"
6507                                  " missing key '%s'" % key)
6508       setattr(self, key, rdict[key])
6509
6510     if not isinstance(rdict["nodes"], list):
6511       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6512                                " is not a list")
6513     self.out_data = rdict
6514
6515
6516 class LUTestAllocator(NoHooksLU):
6517   """Run allocator tests.
6518
6519   This LU runs the allocator tests
6520
6521   """
6522   _OP_REQP = ["direction", "mode", "name"]
6523
6524   def CheckPrereq(self):
6525     """Check prerequisites.
6526
6527     This checks the opcode parameters depending on the director and mode test.
6528
6529     """
6530     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6531       for attr in ["name", "mem_size", "disks", "disk_template",
6532                    "os", "tags", "nics", "vcpus"]:
6533         if not hasattr(self.op, attr):
6534           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6535                                      attr)
6536       iname = self.cfg.ExpandInstanceName(self.op.name)
6537       if iname is not None:
6538         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6539                                    iname)
6540       if not isinstance(self.op.nics, list):
6541         raise errors.OpPrereqError("Invalid parameter 'nics'")
6542       for row in self.op.nics:
6543         if (not isinstance(row, dict) or
6544             "mac" not in row or
6545             "ip" not in row or
6546             "bridge" not in row):
6547           raise errors.OpPrereqError("Invalid contents of the"
6548                                      " 'nics' parameter")
6549       if not isinstance(self.op.disks, list):
6550         raise errors.OpPrereqError("Invalid parameter 'disks'")
6551       if len(self.op.disks) != 2:
6552         raise errors.OpPrereqError("Only two-disk configurations supported")
6553       for row in self.op.disks:
6554         if (not isinstance(row, dict) or
6555             "size" not in row or
6556             not isinstance(row["size"], int) or
6557             "mode" not in row or
6558             row["mode"] not in ['r', 'w']):
6559           raise errors.OpPrereqError("Invalid contents of the"
6560                                      " 'disks' parameter")
6561       if self.op.hypervisor is None:
6562         self.op.hypervisor = self.cfg.GetHypervisorType()
6563     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6564       if not hasattr(self.op, "name"):
6565         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6566       fname = self.cfg.ExpandInstanceName(self.op.name)
6567       if fname is None:
6568         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6569                                    self.op.name)
6570       self.op.name = fname
6571       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6572     else:
6573       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6574                                  self.op.mode)
6575
6576     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6577       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6578         raise errors.OpPrereqError("Missing allocator name")
6579     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6580       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6581                                  self.op.direction)
6582
6583   def Exec(self, feedback_fn):
6584     """Run the allocator test.
6585
6586     """
6587     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6588       ial = IAllocator(self,
6589                        mode=self.op.mode,
6590                        name=self.op.name,
6591                        mem_size=self.op.mem_size,
6592                        disks=self.op.disks,
6593                        disk_template=self.op.disk_template,
6594                        os=self.op.os,
6595                        tags=self.op.tags,
6596                        nics=self.op.nics,
6597                        vcpus=self.op.vcpus,
6598                        hypervisor=self.op.hypervisor,
6599                        )
6600     else:
6601       ial = IAllocator(self,
6602                        mode=self.op.mode,
6603                        name=self.op.name,
6604                        relocate_from=list(self.relocate_from),
6605                        )
6606
6607     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6608       result = ial.in_text
6609     else:
6610       ial.Run(self.op.allocator, validate=False)
6611       result = ial.out_text
6612     return result