Convert AddOSToInstance to (status, data)
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: string
459   @param status: the desired status of the instances
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   env = {
472     "OP_TARGET": name,
473     "INSTANCE_NAME": name,
474     "INSTANCE_PRIMARY": primary_node,
475     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476     "INSTANCE_OS_TYPE": os_type,
477     "INSTANCE_STATUS": status,
478     "INSTANCE_MEMORY": memory,
479     "INSTANCE_VCPUS": vcpus,
480   }
481
482   if nics:
483     nic_count = len(nics)
484     for idx, (ip, bridge, mac) in enumerate(nics):
485       if ip is None:
486         ip = ""
487       env["INSTANCE_NIC%d_IP" % idx] = ip
488       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490   else:
491     nic_count = 0
492
493   env["INSTANCE_NIC_COUNT"] = nic_count
494
495   return env
496
497
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499   """Builds instance related env variables for hooks from an object.
500
501   @type lu: L{LogicalUnit}
502   @param lu: the logical unit on whose behalf we execute
503   @type instance: L{objects.Instance}
504   @param instance: the instance for which we should build the
505       environment
506   @type override: dict
507   @param override: dictionary with key/values that will override
508       our values
509   @rtype: dict
510   @return: the hook environment dictionary
511
512   """
513   bep = lu.cfg.GetClusterInfo().FillBE(instance)
514   args = {
515     'name': instance.name,
516     'primary_node': instance.primary_node,
517     'secondary_nodes': instance.secondary_nodes,
518     'os_type': instance.os,
519     'status': instance.os,
520     'memory': bep[constants.BE_MEMORY],
521     'vcpus': bep[constants.BE_VCPUS],
522     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523   }
524   if override:
525     args.update(override)
526   return _BuildInstanceHookEnv(**args)
527
528
529 def _AdjustCandidatePool(lu):
530   """Adjust the candidate pool after node operations.
531
532   """
533   mod_list = lu.cfg.MaintainCandidatePool()
534   if mod_list:
535     lu.LogInfo("Promoted nodes to master candidate role: %s",
536                ", ".join(node.name for node in mod_list))
537     for name in mod_list:
538       lu.context.ReaddNode(name)
539   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540   if mc_now > mc_max:
541     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542                (mc_now, mc_max))
543
544
545 def _CheckInstanceBridgesExist(lu, instance):
546   """Check that the brigdes needed by an instance exist.
547
548   """
549   # check bridges existance
550   brlist = [nic.bridge for nic in instance.nics]
551   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552   result.Raise()
553   if not result.data:
554     raise errors.OpPrereqError("One or more target bridges %s does not"
555                                " exist on destination node '%s'" %
556                                (brlist, instance.primary_node))
557
558
559 class LUDestroyCluster(NoHooksLU):
560   """Logical unit for destroying the cluster.
561
562   """
563   _OP_REQP = []
564
565   def CheckPrereq(self):
566     """Check prerequisites.
567
568     This checks whether the cluster is empty.
569
570     Any errors are signalled by raising errors.OpPrereqError.
571
572     """
573     master = self.cfg.GetMasterNode()
574
575     nodelist = self.cfg.GetNodeList()
576     if len(nodelist) != 1 or nodelist[0] != master:
577       raise errors.OpPrereqError("There are still %d node(s) in"
578                                  " this cluster." % (len(nodelist) - 1))
579     instancelist = self.cfg.GetInstanceList()
580     if instancelist:
581       raise errors.OpPrereqError("There are still %d instance(s) in"
582                                  " this cluster." % len(instancelist))
583
584   def Exec(self, feedback_fn):
585     """Destroys the cluster.
586
587     """
588     master = self.cfg.GetMasterNode()
589     result = self.rpc.call_node_stop_master(master, False)
590     result.Raise()
591     if not result.data:
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     return master
597
598
599 class LUVerifyCluster(LogicalUnit):
600   """Verifies the cluster status.
601
602   """
603   HPATH = "cluster-verify"
604   HTYPE = constants.HTYPE_CLUSTER
605   _OP_REQP = ["skip_checks"]
606   REQ_BGL = False
607
608   def ExpandNames(self):
609     self.needed_locks = {
610       locking.LEVEL_NODE: locking.ALL_SET,
611       locking.LEVEL_INSTANCE: locking.ALL_SET,
612     }
613     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614
615   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616                   node_result, feedback_fn, master_files):
617     """Run multiple tests against a node.
618
619     Test list:
620
621       - compares ganeti version
622       - checks vg existance and size > 20G
623       - checks config file checksum
624       - checks ssh to other nodes
625
626     @type nodeinfo: L{objects.Node}
627     @param nodeinfo: the node to check
628     @param file_list: required list of files
629     @param local_cksum: dictionary of local files and their checksums
630     @param node_result: the results from the node
631     @param feedback_fn: function used to accumulate results
632     @param master_files: list of files that only masters should have
633
634     """
635     node = nodeinfo.name
636
637     # main result, node_result should be a non-empty dict
638     if not node_result or not isinstance(node_result, dict):
639       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640       return True
641
642     # compares ganeti version
643     local_version = constants.PROTOCOL_VERSION
644     remote_version = node_result.get('version', None)
645     if not remote_version:
646       feedback_fn("  - ERROR: connection to %s failed" % (node))
647       return True
648
649     if local_version != remote_version:
650       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651                       (local_version, node, remote_version))
652       return True
653
654     # checks vg existance and size > 20G
655
656     bad = False
657     vglist = node_result.get(constants.NV_VGLIST, None)
658     if not vglist:
659       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660                       (node,))
661       bad = True
662     else:
663       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664                                             constants.MIN_VG_SIZE)
665       if vgstatus:
666         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667         bad = True
668
669     # checks config file checksum
670
671     remote_cksum = node_result.get(constants.NV_FILELIST, None)
672     if not isinstance(remote_cksum, dict):
673       bad = True
674       feedback_fn("  - ERROR: node hasn't returned file checksum data")
675     else:
676       for file_name in file_list:
677         node_is_mc = nodeinfo.master_candidate
678         must_have_file = file_name not in master_files
679         if file_name not in remote_cksum:
680           if node_is_mc or must_have_file:
681             bad = True
682             feedback_fn("  - ERROR: file '%s' missing" % file_name)
683         elif remote_cksum[file_name] != local_cksum[file_name]:
684           if node_is_mc or must_have_file:
685             bad = True
686             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687           else:
688             # not candidate and this is not a must-have file
689             bad = True
690             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691                         " '%s'" % file_name)
692         else:
693           # all good, except non-master/non-must have combination
694           if not node_is_mc and not must_have_file:
695             feedback_fn("  - ERROR: file '%s' should not exist on non master"
696                         " candidates" % file_name)
697
698     # checks ssh to any
699
700     if constants.NV_NODELIST not in node_result:
701       bad = True
702       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703     else:
704       if node_result[constants.NV_NODELIST]:
705         bad = True
706         for node in node_result[constants.NV_NODELIST]:
707           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708                           (node, node_result[constants.NV_NODELIST][node]))
709
710     if constants.NV_NODENETTEST not in node_result:
711       bad = True
712       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713     else:
714       if node_result[constants.NV_NODENETTEST]:
715         bad = True
716         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717         for node in nlist:
718           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719                           (node, node_result[constants.NV_NODENETTEST][node]))
720
721     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722     if isinstance(hyp_result, dict):
723       for hv_name, hv_result in hyp_result.iteritems():
724         if hv_result is not None:
725           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726                       (hv_name, hv_result))
727     return bad
728
729   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730                       node_instance, feedback_fn, n_offline):
731     """Verify an instance.
732
733     This function checks to see if the required block devices are
734     available on the instance's node.
735
736     """
737     bad = False
738
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       if node in n_offline:
746         # ignore missing volumes on offline nodes
747         continue
748       for volume in node_vol_should[node]:
749         if node not in node_vol_is or volume not in node_vol_is[node]:
750           feedback_fn("  - ERROR: volume %s missing on node %s" %
751                           (volume, node))
752           bad = True
753
754     if not instanceconfig.status == 'down':
755       if ((node_current not in node_instance or
756           not instance in node_instance[node_current]) and
757           node_current not in n_offline):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804     """Verify N+1 Memory Resilience.
805
806     Check that if one single node dies we can still start all the instances it
807     was primary for.
808
809     """
810     bad = False
811
812     for node, nodeinfo in node_info.iteritems():
813       # This code checks that every node which is now listed as secondary has
814       # enough memory to host all instances it is supposed to should a single
815       # other node in the cluster fail.
816       # FIXME: not ready for failover to an arbitrary node
817       # FIXME: does not support file-backed instances
818       # WARNING: we currently take into account down instances as well as up
819       # ones, considering that even if they're down someone might want to start
820       # them even in the event of a node failure.
821       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822         needed_mem = 0
823         for instance in instances:
824           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825           if bep[constants.BE_AUTO_BALANCE]:
826             needed_mem += bep[constants.BE_MEMORY]
827         if nodeinfo['mfree'] < needed_mem:
828           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829                       " failovers should node %s fail" % (node, prinode))
830           bad = True
831     return bad
832
833   def CheckPrereq(self):
834     """Check prerequisites.
835
836     Transform the list of checks we're going to skip into a set and check that
837     all its members are valid.
838
839     """
840     self.skip_set = frozenset(self.op.skip_checks)
841     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842       raise errors.OpPrereqError("Invalid checks to be skipped specified")
843
844   def BuildHooksEnv(self):
845     """Build hooks env.
846
847     Cluster-Verify hooks just rone in the post phase and their failure makes
848     the output be logged in the verify output and the verification to fail.
849
850     """
851     all_nodes = self.cfg.GetNodeList()
852     # TODO: populate the environment with useful information for verify hooks
853     env = {}
854     return env, [], all_nodes
855
856   def Exec(self, feedback_fn):
857     """Verify integrity of cluster, performing various test on nodes.
858
859     """
860     bad = False
861     feedback_fn("* Verifying global settings")
862     for msg in self.cfg.VerifyConfig():
863       feedback_fn("  - ERROR: %s" % msg)
864
865     vg_name = self.cfg.GetVGName()
866     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867     nodelist = utils.NiceSort(self.cfg.GetNodeList())
868     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870     i_non_redundant = [] # Non redundant instances
871     i_non_a_balanced = [] # Non auto-balanced instances
872     n_offline = [] # List of offline nodes
873     node_volume = {}
874     node_instance = {}
875     node_info = {}
876     instance_cfg = {}
877
878     # FIXME: verify OS list
879     # do local checksums
880     master_files = [constants.CLUSTER_CONF_FILE]
881
882     file_names = ssconf.SimpleStore().GetFileList()
883     file_names.append(constants.SSL_CERT_FILE)
884     file_names.append(constants.RAPI_CERT_FILE)
885     file_names.extend(master_files)
886
887     local_checksums = utils.FingerprintFiles(file_names)
888
889     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
890     node_verify_param = {
891       constants.NV_FILELIST: file_names,
892       constants.NV_NODELIST: [node.name for node in nodeinfo
893                               if not node.offline],
894       constants.NV_HYPERVISOR: hypervisors,
895       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
896                                   node.secondary_ip) for node in nodeinfo
897                                  if not node.offline],
898       constants.NV_LVLIST: vg_name,
899       constants.NV_INSTANCELIST: hypervisors,
900       constants.NV_VGLIST: None,
901       constants.NV_VERSION: None,
902       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
903       }
904     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
905                                            self.cfg.GetClusterName())
906
907     cluster = self.cfg.GetClusterInfo()
908     master_node = self.cfg.GetMasterNode()
909     for node_i in nodeinfo:
910       node = node_i.name
911       nresult = all_nvinfo[node].data
912
913       if node_i.offline:
914         feedback_fn("* Skipping offline node %s" % (node,))
915         n_offline.append(node)
916         continue
917
918       if node == master_node:
919         ntype = "master"
920       elif node_i.master_candidate:
921         ntype = "master candidate"
922       else:
923         ntype = "regular"
924       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
925
926       if all_nvinfo[node].failed or not isinstance(nresult, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         continue
930
931       result = self._VerifyNode(node_i, file_names, local_checksums,
932                                 nresult, feedback_fn, master_files)
933       bad = bad or result
934
935       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
936       if isinstance(lvdata, basestring):
937         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
938                     (node, lvdata.encode('string_escape')))
939         bad = True
940         node_volume[node] = {}
941       elif not isinstance(lvdata, dict):
942         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
943         bad = True
944         continue
945       else:
946         node_volume[node] = lvdata
947
948       # node_instance
949       idata = nresult.get(constants.NV_INSTANCELIST, None)
950       if not isinstance(idata, list):
951         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
952                     (node,))
953         bad = True
954         continue
955
956       node_instance[node] = idata
957
958       # node_info
959       nodeinfo = nresult.get(constants.NV_HVINFO, None)
960       if not isinstance(nodeinfo, dict):
961         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
962         bad = True
963         continue
964
965       try:
966         node_info[node] = {
967           "mfree": int(nodeinfo['memory_free']),
968           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
969           "pinst": [],
970           "sinst": [],
971           # dictionary holding all instances this node is secondary for,
972           # grouped by their primary node. Each key is a cluster node, and each
973           # value is a list of instances which have the key as primary and the
974           # current node as secondary.  this is handy to calculate N+1 memory
975           # availability if you can only failover from a primary to its
976           # secondary.
977           "sinst-by-pnode": {},
978         }
979       except ValueError:
980         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
981         bad = True
982         continue
983
984     node_vol_should = {}
985
986     for instance in instancelist:
987       feedback_fn("* Verifying instance %s" % instance)
988       inst_config = self.cfg.GetInstanceInfo(instance)
989       result =  self._VerifyInstance(instance, inst_config, node_volume,
990                                      node_instance, feedback_fn, n_offline)
991       bad = bad or result
992       inst_nodes_offline = []
993
994       inst_config.MapLVsByNode(node_vol_should)
995
996       instance_cfg[instance] = inst_config
997
998       pnode = inst_config.primary_node
999       if pnode in node_info:
1000         node_info[pnode]['pinst'].append(instance)
1001       elif pnode not in n_offline:
1002         feedback_fn("  - ERROR: instance %s, connection to primary node"
1003                     " %s failed" % (instance, pnode))
1004         bad = True
1005
1006       if pnode in n_offline:
1007         inst_nodes_offline.append(pnode)
1008
1009       # If the instance is non-redundant we cannot survive losing its primary
1010       # node, so we are not N+1 compliant. On the other hand we have no disk
1011       # templates with more than one secondary so that situation is not well
1012       # supported either.
1013       # FIXME: does not support file-backed instances
1014       if len(inst_config.secondary_nodes) == 0:
1015         i_non_redundant.append(instance)
1016       elif len(inst_config.secondary_nodes) > 1:
1017         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1018                     % instance)
1019
1020       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1021         i_non_a_balanced.append(instance)
1022
1023       for snode in inst_config.secondary_nodes:
1024         if snode in node_info:
1025           node_info[snode]['sinst'].append(instance)
1026           if pnode not in node_info[snode]['sinst-by-pnode']:
1027             node_info[snode]['sinst-by-pnode'][pnode] = []
1028           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1029         elif snode not in n_offline:
1030           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1031                       " %s failed" % (instance, snode))
1032           bad = True
1033         if snode in n_offline:
1034           inst_nodes_offline.append(snode)
1035
1036       if inst_nodes_offline:
1037         # warn that the instance lives on offline nodes, and set bad=True
1038         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1039                     ", ".join(inst_nodes_offline))
1040         bad = True
1041
1042     feedback_fn("* Verifying orphan volumes")
1043     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1044                                        feedback_fn)
1045     bad = bad or result
1046
1047     feedback_fn("* Verifying remaining instances")
1048     result = self._VerifyOrphanInstances(instancelist, node_instance,
1049                                          feedback_fn)
1050     bad = bad or result
1051
1052     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1053       feedback_fn("* Verifying N+1 Memory redundancy")
1054       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1055       bad = bad or result
1056
1057     feedback_fn("* Other Notes")
1058     if i_non_redundant:
1059       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1060                   % len(i_non_redundant))
1061
1062     if i_non_a_balanced:
1063       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1064                   % len(i_non_a_balanced))
1065
1066     if n_offline:
1067       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1068
1069     return not bad
1070
1071   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1072     """Analize the post-hooks' result
1073
1074     This method analyses the hook result, handles it, and sends some
1075     nicely-formatted feedback back to the user.
1076
1077     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1078         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1079     @param hooks_results: the results of the multi-node hooks rpc call
1080     @param feedback_fn: function used send feedback back to the caller
1081     @param lu_result: previous Exec result
1082     @return: the new Exec result, based on the previous result
1083         and hook results
1084
1085     """
1086     # We only really run POST phase hooks, and are only interested in
1087     # their results
1088     if phase == constants.HOOKS_PHASE_POST:
1089       # Used to change hooks' output to proper indentation
1090       indent_re = re.compile('^', re.M)
1091       feedback_fn("* Hooks Results")
1092       if not hooks_results:
1093         feedback_fn("  - ERROR: general communication failure")
1094         lu_result = 1
1095       else:
1096         for node_name in hooks_results:
1097           show_node_header = True
1098           res = hooks_results[node_name]
1099           if res.failed or res.data is False or not isinstance(res.data, list):
1100             if res.offline:
1101               # no need to warn or set fail return value
1102               continue
1103             feedback_fn("    Communication failure in hooks execution")
1104             lu_result = 1
1105             continue
1106           for script, hkr, output in res.data:
1107             if hkr == constants.HKR_FAIL:
1108               # The node header is only shown once, if there are
1109               # failing hooks on that node
1110               if show_node_header:
1111                 feedback_fn("  Node %s:" % node_name)
1112                 show_node_header = False
1113               feedback_fn("    ERROR: Script %s failed, output:" % script)
1114               output = indent_re.sub('      ', output)
1115               feedback_fn("%s" % output)
1116               lu_result = 1
1117
1118       return lu_result
1119
1120
1121 class LUVerifyDisks(NoHooksLU):
1122   """Verifies the cluster disks status.
1123
1124   """
1125   _OP_REQP = []
1126   REQ_BGL = False
1127
1128   def ExpandNames(self):
1129     self.needed_locks = {
1130       locking.LEVEL_NODE: locking.ALL_SET,
1131       locking.LEVEL_INSTANCE: locking.ALL_SET,
1132     }
1133     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1134
1135   def CheckPrereq(self):
1136     """Check prerequisites.
1137
1138     This has no prerequisites.
1139
1140     """
1141     pass
1142
1143   def Exec(self, feedback_fn):
1144     """Verify integrity of cluster disks.
1145
1146     """
1147     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1148
1149     vg_name = self.cfg.GetVGName()
1150     nodes = utils.NiceSort(self.cfg.GetNodeList())
1151     instances = [self.cfg.GetInstanceInfo(name)
1152                  for name in self.cfg.GetInstanceList()]
1153
1154     nv_dict = {}
1155     for inst in instances:
1156       inst_lvs = {}
1157       if (inst.status != "up" or
1158           inst.disk_template not in constants.DTS_NET_MIRROR):
1159         continue
1160       inst.MapLVsByNode(inst_lvs)
1161       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1162       for node, vol_list in inst_lvs.iteritems():
1163         for vol in vol_list:
1164           nv_dict[(node, vol)] = inst
1165
1166     if not nv_dict:
1167       return result
1168
1169     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1170
1171     to_act = set()
1172     for node in nodes:
1173       # node_volume
1174       lvs = node_lvs[node]
1175       if lvs.failed:
1176         if not lvs.offline:
1177           self.LogWarning("Connection to node %s failed: %s" %
1178                           (node, lvs.data))
1179         continue
1180       lvs = lvs.data
1181       if isinstance(lvs, basestring):
1182         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1183         res_nlvm[node] = lvs
1184       elif not isinstance(lvs, dict):
1185         logging.warning("Connection to node %s failed or invalid data"
1186                         " returned", node)
1187         res_nodes.append(node)
1188         continue
1189
1190       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1191         inst = nv_dict.pop((node, lv_name), None)
1192         if (not lv_online and inst is not None
1193             and inst.name not in res_instances):
1194           res_instances.append(inst.name)
1195
1196     # any leftover items in nv_dict are missing LVs, let's arrange the
1197     # data better
1198     for key, inst in nv_dict.iteritems():
1199       if inst.name not in res_missing:
1200         res_missing[inst.name] = []
1201       res_missing[inst.name].append(key)
1202
1203     return result
1204
1205
1206 class LURenameCluster(LogicalUnit):
1207   """Rename the cluster.
1208
1209   """
1210   HPATH = "cluster-rename"
1211   HTYPE = constants.HTYPE_CLUSTER
1212   _OP_REQP = ["name"]
1213
1214   def BuildHooksEnv(self):
1215     """Build hooks env.
1216
1217     """
1218     env = {
1219       "OP_TARGET": self.cfg.GetClusterName(),
1220       "NEW_NAME": self.op.name,
1221       }
1222     mn = self.cfg.GetMasterNode()
1223     return env, [mn], [mn]
1224
1225   def CheckPrereq(self):
1226     """Verify that the passed name is a valid one.
1227
1228     """
1229     hostname = utils.HostInfo(self.op.name)
1230
1231     new_name = hostname.name
1232     self.ip = new_ip = hostname.ip
1233     old_name = self.cfg.GetClusterName()
1234     old_ip = self.cfg.GetMasterIP()
1235     if new_name == old_name and new_ip == old_ip:
1236       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1237                                  " cluster has changed")
1238     if new_ip != old_ip:
1239       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1240         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1241                                    " reachable on the network. Aborting." %
1242                                    new_ip)
1243
1244     self.op.name = new_name
1245
1246   def Exec(self, feedback_fn):
1247     """Rename the cluster.
1248
1249     """
1250     clustername = self.op.name
1251     ip = self.ip
1252
1253     # shutdown the master IP
1254     master = self.cfg.GetMasterNode()
1255     result = self.rpc.call_node_stop_master(master, False)
1256     if result.failed or not result.data:
1257       raise errors.OpExecError("Could not disable the master role")
1258
1259     try:
1260       cluster = self.cfg.GetClusterInfo()
1261       cluster.cluster_name = clustername
1262       cluster.master_ip = ip
1263       self.cfg.Update(cluster)
1264
1265       # update the known hosts file
1266       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1267       node_list = self.cfg.GetNodeList()
1268       try:
1269         node_list.remove(master)
1270       except ValueError:
1271         pass
1272       result = self.rpc.call_upload_file(node_list,
1273                                          constants.SSH_KNOWN_HOSTS_FILE)
1274       for to_node, to_result in result.iteritems():
1275         if to_result.failed or not to_result.data:
1276           logging.error("Copy of file %s to node %s failed",
1277                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1278
1279     finally:
1280       result = self.rpc.call_node_start_master(master, False)
1281       if result.failed or not result.data:
1282         self.LogWarning("Could not re-enable the master role on"
1283                         " the master, please restart manually.")
1284
1285
1286 def _RecursiveCheckIfLVMBased(disk):
1287   """Check if the given disk or its children are lvm-based.
1288
1289   @type disk: L{objects.Disk}
1290   @param disk: the disk to check
1291   @rtype: booleean
1292   @return: boolean indicating whether a LD_LV dev_type was found or not
1293
1294   """
1295   if disk.children:
1296     for chdisk in disk.children:
1297       if _RecursiveCheckIfLVMBased(chdisk):
1298         return True
1299   return disk.dev_type == constants.LD_LV
1300
1301
1302 class LUSetClusterParams(LogicalUnit):
1303   """Change the parameters of the cluster.
1304
1305   """
1306   HPATH = "cluster-modify"
1307   HTYPE = constants.HTYPE_CLUSTER
1308   _OP_REQP = []
1309   REQ_BGL = False
1310
1311   def CheckParameters(self):
1312     """Check parameters
1313
1314     """
1315     if not hasattr(self.op, "candidate_pool_size"):
1316       self.op.candidate_pool_size = None
1317     if self.op.candidate_pool_size is not None:
1318       try:
1319         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1320       except ValueError, err:
1321         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1322                                    str(err))
1323       if self.op.candidate_pool_size < 1:
1324         raise errors.OpPrereqError("At least one master candidate needed")
1325
1326   def ExpandNames(self):
1327     # FIXME: in the future maybe other cluster params won't require checking on
1328     # all nodes to be modified.
1329     self.needed_locks = {
1330       locking.LEVEL_NODE: locking.ALL_SET,
1331     }
1332     self.share_locks[locking.LEVEL_NODE] = 1
1333
1334   def BuildHooksEnv(self):
1335     """Build hooks env.
1336
1337     """
1338     env = {
1339       "OP_TARGET": self.cfg.GetClusterName(),
1340       "NEW_VG_NAME": self.op.vg_name,
1341       }
1342     mn = self.cfg.GetMasterNode()
1343     return env, [mn], [mn]
1344
1345   def CheckPrereq(self):
1346     """Check prerequisites.
1347
1348     This checks whether the given params don't conflict and
1349     if the given volume group is valid.
1350
1351     """
1352     # FIXME: This only works because there is only one parameter that can be
1353     # changed or removed.
1354     if self.op.vg_name is not None and not self.op.vg_name:
1355       instances = self.cfg.GetAllInstancesInfo().values()
1356       for inst in instances:
1357         for disk in inst.disks:
1358           if _RecursiveCheckIfLVMBased(disk):
1359             raise errors.OpPrereqError("Cannot disable lvm storage while"
1360                                        " lvm-based instances exist")
1361
1362     node_list = self.acquired_locks[locking.LEVEL_NODE]
1363
1364     # if vg_name not None, checks given volume group on all nodes
1365     if self.op.vg_name:
1366       vglist = self.rpc.call_vg_list(node_list)
1367       for node in node_list:
1368         if vglist[node].failed:
1369           # ignoring down node
1370           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1371           continue
1372         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1373                                               self.op.vg_name,
1374                                               constants.MIN_VG_SIZE)
1375         if vgstatus:
1376           raise errors.OpPrereqError("Error on node '%s': %s" %
1377                                      (node, vgstatus))
1378
1379     self.cluster = cluster = self.cfg.GetClusterInfo()
1380     # validate beparams changes
1381     if self.op.beparams:
1382       utils.CheckBEParams(self.op.beparams)
1383       self.new_beparams = cluster.FillDict(
1384         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1385
1386     # hypervisor list/parameters
1387     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1388     if self.op.hvparams:
1389       if not isinstance(self.op.hvparams, dict):
1390         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1391       for hv_name, hv_dict in self.op.hvparams.items():
1392         if hv_name not in self.new_hvparams:
1393           self.new_hvparams[hv_name] = hv_dict
1394         else:
1395           self.new_hvparams[hv_name].update(hv_dict)
1396
1397     if self.op.enabled_hypervisors is not None:
1398       self.hv_list = self.op.enabled_hypervisors
1399     else:
1400       self.hv_list = cluster.enabled_hypervisors
1401
1402     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1403       # either the enabled list has changed, or the parameters have, validate
1404       for hv_name, hv_params in self.new_hvparams.items():
1405         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1406             (self.op.enabled_hypervisors and
1407              hv_name in self.op.enabled_hypervisors)):
1408           # either this is a new hypervisor, or its parameters have changed
1409           hv_class = hypervisor.GetHypervisor(hv_name)
1410           hv_class.CheckParameterSyntax(hv_params)
1411           _CheckHVParams(self, node_list, hv_name, hv_params)
1412
1413   def Exec(self, feedback_fn):
1414     """Change the parameters of the cluster.
1415
1416     """
1417     if self.op.vg_name is not None:
1418       if self.op.vg_name != self.cfg.GetVGName():
1419         self.cfg.SetVGName(self.op.vg_name)
1420       else:
1421         feedback_fn("Cluster LVM configuration already in desired"
1422                     " state, not changing")
1423     if self.op.hvparams:
1424       self.cluster.hvparams = self.new_hvparams
1425     if self.op.enabled_hypervisors is not None:
1426       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1427     if self.op.beparams:
1428       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1429     if self.op.candidate_pool_size is not None:
1430       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1431
1432     self.cfg.Update(self.cluster)
1433
1434     # we want to update nodes after the cluster so that if any errors
1435     # happen, we have recorded and saved the cluster info
1436     if self.op.candidate_pool_size is not None:
1437       _AdjustCandidatePool(self)
1438
1439
1440 class LURedistributeConfig(NoHooksLU):
1441   """Force the redistribution of cluster configuration.
1442
1443   This is a very simple LU.
1444
1445   """
1446   _OP_REQP = []
1447   REQ_BGL = False
1448
1449   def ExpandNames(self):
1450     self.needed_locks = {
1451       locking.LEVEL_NODE: locking.ALL_SET,
1452     }
1453     self.share_locks[locking.LEVEL_NODE] = 1
1454
1455   def CheckPrereq(self):
1456     """Check prerequisites.
1457
1458     """
1459
1460   def Exec(self, feedback_fn):
1461     """Redistribute the configuration.
1462
1463     """
1464     self.cfg.Update(self.cfg.GetClusterInfo())
1465
1466
1467 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1468   """Sleep and poll for an instance's disk to sync.
1469
1470   """
1471   if not instance.disks:
1472     return True
1473
1474   if not oneshot:
1475     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1476
1477   node = instance.primary_node
1478
1479   for dev in instance.disks:
1480     lu.cfg.SetDiskID(dev, node)
1481
1482   retries = 0
1483   while True:
1484     max_time = 0
1485     done = True
1486     cumul_degraded = False
1487     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1488     if rstats.failed or not rstats.data:
1489       lu.LogWarning("Can't get any data from node %s", node)
1490       retries += 1
1491       if retries >= 10:
1492         raise errors.RemoteError("Can't contact node %s for mirror data,"
1493                                  " aborting." % node)
1494       time.sleep(6)
1495       continue
1496     rstats = rstats.data
1497     retries = 0
1498     for i in range(len(rstats)):
1499       mstat = rstats[i]
1500       if mstat is None:
1501         lu.LogWarning("Can't compute data for node %s/%s",
1502                            node, instance.disks[i].iv_name)
1503         continue
1504       # we ignore the ldisk parameter
1505       perc_done, est_time, is_degraded, _ = mstat
1506       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1507       if perc_done is not None:
1508         done = False
1509         if est_time is not None:
1510           rem_time = "%d estimated seconds remaining" % est_time
1511           max_time = est_time
1512         else:
1513           rem_time = "no time estimate"
1514         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1515                         (instance.disks[i].iv_name, perc_done, rem_time))
1516     if done or oneshot:
1517       break
1518
1519     time.sleep(min(60, max_time))
1520
1521   if done:
1522     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1523   return not cumul_degraded
1524
1525
1526 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1527   """Check that mirrors are not degraded.
1528
1529   The ldisk parameter, if True, will change the test from the
1530   is_degraded attribute (which represents overall non-ok status for
1531   the device(s)) to the ldisk (representing the local storage status).
1532
1533   """
1534   lu.cfg.SetDiskID(dev, node)
1535   if ldisk:
1536     idx = 6
1537   else:
1538     idx = 5
1539
1540   result = True
1541   if on_primary or dev.AssembleOnSecondary():
1542     rstats = lu.rpc.call_blockdev_find(node, dev)
1543     if rstats.failed or not rstats.data:
1544       logging.warning("Node %s: disk degraded, not found or node down", node)
1545       result = False
1546     else:
1547       result = result and (not rstats.data[idx])
1548   if dev.children:
1549     for child in dev.children:
1550       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1551
1552   return result
1553
1554
1555 class LUDiagnoseOS(NoHooksLU):
1556   """Logical unit for OS diagnose/query.
1557
1558   """
1559   _OP_REQP = ["output_fields", "names"]
1560   REQ_BGL = False
1561   _FIELDS_STATIC = utils.FieldSet()
1562   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1563
1564   def ExpandNames(self):
1565     if self.op.names:
1566       raise errors.OpPrereqError("Selective OS query not supported")
1567
1568     _CheckOutputFields(static=self._FIELDS_STATIC,
1569                        dynamic=self._FIELDS_DYNAMIC,
1570                        selected=self.op.output_fields)
1571
1572     # Lock all nodes, in shared mode
1573     self.needed_locks = {}
1574     self.share_locks[locking.LEVEL_NODE] = 1
1575     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1576
1577   def CheckPrereq(self):
1578     """Check prerequisites.
1579
1580     """
1581
1582   @staticmethod
1583   def _DiagnoseByOS(node_list, rlist):
1584     """Remaps a per-node return list into an a per-os per-node dictionary
1585
1586     @param node_list: a list with the names of all nodes
1587     @param rlist: a map with node names as keys and OS objects as values
1588
1589     @rtype: dict
1590     @returns: a dictionary with osnames as keys and as value another map, with
1591         nodes as keys and list of OS objects as values, eg::
1592
1593           {"debian-etch": {"node1": [<object>,...],
1594                            "node2": [<object>,]}
1595           }
1596
1597     """
1598     all_os = {}
1599     for node_name, nr in rlist.iteritems():
1600       if nr.failed or not nr.data:
1601         continue
1602       for os_obj in nr.data:
1603         if os_obj.name not in all_os:
1604           # build a list of nodes for this os containing empty lists
1605           # for each node in node_list
1606           all_os[os_obj.name] = {}
1607           for nname in node_list:
1608             all_os[os_obj.name][nname] = []
1609         all_os[os_obj.name][node_name].append(os_obj)
1610     return all_os
1611
1612   def Exec(self, feedback_fn):
1613     """Compute the list of OSes.
1614
1615     """
1616     node_list = self.acquired_locks[locking.LEVEL_NODE]
1617     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1618                    if node in node_list]
1619     node_data = self.rpc.call_os_diagnose(valid_nodes)
1620     if node_data == False:
1621       raise errors.OpExecError("Can't gather the list of OSes")
1622     pol = self._DiagnoseByOS(valid_nodes, node_data)
1623     output = []
1624     for os_name, os_data in pol.iteritems():
1625       row = []
1626       for field in self.op.output_fields:
1627         if field == "name":
1628           val = os_name
1629         elif field == "valid":
1630           val = utils.all([osl and osl[0] for osl in os_data.values()])
1631         elif field == "node_status":
1632           val = {}
1633           for node_name, nos_list in os_data.iteritems():
1634             val[node_name] = [(v.status, v.path) for v in nos_list]
1635         else:
1636           raise errors.ParameterError(field)
1637         row.append(val)
1638       output.append(row)
1639
1640     return output
1641
1642
1643 class LURemoveNode(LogicalUnit):
1644   """Logical unit for removing a node.
1645
1646   """
1647   HPATH = "node-remove"
1648   HTYPE = constants.HTYPE_NODE
1649   _OP_REQP = ["node_name"]
1650
1651   def BuildHooksEnv(self):
1652     """Build hooks env.
1653
1654     This doesn't run on the target node in the pre phase as a failed
1655     node would then be impossible to remove.
1656
1657     """
1658     env = {
1659       "OP_TARGET": self.op.node_name,
1660       "NODE_NAME": self.op.node_name,
1661       }
1662     all_nodes = self.cfg.GetNodeList()
1663     all_nodes.remove(self.op.node_name)
1664     return env, all_nodes, all_nodes
1665
1666   def CheckPrereq(self):
1667     """Check prerequisites.
1668
1669     This checks:
1670      - the node exists in the configuration
1671      - it does not have primary or secondary instances
1672      - it's not the master
1673
1674     Any errors are signalled by raising errors.OpPrereqError.
1675
1676     """
1677     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1678     if node is None:
1679       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1680
1681     instance_list = self.cfg.GetInstanceList()
1682
1683     masternode = self.cfg.GetMasterNode()
1684     if node.name == masternode:
1685       raise errors.OpPrereqError("Node is the master node,"
1686                                  " you need to failover first.")
1687
1688     for instance_name in instance_list:
1689       instance = self.cfg.GetInstanceInfo(instance_name)
1690       if node.name in instance.all_nodes:
1691         raise errors.OpPrereqError("Instance %s is still running on the node,"
1692                                    " please remove first." % instance_name)
1693     self.op.node_name = node.name
1694     self.node = node
1695
1696   def Exec(self, feedback_fn):
1697     """Removes the node from the cluster.
1698
1699     """
1700     node = self.node
1701     logging.info("Stopping the node daemon and removing configs from node %s",
1702                  node.name)
1703
1704     self.context.RemoveNode(node.name)
1705
1706     self.rpc.call_node_leave_cluster(node.name)
1707
1708     # Promote nodes to master candidate as needed
1709     _AdjustCandidatePool(self)
1710
1711
1712 class LUQueryNodes(NoHooksLU):
1713   """Logical unit for querying nodes.
1714
1715   """
1716   _OP_REQP = ["output_fields", "names"]
1717   REQ_BGL = False
1718   _FIELDS_DYNAMIC = utils.FieldSet(
1719     "dtotal", "dfree",
1720     "mtotal", "mnode", "mfree",
1721     "bootid",
1722     "ctotal",
1723     )
1724
1725   _FIELDS_STATIC = utils.FieldSet(
1726     "name", "pinst_cnt", "sinst_cnt",
1727     "pinst_list", "sinst_list",
1728     "pip", "sip", "tags",
1729     "serial_no",
1730     "master_candidate",
1731     "master",
1732     "offline",
1733     )
1734
1735   def ExpandNames(self):
1736     _CheckOutputFields(static=self._FIELDS_STATIC,
1737                        dynamic=self._FIELDS_DYNAMIC,
1738                        selected=self.op.output_fields)
1739
1740     self.needed_locks = {}
1741     self.share_locks[locking.LEVEL_NODE] = 1
1742
1743     if self.op.names:
1744       self.wanted = _GetWantedNodes(self, self.op.names)
1745     else:
1746       self.wanted = locking.ALL_SET
1747
1748     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1749     if self.do_locking:
1750       # if we don't request only static fields, we need to lock the nodes
1751       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1752
1753
1754   def CheckPrereq(self):
1755     """Check prerequisites.
1756
1757     """
1758     # The validation of the node list is done in the _GetWantedNodes,
1759     # if non empty, and if empty, there's no validation to do
1760     pass
1761
1762   def Exec(self, feedback_fn):
1763     """Computes the list of nodes and their attributes.
1764
1765     """
1766     all_info = self.cfg.GetAllNodesInfo()
1767     if self.do_locking:
1768       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1769     elif self.wanted != locking.ALL_SET:
1770       nodenames = self.wanted
1771       missing = set(nodenames).difference(all_info.keys())
1772       if missing:
1773         raise errors.OpExecError(
1774           "Some nodes were removed before retrieving their data: %s" % missing)
1775     else:
1776       nodenames = all_info.keys()
1777
1778     nodenames = utils.NiceSort(nodenames)
1779     nodelist = [all_info[name] for name in nodenames]
1780
1781     # begin data gathering
1782
1783     if self.do_locking:
1784       live_data = {}
1785       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1786                                           self.cfg.GetHypervisorType())
1787       for name in nodenames:
1788         nodeinfo = node_data[name]
1789         if not nodeinfo.failed and nodeinfo.data:
1790           nodeinfo = nodeinfo.data
1791           fn = utils.TryConvert
1792           live_data[name] = {
1793             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1794             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1795             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1796             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1797             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1798             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1799             "bootid": nodeinfo.get('bootid', None),
1800             }
1801         else:
1802           live_data[name] = {}
1803     else:
1804       live_data = dict.fromkeys(nodenames, {})
1805
1806     node_to_primary = dict([(name, set()) for name in nodenames])
1807     node_to_secondary = dict([(name, set()) for name in nodenames])
1808
1809     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1810                              "sinst_cnt", "sinst_list"))
1811     if inst_fields & frozenset(self.op.output_fields):
1812       instancelist = self.cfg.GetInstanceList()
1813
1814       for instance_name in instancelist:
1815         inst = self.cfg.GetInstanceInfo(instance_name)
1816         if inst.primary_node in node_to_primary:
1817           node_to_primary[inst.primary_node].add(inst.name)
1818         for secnode in inst.secondary_nodes:
1819           if secnode in node_to_secondary:
1820             node_to_secondary[secnode].add(inst.name)
1821
1822     master_node = self.cfg.GetMasterNode()
1823
1824     # end data gathering
1825
1826     output = []
1827     for node in nodelist:
1828       node_output = []
1829       for field in self.op.output_fields:
1830         if field == "name":
1831           val = node.name
1832         elif field == "pinst_list":
1833           val = list(node_to_primary[node.name])
1834         elif field == "sinst_list":
1835           val = list(node_to_secondary[node.name])
1836         elif field == "pinst_cnt":
1837           val = len(node_to_primary[node.name])
1838         elif field == "sinst_cnt":
1839           val = len(node_to_secondary[node.name])
1840         elif field == "pip":
1841           val = node.primary_ip
1842         elif field == "sip":
1843           val = node.secondary_ip
1844         elif field == "tags":
1845           val = list(node.GetTags())
1846         elif field == "serial_no":
1847           val = node.serial_no
1848         elif field == "master_candidate":
1849           val = node.master_candidate
1850         elif field == "master":
1851           val = node.name == master_node
1852         elif field == "offline":
1853           val = node.offline
1854         elif self._FIELDS_DYNAMIC.Matches(field):
1855           val = live_data[node.name].get(field, None)
1856         else:
1857           raise errors.ParameterError(field)
1858         node_output.append(val)
1859       output.append(node_output)
1860
1861     return output
1862
1863
1864 class LUQueryNodeVolumes(NoHooksLU):
1865   """Logical unit for getting volumes on node(s).
1866
1867   """
1868   _OP_REQP = ["nodes", "output_fields"]
1869   REQ_BGL = False
1870   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1871   _FIELDS_STATIC = utils.FieldSet("node")
1872
1873   def ExpandNames(self):
1874     _CheckOutputFields(static=self._FIELDS_STATIC,
1875                        dynamic=self._FIELDS_DYNAMIC,
1876                        selected=self.op.output_fields)
1877
1878     self.needed_locks = {}
1879     self.share_locks[locking.LEVEL_NODE] = 1
1880     if not self.op.nodes:
1881       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1882     else:
1883       self.needed_locks[locking.LEVEL_NODE] = \
1884         _GetWantedNodes(self, self.op.nodes)
1885
1886   def CheckPrereq(self):
1887     """Check prerequisites.
1888
1889     This checks that the fields required are valid output fields.
1890
1891     """
1892     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1893
1894   def Exec(self, feedback_fn):
1895     """Computes the list of nodes and their attributes.
1896
1897     """
1898     nodenames = self.nodes
1899     volumes = self.rpc.call_node_volumes(nodenames)
1900
1901     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1902              in self.cfg.GetInstanceList()]
1903
1904     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1905
1906     output = []
1907     for node in nodenames:
1908       if node not in volumes or volumes[node].failed or not volumes[node].data:
1909         continue
1910
1911       node_vols = volumes[node].data[:]
1912       node_vols.sort(key=lambda vol: vol['dev'])
1913
1914       for vol in node_vols:
1915         node_output = []
1916         for field in self.op.output_fields:
1917           if field == "node":
1918             val = node
1919           elif field == "phys":
1920             val = vol['dev']
1921           elif field == "vg":
1922             val = vol['vg']
1923           elif field == "name":
1924             val = vol['name']
1925           elif field == "size":
1926             val = int(float(vol['size']))
1927           elif field == "instance":
1928             for inst in ilist:
1929               if node not in lv_by_node[inst]:
1930                 continue
1931               if vol['name'] in lv_by_node[inst][node]:
1932                 val = inst.name
1933                 break
1934             else:
1935               val = '-'
1936           else:
1937             raise errors.ParameterError(field)
1938           node_output.append(str(val))
1939
1940         output.append(node_output)
1941
1942     return output
1943
1944
1945 class LUAddNode(LogicalUnit):
1946   """Logical unit for adding node to the cluster.
1947
1948   """
1949   HPATH = "node-add"
1950   HTYPE = constants.HTYPE_NODE
1951   _OP_REQP = ["node_name"]
1952
1953   def BuildHooksEnv(self):
1954     """Build hooks env.
1955
1956     This will run on all nodes before, and on all nodes + the new node after.
1957
1958     """
1959     env = {
1960       "OP_TARGET": self.op.node_name,
1961       "NODE_NAME": self.op.node_name,
1962       "NODE_PIP": self.op.primary_ip,
1963       "NODE_SIP": self.op.secondary_ip,
1964       }
1965     nodes_0 = self.cfg.GetNodeList()
1966     nodes_1 = nodes_0 + [self.op.node_name, ]
1967     return env, nodes_0, nodes_1
1968
1969   def CheckPrereq(self):
1970     """Check prerequisites.
1971
1972     This checks:
1973      - the new node is not already in the config
1974      - it is resolvable
1975      - its parameters (single/dual homed) matches the cluster
1976
1977     Any errors are signalled by raising errors.OpPrereqError.
1978
1979     """
1980     node_name = self.op.node_name
1981     cfg = self.cfg
1982
1983     dns_data = utils.HostInfo(node_name)
1984
1985     node = dns_data.name
1986     primary_ip = self.op.primary_ip = dns_data.ip
1987     secondary_ip = getattr(self.op, "secondary_ip", None)
1988     if secondary_ip is None:
1989       secondary_ip = primary_ip
1990     if not utils.IsValidIP(secondary_ip):
1991       raise errors.OpPrereqError("Invalid secondary IP given")
1992     self.op.secondary_ip = secondary_ip
1993
1994     node_list = cfg.GetNodeList()
1995     if not self.op.readd and node in node_list:
1996       raise errors.OpPrereqError("Node %s is already in the configuration" %
1997                                  node)
1998     elif self.op.readd and node not in node_list:
1999       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2000
2001     for existing_node_name in node_list:
2002       existing_node = cfg.GetNodeInfo(existing_node_name)
2003
2004       if self.op.readd and node == existing_node_name:
2005         if (existing_node.primary_ip != primary_ip or
2006             existing_node.secondary_ip != secondary_ip):
2007           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2008                                      " address configuration as before")
2009         continue
2010
2011       if (existing_node.primary_ip == primary_ip or
2012           existing_node.secondary_ip == primary_ip or
2013           existing_node.primary_ip == secondary_ip or
2014           existing_node.secondary_ip == secondary_ip):
2015         raise errors.OpPrereqError("New node ip address(es) conflict with"
2016                                    " existing node %s" % existing_node.name)
2017
2018     # check that the type of the node (single versus dual homed) is the
2019     # same as for the master
2020     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2021     master_singlehomed = myself.secondary_ip == myself.primary_ip
2022     newbie_singlehomed = secondary_ip == primary_ip
2023     if master_singlehomed != newbie_singlehomed:
2024       if master_singlehomed:
2025         raise errors.OpPrereqError("The master has no private ip but the"
2026                                    " new node has one")
2027       else:
2028         raise errors.OpPrereqError("The master has a private ip but the"
2029                                    " new node doesn't have one")
2030
2031     # checks reachablity
2032     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2033       raise errors.OpPrereqError("Node not reachable by ping")
2034
2035     if not newbie_singlehomed:
2036       # check reachability from my secondary ip to newbie's secondary ip
2037       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2038                            source=myself.secondary_ip):
2039         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2040                                    " based ping to noded port")
2041
2042     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2043     mc_now, _ = self.cfg.GetMasterCandidateStats()
2044     master_candidate = mc_now < cp_size
2045
2046     self.new_node = objects.Node(name=node,
2047                                  primary_ip=primary_ip,
2048                                  secondary_ip=secondary_ip,
2049                                  master_candidate=master_candidate,
2050                                  offline=False)
2051
2052   def Exec(self, feedback_fn):
2053     """Adds the new node to the cluster.
2054
2055     """
2056     new_node = self.new_node
2057     node = new_node.name
2058
2059     # check connectivity
2060     result = self.rpc.call_version([node])[node]
2061     result.Raise()
2062     if result.data:
2063       if constants.PROTOCOL_VERSION == result.data:
2064         logging.info("Communication to node %s fine, sw version %s match",
2065                      node, result.data)
2066       else:
2067         raise errors.OpExecError("Version mismatch master version %s,"
2068                                  " node version %s" %
2069                                  (constants.PROTOCOL_VERSION, result.data))
2070     else:
2071       raise errors.OpExecError("Cannot get version from the new node")
2072
2073     # setup ssh on node
2074     logging.info("Copy ssh key to node %s", node)
2075     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2076     keyarray = []
2077     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2078                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2079                 priv_key, pub_key]
2080
2081     for i in keyfiles:
2082       f = open(i, 'r')
2083       try:
2084         keyarray.append(f.read())
2085       finally:
2086         f.close()
2087
2088     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2089                                     keyarray[2],
2090                                     keyarray[3], keyarray[4], keyarray[5])
2091
2092     if result.failed or not result.data:
2093       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2094
2095     # Add node to our /etc/hosts, and add key to known_hosts
2096     utils.AddHostToEtcHosts(new_node.name)
2097
2098     if new_node.secondary_ip != new_node.primary_ip:
2099       result = self.rpc.call_node_has_ip_address(new_node.name,
2100                                                  new_node.secondary_ip)
2101       if result.failed or not result.data:
2102         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2103                                  " you gave (%s). Please fix and re-run this"
2104                                  " command." % new_node.secondary_ip)
2105
2106     node_verify_list = [self.cfg.GetMasterNode()]
2107     node_verify_param = {
2108       'nodelist': [node],
2109       # TODO: do a node-net-test as well?
2110     }
2111
2112     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2113                                        self.cfg.GetClusterName())
2114     for verifier in node_verify_list:
2115       if result[verifier].failed or not result[verifier].data:
2116         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2117                                  " for remote verification" % verifier)
2118       if result[verifier].data['nodelist']:
2119         for failed in result[verifier].data['nodelist']:
2120           feedback_fn("ssh/hostname verification failed %s -> %s" %
2121                       (verifier, result[verifier]['nodelist'][failed]))
2122         raise errors.OpExecError("ssh/hostname verification failed.")
2123
2124     # Distribute updated /etc/hosts and known_hosts to all nodes,
2125     # including the node just added
2126     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2127     dist_nodes = self.cfg.GetNodeList()
2128     if not self.op.readd:
2129       dist_nodes.append(node)
2130     if myself.name in dist_nodes:
2131       dist_nodes.remove(myself.name)
2132
2133     logging.debug("Copying hosts and known_hosts to all nodes")
2134     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2135       result = self.rpc.call_upload_file(dist_nodes, fname)
2136       for to_node, to_result in result.iteritems():
2137         if to_result.failed or not to_result.data:
2138           logging.error("Copy of file %s to node %s failed", fname, to_node)
2139
2140     to_copy = []
2141     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2142       to_copy.append(constants.VNC_PASSWORD_FILE)
2143     for fname in to_copy:
2144       result = self.rpc.call_upload_file([node], fname)
2145       if result[node].failed or not result[node]:
2146         logging.error("Could not copy file %s to node %s", fname, node)
2147
2148     if self.op.readd:
2149       self.context.ReaddNode(new_node)
2150     else:
2151       self.context.AddNode(new_node)
2152
2153
2154 class LUSetNodeParams(LogicalUnit):
2155   """Modifies the parameters of a node.
2156
2157   """
2158   HPATH = "node-modify"
2159   HTYPE = constants.HTYPE_NODE
2160   _OP_REQP = ["node_name"]
2161   REQ_BGL = False
2162
2163   def CheckArguments(self):
2164     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2165     if node_name is None:
2166       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2167     self.op.node_name = node_name
2168     _CheckBooleanOpField(self.op, 'master_candidate')
2169     _CheckBooleanOpField(self.op, 'offline')
2170     if self.op.master_candidate is None and self.op.offline is None:
2171       raise errors.OpPrereqError("Please pass at least one modification")
2172     if self.op.offline == True and self.op.master_candidate == True:
2173       raise errors.OpPrereqError("Can't set the node into offline and"
2174                                  " master_candidate at the same time")
2175
2176   def ExpandNames(self):
2177     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2178
2179   def BuildHooksEnv(self):
2180     """Build hooks env.
2181
2182     This runs on the master node.
2183
2184     """
2185     env = {
2186       "OP_TARGET": self.op.node_name,
2187       "MASTER_CANDIDATE": str(self.op.master_candidate),
2188       "OFFLINE": str(self.op.offline),
2189       }
2190     nl = [self.cfg.GetMasterNode(),
2191           self.op.node_name]
2192     return env, nl, nl
2193
2194   def CheckPrereq(self):
2195     """Check prerequisites.
2196
2197     This only checks the instance list against the existing names.
2198
2199     """
2200     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2201
2202     if ((self.op.master_candidate == False or self.op.offline == True)
2203         and node.master_candidate):
2204       # we will demote the node from master_candidate
2205       if self.op.node_name == self.cfg.GetMasterNode():
2206         raise errors.OpPrereqError("The master node has to be a"
2207                                    " master candidate and online")
2208       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2209       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2210       if num_candidates <= cp_size:
2211         msg = ("Not enough master candidates (desired"
2212                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2213         if self.op.force:
2214           self.LogWarning(msg)
2215         else:
2216           raise errors.OpPrereqError(msg)
2217
2218     if (self.op.master_candidate == True and node.offline and
2219         not self.op.offline == False):
2220       raise errors.OpPrereqError("Can't set an offline node to"
2221                                  " master_candidate")
2222
2223     return
2224
2225   def Exec(self, feedback_fn):
2226     """Modifies a node.
2227
2228     """
2229     node = self.node
2230
2231     result = []
2232
2233     if self.op.offline is not None:
2234       node.offline = self.op.offline
2235       result.append(("offline", str(self.op.offline)))
2236       if self.op.offline == True and node.master_candidate:
2237         node.master_candidate = False
2238         result.append(("master_candidate", "auto-demotion due to offline"))
2239
2240     if self.op.master_candidate is not None:
2241       node.master_candidate = self.op.master_candidate
2242       result.append(("master_candidate", str(self.op.master_candidate)))
2243       if self.op.master_candidate == False:
2244         rrc = self.rpc.call_node_demote_from_mc(node.name)
2245         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2246             or len(rrc.data) != 2):
2247           self.LogWarning("Node rpc error: %s" % rrc.error)
2248         elif not rrc.data[0]:
2249           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2250
2251     # this will trigger configuration file update, if needed
2252     self.cfg.Update(node)
2253     # this will trigger job queue propagation or cleanup
2254     if self.op.node_name != self.cfg.GetMasterNode():
2255       self.context.ReaddNode(node)
2256
2257     return result
2258
2259
2260 class LUQueryClusterInfo(NoHooksLU):
2261   """Query cluster configuration.
2262
2263   """
2264   _OP_REQP = []
2265   REQ_BGL = False
2266
2267   def ExpandNames(self):
2268     self.needed_locks = {}
2269
2270   def CheckPrereq(self):
2271     """No prerequsites needed for this LU.
2272
2273     """
2274     pass
2275
2276   def Exec(self, feedback_fn):
2277     """Return cluster config.
2278
2279     """
2280     cluster = self.cfg.GetClusterInfo()
2281     result = {
2282       "software_version": constants.RELEASE_VERSION,
2283       "protocol_version": constants.PROTOCOL_VERSION,
2284       "config_version": constants.CONFIG_VERSION,
2285       "os_api_version": constants.OS_API_VERSION,
2286       "export_version": constants.EXPORT_VERSION,
2287       "architecture": (platform.architecture()[0], platform.machine()),
2288       "name": cluster.cluster_name,
2289       "master": cluster.master_node,
2290       "default_hypervisor": cluster.default_hypervisor,
2291       "enabled_hypervisors": cluster.enabled_hypervisors,
2292       "hvparams": cluster.hvparams,
2293       "beparams": cluster.beparams,
2294       "candidate_pool_size": cluster.candidate_pool_size,
2295       }
2296
2297     return result
2298
2299
2300 class LUQueryConfigValues(NoHooksLU):
2301   """Return configuration values.
2302
2303   """
2304   _OP_REQP = []
2305   REQ_BGL = False
2306   _FIELDS_DYNAMIC = utils.FieldSet()
2307   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2308
2309   def ExpandNames(self):
2310     self.needed_locks = {}
2311
2312     _CheckOutputFields(static=self._FIELDS_STATIC,
2313                        dynamic=self._FIELDS_DYNAMIC,
2314                        selected=self.op.output_fields)
2315
2316   def CheckPrereq(self):
2317     """No prerequisites.
2318
2319     """
2320     pass
2321
2322   def Exec(self, feedback_fn):
2323     """Dump a representation of the cluster config to the standard output.
2324
2325     """
2326     values = []
2327     for field in self.op.output_fields:
2328       if field == "cluster_name":
2329         entry = self.cfg.GetClusterName()
2330       elif field == "master_node":
2331         entry = self.cfg.GetMasterNode()
2332       elif field == "drain_flag":
2333         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2334       else:
2335         raise errors.ParameterError(field)
2336       values.append(entry)
2337     return values
2338
2339
2340 class LUActivateInstanceDisks(NoHooksLU):
2341   """Bring up an instance's disks.
2342
2343   """
2344   _OP_REQP = ["instance_name"]
2345   REQ_BGL = False
2346
2347   def ExpandNames(self):
2348     self._ExpandAndLockInstance()
2349     self.needed_locks[locking.LEVEL_NODE] = []
2350     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2351
2352   def DeclareLocks(self, level):
2353     if level == locking.LEVEL_NODE:
2354       self._LockInstancesNodes()
2355
2356   def CheckPrereq(self):
2357     """Check prerequisites.
2358
2359     This checks that the instance is in the cluster.
2360
2361     """
2362     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2363     assert self.instance is not None, \
2364       "Cannot retrieve locked instance %s" % self.op.instance_name
2365     _CheckNodeOnline(self, self.instance.primary_node)
2366
2367   def Exec(self, feedback_fn):
2368     """Activate the disks.
2369
2370     """
2371     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2372     if not disks_ok:
2373       raise errors.OpExecError("Cannot activate block devices")
2374
2375     return disks_info
2376
2377
2378 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2379   """Prepare the block devices for an instance.
2380
2381   This sets up the block devices on all nodes.
2382
2383   @type lu: L{LogicalUnit}
2384   @param lu: the logical unit on whose behalf we execute
2385   @type instance: L{objects.Instance}
2386   @param instance: the instance for whose disks we assemble
2387   @type ignore_secondaries: boolean
2388   @param ignore_secondaries: if true, errors on secondary nodes
2389       won't result in an error return from the function
2390   @return: False if the operation failed, otherwise a list of
2391       (host, instance_visible_name, node_visible_name)
2392       with the mapping from node devices to instance devices
2393
2394   """
2395   device_info = []
2396   disks_ok = True
2397   iname = instance.name
2398   # With the two passes mechanism we try to reduce the window of
2399   # opportunity for the race condition of switching DRBD to primary
2400   # before handshaking occured, but we do not eliminate it
2401
2402   # The proper fix would be to wait (with some limits) until the
2403   # connection has been made and drbd transitions from WFConnection
2404   # into any other network-connected state (Connected, SyncTarget,
2405   # SyncSource, etc.)
2406
2407   # 1st pass, assemble on all nodes in secondary mode
2408   for inst_disk in instance.disks:
2409     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2410       lu.cfg.SetDiskID(node_disk, node)
2411       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2412       if result.failed or not result:
2413         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2414                            " (is_primary=False, pass=1)",
2415                            inst_disk.iv_name, node)
2416         if not ignore_secondaries:
2417           disks_ok = False
2418
2419   # FIXME: race condition on drbd migration to primary
2420
2421   # 2nd pass, do only the primary node
2422   for inst_disk in instance.disks:
2423     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2424       if node != instance.primary_node:
2425         continue
2426       lu.cfg.SetDiskID(node_disk, node)
2427       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2428       if result.failed or not result:
2429         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2430                            " (is_primary=True, pass=2)",
2431                            inst_disk.iv_name, node)
2432         disks_ok = False
2433     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2434
2435   # leave the disks configured for the primary node
2436   # this is a workaround that would be fixed better by
2437   # improving the logical/physical id handling
2438   for disk in instance.disks:
2439     lu.cfg.SetDiskID(disk, instance.primary_node)
2440
2441   return disks_ok, device_info
2442
2443
2444 def _StartInstanceDisks(lu, instance, force):
2445   """Start the disks of an instance.
2446
2447   """
2448   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2449                                            ignore_secondaries=force)
2450   if not disks_ok:
2451     _ShutdownInstanceDisks(lu, instance)
2452     if force is not None and not force:
2453       lu.proc.LogWarning("", hint="If the message above refers to a"
2454                          " secondary node,"
2455                          " you can retry the operation using '--force'.")
2456     raise errors.OpExecError("Disk consistency error")
2457
2458
2459 class LUDeactivateInstanceDisks(NoHooksLU):
2460   """Shutdown an instance's disks.
2461
2462   """
2463   _OP_REQP = ["instance_name"]
2464   REQ_BGL = False
2465
2466   def ExpandNames(self):
2467     self._ExpandAndLockInstance()
2468     self.needed_locks[locking.LEVEL_NODE] = []
2469     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2470
2471   def DeclareLocks(self, level):
2472     if level == locking.LEVEL_NODE:
2473       self._LockInstancesNodes()
2474
2475   def CheckPrereq(self):
2476     """Check prerequisites.
2477
2478     This checks that the instance is in the cluster.
2479
2480     """
2481     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2482     assert self.instance is not None, \
2483       "Cannot retrieve locked instance %s" % self.op.instance_name
2484
2485   def Exec(self, feedback_fn):
2486     """Deactivate the disks
2487
2488     """
2489     instance = self.instance
2490     _SafeShutdownInstanceDisks(self, instance)
2491
2492
2493 def _SafeShutdownInstanceDisks(lu, instance):
2494   """Shutdown block devices of an instance.
2495
2496   This function checks if an instance is running, before calling
2497   _ShutdownInstanceDisks.
2498
2499   """
2500   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2501                                       [instance.hypervisor])
2502   ins_l = ins_l[instance.primary_node]
2503   if ins_l.failed or not isinstance(ins_l.data, list):
2504     raise errors.OpExecError("Can't contact node '%s'" %
2505                              instance.primary_node)
2506
2507   if instance.name in ins_l.data:
2508     raise errors.OpExecError("Instance is running, can't shutdown"
2509                              " block devices.")
2510
2511   _ShutdownInstanceDisks(lu, instance)
2512
2513
2514 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2515   """Shutdown block devices of an instance.
2516
2517   This does the shutdown on all nodes of the instance.
2518
2519   If the ignore_primary is false, errors on the primary node are
2520   ignored.
2521
2522   """
2523   result = True
2524   for disk in instance.disks:
2525     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2526       lu.cfg.SetDiskID(top_disk, node)
2527       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2528       if result.failed or not result.data:
2529         logging.error("Could not shutdown block device %s on node %s",
2530                       disk.iv_name, node)
2531         if not ignore_primary or node != instance.primary_node:
2532           result = False
2533   return result
2534
2535
2536 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2537   """Checks if a node has enough free memory.
2538
2539   This function check if a given node has the needed amount of free
2540   memory. In case the node has less memory or we cannot get the
2541   information from the node, this function raise an OpPrereqError
2542   exception.
2543
2544   @type lu: C{LogicalUnit}
2545   @param lu: a logical unit from which we get configuration data
2546   @type node: C{str}
2547   @param node: the node to check
2548   @type reason: C{str}
2549   @param reason: string to use in the error message
2550   @type requested: C{int}
2551   @param requested: the amount of memory in MiB to check for
2552   @type hypervisor_name: C{str}
2553   @param hypervisor_name: the hypervisor to ask for memory stats
2554   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2555       we cannot check the node
2556
2557   """
2558   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2559   nodeinfo[node].Raise()
2560   free_mem = nodeinfo[node].data.get('memory_free')
2561   if not isinstance(free_mem, int):
2562     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2563                              " was '%s'" % (node, free_mem))
2564   if requested > free_mem:
2565     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2566                              " needed %s MiB, available %s MiB" %
2567                              (node, reason, requested, free_mem))
2568
2569
2570 class LUStartupInstance(LogicalUnit):
2571   """Starts an instance.
2572
2573   """
2574   HPATH = "instance-start"
2575   HTYPE = constants.HTYPE_INSTANCE
2576   _OP_REQP = ["instance_name", "force"]
2577   REQ_BGL = False
2578
2579   def ExpandNames(self):
2580     self._ExpandAndLockInstance()
2581
2582   def BuildHooksEnv(self):
2583     """Build hooks env.
2584
2585     This runs on master, primary and secondary nodes of the instance.
2586
2587     """
2588     env = {
2589       "FORCE": self.op.force,
2590       }
2591     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2592     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2593     return env, nl, nl
2594
2595   def CheckPrereq(self):
2596     """Check prerequisites.
2597
2598     This checks that the instance is in the cluster.
2599
2600     """
2601     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2602     assert self.instance is not None, \
2603       "Cannot retrieve locked instance %s" % self.op.instance_name
2604
2605     _CheckNodeOnline(self, instance.primary_node)
2606
2607     bep = self.cfg.GetClusterInfo().FillBE(instance)
2608     # check bridges existance
2609     _CheckInstanceBridgesExist(self, instance)
2610
2611     _CheckNodeFreeMemory(self, instance.primary_node,
2612                          "starting instance %s" % instance.name,
2613                          bep[constants.BE_MEMORY], instance.hypervisor)
2614
2615   def Exec(self, feedback_fn):
2616     """Start the instance.
2617
2618     """
2619     instance = self.instance
2620     force = self.op.force
2621     extra_args = getattr(self.op, "extra_args", "")
2622
2623     self.cfg.MarkInstanceUp(instance.name)
2624
2625     node_current = instance.primary_node
2626
2627     _StartInstanceDisks(self, instance, force)
2628
2629     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2630     msg = result.RemoteFailMsg()
2631     if msg:
2632       _ShutdownInstanceDisks(self, instance)
2633       raise errors.OpExecError("Could not start instance: %s" % msg)
2634
2635
2636 class LURebootInstance(LogicalUnit):
2637   """Reboot an instance.
2638
2639   """
2640   HPATH = "instance-reboot"
2641   HTYPE = constants.HTYPE_INSTANCE
2642   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2643   REQ_BGL = False
2644
2645   def ExpandNames(self):
2646     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2647                                    constants.INSTANCE_REBOOT_HARD,
2648                                    constants.INSTANCE_REBOOT_FULL]:
2649       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2650                                   (constants.INSTANCE_REBOOT_SOFT,
2651                                    constants.INSTANCE_REBOOT_HARD,
2652                                    constants.INSTANCE_REBOOT_FULL))
2653     self._ExpandAndLockInstance()
2654
2655   def BuildHooksEnv(self):
2656     """Build hooks env.
2657
2658     This runs on master, primary and secondary nodes of the instance.
2659
2660     """
2661     env = {
2662       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2663       }
2664     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2665     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2666     return env, nl, nl
2667
2668   def CheckPrereq(self):
2669     """Check prerequisites.
2670
2671     This checks that the instance is in the cluster.
2672
2673     """
2674     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2675     assert self.instance is not None, \
2676       "Cannot retrieve locked instance %s" % self.op.instance_name
2677
2678     _CheckNodeOnline(self, instance.primary_node)
2679
2680     # check bridges existance
2681     _CheckInstanceBridgesExist(self, instance)
2682
2683   def Exec(self, feedback_fn):
2684     """Reboot the instance.
2685
2686     """
2687     instance = self.instance
2688     ignore_secondaries = self.op.ignore_secondaries
2689     reboot_type = self.op.reboot_type
2690     extra_args = getattr(self.op, "extra_args", "")
2691
2692     node_current = instance.primary_node
2693
2694     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2695                        constants.INSTANCE_REBOOT_HARD]:
2696       result = self.rpc.call_instance_reboot(node_current, instance,
2697                                              reboot_type, extra_args)
2698       if result.failed or not result.data:
2699         raise errors.OpExecError("Could not reboot instance")
2700     else:
2701       if not self.rpc.call_instance_shutdown(node_current, instance):
2702         raise errors.OpExecError("could not shutdown instance for full reboot")
2703       _ShutdownInstanceDisks(self, instance)
2704       _StartInstanceDisks(self, instance, ignore_secondaries)
2705       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2706       msg = result.RemoteFailMsg()
2707       if msg:
2708         _ShutdownInstanceDisks(self, instance)
2709         raise errors.OpExecError("Could not start instance for"
2710                                  " full reboot: %s" % msg)
2711
2712     self.cfg.MarkInstanceUp(instance.name)
2713
2714
2715 class LUShutdownInstance(LogicalUnit):
2716   """Shutdown an instance.
2717
2718   """
2719   HPATH = "instance-stop"
2720   HTYPE = constants.HTYPE_INSTANCE
2721   _OP_REQP = ["instance_name"]
2722   REQ_BGL = False
2723
2724   def ExpandNames(self):
2725     self._ExpandAndLockInstance()
2726
2727   def BuildHooksEnv(self):
2728     """Build hooks env.
2729
2730     This runs on master, primary and secondary nodes of the instance.
2731
2732     """
2733     env = _BuildInstanceHookEnvByObject(self, self.instance)
2734     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2735     return env, nl, nl
2736
2737   def CheckPrereq(self):
2738     """Check prerequisites.
2739
2740     This checks that the instance is in the cluster.
2741
2742     """
2743     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2744     assert self.instance is not None, \
2745       "Cannot retrieve locked instance %s" % self.op.instance_name
2746     _CheckNodeOnline(self, self.instance.primary_node)
2747
2748   def Exec(self, feedback_fn):
2749     """Shutdown the instance.
2750
2751     """
2752     instance = self.instance
2753     node_current = instance.primary_node
2754     self.cfg.MarkInstanceDown(instance.name)
2755     result = self.rpc.call_instance_shutdown(node_current, instance)
2756     if result.failed or not result.data:
2757       self.proc.LogWarning("Could not shutdown instance")
2758
2759     _ShutdownInstanceDisks(self, instance)
2760
2761
2762 class LUReinstallInstance(LogicalUnit):
2763   """Reinstall an instance.
2764
2765   """
2766   HPATH = "instance-reinstall"
2767   HTYPE = constants.HTYPE_INSTANCE
2768   _OP_REQP = ["instance_name"]
2769   REQ_BGL = False
2770
2771   def ExpandNames(self):
2772     self._ExpandAndLockInstance()
2773
2774   def BuildHooksEnv(self):
2775     """Build hooks env.
2776
2777     This runs on master, primary and secondary nodes of the instance.
2778
2779     """
2780     env = _BuildInstanceHookEnvByObject(self, self.instance)
2781     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2782     return env, nl, nl
2783
2784   def CheckPrereq(self):
2785     """Check prerequisites.
2786
2787     This checks that the instance is in the cluster and is not running.
2788
2789     """
2790     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2791     assert instance is not None, \
2792       "Cannot retrieve locked instance %s" % self.op.instance_name
2793     _CheckNodeOnline(self, instance.primary_node)
2794
2795     if instance.disk_template == constants.DT_DISKLESS:
2796       raise errors.OpPrereqError("Instance '%s' has no disks" %
2797                                  self.op.instance_name)
2798     if instance.status != "down":
2799       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2800                                  self.op.instance_name)
2801     remote_info = self.rpc.call_instance_info(instance.primary_node,
2802                                               instance.name,
2803                                               instance.hypervisor)
2804     if remote_info.failed or remote_info.data:
2805       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2806                                  (self.op.instance_name,
2807                                   instance.primary_node))
2808
2809     self.op.os_type = getattr(self.op, "os_type", None)
2810     if self.op.os_type is not None:
2811       # OS verification
2812       pnode = self.cfg.GetNodeInfo(
2813         self.cfg.ExpandNodeName(instance.primary_node))
2814       if pnode is None:
2815         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2816                                    self.op.pnode)
2817       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2818       result.Raise()
2819       if not isinstance(result.data, objects.OS):
2820         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2821                                    " primary node"  % self.op.os_type)
2822
2823     self.instance = instance
2824
2825   def Exec(self, feedback_fn):
2826     """Reinstall the instance.
2827
2828     """
2829     inst = self.instance
2830
2831     if self.op.os_type is not None:
2832       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2833       inst.os = self.op.os_type
2834       self.cfg.Update(inst)
2835
2836     _StartInstanceDisks(self, inst, None)
2837     try:
2838       feedback_fn("Running the instance OS create scripts...")
2839       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2840       msg = result.RemoteFailMsg()
2841       if msg:
2842         raise errors.OpExecError("Could not install OS for instance %s"
2843                                  " on node %s: %s" %
2844                                  (inst.name, inst.primary_node, msg))
2845     finally:
2846       _ShutdownInstanceDisks(self, inst)
2847
2848
2849 class LURenameInstance(LogicalUnit):
2850   """Rename an instance.
2851
2852   """
2853   HPATH = "instance-rename"
2854   HTYPE = constants.HTYPE_INSTANCE
2855   _OP_REQP = ["instance_name", "new_name"]
2856
2857   def BuildHooksEnv(self):
2858     """Build hooks env.
2859
2860     This runs on master, primary and secondary nodes of the instance.
2861
2862     """
2863     env = _BuildInstanceHookEnvByObject(self, self.instance)
2864     env["INSTANCE_NEW_NAME"] = self.op.new_name
2865     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2866     return env, nl, nl
2867
2868   def CheckPrereq(self):
2869     """Check prerequisites.
2870
2871     This checks that the instance is in the cluster and is not running.
2872
2873     """
2874     instance = self.cfg.GetInstanceInfo(
2875       self.cfg.ExpandInstanceName(self.op.instance_name))
2876     if instance is None:
2877       raise errors.OpPrereqError("Instance '%s' not known" %
2878                                  self.op.instance_name)
2879     _CheckNodeOnline(self, instance.primary_node)
2880
2881     if instance.status != "down":
2882       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2883                                  self.op.instance_name)
2884     remote_info = self.rpc.call_instance_info(instance.primary_node,
2885                                               instance.name,
2886                                               instance.hypervisor)
2887     remote_info.Raise()
2888     if remote_info.data:
2889       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2890                                  (self.op.instance_name,
2891                                   instance.primary_node))
2892     self.instance = instance
2893
2894     # new name verification
2895     name_info = utils.HostInfo(self.op.new_name)
2896
2897     self.op.new_name = new_name = name_info.name
2898     instance_list = self.cfg.GetInstanceList()
2899     if new_name in instance_list:
2900       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2901                                  new_name)
2902
2903     if not getattr(self.op, "ignore_ip", False):
2904       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2905         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2906                                    (name_info.ip, new_name))
2907
2908
2909   def Exec(self, feedback_fn):
2910     """Reinstall the instance.
2911
2912     """
2913     inst = self.instance
2914     old_name = inst.name
2915
2916     if inst.disk_template == constants.DT_FILE:
2917       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2918
2919     self.cfg.RenameInstance(inst.name, self.op.new_name)
2920     # Change the instance lock. This is definitely safe while we hold the BGL
2921     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2922     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2923
2924     # re-read the instance from the configuration after rename
2925     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2926
2927     if inst.disk_template == constants.DT_FILE:
2928       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2929       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2930                                                      old_file_storage_dir,
2931                                                      new_file_storage_dir)
2932       result.Raise()
2933       if not result.data:
2934         raise errors.OpExecError("Could not connect to node '%s' to rename"
2935                                  " directory '%s' to '%s' (but the instance"
2936                                  " has been renamed in Ganeti)" % (
2937                                  inst.primary_node, old_file_storage_dir,
2938                                  new_file_storage_dir))
2939
2940       if not result.data[0]:
2941         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2942                                  " (but the instance has been renamed in"
2943                                  " Ganeti)" % (old_file_storage_dir,
2944                                                new_file_storage_dir))
2945
2946     _StartInstanceDisks(self, inst, None)
2947     try:
2948       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2949                                                  old_name)
2950       if result.failed or not result.data:
2951         msg = ("Could not run OS rename script for instance %s on node %s"
2952                " (but the instance has been renamed in Ganeti)" %
2953                (inst.name, inst.primary_node))
2954         self.proc.LogWarning(msg)
2955     finally:
2956       _ShutdownInstanceDisks(self, inst)
2957
2958
2959 class LURemoveInstance(LogicalUnit):
2960   """Remove an instance.
2961
2962   """
2963   HPATH = "instance-remove"
2964   HTYPE = constants.HTYPE_INSTANCE
2965   _OP_REQP = ["instance_name", "ignore_failures"]
2966   REQ_BGL = False
2967
2968   def ExpandNames(self):
2969     self._ExpandAndLockInstance()
2970     self.needed_locks[locking.LEVEL_NODE] = []
2971     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2972
2973   def DeclareLocks(self, level):
2974     if level == locking.LEVEL_NODE:
2975       self._LockInstancesNodes()
2976
2977   def BuildHooksEnv(self):
2978     """Build hooks env.
2979
2980     This runs on master, primary and secondary nodes of the instance.
2981
2982     """
2983     env = _BuildInstanceHookEnvByObject(self, self.instance)
2984     nl = [self.cfg.GetMasterNode()]
2985     return env, nl, nl
2986
2987   def CheckPrereq(self):
2988     """Check prerequisites.
2989
2990     This checks that the instance is in the cluster.
2991
2992     """
2993     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2994     assert self.instance is not None, \
2995       "Cannot retrieve locked instance %s" % self.op.instance_name
2996
2997   def Exec(self, feedback_fn):
2998     """Remove the instance.
2999
3000     """
3001     instance = self.instance
3002     logging.info("Shutting down instance %s on node %s",
3003                  instance.name, instance.primary_node)
3004
3005     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3006     if result.failed or not result.data:
3007       if self.op.ignore_failures:
3008         feedback_fn("Warning: can't shutdown instance")
3009       else:
3010         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3011                                  (instance.name, instance.primary_node))
3012
3013     logging.info("Removing block devices for instance %s", instance.name)
3014
3015     if not _RemoveDisks(self, instance):
3016       if self.op.ignore_failures:
3017         feedback_fn("Warning: can't remove instance's disks")
3018       else:
3019         raise errors.OpExecError("Can't remove instance's disks")
3020
3021     logging.info("Removing instance %s out of cluster config", instance.name)
3022
3023     self.cfg.RemoveInstance(instance.name)
3024     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3025
3026
3027 class LUQueryInstances(NoHooksLU):
3028   """Logical unit for querying instances.
3029
3030   """
3031   _OP_REQP = ["output_fields", "names"]
3032   REQ_BGL = False
3033   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3034                                     "admin_state", "admin_ram",
3035                                     "disk_template", "ip", "mac", "bridge",
3036                                     "sda_size", "sdb_size", "vcpus", "tags",
3037                                     "network_port", "beparams",
3038                                     "(disk).(size)/([0-9]+)",
3039                                     "(disk).(sizes)",
3040                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3041                                     "(nic).(macs|ips|bridges)",
3042                                     "(disk|nic).(count)",
3043                                     "serial_no", "hypervisor", "hvparams",] +
3044                                   ["hv/%s" % name
3045                                    for name in constants.HVS_PARAMETERS] +
3046                                   ["be/%s" % name
3047                                    for name in constants.BES_PARAMETERS])
3048   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3049
3050
3051   def ExpandNames(self):
3052     _CheckOutputFields(static=self._FIELDS_STATIC,
3053                        dynamic=self._FIELDS_DYNAMIC,
3054                        selected=self.op.output_fields)
3055
3056     self.needed_locks = {}
3057     self.share_locks[locking.LEVEL_INSTANCE] = 1
3058     self.share_locks[locking.LEVEL_NODE] = 1
3059
3060     if self.op.names:
3061       self.wanted = _GetWantedInstances(self, self.op.names)
3062     else:
3063       self.wanted = locking.ALL_SET
3064
3065     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3066     if self.do_locking:
3067       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3068       self.needed_locks[locking.LEVEL_NODE] = []
3069       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3070
3071   def DeclareLocks(self, level):
3072     if level == locking.LEVEL_NODE and self.do_locking:
3073       self._LockInstancesNodes()
3074
3075   def CheckPrereq(self):
3076     """Check prerequisites.
3077
3078     """
3079     pass
3080
3081   def Exec(self, feedback_fn):
3082     """Computes the list of nodes and their attributes.
3083
3084     """
3085     all_info = self.cfg.GetAllInstancesInfo()
3086     if self.do_locking:
3087       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3088     elif self.wanted != locking.ALL_SET:
3089       instance_names = self.wanted
3090       missing = set(instance_names).difference(all_info.keys())
3091       if missing:
3092         raise errors.OpExecError(
3093           "Some instances were removed before retrieving their data: %s"
3094           % missing)
3095     else:
3096       instance_names = all_info.keys()
3097
3098     instance_names = utils.NiceSort(instance_names)
3099     instance_list = [all_info[iname] for iname in instance_names]
3100
3101     # begin data gathering
3102
3103     nodes = frozenset([inst.primary_node for inst in instance_list])
3104     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3105
3106     bad_nodes = []
3107     off_nodes = []
3108     if self.do_locking:
3109       live_data = {}
3110       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3111       for name in nodes:
3112         result = node_data[name]
3113         if result.offline:
3114           # offline nodes will be in both lists
3115           off_nodes.append(name)
3116         if result.failed:
3117           bad_nodes.append(name)
3118         else:
3119           if result.data:
3120             live_data.update(result.data)
3121             # else no instance is alive
3122     else:
3123       live_data = dict([(name, {}) for name in instance_names])
3124
3125     # end data gathering
3126
3127     HVPREFIX = "hv/"
3128     BEPREFIX = "be/"
3129     output = []
3130     for instance in instance_list:
3131       iout = []
3132       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3133       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3134       for field in self.op.output_fields:
3135         st_match = self._FIELDS_STATIC.Matches(field)
3136         if field == "name":
3137           val = instance.name
3138         elif field == "os":
3139           val = instance.os
3140         elif field == "pnode":
3141           val = instance.primary_node
3142         elif field == "snodes":
3143           val = list(instance.secondary_nodes)
3144         elif field == "admin_state":
3145           val = (instance.status != "down")
3146         elif field == "oper_state":
3147           if instance.primary_node in bad_nodes:
3148             val = None
3149           else:
3150             val = bool(live_data.get(instance.name))
3151         elif field == "status":
3152           if instance.primary_node in off_nodes:
3153             val = "ERROR_nodeoffline"
3154           elif instance.primary_node in bad_nodes:
3155             val = "ERROR_nodedown"
3156           else:
3157             running = bool(live_data.get(instance.name))
3158             if running:
3159               if instance.status != "down":
3160                 val = "running"
3161               else:
3162                 val = "ERROR_up"
3163             else:
3164               if instance.status != "down":
3165                 val = "ERROR_down"
3166               else:
3167                 val = "ADMIN_down"
3168         elif field == "oper_ram":
3169           if instance.primary_node in bad_nodes:
3170             val = None
3171           elif instance.name in live_data:
3172             val = live_data[instance.name].get("memory", "?")
3173           else:
3174             val = "-"
3175         elif field == "disk_template":
3176           val = instance.disk_template
3177         elif field == "ip":
3178           val = instance.nics[0].ip
3179         elif field == "bridge":
3180           val = instance.nics[0].bridge
3181         elif field == "mac":
3182           val = instance.nics[0].mac
3183         elif field == "sda_size" or field == "sdb_size":
3184           idx = ord(field[2]) - ord('a')
3185           try:
3186             val = instance.FindDisk(idx).size
3187           except errors.OpPrereqError:
3188             val = None
3189         elif field == "tags":
3190           val = list(instance.GetTags())
3191         elif field == "serial_no":
3192           val = instance.serial_no
3193         elif field == "network_port":
3194           val = instance.network_port
3195         elif field == "hypervisor":
3196           val = instance.hypervisor
3197         elif field == "hvparams":
3198           val = i_hv
3199         elif (field.startswith(HVPREFIX) and
3200               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3201           val = i_hv.get(field[len(HVPREFIX):], None)
3202         elif field == "beparams":
3203           val = i_be
3204         elif (field.startswith(BEPREFIX) and
3205               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3206           val = i_be.get(field[len(BEPREFIX):], None)
3207         elif st_match and st_match.groups():
3208           # matches a variable list
3209           st_groups = st_match.groups()
3210           if st_groups and st_groups[0] == "disk":
3211             if st_groups[1] == "count":
3212               val = len(instance.disks)
3213             elif st_groups[1] == "sizes":
3214               val = [disk.size for disk in instance.disks]
3215             elif st_groups[1] == "size":
3216               try:
3217                 val = instance.FindDisk(st_groups[2]).size
3218               except errors.OpPrereqError:
3219                 val = None
3220             else:
3221               assert False, "Unhandled disk parameter"
3222           elif st_groups[0] == "nic":
3223             if st_groups[1] == "count":
3224               val = len(instance.nics)
3225             elif st_groups[1] == "macs":
3226               val = [nic.mac for nic in instance.nics]
3227             elif st_groups[1] == "ips":
3228               val = [nic.ip for nic in instance.nics]
3229             elif st_groups[1] == "bridges":
3230               val = [nic.bridge for nic in instance.nics]
3231             else:
3232               # index-based item
3233               nic_idx = int(st_groups[2])
3234               if nic_idx >= len(instance.nics):
3235                 val = None
3236               else:
3237                 if st_groups[1] == "mac":
3238                   val = instance.nics[nic_idx].mac
3239                 elif st_groups[1] == "ip":
3240                   val = instance.nics[nic_idx].ip
3241                 elif st_groups[1] == "bridge":
3242                   val = instance.nics[nic_idx].bridge
3243                 else:
3244                   assert False, "Unhandled NIC parameter"
3245           else:
3246             assert False, "Unhandled variable parameter"
3247         else:
3248           raise errors.ParameterError(field)
3249         iout.append(val)
3250       output.append(iout)
3251
3252     return output
3253
3254
3255 class LUFailoverInstance(LogicalUnit):
3256   """Failover an instance.
3257
3258   """
3259   HPATH = "instance-failover"
3260   HTYPE = constants.HTYPE_INSTANCE
3261   _OP_REQP = ["instance_name", "ignore_consistency"]
3262   REQ_BGL = False
3263
3264   def ExpandNames(self):
3265     self._ExpandAndLockInstance()
3266     self.needed_locks[locking.LEVEL_NODE] = []
3267     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3268
3269   def DeclareLocks(self, level):
3270     if level == locking.LEVEL_NODE:
3271       self._LockInstancesNodes()
3272
3273   def BuildHooksEnv(self):
3274     """Build hooks env.
3275
3276     This runs on master, primary and secondary nodes of the instance.
3277
3278     """
3279     env = {
3280       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3281       }
3282     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3283     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3284     return env, nl, nl
3285
3286   def CheckPrereq(self):
3287     """Check prerequisites.
3288
3289     This checks that the instance is in the cluster.
3290
3291     """
3292     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3293     assert self.instance is not None, \
3294       "Cannot retrieve locked instance %s" % self.op.instance_name
3295
3296     bep = self.cfg.GetClusterInfo().FillBE(instance)
3297     if instance.disk_template not in constants.DTS_NET_MIRROR:
3298       raise errors.OpPrereqError("Instance's disk layout is not"
3299                                  " network mirrored, cannot failover.")
3300
3301     secondary_nodes = instance.secondary_nodes
3302     if not secondary_nodes:
3303       raise errors.ProgrammerError("no secondary node but using "
3304                                    "a mirrored disk template")
3305
3306     target_node = secondary_nodes[0]
3307     _CheckNodeOnline(self, target_node)
3308     # check memory requirements on the secondary node
3309     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3310                          instance.name, bep[constants.BE_MEMORY],
3311                          instance.hypervisor)
3312
3313     # check bridge existance
3314     brlist = [nic.bridge for nic in instance.nics]
3315     result = self.rpc.call_bridges_exist(target_node, brlist)
3316     result.Raise()
3317     if not result.data:
3318       raise errors.OpPrereqError("One or more target bridges %s does not"
3319                                  " exist on destination node '%s'" %
3320                                  (brlist, target_node))
3321
3322   def Exec(self, feedback_fn):
3323     """Failover an instance.
3324
3325     The failover is done by shutting it down on its present node and
3326     starting it on the secondary.
3327
3328     """
3329     instance = self.instance
3330
3331     source_node = instance.primary_node
3332     target_node = instance.secondary_nodes[0]
3333
3334     feedback_fn("* checking disk consistency between source and target")
3335     for dev in instance.disks:
3336       # for drbd, these are drbd over lvm
3337       if not _CheckDiskConsistency(self, dev, target_node, False):
3338         if instance.status == "up" and not self.op.ignore_consistency:
3339           raise errors.OpExecError("Disk %s is degraded on target node,"
3340                                    " aborting failover." % dev.iv_name)
3341
3342     feedback_fn("* shutting down instance on source node")
3343     logging.info("Shutting down instance %s on node %s",
3344                  instance.name, source_node)
3345
3346     result = self.rpc.call_instance_shutdown(source_node, instance)
3347     if result.failed or not result.data:
3348       if self.op.ignore_consistency:
3349         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3350                              " Proceeding"
3351                              " anyway. Please make sure node %s is down",
3352                              instance.name, source_node, source_node)
3353       else:
3354         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3355                                  (instance.name, source_node))
3356
3357     feedback_fn("* deactivating the instance's disks on source node")
3358     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3359       raise errors.OpExecError("Can't shut down the instance's disks.")
3360
3361     instance.primary_node = target_node
3362     # distribute new instance config to the other nodes
3363     self.cfg.Update(instance)
3364
3365     # Only start the instance if it's marked as up
3366     if instance.status == "up":
3367       feedback_fn("* activating the instance's disks on target node")
3368       logging.info("Starting instance %s on node %s",
3369                    instance.name, target_node)
3370
3371       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3372                                                ignore_secondaries=True)
3373       if not disks_ok:
3374         _ShutdownInstanceDisks(self, instance)
3375         raise errors.OpExecError("Can't activate the instance's disks")
3376
3377       feedback_fn("* starting the instance on the target node")
3378       result = self.rpc.call_instance_start(target_node, instance, None)
3379       msg = result.RemoteFailMsg()
3380       if msg:
3381         _ShutdownInstanceDisks(self, instance)
3382         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3383                                  (instance.name, target_node, msg))
3384
3385
3386 class LUMigrateInstance(LogicalUnit):
3387   """Migrate an instance.
3388
3389   This is migration without shutting down, compared to the failover,
3390   which is done with shutdown.
3391
3392   """
3393   HPATH = "instance-migrate"
3394   HTYPE = constants.HTYPE_INSTANCE
3395   _OP_REQP = ["instance_name", "live", "cleanup"]
3396
3397   REQ_BGL = False
3398
3399   def ExpandNames(self):
3400     self._ExpandAndLockInstance()
3401     self.needed_locks[locking.LEVEL_NODE] = []
3402     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3403
3404   def DeclareLocks(self, level):
3405     if level == locking.LEVEL_NODE:
3406       self._LockInstancesNodes()
3407
3408   def BuildHooksEnv(self):
3409     """Build hooks env.
3410
3411     This runs on master, primary and secondary nodes of the instance.
3412
3413     """
3414     env = _BuildInstanceHookEnvByObject(self, self.instance)
3415     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3416     return env, nl, nl
3417
3418   def CheckPrereq(self):
3419     """Check prerequisites.
3420
3421     This checks that the instance is in the cluster.
3422
3423     """
3424     instance = self.cfg.GetInstanceInfo(
3425       self.cfg.ExpandInstanceName(self.op.instance_name))
3426     if instance is None:
3427       raise errors.OpPrereqError("Instance '%s' not known" %
3428                                  self.op.instance_name)
3429
3430     if instance.disk_template != constants.DT_DRBD8:
3431       raise errors.OpPrereqError("Instance's disk layout is not"
3432                                  " drbd8, cannot migrate.")
3433
3434     secondary_nodes = instance.secondary_nodes
3435     if not secondary_nodes:
3436       raise errors.ProgrammerError("no secondary node but using "
3437                                    "drbd8 disk template")
3438
3439     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3440
3441     target_node = secondary_nodes[0]
3442     # check memory requirements on the secondary node
3443     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3444                          instance.name, i_be[constants.BE_MEMORY],
3445                          instance.hypervisor)
3446
3447     # check bridge existance
3448     brlist = [nic.bridge for nic in instance.nics]
3449     result = self.rpc.call_bridges_exist(target_node, brlist)
3450     if result.failed or not result.data:
3451       raise errors.OpPrereqError("One or more target bridges %s does not"
3452                                  " exist on destination node '%s'" %
3453                                  (brlist, target_node))
3454
3455     if not self.op.cleanup:
3456       result = self.rpc.call_instance_migratable(instance.primary_node,
3457                                                  instance)
3458       msg = result.RemoteFailMsg()
3459       if msg:
3460         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3461                                    msg)
3462
3463     self.instance = instance
3464
3465   def _WaitUntilSync(self):
3466     """Poll with custom rpc for disk sync.
3467
3468     This uses our own step-based rpc call.
3469
3470     """
3471     self.feedback_fn("* wait until resync is done")
3472     all_done = False
3473     while not all_done:
3474       all_done = True
3475       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3476                                             self.nodes_ip,
3477                                             self.instance.disks)
3478       min_percent = 100
3479       for node, nres in result.items():
3480         msg = nres.RemoteFailMsg()
3481         if msg:
3482           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3483                                    (node, msg))
3484         node_done, node_percent = nres.data[1]
3485         all_done = all_done and node_done
3486         if node_percent is not None:
3487           min_percent = min(min_percent, node_percent)
3488       if not all_done:
3489         if min_percent < 100:
3490           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3491         time.sleep(2)
3492
3493   def _EnsureSecondary(self, node):
3494     """Demote a node to secondary.
3495
3496     """
3497     self.feedback_fn("* switching node %s to secondary mode" % node)
3498
3499     for dev in self.instance.disks:
3500       self.cfg.SetDiskID(dev, node)
3501
3502     result = self.rpc.call_blockdev_close(node, self.instance.name,
3503                                           self.instance.disks)
3504     msg = result.RemoteFailMsg()
3505     if msg:
3506       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3507                                " error %s" % (node, msg))
3508
3509   def _GoStandalone(self):
3510     """Disconnect from the network.
3511
3512     """
3513     self.feedback_fn("* changing into standalone mode")
3514     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3515                                                self.instance.disks)
3516     for node, nres in result.items():
3517       msg = nres.RemoteFailMsg()
3518       if msg:
3519         raise errors.OpExecError("Cannot disconnect disks node %s,"
3520                                  " error %s" % (node, msg))
3521
3522   def _GoReconnect(self, multimaster):
3523     """Reconnect to the network.
3524
3525     """
3526     if multimaster:
3527       msg = "dual-master"
3528     else:
3529       msg = "single-master"
3530     self.feedback_fn("* changing disks into %s mode" % msg)
3531     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3532                                            self.instance.disks,
3533                                            self.instance.name, multimaster)
3534     for node, nres in result.items():
3535       msg = nres.RemoteFailMsg()
3536       if msg:
3537         raise errors.OpExecError("Cannot change disks config on node %s,"
3538                                  " error: %s" % (node, msg))
3539
3540   def _ExecCleanup(self):
3541     """Try to cleanup after a failed migration.
3542
3543     The cleanup is done by:
3544       - check that the instance is running only on one node
3545         (and update the config if needed)
3546       - change disks on its secondary node to secondary
3547       - wait until disks are fully synchronized
3548       - disconnect from the network
3549       - change disks into single-master mode
3550       - wait again until disks are fully synchronized
3551
3552     """
3553     instance = self.instance
3554     target_node = self.target_node
3555     source_node = self.source_node
3556
3557     # check running on only one node
3558     self.feedback_fn("* checking where the instance actually runs"
3559                      " (if this hangs, the hypervisor might be in"
3560                      " a bad state)")
3561     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3562     for node, result in ins_l.items():
3563       result.Raise()
3564       if not isinstance(result.data, list):
3565         raise errors.OpExecError("Can't contact node '%s'" % node)
3566
3567     runningon_source = instance.name in ins_l[source_node].data
3568     runningon_target = instance.name in ins_l[target_node].data
3569
3570     if runningon_source and runningon_target:
3571       raise errors.OpExecError("Instance seems to be running on two nodes,"
3572                                " or the hypervisor is confused. You will have"
3573                                " to ensure manually that it runs only on one"
3574                                " and restart this operation.")
3575
3576     if not (runningon_source or runningon_target):
3577       raise errors.OpExecError("Instance does not seem to be running at all."
3578                                " In this case, it's safer to repair by"
3579                                " running 'gnt-instance stop' to ensure disk"
3580                                " shutdown, and then restarting it.")
3581
3582     if runningon_target:
3583       # the migration has actually succeeded, we need to update the config
3584       self.feedback_fn("* instance running on secondary node (%s),"
3585                        " updating config" % target_node)
3586       instance.primary_node = target_node
3587       self.cfg.Update(instance)
3588       demoted_node = source_node
3589     else:
3590       self.feedback_fn("* instance confirmed to be running on its"
3591                        " primary node (%s)" % source_node)
3592       demoted_node = target_node
3593
3594     self._EnsureSecondary(demoted_node)
3595     try:
3596       self._WaitUntilSync()
3597     except errors.OpExecError:
3598       # we ignore here errors, since if the device is standalone, it
3599       # won't be able to sync
3600       pass
3601     self._GoStandalone()
3602     self._GoReconnect(False)
3603     self._WaitUntilSync()
3604
3605     self.feedback_fn("* done")
3606
3607   def _ExecMigration(self):
3608     """Migrate an instance.
3609
3610     The migrate is done by:
3611       - change the disks into dual-master mode
3612       - wait until disks are fully synchronized again
3613       - migrate the instance
3614       - change disks on the new secondary node (the old primary) to secondary
3615       - wait until disks are fully synchronized
3616       - change disks into single-master mode
3617
3618     """
3619     instance = self.instance
3620     target_node = self.target_node
3621     source_node = self.source_node
3622
3623     self.feedback_fn("* checking disk consistency between source and target")
3624     for dev in instance.disks:
3625       if not _CheckDiskConsistency(self, dev, target_node, False):
3626         raise errors.OpExecError("Disk %s is degraded or not fully"
3627                                  " synchronized on target node,"
3628                                  " aborting migrate." % dev.iv_name)
3629
3630     self._EnsureSecondary(target_node)
3631     self._GoStandalone()
3632     self._GoReconnect(True)
3633     self._WaitUntilSync()
3634
3635     self.feedback_fn("* migrating instance to %s" % target_node)
3636     time.sleep(10)
3637     result = self.rpc.call_instance_migrate(source_node, instance,
3638                                             self.nodes_ip[target_node],
3639                                             self.op.live)
3640     msg = result.RemoteFailMsg()
3641     if msg:
3642       logging.error("Instance migration failed, trying to revert"
3643                     " disk status: %s", msg)
3644       try:
3645         self._EnsureSecondary(target_node)
3646         self._GoStandalone()
3647         self._GoReconnect(False)
3648         self._WaitUntilSync()
3649       except errors.OpExecError, err:
3650         self.LogWarning("Migration failed and I can't reconnect the"
3651                         " drives: error '%s'\n"
3652                         "Please look and recover the instance status" %
3653                         str(err))
3654
3655       raise errors.OpExecError("Could not migrate instance %s: %s" %
3656                                (instance.name, msg))
3657     time.sleep(10)
3658
3659     instance.primary_node = target_node
3660     # distribute new instance config to the other nodes
3661     self.cfg.Update(instance)
3662
3663     self._EnsureSecondary(source_node)
3664     self._WaitUntilSync()
3665     self._GoStandalone()
3666     self._GoReconnect(False)
3667     self._WaitUntilSync()
3668
3669     self.feedback_fn("* done")
3670
3671   def Exec(self, feedback_fn):
3672     """Perform the migration.
3673
3674     """
3675     self.feedback_fn = feedback_fn
3676
3677     self.source_node = self.instance.primary_node
3678     self.target_node = self.instance.secondary_nodes[0]
3679     self.all_nodes = [self.source_node, self.target_node]
3680     self.nodes_ip = {
3681       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3682       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3683       }
3684     if self.op.cleanup:
3685       return self._ExecCleanup()
3686     else:
3687       return self._ExecMigration()
3688
3689
3690 def _CreateBlockDev(lu, node, instance, device, force_create,
3691                     info, force_open):
3692   """Create a tree of block devices on a given node.
3693
3694   If this device type has to be created on secondaries, create it and
3695   all its children.
3696
3697   If not, just recurse to children keeping the same 'force' value.
3698
3699   @param lu: the lu on whose behalf we execute
3700   @param node: the node on which to create the device
3701   @type instance: L{objects.Instance}
3702   @param instance: the instance which owns the device
3703   @type device: L{objects.Disk}
3704   @param device: the device to create
3705   @type force_create: boolean
3706   @param force_create: whether to force creation of this device; this
3707       will be change to True whenever we find a device which has
3708       CreateOnSecondary() attribute
3709   @param info: the extra 'metadata' we should attach to the device
3710       (this will be represented as a LVM tag)
3711   @type force_open: boolean
3712   @param force_open: this parameter will be passes to the
3713       L{backend.CreateBlockDevice} function where it specifies
3714       whether we run on primary or not, and it affects both
3715       the child assembly and the device own Open() execution
3716
3717   """
3718   if device.CreateOnSecondary():
3719     force_create = True
3720
3721   if device.children:
3722     for child in device.children:
3723       _CreateBlockDev(lu, node, instance, child, force_create,
3724                       info, force_open)
3725
3726   if not force_create:
3727     return
3728
3729   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3730
3731
3732 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3733   """Create a single block device on a given node.
3734
3735   This will not recurse over children of the device, so they must be
3736   created in advance.
3737
3738   @param lu: the lu on whose behalf we execute
3739   @param node: the node on which to create the device
3740   @type instance: L{objects.Instance}
3741   @param instance: the instance which owns the device
3742   @type device: L{objects.Disk}
3743   @param device: the device to create
3744   @param info: the extra 'metadata' we should attach to the device
3745       (this will be represented as a LVM tag)
3746   @type force_open: boolean
3747   @param force_open: this parameter will be passes to the
3748       L{backend.CreateBlockDevice} function where it specifies
3749       whether we run on primary or not, and it affects both
3750       the child assembly and the device own Open() execution
3751
3752   """
3753   lu.cfg.SetDiskID(device, node)
3754   result = lu.rpc.call_blockdev_create(node, device, device.size,
3755                                        instance.name, force_open, info)
3756   msg = result.RemoteFailMsg()
3757   if msg:
3758     raise errors.OpExecError("Can't create block device %s on"
3759                              " node %s for instance %s: %s" %
3760                              (device, node, instance.name, msg))
3761   if device.physical_id is None:
3762     device.physical_id = result.data[1]
3763
3764
3765 def _GenerateUniqueNames(lu, exts):
3766   """Generate a suitable LV name.
3767
3768   This will generate a logical volume name for the given instance.
3769
3770   """
3771   results = []
3772   for val in exts:
3773     new_id = lu.cfg.GenerateUniqueID()
3774     results.append("%s%s" % (new_id, val))
3775   return results
3776
3777
3778 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3779                          p_minor, s_minor):
3780   """Generate a drbd8 device complete with its children.
3781
3782   """
3783   port = lu.cfg.AllocatePort()
3784   vgname = lu.cfg.GetVGName()
3785   shared_secret = lu.cfg.GenerateDRBDSecret()
3786   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3787                           logical_id=(vgname, names[0]))
3788   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3789                           logical_id=(vgname, names[1]))
3790   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3791                           logical_id=(primary, secondary, port,
3792                                       p_minor, s_minor,
3793                                       shared_secret),
3794                           children=[dev_data, dev_meta],
3795                           iv_name=iv_name)
3796   return drbd_dev
3797
3798
3799 def _GenerateDiskTemplate(lu, template_name,
3800                           instance_name, primary_node,
3801                           secondary_nodes, disk_info,
3802                           file_storage_dir, file_driver,
3803                           base_index):
3804   """Generate the entire disk layout for a given template type.
3805
3806   """
3807   #TODO: compute space requirements
3808
3809   vgname = lu.cfg.GetVGName()
3810   disk_count = len(disk_info)
3811   disks = []
3812   if template_name == constants.DT_DISKLESS:
3813     pass
3814   elif template_name == constants.DT_PLAIN:
3815     if len(secondary_nodes) != 0:
3816       raise errors.ProgrammerError("Wrong template configuration")
3817
3818     names = _GenerateUniqueNames(lu, [".disk%d" % i
3819                                       for i in range(disk_count)])
3820     for idx, disk in enumerate(disk_info):
3821       disk_index = idx + base_index
3822       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3823                               logical_id=(vgname, names[idx]),
3824                               iv_name="disk/%d" % disk_index)
3825       disks.append(disk_dev)
3826   elif template_name == constants.DT_DRBD8:
3827     if len(secondary_nodes) != 1:
3828       raise errors.ProgrammerError("Wrong template configuration")
3829     remote_node = secondary_nodes[0]
3830     minors = lu.cfg.AllocateDRBDMinor(
3831       [primary_node, remote_node] * len(disk_info), instance_name)
3832
3833     names = []
3834     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3835                                                for i in range(disk_count)]):
3836       names.append(lv_prefix + "_data")
3837       names.append(lv_prefix + "_meta")
3838     for idx, disk in enumerate(disk_info):
3839       disk_index = idx + base_index
3840       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3841                                       disk["size"], names[idx*2:idx*2+2],
3842                                       "disk/%d" % disk_index,
3843                                       minors[idx*2], minors[idx*2+1])
3844       disks.append(disk_dev)
3845   elif template_name == constants.DT_FILE:
3846     if len(secondary_nodes) != 0:
3847       raise errors.ProgrammerError("Wrong template configuration")
3848
3849     for idx, disk in enumerate(disk_info):
3850       disk_index = idx + base_index
3851       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3852                               iv_name="disk/%d" % disk_index,
3853                               logical_id=(file_driver,
3854                                           "%s/disk%d" % (file_storage_dir,
3855                                                          idx)))
3856       disks.append(disk_dev)
3857   else:
3858     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3859   return disks
3860
3861
3862 def _GetInstanceInfoText(instance):
3863   """Compute that text that should be added to the disk's metadata.
3864
3865   """
3866   return "originstname+%s" % instance.name
3867
3868
3869 def _CreateDisks(lu, instance):
3870   """Create all disks for an instance.
3871
3872   This abstracts away some work from AddInstance.
3873
3874   @type lu: L{LogicalUnit}
3875   @param lu: the logical unit on whose behalf we execute
3876   @type instance: L{objects.Instance}
3877   @param instance: the instance whose disks we should create
3878   @rtype: boolean
3879   @return: the success of the creation
3880
3881   """
3882   info = _GetInstanceInfoText(instance)
3883   pnode = instance.primary_node
3884
3885   if instance.disk_template == constants.DT_FILE:
3886     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3887     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3888
3889     if result.failed or not result.data:
3890       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3891
3892     if not result.data[0]:
3893       raise errors.OpExecError("Failed to create directory '%s'" %
3894                                file_storage_dir)
3895
3896   # Note: this needs to be kept in sync with adding of disks in
3897   # LUSetInstanceParams
3898   for device in instance.disks:
3899     logging.info("Creating volume %s for instance %s",
3900                  device.iv_name, instance.name)
3901     #HARDCODE
3902     for node in instance.all_nodes:
3903       f_create = node == pnode
3904       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
3905
3906
3907 def _RemoveDisks(lu, instance):
3908   """Remove all disks for an instance.
3909
3910   This abstracts away some work from `AddInstance()` and
3911   `RemoveInstance()`. Note that in case some of the devices couldn't
3912   be removed, the removal will continue with the other ones (compare
3913   with `_CreateDisks()`).
3914
3915   @type lu: L{LogicalUnit}
3916   @param lu: the logical unit on whose behalf we execute
3917   @type instance: L{objects.Instance}
3918   @param instance: the instance whose disks we should remove
3919   @rtype: boolean
3920   @return: the success of the removal
3921
3922   """
3923   logging.info("Removing block devices for instance %s", instance.name)
3924
3925   result = True
3926   for device in instance.disks:
3927     for node, disk in device.ComputeNodeTree(instance.primary_node):
3928       lu.cfg.SetDiskID(disk, node)
3929       result = lu.rpc.call_blockdev_remove(node, disk)
3930       if result.failed or not result.data:
3931         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3932                            " continuing anyway", device.iv_name, node)
3933         result = False
3934
3935   if instance.disk_template == constants.DT_FILE:
3936     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3937     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3938                                                  file_storage_dir)
3939     if result.failed or not result.data:
3940       logging.error("Could not remove directory '%s'", file_storage_dir)
3941       result = False
3942
3943   return result
3944
3945
3946 def _ComputeDiskSize(disk_template, disks):
3947   """Compute disk size requirements in the volume group
3948
3949   """
3950   # Required free disk space as a function of disk and swap space
3951   req_size_dict = {
3952     constants.DT_DISKLESS: None,
3953     constants.DT_PLAIN: sum(d["size"] for d in disks),
3954     # 128 MB are added for drbd metadata for each disk
3955     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3956     constants.DT_FILE: None,
3957   }
3958
3959   if disk_template not in req_size_dict:
3960     raise errors.ProgrammerError("Disk template '%s' size requirement"
3961                                  " is unknown" %  disk_template)
3962
3963   return req_size_dict[disk_template]
3964
3965
3966 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3967   """Hypervisor parameter validation.
3968
3969   This function abstract the hypervisor parameter validation to be
3970   used in both instance create and instance modify.
3971
3972   @type lu: L{LogicalUnit}
3973   @param lu: the logical unit for which we check
3974   @type nodenames: list
3975   @param nodenames: the list of nodes on which we should check
3976   @type hvname: string
3977   @param hvname: the name of the hypervisor we should use
3978   @type hvparams: dict
3979   @param hvparams: the parameters which we need to check
3980   @raise errors.OpPrereqError: if the parameters are not valid
3981
3982   """
3983   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3984                                                   hvname,
3985                                                   hvparams)
3986   for node in nodenames:
3987     info = hvinfo[node]
3988     info.Raise()
3989     if not info.data or not isinstance(info.data, (tuple, list)):
3990       raise errors.OpPrereqError("Cannot get current information"
3991                                  " from node '%s' (%s)" % (node, info.data))
3992     if not info.data[0]:
3993       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3994                                  " %s" % info.data[1])
3995
3996
3997 class LUCreateInstance(LogicalUnit):
3998   """Create an instance.
3999
4000   """
4001   HPATH = "instance-add"
4002   HTYPE = constants.HTYPE_INSTANCE
4003   _OP_REQP = ["instance_name", "disks", "disk_template",
4004               "mode", "start",
4005               "wait_for_sync", "ip_check", "nics",
4006               "hvparams", "beparams"]
4007   REQ_BGL = False
4008
4009   def _ExpandNode(self, node):
4010     """Expands and checks one node name.
4011
4012     """
4013     node_full = self.cfg.ExpandNodeName(node)
4014     if node_full is None:
4015       raise errors.OpPrereqError("Unknown node %s" % node)
4016     return node_full
4017
4018   def ExpandNames(self):
4019     """ExpandNames for CreateInstance.
4020
4021     Figure out the right locks for instance creation.
4022
4023     """
4024     self.needed_locks = {}
4025
4026     # set optional parameters to none if they don't exist
4027     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4028       if not hasattr(self.op, attr):
4029         setattr(self.op, attr, None)
4030
4031     # cheap checks, mostly valid constants given
4032
4033     # verify creation mode
4034     if self.op.mode not in (constants.INSTANCE_CREATE,
4035                             constants.INSTANCE_IMPORT):
4036       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4037                                  self.op.mode)
4038
4039     # disk template and mirror node verification
4040     if self.op.disk_template not in constants.DISK_TEMPLATES:
4041       raise errors.OpPrereqError("Invalid disk template name")
4042
4043     if self.op.hypervisor is None:
4044       self.op.hypervisor = self.cfg.GetHypervisorType()
4045
4046     cluster = self.cfg.GetClusterInfo()
4047     enabled_hvs = cluster.enabled_hypervisors
4048     if self.op.hypervisor not in enabled_hvs:
4049       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4050                                  " cluster (%s)" % (self.op.hypervisor,
4051                                   ",".join(enabled_hvs)))
4052
4053     # check hypervisor parameter syntax (locally)
4054
4055     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4056                                   self.op.hvparams)
4057     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4058     hv_type.CheckParameterSyntax(filled_hvp)
4059
4060     # fill and remember the beparams dict
4061     utils.CheckBEParams(self.op.beparams)
4062     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4063                                     self.op.beparams)
4064
4065     #### instance parameters check
4066
4067     # instance name verification
4068     hostname1 = utils.HostInfo(self.op.instance_name)
4069     self.op.instance_name = instance_name = hostname1.name
4070
4071     # this is just a preventive check, but someone might still add this
4072     # instance in the meantime, and creation will fail at lock-add time
4073     if instance_name in self.cfg.GetInstanceList():
4074       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4075                                  instance_name)
4076
4077     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4078
4079     # NIC buildup
4080     self.nics = []
4081     for nic in self.op.nics:
4082       # ip validity checks
4083       ip = nic.get("ip", None)
4084       if ip is None or ip.lower() == "none":
4085         nic_ip = None
4086       elif ip.lower() == constants.VALUE_AUTO:
4087         nic_ip = hostname1.ip
4088       else:
4089         if not utils.IsValidIP(ip):
4090           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4091                                      " like a valid IP" % ip)
4092         nic_ip = ip
4093
4094       # MAC address verification
4095       mac = nic.get("mac", constants.VALUE_AUTO)
4096       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4097         if not utils.IsValidMac(mac.lower()):
4098           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4099                                      mac)
4100       # bridge verification
4101       bridge = nic.get("bridge", self.cfg.GetDefBridge())
4102       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4103
4104     # disk checks/pre-build
4105     self.disks = []
4106     for disk in self.op.disks:
4107       mode = disk.get("mode", constants.DISK_RDWR)
4108       if mode not in constants.DISK_ACCESS_SET:
4109         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4110                                    mode)
4111       size = disk.get("size", None)
4112       if size is None:
4113         raise errors.OpPrereqError("Missing disk size")
4114       try:
4115         size = int(size)
4116       except ValueError:
4117         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4118       self.disks.append({"size": size, "mode": mode})
4119
4120     # used in CheckPrereq for ip ping check
4121     self.check_ip = hostname1.ip
4122
4123     # file storage checks
4124     if (self.op.file_driver and
4125         not self.op.file_driver in constants.FILE_DRIVER):
4126       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4127                                  self.op.file_driver)
4128
4129     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4130       raise errors.OpPrereqError("File storage directory path not absolute")
4131
4132     ### Node/iallocator related checks
4133     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4134       raise errors.OpPrereqError("One and only one of iallocator and primary"
4135                                  " node must be given")
4136
4137     if self.op.iallocator:
4138       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4139     else:
4140       self.op.pnode = self._ExpandNode(self.op.pnode)
4141       nodelist = [self.op.pnode]
4142       if self.op.snode is not None:
4143         self.op.snode = self._ExpandNode(self.op.snode)
4144         nodelist.append(self.op.snode)
4145       self.needed_locks[locking.LEVEL_NODE] = nodelist
4146
4147     # in case of import lock the source node too
4148     if self.op.mode == constants.INSTANCE_IMPORT:
4149       src_node = getattr(self.op, "src_node", None)
4150       src_path = getattr(self.op, "src_path", None)
4151
4152       if src_path is None:
4153         self.op.src_path = src_path = self.op.instance_name
4154
4155       if src_node is None:
4156         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4157         self.op.src_node = None
4158         if os.path.isabs(src_path):
4159           raise errors.OpPrereqError("Importing an instance from an absolute"
4160                                      " path requires a source node option.")
4161       else:
4162         self.op.src_node = src_node = self._ExpandNode(src_node)
4163         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4164           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4165         if not os.path.isabs(src_path):
4166           self.op.src_path = src_path = \
4167             os.path.join(constants.EXPORT_DIR, src_path)
4168
4169     else: # INSTANCE_CREATE
4170       if getattr(self.op, "os_type", None) is None:
4171         raise errors.OpPrereqError("No guest OS specified")
4172
4173   def _RunAllocator(self):
4174     """Run the allocator based on input opcode.
4175
4176     """
4177     nics = [n.ToDict() for n in self.nics]
4178     ial = IAllocator(self,
4179                      mode=constants.IALLOCATOR_MODE_ALLOC,
4180                      name=self.op.instance_name,
4181                      disk_template=self.op.disk_template,
4182                      tags=[],
4183                      os=self.op.os_type,
4184                      vcpus=self.be_full[constants.BE_VCPUS],
4185                      mem_size=self.be_full[constants.BE_MEMORY],
4186                      disks=self.disks,
4187                      nics=nics,
4188                      hypervisor=self.op.hypervisor,
4189                      )
4190
4191     ial.Run(self.op.iallocator)
4192
4193     if not ial.success:
4194       raise errors.OpPrereqError("Can't compute nodes using"
4195                                  " iallocator '%s': %s" % (self.op.iallocator,
4196                                                            ial.info))
4197     if len(ial.nodes) != ial.required_nodes:
4198       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4199                                  " of nodes (%s), required %s" %
4200                                  (self.op.iallocator, len(ial.nodes),
4201                                   ial.required_nodes))
4202     self.op.pnode = ial.nodes[0]
4203     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4204                  self.op.instance_name, self.op.iallocator,
4205                  ", ".join(ial.nodes))
4206     if ial.required_nodes == 2:
4207       self.op.snode = ial.nodes[1]
4208
4209   def BuildHooksEnv(self):
4210     """Build hooks env.
4211
4212     This runs on master, primary and secondary nodes of the instance.
4213
4214     """
4215     env = {
4216       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4217       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4218       "INSTANCE_ADD_MODE": self.op.mode,
4219       }
4220     if self.op.mode == constants.INSTANCE_IMPORT:
4221       env["INSTANCE_SRC_NODE"] = self.op.src_node
4222       env["INSTANCE_SRC_PATH"] = self.op.src_path
4223       env["INSTANCE_SRC_IMAGES"] = self.src_images
4224
4225     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4226       primary_node=self.op.pnode,
4227       secondary_nodes=self.secondaries,
4228       status=self.instance_status,
4229       os_type=self.op.os_type,
4230       memory=self.be_full[constants.BE_MEMORY],
4231       vcpus=self.be_full[constants.BE_VCPUS],
4232       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4233     ))
4234
4235     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4236           self.secondaries)
4237     return env, nl, nl
4238
4239
4240   def CheckPrereq(self):
4241     """Check prerequisites.
4242
4243     """
4244     if (not self.cfg.GetVGName() and
4245         self.op.disk_template not in constants.DTS_NOT_LVM):
4246       raise errors.OpPrereqError("Cluster does not support lvm-based"
4247                                  " instances")
4248
4249
4250     if self.op.mode == constants.INSTANCE_IMPORT:
4251       src_node = self.op.src_node
4252       src_path = self.op.src_path
4253
4254       if src_node is None:
4255         exp_list = self.rpc.call_export_list(
4256           self.acquired_locks[locking.LEVEL_NODE])
4257         found = False
4258         for node in exp_list:
4259           if not exp_list[node].failed and src_path in exp_list[node].data:
4260             found = True
4261             self.op.src_node = src_node = node
4262             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4263                                                        src_path)
4264             break
4265         if not found:
4266           raise errors.OpPrereqError("No export found for relative path %s" %
4267                                       src_path)
4268
4269       _CheckNodeOnline(self, src_node)
4270       result = self.rpc.call_export_info(src_node, src_path)
4271       result.Raise()
4272       if not result.data:
4273         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4274
4275       export_info = result.data
4276       if not export_info.has_section(constants.INISECT_EXP):
4277         raise errors.ProgrammerError("Corrupted export config")
4278
4279       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4280       if (int(ei_version) != constants.EXPORT_VERSION):
4281         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4282                                    (ei_version, constants.EXPORT_VERSION))
4283
4284       # Check that the new instance doesn't have less disks than the export
4285       instance_disks = len(self.disks)
4286       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4287       if instance_disks < export_disks:
4288         raise errors.OpPrereqError("Not enough disks to import."
4289                                    " (instance: %d, export: %d)" %
4290                                    (instance_disks, export_disks))
4291
4292       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4293       disk_images = []
4294       for idx in range(export_disks):
4295         option = 'disk%d_dump' % idx
4296         if export_info.has_option(constants.INISECT_INS, option):
4297           # FIXME: are the old os-es, disk sizes, etc. useful?
4298           export_name = export_info.get(constants.INISECT_INS, option)
4299           image = os.path.join(src_path, export_name)
4300           disk_images.append(image)
4301         else:
4302           disk_images.append(False)
4303
4304       self.src_images = disk_images
4305
4306       old_name = export_info.get(constants.INISECT_INS, 'name')
4307       # FIXME: int() here could throw a ValueError on broken exports
4308       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4309       if self.op.instance_name == old_name:
4310         for idx, nic in enumerate(self.nics):
4311           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4312             nic_mac_ini = 'nic%d_mac' % idx
4313             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4314
4315     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4316     if self.op.start and not self.op.ip_check:
4317       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4318                                  " adding an instance in start mode")
4319
4320     if self.op.ip_check:
4321       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4322         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4323                                    (self.check_ip, self.op.instance_name))
4324
4325     #### allocator run
4326
4327     if self.op.iallocator is not None:
4328       self._RunAllocator()
4329
4330     #### node related checks
4331
4332     # check primary node
4333     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4334     assert self.pnode is not None, \
4335       "Cannot retrieve locked node %s" % self.op.pnode
4336     if pnode.offline:
4337       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4338                                  pnode.name)
4339
4340     self.secondaries = []
4341
4342     # mirror node verification
4343     if self.op.disk_template in constants.DTS_NET_MIRROR:
4344       if self.op.snode is None:
4345         raise errors.OpPrereqError("The networked disk templates need"
4346                                    " a mirror node")
4347       if self.op.snode == pnode.name:
4348         raise errors.OpPrereqError("The secondary node cannot be"
4349                                    " the primary node.")
4350       self.secondaries.append(self.op.snode)
4351       _CheckNodeOnline(self, self.op.snode)
4352
4353     nodenames = [pnode.name] + self.secondaries
4354
4355     req_size = _ComputeDiskSize(self.op.disk_template,
4356                                 self.disks)
4357
4358     # Check lv size requirements
4359     if req_size is not None:
4360       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4361                                          self.op.hypervisor)
4362       for node in nodenames:
4363         info = nodeinfo[node]
4364         info.Raise()
4365         info = info.data
4366         if not info:
4367           raise errors.OpPrereqError("Cannot get current information"
4368                                      " from node '%s'" % node)
4369         vg_free = info.get('vg_free', None)
4370         if not isinstance(vg_free, int):
4371           raise errors.OpPrereqError("Can't compute free disk space on"
4372                                      " node %s" % node)
4373         if req_size > info['vg_free']:
4374           raise errors.OpPrereqError("Not enough disk space on target node %s."
4375                                      " %d MB available, %d MB required" %
4376                                      (node, info['vg_free'], req_size))
4377
4378     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4379
4380     # os verification
4381     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4382     result.Raise()
4383     if not isinstance(result.data, objects.OS):
4384       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4385                                  " primary node"  % self.op.os_type)
4386
4387     # bridge check on primary node
4388     bridges = [n.bridge for n in self.nics]
4389     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4390     result.Raise()
4391     if not result.data:
4392       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4393                                  " exist on destination node '%s'" %
4394                                  (",".join(bridges), pnode.name))
4395
4396     # memory check on primary node
4397     if self.op.start:
4398       _CheckNodeFreeMemory(self, self.pnode.name,
4399                            "creating instance %s" % self.op.instance_name,
4400                            self.be_full[constants.BE_MEMORY],
4401                            self.op.hypervisor)
4402
4403     if self.op.start:
4404       self.instance_status = 'up'
4405     else:
4406       self.instance_status = 'down'
4407
4408   def Exec(self, feedback_fn):
4409     """Create and add the instance to the cluster.
4410
4411     """
4412     instance = self.op.instance_name
4413     pnode_name = self.pnode.name
4414
4415     for nic in self.nics:
4416       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4417         nic.mac = self.cfg.GenerateMAC()
4418
4419     ht_kind = self.op.hypervisor
4420     if ht_kind in constants.HTS_REQ_PORT:
4421       network_port = self.cfg.AllocatePort()
4422     else:
4423       network_port = None
4424
4425     ##if self.op.vnc_bind_address is None:
4426     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4427
4428     # this is needed because os.path.join does not accept None arguments
4429     if self.op.file_storage_dir is None:
4430       string_file_storage_dir = ""
4431     else:
4432       string_file_storage_dir = self.op.file_storage_dir
4433
4434     # build the full file storage dir path
4435     file_storage_dir = os.path.normpath(os.path.join(
4436                                         self.cfg.GetFileStorageDir(),
4437                                         string_file_storage_dir, instance))
4438
4439
4440     disks = _GenerateDiskTemplate(self,
4441                                   self.op.disk_template,
4442                                   instance, pnode_name,
4443                                   self.secondaries,
4444                                   self.disks,
4445                                   file_storage_dir,
4446                                   self.op.file_driver,
4447                                   0)
4448
4449     iobj = objects.Instance(name=instance, os=self.op.os_type,
4450                             primary_node=pnode_name,
4451                             nics=self.nics, disks=disks,
4452                             disk_template=self.op.disk_template,
4453                             status=self.instance_status,
4454                             network_port=network_port,
4455                             beparams=self.op.beparams,
4456                             hvparams=self.op.hvparams,
4457                             hypervisor=self.op.hypervisor,
4458                             )
4459
4460     feedback_fn("* creating instance disks...")
4461     try:
4462       _CreateDisks(self, iobj)
4463     except errors.OpExecError:
4464       self.LogWarning("Device creation failed, reverting...")
4465       try:
4466         _RemoveDisks(self, iobj)
4467       finally:
4468         self.cfg.ReleaseDRBDMinors(instance)
4469         raise
4470
4471     feedback_fn("adding instance %s to cluster config" % instance)
4472
4473     self.cfg.AddInstance(iobj)
4474     # Declare that we don't want to remove the instance lock anymore, as we've
4475     # added the instance to the config
4476     del self.remove_locks[locking.LEVEL_INSTANCE]
4477     # Remove the temp. assignements for the instance's drbds
4478     self.cfg.ReleaseDRBDMinors(instance)
4479     # Unlock all the nodes
4480     if self.op.mode == constants.INSTANCE_IMPORT:
4481       nodes_keep = [self.op.src_node]
4482       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4483                        if node != self.op.src_node]
4484       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4485       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4486     else:
4487       self.context.glm.release(locking.LEVEL_NODE)
4488       del self.acquired_locks[locking.LEVEL_NODE]
4489
4490     if self.op.wait_for_sync:
4491       disk_abort = not _WaitForSync(self, iobj)
4492     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4493       # make sure the disks are not degraded (still sync-ing is ok)
4494       time.sleep(15)
4495       feedback_fn("* checking mirrors status")
4496       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4497     else:
4498       disk_abort = False
4499
4500     if disk_abort:
4501       _RemoveDisks(self, iobj)
4502       self.cfg.RemoveInstance(iobj.name)
4503       # Make sure the instance lock gets removed
4504       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4505       raise errors.OpExecError("There are some degraded disks for"
4506                                " this instance")
4507
4508     feedback_fn("creating os for instance %s on node %s" %
4509                 (instance, pnode_name))
4510
4511     if iobj.disk_template != constants.DT_DISKLESS:
4512       if self.op.mode == constants.INSTANCE_CREATE:
4513         feedback_fn("* running the instance OS create scripts...")
4514         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4515         msg = result.RemoteFailMsg()
4516         if msg:
4517           raise errors.OpExecError("Could not add os for instance %s"
4518                                    " on node %s: %s" %
4519                                    (instance, pnode_name, msg))
4520
4521       elif self.op.mode == constants.INSTANCE_IMPORT:
4522         feedback_fn("* running the instance OS import scripts...")
4523         src_node = self.op.src_node
4524         src_images = self.src_images
4525         cluster_name = self.cfg.GetClusterName()
4526         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4527                                                          src_node, src_images,
4528                                                          cluster_name)
4529         import_result.Raise()
4530         for idx, result in enumerate(import_result.data):
4531           if not result:
4532             self.LogWarning("Could not import the image %s for instance"
4533                             " %s, disk %d, on node %s" %
4534                             (src_images[idx], instance, idx, pnode_name))
4535       else:
4536         # also checked in the prereq part
4537         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4538                                      % self.op.mode)
4539
4540     if self.op.start:
4541       logging.info("Starting instance %s on node %s", instance, pnode_name)
4542       feedback_fn("* starting instance...")
4543       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4544       msg = result.RemoteFailMsg()
4545       if msg:
4546         raise errors.OpExecError("Could not start instance: %s" % msg)
4547
4548
4549 class LUConnectConsole(NoHooksLU):
4550   """Connect to an instance's console.
4551
4552   This is somewhat special in that it returns the command line that
4553   you need to run on the master node in order to connect to the
4554   console.
4555
4556   """
4557   _OP_REQP = ["instance_name"]
4558   REQ_BGL = False
4559
4560   def ExpandNames(self):
4561     self._ExpandAndLockInstance()
4562
4563   def CheckPrereq(self):
4564     """Check prerequisites.
4565
4566     This checks that the instance is in the cluster.
4567
4568     """
4569     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4570     assert self.instance is not None, \
4571       "Cannot retrieve locked instance %s" % self.op.instance_name
4572     _CheckNodeOnline(self, self.instance.primary_node)
4573
4574   def Exec(self, feedback_fn):
4575     """Connect to the console of an instance
4576
4577     """
4578     instance = self.instance
4579     node = instance.primary_node
4580
4581     node_insts = self.rpc.call_instance_list([node],
4582                                              [instance.hypervisor])[node]
4583     node_insts.Raise()
4584
4585     if instance.name not in node_insts.data:
4586       raise errors.OpExecError("Instance %s is not running." % instance.name)
4587
4588     logging.debug("Connecting to console of %s on %s", instance.name, node)
4589
4590     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4591     console_cmd = hyper.GetShellCommandForConsole(instance)
4592
4593     # build ssh cmdline
4594     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4595
4596
4597 class LUReplaceDisks(LogicalUnit):
4598   """Replace the disks of an instance.
4599
4600   """
4601   HPATH = "mirrors-replace"
4602   HTYPE = constants.HTYPE_INSTANCE
4603   _OP_REQP = ["instance_name", "mode", "disks"]
4604   REQ_BGL = False
4605
4606   def CheckArguments(self):
4607     if not hasattr(self.op, "remote_node"):
4608       self.op.remote_node = None
4609     if not hasattr(self.op, "iallocator"):
4610       self.op.iallocator = None
4611
4612     # check for valid parameter combination
4613     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4614     if self.op.mode == constants.REPLACE_DISK_CHG:
4615       if cnt == 2:
4616         raise errors.OpPrereqError("When changing the secondary either an"
4617                                    " iallocator script must be used or the"
4618                                    " new node given")
4619       elif cnt == 0:
4620         raise errors.OpPrereqError("Give either the iallocator or the new"
4621                                    " secondary, not both")
4622     else: # not replacing the secondary
4623       if cnt != 2:
4624         raise errors.OpPrereqError("The iallocator and new node options can"
4625                                    " be used only when changing the"
4626                                    " secondary node")
4627
4628   def ExpandNames(self):
4629     self._ExpandAndLockInstance()
4630
4631     if self.op.iallocator is not None:
4632       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4633     elif self.op.remote_node is not None:
4634       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4635       if remote_node is None:
4636         raise errors.OpPrereqError("Node '%s' not known" %
4637                                    self.op.remote_node)
4638       self.op.remote_node = remote_node
4639       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4640       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4641     else:
4642       self.needed_locks[locking.LEVEL_NODE] = []
4643       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4644
4645   def DeclareLocks(self, level):
4646     # If we're not already locking all nodes in the set we have to declare the
4647     # instance's primary/secondary nodes.
4648     if (level == locking.LEVEL_NODE and
4649         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4650       self._LockInstancesNodes()
4651
4652   def _RunAllocator(self):
4653     """Compute a new secondary node using an IAllocator.
4654
4655     """
4656     ial = IAllocator(self,
4657                      mode=constants.IALLOCATOR_MODE_RELOC,
4658                      name=self.op.instance_name,
4659                      relocate_from=[self.sec_node])
4660
4661     ial.Run(self.op.iallocator)
4662
4663     if not ial.success:
4664       raise errors.OpPrereqError("Can't compute nodes using"
4665                                  " iallocator '%s': %s" % (self.op.iallocator,
4666                                                            ial.info))
4667     if len(ial.nodes) != ial.required_nodes:
4668       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4669                                  " of nodes (%s), required %s" %
4670                                  (len(ial.nodes), ial.required_nodes))
4671     self.op.remote_node = ial.nodes[0]
4672     self.LogInfo("Selected new secondary for the instance: %s",
4673                  self.op.remote_node)
4674
4675   def BuildHooksEnv(self):
4676     """Build hooks env.
4677
4678     This runs on the master, the primary and all the secondaries.
4679
4680     """
4681     env = {
4682       "MODE": self.op.mode,
4683       "NEW_SECONDARY": self.op.remote_node,
4684       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4685       }
4686     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4687     nl = [
4688       self.cfg.GetMasterNode(),
4689       self.instance.primary_node,
4690       ]
4691     if self.op.remote_node is not None:
4692       nl.append(self.op.remote_node)
4693     return env, nl, nl
4694
4695   def CheckPrereq(self):
4696     """Check prerequisites.
4697
4698     This checks that the instance is in the cluster.
4699
4700     """
4701     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4702     assert instance is not None, \
4703       "Cannot retrieve locked instance %s" % self.op.instance_name
4704     self.instance = instance
4705
4706     if instance.disk_template != constants.DT_DRBD8:
4707       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4708                                  " instances")
4709
4710     if len(instance.secondary_nodes) != 1:
4711       raise errors.OpPrereqError("The instance has a strange layout,"
4712                                  " expected one secondary but found %d" %
4713                                  len(instance.secondary_nodes))
4714
4715     self.sec_node = instance.secondary_nodes[0]
4716
4717     if self.op.iallocator is not None:
4718       self._RunAllocator()
4719
4720     remote_node = self.op.remote_node
4721     if remote_node is not None:
4722       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4723       assert self.remote_node_info is not None, \
4724         "Cannot retrieve locked node %s" % remote_node
4725     else:
4726       self.remote_node_info = None
4727     if remote_node == instance.primary_node:
4728       raise errors.OpPrereqError("The specified node is the primary node of"
4729                                  " the instance.")
4730     elif remote_node == self.sec_node:
4731       raise errors.OpPrereqError("The specified node is already the"
4732                                  " secondary node of the instance.")
4733
4734     if self.op.mode == constants.REPLACE_DISK_PRI:
4735       n1 = self.tgt_node = instance.primary_node
4736       n2 = self.oth_node = self.sec_node
4737     elif self.op.mode == constants.REPLACE_DISK_SEC:
4738       n1 = self.tgt_node = self.sec_node
4739       n2 = self.oth_node = instance.primary_node
4740     elif self.op.mode == constants.REPLACE_DISK_CHG:
4741       n1 = self.new_node = remote_node
4742       n2 = self.oth_node = instance.primary_node
4743       self.tgt_node = self.sec_node
4744     else:
4745       raise errors.ProgrammerError("Unhandled disk replace mode")
4746
4747     _CheckNodeOnline(self, n1)
4748     _CheckNodeOnline(self, n2)
4749
4750     if not self.op.disks:
4751       self.op.disks = range(len(instance.disks))
4752
4753     for disk_idx in self.op.disks:
4754       instance.FindDisk(disk_idx)
4755
4756   def _ExecD8DiskOnly(self, feedback_fn):
4757     """Replace a disk on the primary or secondary for dbrd8.
4758
4759     The algorithm for replace is quite complicated:
4760
4761       1. for each disk to be replaced:
4762
4763         1. create new LVs on the target node with unique names
4764         1. detach old LVs from the drbd device
4765         1. rename old LVs to name_replaced.<time_t>
4766         1. rename new LVs to old LVs
4767         1. attach the new LVs (with the old names now) to the drbd device
4768
4769       1. wait for sync across all devices
4770
4771       1. for each modified disk:
4772
4773         1. remove old LVs (which have the name name_replaces.<time_t>)
4774
4775     Failures are not very well handled.
4776
4777     """
4778     steps_total = 6
4779     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4780     instance = self.instance
4781     iv_names = {}
4782     vgname = self.cfg.GetVGName()
4783     # start of work
4784     cfg = self.cfg
4785     tgt_node = self.tgt_node
4786     oth_node = self.oth_node
4787
4788     # Step: check device activation
4789     self.proc.LogStep(1, steps_total, "check device existence")
4790     info("checking volume groups")
4791     my_vg = cfg.GetVGName()
4792     results = self.rpc.call_vg_list([oth_node, tgt_node])
4793     if not results:
4794       raise errors.OpExecError("Can't list volume groups on the nodes")
4795     for node in oth_node, tgt_node:
4796       res = results[node]
4797       if res.failed or not res.data or my_vg not in res.data:
4798         raise errors.OpExecError("Volume group '%s' not found on %s" %
4799                                  (my_vg, node))
4800     for idx, dev in enumerate(instance.disks):
4801       if idx not in self.op.disks:
4802         continue
4803       for node in tgt_node, oth_node:
4804         info("checking disk/%d on %s" % (idx, node))
4805         cfg.SetDiskID(dev, node)
4806         if not self.rpc.call_blockdev_find(node, dev):
4807           raise errors.OpExecError("Can't find disk/%d on node %s" %
4808                                    (idx, node))
4809
4810     # Step: check other node consistency
4811     self.proc.LogStep(2, steps_total, "check peer consistency")
4812     for idx, dev in enumerate(instance.disks):
4813       if idx not in self.op.disks:
4814         continue
4815       info("checking disk/%d consistency on %s" % (idx, oth_node))
4816       if not _CheckDiskConsistency(self, dev, oth_node,
4817                                    oth_node==instance.primary_node):
4818         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4819                                  " to replace disks on this node (%s)" %
4820                                  (oth_node, tgt_node))
4821
4822     # Step: create new storage
4823     self.proc.LogStep(3, steps_total, "allocate new storage")
4824     for idx, dev in enumerate(instance.disks):
4825       if idx not in self.op.disks:
4826         continue
4827       size = dev.size
4828       cfg.SetDiskID(dev, tgt_node)
4829       lv_names = [".disk%d_%s" % (idx, suf)
4830                   for suf in ["data", "meta"]]
4831       names = _GenerateUniqueNames(self, lv_names)
4832       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4833                              logical_id=(vgname, names[0]))
4834       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4835                              logical_id=(vgname, names[1]))
4836       new_lvs = [lv_data, lv_meta]
4837       old_lvs = dev.children
4838       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4839       info("creating new local storage on %s for %s" %
4840            (tgt_node, dev.iv_name))
4841       # we pass force_create=True to force the LVM creation
4842       for new_lv in new_lvs:
4843         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4844                         _GetInstanceInfoText(instance), False)
4845
4846     # Step: for each lv, detach+rename*2+attach
4847     self.proc.LogStep(4, steps_total, "change drbd configuration")
4848     for dev, old_lvs, new_lvs in iv_names.itervalues():
4849       info("detaching %s drbd from local storage" % dev.iv_name)
4850       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4851       result.Raise()
4852       if not result.data:
4853         raise errors.OpExecError("Can't detach drbd from local storage on node"
4854                                  " %s for device %s" % (tgt_node, dev.iv_name))
4855       #dev.children = []
4856       #cfg.Update(instance)
4857
4858       # ok, we created the new LVs, so now we know we have the needed
4859       # storage; as such, we proceed on the target node to rename
4860       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4861       # using the assumption that logical_id == physical_id (which in
4862       # turn is the unique_id on that node)
4863
4864       # FIXME(iustin): use a better name for the replaced LVs
4865       temp_suffix = int(time.time())
4866       ren_fn = lambda d, suff: (d.physical_id[0],
4867                                 d.physical_id[1] + "_replaced-%s" % suff)
4868       # build the rename list based on what LVs exist on the node
4869       rlist = []
4870       for to_ren in old_lvs:
4871         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4872         if not find_res.failed and find_res.data is not None: # device exists
4873           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4874
4875       info("renaming the old LVs on the target node")
4876       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4877       result.Raise()
4878       if not result.data:
4879         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4880       # now we rename the new LVs to the old LVs
4881       info("renaming the new LVs on the target node")
4882       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4883       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4884       result.Raise()
4885       if not result.data:
4886         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4887
4888       for old, new in zip(old_lvs, new_lvs):
4889         new.logical_id = old.logical_id
4890         cfg.SetDiskID(new, tgt_node)
4891
4892       for disk in old_lvs:
4893         disk.logical_id = ren_fn(disk, temp_suffix)
4894         cfg.SetDiskID(disk, tgt_node)
4895
4896       # now that the new lvs have the old name, we can add them to the device
4897       info("adding new mirror component on %s" % tgt_node)
4898       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4899       if result.failed or not result.data:
4900         for new_lv in new_lvs:
4901           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4902           if result.failed or not result.data:
4903             warning("Can't rollback device %s", hint="manually cleanup unused"
4904                     " logical volumes")
4905         raise errors.OpExecError("Can't add local storage to drbd")
4906
4907       dev.children = new_lvs
4908       cfg.Update(instance)
4909
4910     # Step: wait for sync
4911
4912     # this can fail as the old devices are degraded and _WaitForSync
4913     # does a combined result over all disks, so we don't check its
4914     # return value
4915     self.proc.LogStep(5, steps_total, "sync devices")
4916     _WaitForSync(self, instance, unlock=True)
4917
4918     # so check manually all the devices
4919     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4920       cfg.SetDiskID(dev, instance.primary_node)
4921       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4922       if result.failed or result.data[5]:
4923         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4924
4925     # Step: remove old storage
4926     self.proc.LogStep(6, steps_total, "removing old storage")
4927     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4928       info("remove logical volumes for %s" % name)
4929       for lv in old_lvs:
4930         cfg.SetDiskID(lv, tgt_node)
4931         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4932         if result.failed or not result.data:
4933           warning("Can't remove old LV", hint="manually remove unused LVs")
4934           continue
4935
4936   def _ExecD8Secondary(self, feedback_fn):
4937     """Replace the secondary node for drbd8.
4938
4939     The algorithm for replace is quite complicated:
4940       - for all disks of the instance:
4941         - create new LVs on the new node with same names
4942         - shutdown the drbd device on the old secondary
4943         - disconnect the drbd network on the primary
4944         - create the drbd device on the new secondary
4945         - network attach the drbd on the primary, using an artifice:
4946           the drbd code for Attach() will connect to the network if it
4947           finds a device which is connected to the good local disks but
4948           not network enabled
4949       - wait for sync across all devices
4950       - remove all disks from the old secondary
4951
4952     Failures are not very well handled.
4953
4954     """
4955     steps_total = 6
4956     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4957     instance = self.instance
4958     iv_names = {}
4959     # start of work
4960     cfg = self.cfg
4961     old_node = self.tgt_node
4962     new_node = self.new_node
4963     pri_node = instance.primary_node
4964     nodes_ip = {
4965       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
4966       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
4967       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
4968       }
4969
4970     # Step: check device activation
4971     self.proc.LogStep(1, steps_total, "check device existence")
4972     info("checking volume groups")
4973     my_vg = cfg.GetVGName()
4974     results = self.rpc.call_vg_list([pri_node, new_node])
4975     for node in pri_node, new_node:
4976       res = results[node]
4977       if res.failed or not res.data or my_vg not in res.data:
4978         raise errors.OpExecError("Volume group '%s' not found on %s" %
4979                                  (my_vg, node))
4980     for idx, dev in enumerate(instance.disks):
4981       if idx not in self.op.disks:
4982         continue
4983       info("checking disk/%d on %s" % (idx, pri_node))
4984       cfg.SetDiskID(dev, pri_node)
4985       result = self.rpc.call_blockdev_find(pri_node, dev)
4986       result.Raise()
4987       if not result.data:
4988         raise errors.OpExecError("Can't find disk/%d on node %s" %
4989                                  (idx, pri_node))
4990
4991     # Step: check other node consistency
4992     self.proc.LogStep(2, steps_total, "check peer consistency")
4993     for idx, dev in enumerate(instance.disks):
4994       if idx not in self.op.disks:
4995         continue
4996       info("checking disk/%d consistency on %s" % (idx, pri_node))
4997       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4998         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4999                                  " unsafe to replace the secondary" %
5000                                  pri_node)
5001
5002     # Step: create new storage
5003     self.proc.LogStep(3, steps_total, "allocate new storage")
5004     for idx, dev in enumerate(instance.disks):
5005       info("adding new local storage on %s for disk/%d" %
5006            (new_node, idx))
5007       # we pass force_create=True to force LVM creation
5008       for new_lv in dev.children:
5009         _CreateBlockDev(self, new_node, instance, new_lv, True,
5010                         _GetInstanceInfoText(instance), False)
5011
5012     # Step 4: dbrd minors and drbd setups changes
5013     # after this, we must manually remove the drbd minors on both the
5014     # error and the success paths
5015     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5016                                    instance.name)
5017     logging.debug("Allocated minors %s" % (minors,))
5018     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5019     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5020       size = dev.size
5021       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5022       # create new devices on new_node; note that we create two IDs:
5023       # one without port, so the drbd will be activated without
5024       # networking information on the new node at this stage, and one
5025       # with network, for the latter activation in step 4
5026       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5027       if pri_node == o_node1:
5028         p_minor = o_minor1
5029       else:
5030         p_minor = o_minor2
5031
5032       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5033       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5034
5035       iv_names[idx] = (dev, dev.children, new_net_id)
5036       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5037                     new_net_id)
5038       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5039                               logical_id=new_alone_id,
5040                               children=dev.children)
5041       try:
5042         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5043                               _GetInstanceInfoText(instance), False)
5044       except error.BlockDeviceError:
5045         self.cfg.ReleaseDRBDMinors(instance.name)
5046         raise
5047
5048     for idx, dev in enumerate(instance.disks):
5049       # we have new devices, shutdown the drbd on the old secondary
5050       info("shutting down drbd for disk/%d on old node" % idx)
5051       cfg.SetDiskID(dev, old_node)
5052       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5053       if result.failed or not result.data:
5054         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5055                 hint="Please cleanup this device manually as soon as possible")
5056
5057     info("detaching primary drbds from the network (=> standalone)")
5058     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5059                                                instance.disks)[pri_node]
5060
5061     msg = result.RemoteFailMsg()
5062     if msg:
5063       # detaches didn't succeed (unlikely)
5064       self.cfg.ReleaseDRBDMinors(instance.name)
5065       raise errors.OpExecError("Can't detach the disks from the network on"
5066                                " old node: %s" % (msg,))
5067
5068     # if we managed to detach at least one, we update all the disks of
5069     # the instance to point to the new secondary
5070     info("updating instance configuration")
5071     for dev, _, new_logical_id in iv_names.itervalues():
5072       dev.logical_id = new_logical_id
5073       cfg.SetDiskID(dev, pri_node)
5074     cfg.Update(instance)
5075     # we can remove now the temp minors as now the new values are
5076     # written to the config file (and therefore stable)
5077     self.cfg.ReleaseDRBDMinors(instance.name)
5078
5079     # and now perform the drbd attach
5080     info("attaching primary drbds to new secondary (standalone => connected)")
5081     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5082                                            instance.disks, instance.name,
5083                                            False)
5084     for to_node, to_result in result.items():
5085       msg = to_result.RemoteFailMsg()
5086       if msg:
5087         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5088                 hint="please do a gnt-instance info to see the"
5089                 " status of disks")
5090
5091     # this can fail as the old devices are degraded and _WaitForSync
5092     # does a combined result over all disks, so we don't check its
5093     # return value
5094     self.proc.LogStep(5, steps_total, "sync devices")
5095     _WaitForSync(self, instance, unlock=True)
5096
5097     # so check manually all the devices
5098     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5099       cfg.SetDiskID(dev, pri_node)
5100       result = self.rpc.call_blockdev_find(pri_node, dev)
5101       result.Raise()
5102       if result.data[5]:
5103         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5104
5105     self.proc.LogStep(6, steps_total, "removing old storage")
5106     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5107       info("remove logical volumes for disk/%d" % idx)
5108       for lv in old_lvs:
5109         cfg.SetDiskID(lv, old_node)
5110         result = self.rpc.call_blockdev_remove(old_node, lv)
5111         if result.failed or not result.data:
5112           warning("Can't remove LV on old secondary",
5113                   hint="Cleanup stale volumes by hand")
5114
5115   def Exec(self, feedback_fn):
5116     """Execute disk replacement.
5117
5118     This dispatches the disk replacement to the appropriate handler.
5119
5120     """
5121     instance = self.instance
5122
5123     # Activate the instance disks if we're replacing them on a down instance
5124     if instance.status == "down":
5125       _StartInstanceDisks(self, instance, True)
5126
5127     if self.op.mode == constants.REPLACE_DISK_CHG:
5128       fn = self._ExecD8Secondary
5129     else:
5130       fn = self._ExecD8DiskOnly
5131
5132     ret = fn(feedback_fn)
5133
5134     # Deactivate the instance disks if we're replacing them on a down instance
5135     if instance.status == "down":
5136       _SafeShutdownInstanceDisks(self, instance)
5137
5138     return ret
5139
5140
5141 class LUGrowDisk(LogicalUnit):
5142   """Grow a disk of an instance.
5143
5144   """
5145   HPATH = "disk-grow"
5146   HTYPE = constants.HTYPE_INSTANCE
5147   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5148   REQ_BGL = False
5149
5150   def ExpandNames(self):
5151     self._ExpandAndLockInstance()
5152     self.needed_locks[locking.LEVEL_NODE] = []
5153     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5154
5155   def DeclareLocks(self, level):
5156     if level == locking.LEVEL_NODE:
5157       self._LockInstancesNodes()
5158
5159   def BuildHooksEnv(self):
5160     """Build hooks env.
5161
5162     This runs on the master, the primary and all the secondaries.
5163
5164     """
5165     env = {
5166       "DISK": self.op.disk,
5167       "AMOUNT": self.op.amount,
5168       }
5169     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5170     nl = [
5171       self.cfg.GetMasterNode(),
5172       self.instance.primary_node,
5173       ]
5174     return env, nl, nl
5175
5176   def CheckPrereq(self):
5177     """Check prerequisites.
5178
5179     This checks that the instance is in the cluster.
5180
5181     """
5182     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5183     assert instance is not None, \
5184       "Cannot retrieve locked instance %s" % self.op.instance_name
5185     nodenames = list(instance.all_nodes)
5186     for node in nodenames:
5187       _CheckNodeOnline(self, node)
5188
5189
5190     self.instance = instance
5191
5192     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5193       raise errors.OpPrereqError("Instance's disk layout does not support"
5194                                  " growing.")
5195
5196     self.disk = instance.FindDisk(self.op.disk)
5197
5198     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5199                                        instance.hypervisor)
5200     for node in nodenames:
5201       info = nodeinfo[node]
5202       if info.failed or not info.data:
5203         raise errors.OpPrereqError("Cannot get current information"
5204                                    " from node '%s'" % node)
5205       vg_free = info.data.get('vg_free', None)
5206       if not isinstance(vg_free, int):
5207         raise errors.OpPrereqError("Can't compute free disk space on"
5208                                    " node %s" % node)
5209       if self.op.amount > vg_free:
5210         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5211                                    " %d MiB available, %d MiB required" %
5212                                    (node, vg_free, self.op.amount))
5213
5214   def Exec(self, feedback_fn):
5215     """Execute disk grow.
5216
5217     """
5218     instance = self.instance
5219     disk = self.disk
5220     for node in instance.all_nodes:
5221       self.cfg.SetDiskID(disk, node)
5222       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5223       result.Raise()
5224       if (not result.data or not isinstance(result.data, (list, tuple)) or
5225           len(result.data) != 2):
5226         raise errors.OpExecError("Grow request failed to node %s" % node)
5227       elif not result.data[0]:
5228         raise errors.OpExecError("Grow request failed to node %s: %s" %
5229                                  (node, result.data[1]))
5230     disk.RecordGrow(self.op.amount)
5231     self.cfg.Update(instance)
5232     if self.op.wait_for_sync:
5233       disk_abort = not _WaitForSync(self, instance)
5234       if disk_abort:
5235         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5236                              " status.\nPlease check the instance.")
5237
5238
5239 class LUQueryInstanceData(NoHooksLU):
5240   """Query runtime instance data.
5241
5242   """
5243   _OP_REQP = ["instances", "static"]
5244   REQ_BGL = False
5245
5246   def ExpandNames(self):
5247     self.needed_locks = {}
5248     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5249
5250     if not isinstance(self.op.instances, list):
5251       raise errors.OpPrereqError("Invalid argument type 'instances'")
5252
5253     if self.op.instances:
5254       self.wanted_names = []
5255       for name in self.op.instances:
5256         full_name = self.cfg.ExpandInstanceName(name)
5257         if full_name is None:
5258           raise errors.OpPrereqError("Instance '%s' not known" % name)
5259         self.wanted_names.append(full_name)
5260       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5261     else:
5262       self.wanted_names = None
5263       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5264
5265     self.needed_locks[locking.LEVEL_NODE] = []
5266     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5267
5268   def DeclareLocks(self, level):
5269     if level == locking.LEVEL_NODE:
5270       self._LockInstancesNodes()
5271
5272   def CheckPrereq(self):
5273     """Check prerequisites.
5274
5275     This only checks the optional instance list against the existing names.
5276
5277     """
5278     if self.wanted_names is None:
5279       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5280
5281     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5282                              in self.wanted_names]
5283     return
5284
5285   def _ComputeDiskStatus(self, instance, snode, dev):
5286     """Compute block device status.
5287
5288     """
5289     static = self.op.static
5290     if not static:
5291       self.cfg.SetDiskID(dev, instance.primary_node)
5292       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5293       dev_pstatus.Raise()
5294       dev_pstatus = dev_pstatus.data
5295     else:
5296       dev_pstatus = None
5297
5298     if dev.dev_type in constants.LDS_DRBD:
5299       # we change the snode then (otherwise we use the one passed in)
5300       if dev.logical_id[0] == instance.primary_node:
5301         snode = dev.logical_id[1]
5302       else:
5303         snode = dev.logical_id[0]
5304
5305     if snode and not static:
5306       self.cfg.SetDiskID(dev, snode)
5307       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5308       dev_sstatus.Raise()
5309       dev_sstatus = dev_sstatus.data
5310     else:
5311       dev_sstatus = None
5312
5313     if dev.children:
5314       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5315                       for child in dev.children]
5316     else:
5317       dev_children = []
5318
5319     data = {
5320       "iv_name": dev.iv_name,
5321       "dev_type": dev.dev_type,
5322       "logical_id": dev.logical_id,
5323       "physical_id": dev.physical_id,
5324       "pstatus": dev_pstatus,
5325       "sstatus": dev_sstatus,
5326       "children": dev_children,
5327       "mode": dev.mode,
5328       }
5329
5330     return data
5331
5332   def Exec(self, feedback_fn):
5333     """Gather and return data"""
5334     result = {}
5335
5336     cluster = self.cfg.GetClusterInfo()
5337
5338     for instance in self.wanted_instances:
5339       if not self.op.static:
5340         remote_info = self.rpc.call_instance_info(instance.primary_node,
5341                                                   instance.name,
5342                                                   instance.hypervisor)
5343         remote_info.Raise()
5344         remote_info = remote_info.data
5345         if remote_info and "state" in remote_info:
5346           remote_state = "up"
5347         else:
5348           remote_state = "down"
5349       else:
5350         remote_state = None
5351       if instance.status == "down":
5352         config_state = "down"
5353       else:
5354         config_state = "up"
5355
5356       disks = [self._ComputeDiskStatus(instance, None, device)
5357                for device in instance.disks]
5358
5359       idict = {
5360         "name": instance.name,
5361         "config_state": config_state,
5362         "run_state": remote_state,
5363         "pnode": instance.primary_node,
5364         "snodes": instance.secondary_nodes,
5365         "os": instance.os,
5366         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5367         "disks": disks,
5368         "hypervisor": instance.hypervisor,
5369         "network_port": instance.network_port,
5370         "hv_instance": instance.hvparams,
5371         "hv_actual": cluster.FillHV(instance),
5372         "be_instance": instance.beparams,
5373         "be_actual": cluster.FillBE(instance),
5374         }
5375
5376       result[instance.name] = idict
5377
5378     return result
5379
5380
5381 class LUSetInstanceParams(LogicalUnit):
5382   """Modifies an instances's parameters.
5383
5384   """
5385   HPATH = "instance-modify"
5386   HTYPE = constants.HTYPE_INSTANCE
5387   _OP_REQP = ["instance_name"]
5388   REQ_BGL = False
5389
5390   def CheckArguments(self):
5391     if not hasattr(self.op, 'nics'):
5392       self.op.nics = []
5393     if not hasattr(self.op, 'disks'):
5394       self.op.disks = []
5395     if not hasattr(self.op, 'beparams'):
5396       self.op.beparams = {}
5397     if not hasattr(self.op, 'hvparams'):
5398       self.op.hvparams = {}
5399     self.op.force = getattr(self.op, "force", False)
5400     if not (self.op.nics or self.op.disks or
5401             self.op.hvparams or self.op.beparams):
5402       raise errors.OpPrereqError("No changes submitted")
5403
5404     utils.CheckBEParams(self.op.beparams)
5405
5406     # Disk validation
5407     disk_addremove = 0
5408     for disk_op, disk_dict in self.op.disks:
5409       if disk_op == constants.DDM_REMOVE:
5410         disk_addremove += 1
5411         continue
5412       elif disk_op == constants.DDM_ADD:
5413         disk_addremove += 1
5414       else:
5415         if not isinstance(disk_op, int):
5416           raise errors.OpPrereqError("Invalid disk index")
5417       if disk_op == constants.DDM_ADD:
5418         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5419         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5420           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5421         size = disk_dict.get('size', None)
5422         if size is None:
5423           raise errors.OpPrereqError("Required disk parameter size missing")
5424         try:
5425           size = int(size)
5426         except ValueError, err:
5427           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5428                                      str(err))
5429         disk_dict['size'] = size
5430       else:
5431         # modification of disk
5432         if 'size' in disk_dict:
5433           raise errors.OpPrereqError("Disk size change not possible, use"
5434                                      " grow-disk")
5435
5436     if disk_addremove > 1:
5437       raise errors.OpPrereqError("Only one disk add or remove operation"
5438                                  " supported at a time")
5439
5440     # NIC validation
5441     nic_addremove = 0
5442     for nic_op, nic_dict in self.op.nics:
5443       if nic_op == constants.DDM_REMOVE:
5444         nic_addremove += 1
5445         continue
5446       elif nic_op == constants.DDM_ADD:
5447         nic_addremove += 1
5448       else:
5449         if not isinstance(nic_op, int):
5450           raise errors.OpPrereqError("Invalid nic index")
5451
5452       # nic_dict should be a dict
5453       nic_ip = nic_dict.get('ip', None)
5454       if nic_ip is not None:
5455         if nic_ip.lower() == "none":
5456           nic_dict['ip'] = None
5457         else:
5458           if not utils.IsValidIP(nic_ip):
5459             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5460       # we can only check None bridges and assign the default one
5461       nic_bridge = nic_dict.get('bridge', None)
5462       if nic_bridge is None:
5463         nic_dict['bridge'] = self.cfg.GetDefBridge()
5464       # but we can validate MACs
5465       nic_mac = nic_dict.get('mac', None)
5466       if nic_mac is not None:
5467         if self.cfg.IsMacInUse(nic_mac):
5468           raise errors.OpPrereqError("MAC address %s already in use"
5469                                      " in cluster" % nic_mac)
5470         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5471           if not utils.IsValidMac(nic_mac):
5472             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5473     if nic_addremove > 1:
5474       raise errors.OpPrereqError("Only one NIC add or remove operation"
5475                                  " supported at a time")
5476
5477   def ExpandNames(self):
5478     self._ExpandAndLockInstance()
5479     self.needed_locks[locking.LEVEL_NODE] = []
5480     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5481
5482   def DeclareLocks(self, level):
5483     if level == locking.LEVEL_NODE:
5484       self._LockInstancesNodes()
5485
5486   def BuildHooksEnv(self):
5487     """Build hooks env.
5488
5489     This runs on the master, primary and secondaries.
5490
5491     """
5492     args = dict()
5493     if constants.BE_MEMORY in self.be_new:
5494       args['memory'] = self.be_new[constants.BE_MEMORY]
5495     if constants.BE_VCPUS in self.be_new:
5496       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5497     # FIXME: readd disk/nic changes
5498     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5499     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5500     return env, nl, nl
5501
5502   def CheckPrereq(self):
5503     """Check prerequisites.
5504
5505     This only checks the instance list against the existing names.
5506
5507     """
5508     force = self.force = self.op.force
5509
5510     # checking the new params on the primary/secondary nodes
5511
5512     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5513     assert self.instance is not None, \
5514       "Cannot retrieve locked instance %s" % self.op.instance_name
5515     pnode = instance.primary_node
5516     nodelist = list(instance.all_nodes)
5517
5518     # hvparams processing
5519     if self.op.hvparams:
5520       i_hvdict = copy.deepcopy(instance.hvparams)
5521       for key, val in self.op.hvparams.iteritems():
5522         if val == constants.VALUE_DEFAULT:
5523           try:
5524             del i_hvdict[key]
5525           except KeyError:
5526             pass
5527         elif val == constants.VALUE_NONE:
5528           i_hvdict[key] = None
5529         else:
5530           i_hvdict[key] = val
5531       cluster = self.cfg.GetClusterInfo()
5532       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5533                                 i_hvdict)
5534       # local check
5535       hypervisor.GetHypervisor(
5536         instance.hypervisor).CheckParameterSyntax(hv_new)
5537       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5538       self.hv_new = hv_new # the new actual values
5539       self.hv_inst = i_hvdict # the new dict (without defaults)
5540     else:
5541       self.hv_new = self.hv_inst = {}
5542
5543     # beparams processing
5544     if self.op.beparams:
5545       i_bedict = copy.deepcopy(instance.beparams)
5546       for key, val in self.op.beparams.iteritems():
5547         if val == constants.VALUE_DEFAULT:
5548           try:
5549             del i_bedict[key]
5550           except KeyError:
5551             pass
5552         else:
5553           i_bedict[key] = val
5554       cluster = self.cfg.GetClusterInfo()
5555       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5556                                 i_bedict)
5557       self.be_new = be_new # the new actual values
5558       self.be_inst = i_bedict # the new dict (without defaults)
5559     else:
5560       self.be_new = self.be_inst = {}
5561
5562     self.warn = []
5563
5564     if constants.BE_MEMORY in self.op.beparams and not self.force:
5565       mem_check_list = [pnode]
5566       if be_new[constants.BE_AUTO_BALANCE]:
5567         # either we changed auto_balance to yes or it was from before
5568         mem_check_list.extend(instance.secondary_nodes)
5569       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5570                                                   instance.hypervisor)
5571       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5572                                          instance.hypervisor)
5573       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5574         # Assume the primary node is unreachable and go ahead
5575         self.warn.append("Can't get info from primary node %s" % pnode)
5576       else:
5577         if not instance_info.failed and instance_info.data:
5578           current_mem = instance_info.data['memory']
5579         else:
5580           # Assume instance not running
5581           # (there is a slight race condition here, but it's not very probable,
5582           # and we have no other way to check)
5583           current_mem = 0
5584         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5585                     nodeinfo[pnode].data['memory_free'])
5586         if miss_mem > 0:
5587           raise errors.OpPrereqError("This change will prevent the instance"
5588                                      " from starting, due to %d MB of memory"
5589                                      " missing on its primary node" % miss_mem)
5590
5591       if be_new[constants.BE_AUTO_BALANCE]:
5592         for node, nres in nodeinfo.iteritems():
5593           if node not in instance.secondary_nodes:
5594             continue
5595           if nres.failed or not isinstance(nres.data, dict):
5596             self.warn.append("Can't get info from secondary node %s" % node)
5597           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5598             self.warn.append("Not enough memory to failover instance to"
5599                              " secondary node %s" % node)
5600
5601     # NIC processing
5602     for nic_op, nic_dict in self.op.nics:
5603       if nic_op == constants.DDM_REMOVE:
5604         if not instance.nics:
5605           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5606         continue
5607       if nic_op != constants.DDM_ADD:
5608         # an existing nic
5609         if nic_op < 0 or nic_op >= len(instance.nics):
5610           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5611                                      " are 0 to %d" %
5612                                      (nic_op, len(instance.nics)))
5613       nic_bridge = nic_dict.get('bridge', None)
5614       if nic_bridge is not None:
5615         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5616           msg = ("Bridge '%s' doesn't exist on one of"
5617                  " the instance nodes" % nic_bridge)
5618           if self.force:
5619             self.warn.append(msg)
5620           else:
5621             raise errors.OpPrereqError(msg)
5622
5623     # DISK processing
5624     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5625       raise errors.OpPrereqError("Disk operations not supported for"
5626                                  " diskless instances")
5627     for disk_op, disk_dict in self.op.disks:
5628       if disk_op == constants.DDM_REMOVE:
5629         if len(instance.disks) == 1:
5630           raise errors.OpPrereqError("Cannot remove the last disk of"
5631                                      " an instance")
5632         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5633         ins_l = ins_l[pnode]
5634         if ins_l.failed or not isinstance(ins_l.data, list):
5635           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5636         if instance.name in ins_l.data:
5637           raise errors.OpPrereqError("Instance is running, can't remove"
5638                                      " disks.")
5639
5640       if (disk_op == constants.DDM_ADD and
5641           len(instance.nics) >= constants.MAX_DISKS):
5642         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5643                                    " add more" % constants.MAX_DISKS)
5644       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5645         # an existing disk
5646         if disk_op < 0 or disk_op >= len(instance.disks):
5647           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5648                                      " are 0 to %d" %
5649                                      (disk_op, len(instance.disks)))
5650
5651     return
5652
5653   def Exec(self, feedback_fn):
5654     """Modifies an instance.
5655
5656     All parameters take effect only at the next restart of the instance.
5657
5658     """
5659     # Process here the warnings from CheckPrereq, as we don't have a
5660     # feedback_fn there.
5661     for warn in self.warn:
5662       feedback_fn("WARNING: %s" % warn)
5663
5664     result = []
5665     instance = self.instance
5666     # disk changes
5667     for disk_op, disk_dict in self.op.disks:
5668       if disk_op == constants.DDM_REMOVE:
5669         # remove the last disk
5670         device = instance.disks.pop()
5671         device_idx = len(instance.disks)
5672         for node, disk in device.ComputeNodeTree(instance.primary_node):
5673           self.cfg.SetDiskID(disk, node)
5674           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5675           if rpc_result.failed or not rpc_result.data:
5676             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5677                                  " continuing anyway", device_idx, node)
5678         result.append(("disk/%d" % device_idx, "remove"))
5679       elif disk_op == constants.DDM_ADD:
5680         # add a new disk
5681         if instance.disk_template == constants.DT_FILE:
5682           file_driver, file_path = instance.disks[0].logical_id
5683           file_path = os.path.dirname(file_path)
5684         else:
5685           file_driver = file_path = None
5686         disk_idx_base = len(instance.disks)
5687         new_disk = _GenerateDiskTemplate(self,
5688                                          instance.disk_template,
5689                                          instance, instance.primary_node,
5690                                          instance.secondary_nodes,
5691                                          [disk_dict],
5692                                          file_path,
5693                                          file_driver,
5694                                          disk_idx_base)[0]
5695         new_disk.mode = disk_dict['mode']
5696         instance.disks.append(new_disk)
5697         info = _GetInstanceInfoText(instance)
5698
5699         logging.info("Creating volume %s for instance %s",
5700                      new_disk.iv_name, instance.name)
5701         # Note: this needs to be kept in sync with _CreateDisks
5702         #HARDCODE
5703         for node in instance.all_nodes:
5704           f_create = node == instance.primary_node
5705           try:
5706             _CreateBlockDev(self, node, instance, new_disk,
5707                             f_create, info, f_create)
5708           except error.OpExecError, err:
5709             self.LogWarning("Failed to create volume %s (%s) on"
5710                             " node %s: %s",
5711                             new_disk.iv_name, new_disk, node, err)
5712         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5713                        (new_disk.size, new_disk.mode)))
5714       else:
5715         # change a given disk
5716         instance.disks[disk_op].mode = disk_dict['mode']
5717         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5718     # NIC changes
5719     for nic_op, nic_dict in self.op.nics:
5720       if nic_op == constants.DDM_REMOVE:
5721         # remove the last nic
5722         del instance.nics[-1]
5723         result.append(("nic.%d" % len(instance.nics), "remove"))
5724       elif nic_op == constants.DDM_ADD:
5725         # add a new nic
5726         if 'mac' not in nic_dict:
5727           mac = constants.VALUE_GENERATE
5728         else:
5729           mac = nic_dict['mac']
5730         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5731           mac = self.cfg.GenerateMAC()
5732         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5733                               bridge=nic_dict.get('bridge', None))
5734         instance.nics.append(new_nic)
5735         result.append(("nic.%d" % (len(instance.nics) - 1),
5736                        "add:mac=%s,ip=%s,bridge=%s" %
5737                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5738       else:
5739         # change a given nic
5740         for key in 'mac', 'ip', 'bridge':
5741           if key in nic_dict:
5742             setattr(instance.nics[nic_op], key, nic_dict[key])
5743             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5744
5745     # hvparams changes
5746     if self.op.hvparams:
5747       instance.hvparams = self.hv_new
5748       for key, val in self.op.hvparams.iteritems():
5749         result.append(("hv/%s" % key, val))
5750
5751     # beparams changes
5752     if self.op.beparams:
5753       instance.beparams = self.be_inst
5754       for key, val in self.op.beparams.iteritems():
5755         result.append(("be/%s" % key, val))
5756
5757     self.cfg.Update(instance)
5758
5759     return result
5760
5761
5762 class LUQueryExports(NoHooksLU):
5763   """Query the exports list
5764
5765   """
5766   _OP_REQP = ['nodes']
5767   REQ_BGL = False
5768
5769   def ExpandNames(self):
5770     self.needed_locks = {}
5771     self.share_locks[locking.LEVEL_NODE] = 1
5772     if not self.op.nodes:
5773       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5774     else:
5775       self.needed_locks[locking.LEVEL_NODE] = \
5776         _GetWantedNodes(self, self.op.nodes)
5777
5778   def CheckPrereq(self):
5779     """Check prerequisites.
5780
5781     """
5782     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5783
5784   def Exec(self, feedback_fn):
5785     """Compute the list of all the exported system images.
5786
5787     @rtype: dict
5788     @return: a dictionary with the structure node->(export-list)
5789         where export-list is a list of the instances exported on
5790         that node.
5791
5792     """
5793     rpcresult = self.rpc.call_export_list(self.nodes)
5794     result = {}
5795     for node in rpcresult:
5796       if rpcresult[node].failed:
5797         result[node] = False
5798       else:
5799         result[node] = rpcresult[node].data
5800
5801     return result
5802
5803
5804 class LUExportInstance(LogicalUnit):
5805   """Export an instance to an image in the cluster.
5806
5807   """
5808   HPATH = "instance-export"
5809   HTYPE = constants.HTYPE_INSTANCE
5810   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5811   REQ_BGL = False
5812
5813   def ExpandNames(self):
5814     self._ExpandAndLockInstance()
5815     # FIXME: lock only instance primary and destination node
5816     #
5817     # Sad but true, for now we have do lock all nodes, as we don't know where
5818     # the previous export might be, and and in this LU we search for it and
5819     # remove it from its current node. In the future we could fix this by:
5820     #  - making a tasklet to search (share-lock all), then create the new one,
5821     #    then one to remove, after
5822     #  - removing the removal operation altoghether
5823     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5824
5825   def DeclareLocks(self, level):
5826     """Last minute lock declaration."""
5827     # All nodes are locked anyway, so nothing to do here.
5828
5829   def BuildHooksEnv(self):
5830     """Build hooks env.
5831
5832     This will run on the master, primary node and target node.
5833
5834     """
5835     env = {
5836       "EXPORT_NODE": self.op.target_node,
5837       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5838       }
5839     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5840     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5841           self.op.target_node]
5842     return env, nl, nl
5843
5844   def CheckPrereq(self):
5845     """Check prerequisites.
5846
5847     This checks that the instance and node names are valid.
5848
5849     """
5850     instance_name = self.op.instance_name
5851     self.instance = self.cfg.GetInstanceInfo(instance_name)
5852     assert self.instance is not None, \
5853           "Cannot retrieve locked instance %s" % self.op.instance_name
5854     _CheckNodeOnline(self, self.instance.primary_node)
5855
5856     self.dst_node = self.cfg.GetNodeInfo(
5857       self.cfg.ExpandNodeName(self.op.target_node))
5858
5859     if self.dst_node is None:
5860       # This is wrong node name, not a non-locked node
5861       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5862     _CheckNodeOnline(self, self.dst_node.name)
5863
5864     # instance disk type verification
5865     for disk in self.instance.disks:
5866       if disk.dev_type == constants.LD_FILE:
5867         raise errors.OpPrereqError("Export not supported for instances with"
5868                                    " file-based disks")
5869
5870   def Exec(self, feedback_fn):
5871     """Export an instance to an image in the cluster.
5872
5873     """
5874     instance = self.instance
5875     dst_node = self.dst_node
5876     src_node = instance.primary_node
5877     if self.op.shutdown:
5878       # shutdown the instance, but not the disks
5879       result = self.rpc.call_instance_shutdown(src_node, instance)
5880       result.Raise()
5881       if not result.data:
5882         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5883                                  (instance.name, src_node))
5884
5885     vgname = self.cfg.GetVGName()
5886
5887     snap_disks = []
5888
5889     # set the disks ID correctly since call_instance_start needs the
5890     # correct drbd minor to create the symlinks
5891     for disk in instance.disks:
5892       self.cfg.SetDiskID(disk, src_node)
5893
5894     try:
5895       for disk in instance.disks:
5896         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5897         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5898         if new_dev_name.failed or not new_dev_name.data:
5899           self.LogWarning("Could not snapshot block device %s on node %s",
5900                           disk.logical_id[1], src_node)
5901           snap_disks.append(False)
5902         else:
5903           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5904                                  logical_id=(vgname, new_dev_name.data),
5905                                  physical_id=(vgname, new_dev_name.data),
5906                                  iv_name=disk.iv_name)
5907           snap_disks.append(new_dev)
5908
5909     finally:
5910       if self.op.shutdown and instance.status == "up":
5911         result = self.rpc.call_instance_start(src_node, instance, None)
5912         msg = result.RemoteFailMsg()
5913         if msg:
5914           _ShutdownInstanceDisks(self, instance)
5915           raise errors.OpExecError("Could not start instance: %s" % msg)
5916
5917     # TODO: check for size
5918
5919     cluster_name = self.cfg.GetClusterName()
5920     for idx, dev in enumerate(snap_disks):
5921       if dev:
5922         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5923                                                instance, cluster_name, idx)
5924         if result.failed or not result.data:
5925           self.LogWarning("Could not export block device %s from node %s to"
5926                           " node %s", dev.logical_id[1], src_node,
5927                           dst_node.name)
5928         result = self.rpc.call_blockdev_remove(src_node, dev)
5929         if result.failed or not result.data:
5930           self.LogWarning("Could not remove snapshot block device %s from node"
5931                           " %s", dev.logical_id[1], src_node)
5932
5933     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5934     if result.failed or not result.data:
5935       self.LogWarning("Could not finalize export for instance %s on node %s",
5936                       instance.name, dst_node.name)
5937
5938     nodelist = self.cfg.GetNodeList()
5939     nodelist.remove(dst_node.name)
5940
5941     # on one-node clusters nodelist will be empty after the removal
5942     # if we proceed the backup would be removed because OpQueryExports
5943     # substitutes an empty list with the full cluster node list.
5944     if nodelist:
5945       exportlist = self.rpc.call_export_list(nodelist)
5946       for node in exportlist:
5947         if exportlist[node].failed:
5948           continue
5949         if instance.name in exportlist[node].data:
5950           if not self.rpc.call_export_remove(node, instance.name):
5951             self.LogWarning("Could not remove older export for instance %s"
5952                             " on node %s", instance.name, node)
5953
5954
5955 class LURemoveExport(NoHooksLU):
5956   """Remove exports related to the named instance.
5957
5958   """
5959   _OP_REQP = ["instance_name"]
5960   REQ_BGL = False
5961
5962   def ExpandNames(self):
5963     self.needed_locks = {}
5964     # We need all nodes to be locked in order for RemoveExport to work, but we
5965     # don't need to lock the instance itself, as nothing will happen to it (and
5966     # we can remove exports also for a removed instance)
5967     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5968
5969   def CheckPrereq(self):
5970     """Check prerequisites.
5971     """
5972     pass
5973
5974   def Exec(self, feedback_fn):
5975     """Remove any export.
5976
5977     """
5978     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5979     # If the instance was not found we'll try with the name that was passed in.
5980     # This will only work if it was an FQDN, though.
5981     fqdn_warn = False
5982     if not instance_name:
5983       fqdn_warn = True
5984       instance_name = self.op.instance_name
5985
5986     exportlist = self.rpc.call_export_list(self.acquired_locks[
5987       locking.LEVEL_NODE])
5988     found = False
5989     for node in exportlist:
5990       if exportlist[node].failed:
5991         self.LogWarning("Failed to query node %s, continuing" % node)
5992         continue
5993       if instance_name in exportlist[node].data:
5994         found = True
5995         result = self.rpc.call_export_remove(node, instance_name)
5996         if result.failed or not result.data:
5997           logging.error("Could not remove export for instance %s"
5998                         " on node %s", instance_name, node)
5999
6000     if fqdn_warn and not found:
6001       feedback_fn("Export not found. If trying to remove an export belonging"
6002                   " to a deleted instance please use its Fully Qualified"
6003                   " Domain Name.")
6004
6005
6006 class TagsLU(NoHooksLU):
6007   """Generic tags LU.
6008
6009   This is an abstract class which is the parent of all the other tags LUs.
6010
6011   """
6012
6013   def ExpandNames(self):
6014     self.needed_locks = {}
6015     if self.op.kind == constants.TAG_NODE:
6016       name = self.cfg.ExpandNodeName(self.op.name)
6017       if name is None:
6018         raise errors.OpPrereqError("Invalid node name (%s)" %
6019                                    (self.op.name,))
6020       self.op.name = name
6021       self.needed_locks[locking.LEVEL_NODE] = name
6022     elif self.op.kind == constants.TAG_INSTANCE:
6023       name = self.cfg.ExpandInstanceName(self.op.name)
6024       if name is None:
6025         raise errors.OpPrereqError("Invalid instance name (%s)" %
6026                                    (self.op.name,))
6027       self.op.name = name
6028       self.needed_locks[locking.LEVEL_INSTANCE] = name
6029
6030   def CheckPrereq(self):
6031     """Check prerequisites.
6032
6033     """
6034     if self.op.kind == constants.TAG_CLUSTER:
6035       self.target = self.cfg.GetClusterInfo()
6036     elif self.op.kind == constants.TAG_NODE:
6037       self.target = self.cfg.GetNodeInfo(self.op.name)
6038     elif self.op.kind == constants.TAG_INSTANCE:
6039       self.target = self.cfg.GetInstanceInfo(self.op.name)
6040     else:
6041       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6042                                  str(self.op.kind))
6043
6044
6045 class LUGetTags(TagsLU):
6046   """Returns the tags of a given object.
6047
6048   """
6049   _OP_REQP = ["kind", "name"]
6050   REQ_BGL = False
6051
6052   def Exec(self, feedback_fn):
6053     """Returns the tag list.
6054
6055     """
6056     return list(self.target.GetTags())
6057
6058
6059 class LUSearchTags(NoHooksLU):
6060   """Searches the tags for a given pattern.
6061
6062   """
6063   _OP_REQP = ["pattern"]
6064   REQ_BGL = False
6065
6066   def ExpandNames(self):
6067     self.needed_locks = {}
6068
6069   def CheckPrereq(self):
6070     """Check prerequisites.
6071
6072     This checks the pattern passed for validity by compiling it.
6073
6074     """
6075     try:
6076       self.re = re.compile(self.op.pattern)
6077     except re.error, err:
6078       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6079                                  (self.op.pattern, err))
6080
6081   def Exec(self, feedback_fn):
6082     """Returns the tag list.
6083
6084     """
6085     cfg = self.cfg
6086     tgts = [("/cluster", cfg.GetClusterInfo())]
6087     ilist = cfg.GetAllInstancesInfo().values()
6088     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6089     nlist = cfg.GetAllNodesInfo().values()
6090     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6091     results = []
6092     for path, target in tgts:
6093       for tag in target.GetTags():
6094         if self.re.search(tag):
6095           results.append((path, tag))
6096     return results
6097
6098
6099 class LUAddTags(TagsLU):
6100   """Sets a tag on a given object.
6101
6102   """
6103   _OP_REQP = ["kind", "name", "tags"]
6104   REQ_BGL = False
6105
6106   def CheckPrereq(self):
6107     """Check prerequisites.
6108
6109     This checks the type and length of the tag name and value.
6110
6111     """
6112     TagsLU.CheckPrereq(self)
6113     for tag in self.op.tags:
6114       objects.TaggableObject.ValidateTag(tag)
6115
6116   def Exec(self, feedback_fn):
6117     """Sets the tag.
6118
6119     """
6120     try:
6121       for tag in self.op.tags:
6122         self.target.AddTag(tag)
6123     except errors.TagError, err:
6124       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6125     try:
6126       self.cfg.Update(self.target)
6127     except errors.ConfigurationError:
6128       raise errors.OpRetryError("There has been a modification to the"
6129                                 " config file and the operation has been"
6130                                 " aborted. Please retry.")
6131
6132
6133 class LUDelTags(TagsLU):
6134   """Delete a list of tags from a given object.
6135
6136   """
6137   _OP_REQP = ["kind", "name", "tags"]
6138   REQ_BGL = False
6139
6140   def CheckPrereq(self):
6141     """Check prerequisites.
6142
6143     This checks that we have the given tag.
6144
6145     """
6146     TagsLU.CheckPrereq(self)
6147     for tag in self.op.tags:
6148       objects.TaggableObject.ValidateTag(tag)
6149     del_tags = frozenset(self.op.tags)
6150     cur_tags = self.target.GetTags()
6151     if not del_tags <= cur_tags:
6152       diff_tags = del_tags - cur_tags
6153       diff_names = ["'%s'" % tag for tag in diff_tags]
6154       diff_names.sort()
6155       raise errors.OpPrereqError("Tag(s) %s not found" %
6156                                  (",".join(diff_names)))
6157
6158   def Exec(self, feedback_fn):
6159     """Remove the tag from the object.
6160
6161     """
6162     for tag in self.op.tags:
6163       self.target.RemoveTag(tag)
6164     try:
6165       self.cfg.Update(self.target)
6166     except errors.ConfigurationError:
6167       raise errors.OpRetryError("There has been a modification to the"
6168                                 " config file and the operation has been"
6169                                 " aborted. Please retry.")
6170
6171
6172 class LUTestDelay(NoHooksLU):
6173   """Sleep for a specified amount of time.
6174
6175   This LU sleeps on the master and/or nodes for a specified amount of
6176   time.
6177
6178   """
6179   _OP_REQP = ["duration", "on_master", "on_nodes"]
6180   REQ_BGL = False
6181
6182   def ExpandNames(self):
6183     """Expand names and set required locks.
6184
6185     This expands the node list, if any.
6186
6187     """
6188     self.needed_locks = {}
6189     if self.op.on_nodes:
6190       # _GetWantedNodes can be used here, but is not always appropriate to use
6191       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6192       # more information.
6193       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6194       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6195
6196   def CheckPrereq(self):
6197     """Check prerequisites.
6198
6199     """
6200
6201   def Exec(self, feedback_fn):
6202     """Do the actual sleep.
6203
6204     """
6205     if self.op.on_master:
6206       if not utils.TestDelay(self.op.duration):
6207         raise errors.OpExecError("Error during master delay test")
6208     if self.op.on_nodes:
6209       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6210       if not result:
6211         raise errors.OpExecError("Complete failure from rpc call")
6212       for node, node_result in result.items():
6213         node_result.Raise()
6214         if not node_result.data:
6215           raise errors.OpExecError("Failure during rpc call to node %s,"
6216                                    " result: %s" % (node, node_result.data))
6217
6218
6219 class IAllocator(object):
6220   """IAllocator framework.
6221
6222   An IAllocator instance has three sets of attributes:
6223     - cfg that is needed to query the cluster
6224     - input data (all members of the _KEYS class attribute are required)
6225     - four buffer attributes (in|out_data|text), that represent the
6226       input (to the external script) in text and data structure format,
6227       and the output from it, again in two formats
6228     - the result variables from the script (success, info, nodes) for
6229       easy usage
6230
6231   """
6232   _ALLO_KEYS = [
6233     "mem_size", "disks", "disk_template",
6234     "os", "tags", "nics", "vcpus", "hypervisor",
6235     ]
6236   _RELO_KEYS = [
6237     "relocate_from",
6238     ]
6239
6240   def __init__(self, lu, mode, name, **kwargs):
6241     self.lu = lu
6242     # init buffer variables
6243     self.in_text = self.out_text = self.in_data = self.out_data = None
6244     # init all input fields so that pylint is happy
6245     self.mode = mode
6246     self.name = name
6247     self.mem_size = self.disks = self.disk_template = None
6248     self.os = self.tags = self.nics = self.vcpus = None
6249     self.hypervisor = None
6250     self.relocate_from = None
6251     # computed fields
6252     self.required_nodes = None
6253     # init result fields
6254     self.success = self.info = self.nodes = None
6255     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6256       keyset = self._ALLO_KEYS
6257     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6258       keyset = self._RELO_KEYS
6259     else:
6260       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6261                                    " IAllocator" % self.mode)
6262     for key in kwargs:
6263       if key not in keyset:
6264         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6265                                      " IAllocator" % key)
6266       setattr(self, key, kwargs[key])
6267     for key in keyset:
6268       if key not in kwargs:
6269         raise errors.ProgrammerError("Missing input parameter '%s' to"
6270                                      " IAllocator" % key)
6271     self._BuildInputData()
6272
6273   def _ComputeClusterData(self):
6274     """Compute the generic allocator input data.
6275
6276     This is the data that is independent of the actual operation.
6277
6278     """
6279     cfg = self.lu.cfg
6280     cluster_info = cfg.GetClusterInfo()
6281     # cluster data
6282     data = {
6283       "version": 1,
6284       "cluster_name": cfg.GetClusterName(),
6285       "cluster_tags": list(cluster_info.GetTags()),
6286       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
6287       # we don't have job IDs
6288       }
6289     iinfo = cfg.GetAllInstancesInfo().values()
6290     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6291
6292     # node data
6293     node_results = {}
6294     node_list = cfg.GetNodeList()
6295
6296     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6297       hypervisor_name = self.hypervisor
6298     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6299       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6300
6301     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6302                                            hypervisor_name)
6303     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6304                        cluster_info.enabled_hypervisors)
6305     for nname in node_list:
6306       ninfo = cfg.GetNodeInfo(nname)
6307       node_data[nname].Raise()
6308       if not isinstance(node_data[nname].data, dict):
6309         raise errors.OpExecError("Can't get data for node %s" % nname)
6310       remote_info = node_data[nname].data
6311       for attr in ['memory_total', 'memory_free', 'memory_dom0',
6312                    'vg_size', 'vg_free', 'cpu_total']:
6313         if attr not in remote_info:
6314           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
6315                                    (nname, attr))
6316         try:
6317           remote_info[attr] = int(remote_info[attr])
6318         except ValueError, err:
6319           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
6320                                    " %s" % (nname, attr, str(err)))
6321       # compute memory used by primary instances
6322       i_p_mem = i_p_up_mem = 0
6323       for iinfo, beinfo in i_list:
6324         if iinfo.primary_node == nname:
6325           i_p_mem += beinfo[constants.BE_MEMORY]
6326           if iinfo.name not in node_iinfo[nname]:
6327             i_used_mem = 0
6328           else:
6329             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
6330           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6331           remote_info['memory_free'] -= max(0, i_mem_diff)
6332
6333           if iinfo.status == "up":
6334             i_p_up_mem += beinfo[constants.BE_MEMORY]
6335
6336       # compute memory used by instances
6337       pnr = {
6338         "tags": list(ninfo.GetTags()),
6339         "total_memory": remote_info['memory_total'],
6340         "reserved_memory": remote_info['memory_dom0'],
6341         "free_memory": remote_info['memory_free'],
6342         "i_pri_memory": i_p_mem,
6343         "i_pri_up_memory": i_p_up_mem,
6344         "total_disk": remote_info['vg_size'],
6345         "free_disk": remote_info['vg_free'],
6346         "primary_ip": ninfo.primary_ip,
6347         "secondary_ip": ninfo.secondary_ip,
6348         "total_cpus": remote_info['cpu_total'],
6349         "offline": ninfo.offline,
6350         }
6351       node_results[nname] = pnr
6352     data["nodes"] = node_results
6353
6354     # instance data
6355     instance_data = {}
6356     for iinfo, beinfo in i_list:
6357       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6358                   for n in iinfo.nics]
6359       pir = {
6360         "tags": list(iinfo.GetTags()),
6361         "should_run": iinfo.status == "up",
6362         "vcpus": beinfo[constants.BE_VCPUS],
6363         "memory": beinfo[constants.BE_MEMORY],
6364         "os": iinfo.os,
6365         "nodes": list(iinfo.all_nodes),
6366         "nics": nic_data,
6367         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6368         "disk_template": iinfo.disk_template,
6369         "hypervisor": iinfo.hypervisor,
6370         }
6371       instance_data[iinfo.name] = pir
6372
6373     data["instances"] = instance_data
6374
6375     self.in_data = data
6376
6377   def _AddNewInstance(self):
6378     """Add new instance data to allocator structure.
6379
6380     This in combination with _AllocatorGetClusterData will create the
6381     correct structure needed as input for the allocator.
6382
6383     The checks for the completeness of the opcode must have already been
6384     done.
6385
6386     """
6387     data = self.in_data
6388     if len(self.disks) != 2:
6389       raise errors.OpExecError("Only two-disk configurations supported")
6390
6391     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6392
6393     if self.disk_template in constants.DTS_NET_MIRROR:
6394       self.required_nodes = 2
6395     else:
6396       self.required_nodes = 1
6397     request = {
6398       "type": "allocate",
6399       "name": self.name,
6400       "disk_template": self.disk_template,
6401       "tags": self.tags,
6402       "os": self.os,
6403       "vcpus": self.vcpus,
6404       "memory": self.mem_size,
6405       "disks": self.disks,
6406       "disk_space_total": disk_space,
6407       "nics": self.nics,
6408       "required_nodes": self.required_nodes,
6409       }
6410     data["request"] = request
6411
6412   def _AddRelocateInstance(self):
6413     """Add relocate instance data to allocator structure.
6414
6415     This in combination with _IAllocatorGetClusterData will create the
6416     correct structure needed as input for the allocator.
6417
6418     The checks for the completeness of the opcode must have already been
6419     done.
6420
6421     """
6422     instance = self.lu.cfg.GetInstanceInfo(self.name)
6423     if instance is None:
6424       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6425                                    " IAllocator" % self.name)
6426
6427     if instance.disk_template not in constants.DTS_NET_MIRROR:
6428       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6429
6430     if len(instance.secondary_nodes) != 1:
6431       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6432
6433     self.required_nodes = 1
6434     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6435     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6436
6437     request = {
6438       "type": "relocate",
6439       "name": self.name,
6440       "disk_space_total": disk_space,
6441       "required_nodes": self.required_nodes,
6442       "relocate_from": self.relocate_from,
6443       }
6444     self.in_data["request"] = request
6445
6446   def _BuildInputData(self):
6447     """Build input data structures.
6448
6449     """
6450     self._ComputeClusterData()
6451
6452     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6453       self._AddNewInstance()
6454     else:
6455       self._AddRelocateInstance()
6456
6457     self.in_text = serializer.Dump(self.in_data)
6458
6459   def Run(self, name, validate=True, call_fn=None):
6460     """Run an instance allocator and return the results.
6461
6462     """
6463     if call_fn is None:
6464       call_fn = self.lu.rpc.call_iallocator_runner
6465     data = self.in_text
6466
6467     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6468     result.Raise()
6469
6470     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6471       raise errors.OpExecError("Invalid result from master iallocator runner")
6472
6473     rcode, stdout, stderr, fail = result.data
6474
6475     if rcode == constants.IARUN_NOTFOUND:
6476       raise errors.OpExecError("Can't find allocator '%s'" % name)
6477     elif rcode == constants.IARUN_FAILURE:
6478       raise errors.OpExecError("Instance allocator call failed: %s,"
6479                                " output: %s" % (fail, stdout+stderr))
6480     self.out_text = stdout
6481     if validate:
6482       self._ValidateResult()
6483
6484   def _ValidateResult(self):
6485     """Process the allocator results.
6486
6487     This will process and if successful save the result in
6488     self.out_data and the other parameters.
6489
6490     """
6491     try:
6492       rdict = serializer.Load(self.out_text)
6493     except Exception, err:
6494       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6495
6496     if not isinstance(rdict, dict):
6497       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6498
6499     for key in "success", "info", "nodes":
6500       if key not in rdict:
6501         raise errors.OpExecError("Can't parse iallocator results:"
6502                                  " missing key '%s'" % key)
6503       setattr(self, key, rdict[key])
6504
6505     if not isinstance(rdict["nodes"], list):
6506       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6507                                " is not a list")
6508     self.out_data = rdict
6509
6510
6511 class LUTestAllocator(NoHooksLU):
6512   """Run allocator tests.
6513
6514   This LU runs the allocator tests
6515
6516   """
6517   _OP_REQP = ["direction", "mode", "name"]
6518
6519   def CheckPrereq(self):
6520     """Check prerequisites.
6521
6522     This checks the opcode parameters depending on the director and mode test.
6523
6524     """
6525     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6526       for attr in ["name", "mem_size", "disks", "disk_template",
6527                    "os", "tags", "nics", "vcpus"]:
6528         if not hasattr(self.op, attr):
6529           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6530                                      attr)
6531       iname = self.cfg.ExpandInstanceName(self.op.name)
6532       if iname is not None:
6533         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6534                                    iname)
6535       if not isinstance(self.op.nics, list):
6536         raise errors.OpPrereqError("Invalid parameter 'nics'")
6537       for row in self.op.nics:
6538         if (not isinstance(row, dict) or
6539             "mac" not in row or
6540             "ip" not in row or
6541             "bridge" not in row):
6542           raise errors.OpPrereqError("Invalid contents of the"
6543                                      " 'nics' parameter")
6544       if not isinstance(self.op.disks, list):
6545         raise errors.OpPrereqError("Invalid parameter 'disks'")
6546       if len(self.op.disks) != 2:
6547         raise errors.OpPrereqError("Only two-disk configurations supported")
6548       for row in self.op.disks:
6549         if (not isinstance(row, dict) or
6550             "size" not in row or
6551             not isinstance(row["size"], int) or
6552             "mode" not in row or
6553             row["mode"] not in ['r', 'w']):
6554           raise errors.OpPrereqError("Invalid contents of the"
6555                                      " 'disks' parameter")
6556       if self.op.hypervisor is None:
6557         self.op.hypervisor = self.cfg.GetHypervisorType()
6558     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6559       if not hasattr(self.op, "name"):
6560         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6561       fname = self.cfg.ExpandInstanceName(self.op.name)
6562       if fname is None:
6563         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6564                                    self.op.name)
6565       self.op.name = fname
6566       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6567     else:
6568       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6569                                  self.op.mode)
6570
6571     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6572       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6573         raise errors.OpPrereqError("Missing allocator name")
6574     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6575       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6576                                  self.op.direction)
6577
6578   def Exec(self, feedback_fn):
6579     """Run the allocator test.
6580
6581     """
6582     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6583       ial = IAllocator(self,
6584                        mode=self.op.mode,
6585                        name=self.op.name,
6586                        mem_size=self.op.mem_size,
6587                        disks=self.op.disks,
6588                        disk_template=self.op.disk_template,
6589                        os=self.op.os,
6590                        tags=self.op.tags,
6591                        nics=self.op.nics,
6592                        vcpus=self.op.vcpus,
6593                        hypervisor=self.op.hypervisor,
6594                        )
6595     else:
6596       ial = IAllocator(self,
6597                        mode=self.op.mode,
6598                        name=self.op.name,
6599                        relocate_from=list(self.relocate_from),
6600                        )
6601
6602     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6603       result = ial.in_text
6604     else:
6605       ial.Run(self.op.allocator, validate=False)
6606       result = ial.out_text
6607     return result