Get rid of constants.HT_HVM_DEFAULT_BOOT_ORDER
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: boolean
459   @param status: the should_run status of the instance
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   if status:
472     str_status = "up"
473   else:
474     str_status = "down"
475   env = {
476     "OP_TARGET": name,
477     "INSTANCE_NAME": name,
478     "INSTANCE_PRIMARY": primary_node,
479     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480     "INSTANCE_OS_TYPE": os_type,
481     "INSTANCE_STATUS": str_status,
482     "INSTANCE_MEMORY": memory,
483     "INSTANCE_VCPUS": vcpus,
484   }
485
486   if nics:
487     nic_count = len(nics)
488     for idx, (ip, bridge, mac) in enumerate(nics):
489       if ip is None:
490         ip = ""
491       env["INSTANCE_NIC%d_IP" % idx] = ip
492       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
494   else:
495     nic_count = 0
496
497   env["INSTANCE_NIC_COUNT"] = nic_count
498
499   return env
500
501
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503   """Builds instance related env variables for hooks from an object.
504
505   @type lu: L{LogicalUnit}
506   @param lu: the logical unit on whose behalf we execute
507   @type instance: L{objects.Instance}
508   @param instance: the instance for which we should build the
509       environment
510   @type override: dict
511   @param override: dictionary with key/values that will override
512       our values
513   @rtype: dict
514   @return: the hook environment dictionary
515
516   """
517   bep = lu.cfg.GetClusterInfo().FillBE(instance)
518   args = {
519     'name': instance.name,
520     'primary_node': instance.primary_node,
521     'secondary_nodes': instance.secondary_nodes,
522     'os_type': instance.os,
523     'status': instance.admin_up,
524     'memory': bep[constants.BE_MEMORY],
525     'vcpus': bep[constants.BE_VCPUS],
526     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
527   }
528   if override:
529     args.update(override)
530   return _BuildInstanceHookEnv(**args)
531
532
533 def _AdjustCandidatePool(lu):
534   """Adjust the candidate pool after node operations.
535
536   """
537   mod_list = lu.cfg.MaintainCandidatePool()
538   if mod_list:
539     lu.LogInfo("Promoted nodes to master candidate role: %s",
540                ", ".join(node.name for node in mod_list))
541     for name in mod_list:
542       lu.context.ReaddNode(name)
543   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
544   if mc_now > mc_max:
545     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
546                (mc_now, mc_max))
547
548
549 def _CheckInstanceBridgesExist(lu, instance):
550   """Check that the brigdes needed by an instance exist.
551
552   """
553   # check bridges existance
554   brlist = [nic.bridge for nic in instance.nics]
555   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
556   result.Raise()
557   if not result.data:
558     raise errors.OpPrereqError("One or more target bridges %s does not"
559                                " exist on destination node '%s'" %
560                                (brlist, instance.primary_node))
561
562
563 class LUDestroyCluster(NoHooksLU):
564   """Logical unit for destroying the cluster.
565
566   """
567   _OP_REQP = []
568
569   def CheckPrereq(self):
570     """Check prerequisites.
571
572     This checks whether the cluster is empty.
573
574     Any errors are signalled by raising errors.OpPrereqError.
575
576     """
577     master = self.cfg.GetMasterNode()
578
579     nodelist = self.cfg.GetNodeList()
580     if len(nodelist) != 1 or nodelist[0] != master:
581       raise errors.OpPrereqError("There are still %d node(s) in"
582                                  " this cluster." % (len(nodelist) - 1))
583     instancelist = self.cfg.GetInstanceList()
584     if instancelist:
585       raise errors.OpPrereqError("There are still %d instance(s) in"
586                                  " this cluster." % len(instancelist))
587
588   def Exec(self, feedback_fn):
589     """Destroys the cluster.
590
591     """
592     master = self.cfg.GetMasterNode()
593     result = self.rpc.call_node_stop_master(master, False)
594     result.Raise()
595     if not result.data:
596       raise errors.OpExecError("Could not disable the master role")
597     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598     utils.CreateBackup(priv_key)
599     utils.CreateBackup(pub_key)
600     return master
601
602
603 class LUVerifyCluster(LogicalUnit):
604   """Verifies the cluster status.
605
606   """
607   HPATH = "cluster-verify"
608   HTYPE = constants.HTYPE_CLUSTER
609   _OP_REQP = ["skip_checks"]
610   REQ_BGL = False
611
612   def ExpandNames(self):
613     self.needed_locks = {
614       locking.LEVEL_NODE: locking.ALL_SET,
615       locking.LEVEL_INSTANCE: locking.ALL_SET,
616     }
617     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
618
619   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620                   node_result, feedback_fn, master_files,
621                   drbd_map):
622     """Run multiple tests against a node.
623
624     Test list:
625
626       - compares ganeti version
627       - checks vg existance and size > 20G
628       - checks config file checksum
629       - checks ssh to other nodes
630
631     @type nodeinfo: L{objects.Node}
632     @param nodeinfo: the node to check
633     @param file_list: required list of files
634     @param local_cksum: dictionary of local files and their checksums
635     @param node_result: the results from the node
636     @param feedback_fn: function used to accumulate results
637     @param master_files: list of files that only masters should have
638     @param drbd_map: the useddrbd minors for this node, in
639         form of minor: (instance, must_exist) which correspond to instances
640         and their running status
641
642     """
643     node = nodeinfo.name
644
645     # main result, node_result should be a non-empty dict
646     if not node_result or not isinstance(node_result, dict):
647       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
648       return True
649
650     # compares ganeti version
651     local_version = constants.PROTOCOL_VERSION
652     remote_version = node_result.get('version', None)
653     if not (remote_version and isinstance(remote_version, (list, tuple)) and
654             len(remote_version) == 2):
655       feedback_fn("  - ERROR: connection to %s failed" % (node))
656       return True
657
658     if local_version != remote_version[0]:
659       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
660                   " node %s %s" % (local_version, node, remote_version[0]))
661       return True
662
663     # node seems compatible, we can actually try to look into its results
664
665     bad = False
666
667     # full package version
668     if constants.RELEASE_VERSION != remote_version[1]:
669       feedback_fn("  - WARNING: software version mismatch: master %s,"
670                   " node %s %s" %
671                   (constants.RELEASE_VERSION, node, remote_version[1]))
672
673     # checks vg existence and size > 20G
674
675     vglist = node_result.get(constants.NV_VGLIST, None)
676     if not vglist:
677       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678                       (node,))
679       bad = True
680     else:
681       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
682                                             constants.MIN_VG_SIZE)
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688
689     remote_cksum = node_result.get(constants.NV_FILELIST, None)
690     if not isinstance(remote_cksum, dict):
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       for file_name in file_list:
695         node_is_mc = nodeinfo.master_candidate
696         must_have_file = file_name not in master_files
697         if file_name not in remote_cksum:
698           if node_is_mc or must_have_file:
699             bad = True
700             feedback_fn("  - ERROR: file '%s' missing" % file_name)
701         elif remote_cksum[file_name] != local_cksum[file_name]:
702           if node_is_mc or must_have_file:
703             bad = True
704             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
705           else:
706             # not candidate and this is not a must-have file
707             bad = True
708             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
709                         " '%s'" % file_name)
710         else:
711           # all good, except non-master/non-must have combination
712           if not node_is_mc and not must_have_file:
713             feedback_fn("  - ERROR: file '%s' should not exist on non master"
714                         " candidates" % file_name)
715
716     # checks ssh to any
717
718     if constants.NV_NODELIST not in node_result:
719       bad = True
720       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
721     else:
722       if node_result[constants.NV_NODELIST]:
723         bad = True
724         for node in node_result[constants.NV_NODELIST]:
725           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
726                           (node, node_result[constants.NV_NODELIST][node]))
727
728     if constants.NV_NODENETTEST not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result[constants.NV_NODENETTEST]:
733         bad = True
734         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result[constants.NV_NODENETTEST][node]))
738
739     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
740     if isinstance(hyp_result, dict):
741       for hv_name, hv_result in hyp_result.iteritems():
742         if hv_result is not None:
743           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
744                       (hv_name, hv_result))
745
746     # check used drbd list
747     used_minors = node_result.get(constants.NV_DRBDLIST, [])
748     for minor, (iname, must_exist) in drbd_map.items():
749       if minor not in used_minors and must_exist:
750         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
751                     (minor, iname))
752         bad = True
753     for minor in used_minors:
754       if minor not in drbd_map:
755         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
756         bad = True
757
758     return bad
759
760   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
761                       node_instance, feedback_fn, n_offline):
762     """Verify an instance.
763
764     This function checks to see if the required block devices are
765     available on the instance's node.
766
767     """
768     bad = False
769
770     node_current = instanceconfig.primary_node
771
772     node_vol_should = {}
773     instanceconfig.MapLVsByNode(node_vol_should)
774
775     for node in node_vol_should:
776       if node in n_offline:
777         # ignore missing volumes on offline nodes
778         continue
779       for volume in node_vol_should[node]:
780         if node not in node_vol_is or volume not in node_vol_is[node]:
781           feedback_fn("  - ERROR: volume %s missing on node %s" %
782                           (volume, node))
783           bad = True
784
785     if instanceconfig.admin_up:
786       if ((node_current not in node_instance or
787           not instance in node_instance[node_current]) and
788           node_current not in n_offline):
789         feedback_fn("  - ERROR: instance %s not running on node %s" %
790                         (instance, node_current))
791         bad = True
792
793     for node in node_instance:
794       if (not node == node_current):
795         if instance in node_instance[node]:
796           feedback_fn("  - ERROR: instance %s should not run on node %s" %
797                           (instance, node))
798           bad = True
799
800     return bad
801
802   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
803     """Verify if there are any unknown volumes in the cluster.
804
805     The .os, .swap and backup volumes are ignored. All other volumes are
806     reported as unknown.
807
808     """
809     bad = False
810
811     for node in node_vol_is:
812       for volume in node_vol_is[node]:
813         if node not in node_vol_should or volume not in node_vol_should[node]:
814           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
815                       (volume, node))
816           bad = True
817     return bad
818
819   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
820     """Verify the list of running instances.
821
822     This checks what instances are running but unknown to the cluster.
823
824     """
825     bad = False
826     for node in node_instance:
827       for runninginstance in node_instance[node]:
828         if runninginstance not in instancelist:
829           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
830                           (runninginstance, node))
831           bad = True
832     return bad
833
834   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
835     """Verify N+1 Memory Resilience.
836
837     Check that if one single node dies we can still start all the instances it
838     was primary for.
839
840     """
841     bad = False
842
843     for node, nodeinfo in node_info.iteritems():
844       # This code checks that every node which is now listed as secondary has
845       # enough memory to host all instances it is supposed to should a single
846       # other node in the cluster fail.
847       # FIXME: not ready for failover to an arbitrary node
848       # FIXME: does not support file-backed instances
849       # WARNING: we currently take into account down instances as well as up
850       # ones, considering that even if they're down someone might want to start
851       # them even in the event of a node failure.
852       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
853         needed_mem = 0
854         for instance in instances:
855           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
856           if bep[constants.BE_AUTO_BALANCE]:
857             needed_mem += bep[constants.BE_MEMORY]
858         if nodeinfo['mfree'] < needed_mem:
859           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
860                       " failovers should node %s fail" % (node, prinode))
861           bad = True
862     return bad
863
864   def CheckPrereq(self):
865     """Check prerequisites.
866
867     Transform the list of checks we're going to skip into a set and check that
868     all its members are valid.
869
870     """
871     self.skip_set = frozenset(self.op.skip_checks)
872     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
873       raise errors.OpPrereqError("Invalid checks to be skipped specified")
874
875   def BuildHooksEnv(self):
876     """Build hooks env.
877
878     Cluster-Verify hooks just rone in the post phase and their failure makes
879     the output be logged in the verify output and the verification to fail.
880
881     """
882     all_nodes = self.cfg.GetNodeList()
883     # TODO: populate the environment with useful information for verify hooks
884     env = {}
885     return env, [], all_nodes
886
887   def Exec(self, feedback_fn):
888     """Verify integrity of cluster, performing various test on nodes.
889
890     """
891     bad = False
892     feedback_fn("* Verifying global settings")
893     for msg in self.cfg.VerifyConfig():
894       feedback_fn("  - ERROR: %s" % msg)
895
896     vg_name = self.cfg.GetVGName()
897     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
898     nodelist = utils.NiceSort(self.cfg.GetNodeList())
899     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
900     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
901     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
902                         for iname in instancelist)
903     i_non_redundant = [] # Non redundant instances
904     i_non_a_balanced = [] # Non auto-balanced instances
905     n_offline = [] # List of offline nodes
906     node_volume = {}
907     node_instance = {}
908     node_info = {}
909     instance_cfg = {}
910
911     # FIXME: verify OS list
912     # do local checksums
913     master_files = [constants.CLUSTER_CONF_FILE]
914
915     file_names = ssconf.SimpleStore().GetFileList()
916     file_names.append(constants.SSL_CERT_FILE)
917     file_names.append(constants.RAPI_CERT_FILE)
918     file_names.extend(master_files)
919
920     local_checksums = utils.FingerprintFiles(file_names)
921
922     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
923     node_verify_param = {
924       constants.NV_FILELIST: file_names,
925       constants.NV_NODELIST: [node.name for node in nodeinfo
926                               if not node.offline],
927       constants.NV_HYPERVISOR: hypervisors,
928       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
929                                   node.secondary_ip) for node in nodeinfo
930                                  if not node.offline],
931       constants.NV_LVLIST: vg_name,
932       constants.NV_INSTANCELIST: hypervisors,
933       constants.NV_VGLIST: None,
934       constants.NV_VERSION: None,
935       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
936       constants.NV_DRBDLIST: None,
937       }
938     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
939                                            self.cfg.GetClusterName())
940
941     cluster = self.cfg.GetClusterInfo()
942     master_node = self.cfg.GetMasterNode()
943     all_drbd_map = self.cfg.ComputeDRBDMap()
944
945     for node_i in nodeinfo:
946       node = node_i.name
947       nresult = all_nvinfo[node].data
948
949       if node_i.offline:
950         feedback_fn("* Skipping offline node %s" % (node,))
951         n_offline.append(node)
952         continue
953
954       if node == master_node:
955         ntype = "master"
956       elif node_i.master_candidate:
957         ntype = "master candidate"
958       else:
959         ntype = "regular"
960       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
961
962       if all_nvinfo[node].failed or not isinstance(nresult, dict):
963         feedback_fn("  - ERROR: connection to %s failed" % (node,))
964         bad = True
965         continue
966
967       node_drbd = {}
968       for minor, instance in all_drbd_map[node].items():
969         instance = instanceinfo[instance]
970         node_drbd[minor] = (instance.name, instance.admin_up)
971       result = self._VerifyNode(node_i, file_names, local_checksums,
972                                 nresult, feedback_fn, master_files,
973                                 node_drbd)
974       bad = bad or result
975
976       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
977       if isinstance(lvdata, basestring):
978         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
979                     (node, utils.SafeEncode(lvdata)))
980         bad = True
981         node_volume[node] = {}
982       elif not isinstance(lvdata, dict):
983         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
984         bad = True
985         continue
986       else:
987         node_volume[node] = lvdata
988
989       # node_instance
990       idata = nresult.get(constants.NV_INSTANCELIST, None)
991       if not isinstance(idata, list):
992         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
993                     (node,))
994         bad = True
995         continue
996
997       node_instance[node] = idata
998
999       # node_info
1000       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1001       if not isinstance(nodeinfo, dict):
1002         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1003         bad = True
1004         continue
1005
1006       try:
1007         node_info[node] = {
1008           "mfree": int(nodeinfo['memory_free']),
1009           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1010           "pinst": [],
1011           "sinst": [],
1012           # dictionary holding all instances this node is secondary for,
1013           # grouped by their primary node. Each key is a cluster node, and each
1014           # value is a list of instances which have the key as primary and the
1015           # current node as secondary.  this is handy to calculate N+1 memory
1016           # availability if you can only failover from a primary to its
1017           # secondary.
1018           "sinst-by-pnode": {},
1019         }
1020       except ValueError:
1021         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1022         bad = True
1023         continue
1024
1025     node_vol_should = {}
1026
1027     for instance in instancelist:
1028       feedback_fn("* Verifying instance %s" % instance)
1029       inst_config = instanceinfo[instance]
1030       result =  self._VerifyInstance(instance, inst_config, node_volume,
1031                                      node_instance, feedback_fn, n_offline)
1032       bad = bad or result
1033       inst_nodes_offline = []
1034
1035       inst_config.MapLVsByNode(node_vol_should)
1036
1037       instance_cfg[instance] = inst_config
1038
1039       pnode = inst_config.primary_node
1040       if pnode in node_info:
1041         node_info[pnode]['pinst'].append(instance)
1042       elif pnode not in n_offline:
1043         feedback_fn("  - ERROR: instance %s, connection to primary node"
1044                     " %s failed" % (instance, pnode))
1045         bad = True
1046
1047       if pnode in n_offline:
1048         inst_nodes_offline.append(pnode)
1049
1050       # If the instance is non-redundant we cannot survive losing its primary
1051       # node, so we are not N+1 compliant. On the other hand we have no disk
1052       # templates with more than one secondary so that situation is not well
1053       # supported either.
1054       # FIXME: does not support file-backed instances
1055       if len(inst_config.secondary_nodes) == 0:
1056         i_non_redundant.append(instance)
1057       elif len(inst_config.secondary_nodes) > 1:
1058         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1059                     % instance)
1060
1061       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1062         i_non_a_balanced.append(instance)
1063
1064       for snode in inst_config.secondary_nodes:
1065         if snode in node_info:
1066           node_info[snode]['sinst'].append(instance)
1067           if pnode not in node_info[snode]['sinst-by-pnode']:
1068             node_info[snode]['sinst-by-pnode'][pnode] = []
1069           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1070         elif snode not in n_offline:
1071           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1072                       " %s failed" % (instance, snode))
1073           bad = True
1074         if snode in n_offline:
1075           inst_nodes_offline.append(snode)
1076
1077       if inst_nodes_offline:
1078         # warn that the instance lives on offline nodes, and set bad=True
1079         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1080                     ", ".join(inst_nodes_offline))
1081         bad = True
1082
1083     feedback_fn("* Verifying orphan volumes")
1084     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1085                                        feedback_fn)
1086     bad = bad or result
1087
1088     feedback_fn("* Verifying remaining instances")
1089     result = self._VerifyOrphanInstances(instancelist, node_instance,
1090                                          feedback_fn)
1091     bad = bad or result
1092
1093     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1094       feedback_fn("* Verifying N+1 Memory redundancy")
1095       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1096       bad = bad or result
1097
1098     feedback_fn("* Other Notes")
1099     if i_non_redundant:
1100       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1101                   % len(i_non_redundant))
1102
1103     if i_non_a_balanced:
1104       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1105                   % len(i_non_a_balanced))
1106
1107     if n_offline:
1108       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1109
1110     return not bad
1111
1112   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1113     """Analize the post-hooks' result
1114
1115     This method analyses the hook result, handles it, and sends some
1116     nicely-formatted feedback back to the user.
1117
1118     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1119         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1120     @param hooks_results: the results of the multi-node hooks rpc call
1121     @param feedback_fn: function used send feedback back to the caller
1122     @param lu_result: previous Exec result
1123     @return: the new Exec result, based on the previous result
1124         and hook results
1125
1126     """
1127     # We only really run POST phase hooks, and are only interested in
1128     # their results
1129     if phase == constants.HOOKS_PHASE_POST:
1130       # Used to change hooks' output to proper indentation
1131       indent_re = re.compile('^', re.M)
1132       feedback_fn("* Hooks Results")
1133       if not hooks_results:
1134         feedback_fn("  - ERROR: general communication failure")
1135         lu_result = 1
1136       else:
1137         for node_name in hooks_results:
1138           show_node_header = True
1139           res = hooks_results[node_name]
1140           if res.failed or res.data is False or not isinstance(res.data, list):
1141             if res.offline:
1142               # no need to warn or set fail return value
1143               continue
1144             feedback_fn("    Communication failure in hooks execution")
1145             lu_result = 1
1146             continue
1147           for script, hkr, output in res.data:
1148             if hkr == constants.HKR_FAIL:
1149               # The node header is only shown once, if there are
1150               # failing hooks on that node
1151               if show_node_header:
1152                 feedback_fn("  Node %s:" % node_name)
1153                 show_node_header = False
1154               feedback_fn("    ERROR: Script %s failed, output:" % script)
1155               output = indent_re.sub('      ', output)
1156               feedback_fn("%s" % output)
1157               lu_result = 1
1158
1159       return lu_result
1160
1161
1162 class LUVerifyDisks(NoHooksLU):
1163   """Verifies the cluster disks status.
1164
1165   """
1166   _OP_REQP = []
1167   REQ_BGL = False
1168
1169   def ExpandNames(self):
1170     self.needed_locks = {
1171       locking.LEVEL_NODE: locking.ALL_SET,
1172       locking.LEVEL_INSTANCE: locking.ALL_SET,
1173     }
1174     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1175
1176   def CheckPrereq(self):
1177     """Check prerequisites.
1178
1179     This has no prerequisites.
1180
1181     """
1182     pass
1183
1184   def Exec(self, feedback_fn):
1185     """Verify integrity of cluster disks.
1186
1187     """
1188     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1189
1190     vg_name = self.cfg.GetVGName()
1191     nodes = utils.NiceSort(self.cfg.GetNodeList())
1192     instances = [self.cfg.GetInstanceInfo(name)
1193                  for name in self.cfg.GetInstanceList()]
1194
1195     nv_dict = {}
1196     for inst in instances:
1197       inst_lvs = {}
1198       if (not inst.admin_up or
1199           inst.disk_template not in constants.DTS_NET_MIRROR):
1200         continue
1201       inst.MapLVsByNode(inst_lvs)
1202       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1203       for node, vol_list in inst_lvs.iteritems():
1204         for vol in vol_list:
1205           nv_dict[(node, vol)] = inst
1206
1207     if not nv_dict:
1208       return result
1209
1210     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1211
1212     to_act = set()
1213     for node in nodes:
1214       # node_volume
1215       lvs = node_lvs[node]
1216       if lvs.failed:
1217         if not lvs.offline:
1218           self.LogWarning("Connection to node %s failed: %s" %
1219                           (node, lvs.data))
1220         continue
1221       lvs = lvs.data
1222       if isinstance(lvs, basestring):
1223         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1224         res_nlvm[node] = lvs
1225       elif not isinstance(lvs, dict):
1226         logging.warning("Connection to node %s failed or invalid data"
1227                         " returned", node)
1228         res_nodes.append(node)
1229         continue
1230
1231       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1232         inst = nv_dict.pop((node, lv_name), None)
1233         if (not lv_online and inst is not None
1234             and inst.name not in res_instances):
1235           res_instances.append(inst.name)
1236
1237     # any leftover items in nv_dict are missing LVs, let's arrange the
1238     # data better
1239     for key, inst in nv_dict.iteritems():
1240       if inst.name not in res_missing:
1241         res_missing[inst.name] = []
1242       res_missing[inst.name].append(key)
1243
1244     return result
1245
1246
1247 class LURenameCluster(LogicalUnit):
1248   """Rename the cluster.
1249
1250   """
1251   HPATH = "cluster-rename"
1252   HTYPE = constants.HTYPE_CLUSTER
1253   _OP_REQP = ["name"]
1254
1255   def BuildHooksEnv(self):
1256     """Build hooks env.
1257
1258     """
1259     env = {
1260       "OP_TARGET": self.cfg.GetClusterName(),
1261       "NEW_NAME": self.op.name,
1262       }
1263     mn = self.cfg.GetMasterNode()
1264     return env, [mn], [mn]
1265
1266   def CheckPrereq(self):
1267     """Verify that the passed name is a valid one.
1268
1269     """
1270     hostname = utils.HostInfo(self.op.name)
1271
1272     new_name = hostname.name
1273     self.ip = new_ip = hostname.ip
1274     old_name = self.cfg.GetClusterName()
1275     old_ip = self.cfg.GetMasterIP()
1276     if new_name == old_name and new_ip == old_ip:
1277       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1278                                  " cluster has changed")
1279     if new_ip != old_ip:
1280       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1281         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1282                                    " reachable on the network. Aborting." %
1283                                    new_ip)
1284
1285     self.op.name = new_name
1286
1287   def Exec(self, feedback_fn):
1288     """Rename the cluster.
1289
1290     """
1291     clustername = self.op.name
1292     ip = self.ip
1293
1294     # shutdown the master IP
1295     master = self.cfg.GetMasterNode()
1296     result = self.rpc.call_node_stop_master(master, False)
1297     if result.failed or not result.data:
1298       raise errors.OpExecError("Could not disable the master role")
1299
1300     try:
1301       cluster = self.cfg.GetClusterInfo()
1302       cluster.cluster_name = clustername
1303       cluster.master_ip = ip
1304       self.cfg.Update(cluster)
1305
1306       # update the known hosts file
1307       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1308       node_list = self.cfg.GetNodeList()
1309       try:
1310         node_list.remove(master)
1311       except ValueError:
1312         pass
1313       result = self.rpc.call_upload_file(node_list,
1314                                          constants.SSH_KNOWN_HOSTS_FILE)
1315       for to_node, to_result in result.iteritems():
1316         if to_result.failed or not to_result.data:
1317           logging.error("Copy of file %s to node %s failed",
1318                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1319
1320     finally:
1321       result = self.rpc.call_node_start_master(master, False)
1322       if result.failed or not result.data:
1323         self.LogWarning("Could not re-enable the master role on"
1324                         " the master, please restart manually.")
1325
1326
1327 def _RecursiveCheckIfLVMBased(disk):
1328   """Check if the given disk or its children are lvm-based.
1329
1330   @type disk: L{objects.Disk}
1331   @param disk: the disk to check
1332   @rtype: booleean
1333   @return: boolean indicating whether a LD_LV dev_type was found or not
1334
1335   """
1336   if disk.children:
1337     for chdisk in disk.children:
1338       if _RecursiveCheckIfLVMBased(chdisk):
1339         return True
1340   return disk.dev_type == constants.LD_LV
1341
1342
1343 class LUSetClusterParams(LogicalUnit):
1344   """Change the parameters of the cluster.
1345
1346   """
1347   HPATH = "cluster-modify"
1348   HTYPE = constants.HTYPE_CLUSTER
1349   _OP_REQP = []
1350   REQ_BGL = False
1351
1352   def CheckParameters(self):
1353     """Check parameters
1354
1355     """
1356     if not hasattr(self.op, "candidate_pool_size"):
1357       self.op.candidate_pool_size = None
1358     if self.op.candidate_pool_size is not None:
1359       try:
1360         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1361       except ValueError, err:
1362         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1363                                    str(err))
1364       if self.op.candidate_pool_size < 1:
1365         raise errors.OpPrereqError("At least one master candidate needed")
1366
1367   def ExpandNames(self):
1368     # FIXME: in the future maybe other cluster params won't require checking on
1369     # all nodes to be modified.
1370     self.needed_locks = {
1371       locking.LEVEL_NODE: locking.ALL_SET,
1372     }
1373     self.share_locks[locking.LEVEL_NODE] = 1
1374
1375   def BuildHooksEnv(self):
1376     """Build hooks env.
1377
1378     """
1379     env = {
1380       "OP_TARGET": self.cfg.GetClusterName(),
1381       "NEW_VG_NAME": self.op.vg_name,
1382       }
1383     mn = self.cfg.GetMasterNode()
1384     return env, [mn], [mn]
1385
1386   def CheckPrereq(self):
1387     """Check prerequisites.
1388
1389     This checks whether the given params don't conflict and
1390     if the given volume group is valid.
1391
1392     """
1393     # FIXME: This only works because there is only one parameter that can be
1394     # changed or removed.
1395     if self.op.vg_name is not None and not self.op.vg_name:
1396       instances = self.cfg.GetAllInstancesInfo().values()
1397       for inst in instances:
1398         for disk in inst.disks:
1399           if _RecursiveCheckIfLVMBased(disk):
1400             raise errors.OpPrereqError("Cannot disable lvm storage while"
1401                                        " lvm-based instances exist")
1402
1403     node_list = self.acquired_locks[locking.LEVEL_NODE]
1404
1405     # if vg_name not None, checks given volume group on all nodes
1406     if self.op.vg_name:
1407       vglist = self.rpc.call_vg_list(node_list)
1408       for node in node_list:
1409         if vglist[node].failed:
1410           # ignoring down node
1411           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1412           continue
1413         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1414                                               self.op.vg_name,
1415                                               constants.MIN_VG_SIZE)
1416         if vgstatus:
1417           raise errors.OpPrereqError("Error on node '%s': %s" %
1418                                      (node, vgstatus))
1419
1420     self.cluster = cluster = self.cfg.GetClusterInfo()
1421     # validate beparams changes
1422     if self.op.beparams:
1423       utils.CheckBEParams(self.op.beparams)
1424       self.new_beparams = cluster.FillDict(
1425         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1426
1427     # hypervisor list/parameters
1428     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1429     if self.op.hvparams:
1430       if not isinstance(self.op.hvparams, dict):
1431         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1432       for hv_name, hv_dict in self.op.hvparams.items():
1433         if hv_name not in self.new_hvparams:
1434           self.new_hvparams[hv_name] = hv_dict
1435         else:
1436           self.new_hvparams[hv_name].update(hv_dict)
1437
1438     if self.op.enabled_hypervisors is not None:
1439       self.hv_list = self.op.enabled_hypervisors
1440     else:
1441       self.hv_list = cluster.enabled_hypervisors
1442
1443     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1444       # either the enabled list has changed, or the parameters have, validate
1445       for hv_name, hv_params in self.new_hvparams.items():
1446         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1447             (self.op.enabled_hypervisors and
1448              hv_name in self.op.enabled_hypervisors)):
1449           # either this is a new hypervisor, or its parameters have changed
1450           hv_class = hypervisor.GetHypervisor(hv_name)
1451           hv_class.CheckParameterSyntax(hv_params)
1452           _CheckHVParams(self, node_list, hv_name, hv_params)
1453
1454   def Exec(self, feedback_fn):
1455     """Change the parameters of the cluster.
1456
1457     """
1458     if self.op.vg_name is not None:
1459       if self.op.vg_name != self.cfg.GetVGName():
1460         self.cfg.SetVGName(self.op.vg_name)
1461       else:
1462         feedback_fn("Cluster LVM configuration already in desired"
1463                     " state, not changing")
1464     if self.op.hvparams:
1465       self.cluster.hvparams = self.new_hvparams
1466     if self.op.enabled_hypervisors is not None:
1467       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1468     if self.op.beparams:
1469       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1470     if self.op.candidate_pool_size is not None:
1471       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1472
1473     self.cfg.Update(self.cluster)
1474
1475     # we want to update nodes after the cluster so that if any errors
1476     # happen, we have recorded and saved the cluster info
1477     if self.op.candidate_pool_size is not None:
1478       _AdjustCandidatePool(self)
1479
1480
1481 class LURedistributeConfig(NoHooksLU):
1482   """Force the redistribution of cluster configuration.
1483
1484   This is a very simple LU.
1485
1486   """
1487   _OP_REQP = []
1488   REQ_BGL = False
1489
1490   def ExpandNames(self):
1491     self.needed_locks = {
1492       locking.LEVEL_NODE: locking.ALL_SET,
1493     }
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     """
1500
1501   def Exec(self, feedback_fn):
1502     """Redistribute the configuration.
1503
1504     """
1505     self.cfg.Update(self.cfg.GetClusterInfo())
1506
1507
1508 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1509   """Sleep and poll for an instance's disk to sync.
1510
1511   """
1512   if not instance.disks:
1513     return True
1514
1515   if not oneshot:
1516     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1517
1518   node = instance.primary_node
1519
1520   for dev in instance.disks:
1521     lu.cfg.SetDiskID(dev, node)
1522
1523   retries = 0
1524   while True:
1525     max_time = 0
1526     done = True
1527     cumul_degraded = False
1528     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1529     if rstats.failed or not rstats.data:
1530       lu.LogWarning("Can't get any data from node %s", node)
1531       retries += 1
1532       if retries >= 10:
1533         raise errors.RemoteError("Can't contact node %s for mirror data,"
1534                                  " aborting." % node)
1535       time.sleep(6)
1536       continue
1537     rstats = rstats.data
1538     retries = 0
1539     for i, mstat in enumerate(rstats):
1540       if mstat is None:
1541         lu.LogWarning("Can't compute data for node %s/%s",
1542                            node, instance.disks[i].iv_name)
1543         continue
1544       # we ignore the ldisk parameter
1545       perc_done, est_time, is_degraded, _ = mstat
1546       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1547       if perc_done is not None:
1548         done = False
1549         if est_time is not None:
1550           rem_time = "%d estimated seconds remaining" % est_time
1551           max_time = est_time
1552         else:
1553           rem_time = "no time estimate"
1554         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1555                         (instance.disks[i].iv_name, perc_done, rem_time))
1556     if done or oneshot:
1557       break
1558
1559     time.sleep(min(60, max_time))
1560
1561   if done:
1562     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1563   return not cumul_degraded
1564
1565
1566 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1567   """Check that mirrors are not degraded.
1568
1569   The ldisk parameter, if True, will change the test from the
1570   is_degraded attribute (which represents overall non-ok status for
1571   the device(s)) to the ldisk (representing the local storage status).
1572
1573   """
1574   lu.cfg.SetDiskID(dev, node)
1575   if ldisk:
1576     idx = 6
1577   else:
1578     idx = 5
1579
1580   result = True
1581   if on_primary or dev.AssembleOnSecondary():
1582     rstats = lu.rpc.call_blockdev_find(node, dev)
1583     if rstats.failed or not rstats.data:
1584       logging.warning("Node %s: disk degraded, not found or node down", node)
1585       result = False
1586     else:
1587       result = result and (not rstats.data[idx])
1588   if dev.children:
1589     for child in dev.children:
1590       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1591
1592   return result
1593
1594
1595 class LUDiagnoseOS(NoHooksLU):
1596   """Logical unit for OS diagnose/query.
1597
1598   """
1599   _OP_REQP = ["output_fields", "names"]
1600   REQ_BGL = False
1601   _FIELDS_STATIC = utils.FieldSet()
1602   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1603
1604   def ExpandNames(self):
1605     if self.op.names:
1606       raise errors.OpPrereqError("Selective OS query not supported")
1607
1608     _CheckOutputFields(static=self._FIELDS_STATIC,
1609                        dynamic=self._FIELDS_DYNAMIC,
1610                        selected=self.op.output_fields)
1611
1612     # Lock all nodes, in shared mode
1613     self.needed_locks = {}
1614     self.share_locks[locking.LEVEL_NODE] = 1
1615     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1616
1617   def CheckPrereq(self):
1618     """Check prerequisites.
1619
1620     """
1621
1622   @staticmethod
1623   def _DiagnoseByOS(node_list, rlist):
1624     """Remaps a per-node return list into an a per-os per-node dictionary
1625
1626     @param node_list: a list with the names of all nodes
1627     @param rlist: a map with node names as keys and OS objects as values
1628
1629     @rtype: dict
1630     @returns: a dictionary with osnames as keys and as value another map, with
1631         nodes as keys and list of OS objects as values, eg::
1632
1633           {"debian-etch": {"node1": [<object>,...],
1634                            "node2": [<object>,]}
1635           }
1636
1637     """
1638     all_os = {}
1639     for node_name, nr in rlist.iteritems():
1640       if nr.failed or not nr.data:
1641         continue
1642       for os_obj in nr.data:
1643         if os_obj.name not in all_os:
1644           # build a list of nodes for this os containing empty lists
1645           # for each node in node_list
1646           all_os[os_obj.name] = {}
1647           for nname in node_list:
1648             all_os[os_obj.name][nname] = []
1649         all_os[os_obj.name][node_name].append(os_obj)
1650     return all_os
1651
1652   def Exec(self, feedback_fn):
1653     """Compute the list of OSes.
1654
1655     """
1656     node_list = self.acquired_locks[locking.LEVEL_NODE]
1657     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1658                    if node in node_list]
1659     node_data = self.rpc.call_os_diagnose(valid_nodes)
1660     if node_data == False:
1661       raise errors.OpExecError("Can't gather the list of OSes")
1662     pol = self._DiagnoseByOS(valid_nodes, node_data)
1663     output = []
1664     for os_name, os_data in pol.iteritems():
1665       row = []
1666       for field in self.op.output_fields:
1667         if field == "name":
1668           val = os_name
1669         elif field == "valid":
1670           val = utils.all([osl and osl[0] for osl in os_data.values()])
1671         elif field == "node_status":
1672           val = {}
1673           for node_name, nos_list in os_data.iteritems():
1674             val[node_name] = [(v.status, v.path) for v in nos_list]
1675         else:
1676           raise errors.ParameterError(field)
1677         row.append(val)
1678       output.append(row)
1679
1680     return output
1681
1682
1683 class LURemoveNode(LogicalUnit):
1684   """Logical unit for removing a node.
1685
1686   """
1687   HPATH = "node-remove"
1688   HTYPE = constants.HTYPE_NODE
1689   _OP_REQP = ["node_name"]
1690
1691   def BuildHooksEnv(self):
1692     """Build hooks env.
1693
1694     This doesn't run on the target node in the pre phase as a failed
1695     node would then be impossible to remove.
1696
1697     """
1698     env = {
1699       "OP_TARGET": self.op.node_name,
1700       "NODE_NAME": self.op.node_name,
1701       }
1702     all_nodes = self.cfg.GetNodeList()
1703     all_nodes.remove(self.op.node_name)
1704     return env, all_nodes, all_nodes
1705
1706   def CheckPrereq(self):
1707     """Check prerequisites.
1708
1709     This checks:
1710      - the node exists in the configuration
1711      - it does not have primary or secondary instances
1712      - it's not the master
1713
1714     Any errors are signalled by raising errors.OpPrereqError.
1715
1716     """
1717     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1718     if node is None:
1719       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1720
1721     instance_list = self.cfg.GetInstanceList()
1722
1723     masternode = self.cfg.GetMasterNode()
1724     if node.name == masternode:
1725       raise errors.OpPrereqError("Node is the master node,"
1726                                  " you need to failover first.")
1727
1728     for instance_name in instance_list:
1729       instance = self.cfg.GetInstanceInfo(instance_name)
1730       if node.name in instance.all_nodes:
1731         raise errors.OpPrereqError("Instance %s is still running on the node,"
1732                                    " please remove first." % instance_name)
1733     self.op.node_name = node.name
1734     self.node = node
1735
1736   def Exec(self, feedback_fn):
1737     """Removes the node from the cluster.
1738
1739     """
1740     node = self.node
1741     logging.info("Stopping the node daemon and removing configs from node %s",
1742                  node.name)
1743
1744     self.context.RemoveNode(node.name)
1745
1746     self.rpc.call_node_leave_cluster(node.name)
1747
1748     # Promote nodes to master candidate as needed
1749     _AdjustCandidatePool(self)
1750
1751
1752 class LUQueryNodes(NoHooksLU):
1753   """Logical unit for querying nodes.
1754
1755   """
1756   _OP_REQP = ["output_fields", "names", "use_locking"]
1757   REQ_BGL = False
1758   _FIELDS_DYNAMIC = utils.FieldSet(
1759     "dtotal", "dfree",
1760     "mtotal", "mnode", "mfree",
1761     "bootid",
1762     "ctotal",
1763     )
1764
1765   _FIELDS_STATIC = utils.FieldSet(
1766     "name", "pinst_cnt", "sinst_cnt",
1767     "pinst_list", "sinst_list",
1768     "pip", "sip", "tags",
1769     "serial_no",
1770     "master_candidate",
1771     "master",
1772     "offline",
1773     )
1774
1775   def ExpandNames(self):
1776     _CheckOutputFields(static=self._FIELDS_STATIC,
1777                        dynamic=self._FIELDS_DYNAMIC,
1778                        selected=self.op.output_fields)
1779
1780     self.needed_locks = {}
1781     self.share_locks[locking.LEVEL_NODE] = 1
1782
1783     if self.op.names:
1784       self.wanted = _GetWantedNodes(self, self.op.names)
1785     else:
1786       self.wanted = locking.ALL_SET
1787
1788     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1789     self.do_locking = self.do_node_query and self.op.use_locking
1790     if self.do_locking:
1791       # if we don't request only static fields, we need to lock the nodes
1792       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1793
1794
1795   def CheckPrereq(self):
1796     """Check prerequisites.
1797
1798     """
1799     # The validation of the node list is done in the _GetWantedNodes,
1800     # if non empty, and if empty, there's no validation to do
1801     pass
1802
1803   def Exec(self, feedback_fn):
1804     """Computes the list of nodes and their attributes.
1805
1806     """
1807     all_info = self.cfg.GetAllNodesInfo()
1808     if self.do_locking:
1809       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1810     elif self.wanted != locking.ALL_SET:
1811       nodenames = self.wanted
1812       missing = set(nodenames).difference(all_info.keys())
1813       if missing:
1814         raise errors.OpExecError(
1815           "Some nodes were removed before retrieving their data: %s" % missing)
1816     else:
1817       nodenames = all_info.keys()
1818
1819     nodenames = utils.NiceSort(nodenames)
1820     nodelist = [all_info[name] for name in nodenames]
1821
1822     # begin data gathering
1823
1824     if self.do_node_query:
1825       live_data = {}
1826       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1827                                           self.cfg.GetHypervisorType())
1828       for name in nodenames:
1829         nodeinfo = node_data[name]
1830         if not nodeinfo.failed and nodeinfo.data:
1831           nodeinfo = nodeinfo.data
1832           fn = utils.TryConvert
1833           live_data[name] = {
1834             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1835             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1836             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1837             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1838             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1839             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1840             "bootid": nodeinfo.get('bootid', None),
1841             }
1842         else:
1843           live_data[name] = {}
1844     else:
1845       live_data = dict.fromkeys(nodenames, {})
1846
1847     node_to_primary = dict([(name, set()) for name in nodenames])
1848     node_to_secondary = dict([(name, set()) for name in nodenames])
1849
1850     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1851                              "sinst_cnt", "sinst_list"))
1852     if inst_fields & frozenset(self.op.output_fields):
1853       instancelist = self.cfg.GetInstanceList()
1854
1855       for instance_name in instancelist:
1856         inst = self.cfg.GetInstanceInfo(instance_name)
1857         if inst.primary_node in node_to_primary:
1858           node_to_primary[inst.primary_node].add(inst.name)
1859         for secnode in inst.secondary_nodes:
1860           if secnode in node_to_secondary:
1861             node_to_secondary[secnode].add(inst.name)
1862
1863     master_node = self.cfg.GetMasterNode()
1864
1865     # end data gathering
1866
1867     output = []
1868     for node in nodelist:
1869       node_output = []
1870       for field in self.op.output_fields:
1871         if field == "name":
1872           val = node.name
1873         elif field == "pinst_list":
1874           val = list(node_to_primary[node.name])
1875         elif field == "sinst_list":
1876           val = list(node_to_secondary[node.name])
1877         elif field == "pinst_cnt":
1878           val = len(node_to_primary[node.name])
1879         elif field == "sinst_cnt":
1880           val = len(node_to_secondary[node.name])
1881         elif field == "pip":
1882           val = node.primary_ip
1883         elif field == "sip":
1884           val = node.secondary_ip
1885         elif field == "tags":
1886           val = list(node.GetTags())
1887         elif field == "serial_no":
1888           val = node.serial_no
1889         elif field == "master_candidate":
1890           val = node.master_candidate
1891         elif field == "master":
1892           val = node.name == master_node
1893         elif field == "offline":
1894           val = node.offline
1895         elif self._FIELDS_DYNAMIC.Matches(field):
1896           val = live_data[node.name].get(field, None)
1897         else:
1898           raise errors.ParameterError(field)
1899         node_output.append(val)
1900       output.append(node_output)
1901
1902     return output
1903
1904
1905 class LUQueryNodeVolumes(NoHooksLU):
1906   """Logical unit for getting volumes on node(s).
1907
1908   """
1909   _OP_REQP = ["nodes", "output_fields"]
1910   REQ_BGL = False
1911   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1912   _FIELDS_STATIC = utils.FieldSet("node")
1913
1914   def ExpandNames(self):
1915     _CheckOutputFields(static=self._FIELDS_STATIC,
1916                        dynamic=self._FIELDS_DYNAMIC,
1917                        selected=self.op.output_fields)
1918
1919     self.needed_locks = {}
1920     self.share_locks[locking.LEVEL_NODE] = 1
1921     if not self.op.nodes:
1922       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1923     else:
1924       self.needed_locks[locking.LEVEL_NODE] = \
1925         _GetWantedNodes(self, self.op.nodes)
1926
1927   def CheckPrereq(self):
1928     """Check prerequisites.
1929
1930     This checks that the fields required are valid output fields.
1931
1932     """
1933     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1934
1935   def Exec(self, feedback_fn):
1936     """Computes the list of nodes and their attributes.
1937
1938     """
1939     nodenames = self.nodes
1940     volumes = self.rpc.call_node_volumes(nodenames)
1941
1942     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1943              in self.cfg.GetInstanceList()]
1944
1945     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1946
1947     output = []
1948     for node in nodenames:
1949       if node not in volumes or volumes[node].failed or not volumes[node].data:
1950         continue
1951
1952       node_vols = volumes[node].data[:]
1953       node_vols.sort(key=lambda vol: vol['dev'])
1954
1955       for vol in node_vols:
1956         node_output = []
1957         for field in self.op.output_fields:
1958           if field == "node":
1959             val = node
1960           elif field == "phys":
1961             val = vol['dev']
1962           elif field == "vg":
1963             val = vol['vg']
1964           elif field == "name":
1965             val = vol['name']
1966           elif field == "size":
1967             val = int(float(vol['size']))
1968           elif field == "instance":
1969             for inst in ilist:
1970               if node not in lv_by_node[inst]:
1971                 continue
1972               if vol['name'] in lv_by_node[inst][node]:
1973                 val = inst.name
1974                 break
1975             else:
1976               val = '-'
1977           else:
1978             raise errors.ParameterError(field)
1979           node_output.append(str(val))
1980
1981         output.append(node_output)
1982
1983     return output
1984
1985
1986 class LUAddNode(LogicalUnit):
1987   """Logical unit for adding node to the cluster.
1988
1989   """
1990   HPATH = "node-add"
1991   HTYPE = constants.HTYPE_NODE
1992   _OP_REQP = ["node_name"]
1993
1994   def BuildHooksEnv(self):
1995     """Build hooks env.
1996
1997     This will run on all nodes before, and on all nodes + the new node after.
1998
1999     """
2000     env = {
2001       "OP_TARGET": self.op.node_name,
2002       "NODE_NAME": self.op.node_name,
2003       "NODE_PIP": self.op.primary_ip,
2004       "NODE_SIP": self.op.secondary_ip,
2005       }
2006     nodes_0 = self.cfg.GetNodeList()
2007     nodes_1 = nodes_0 + [self.op.node_name, ]
2008     return env, nodes_0, nodes_1
2009
2010   def CheckPrereq(self):
2011     """Check prerequisites.
2012
2013     This checks:
2014      - the new node is not already in the config
2015      - it is resolvable
2016      - its parameters (single/dual homed) matches the cluster
2017
2018     Any errors are signalled by raising errors.OpPrereqError.
2019
2020     """
2021     node_name = self.op.node_name
2022     cfg = self.cfg
2023
2024     dns_data = utils.HostInfo(node_name)
2025
2026     node = dns_data.name
2027     primary_ip = self.op.primary_ip = dns_data.ip
2028     secondary_ip = getattr(self.op, "secondary_ip", None)
2029     if secondary_ip is None:
2030       secondary_ip = primary_ip
2031     if not utils.IsValidIP(secondary_ip):
2032       raise errors.OpPrereqError("Invalid secondary IP given")
2033     self.op.secondary_ip = secondary_ip
2034
2035     node_list = cfg.GetNodeList()
2036     if not self.op.readd and node in node_list:
2037       raise errors.OpPrereqError("Node %s is already in the configuration" %
2038                                  node)
2039     elif self.op.readd and node not in node_list:
2040       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2041
2042     for existing_node_name in node_list:
2043       existing_node = cfg.GetNodeInfo(existing_node_name)
2044
2045       if self.op.readd and node == existing_node_name:
2046         if (existing_node.primary_ip != primary_ip or
2047             existing_node.secondary_ip != secondary_ip):
2048           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2049                                      " address configuration as before")
2050         continue
2051
2052       if (existing_node.primary_ip == primary_ip or
2053           existing_node.secondary_ip == primary_ip or
2054           existing_node.primary_ip == secondary_ip or
2055           existing_node.secondary_ip == secondary_ip):
2056         raise errors.OpPrereqError("New node ip address(es) conflict with"
2057                                    " existing node %s" % existing_node.name)
2058
2059     # check that the type of the node (single versus dual homed) is the
2060     # same as for the master
2061     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2062     master_singlehomed = myself.secondary_ip == myself.primary_ip
2063     newbie_singlehomed = secondary_ip == primary_ip
2064     if master_singlehomed != newbie_singlehomed:
2065       if master_singlehomed:
2066         raise errors.OpPrereqError("The master has no private ip but the"
2067                                    " new node has one")
2068       else:
2069         raise errors.OpPrereqError("The master has a private ip but the"
2070                                    " new node doesn't have one")
2071
2072     # checks reachablity
2073     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2074       raise errors.OpPrereqError("Node not reachable by ping")
2075
2076     if not newbie_singlehomed:
2077       # check reachability from my secondary ip to newbie's secondary ip
2078       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2079                            source=myself.secondary_ip):
2080         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2081                                    " based ping to noded port")
2082
2083     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2084     mc_now, _ = self.cfg.GetMasterCandidateStats()
2085     master_candidate = mc_now < cp_size
2086
2087     self.new_node = objects.Node(name=node,
2088                                  primary_ip=primary_ip,
2089                                  secondary_ip=secondary_ip,
2090                                  master_candidate=master_candidate,
2091                                  offline=False)
2092
2093   def Exec(self, feedback_fn):
2094     """Adds the new node to the cluster.
2095
2096     """
2097     new_node = self.new_node
2098     node = new_node.name
2099
2100     # check connectivity
2101     result = self.rpc.call_version([node])[node]
2102     result.Raise()
2103     if result.data:
2104       if constants.PROTOCOL_VERSION == result.data:
2105         logging.info("Communication to node %s fine, sw version %s match",
2106                      node, result.data)
2107       else:
2108         raise errors.OpExecError("Version mismatch master version %s,"
2109                                  " node version %s" %
2110                                  (constants.PROTOCOL_VERSION, result.data))
2111     else:
2112       raise errors.OpExecError("Cannot get version from the new node")
2113
2114     # setup ssh on node
2115     logging.info("Copy ssh key to node %s", node)
2116     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2117     keyarray = []
2118     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2119                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2120                 priv_key, pub_key]
2121
2122     for i in keyfiles:
2123       f = open(i, 'r')
2124       try:
2125         keyarray.append(f.read())
2126       finally:
2127         f.close()
2128
2129     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2130                                     keyarray[2],
2131                                     keyarray[3], keyarray[4], keyarray[5])
2132
2133     msg = result.RemoteFailMsg()
2134     if msg:
2135       raise errors.OpExecError("Cannot transfer ssh keys to the"
2136                                " new node: %s" % msg)
2137
2138     # Add node to our /etc/hosts, and add key to known_hosts
2139     utils.AddHostToEtcHosts(new_node.name)
2140
2141     if new_node.secondary_ip != new_node.primary_ip:
2142       result = self.rpc.call_node_has_ip_address(new_node.name,
2143                                                  new_node.secondary_ip)
2144       if result.failed or not result.data:
2145         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2146                                  " you gave (%s). Please fix and re-run this"
2147                                  " command." % new_node.secondary_ip)
2148
2149     node_verify_list = [self.cfg.GetMasterNode()]
2150     node_verify_param = {
2151       'nodelist': [node],
2152       # TODO: do a node-net-test as well?
2153     }
2154
2155     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2156                                        self.cfg.GetClusterName())
2157     for verifier in node_verify_list:
2158       if result[verifier].failed or not result[verifier].data:
2159         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2160                                  " for remote verification" % verifier)
2161       if result[verifier].data['nodelist']:
2162         for failed in result[verifier].data['nodelist']:
2163           feedback_fn("ssh/hostname verification failed %s -> %s" %
2164                       (verifier, result[verifier].data['nodelist'][failed]))
2165         raise errors.OpExecError("ssh/hostname verification failed.")
2166
2167     # Distribute updated /etc/hosts and known_hosts to all nodes,
2168     # including the node just added
2169     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2170     dist_nodes = self.cfg.GetNodeList()
2171     if not self.op.readd:
2172       dist_nodes.append(node)
2173     if myself.name in dist_nodes:
2174       dist_nodes.remove(myself.name)
2175
2176     logging.debug("Copying hosts and known_hosts to all nodes")
2177     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2178       result = self.rpc.call_upload_file(dist_nodes, fname)
2179       for to_node, to_result in result.iteritems():
2180         if to_result.failed or not to_result.data:
2181           logging.error("Copy of file %s to node %s failed", fname, to_node)
2182
2183     to_copy = []
2184     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2185     if constants.HTS_USE_VNC.intersection(enabled_hypervisors):
2186       to_copy.append(constants.VNC_PASSWORD_FILE)
2187
2188     for fname in to_copy:
2189       result = self.rpc.call_upload_file([node], fname)
2190       if result[node].failed or not result[node]:
2191         logging.error("Could not copy file %s to node %s", fname, node)
2192
2193     if self.op.readd:
2194       self.context.ReaddNode(new_node)
2195     else:
2196       self.context.AddNode(new_node)
2197
2198
2199 class LUSetNodeParams(LogicalUnit):
2200   """Modifies the parameters of a node.
2201
2202   """
2203   HPATH = "node-modify"
2204   HTYPE = constants.HTYPE_NODE
2205   _OP_REQP = ["node_name"]
2206   REQ_BGL = False
2207
2208   def CheckArguments(self):
2209     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2210     if node_name is None:
2211       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2212     self.op.node_name = node_name
2213     _CheckBooleanOpField(self.op, 'master_candidate')
2214     _CheckBooleanOpField(self.op, 'offline')
2215     if self.op.master_candidate is None and self.op.offline is None:
2216       raise errors.OpPrereqError("Please pass at least one modification")
2217     if self.op.offline == True and self.op.master_candidate == True:
2218       raise errors.OpPrereqError("Can't set the node into offline and"
2219                                  " master_candidate at the same time")
2220
2221   def ExpandNames(self):
2222     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2223
2224   def BuildHooksEnv(self):
2225     """Build hooks env.
2226
2227     This runs on the master node.
2228
2229     """
2230     env = {
2231       "OP_TARGET": self.op.node_name,
2232       "MASTER_CANDIDATE": str(self.op.master_candidate),
2233       "OFFLINE": str(self.op.offline),
2234       }
2235     nl = [self.cfg.GetMasterNode(),
2236           self.op.node_name]
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This only checks the instance list against the existing names.
2243
2244     """
2245     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2246
2247     if ((self.op.master_candidate == False or self.op.offline == True)
2248         and node.master_candidate):
2249       # we will demote the node from master_candidate
2250       if self.op.node_name == self.cfg.GetMasterNode():
2251         raise errors.OpPrereqError("The master node has to be a"
2252                                    " master candidate and online")
2253       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2254       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2255       if num_candidates <= cp_size:
2256         msg = ("Not enough master candidates (desired"
2257                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2258         if self.op.force:
2259           self.LogWarning(msg)
2260         else:
2261           raise errors.OpPrereqError(msg)
2262
2263     if (self.op.master_candidate == True and node.offline and
2264         not self.op.offline == False):
2265       raise errors.OpPrereqError("Can't set an offline node to"
2266                                  " master_candidate")
2267
2268     return
2269
2270   def Exec(self, feedback_fn):
2271     """Modifies a node.
2272
2273     """
2274     node = self.node
2275
2276     result = []
2277
2278     if self.op.offline is not None:
2279       node.offline = self.op.offline
2280       result.append(("offline", str(self.op.offline)))
2281       if self.op.offline == True and node.master_candidate:
2282         node.master_candidate = False
2283         result.append(("master_candidate", "auto-demotion due to offline"))
2284
2285     if self.op.master_candidate is not None:
2286       node.master_candidate = self.op.master_candidate
2287       result.append(("master_candidate", str(self.op.master_candidate)))
2288       if self.op.master_candidate == False:
2289         rrc = self.rpc.call_node_demote_from_mc(node.name)
2290         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2291             or len(rrc.data) != 2):
2292           self.LogWarning("Node rpc error: %s" % rrc.error)
2293         elif not rrc.data[0]:
2294           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2295
2296     # this will trigger configuration file update, if needed
2297     self.cfg.Update(node)
2298     # this will trigger job queue propagation or cleanup
2299     if self.op.node_name != self.cfg.GetMasterNode():
2300       self.context.ReaddNode(node)
2301
2302     return result
2303
2304
2305 class LUQueryClusterInfo(NoHooksLU):
2306   """Query cluster configuration.
2307
2308   """
2309   _OP_REQP = []
2310   REQ_BGL = False
2311
2312   def ExpandNames(self):
2313     self.needed_locks = {}
2314
2315   def CheckPrereq(self):
2316     """No prerequsites needed for this LU.
2317
2318     """
2319     pass
2320
2321   def Exec(self, feedback_fn):
2322     """Return cluster config.
2323
2324     """
2325     cluster = self.cfg.GetClusterInfo()
2326     result = {
2327       "software_version": constants.RELEASE_VERSION,
2328       "protocol_version": constants.PROTOCOL_VERSION,
2329       "config_version": constants.CONFIG_VERSION,
2330       "os_api_version": constants.OS_API_VERSION,
2331       "export_version": constants.EXPORT_VERSION,
2332       "architecture": (platform.architecture()[0], platform.machine()),
2333       "name": cluster.cluster_name,
2334       "master": cluster.master_node,
2335       "default_hypervisor": cluster.default_hypervisor,
2336       "enabled_hypervisors": cluster.enabled_hypervisors,
2337       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2338                         for hypervisor in cluster.enabled_hypervisors]),
2339       "beparams": cluster.beparams,
2340       "candidate_pool_size": cluster.candidate_pool_size,
2341       }
2342
2343     return result
2344
2345
2346 class LUQueryConfigValues(NoHooksLU):
2347   """Return configuration values.
2348
2349   """
2350   _OP_REQP = []
2351   REQ_BGL = False
2352   _FIELDS_DYNAMIC = utils.FieldSet()
2353   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2354
2355   def ExpandNames(self):
2356     self.needed_locks = {}
2357
2358     _CheckOutputFields(static=self._FIELDS_STATIC,
2359                        dynamic=self._FIELDS_DYNAMIC,
2360                        selected=self.op.output_fields)
2361
2362   def CheckPrereq(self):
2363     """No prerequisites.
2364
2365     """
2366     pass
2367
2368   def Exec(self, feedback_fn):
2369     """Dump a representation of the cluster config to the standard output.
2370
2371     """
2372     values = []
2373     for field in self.op.output_fields:
2374       if field == "cluster_name":
2375         entry = self.cfg.GetClusterName()
2376       elif field == "master_node":
2377         entry = self.cfg.GetMasterNode()
2378       elif field == "drain_flag":
2379         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2380       else:
2381         raise errors.ParameterError(field)
2382       values.append(entry)
2383     return values
2384
2385
2386 class LUActivateInstanceDisks(NoHooksLU):
2387   """Bring up an instance's disks.
2388
2389   """
2390   _OP_REQP = ["instance_name"]
2391   REQ_BGL = False
2392
2393   def ExpandNames(self):
2394     self._ExpandAndLockInstance()
2395     self.needed_locks[locking.LEVEL_NODE] = []
2396     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2397
2398   def DeclareLocks(self, level):
2399     if level == locking.LEVEL_NODE:
2400       self._LockInstancesNodes()
2401
2402   def CheckPrereq(self):
2403     """Check prerequisites.
2404
2405     This checks that the instance is in the cluster.
2406
2407     """
2408     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2409     assert self.instance is not None, \
2410       "Cannot retrieve locked instance %s" % self.op.instance_name
2411     _CheckNodeOnline(self, self.instance.primary_node)
2412
2413   def Exec(self, feedback_fn):
2414     """Activate the disks.
2415
2416     """
2417     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2418     if not disks_ok:
2419       raise errors.OpExecError("Cannot activate block devices")
2420
2421     return disks_info
2422
2423
2424 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2425   """Prepare the block devices for an instance.
2426
2427   This sets up the block devices on all nodes.
2428
2429   @type lu: L{LogicalUnit}
2430   @param lu: the logical unit on whose behalf we execute
2431   @type instance: L{objects.Instance}
2432   @param instance: the instance for whose disks we assemble
2433   @type ignore_secondaries: boolean
2434   @param ignore_secondaries: if true, errors on secondary nodes
2435       won't result in an error return from the function
2436   @return: False if the operation failed, otherwise a list of
2437       (host, instance_visible_name, node_visible_name)
2438       with the mapping from node devices to instance devices
2439
2440   """
2441   device_info = []
2442   disks_ok = True
2443   iname = instance.name
2444   # With the two passes mechanism we try to reduce the window of
2445   # opportunity for the race condition of switching DRBD to primary
2446   # before handshaking occured, but we do not eliminate it
2447
2448   # The proper fix would be to wait (with some limits) until the
2449   # connection has been made and drbd transitions from WFConnection
2450   # into any other network-connected state (Connected, SyncTarget,
2451   # SyncSource, etc.)
2452
2453   # 1st pass, assemble on all nodes in secondary mode
2454   for inst_disk in instance.disks:
2455     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2456       lu.cfg.SetDiskID(node_disk, node)
2457       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2458       if result.failed or not result:
2459         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2460                            " (is_primary=False, pass=1)",
2461                            inst_disk.iv_name, node)
2462         if not ignore_secondaries:
2463           disks_ok = False
2464
2465   # FIXME: race condition on drbd migration to primary
2466
2467   # 2nd pass, do only the primary node
2468   for inst_disk in instance.disks:
2469     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2470       if node != instance.primary_node:
2471         continue
2472       lu.cfg.SetDiskID(node_disk, node)
2473       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2474       if result.failed or not result:
2475         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2476                            " (is_primary=True, pass=2)",
2477                            inst_disk.iv_name, node)
2478         disks_ok = False
2479     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2480
2481   # leave the disks configured for the primary node
2482   # this is a workaround that would be fixed better by
2483   # improving the logical/physical id handling
2484   for disk in instance.disks:
2485     lu.cfg.SetDiskID(disk, instance.primary_node)
2486
2487   return disks_ok, device_info
2488
2489
2490 def _StartInstanceDisks(lu, instance, force):
2491   """Start the disks of an instance.
2492
2493   """
2494   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2495                                            ignore_secondaries=force)
2496   if not disks_ok:
2497     _ShutdownInstanceDisks(lu, instance)
2498     if force is not None and not force:
2499       lu.proc.LogWarning("", hint="If the message above refers to a"
2500                          " secondary node,"
2501                          " you can retry the operation using '--force'.")
2502     raise errors.OpExecError("Disk consistency error")
2503
2504
2505 class LUDeactivateInstanceDisks(NoHooksLU):
2506   """Shutdown an instance's disks.
2507
2508   """
2509   _OP_REQP = ["instance_name"]
2510   REQ_BGL = False
2511
2512   def ExpandNames(self):
2513     self._ExpandAndLockInstance()
2514     self.needed_locks[locking.LEVEL_NODE] = []
2515     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2516
2517   def DeclareLocks(self, level):
2518     if level == locking.LEVEL_NODE:
2519       self._LockInstancesNodes()
2520
2521   def CheckPrereq(self):
2522     """Check prerequisites.
2523
2524     This checks that the instance is in the cluster.
2525
2526     """
2527     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2528     assert self.instance is not None, \
2529       "Cannot retrieve locked instance %s" % self.op.instance_name
2530
2531   def Exec(self, feedback_fn):
2532     """Deactivate the disks
2533
2534     """
2535     instance = self.instance
2536     _SafeShutdownInstanceDisks(self, instance)
2537
2538
2539 def _SafeShutdownInstanceDisks(lu, instance):
2540   """Shutdown block devices of an instance.
2541
2542   This function checks if an instance is running, before calling
2543   _ShutdownInstanceDisks.
2544
2545   """
2546   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2547                                       [instance.hypervisor])
2548   ins_l = ins_l[instance.primary_node]
2549   if ins_l.failed or not isinstance(ins_l.data, list):
2550     raise errors.OpExecError("Can't contact node '%s'" %
2551                              instance.primary_node)
2552
2553   if instance.name in ins_l.data:
2554     raise errors.OpExecError("Instance is running, can't shutdown"
2555                              " block devices.")
2556
2557   _ShutdownInstanceDisks(lu, instance)
2558
2559
2560 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2561   """Shutdown block devices of an instance.
2562
2563   This does the shutdown on all nodes of the instance.
2564
2565   If the ignore_primary is false, errors on the primary node are
2566   ignored.
2567
2568   """
2569   result = True
2570   for disk in instance.disks:
2571     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2572       lu.cfg.SetDiskID(top_disk, node)
2573       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2574       if result.failed or not result.data:
2575         logging.error("Could not shutdown block device %s on node %s",
2576                       disk.iv_name, node)
2577         if not ignore_primary or node != instance.primary_node:
2578           result = False
2579   return result
2580
2581
2582 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2583   """Checks if a node has enough free memory.
2584
2585   This function check if a given node has the needed amount of free
2586   memory. In case the node has less memory or we cannot get the
2587   information from the node, this function raise an OpPrereqError
2588   exception.
2589
2590   @type lu: C{LogicalUnit}
2591   @param lu: a logical unit from which we get configuration data
2592   @type node: C{str}
2593   @param node: the node to check
2594   @type reason: C{str}
2595   @param reason: string to use in the error message
2596   @type requested: C{int}
2597   @param requested: the amount of memory in MiB to check for
2598   @type hypervisor_name: C{str}
2599   @param hypervisor_name: the hypervisor to ask for memory stats
2600   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2601       we cannot check the node
2602
2603   """
2604   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2605   nodeinfo[node].Raise()
2606   free_mem = nodeinfo[node].data.get('memory_free')
2607   if not isinstance(free_mem, int):
2608     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2609                              " was '%s'" % (node, free_mem))
2610   if requested > free_mem:
2611     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2612                              " needed %s MiB, available %s MiB" %
2613                              (node, reason, requested, free_mem))
2614
2615
2616 class LUStartupInstance(LogicalUnit):
2617   """Starts an instance.
2618
2619   """
2620   HPATH = "instance-start"
2621   HTYPE = constants.HTYPE_INSTANCE
2622   _OP_REQP = ["instance_name", "force"]
2623   REQ_BGL = False
2624
2625   def ExpandNames(self):
2626     self._ExpandAndLockInstance()
2627
2628   def BuildHooksEnv(self):
2629     """Build hooks env.
2630
2631     This runs on master, primary and secondary nodes of the instance.
2632
2633     """
2634     env = {
2635       "FORCE": self.op.force,
2636       }
2637     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2638     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2639     return env, nl, nl
2640
2641   def CheckPrereq(self):
2642     """Check prerequisites.
2643
2644     This checks that the instance is in the cluster.
2645
2646     """
2647     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2648     assert self.instance is not None, \
2649       "Cannot retrieve locked instance %s" % self.op.instance_name
2650
2651     _CheckNodeOnline(self, instance.primary_node)
2652
2653     bep = self.cfg.GetClusterInfo().FillBE(instance)
2654     # check bridges existance
2655     _CheckInstanceBridgesExist(self, instance)
2656
2657     _CheckNodeFreeMemory(self, instance.primary_node,
2658                          "starting instance %s" % instance.name,
2659                          bep[constants.BE_MEMORY], instance.hypervisor)
2660
2661   def Exec(self, feedback_fn):
2662     """Start the instance.
2663
2664     """
2665     instance = self.instance
2666     force = self.op.force
2667     extra_args = getattr(self.op, "extra_args", "")
2668
2669     self.cfg.MarkInstanceUp(instance.name)
2670
2671     node_current = instance.primary_node
2672
2673     _StartInstanceDisks(self, instance, force)
2674
2675     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2676     msg = result.RemoteFailMsg()
2677     if msg:
2678       _ShutdownInstanceDisks(self, instance)
2679       raise errors.OpExecError("Could not start instance: %s" % msg)
2680
2681
2682 class LURebootInstance(LogicalUnit):
2683   """Reboot an instance.
2684
2685   """
2686   HPATH = "instance-reboot"
2687   HTYPE = constants.HTYPE_INSTANCE
2688   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2689   REQ_BGL = False
2690
2691   def ExpandNames(self):
2692     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2693                                    constants.INSTANCE_REBOOT_HARD,
2694                                    constants.INSTANCE_REBOOT_FULL]:
2695       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2696                                   (constants.INSTANCE_REBOOT_SOFT,
2697                                    constants.INSTANCE_REBOOT_HARD,
2698                                    constants.INSTANCE_REBOOT_FULL))
2699     self._ExpandAndLockInstance()
2700
2701   def BuildHooksEnv(self):
2702     """Build hooks env.
2703
2704     This runs on master, primary and secondary nodes of the instance.
2705
2706     """
2707     env = {
2708       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2709       }
2710     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2711     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2712     return env, nl, nl
2713
2714   def CheckPrereq(self):
2715     """Check prerequisites.
2716
2717     This checks that the instance is in the cluster.
2718
2719     """
2720     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2721     assert self.instance is not None, \
2722       "Cannot retrieve locked instance %s" % self.op.instance_name
2723
2724     _CheckNodeOnline(self, instance.primary_node)
2725
2726     # check bridges existance
2727     _CheckInstanceBridgesExist(self, instance)
2728
2729   def Exec(self, feedback_fn):
2730     """Reboot the instance.
2731
2732     """
2733     instance = self.instance
2734     ignore_secondaries = self.op.ignore_secondaries
2735     reboot_type = self.op.reboot_type
2736     extra_args = getattr(self.op, "extra_args", "")
2737
2738     node_current = instance.primary_node
2739
2740     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2741                        constants.INSTANCE_REBOOT_HARD]:
2742       result = self.rpc.call_instance_reboot(node_current, instance,
2743                                              reboot_type, extra_args)
2744       if result.failed or not result.data:
2745         raise errors.OpExecError("Could not reboot instance")
2746     else:
2747       if not self.rpc.call_instance_shutdown(node_current, instance):
2748         raise errors.OpExecError("could not shutdown instance for full reboot")
2749       _ShutdownInstanceDisks(self, instance)
2750       _StartInstanceDisks(self, instance, ignore_secondaries)
2751       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2752       msg = result.RemoteFailMsg()
2753       if msg:
2754         _ShutdownInstanceDisks(self, instance)
2755         raise errors.OpExecError("Could not start instance for"
2756                                  " full reboot: %s" % msg)
2757
2758     self.cfg.MarkInstanceUp(instance.name)
2759
2760
2761 class LUShutdownInstance(LogicalUnit):
2762   """Shutdown an instance.
2763
2764   """
2765   HPATH = "instance-stop"
2766   HTYPE = constants.HTYPE_INSTANCE
2767   _OP_REQP = ["instance_name"]
2768   REQ_BGL = False
2769
2770   def ExpandNames(self):
2771     self._ExpandAndLockInstance()
2772
2773   def BuildHooksEnv(self):
2774     """Build hooks env.
2775
2776     This runs on master, primary and secondary nodes of the instance.
2777
2778     """
2779     env = _BuildInstanceHookEnvByObject(self, self.instance)
2780     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2781     return env, nl, nl
2782
2783   def CheckPrereq(self):
2784     """Check prerequisites.
2785
2786     This checks that the instance is in the cluster.
2787
2788     """
2789     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2790     assert self.instance is not None, \
2791       "Cannot retrieve locked instance %s" % self.op.instance_name
2792     _CheckNodeOnline(self, self.instance.primary_node)
2793
2794   def Exec(self, feedback_fn):
2795     """Shutdown the instance.
2796
2797     """
2798     instance = self.instance
2799     node_current = instance.primary_node
2800     self.cfg.MarkInstanceDown(instance.name)
2801     result = self.rpc.call_instance_shutdown(node_current, instance)
2802     if result.failed or not result.data:
2803       self.proc.LogWarning("Could not shutdown instance")
2804
2805     _ShutdownInstanceDisks(self, instance)
2806
2807
2808 class LUReinstallInstance(LogicalUnit):
2809   """Reinstall an instance.
2810
2811   """
2812   HPATH = "instance-reinstall"
2813   HTYPE = constants.HTYPE_INSTANCE
2814   _OP_REQP = ["instance_name"]
2815   REQ_BGL = False
2816
2817   def ExpandNames(self):
2818     self._ExpandAndLockInstance()
2819
2820   def BuildHooksEnv(self):
2821     """Build hooks env.
2822
2823     This runs on master, primary and secondary nodes of the instance.
2824
2825     """
2826     env = _BuildInstanceHookEnvByObject(self, self.instance)
2827     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2828     return env, nl, nl
2829
2830   def CheckPrereq(self):
2831     """Check prerequisites.
2832
2833     This checks that the instance is in the cluster and is not running.
2834
2835     """
2836     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2837     assert instance is not None, \
2838       "Cannot retrieve locked instance %s" % self.op.instance_name
2839     _CheckNodeOnline(self, instance.primary_node)
2840
2841     if instance.disk_template == constants.DT_DISKLESS:
2842       raise errors.OpPrereqError("Instance '%s' has no disks" %
2843                                  self.op.instance_name)
2844     if instance.admin_up:
2845       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2846                                  self.op.instance_name)
2847     remote_info = self.rpc.call_instance_info(instance.primary_node,
2848                                               instance.name,
2849                                               instance.hypervisor)
2850     if remote_info.failed or remote_info.data:
2851       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2852                                  (self.op.instance_name,
2853                                   instance.primary_node))
2854
2855     self.op.os_type = getattr(self.op, "os_type", None)
2856     if self.op.os_type is not None:
2857       # OS verification
2858       pnode = self.cfg.GetNodeInfo(
2859         self.cfg.ExpandNodeName(instance.primary_node))
2860       if pnode is None:
2861         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2862                                    self.op.pnode)
2863       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2864       result.Raise()
2865       if not isinstance(result.data, objects.OS):
2866         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2867                                    " primary node"  % self.op.os_type)
2868
2869     self.instance = instance
2870
2871   def Exec(self, feedback_fn):
2872     """Reinstall the instance.
2873
2874     """
2875     inst = self.instance
2876
2877     if self.op.os_type is not None:
2878       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2879       inst.os = self.op.os_type
2880       self.cfg.Update(inst)
2881
2882     _StartInstanceDisks(self, inst, None)
2883     try:
2884       feedback_fn("Running the instance OS create scripts...")
2885       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2886       msg = result.RemoteFailMsg()
2887       if msg:
2888         raise errors.OpExecError("Could not install OS for instance %s"
2889                                  " on node %s: %s" %
2890                                  (inst.name, inst.primary_node, msg))
2891     finally:
2892       _ShutdownInstanceDisks(self, inst)
2893
2894
2895 class LURenameInstance(LogicalUnit):
2896   """Rename an instance.
2897
2898   """
2899   HPATH = "instance-rename"
2900   HTYPE = constants.HTYPE_INSTANCE
2901   _OP_REQP = ["instance_name", "new_name"]
2902
2903   def BuildHooksEnv(self):
2904     """Build hooks env.
2905
2906     This runs on master, primary and secondary nodes of the instance.
2907
2908     """
2909     env = _BuildInstanceHookEnvByObject(self, self.instance)
2910     env["INSTANCE_NEW_NAME"] = self.op.new_name
2911     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2912     return env, nl, nl
2913
2914   def CheckPrereq(self):
2915     """Check prerequisites.
2916
2917     This checks that the instance is in the cluster and is not running.
2918
2919     """
2920     instance = self.cfg.GetInstanceInfo(
2921       self.cfg.ExpandInstanceName(self.op.instance_name))
2922     if instance is None:
2923       raise errors.OpPrereqError("Instance '%s' not known" %
2924                                  self.op.instance_name)
2925     _CheckNodeOnline(self, instance.primary_node)
2926
2927     if instance.admin_up:
2928       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2929                                  self.op.instance_name)
2930     remote_info = self.rpc.call_instance_info(instance.primary_node,
2931                                               instance.name,
2932                                               instance.hypervisor)
2933     remote_info.Raise()
2934     if remote_info.data:
2935       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2936                                  (self.op.instance_name,
2937                                   instance.primary_node))
2938     self.instance = instance
2939
2940     # new name verification
2941     name_info = utils.HostInfo(self.op.new_name)
2942
2943     self.op.new_name = new_name = name_info.name
2944     instance_list = self.cfg.GetInstanceList()
2945     if new_name in instance_list:
2946       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2947                                  new_name)
2948
2949     if not getattr(self.op, "ignore_ip", False):
2950       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2951         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2952                                    (name_info.ip, new_name))
2953
2954
2955   def Exec(self, feedback_fn):
2956     """Reinstall the instance.
2957
2958     """
2959     inst = self.instance
2960     old_name = inst.name
2961
2962     if inst.disk_template == constants.DT_FILE:
2963       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2964
2965     self.cfg.RenameInstance(inst.name, self.op.new_name)
2966     # Change the instance lock. This is definitely safe while we hold the BGL
2967     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2968     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2969
2970     # re-read the instance from the configuration after rename
2971     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2972
2973     if inst.disk_template == constants.DT_FILE:
2974       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2975       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2976                                                      old_file_storage_dir,
2977                                                      new_file_storage_dir)
2978       result.Raise()
2979       if not result.data:
2980         raise errors.OpExecError("Could not connect to node '%s' to rename"
2981                                  " directory '%s' to '%s' (but the instance"
2982                                  " has been renamed in Ganeti)" % (
2983                                  inst.primary_node, old_file_storage_dir,
2984                                  new_file_storage_dir))
2985
2986       if not result.data[0]:
2987         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2988                                  " (but the instance has been renamed in"
2989                                  " Ganeti)" % (old_file_storage_dir,
2990                                                new_file_storage_dir))
2991
2992     _StartInstanceDisks(self, inst, None)
2993     try:
2994       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2995                                                  old_name)
2996       msg = result.RemoteFailMsg()
2997       if msg:
2998         msg = ("Could not run OS rename script for instance %s on node %s"
2999                " (but the instance has been renamed in Ganeti): %s" %
3000                (inst.name, inst.primary_node, msg))
3001         self.proc.LogWarning(msg)
3002     finally:
3003       _ShutdownInstanceDisks(self, inst)
3004
3005
3006 class LURemoveInstance(LogicalUnit):
3007   """Remove an instance.
3008
3009   """
3010   HPATH = "instance-remove"
3011   HTYPE = constants.HTYPE_INSTANCE
3012   _OP_REQP = ["instance_name", "ignore_failures"]
3013   REQ_BGL = False
3014
3015   def ExpandNames(self):
3016     self._ExpandAndLockInstance()
3017     self.needed_locks[locking.LEVEL_NODE] = []
3018     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3019
3020   def DeclareLocks(self, level):
3021     if level == locking.LEVEL_NODE:
3022       self._LockInstancesNodes()
3023
3024   def BuildHooksEnv(self):
3025     """Build hooks env.
3026
3027     This runs on master, primary and secondary nodes of the instance.
3028
3029     """
3030     env = _BuildInstanceHookEnvByObject(self, self.instance)
3031     nl = [self.cfg.GetMasterNode()]
3032     return env, nl, nl
3033
3034   def CheckPrereq(self):
3035     """Check prerequisites.
3036
3037     This checks that the instance is in the cluster.
3038
3039     """
3040     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3041     assert self.instance is not None, \
3042       "Cannot retrieve locked instance %s" % self.op.instance_name
3043
3044   def Exec(self, feedback_fn):
3045     """Remove the instance.
3046
3047     """
3048     instance = self.instance
3049     logging.info("Shutting down instance %s on node %s",
3050                  instance.name, instance.primary_node)
3051
3052     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3053     if result.failed or not result.data:
3054       if self.op.ignore_failures:
3055         feedback_fn("Warning: can't shutdown instance")
3056       else:
3057         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3058                                  (instance.name, instance.primary_node))
3059
3060     logging.info("Removing block devices for instance %s", instance.name)
3061
3062     if not _RemoveDisks(self, instance):
3063       if self.op.ignore_failures:
3064         feedback_fn("Warning: can't remove instance's disks")
3065       else:
3066         raise errors.OpExecError("Can't remove instance's disks")
3067
3068     logging.info("Removing instance %s out of cluster config", instance.name)
3069
3070     self.cfg.RemoveInstance(instance.name)
3071     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3072
3073
3074 class LUQueryInstances(NoHooksLU):
3075   """Logical unit for querying instances.
3076
3077   """
3078   _OP_REQP = ["output_fields", "names", "use_locking"]
3079   REQ_BGL = False
3080   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3081                                     "admin_state", "admin_ram",
3082                                     "disk_template", "ip", "mac", "bridge",
3083                                     "sda_size", "sdb_size", "vcpus", "tags",
3084                                     "network_port", "beparams",
3085                                     "(disk).(size)/([0-9]+)",
3086                                     "(disk).(sizes)",
3087                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3088                                     "(nic).(macs|ips|bridges)",
3089                                     "(disk|nic).(count)",
3090                                     "serial_no", "hypervisor", "hvparams",] +
3091                                   ["hv/%s" % name
3092                                    for name in constants.HVS_PARAMETERS] +
3093                                   ["be/%s" % name
3094                                    for name in constants.BES_PARAMETERS])
3095   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3096
3097
3098   def ExpandNames(self):
3099     _CheckOutputFields(static=self._FIELDS_STATIC,
3100                        dynamic=self._FIELDS_DYNAMIC,
3101                        selected=self.op.output_fields)
3102
3103     self.needed_locks = {}
3104     self.share_locks[locking.LEVEL_INSTANCE] = 1
3105     self.share_locks[locking.LEVEL_NODE] = 1
3106
3107     if self.op.names:
3108       self.wanted = _GetWantedInstances(self, self.op.names)
3109     else:
3110       self.wanted = locking.ALL_SET
3111
3112     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3113     self.do_locking = self.do_node_query and self.op.use_locking
3114     if self.do_locking:
3115       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3116       self.needed_locks[locking.LEVEL_NODE] = []
3117       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3118
3119   def DeclareLocks(self, level):
3120     if level == locking.LEVEL_NODE and self.do_locking:
3121       self._LockInstancesNodes()
3122
3123   def CheckPrereq(self):
3124     """Check prerequisites.
3125
3126     """
3127     pass
3128
3129   def Exec(self, feedback_fn):
3130     """Computes the list of nodes and their attributes.
3131
3132     """
3133     all_info = self.cfg.GetAllInstancesInfo()
3134     if self.wanted == locking.ALL_SET:
3135       # caller didn't specify instance names, so ordering is not important
3136       if self.do_locking:
3137         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3138       else:
3139         instance_names = all_info.keys()
3140       instance_names = utils.NiceSort(instance_names)
3141     else:
3142       # caller did specify names, so we must keep the ordering
3143       if self.do_locking:
3144         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3145       else:
3146         tgt_set = all_info.keys()
3147       missing = set(self.wanted).difference(tgt_set)
3148       if missing:
3149         raise errors.OpExecError("Some instances were removed before"
3150                                  " retrieving their data: %s" % missing)
3151       instance_names = self.wanted
3152
3153     instance_list = [all_info[iname] for iname in instance_names]
3154
3155     # begin data gathering
3156
3157     nodes = frozenset([inst.primary_node for inst in instance_list])
3158     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3159
3160     bad_nodes = []
3161     off_nodes = []
3162     if self.do_node_query:
3163       live_data = {}
3164       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3165       for name in nodes:
3166         result = node_data[name]
3167         if result.offline:
3168           # offline nodes will be in both lists
3169           off_nodes.append(name)
3170         if result.failed:
3171           bad_nodes.append(name)
3172         else:
3173           if result.data:
3174             live_data.update(result.data)
3175             # else no instance is alive
3176     else:
3177       live_data = dict([(name, {}) for name in instance_names])
3178
3179     # end data gathering
3180
3181     HVPREFIX = "hv/"
3182     BEPREFIX = "be/"
3183     output = []
3184     for instance in instance_list:
3185       iout = []
3186       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3187       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3188       for field in self.op.output_fields:
3189         st_match = self._FIELDS_STATIC.Matches(field)
3190         if field == "name":
3191           val = instance.name
3192         elif field == "os":
3193           val = instance.os
3194         elif field == "pnode":
3195           val = instance.primary_node
3196         elif field == "snodes":
3197           val = list(instance.secondary_nodes)
3198         elif field == "admin_state":
3199           val = instance.admin_up
3200         elif field == "oper_state":
3201           if instance.primary_node in bad_nodes:
3202             val = None
3203           else:
3204             val = bool(live_data.get(instance.name))
3205         elif field == "status":
3206           if instance.primary_node in off_nodes:
3207             val = "ERROR_nodeoffline"
3208           elif instance.primary_node in bad_nodes:
3209             val = "ERROR_nodedown"
3210           else:
3211             running = bool(live_data.get(instance.name))
3212             if running:
3213               if instance.admin_up:
3214                 val = "running"
3215               else:
3216                 val = "ERROR_up"
3217             else:
3218               if instance.admin_up:
3219                 val = "ERROR_down"
3220               else:
3221                 val = "ADMIN_down"
3222         elif field == "oper_ram":
3223           if instance.primary_node in bad_nodes:
3224             val = None
3225           elif instance.name in live_data:
3226             val = live_data[instance.name].get("memory", "?")
3227           else:
3228             val = "-"
3229         elif field == "disk_template":
3230           val = instance.disk_template
3231         elif field == "ip":
3232           val = instance.nics[0].ip
3233         elif field == "bridge":
3234           val = instance.nics[0].bridge
3235         elif field == "mac":
3236           val = instance.nics[0].mac
3237         elif field == "sda_size" or field == "sdb_size":
3238           idx = ord(field[2]) - ord('a')
3239           try:
3240             val = instance.FindDisk(idx).size
3241           except errors.OpPrereqError:
3242             val = None
3243         elif field == "tags":
3244           val = list(instance.GetTags())
3245         elif field == "serial_no":
3246           val = instance.serial_no
3247         elif field == "network_port":
3248           val = instance.network_port
3249         elif field == "hypervisor":
3250           val = instance.hypervisor
3251         elif field == "hvparams":
3252           val = i_hv
3253         elif (field.startswith(HVPREFIX) and
3254               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3255           val = i_hv.get(field[len(HVPREFIX):], None)
3256         elif field == "beparams":
3257           val = i_be
3258         elif (field.startswith(BEPREFIX) and
3259               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3260           val = i_be.get(field[len(BEPREFIX):], None)
3261         elif st_match and st_match.groups():
3262           # matches a variable list
3263           st_groups = st_match.groups()
3264           if st_groups and st_groups[0] == "disk":
3265             if st_groups[1] == "count":
3266               val = len(instance.disks)
3267             elif st_groups[1] == "sizes":
3268               val = [disk.size for disk in instance.disks]
3269             elif st_groups[1] == "size":
3270               try:
3271                 val = instance.FindDisk(st_groups[2]).size
3272               except errors.OpPrereqError:
3273                 val = None
3274             else:
3275               assert False, "Unhandled disk parameter"
3276           elif st_groups[0] == "nic":
3277             if st_groups[1] == "count":
3278               val = len(instance.nics)
3279             elif st_groups[1] == "macs":
3280               val = [nic.mac for nic in instance.nics]
3281             elif st_groups[1] == "ips":
3282               val = [nic.ip for nic in instance.nics]
3283             elif st_groups[1] == "bridges":
3284               val = [nic.bridge for nic in instance.nics]
3285             else:
3286               # index-based item
3287               nic_idx = int(st_groups[2])
3288               if nic_idx >= len(instance.nics):
3289                 val = None
3290               else:
3291                 if st_groups[1] == "mac":
3292                   val = instance.nics[nic_idx].mac
3293                 elif st_groups[1] == "ip":
3294                   val = instance.nics[nic_idx].ip
3295                 elif st_groups[1] == "bridge":
3296                   val = instance.nics[nic_idx].bridge
3297                 else:
3298                   assert False, "Unhandled NIC parameter"
3299           else:
3300             assert False, "Unhandled variable parameter"
3301         else:
3302           raise errors.ParameterError(field)
3303         iout.append(val)
3304       output.append(iout)
3305
3306     return output
3307
3308
3309 class LUFailoverInstance(LogicalUnit):
3310   """Failover an instance.
3311
3312   """
3313   HPATH = "instance-failover"
3314   HTYPE = constants.HTYPE_INSTANCE
3315   _OP_REQP = ["instance_name", "ignore_consistency"]
3316   REQ_BGL = False
3317
3318   def ExpandNames(self):
3319     self._ExpandAndLockInstance()
3320     self.needed_locks[locking.LEVEL_NODE] = []
3321     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3322
3323   def DeclareLocks(self, level):
3324     if level == locking.LEVEL_NODE:
3325       self._LockInstancesNodes()
3326
3327   def BuildHooksEnv(self):
3328     """Build hooks env.
3329
3330     This runs on master, primary and secondary nodes of the instance.
3331
3332     """
3333     env = {
3334       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3335       }
3336     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3337     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3338     return env, nl, nl
3339
3340   def CheckPrereq(self):
3341     """Check prerequisites.
3342
3343     This checks that the instance is in the cluster.
3344
3345     """
3346     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3347     assert self.instance is not None, \
3348       "Cannot retrieve locked instance %s" % self.op.instance_name
3349
3350     bep = self.cfg.GetClusterInfo().FillBE(instance)
3351     if instance.disk_template not in constants.DTS_NET_MIRROR:
3352       raise errors.OpPrereqError("Instance's disk layout is not"
3353                                  " network mirrored, cannot failover.")
3354
3355     secondary_nodes = instance.secondary_nodes
3356     if not secondary_nodes:
3357       raise errors.ProgrammerError("no secondary node but using "
3358                                    "a mirrored disk template")
3359
3360     target_node = secondary_nodes[0]
3361     _CheckNodeOnline(self, target_node)
3362     # check memory requirements on the secondary node
3363     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3364                          instance.name, bep[constants.BE_MEMORY],
3365                          instance.hypervisor)
3366
3367     # check bridge existance
3368     brlist = [nic.bridge for nic in instance.nics]
3369     result = self.rpc.call_bridges_exist(target_node, brlist)
3370     result.Raise()
3371     if not result.data:
3372       raise errors.OpPrereqError("One or more target bridges %s does not"
3373                                  " exist on destination node '%s'" %
3374                                  (brlist, target_node))
3375
3376   def Exec(self, feedback_fn):
3377     """Failover an instance.
3378
3379     The failover is done by shutting it down on its present node and
3380     starting it on the secondary.
3381
3382     """
3383     instance = self.instance
3384
3385     source_node = instance.primary_node
3386     target_node = instance.secondary_nodes[0]
3387
3388     feedback_fn("* checking disk consistency between source and target")
3389     for dev in instance.disks:
3390       # for drbd, these are drbd over lvm
3391       if not _CheckDiskConsistency(self, dev, target_node, False):
3392         if instance.admin_up and not self.op.ignore_consistency:
3393           raise errors.OpExecError("Disk %s is degraded on target node,"
3394                                    " aborting failover." % dev.iv_name)
3395
3396     feedback_fn("* shutting down instance on source node")
3397     logging.info("Shutting down instance %s on node %s",
3398                  instance.name, source_node)
3399
3400     result = self.rpc.call_instance_shutdown(source_node, instance)
3401     if result.failed or not result.data:
3402       if self.op.ignore_consistency:
3403         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3404                              " Proceeding"
3405                              " anyway. Please make sure node %s is down",
3406                              instance.name, source_node, source_node)
3407       else:
3408         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3409                                  (instance.name, source_node))
3410
3411     feedback_fn("* deactivating the instance's disks on source node")
3412     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3413       raise errors.OpExecError("Can't shut down the instance's disks.")
3414
3415     instance.primary_node = target_node
3416     # distribute new instance config to the other nodes
3417     self.cfg.Update(instance)
3418
3419     # Only start the instance if it's marked as up
3420     if instance.admin_up:
3421       feedback_fn("* activating the instance's disks on target node")
3422       logging.info("Starting instance %s on node %s",
3423                    instance.name, target_node)
3424
3425       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3426                                                ignore_secondaries=True)
3427       if not disks_ok:
3428         _ShutdownInstanceDisks(self, instance)
3429         raise errors.OpExecError("Can't activate the instance's disks")
3430
3431       feedback_fn("* starting the instance on the target node")
3432       result = self.rpc.call_instance_start(target_node, instance, None)
3433       msg = result.RemoteFailMsg()
3434       if msg:
3435         _ShutdownInstanceDisks(self, instance)
3436         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3437                                  (instance.name, target_node, msg))
3438
3439
3440 class LUMigrateInstance(LogicalUnit):
3441   """Migrate an instance.
3442
3443   This is migration without shutting down, compared to the failover,
3444   which is done with shutdown.
3445
3446   """
3447   HPATH = "instance-migrate"
3448   HTYPE = constants.HTYPE_INSTANCE
3449   _OP_REQP = ["instance_name", "live", "cleanup"]
3450
3451   REQ_BGL = False
3452
3453   def ExpandNames(self):
3454     self._ExpandAndLockInstance()
3455     self.needed_locks[locking.LEVEL_NODE] = []
3456     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3457
3458   def DeclareLocks(self, level):
3459     if level == locking.LEVEL_NODE:
3460       self._LockInstancesNodes()
3461
3462   def BuildHooksEnv(self):
3463     """Build hooks env.
3464
3465     This runs on master, primary and secondary nodes of the instance.
3466
3467     """
3468     env = _BuildInstanceHookEnvByObject(self, self.instance)
3469     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3470     return env, nl, nl
3471
3472   def CheckPrereq(self):
3473     """Check prerequisites.
3474
3475     This checks that the instance is in the cluster.
3476
3477     """
3478     instance = self.cfg.GetInstanceInfo(
3479       self.cfg.ExpandInstanceName(self.op.instance_name))
3480     if instance is None:
3481       raise errors.OpPrereqError("Instance '%s' not known" %
3482                                  self.op.instance_name)
3483
3484     if instance.disk_template != constants.DT_DRBD8:
3485       raise errors.OpPrereqError("Instance's disk layout is not"
3486                                  " drbd8, cannot migrate.")
3487
3488     secondary_nodes = instance.secondary_nodes
3489     if not secondary_nodes:
3490       raise errors.ProgrammerError("no secondary node but using "
3491                                    "drbd8 disk template")
3492
3493     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3494
3495     target_node = secondary_nodes[0]
3496     # check memory requirements on the secondary node
3497     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3498                          instance.name, i_be[constants.BE_MEMORY],
3499                          instance.hypervisor)
3500
3501     # check bridge existance
3502     brlist = [nic.bridge for nic in instance.nics]
3503     result = self.rpc.call_bridges_exist(target_node, brlist)
3504     if result.failed or not result.data:
3505       raise errors.OpPrereqError("One or more target bridges %s does not"
3506                                  " exist on destination node '%s'" %
3507                                  (brlist, target_node))
3508
3509     if not self.op.cleanup:
3510       result = self.rpc.call_instance_migratable(instance.primary_node,
3511                                                  instance)
3512       msg = result.RemoteFailMsg()
3513       if msg:
3514         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3515                                    msg)
3516
3517     self.instance = instance
3518
3519   def _WaitUntilSync(self):
3520     """Poll with custom rpc for disk sync.
3521
3522     This uses our own step-based rpc call.
3523
3524     """
3525     self.feedback_fn("* wait until resync is done")
3526     all_done = False
3527     while not all_done:
3528       all_done = True
3529       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3530                                             self.nodes_ip,
3531                                             self.instance.disks)
3532       min_percent = 100
3533       for node, nres in result.items():
3534         msg = nres.RemoteFailMsg()
3535         if msg:
3536           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3537                                    (node, msg))
3538         node_done, node_percent = nres.data[1]
3539         all_done = all_done and node_done
3540         if node_percent is not None:
3541           min_percent = min(min_percent, node_percent)
3542       if not all_done:
3543         if min_percent < 100:
3544           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3545         time.sleep(2)
3546
3547   def _EnsureSecondary(self, node):
3548     """Demote a node to secondary.
3549
3550     """
3551     self.feedback_fn("* switching node %s to secondary mode" % node)
3552
3553     for dev in self.instance.disks:
3554       self.cfg.SetDiskID(dev, node)
3555
3556     result = self.rpc.call_blockdev_close(node, self.instance.name,
3557                                           self.instance.disks)
3558     msg = result.RemoteFailMsg()
3559     if msg:
3560       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3561                                " error %s" % (node, msg))
3562
3563   def _GoStandalone(self):
3564     """Disconnect from the network.
3565
3566     """
3567     self.feedback_fn("* changing into standalone mode")
3568     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3569                                                self.instance.disks)
3570     for node, nres in result.items():
3571       msg = nres.RemoteFailMsg()
3572       if msg:
3573         raise errors.OpExecError("Cannot disconnect disks node %s,"
3574                                  " error %s" % (node, msg))
3575
3576   def _GoReconnect(self, multimaster):
3577     """Reconnect to the network.
3578
3579     """
3580     if multimaster:
3581       msg = "dual-master"
3582     else:
3583       msg = "single-master"
3584     self.feedback_fn("* changing disks into %s mode" % msg)
3585     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3586                                            self.instance.disks,
3587                                            self.instance.name, multimaster)
3588     for node, nres in result.items():
3589       msg = nres.RemoteFailMsg()
3590       if msg:
3591         raise errors.OpExecError("Cannot change disks config on node %s,"
3592                                  " error: %s" % (node, msg))
3593
3594   def _ExecCleanup(self):
3595     """Try to cleanup after a failed migration.
3596
3597     The cleanup is done by:
3598       - check that the instance is running only on one node
3599         (and update the config if needed)
3600       - change disks on its secondary node to secondary
3601       - wait until disks are fully synchronized
3602       - disconnect from the network
3603       - change disks into single-master mode
3604       - wait again until disks are fully synchronized
3605
3606     """
3607     instance = self.instance
3608     target_node = self.target_node
3609     source_node = self.source_node
3610
3611     # check running on only one node
3612     self.feedback_fn("* checking where the instance actually runs"
3613                      " (if this hangs, the hypervisor might be in"
3614                      " a bad state)")
3615     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3616     for node, result in ins_l.items():
3617       result.Raise()
3618       if not isinstance(result.data, list):
3619         raise errors.OpExecError("Can't contact node '%s'" % node)
3620
3621     runningon_source = instance.name in ins_l[source_node].data
3622     runningon_target = instance.name in ins_l[target_node].data
3623
3624     if runningon_source and runningon_target:
3625       raise errors.OpExecError("Instance seems to be running on two nodes,"
3626                                " or the hypervisor is confused. You will have"
3627                                " to ensure manually that it runs only on one"
3628                                " and restart this operation.")
3629
3630     if not (runningon_source or runningon_target):
3631       raise errors.OpExecError("Instance does not seem to be running at all."
3632                                " In this case, it's safer to repair by"
3633                                " running 'gnt-instance stop' to ensure disk"
3634                                " shutdown, and then restarting it.")
3635
3636     if runningon_target:
3637       # the migration has actually succeeded, we need to update the config
3638       self.feedback_fn("* instance running on secondary node (%s),"
3639                        " updating config" % target_node)
3640       instance.primary_node = target_node
3641       self.cfg.Update(instance)
3642       demoted_node = source_node
3643     else:
3644       self.feedback_fn("* instance confirmed to be running on its"
3645                        " primary node (%s)" % source_node)
3646       demoted_node = target_node
3647
3648     self._EnsureSecondary(demoted_node)
3649     try:
3650       self._WaitUntilSync()
3651     except errors.OpExecError:
3652       # we ignore here errors, since if the device is standalone, it
3653       # won't be able to sync
3654       pass
3655     self._GoStandalone()
3656     self._GoReconnect(False)
3657     self._WaitUntilSync()
3658
3659     self.feedback_fn("* done")
3660
3661   def _RevertDiskStatus(self):
3662     """Try to revert the disk status after a failed migration.
3663
3664     """
3665     target_node = self.target_node
3666     try:
3667       self._EnsureSecondary(target_node)
3668       self._GoStandalone()
3669       self._GoReconnect(False)
3670       self._WaitUntilSync()
3671     except errors.OpExecError, err:
3672       self.LogWarning("Migration failed and I can't reconnect the"
3673                       " drives: error '%s'\n"
3674                       "Please look and recover the instance status" %
3675                       str(err))
3676
3677   def _AbortMigration(self):
3678     """Call the hypervisor code to abort a started migration.
3679
3680     """
3681     instance = self.instance
3682     target_node = self.target_node
3683     migration_info = self.migration_info
3684
3685     abort_result = self.rpc.call_finalize_migration(target_node,
3686                                                     instance,
3687                                                     migration_info,
3688                                                     False)
3689     abort_msg = abort_result.RemoteFailMsg()
3690     if abort_msg:
3691       logging.error("Aborting migration failed on target node %s: %s" %
3692                     (target_node, abort_msg))
3693       # Don't raise an exception here, as we stil have to try to revert the
3694       # disk status, even if this step failed.
3695
3696   def _ExecMigration(self):
3697     """Migrate an instance.
3698
3699     The migrate is done by:
3700       - change the disks into dual-master mode
3701       - wait until disks are fully synchronized again
3702       - migrate the instance
3703       - change disks on the new secondary node (the old primary) to secondary
3704       - wait until disks are fully synchronized
3705       - change disks into single-master mode
3706
3707     """
3708     instance = self.instance
3709     target_node = self.target_node
3710     source_node = self.source_node
3711
3712     self.feedback_fn("* checking disk consistency between source and target")
3713     for dev in instance.disks:
3714       if not _CheckDiskConsistency(self, dev, target_node, False):
3715         raise errors.OpExecError("Disk %s is degraded or not fully"
3716                                  " synchronized on target node,"
3717                                  " aborting migrate." % dev.iv_name)
3718
3719     # First get the migration information from the remote node
3720     result = self.rpc.call_migration_info(source_node, instance)
3721     msg = result.RemoteFailMsg()
3722     if msg:
3723       log_err = ("Failed fetching source migration information from %s: %s" %
3724                   (source_node, msg))
3725       logging.error(log_err)
3726       raise errors.OpExecError(log_err)
3727
3728     self.migration_info = migration_info = result.data[1]
3729
3730     # Then switch the disks to master/master mode
3731     self._EnsureSecondary(target_node)
3732     self._GoStandalone()
3733     self._GoReconnect(True)
3734     self._WaitUntilSync()
3735
3736     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3737     result = self.rpc.call_accept_instance(target_node,
3738                                            instance,
3739                                            migration_info,
3740                                            self.nodes_ip[target_node])
3741
3742     msg = result.RemoteFailMsg()
3743     if msg:
3744       logging.error("Instance pre-migration failed, trying to revert"
3745                     " disk status: %s", msg)
3746       self._AbortMigration()
3747       self._RevertDiskStatus()
3748       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3749                                (instance.name, msg))
3750
3751     self.feedback_fn("* migrating instance to %s" % target_node)
3752     time.sleep(10)
3753     result = self.rpc.call_instance_migrate(source_node, instance,
3754                                             self.nodes_ip[target_node],
3755                                             self.op.live)
3756     msg = result.RemoteFailMsg()
3757     if msg:
3758       logging.error("Instance migration failed, trying to revert"
3759                     " disk status: %s", msg)
3760       self._AbortMigration()
3761       self._RevertDiskStatus()
3762       raise errors.OpExecError("Could not migrate instance %s: %s" %
3763                                (instance.name, msg))
3764     time.sleep(10)
3765
3766     instance.primary_node = target_node
3767     # distribute new instance config to the other nodes
3768     self.cfg.Update(instance)
3769
3770     result = self.rpc.call_finalize_migration(target_node,
3771                                               instance,
3772                                               migration_info,
3773                                               True)
3774     msg = result.RemoteFailMsg()
3775     if msg:
3776       logging.error("Instance migration succeeded, but finalization failed:"
3777                     " %s" % msg)
3778       raise errors.OpExecError("Could not finalize instance migration: %s" %
3779                                msg)
3780
3781     self._EnsureSecondary(source_node)
3782     self._WaitUntilSync()
3783     self._GoStandalone()
3784     self._GoReconnect(False)
3785     self._WaitUntilSync()
3786
3787     self.feedback_fn("* done")
3788
3789   def Exec(self, feedback_fn):
3790     """Perform the migration.
3791
3792     """
3793     self.feedback_fn = feedback_fn
3794
3795     self.source_node = self.instance.primary_node
3796     self.target_node = self.instance.secondary_nodes[0]
3797     self.all_nodes = [self.source_node, self.target_node]
3798     self.nodes_ip = {
3799       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3800       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3801       }
3802     if self.op.cleanup:
3803       return self._ExecCleanup()
3804     else:
3805       return self._ExecMigration()
3806
3807
3808 def _CreateBlockDev(lu, node, instance, device, force_create,
3809                     info, force_open):
3810   """Create a tree of block devices on a given node.
3811
3812   If this device type has to be created on secondaries, create it and
3813   all its children.
3814
3815   If not, just recurse to children keeping the same 'force' value.
3816
3817   @param lu: the lu on whose behalf we execute
3818   @param node: the node on which to create the device
3819   @type instance: L{objects.Instance}
3820   @param instance: the instance which owns the device
3821   @type device: L{objects.Disk}
3822   @param device: the device to create
3823   @type force_create: boolean
3824   @param force_create: whether to force creation of this device; this
3825       will be change to True whenever we find a device which has
3826       CreateOnSecondary() attribute
3827   @param info: the extra 'metadata' we should attach to the device
3828       (this will be represented as a LVM tag)
3829   @type force_open: boolean
3830   @param force_open: this parameter will be passes to the
3831       L{backend.CreateBlockDevice} function where it specifies
3832       whether we run on primary or not, and it affects both
3833       the child assembly and the device own Open() execution
3834
3835   """
3836   if device.CreateOnSecondary():
3837     force_create = True
3838
3839   if device.children:
3840     for child in device.children:
3841       _CreateBlockDev(lu, node, instance, child, force_create,
3842                       info, force_open)
3843
3844   if not force_create:
3845     return
3846
3847   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3848
3849
3850 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3851   """Create a single block device on a given node.
3852
3853   This will not recurse over children of the device, so they must be
3854   created in advance.
3855
3856   @param lu: the lu on whose behalf we execute
3857   @param node: the node on which to create the device
3858   @type instance: L{objects.Instance}
3859   @param instance: the instance which owns the device
3860   @type device: L{objects.Disk}
3861   @param device: the device to create
3862   @param info: the extra 'metadata' we should attach to the device
3863       (this will be represented as a LVM tag)
3864   @type force_open: boolean
3865   @param force_open: this parameter will be passes to the
3866       L{backend.CreateBlockDevice} function where it specifies
3867       whether we run on primary or not, and it affects both
3868       the child assembly and the device own Open() execution
3869
3870   """
3871   lu.cfg.SetDiskID(device, node)
3872   result = lu.rpc.call_blockdev_create(node, device, device.size,
3873                                        instance.name, force_open, info)
3874   msg = result.RemoteFailMsg()
3875   if msg:
3876     raise errors.OpExecError("Can't create block device %s on"
3877                              " node %s for instance %s: %s" %
3878                              (device, node, instance.name, msg))
3879   if device.physical_id is None:
3880     device.physical_id = result.data[1]
3881
3882
3883 def _GenerateUniqueNames(lu, exts):
3884   """Generate a suitable LV name.
3885
3886   This will generate a logical volume name for the given instance.
3887
3888   """
3889   results = []
3890   for val in exts:
3891     new_id = lu.cfg.GenerateUniqueID()
3892     results.append("%s%s" % (new_id, val))
3893   return results
3894
3895
3896 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3897                          p_minor, s_minor):
3898   """Generate a drbd8 device complete with its children.
3899
3900   """
3901   port = lu.cfg.AllocatePort()
3902   vgname = lu.cfg.GetVGName()
3903   shared_secret = lu.cfg.GenerateDRBDSecret()
3904   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3905                           logical_id=(vgname, names[0]))
3906   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3907                           logical_id=(vgname, names[1]))
3908   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3909                           logical_id=(primary, secondary, port,
3910                                       p_minor, s_minor,
3911                                       shared_secret),
3912                           children=[dev_data, dev_meta],
3913                           iv_name=iv_name)
3914   return drbd_dev
3915
3916
3917 def _GenerateDiskTemplate(lu, template_name,
3918                           instance_name, primary_node,
3919                           secondary_nodes, disk_info,
3920                           file_storage_dir, file_driver,
3921                           base_index):
3922   """Generate the entire disk layout for a given template type.
3923
3924   """
3925   #TODO: compute space requirements
3926
3927   vgname = lu.cfg.GetVGName()
3928   disk_count = len(disk_info)
3929   disks = []
3930   if template_name == constants.DT_DISKLESS:
3931     pass
3932   elif template_name == constants.DT_PLAIN:
3933     if len(secondary_nodes) != 0:
3934       raise errors.ProgrammerError("Wrong template configuration")
3935
3936     names = _GenerateUniqueNames(lu, [".disk%d" % i
3937                                       for i in range(disk_count)])
3938     for idx, disk in enumerate(disk_info):
3939       disk_index = idx + base_index
3940       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3941                               logical_id=(vgname, names[idx]),
3942                               iv_name="disk/%d" % disk_index,
3943                               mode=disk["mode"])
3944       disks.append(disk_dev)
3945   elif template_name == constants.DT_DRBD8:
3946     if len(secondary_nodes) != 1:
3947       raise errors.ProgrammerError("Wrong template configuration")
3948     remote_node = secondary_nodes[0]
3949     minors = lu.cfg.AllocateDRBDMinor(
3950       [primary_node, remote_node] * len(disk_info), instance_name)
3951
3952     names = []
3953     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3954                                                for i in range(disk_count)]):
3955       names.append(lv_prefix + "_data")
3956       names.append(lv_prefix + "_meta")
3957     for idx, disk in enumerate(disk_info):
3958       disk_index = idx + base_index
3959       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3960                                       disk["size"], names[idx*2:idx*2+2],
3961                                       "disk/%d" % disk_index,
3962                                       minors[idx*2], minors[idx*2+1])
3963       disk_dev.mode = disk["mode"]
3964       disks.append(disk_dev)
3965   elif template_name == constants.DT_FILE:
3966     if len(secondary_nodes) != 0:
3967       raise errors.ProgrammerError("Wrong template configuration")
3968
3969     for idx, disk in enumerate(disk_info):
3970       disk_index = idx + base_index
3971       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3972                               iv_name="disk/%d" % disk_index,
3973                               logical_id=(file_driver,
3974                                           "%s/disk%d" % (file_storage_dir,
3975                                                          idx)),
3976                               mode=disk["mode"])
3977       disks.append(disk_dev)
3978   else:
3979     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3980   return disks
3981
3982
3983 def _GetInstanceInfoText(instance):
3984   """Compute that text that should be added to the disk's metadata.
3985
3986   """
3987   return "originstname+%s" % instance.name
3988
3989
3990 def _CreateDisks(lu, instance):
3991   """Create all disks for an instance.
3992
3993   This abstracts away some work from AddInstance.
3994
3995   @type lu: L{LogicalUnit}
3996   @param lu: the logical unit on whose behalf we execute
3997   @type instance: L{objects.Instance}
3998   @param instance: the instance whose disks we should create
3999   @rtype: boolean
4000   @return: the success of the creation
4001
4002   """
4003   info = _GetInstanceInfoText(instance)
4004   pnode = instance.primary_node
4005
4006   if instance.disk_template == constants.DT_FILE:
4007     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4008     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4009
4010     if result.failed or not result.data:
4011       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4012
4013     if not result.data[0]:
4014       raise errors.OpExecError("Failed to create directory '%s'" %
4015                                file_storage_dir)
4016
4017   # Note: this needs to be kept in sync with adding of disks in
4018   # LUSetInstanceParams
4019   for device in instance.disks:
4020     logging.info("Creating volume %s for instance %s",
4021                  device.iv_name, instance.name)
4022     #HARDCODE
4023     for node in instance.all_nodes:
4024       f_create = node == pnode
4025       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4026
4027
4028 def _RemoveDisks(lu, instance):
4029   """Remove all disks for an instance.
4030
4031   This abstracts away some work from `AddInstance()` and
4032   `RemoveInstance()`. Note that in case some of the devices couldn't
4033   be removed, the removal will continue with the other ones (compare
4034   with `_CreateDisks()`).
4035
4036   @type lu: L{LogicalUnit}
4037   @param lu: the logical unit on whose behalf we execute
4038   @type instance: L{objects.Instance}
4039   @param instance: the instance whose disks we should remove
4040   @rtype: boolean
4041   @return: the success of the removal
4042
4043   """
4044   logging.info("Removing block devices for instance %s", instance.name)
4045
4046   result = True
4047   for device in instance.disks:
4048     for node, disk in device.ComputeNodeTree(instance.primary_node):
4049       lu.cfg.SetDiskID(disk, node)
4050       result = lu.rpc.call_blockdev_remove(node, disk)
4051       if result.failed or not result.data:
4052         lu.proc.LogWarning("Could not remove block device %s on node %s,"
4053                            " continuing anyway", device.iv_name, node)
4054         result = False
4055
4056   if instance.disk_template == constants.DT_FILE:
4057     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4058     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4059                                                  file_storage_dir)
4060     if result.failed or not result.data:
4061       logging.error("Could not remove directory '%s'", file_storage_dir)
4062       result = False
4063
4064   return result
4065
4066
4067 def _ComputeDiskSize(disk_template, disks):
4068   """Compute disk size requirements in the volume group
4069
4070   """
4071   # Required free disk space as a function of disk and swap space
4072   req_size_dict = {
4073     constants.DT_DISKLESS: None,
4074     constants.DT_PLAIN: sum(d["size"] for d in disks),
4075     # 128 MB are added for drbd metadata for each disk
4076     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4077     constants.DT_FILE: None,
4078   }
4079
4080   if disk_template not in req_size_dict:
4081     raise errors.ProgrammerError("Disk template '%s' size requirement"
4082                                  " is unknown" %  disk_template)
4083
4084   return req_size_dict[disk_template]
4085
4086
4087 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4088   """Hypervisor parameter validation.
4089
4090   This function abstract the hypervisor parameter validation to be
4091   used in both instance create and instance modify.
4092
4093   @type lu: L{LogicalUnit}
4094   @param lu: the logical unit for which we check
4095   @type nodenames: list
4096   @param nodenames: the list of nodes on which we should check
4097   @type hvname: string
4098   @param hvname: the name of the hypervisor we should use
4099   @type hvparams: dict
4100   @param hvparams: the parameters which we need to check
4101   @raise errors.OpPrereqError: if the parameters are not valid
4102
4103   """
4104   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4105                                                   hvname,
4106                                                   hvparams)
4107   for node in nodenames:
4108     info = hvinfo[node]
4109     if info.offline:
4110       continue
4111     info.Raise()
4112     if not info.data or not isinstance(info.data, (tuple, list)):
4113       raise errors.OpPrereqError("Cannot get current information"
4114                                  " from node '%s' (%s)" % (node, info.data))
4115     if not info.data[0]:
4116       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4117                                  " %s" % info.data[1])
4118
4119
4120 class LUCreateInstance(LogicalUnit):
4121   """Create an instance.
4122
4123   """
4124   HPATH = "instance-add"
4125   HTYPE = constants.HTYPE_INSTANCE
4126   _OP_REQP = ["instance_name", "disks", "disk_template",
4127               "mode", "start",
4128               "wait_for_sync", "ip_check", "nics",
4129               "hvparams", "beparams"]
4130   REQ_BGL = False
4131
4132   def _ExpandNode(self, node):
4133     """Expands and checks one node name.
4134
4135     """
4136     node_full = self.cfg.ExpandNodeName(node)
4137     if node_full is None:
4138       raise errors.OpPrereqError("Unknown node %s" % node)
4139     return node_full
4140
4141   def ExpandNames(self):
4142     """ExpandNames for CreateInstance.
4143
4144     Figure out the right locks for instance creation.
4145
4146     """
4147     self.needed_locks = {}
4148
4149     # set optional parameters to none if they don't exist
4150     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4151       if not hasattr(self.op, attr):
4152         setattr(self.op, attr, None)
4153
4154     # cheap checks, mostly valid constants given
4155
4156     # verify creation mode
4157     if self.op.mode not in (constants.INSTANCE_CREATE,
4158                             constants.INSTANCE_IMPORT):
4159       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4160                                  self.op.mode)
4161
4162     # disk template and mirror node verification
4163     if self.op.disk_template not in constants.DISK_TEMPLATES:
4164       raise errors.OpPrereqError("Invalid disk template name")
4165
4166     if self.op.hypervisor is None:
4167       self.op.hypervisor = self.cfg.GetHypervisorType()
4168
4169     cluster = self.cfg.GetClusterInfo()
4170     enabled_hvs = cluster.enabled_hypervisors
4171     if self.op.hypervisor not in enabled_hvs:
4172       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4173                                  " cluster (%s)" % (self.op.hypervisor,
4174                                   ",".join(enabled_hvs)))
4175
4176     # check hypervisor parameter syntax (locally)
4177
4178     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4179                                   self.op.hvparams)
4180     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4181     hv_type.CheckParameterSyntax(filled_hvp)
4182
4183     # fill and remember the beparams dict
4184     utils.CheckBEParams(self.op.beparams)
4185     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4186                                     self.op.beparams)
4187
4188     #### instance parameters check
4189
4190     # instance name verification
4191     hostname1 = utils.HostInfo(self.op.instance_name)
4192     self.op.instance_name = instance_name = hostname1.name
4193
4194     # this is just a preventive check, but someone might still add this
4195     # instance in the meantime, and creation will fail at lock-add time
4196     if instance_name in self.cfg.GetInstanceList():
4197       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4198                                  instance_name)
4199
4200     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4201
4202     # NIC buildup
4203     self.nics = []
4204     for nic in self.op.nics:
4205       # ip validity checks
4206       ip = nic.get("ip", None)
4207       if ip is None or ip.lower() == "none":
4208         nic_ip = None
4209       elif ip.lower() == constants.VALUE_AUTO:
4210         nic_ip = hostname1.ip
4211       else:
4212         if not utils.IsValidIP(ip):
4213           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4214                                      " like a valid IP" % ip)
4215         nic_ip = ip
4216
4217       # MAC address verification
4218       mac = nic.get("mac", constants.VALUE_AUTO)
4219       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4220         if not utils.IsValidMac(mac.lower()):
4221           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4222                                      mac)
4223       # bridge verification
4224       bridge = nic.get("bridge", None)
4225       if bridge is None:
4226         bridge = self.cfg.GetDefBridge()
4227       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4228
4229     # disk checks/pre-build
4230     self.disks = []
4231     for disk in self.op.disks:
4232       mode = disk.get("mode", constants.DISK_RDWR)
4233       if mode not in constants.DISK_ACCESS_SET:
4234         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4235                                    mode)
4236       size = disk.get("size", None)
4237       if size is None:
4238         raise errors.OpPrereqError("Missing disk size")
4239       try:
4240         size = int(size)
4241       except ValueError:
4242         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4243       self.disks.append({"size": size, "mode": mode})
4244
4245     # used in CheckPrereq for ip ping check
4246     self.check_ip = hostname1.ip
4247
4248     # file storage checks
4249     if (self.op.file_driver and
4250         not self.op.file_driver in constants.FILE_DRIVER):
4251       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4252                                  self.op.file_driver)
4253
4254     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4255       raise errors.OpPrereqError("File storage directory path not absolute")
4256
4257     ### Node/iallocator related checks
4258     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4259       raise errors.OpPrereqError("One and only one of iallocator and primary"
4260                                  " node must be given")
4261
4262     if self.op.iallocator:
4263       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4264     else:
4265       self.op.pnode = self._ExpandNode(self.op.pnode)
4266       nodelist = [self.op.pnode]
4267       if self.op.snode is not None:
4268         self.op.snode = self._ExpandNode(self.op.snode)
4269         nodelist.append(self.op.snode)
4270       self.needed_locks[locking.LEVEL_NODE] = nodelist
4271
4272     # in case of import lock the source node too
4273     if self.op.mode == constants.INSTANCE_IMPORT:
4274       src_node = getattr(self.op, "src_node", None)
4275       src_path = getattr(self.op, "src_path", None)
4276
4277       if src_path is None:
4278         self.op.src_path = src_path = self.op.instance_name
4279
4280       if src_node is None:
4281         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4282         self.op.src_node = None
4283         if os.path.isabs(src_path):
4284           raise errors.OpPrereqError("Importing an instance from an absolute"
4285                                      " path requires a source node option.")
4286       else:
4287         self.op.src_node = src_node = self._ExpandNode(src_node)
4288         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4289           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4290         if not os.path.isabs(src_path):
4291           self.op.src_path = src_path = \
4292             os.path.join(constants.EXPORT_DIR, src_path)
4293
4294     else: # INSTANCE_CREATE
4295       if getattr(self.op, "os_type", None) is None:
4296         raise errors.OpPrereqError("No guest OS specified")
4297
4298   def _RunAllocator(self):
4299     """Run the allocator based on input opcode.
4300
4301     """
4302     nics = [n.ToDict() for n in self.nics]
4303     ial = IAllocator(self,
4304                      mode=constants.IALLOCATOR_MODE_ALLOC,
4305                      name=self.op.instance_name,
4306                      disk_template=self.op.disk_template,
4307                      tags=[],
4308                      os=self.op.os_type,
4309                      vcpus=self.be_full[constants.BE_VCPUS],
4310                      mem_size=self.be_full[constants.BE_MEMORY],
4311                      disks=self.disks,
4312                      nics=nics,
4313                      hypervisor=self.op.hypervisor,
4314                      )
4315
4316     ial.Run(self.op.iallocator)
4317
4318     if not ial.success:
4319       raise errors.OpPrereqError("Can't compute nodes using"
4320                                  " iallocator '%s': %s" % (self.op.iallocator,
4321                                                            ial.info))
4322     if len(ial.nodes) != ial.required_nodes:
4323       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4324                                  " of nodes (%s), required %s" %
4325                                  (self.op.iallocator, len(ial.nodes),
4326                                   ial.required_nodes))
4327     self.op.pnode = ial.nodes[0]
4328     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4329                  self.op.instance_name, self.op.iallocator,
4330                  ", ".join(ial.nodes))
4331     if ial.required_nodes == 2:
4332       self.op.snode = ial.nodes[1]
4333
4334   def BuildHooksEnv(self):
4335     """Build hooks env.
4336
4337     This runs on master, primary and secondary nodes of the instance.
4338
4339     """
4340     env = {
4341       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4342       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4343       "INSTANCE_ADD_MODE": self.op.mode,
4344       }
4345     if self.op.mode == constants.INSTANCE_IMPORT:
4346       env["INSTANCE_SRC_NODE"] = self.op.src_node
4347       env["INSTANCE_SRC_PATH"] = self.op.src_path
4348       env["INSTANCE_SRC_IMAGES"] = self.src_images
4349
4350     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4351       primary_node=self.op.pnode,
4352       secondary_nodes=self.secondaries,
4353       status=self.instance_status,
4354       os_type=self.op.os_type,
4355       memory=self.be_full[constants.BE_MEMORY],
4356       vcpus=self.be_full[constants.BE_VCPUS],
4357       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4358     ))
4359
4360     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4361           self.secondaries)
4362     return env, nl, nl
4363
4364
4365   def CheckPrereq(self):
4366     """Check prerequisites.
4367
4368     """
4369     if (not self.cfg.GetVGName() and
4370         self.op.disk_template not in constants.DTS_NOT_LVM):
4371       raise errors.OpPrereqError("Cluster does not support lvm-based"
4372                                  " instances")
4373
4374
4375     if self.op.mode == constants.INSTANCE_IMPORT:
4376       src_node = self.op.src_node
4377       src_path = self.op.src_path
4378
4379       if src_node is None:
4380         exp_list = self.rpc.call_export_list(
4381           self.acquired_locks[locking.LEVEL_NODE])
4382         found = False
4383         for node in exp_list:
4384           if not exp_list[node].failed and src_path in exp_list[node].data:
4385             found = True
4386             self.op.src_node = src_node = node
4387             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4388                                                        src_path)
4389             break
4390         if not found:
4391           raise errors.OpPrereqError("No export found for relative path %s" %
4392                                       src_path)
4393
4394       _CheckNodeOnline(self, src_node)
4395       result = self.rpc.call_export_info(src_node, src_path)
4396       result.Raise()
4397       if not result.data:
4398         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4399
4400       export_info = result.data
4401       if not export_info.has_section(constants.INISECT_EXP):
4402         raise errors.ProgrammerError("Corrupted export config")
4403
4404       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4405       if (int(ei_version) != constants.EXPORT_VERSION):
4406         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4407                                    (ei_version, constants.EXPORT_VERSION))
4408
4409       # Check that the new instance doesn't have less disks than the export
4410       instance_disks = len(self.disks)
4411       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4412       if instance_disks < export_disks:
4413         raise errors.OpPrereqError("Not enough disks to import."
4414                                    " (instance: %d, export: %d)" %
4415                                    (instance_disks, export_disks))
4416
4417       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4418       disk_images = []
4419       for idx in range(export_disks):
4420         option = 'disk%d_dump' % idx
4421         if export_info.has_option(constants.INISECT_INS, option):
4422           # FIXME: are the old os-es, disk sizes, etc. useful?
4423           export_name = export_info.get(constants.INISECT_INS, option)
4424           image = os.path.join(src_path, export_name)
4425           disk_images.append(image)
4426         else:
4427           disk_images.append(False)
4428
4429       self.src_images = disk_images
4430
4431       old_name = export_info.get(constants.INISECT_INS, 'name')
4432       # FIXME: int() here could throw a ValueError on broken exports
4433       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4434       if self.op.instance_name == old_name:
4435         for idx, nic in enumerate(self.nics):
4436           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4437             nic_mac_ini = 'nic%d_mac' % idx
4438             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4439
4440     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4441     if self.op.start and not self.op.ip_check:
4442       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4443                                  " adding an instance in start mode")
4444
4445     if self.op.ip_check:
4446       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4447         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4448                                    (self.check_ip, self.op.instance_name))
4449
4450     #### allocator run
4451
4452     if self.op.iallocator is not None:
4453       self._RunAllocator()
4454
4455     #### node related checks
4456
4457     # check primary node
4458     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4459     assert self.pnode is not None, \
4460       "Cannot retrieve locked node %s" % self.op.pnode
4461     if pnode.offline:
4462       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4463                                  pnode.name)
4464
4465     self.secondaries = []
4466
4467     # mirror node verification
4468     if self.op.disk_template in constants.DTS_NET_MIRROR:
4469       if self.op.snode is None:
4470         raise errors.OpPrereqError("The networked disk templates need"
4471                                    " a mirror node")
4472       if self.op.snode == pnode.name:
4473         raise errors.OpPrereqError("The secondary node cannot be"
4474                                    " the primary node.")
4475       self.secondaries.append(self.op.snode)
4476       _CheckNodeOnline(self, self.op.snode)
4477
4478     nodenames = [pnode.name] + self.secondaries
4479
4480     req_size = _ComputeDiskSize(self.op.disk_template,
4481                                 self.disks)
4482
4483     # Check lv size requirements
4484     if req_size is not None:
4485       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4486                                          self.op.hypervisor)
4487       for node in nodenames:
4488         info = nodeinfo[node]
4489         info.Raise()
4490         info = info.data
4491         if not info:
4492           raise errors.OpPrereqError("Cannot get current information"
4493                                      " from node '%s'" % node)
4494         vg_free = info.get('vg_free', None)
4495         if not isinstance(vg_free, int):
4496           raise errors.OpPrereqError("Can't compute free disk space on"
4497                                      " node %s" % node)
4498         if req_size > info['vg_free']:
4499           raise errors.OpPrereqError("Not enough disk space on target node %s."
4500                                      " %d MB available, %d MB required" %
4501                                      (node, info['vg_free'], req_size))
4502
4503     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4504
4505     # os verification
4506     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4507     result.Raise()
4508     if not isinstance(result.data, objects.OS):
4509       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4510                                  " primary node"  % self.op.os_type)
4511
4512     # bridge check on primary node
4513     bridges = [n.bridge for n in self.nics]
4514     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4515     result.Raise()
4516     if not result.data:
4517       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4518                                  " exist on destination node '%s'" %
4519                                  (",".join(bridges), pnode.name))
4520
4521     # memory check on primary node
4522     if self.op.start:
4523       _CheckNodeFreeMemory(self, self.pnode.name,
4524                            "creating instance %s" % self.op.instance_name,
4525                            self.be_full[constants.BE_MEMORY],
4526                            self.op.hypervisor)
4527
4528     self.instance_status = self.op.start
4529
4530   def Exec(self, feedback_fn):
4531     """Create and add the instance to the cluster.
4532
4533     """
4534     instance = self.op.instance_name
4535     pnode_name = self.pnode.name
4536
4537     for nic in self.nics:
4538       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4539         nic.mac = self.cfg.GenerateMAC()
4540
4541     ht_kind = self.op.hypervisor
4542     if ht_kind in constants.HTS_REQ_PORT:
4543       network_port = self.cfg.AllocatePort()
4544     else:
4545       network_port = None
4546
4547     ##if self.op.vnc_bind_address is None:
4548     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4549
4550     # this is needed because os.path.join does not accept None arguments
4551     if self.op.file_storage_dir is None:
4552       string_file_storage_dir = ""
4553     else:
4554       string_file_storage_dir = self.op.file_storage_dir
4555
4556     # build the full file storage dir path
4557     file_storage_dir = os.path.normpath(os.path.join(
4558                                         self.cfg.GetFileStorageDir(),
4559                                         string_file_storage_dir, instance))
4560
4561
4562     disks = _GenerateDiskTemplate(self,
4563                                   self.op.disk_template,
4564                                   instance, pnode_name,
4565                                   self.secondaries,
4566                                   self.disks,
4567                                   file_storage_dir,
4568                                   self.op.file_driver,
4569                                   0)
4570
4571     iobj = objects.Instance(name=instance, os=self.op.os_type,
4572                             primary_node=pnode_name,
4573                             nics=self.nics, disks=disks,
4574                             disk_template=self.op.disk_template,
4575                             admin_up=self.instance_status,
4576                             network_port=network_port,
4577                             beparams=self.op.beparams,
4578                             hvparams=self.op.hvparams,
4579                             hypervisor=self.op.hypervisor,
4580                             )
4581
4582     feedback_fn("* creating instance disks...")
4583     try:
4584       _CreateDisks(self, iobj)
4585     except errors.OpExecError:
4586       self.LogWarning("Device creation failed, reverting...")
4587       try:
4588         _RemoveDisks(self, iobj)
4589       finally:
4590         self.cfg.ReleaseDRBDMinors(instance)
4591         raise
4592
4593     feedback_fn("adding instance %s to cluster config" % instance)
4594
4595     self.cfg.AddInstance(iobj)
4596     # Declare that we don't want to remove the instance lock anymore, as we've
4597     # added the instance to the config
4598     del self.remove_locks[locking.LEVEL_INSTANCE]
4599     # Unlock all the nodes
4600     if self.op.mode == constants.INSTANCE_IMPORT:
4601       nodes_keep = [self.op.src_node]
4602       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4603                        if node != self.op.src_node]
4604       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4605       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4606     else:
4607       self.context.glm.release(locking.LEVEL_NODE)
4608       del self.acquired_locks[locking.LEVEL_NODE]
4609
4610     if self.op.wait_for_sync:
4611       disk_abort = not _WaitForSync(self, iobj)
4612     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4613       # make sure the disks are not degraded (still sync-ing is ok)
4614       time.sleep(15)
4615       feedback_fn("* checking mirrors status")
4616       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4617     else:
4618       disk_abort = False
4619
4620     if disk_abort:
4621       _RemoveDisks(self, iobj)
4622       self.cfg.RemoveInstance(iobj.name)
4623       # Make sure the instance lock gets removed
4624       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4625       raise errors.OpExecError("There are some degraded disks for"
4626                                " this instance")
4627
4628     feedback_fn("creating os for instance %s on node %s" %
4629                 (instance, pnode_name))
4630
4631     if iobj.disk_template != constants.DT_DISKLESS:
4632       if self.op.mode == constants.INSTANCE_CREATE:
4633         feedback_fn("* running the instance OS create scripts...")
4634         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4635         msg = result.RemoteFailMsg()
4636         if msg:
4637           raise errors.OpExecError("Could not add os for instance %s"
4638                                    " on node %s: %s" %
4639                                    (instance, pnode_name, msg))
4640
4641       elif self.op.mode == constants.INSTANCE_IMPORT:
4642         feedback_fn("* running the instance OS import scripts...")
4643         src_node = self.op.src_node
4644         src_images = self.src_images
4645         cluster_name = self.cfg.GetClusterName()
4646         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4647                                                          src_node, src_images,
4648                                                          cluster_name)
4649         import_result.Raise()
4650         for idx, result in enumerate(import_result.data):
4651           if not result:
4652             self.LogWarning("Could not import the image %s for instance"
4653                             " %s, disk %d, on node %s" %
4654                             (src_images[idx], instance, idx, pnode_name))
4655       else:
4656         # also checked in the prereq part
4657         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4658                                      % self.op.mode)
4659
4660     if self.op.start:
4661       logging.info("Starting instance %s on node %s", instance, pnode_name)
4662       feedback_fn("* starting instance...")
4663       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4664       msg = result.RemoteFailMsg()
4665       if msg:
4666         raise errors.OpExecError("Could not start instance: %s" % msg)
4667
4668
4669 class LUConnectConsole(NoHooksLU):
4670   """Connect to an instance's console.
4671
4672   This is somewhat special in that it returns the command line that
4673   you need to run on the master node in order to connect to the
4674   console.
4675
4676   """
4677   _OP_REQP = ["instance_name"]
4678   REQ_BGL = False
4679
4680   def ExpandNames(self):
4681     self._ExpandAndLockInstance()
4682
4683   def CheckPrereq(self):
4684     """Check prerequisites.
4685
4686     This checks that the instance is in the cluster.
4687
4688     """
4689     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4690     assert self.instance is not None, \
4691       "Cannot retrieve locked instance %s" % self.op.instance_name
4692     _CheckNodeOnline(self, self.instance.primary_node)
4693
4694   def Exec(self, feedback_fn):
4695     """Connect to the console of an instance
4696
4697     """
4698     instance = self.instance
4699     node = instance.primary_node
4700
4701     node_insts = self.rpc.call_instance_list([node],
4702                                              [instance.hypervisor])[node]
4703     node_insts.Raise()
4704
4705     if instance.name not in node_insts.data:
4706       raise errors.OpExecError("Instance %s is not running." % instance.name)
4707
4708     logging.debug("Connecting to console of %s on %s", instance.name, node)
4709
4710     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4711     cluster = self.cfg.GetClusterInfo()
4712     # beparams and hvparams are passed separately, to avoid editing the
4713     # instance and then saving the defaults in the instance itself.
4714     hvparams = cluster.FillHV(instance)
4715     beparams = cluster.FillBE(instance)
4716     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4717
4718     # build ssh cmdline
4719     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4720
4721
4722 class LUReplaceDisks(LogicalUnit):
4723   """Replace the disks of an instance.
4724
4725   """
4726   HPATH = "mirrors-replace"
4727   HTYPE = constants.HTYPE_INSTANCE
4728   _OP_REQP = ["instance_name", "mode", "disks"]
4729   REQ_BGL = False
4730
4731   def CheckArguments(self):
4732     if not hasattr(self.op, "remote_node"):
4733       self.op.remote_node = None
4734     if not hasattr(self.op, "iallocator"):
4735       self.op.iallocator = None
4736
4737     # check for valid parameter combination
4738     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4739     if self.op.mode == constants.REPLACE_DISK_CHG:
4740       if cnt == 2:
4741         raise errors.OpPrereqError("When changing the secondary either an"
4742                                    " iallocator script must be used or the"
4743                                    " new node given")
4744       elif cnt == 0:
4745         raise errors.OpPrereqError("Give either the iallocator or the new"
4746                                    " secondary, not both")
4747     else: # not replacing the secondary
4748       if cnt != 2:
4749         raise errors.OpPrereqError("The iallocator and new node options can"
4750                                    " be used only when changing the"
4751                                    " secondary node")
4752
4753   def ExpandNames(self):
4754     self._ExpandAndLockInstance()
4755
4756     if self.op.iallocator is not None:
4757       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4758     elif self.op.remote_node is not None:
4759       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4760       if remote_node is None:
4761         raise errors.OpPrereqError("Node '%s' not known" %
4762                                    self.op.remote_node)
4763       self.op.remote_node = remote_node
4764       # Warning: do not remove the locking of the new secondary here
4765       # unless DRBD8.AddChildren is changed to work in parallel;
4766       # currently it doesn't since parallel invocations of
4767       # FindUnusedMinor will conflict
4768       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4769       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4770     else:
4771       self.needed_locks[locking.LEVEL_NODE] = []
4772       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4773
4774   def DeclareLocks(self, level):
4775     # If we're not already locking all nodes in the set we have to declare the
4776     # instance's primary/secondary nodes.
4777     if (level == locking.LEVEL_NODE and
4778         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4779       self._LockInstancesNodes()
4780
4781   def _RunAllocator(self):
4782     """Compute a new secondary node using an IAllocator.
4783
4784     """
4785     ial = IAllocator(self,
4786                      mode=constants.IALLOCATOR_MODE_RELOC,
4787                      name=self.op.instance_name,
4788                      relocate_from=[self.sec_node])
4789
4790     ial.Run(self.op.iallocator)
4791
4792     if not ial.success:
4793       raise errors.OpPrereqError("Can't compute nodes using"
4794                                  " iallocator '%s': %s" % (self.op.iallocator,
4795                                                            ial.info))
4796     if len(ial.nodes) != ial.required_nodes:
4797       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4798                                  " of nodes (%s), required %s" %
4799                                  (len(ial.nodes), ial.required_nodes))
4800     self.op.remote_node = ial.nodes[0]
4801     self.LogInfo("Selected new secondary for the instance: %s",
4802                  self.op.remote_node)
4803
4804   def BuildHooksEnv(self):
4805     """Build hooks env.
4806
4807     This runs on the master, the primary and all the secondaries.
4808
4809     """
4810     env = {
4811       "MODE": self.op.mode,
4812       "NEW_SECONDARY": self.op.remote_node,
4813       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4814       }
4815     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4816     nl = [
4817       self.cfg.GetMasterNode(),
4818       self.instance.primary_node,
4819       ]
4820     if self.op.remote_node is not None:
4821       nl.append(self.op.remote_node)
4822     return env, nl, nl
4823
4824   def CheckPrereq(self):
4825     """Check prerequisites.
4826
4827     This checks that the instance is in the cluster.
4828
4829     """
4830     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4831     assert instance is not None, \
4832       "Cannot retrieve locked instance %s" % self.op.instance_name
4833     self.instance = instance
4834
4835     if instance.disk_template != constants.DT_DRBD8:
4836       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4837                                  " instances")
4838
4839     if len(instance.secondary_nodes) != 1:
4840       raise errors.OpPrereqError("The instance has a strange layout,"
4841                                  " expected one secondary but found %d" %
4842                                  len(instance.secondary_nodes))
4843
4844     self.sec_node = instance.secondary_nodes[0]
4845
4846     if self.op.iallocator is not None:
4847       self._RunAllocator()
4848
4849     remote_node = self.op.remote_node
4850     if remote_node is not None:
4851       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4852       assert self.remote_node_info is not None, \
4853         "Cannot retrieve locked node %s" % remote_node
4854     else:
4855       self.remote_node_info = None
4856     if remote_node == instance.primary_node:
4857       raise errors.OpPrereqError("The specified node is the primary node of"
4858                                  " the instance.")
4859     elif remote_node == self.sec_node:
4860       raise errors.OpPrereqError("The specified node is already the"
4861                                  " secondary node of the instance.")
4862
4863     if self.op.mode == constants.REPLACE_DISK_PRI:
4864       n1 = self.tgt_node = instance.primary_node
4865       n2 = self.oth_node = self.sec_node
4866     elif self.op.mode == constants.REPLACE_DISK_SEC:
4867       n1 = self.tgt_node = self.sec_node
4868       n2 = self.oth_node = instance.primary_node
4869     elif self.op.mode == constants.REPLACE_DISK_CHG:
4870       n1 = self.new_node = remote_node
4871       n2 = self.oth_node = instance.primary_node
4872       self.tgt_node = self.sec_node
4873     else:
4874       raise errors.ProgrammerError("Unhandled disk replace mode")
4875
4876     _CheckNodeOnline(self, n1)
4877     _CheckNodeOnline(self, n2)
4878
4879     if not self.op.disks:
4880       self.op.disks = range(len(instance.disks))
4881
4882     for disk_idx in self.op.disks:
4883       instance.FindDisk(disk_idx)
4884
4885   def _ExecD8DiskOnly(self, feedback_fn):
4886     """Replace a disk on the primary or secondary for dbrd8.
4887
4888     The algorithm for replace is quite complicated:
4889
4890       1. for each disk to be replaced:
4891
4892         1. create new LVs on the target node with unique names
4893         1. detach old LVs from the drbd device
4894         1. rename old LVs to name_replaced.<time_t>
4895         1. rename new LVs to old LVs
4896         1. attach the new LVs (with the old names now) to the drbd device
4897
4898       1. wait for sync across all devices
4899
4900       1. for each modified disk:
4901
4902         1. remove old LVs (which have the name name_replaces.<time_t>)
4903
4904     Failures are not very well handled.
4905
4906     """
4907     steps_total = 6
4908     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4909     instance = self.instance
4910     iv_names = {}
4911     vgname = self.cfg.GetVGName()
4912     # start of work
4913     cfg = self.cfg
4914     tgt_node = self.tgt_node
4915     oth_node = self.oth_node
4916
4917     # Step: check device activation
4918     self.proc.LogStep(1, steps_total, "check device existence")
4919     info("checking volume groups")
4920     my_vg = cfg.GetVGName()
4921     results = self.rpc.call_vg_list([oth_node, tgt_node])
4922     if not results:
4923       raise errors.OpExecError("Can't list volume groups on the nodes")
4924     for node in oth_node, tgt_node:
4925       res = results[node]
4926       if res.failed or not res.data or my_vg not in res.data:
4927         raise errors.OpExecError("Volume group '%s' not found on %s" %
4928                                  (my_vg, node))
4929     for idx, dev in enumerate(instance.disks):
4930       if idx not in self.op.disks:
4931         continue
4932       for node in tgt_node, oth_node:
4933         info("checking disk/%d on %s" % (idx, node))
4934         cfg.SetDiskID(dev, node)
4935         if not self.rpc.call_blockdev_find(node, dev):
4936           raise errors.OpExecError("Can't find disk/%d on node %s" %
4937                                    (idx, node))
4938
4939     # Step: check other node consistency
4940     self.proc.LogStep(2, steps_total, "check peer consistency")
4941     for idx, dev in enumerate(instance.disks):
4942       if idx not in self.op.disks:
4943         continue
4944       info("checking disk/%d consistency on %s" % (idx, oth_node))
4945       if not _CheckDiskConsistency(self, dev, oth_node,
4946                                    oth_node==instance.primary_node):
4947         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4948                                  " to replace disks on this node (%s)" %
4949                                  (oth_node, tgt_node))
4950
4951     # Step: create new storage
4952     self.proc.LogStep(3, steps_total, "allocate new storage")
4953     for idx, dev in enumerate(instance.disks):
4954       if idx not in self.op.disks:
4955         continue
4956       size = dev.size
4957       cfg.SetDiskID(dev, tgt_node)
4958       lv_names = [".disk%d_%s" % (idx, suf)
4959                   for suf in ["data", "meta"]]
4960       names = _GenerateUniqueNames(self, lv_names)
4961       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4962                              logical_id=(vgname, names[0]))
4963       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4964                              logical_id=(vgname, names[1]))
4965       new_lvs = [lv_data, lv_meta]
4966       old_lvs = dev.children
4967       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4968       info("creating new local storage on %s for %s" %
4969            (tgt_node, dev.iv_name))
4970       # we pass force_create=True to force the LVM creation
4971       for new_lv in new_lvs:
4972         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4973                         _GetInstanceInfoText(instance), False)
4974
4975     # Step: for each lv, detach+rename*2+attach
4976     self.proc.LogStep(4, steps_total, "change drbd configuration")
4977     for dev, old_lvs, new_lvs in iv_names.itervalues():
4978       info("detaching %s drbd from local storage" % dev.iv_name)
4979       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4980       result.Raise()
4981       if not result.data:
4982         raise errors.OpExecError("Can't detach drbd from local storage on node"
4983                                  " %s for device %s" % (tgt_node, dev.iv_name))
4984       #dev.children = []
4985       #cfg.Update(instance)
4986
4987       # ok, we created the new LVs, so now we know we have the needed
4988       # storage; as such, we proceed on the target node to rename
4989       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4990       # using the assumption that logical_id == physical_id (which in
4991       # turn is the unique_id on that node)
4992
4993       # FIXME(iustin): use a better name for the replaced LVs
4994       temp_suffix = int(time.time())
4995       ren_fn = lambda d, suff: (d.physical_id[0],
4996                                 d.physical_id[1] + "_replaced-%s" % suff)
4997       # build the rename list based on what LVs exist on the node
4998       rlist = []
4999       for to_ren in old_lvs:
5000         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
5001         if not find_res.failed and find_res.data is not None: # device exists
5002           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5003
5004       info("renaming the old LVs on the target node")
5005       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5006       result.Raise()
5007       if not result.data:
5008         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5009       # now we rename the new LVs to the old LVs
5010       info("renaming the new LVs on the target node")
5011       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5012       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5013       result.Raise()
5014       if not result.data:
5015         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5016
5017       for old, new in zip(old_lvs, new_lvs):
5018         new.logical_id = old.logical_id
5019         cfg.SetDiskID(new, tgt_node)
5020
5021       for disk in old_lvs:
5022         disk.logical_id = ren_fn(disk, temp_suffix)
5023         cfg.SetDiskID(disk, tgt_node)
5024
5025       # now that the new lvs have the old name, we can add them to the device
5026       info("adding new mirror component on %s" % tgt_node)
5027       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5028       if result.failed or not result.data:
5029         for new_lv in new_lvs:
5030           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5031           if result.failed or not result.data:
5032             warning("Can't rollback device %s", hint="manually cleanup unused"
5033                     " logical volumes")
5034         raise errors.OpExecError("Can't add local storage to drbd")
5035
5036       dev.children = new_lvs
5037       cfg.Update(instance)
5038
5039     # Step: wait for sync
5040
5041     # this can fail as the old devices are degraded and _WaitForSync
5042     # does a combined result over all disks, so we don't check its
5043     # return value
5044     self.proc.LogStep(5, steps_total, "sync devices")
5045     _WaitForSync(self, instance, unlock=True)
5046
5047     # so check manually all the devices
5048     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5049       cfg.SetDiskID(dev, instance.primary_node)
5050       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5051       if result.failed or result.data[5]:
5052         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5053
5054     # Step: remove old storage
5055     self.proc.LogStep(6, steps_total, "removing old storage")
5056     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5057       info("remove logical volumes for %s" % name)
5058       for lv in old_lvs:
5059         cfg.SetDiskID(lv, tgt_node)
5060         result = self.rpc.call_blockdev_remove(tgt_node, lv)
5061         if result.failed or not result.data:
5062           warning("Can't remove old LV", hint="manually remove unused LVs")
5063           continue
5064
5065   def _ExecD8Secondary(self, feedback_fn):
5066     """Replace the secondary node for drbd8.
5067
5068     The algorithm for replace is quite complicated:
5069       - for all disks of the instance:
5070         - create new LVs on the new node with same names
5071         - shutdown the drbd device on the old secondary
5072         - disconnect the drbd network on the primary
5073         - create the drbd device on the new secondary
5074         - network attach the drbd on the primary, using an artifice:
5075           the drbd code for Attach() will connect to the network if it
5076           finds a device which is connected to the good local disks but
5077           not network enabled
5078       - wait for sync across all devices
5079       - remove all disks from the old secondary
5080
5081     Failures are not very well handled.
5082
5083     """
5084     steps_total = 6
5085     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5086     instance = self.instance
5087     iv_names = {}
5088     # start of work
5089     cfg = self.cfg
5090     old_node = self.tgt_node
5091     new_node = self.new_node
5092     pri_node = instance.primary_node
5093     nodes_ip = {
5094       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5095       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5096       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5097       }
5098
5099     # Step: check device activation
5100     self.proc.LogStep(1, steps_total, "check device existence")
5101     info("checking volume groups")
5102     my_vg = cfg.GetVGName()
5103     results = self.rpc.call_vg_list([pri_node, new_node])
5104     for node in pri_node, new_node:
5105       res = results[node]
5106       if res.failed or not res.data or my_vg not in res.data:
5107         raise errors.OpExecError("Volume group '%s' not found on %s" %
5108                                  (my_vg, node))
5109     for idx, dev in enumerate(instance.disks):
5110       if idx not in self.op.disks:
5111         continue
5112       info("checking disk/%d on %s" % (idx, pri_node))
5113       cfg.SetDiskID(dev, pri_node)
5114       result = self.rpc.call_blockdev_find(pri_node, dev)
5115       result.Raise()
5116       if not result.data:
5117         raise errors.OpExecError("Can't find disk/%d on node %s" %
5118                                  (idx, pri_node))
5119
5120     # Step: check other node consistency
5121     self.proc.LogStep(2, steps_total, "check peer consistency")
5122     for idx, dev in enumerate(instance.disks):
5123       if idx not in self.op.disks:
5124         continue
5125       info("checking disk/%d consistency on %s" % (idx, pri_node))
5126       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5127         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5128                                  " unsafe to replace the secondary" %
5129                                  pri_node)
5130
5131     # Step: create new storage
5132     self.proc.LogStep(3, steps_total, "allocate new storage")
5133     for idx, dev in enumerate(instance.disks):
5134       info("adding new local storage on %s for disk/%d" %
5135            (new_node, idx))
5136       # we pass force_create=True to force LVM creation
5137       for new_lv in dev.children:
5138         _CreateBlockDev(self, new_node, instance, new_lv, True,
5139                         _GetInstanceInfoText(instance), False)
5140
5141     # Step 4: dbrd minors and drbd setups changes
5142     # after this, we must manually remove the drbd minors on both the
5143     # error and the success paths
5144     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5145                                    instance.name)
5146     logging.debug("Allocated minors %s" % (minors,))
5147     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5148     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5149       size = dev.size
5150       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5151       # create new devices on new_node; note that we create two IDs:
5152       # one without port, so the drbd will be activated without
5153       # networking information on the new node at this stage, and one
5154       # with network, for the latter activation in step 4
5155       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5156       if pri_node == o_node1:
5157         p_minor = o_minor1
5158       else:
5159         p_minor = o_minor2
5160
5161       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5162       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5163
5164       iv_names[idx] = (dev, dev.children, new_net_id)
5165       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5166                     new_net_id)
5167       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5168                               logical_id=new_alone_id,
5169                               children=dev.children)
5170       try:
5171         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5172                               _GetInstanceInfoText(instance), False)
5173       except errors.BlockDeviceError:
5174         self.cfg.ReleaseDRBDMinors(instance.name)
5175         raise
5176
5177     for idx, dev in enumerate(instance.disks):
5178       # we have new devices, shutdown the drbd on the old secondary
5179       info("shutting down drbd for disk/%d on old node" % idx)
5180       cfg.SetDiskID(dev, old_node)
5181       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5182       if result.failed or not result.data:
5183         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5184                 hint="Please cleanup this device manually as soon as possible")
5185
5186     info("detaching primary drbds from the network (=> standalone)")
5187     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5188                                                instance.disks)[pri_node]
5189
5190     msg = result.RemoteFailMsg()
5191     if msg:
5192       # detaches didn't succeed (unlikely)
5193       self.cfg.ReleaseDRBDMinors(instance.name)
5194       raise errors.OpExecError("Can't detach the disks from the network on"
5195                                " old node: %s" % (msg,))
5196
5197     # if we managed to detach at least one, we update all the disks of
5198     # the instance to point to the new secondary
5199     info("updating instance configuration")
5200     for dev, _, new_logical_id in iv_names.itervalues():
5201       dev.logical_id = new_logical_id
5202       cfg.SetDiskID(dev, pri_node)
5203     cfg.Update(instance)
5204
5205     # and now perform the drbd attach
5206     info("attaching primary drbds to new secondary (standalone => connected)")
5207     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5208                                            instance.disks, instance.name,
5209                                            False)
5210     for to_node, to_result in result.items():
5211       msg = to_result.RemoteFailMsg()
5212       if msg:
5213         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5214                 hint="please do a gnt-instance info to see the"
5215                 " status of disks")
5216
5217     # this can fail as the old devices are degraded and _WaitForSync
5218     # does a combined result over all disks, so we don't check its
5219     # return value
5220     self.proc.LogStep(5, steps_total, "sync devices")
5221     _WaitForSync(self, instance, unlock=True)
5222
5223     # so check manually all the devices
5224     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5225       cfg.SetDiskID(dev, pri_node)
5226       result = self.rpc.call_blockdev_find(pri_node, dev)
5227       result.Raise()
5228       if result.data[5]:
5229         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5230
5231     self.proc.LogStep(6, steps_total, "removing old storage")
5232     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5233       info("remove logical volumes for disk/%d" % idx)
5234       for lv in old_lvs:
5235         cfg.SetDiskID(lv, old_node)
5236         result = self.rpc.call_blockdev_remove(old_node, lv)
5237         if result.failed or not result.data:
5238           warning("Can't remove LV on old secondary",
5239                   hint="Cleanup stale volumes by hand")
5240
5241   def Exec(self, feedback_fn):
5242     """Execute disk replacement.
5243
5244     This dispatches the disk replacement to the appropriate handler.
5245
5246     """
5247     instance = self.instance
5248
5249     # Activate the instance disks if we're replacing them on a down instance
5250     if not instance.admin_up:
5251       _StartInstanceDisks(self, instance, True)
5252
5253     if self.op.mode == constants.REPLACE_DISK_CHG:
5254       fn = self._ExecD8Secondary
5255     else:
5256       fn = self._ExecD8DiskOnly
5257
5258     ret = fn(feedback_fn)
5259
5260     # Deactivate the instance disks if we're replacing them on a down instance
5261     if not instance.admin_up:
5262       _SafeShutdownInstanceDisks(self, instance)
5263
5264     return ret
5265
5266
5267 class LUGrowDisk(LogicalUnit):
5268   """Grow a disk of an instance.
5269
5270   """
5271   HPATH = "disk-grow"
5272   HTYPE = constants.HTYPE_INSTANCE
5273   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5274   REQ_BGL = False
5275
5276   def ExpandNames(self):
5277     self._ExpandAndLockInstance()
5278     self.needed_locks[locking.LEVEL_NODE] = []
5279     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5280
5281   def DeclareLocks(self, level):
5282     if level == locking.LEVEL_NODE:
5283       self._LockInstancesNodes()
5284
5285   def BuildHooksEnv(self):
5286     """Build hooks env.
5287
5288     This runs on the master, the primary and all the secondaries.
5289
5290     """
5291     env = {
5292       "DISK": self.op.disk,
5293       "AMOUNT": self.op.amount,
5294       }
5295     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5296     nl = [
5297       self.cfg.GetMasterNode(),
5298       self.instance.primary_node,
5299       ]
5300     return env, nl, nl
5301
5302   def CheckPrereq(self):
5303     """Check prerequisites.
5304
5305     This checks that the instance is in the cluster.
5306
5307     """
5308     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5309     assert instance is not None, \
5310       "Cannot retrieve locked instance %s" % self.op.instance_name
5311     nodenames = list(instance.all_nodes)
5312     for node in nodenames:
5313       _CheckNodeOnline(self, node)
5314
5315
5316     self.instance = instance
5317
5318     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5319       raise errors.OpPrereqError("Instance's disk layout does not support"
5320                                  " growing.")
5321
5322     self.disk = instance.FindDisk(self.op.disk)
5323
5324     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5325                                        instance.hypervisor)
5326     for node in nodenames:
5327       info = nodeinfo[node]
5328       if info.failed or not info.data:
5329         raise errors.OpPrereqError("Cannot get current information"
5330                                    " from node '%s'" % node)
5331       vg_free = info.data.get('vg_free', None)
5332       if not isinstance(vg_free, int):
5333         raise errors.OpPrereqError("Can't compute free disk space on"
5334                                    " node %s" % node)
5335       if self.op.amount > vg_free:
5336         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5337                                    " %d MiB available, %d MiB required" %
5338                                    (node, vg_free, self.op.amount))
5339
5340   def Exec(self, feedback_fn):
5341     """Execute disk grow.
5342
5343     """
5344     instance = self.instance
5345     disk = self.disk
5346     for node in instance.all_nodes:
5347       self.cfg.SetDiskID(disk, node)
5348       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5349       result.Raise()
5350       if (not result.data or not isinstance(result.data, (list, tuple)) or
5351           len(result.data) != 2):
5352         raise errors.OpExecError("Grow request failed to node %s" % node)
5353       elif not result.data[0]:
5354         raise errors.OpExecError("Grow request failed to node %s: %s" %
5355                                  (node, result.data[1]))
5356     disk.RecordGrow(self.op.amount)
5357     self.cfg.Update(instance)
5358     if self.op.wait_for_sync:
5359       disk_abort = not _WaitForSync(self, instance)
5360       if disk_abort:
5361         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5362                              " status.\nPlease check the instance.")
5363
5364
5365 class LUQueryInstanceData(NoHooksLU):
5366   """Query runtime instance data.
5367
5368   """
5369   _OP_REQP = ["instances", "static"]
5370   REQ_BGL = False
5371
5372   def ExpandNames(self):
5373     self.needed_locks = {}
5374     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5375
5376     if not isinstance(self.op.instances, list):
5377       raise errors.OpPrereqError("Invalid argument type 'instances'")
5378
5379     if self.op.instances:
5380       self.wanted_names = []
5381       for name in self.op.instances:
5382         full_name = self.cfg.ExpandInstanceName(name)
5383         if full_name is None:
5384           raise errors.OpPrereqError("Instance '%s' not known" % name)
5385         self.wanted_names.append(full_name)
5386       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5387     else:
5388       self.wanted_names = None
5389       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5390
5391     self.needed_locks[locking.LEVEL_NODE] = []
5392     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5393
5394   def DeclareLocks(self, level):
5395     if level == locking.LEVEL_NODE:
5396       self._LockInstancesNodes()
5397
5398   def CheckPrereq(self):
5399     """Check prerequisites.
5400
5401     This only checks the optional instance list against the existing names.
5402
5403     """
5404     if self.wanted_names is None:
5405       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5406
5407     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5408                              in self.wanted_names]
5409     return
5410
5411   def _ComputeDiskStatus(self, instance, snode, dev):
5412     """Compute block device status.
5413
5414     """
5415     static = self.op.static
5416     if not static:
5417       self.cfg.SetDiskID(dev, instance.primary_node)
5418       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5419       dev_pstatus.Raise()
5420       dev_pstatus = dev_pstatus.data
5421     else:
5422       dev_pstatus = None
5423
5424     if dev.dev_type in constants.LDS_DRBD:
5425       # we change the snode then (otherwise we use the one passed in)
5426       if dev.logical_id[0] == instance.primary_node:
5427         snode = dev.logical_id[1]
5428       else:
5429         snode = dev.logical_id[0]
5430
5431     if snode and not static:
5432       self.cfg.SetDiskID(dev, snode)
5433       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5434       dev_sstatus.Raise()
5435       dev_sstatus = dev_sstatus.data
5436     else:
5437       dev_sstatus = None
5438
5439     if dev.children:
5440       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5441                       for child in dev.children]
5442     else:
5443       dev_children = []
5444
5445     data = {
5446       "iv_name": dev.iv_name,
5447       "dev_type": dev.dev_type,
5448       "logical_id": dev.logical_id,
5449       "physical_id": dev.physical_id,
5450       "pstatus": dev_pstatus,
5451       "sstatus": dev_sstatus,
5452       "children": dev_children,
5453       "mode": dev.mode,
5454       }
5455
5456     return data
5457
5458   def Exec(self, feedback_fn):
5459     """Gather and return data"""
5460     result = {}
5461
5462     cluster = self.cfg.GetClusterInfo()
5463
5464     for instance in self.wanted_instances:
5465       if not self.op.static:
5466         remote_info = self.rpc.call_instance_info(instance.primary_node,
5467                                                   instance.name,
5468                                                   instance.hypervisor)
5469         remote_info.Raise()
5470         remote_info = remote_info.data
5471         if remote_info and "state" in remote_info:
5472           remote_state = "up"
5473         else:
5474           remote_state = "down"
5475       else:
5476         remote_state = None
5477       if instance.admin_up:
5478         config_state = "up"
5479       else:
5480         config_state = "down"
5481
5482       disks = [self._ComputeDiskStatus(instance, None, device)
5483                for device in instance.disks]
5484
5485       idict = {
5486         "name": instance.name,
5487         "config_state": config_state,
5488         "run_state": remote_state,
5489         "pnode": instance.primary_node,
5490         "snodes": instance.secondary_nodes,
5491         "os": instance.os,
5492         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5493         "disks": disks,
5494         "hypervisor": instance.hypervisor,
5495         "network_port": instance.network_port,
5496         "hv_instance": instance.hvparams,
5497         "hv_actual": cluster.FillHV(instance),
5498         "be_instance": instance.beparams,
5499         "be_actual": cluster.FillBE(instance),
5500         }
5501
5502       result[instance.name] = idict
5503
5504     return result
5505
5506
5507 class LUSetInstanceParams(LogicalUnit):
5508   """Modifies an instances's parameters.
5509
5510   """
5511   HPATH = "instance-modify"
5512   HTYPE = constants.HTYPE_INSTANCE
5513   _OP_REQP = ["instance_name"]
5514   REQ_BGL = False
5515
5516   def CheckArguments(self):
5517     if not hasattr(self.op, 'nics'):
5518       self.op.nics = []
5519     if not hasattr(self.op, 'disks'):
5520       self.op.disks = []
5521     if not hasattr(self.op, 'beparams'):
5522       self.op.beparams = {}
5523     if not hasattr(self.op, 'hvparams'):
5524       self.op.hvparams = {}
5525     self.op.force = getattr(self.op, "force", False)
5526     if not (self.op.nics or self.op.disks or
5527             self.op.hvparams or self.op.beparams):
5528       raise errors.OpPrereqError("No changes submitted")
5529
5530     utils.CheckBEParams(self.op.beparams)
5531
5532     # Disk validation
5533     disk_addremove = 0
5534     for disk_op, disk_dict in self.op.disks:
5535       if disk_op == constants.DDM_REMOVE:
5536         disk_addremove += 1
5537         continue
5538       elif disk_op == constants.DDM_ADD:
5539         disk_addremove += 1
5540       else:
5541         if not isinstance(disk_op, int):
5542           raise errors.OpPrereqError("Invalid disk index")
5543       if disk_op == constants.DDM_ADD:
5544         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5545         if mode not in constants.DISK_ACCESS_SET:
5546           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5547         size = disk_dict.get('size', None)
5548         if size is None:
5549           raise errors.OpPrereqError("Required disk parameter size missing")
5550         try:
5551           size = int(size)
5552         except ValueError, err:
5553           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5554                                      str(err))
5555         disk_dict['size'] = size
5556       else:
5557         # modification of disk
5558         if 'size' in disk_dict:
5559           raise errors.OpPrereqError("Disk size change not possible, use"
5560                                      " grow-disk")
5561
5562     if disk_addremove > 1:
5563       raise errors.OpPrereqError("Only one disk add or remove operation"
5564                                  " supported at a time")
5565
5566     # NIC validation
5567     nic_addremove = 0
5568     for nic_op, nic_dict in self.op.nics:
5569       if nic_op == constants.DDM_REMOVE:
5570         nic_addremove += 1
5571         continue
5572       elif nic_op == constants.DDM_ADD:
5573         nic_addremove += 1
5574       else:
5575         if not isinstance(nic_op, int):
5576           raise errors.OpPrereqError("Invalid nic index")
5577
5578       # nic_dict should be a dict
5579       nic_ip = nic_dict.get('ip', None)
5580       if nic_ip is not None:
5581         if nic_ip.lower() == "none":
5582           nic_dict['ip'] = None
5583         else:
5584           if not utils.IsValidIP(nic_ip):
5585             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5586       # we can only check None bridges and assign the default one
5587       nic_bridge = nic_dict.get('bridge', None)
5588       if nic_bridge is None:
5589         nic_dict['bridge'] = self.cfg.GetDefBridge()
5590       # but we can validate MACs
5591       nic_mac = nic_dict.get('mac', None)
5592       if nic_mac is not None:
5593         if self.cfg.IsMacInUse(nic_mac):
5594           raise errors.OpPrereqError("MAC address %s already in use"
5595                                      " in cluster" % nic_mac)
5596         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5597           if not utils.IsValidMac(nic_mac):
5598             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5599     if nic_addremove > 1:
5600       raise errors.OpPrereqError("Only one NIC add or remove operation"
5601                                  " supported at a time")
5602
5603   def ExpandNames(self):
5604     self._ExpandAndLockInstance()
5605     self.needed_locks[locking.LEVEL_NODE] = []
5606     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5607
5608   def DeclareLocks(self, level):
5609     if level == locking.LEVEL_NODE:
5610       self._LockInstancesNodes()
5611
5612   def BuildHooksEnv(self):
5613     """Build hooks env.
5614
5615     This runs on the master, primary and secondaries.
5616
5617     """
5618     args = dict()
5619     if constants.BE_MEMORY in self.be_new:
5620       args['memory'] = self.be_new[constants.BE_MEMORY]
5621     if constants.BE_VCPUS in self.be_new:
5622       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5623     # FIXME: readd disk/nic changes
5624     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5625     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5626     return env, nl, nl
5627
5628   def CheckPrereq(self):
5629     """Check prerequisites.
5630
5631     This only checks the instance list against the existing names.
5632
5633     """
5634     force = self.force = self.op.force
5635
5636     # checking the new params on the primary/secondary nodes
5637
5638     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5639     assert self.instance is not None, \
5640       "Cannot retrieve locked instance %s" % self.op.instance_name
5641     pnode = instance.primary_node
5642     nodelist = list(instance.all_nodes)
5643
5644     # hvparams processing
5645     if self.op.hvparams:
5646       i_hvdict = copy.deepcopy(instance.hvparams)
5647       for key, val in self.op.hvparams.iteritems():
5648         if val == constants.VALUE_DEFAULT:
5649           try:
5650             del i_hvdict[key]
5651           except KeyError:
5652             pass
5653         elif val == constants.VALUE_NONE:
5654           i_hvdict[key] = None
5655         else:
5656           i_hvdict[key] = val
5657       cluster = self.cfg.GetClusterInfo()
5658       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5659                                 i_hvdict)
5660       # local check
5661       hypervisor.GetHypervisor(
5662         instance.hypervisor).CheckParameterSyntax(hv_new)
5663       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5664       self.hv_new = hv_new # the new actual values
5665       self.hv_inst = i_hvdict # the new dict (without defaults)
5666     else:
5667       self.hv_new = self.hv_inst = {}
5668
5669     # beparams processing
5670     if self.op.beparams:
5671       i_bedict = copy.deepcopy(instance.beparams)
5672       for key, val in self.op.beparams.iteritems():
5673         if val == constants.VALUE_DEFAULT:
5674           try:
5675             del i_bedict[key]
5676           except KeyError:
5677             pass
5678         else:
5679           i_bedict[key] = val
5680       cluster = self.cfg.GetClusterInfo()
5681       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5682                                 i_bedict)
5683       self.be_new = be_new # the new actual values
5684       self.be_inst = i_bedict # the new dict (without defaults)
5685     else:
5686       self.be_new = self.be_inst = {}
5687
5688     self.warn = []
5689
5690     if constants.BE_MEMORY in self.op.beparams and not self.force:
5691       mem_check_list = [pnode]
5692       if be_new[constants.BE_AUTO_BALANCE]:
5693         # either we changed auto_balance to yes or it was from before
5694         mem_check_list.extend(instance.secondary_nodes)
5695       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5696                                                   instance.hypervisor)
5697       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5698                                          instance.hypervisor)
5699       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5700         # Assume the primary node is unreachable and go ahead
5701         self.warn.append("Can't get info from primary node %s" % pnode)
5702       else:
5703         if not instance_info.failed and instance_info.data:
5704           current_mem = instance_info.data['memory']
5705         else:
5706           # Assume instance not running
5707           # (there is a slight race condition here, but it's not very probable,
5708           # and we have no other way to check)
5709           current_mem = 0
5710         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5711                     nodeinfo[pnode].data['memory_free'])
5712         if miss_mem > 0:
5713           raise errors.OpPrereqError("This change will prevent the instance"
5714                                      " from starting, due to %d MB of memory"
5715                                      " missing on its primary node" % miss_mem)
5716
5717       if be_new[constants.BE_AUTO_BALANCE]:
5718         for node, nres in nodeinfo.iteritems():
5719           if node not in instance.secondary_nodes:
5720             continue
5721           if nres.failed or not isinstance(nres.data, dict):
5722             self.warn.append("Can't get info from secondary node %s" % node)
5723           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5724             self.warn.append("Not enough memory to failover instance to"
5725                              " secondary node %s" % node)
5726
5727     # NIC processing
5728     for nic_op, nic_dict in self.op.nics:
5729       if nic_op == constants.DDM_REMOVE:
5730         if not instance.nics:
5731           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5732         continue
5733       if nic_op != constants.DDM_ADD:
5734         # an existing nic
5735         if nic_op < 0 or nic_op >= len(instance.nics):
5736           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5737                                      " are 0 to %d" %
5738                                      (nic_op, len(instance.nics)))
5739       nic_bridge = nic_dict.get('bridge', None)
5740       if nic_bridge is not None:
5741         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5742           msg = ("Bridge '%s' doesn't exist on one of"
5743                  " the instance nodes" % nic_bridge)
5744           if self.force:
5745             self.warn.append(msg)
5746           else:
5747             raise errors.OpPrereqError(msg)
5748
5749     # DISK processing
5750     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5751       raise errors.OpPrereqError("Disk operations not supported for"
5752                                  " diskless instances")
5753     for disk_op, disk_dict in self.op.disks:
5754       if disk_op == constants.DDM_REMOVE:
5755         if len(instance.disks) == 1:
5756           raise errors.OpPrereqError("Cannot remove the last disk of"
5757                                      " an instance")
5758         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5759         ins_l = ins_l[pnode]
5760         if ins_l.failed or not isinstance(ins_l.data, list):
5761           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5762         if instance.name in ins_l.data:
5763           raise errors.OpPrereqError("Instance is running, can't remove"
5764                                      " disks.")
5765
5766       if (disk_op == constants.DDM_ADD and
5767           len(instance.nics) >= constants.MAX_DISKS):
5768         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5769                                    " add more" % constants.MAX_DISKS)
5770       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5771         # an existing disk
5772         if disk_op < 0 or disk_op >= len(instance.disks):
5773           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5774                                      " are 0 to %d" %
5775                                      (disk_op, len(instance.disks)))
5776
5777     return
5778
5779   def Exec(self, feedback_fn):
5780     """Modifies an instance.
5781
5782     All parameters take effect only at the next restart of the instance.
5783
5784     """
5785     # Process here the warnings from CheckPrereq, as we don't have a
5786     # feedback_fn there.
5787     for warn in self.warn:
5788       feedback_fn("WARNING: %s" % warn)
5789
5790     result = []
5791     instance = self.instance
5792     # disk changes
5793     for disk_op, disk_dict in self.op.disks:
5794       if disk_op == constants.DDM_REMOVE:
5795         # remove the last disk
5796         device = instance.disks.pop()
5797         device_idx = len(instance.disks)
5798         for node, disk in device.ComputeNodeTree(instance.primary_node):
5799           self.cfg.SetDiskID(disk, node)
5800           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5801           if rpc_result.failed or not rpc_result.data:
5802             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5803                                  " continuing anyway", device_idx, node)
5804         result.append(("disk/%d" % device_idx, "remove"))
5805       elif disk_op == constants.DDM_ADD:
5806         # add a new disk
5807         if instance.disk_template == constants.DT_FILE:
5808           file_driver, file_path = instance.disks[0].logical_id
5809           file_path = os.path.dirname(file_path)
5810         else:
5811           file_driver = file_path = None
5812         disk_idx_base = len(instance.disks)
5813         new_disk = _GenerateDiskTemplate(self,
5814                                          instance.disk_template,
5815                                          instance.name, instance.primary_node,
5816                                          instance.secondary_nodes,
5817                                          [disk_dict],
5818                                          file_path,
5819                                          file_driver,
5820                                          disk_idx_base)[0]
5821         instance.disks.append(new_disk)
5822         info = _GetInstanceInfoText(instance)
5823
5824         logging.info("Creating volume %s for instance %s",
5825                      new_disk.iv_name, instance.name)
5826         # Note: this needs to be kept in sync with _CreateDisks
5827         #HARDCODE
5828         for node in instance.all_nodes:
5829           f_create = node == instance.primary_node
5830           try:
5831             _CreateBlockDev(self, node, instance, new_disk,
5832                             f_create, info, f_create)
5833           except errors.OpExecError, err:
5834             self.LogWarning("Failed to create volume %s (%s) on"
5835                             " node %s: %s",
5836                             new_disk.iv_name, new_disk, node, err)
5837         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5838                        (new_disk.size, new_disk.mode)))
5839       else:
5840         # change a given disk
5841         instance.disks[disk_op].mode = disk_dict['mode']
5842         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5843     # NIC changes
5844     for nic_op, nic_dict in self.op.nics:
5845       if nic_op == constants.DDM_REMOVE:
5846         # remove the last nic
5847         del instance.nics[-1]
5848         result.append(("nic.%d" % len(instance.nics), "remove"))
5849       elif nic_op == constants.DDM_ADD:
5850         # add a new nic
5851         if 'mac' not in nic_dict:
5852           mac = constants.VALUE_GENERATE
5853         else:
5854           mac = nic_dict['mac']
5855         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5856           mac = self.cfg.GenerateMAC()
5857         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5858                               bridge=nic_dict.get('bridge', None))
5859         instance.nics.append(new_nic)
5860         result.append(("nic.%d" % (len(instance.nics) - 1),
5861                        "add:mac=%s,ip=%s,bridge=%s" %
5862                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5863       else:
5864         # change a given nic
5865         for key in 'mac', 'ip', 'bridge':
5866           if key in nic_dict:
5867             setattr(instance.nics[nic_op], key, nic_dict[key])
5868             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5869
5870     # hvparams changes
5871     if self.op.hvparams:
5872       instance.hvparams = self.hv_new
5873       for key, val in self.op.hvparams.iteritems():
5874         result.append(("hv/%s" % key, val))
5875
5876     # beparams changes
5877     if self.op.beparams:
5878       instance.beparams = self.be_inst
5879       for key, val in self.op.beparams.iteritems():
5880         result.append(("be/%s" % key, val))
5881
5882     self.cfg.Update(instance)
5883
5884     return result
5885
5886
5887 class LUQueryExports(NoHooksLU):
5888   """Query the exports list
5889
5890   """
5891   _OP_REQP = ['nodes']
5892   REQ_BGL = False
5893
5894   def ExpandNames(self):
5895     self.needed_locks = {}
5896     self.share_locks[locking.LEVEL_NODE] = 1
5897     if not self.op.nodes:
5898       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5899     else:
5900       self.needed_locks[locking.LEVEL_NODE] = \
5901         _GetWantedNodes(self, self.op.nodes)
5902
5903   def CheckPrereq(self):
5904     """Check prerequisites.
5905
5906     """
5907     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5908
5909   def Exec(self, feedback_fn):
5910     """Compute the list of all the exported system images.
5911
5912     @rtype: dict
5913     @return: a dictionary with the structure node->(export-list)
5914         where export-list is a list of the instances exported on
5915         that node.
5916
5917     """
5918     rpcresult = self.rpc.call_export_list(self.nodes)
5919     result = {}
5920     for node in rpcresult:
5921       if rpcresult[node].failed:
5922         result[node] = False
5923       else:
5924         result[node] = rpcresult[node].data
5925
5926     return result
5927
5928
5929 class LUExportInstance(LogicalUnit):
5930   """Export an instance to an image in the cluster.
5931
5932   """
5933   HPATH = "instance-export"
5934   HTYPE = constants.HTYPE_INSTANCE
5935   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5936   REQ_BGL = False
5937
5938   def ExpandNames(self):
5939     self._ExpandAndLockInstance()
5940     # FIXME: lock only instance primary and destination node
5941     #
5942     # Sad but true, for now we have do lock all nodes, as we don't know where
5943     # the previous export might be, and and in this LU we search for it and
5944     # remove it from its current node. In the future we could fix this by:
5945     #  - making a tasklet to search (share-lock all), then create the new one,
5946     #    then one to remove, after
5947     #  - removing the removal operation altoghether
5948     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5949
5950   def DeclareLocks(self, level):
5951     """Last minute lock declaration."""
5952     # All nodes are locked anyway, so nothing to do here.
5953
5954   def BuildHooksEnv(self):
5955     """Build hooks env.
5956
5957     This will run on the master, primary node and target node.
5958
5959     """
5960     env = {
5961       "EXPORT_NODE": self.op.target_node,
5962       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5963       }
5964     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5965     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5966           self.op.target_node]
5967     return env, nl, nl
5968
5969   def CheckPrereq(self):
5970     """Check prerequisites.
5971
5972     This checks that the instance and node names are valid.
5973
5974     """
5975     instance_name = self.op.instance_name
5976     self.instance = self.cfg.GetInstanceInfo(instance_name)
5977     assert self.instance is not None, \
5978           "Cannot retrieve locked instance %s" % self.op.instance_name
5979     _CheckNodeOnline(self, self.instance.primary_node)
5980
5981     self.dst_node = self.cfg.GetNodeInfo(
5982       self.cfg.ExpandNodeName(self.op.target_node))
5983
5984     if self.dst_node is None:
5985       # This is wrong node name, not a non-locked node
5986       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5987     _CheckNodeOnline(self, self.dst_node.name)
5988
5989     # instance disk type verification
5990     for disk in self.instance.disks:
5991       if disk.dev_type == constants.LD_FILE:
5992         raise errors.OpPrereqError("Export not supported for instances with"
5993                                    " file-based disks")
5994
5995   def Exec(self, feedback_fn):
5996     """Export an instance to an image in the cluster.
5997
5998     """
5999     instance = self.instance
6000     dst_node = self.dst_node
6001     src_node = instance.primary_node
6002     if self.op.shutdown:
6003       # shutdown the instance, but not the disks
6004       result = self.rpc.call_instance_shutdown(src_node, instance)
6005       result.Raise()
6006       if not result.data:
6007         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
6008                                  (instance.name, src_node))
6009
6010     vgname = self.cfg.GetVGName()
6011
6012     snap_disks = []
6013
6014     # set the disks ID correctly since call_instance_start needs the
6015     # correct drbd minor to create the symlinks
6016     for disk in instance.disks:
6017       self.cfg.SetDiskID(disk, src_node)
6018
6019     try:
6020       for disk in instance.disks:
6021         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6022         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6023         if new_dev_name.failed or not new_dev_name.data:
6024           self.LogWarning("Could not snapshot block device %s on node %s",
6025                           disk.logical_id[1], src_node)
6026           snap_disks.append(False)
6027         else:
6028           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6029                                  logical_id=(vgname, new_dev_name.data),
6030                                  physical_id=(vgname, new_dev_name.data),
6031                                  iv_name=disk.iv_name)
6032           snap_disks.append(new_dev)
6033
6034     finally:
6035       if self.op.shutdown and instance.admin_up:
6036         result = self.rpc.call_instance_start(src_node, instance, None)
6037         msg = result.RemoteFailMsg()
6038         if msg:
6039           _ShutdownInstanceDisks(self, instance)
6040           raise errors.OpExecError("Could not start instance: %s" % msg)
6041
6042     # TODO: check for size
6043
6044     cluster_name = self.cfg.GetClusterName()
6045     for idx, dev in enumerate(snap_disks):
6046       if dev:
6047         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6048                                                instance, cluster_name, idx)
6049         if result.failed or not result.data:
6050           self.LogWarning("Could not export block device %s from node %s to"
6051                           " node %s", dev.logical_id[1], src_node,
6052                           dst_node.name)
6053         result = self.rpc.call_blockdev_remove(src_node, dev)
6054         if result.failed or not result.data:
6055           self.LogWarning("Could not remove snapshot block device %s from node"
6056                           " %s", dev.logical_id[1], src_node)
6057
6058     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6059     if result.failed or not result.data:
6060       self.LogWarning("Could not finalize export for instance %s on node %s",
6061                       instance.name, dst_node.name)
6062
6063     nodelist = self.cfg.GetNodeList()
6064     nodelist.remove(dst_node.name)
6065
6066     # on one-node clusters nodelist will be empty after the removal
6067     # if we proceed the backup would be removed because OpQueryExports
6068     # substitutes an empty list with the full cluster node list.
6069     if nodelist:
6070       exportlist = self.rpc.call_export_list(nodelist)
6071       for node in exportlist:
6072         if exportlist[node].failed:
6073           continue
6074         if instance.name in exportlist[node].data:
6075           if not self.rpc.call_export_remove(node, instance.name):
6076             self.LogWarning("Could not remove older export for instance %s"
6077                             " on node %s", instance.name, node)
6078
6079
6080 class LURemoveExport(NoHooksLU):
6081   """Remove exports related to the named instance.
6082
6083   """
6084   _OP_REQP = ["instance_name"]
6085   REQ_BGL = False
6086
6087   def ExpandNames(self):
6088     self.needed_locks = {}
6089     # We need all nodes to be locked in order for RemoveExport to work, but we
6090     # don't need to lock the instance itself, as nothing will happen to it (and
6091     # we can remove exports also for a removed instance)
6092     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6093
6094   def CheckPrereq(self):
6095     """Check prerequisites.
6096     """
6097     pass
6098
6099   def Exec(self, feedback_fn):
6100     """Remove any export.
6101
6102     """
6103     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6104     # If the instance was not found we'll try with the name that was passed in.
6105     # This will only work if it was an FQDN, though.
6106     fqdn_warn = False
6107     if not instance_name:
6108       fqdn_warn = True
6109       instance_name = self.op.instance_name
6110
6111     exportlist = self.rpc.call_export_list(self.acquired_locks[
6112       locking.LEVEL_NODE])
6113     found = False
6114     for node in exportlist:
6115       if exportlist[node].failed:
6116         self.LogWarning("Failed to query node %s, continuing" % node)
6117         continue
6118       if instance_name in exportlist[node].data:
6119         found = True
6120         result = self.rpc.call_export_remove(node, instance_name)
6121         if result.failed or not result.data:
6122           logging.error("Could not remove export for instance %s"
6123                         " on node %s", instance_name, node)
6124
6125     if fqdn_warn and not found:
6126       feedback_fn("Export not found. If trying to remove an export belonging"
6127                   " to a deleted instance please use its Fully Qualified"
6128                   " Domain Name.")
6129
6130
6131 class TagsLU(NoHooksLU):
6132   """Generic tags LU.
6133
6134   This is an abstract class which is the parent of all the other tags LUs.
6135
6136   """
6137
6138   def ExpandNames(self):
6139     self.needed_locks = {}
6140     if self.op.kind == constants.TAG_NODE:
6141       name = self.cfg.ExpandNodeName(self.op.name)
6142       if name is None:
6143         raise errors.OpPrereqError("Invalid node name (%s)" %
6144                                    (self.op.name,))
6145       self.op.name = name
6146       self.needed_locks[locking.LEVEL_NODE] = name
6147     elif self.op.kind == constants.TAG_INSTANCE:
6148       name = self.cfg.ExpandInstanceName(self.op.name)
6149       if name is None:
6150         raise errors.OpPrereqError("Invalid instance name (%s)" %
6151                                    (self.op.name,))
6152       self.op.name = name
6153       self.needed_locks[locking.LEVEL_INSTANCE] = name
6154
6155   def CheckPrereq(self):
6156     """Check prerequisites.
6157
6158     """
6159     if self.op.kind == constants.TAG_CLUSTER:
6160       self.target = self.cfg.GetClusterInfo()
6161     elif self.op.kind == constants.TAG_NODE:
6162       self.target = self.cfg.GetNodeInfo(self.op.name)
6163     elif self.op.kind == constants.TAG_INSTANCE:
6164       self.target = self.cfg.GetInstanceInfo(self.op.name)
6165     else:
6166       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6167                                  str(self.op.kind))
6168
6169
6170 class LUGetTags(TagsLU):
6171   """Returns the tags of a given object.
6172
6173   """
6174   _OP_REQP = ["kind", "name"]
6175   REQ_BGL = False
6176
6177   def Exec(self, feedback_fn):
6178     """Returns the tag list.
6179
6180     """
6181     return list(self.target.GetTags())
6182
6183
6184 class LUSearchTags(NoHooksLU):
6185   """Searches the tags for a given pattern.
6186
6187   """
6188   _OP_REQP = ["pattern"]
6189   REQ_BGL = False
6190
6191   def ExpandNames(self):
6192     self.needed_locks = {}
6193
6194   def CheckPrereq(self):
6195     """Check prerequisites.
6196
6197     This checks the pattern passed for validity by compiling it.
6198
6199     """
6200     try:
6201       self.re = re.compile(self.op.pattern)
6202     except re.error, err:
6203       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6204                                  (self.op.pattern, err))
6205
6206   def Exec(self, feedback_fn):
6207     """Returns the tag list.
6208
6209     """
6210     cfg = self.cfg
6211     tgts = [("/cluster", cfg.GetClusterInfo())]
6212     ilist = cfg.GetAllInstancesInfo().values()
6213     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6214     nlist = cfg.GetAllNodesInfo().values()
6215     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6216     results = []
6217     for path, target in tgts:
6218       for tag in target.GetTags():
6219         if self.re.search(tag):
6220           results.append((path, tag))
6221     return results
6222
6223
6224 class LUAddTags(TagsLU):
6225   """Sets a tag on a given object.
6226
6227   """
6228   _OP_REQP = ["kind", "name", "tags"]
6229   REQ_BGL = False
6230
6231   def CheckPrereq(self):
6232     """Check prerequisites.
6233
6234     This checks the type and length of the tag name and value.
6235
6236     """
6237     TagsLU.CheckPrereq(self)
6238     for tag in self.op.tags:
6239       objects.TaggableObject.ValidateTag(tag)
6240
6241   def Exec(self, feedback_fn):
6242     """Sets the tag.
6243
6244     """
6245     try:
6246       for tag in self.op.tags:
6247         self.target.AddTag(tag)
6248     except errors.TagError, err:
6249       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6250     try:
6251       self.cfg.Update(self.target)
6252     except errors.ConfigurationError:
6253       raise errors.OpRetryError("There has been a modification to the"
6254                                 " config file and the operation has been"
6255                                 " aborted. Please retry.")
6256
6257
6258 class LUDelTags(TagsLU):
6259   """Delete a list of tags from a given object.
6260
6261   """
6262   _OP_REQP = ["kind", "name", "tags"]
6263   REQ_BGL = False
6264
6265   def CheckPrereq(self):
6266     """Check prerequisites.
6267
6268     This checks that we have the given tag.
6269
6270     """
6271     TagsLU.CheckPrereq(self)
6272     for tag in self.op.tags:
6273       objects.TaggableObject.ValidateTag(tag)
6274     del_tags = frozenset(self.op.tags)
6275     cur_tags = self.target.GetTags()
6276     if not del_tags <= cur_tags:
6277       diff_tags = del_tags - cur_tags
6278       diff_names = ["'%s'" % tag for tag in diff_tags]
6279       diff_names.sort()
6280       raise errors.OpPrereqError("Tag(s) %s not found" %
6281                                  (",".join(diff_names)))
6282
6283   def Exec(self, feedback_fn):
6284     """Remove the tag from the object.
6285
6286     """
6287     for tag in self.op.tags:
6288       self.target.RemoveTag(tag)
6289     try:
6290       self.cfg.Update(self.target)
6291     except errors.ConfigurationError:
6292       raise errors.OpRetryError("There has been a modification to the"
6293                                 " config file and the operation has been"
6294                                 " aborted. Please retry.")
6295
6296
6297 class LUTestDelay(NoHooksLU):
6298   """Sleep for a specified amount of time.
6299
6300   This LU sleeps on the master and/or nodes for a specified amount of
6301   time.
6302
6303   """
6304   _OP_REQP = ["duration", "on_master", "on_nodes"]
6305   REQ_BGL = False
6306
6307   def ExpandNames(self):
6308     """Expand names and set required locks.
6309
6310     This expands the node list, if any.
6311
6312     """
6313     self.needed_locks = {}
6314     if self.op.on_nodes:
6315       # _GetWantedNodes can be used here, but is not always appropriate to use
6316       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6317       # more information.
6318       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6319       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6320
6321   def CheckPrereq(self):
6322     """Check prerequisites.
6323
6324     """
6325
6326   def Exec(self, feedback_fn):
6327     """Do the actual sleep.
6328
6329     """
6330     if self.op.on_master:
6331       if not utils.TestDelay(self.op.duration):
6332         raise errors.OpExecError("Error during master delay test")
6333     if self.op.on_nodes:
6334       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6335       if not result:
6336         raise errors.OpExecError("Complete failure from rpc call")
6337       for node, node_result in result.items():
6338         node_result.Raise()
6339         if not node_result.data:
6340           raise errors.OpExecError("Failure during rpc call to node %s,"
6341                                    " result: %s" % (node, node_result.data))
6342
6343
6344 class IAllocator(object):
6345   """IAllocator framework.
6346
6347   An IAllocator instance has three sets of attributes:
6348     - cfg that is needed to query the cluster
6349     - input data (all members of the _KEYS class attribute are required)
6350     - four buffer attributes (in|out_data|text), that represent the
6351       input (to the external script) in text and data structure format,
6352       and the output from it, again in two formats
6353     - the result variables from the script (success, info, nodes) for
6354       easy usage
6355
6356   """
6357   _ALLO_KEYS = [
6358     "mem_size", "disks", "disk_template",
6359     "os", "tags", "nics", "vcpus", "hypervisor",
6360     ]
6361   _RELO_KEYS = [
6362     "relocate_from",
6363     ]
6364
6365   def __init__(self, lu, mode, name, **kwargs):
6366     self.lu = lu
6367     # init buffer variables
6368     self.in_text = self.out_text = self.in_data = self.out_data = None
6369     # init all input fields so that pylint is happy
6370     self.mode = mode
6371     self.name = name
6372     self.mem_size = self.disks = self.disk_template = None
6373     self.os = self.tags = self.nics = self.vcpus = None
6374     self.hypervisor = None
6375     self.relocate_from = None
6376     # computed fields
6377     self.required_nodes = None
6378     # init result fields
6379     self.success = self.info = self.nodes = None
6380     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6381       keyset = self._ALLO_KEYS
6382     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6383       keyset = self._RELO_KEYS
6384     else:
6385       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6386                                    " IAllocator" % self.mode)
6387     for key in kwargs:
6388       if key not in keyset:
6389         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6390                                      " IAllocator" % key)
6391       setattr(self, key, kwargs[key])
6392     for key in keyset:
6393       if key not in kwargs:
6394         raise errors.ProgrammerError("Missing input parameter '%s' to"
6395                                      " IAllocator" % key)
6396     self._BuildInputData()
6397
6398   def _ComputeClusterData(self):
6399     """Compute the generic allocator input data.
6400
6401     This is the data that is independent of the actual operation.
6402
6403     """
6404     cfg = self.lu.cfg
6405     cluster_info = cfg.GetClusterInfo()
6406     # cluster data
6407     data = {
6408       "version": 1,
6409       "cluster_name": cfg.GetClusterName(),
6410       "cluster_tags": list(cluster_info.GetTags()),
6411       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6412       # we don't have job IDs
6413       }
6414     iinfo = cfg.GetAllInstancesInfo().values()
6415     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6416
6417     # node data
6418     node_results = {}
6419     node_list = cfg.GetNodeList()
6420
6421     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6422       hypervisor_name = self.hypervisor
6423     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6424       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6425
6426     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6427                                            hypervisor_name)
6428     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6429                        cluster_info.enabled_hypervisors)
6430     for nname, nresult in node_data.items():
6431       # first fill in static (config-based) values
6432       ninfo = cfg.GetNodeInfo(nname)
6433       pnr = {
6434         "tags": list(ninfo.GetTags()),
6435         "primary_ip": ninfo.primary_ip,
6436         "secondary_ip": ninfo.secondary_ip,
6437         "offline": ninfo.offline,
6438         "master_candidate": ninfo.master_candidate,
6439         }
6440
6441       if not ninfo.offline:
6442         nresult.Raise()
6443         if not isinstance(nresult.data, dict):
6444           raise errors.OpExecError("Can't get data for node %s" % nname)
6445         remote_info = nresult.data
6446         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6447                      'vg_size', 'vg_free', 'cpu_total']:
6448           if attr not in remote_info:
6449             raise errors.OpExecError("Node '%s' didn't return attribute"
6450                                      " '%s'" % (nname, attr))
6451           try:
6452             remote_info[attr] = int(remote_info[attr])
6453           except ValueError, err:
6454             raise errors.OpExecError("Node '%s' returned invalid value"
6455                                      " for '%s': %s" % (nname, attr, err))
6456         # compute memory used by primary instances
6457         i_p_mem = i_p_up_mem = 0
6458         for iinfo, beinfo in i_list:
6459           if iinfo.primary_node == nname:
6460             i_p_mem += beinfo[constants.BE_MEMORY]
6461             if iinfo.name not in node_iinfo[nname].data:
6462               i_used_mem = 0
6463             else:
6464               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6465             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6466             remote_info['memory_free'] -= max(0, i_mem_diff)
6467
6468             if iinfo.admin_up:
6469               i_p_up_mem += beinfo[constants.BE_MEMORY]
6470
6471         # compute memory used by instances
6472         pnr_dyn = {
6473           "total_memory": remote_info['memory_total'],
6474           "reserved_memory": remote_info['memory_dom0'],
6475           "free_memory": remote_info['memory_free'],
6476           "total_disk": remote_info['vg_size'],
6477           "free_disk": remote_info['vg_free'],
6478           "total_cpus": remote_info['cpu_total'],
6479           "i_pri_memory": i_p_mem,
6480           "i_pri_up_memory": i_p_up_mem,
6481           }
6482         pnr.update(pnr_dyn)
6483
6484       node_results[nname] = pnr
6485     data["nodes"] = node_results
6486
6487     # instance data
6488     instance_data = {}
6489     for iinfo, beinfo in i_list:
6490       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6491                   for n in iinfo.nics]
6492       pir = {
6493         "tags": list(iinfo.GetTags()),
6494         "admin_up": iinfo.admin_up,
6495         "vcpus": beinfo[constants.BE_VCPUS],
6496         "memory": beinfo[constants.BE_MEMORY],
6497         "os": iinfo.os,
6498         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6499         "nics": nic_data,
6500         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6501         "disk_template": iinfo.disk_template,
6502         "hypervisor": iinfo.hypervisor,
6503         }
6504       instance_data[iinfo.name] = pir
6505
6506     data["instances"] = instance_data
6507
6508     self.in_data = data
6509
6510   def _AddNewInstance(self):
6511     """Add new instance data to allocator structure.
6512
6513     This in combination with _AllocatorGetClusterData will create the
6514     correct structure needed as input for the allocator.
6515
6516     The checks for the completeness of the opcode must have already been
6517     done.
6518
6519     """
6520     data = self.in_data
6521
6522     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6523
6524     if self.disk_template in constants.DTS_NET_MIRROR:
6525       self.required_nodes = 2
6526     else:
6527       self.required_nodes = 1
6528     request = {
6529       "type": "allocate",
6530       "name": self.name,
6531       "disk_template": self.disk_template,
6532       "tags": self.tags,
6533       "os": self.os,
6534       "vcpus": self.vcpus,
6535       "memory": self.mem_size,
6536       "disks": self.disks,
6537       "disk_space_total": disk_space,
6538       "nics": self.nics,
6539       "required_nodes": self.required_nodes,
6540       }
6541     data["request"] = request
6542
6543   def _AddRelocateInstance(self):
6544     """Add relocate instance data to allocator structure.
6545
6546     This in combination with _IAllocatorGetClusterData will create the
6547     correct structure needed as input for the allocator.
6548
6549     The checks for the completeness of the opcode must have already been
6550     done.
6551
6552     """
6553     instance = self.lu.cfg.GetInstanceInfo(self.name)
6554     if instance is None:
6555       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6556                                    " IAllocator" % self.name)
6557
6558     if instance.disk_template not in constants.DTS_NET_MIRROR:
6559       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6560
6561     if len(instance.secondary_nodes) != 1:
6562       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6563
6564     self.required_nodes = 1
6565     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6566     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6567
6568     request = {
6569       "type": "relocate",
6570       "name": self.name,
6571       "disk_space_total": disk_space,
6572       "required_nodes": self.required_nodes,
6573       "relocate_from": self.relocate_from,
6574       }
6575     self.in_data["request"] = request
6576
6577   def _BuildInputData(self):
6578     """Build input data structures.
6579
6580     """
6581     self._ComputeClusterData()
6582
6583     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6584       self._AddNewInstance()
6585     else:
6586       self._AddRelocateInstance()
6587
6588     self.in_text = serializer.Dump(self.in_data)
6589
6590   def Run(self, name, validate=True, call_fn=None):
6591     """Run an instance allocator and return the results.
6592
6593     """
6594     if call_fn is None:
6595       call_fn = self.lu.rpc.call_iallocator_runner
6596     data = self.in_text
6597
6598     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6599     result.Raise()
6600
6601     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6602       raise errors.OpExecError("Invalid result from master iallocator runner")
6603
6604     rcode, stdout, stderr, fail = result.data
6605
6606     if rcode == constants.IARUN_NOTFOUND:
6607       raise errors.OpExecError("Can't find allocator '%s'" % name)
6608     elif rcode == constants.IARUN_FAILURE:
6609       raise errors.OpExecError("Instance allocator call failed: %s,"
6610                                " output: %s" % (fail, stdout+stderr))
6611     self.out_text = stdout
6612     if validate:
6613       self._ValidateResult()
6614
6615   def _ValidateResult(self):
6616     """Process the allocator results.
6617
6618     This will process and if successful save the result in
6619     self.out_data and the other parameters.
6620
6621     """
6622     try:
6623       rdict = serializer.Load(self.out_text)
6624     except Exception, err:
6625       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6626
6627     if not isinstance(rdict, dict):
6628       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6629
6630     for key in "success", "info", "nodes":
6631       if key not in rdict:
6632         raise errors.OpExecError("Can't parse iallocator results:"
6633                                  " missing key '%s'" % key)
6634       setattr(self, key, rdict[key])
6635
6636     if not isinstance(rdict["nodes"], list):
6637       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6638                                " is not a list")
6639     self.out_data = rdict
6640
6641
6642 class LUTestAllocator(NoHooksLU):
6643   """Run allocator tests.
6644
6645   This LU runs the allocator tests
6646
6647   """
6648   _OP_REQP = ["direction", "mode", "name"]
6649
6650   def CheckPrereq(self):
6651     """Check prerequisites.
6652
6653     This checks the opcode parameters depending on the director and mode test.
6654
6655     """
6656     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6657       for attr in ["name", "mem_size", "disks", "disk_template",
6658                    "os", "tags", "nics", "vcpus"]:
6659         if not hasattr(self.op, attr):
6660           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6661                                      attr)
6662       iname = self.cfg.ExpandInstanceName(self.op.name)
6663       if iname is not None:
6664         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6665                                    iname)
6666       if not isinstance(self.op.nics, list):
6667         raise errors.OpPrereqError("Invalid parameter 'nics'")
6668       for row in self.op.nics:
6669         if (not isinstance(row, dict) or
6670             "mac" not in row or
6671             "ip" not in row or
6672             "bridge" not in row):
6673           raise errors.OpPrereqError("Invalid contents of the"
6674                                      " 'nics' parameter")
6675       if not isinstance(self.op.disks, list):
6676         raise errors.OpPrereqError("Invalid parameter 'disks'")
6677       for row in self.op.disks:
6678         if (not isinstance(row, dict) or
6679             "size" not in row or
6680             not isinstance(row["size"], int) or
6681             "mode" not in row or
6682             row["mode"] not in ['r', 'w']):
6683           raise errors.OpPrereqError("Invalid contents of the"
6684                                      " 'disks' parameter")
6685       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6686         self.op.hypervisor = self.cfg.GetHypervisorType()
6687     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6688       if not hasattr(self.op, "name"):
6689         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6690       fname = self.cfg.ExpandInstanceName(self.op.name)
6691       if fname is None:
6692         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6693                                    self.op.name)
6694       self.op.name = fname
6695       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6696     else:
6697       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6698                                  self.op.mode)
6699
6700     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6701       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6702         raise errors.OpPrereqError("Missing allocator name")
6703     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6704       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6705                                  self.op.direction)
6706
6707   def Exec(self, feedback_fn):
6708     """Run the allocator test.
6709
6710     """
6711     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6712       ial = IAllocator(self,
6713                        mode=self.op.mode,
6714                        name=self.op.name,
6715                        mem_size=self.op.mem_size,
6716                        disks=self.op.disks,
6717                        disk_template=self.op.disk_template,
6718                        os=self.op.os,
6719                        tags=self.op.tags,
6720                        nics=self.op.nics,
6721                        vcpus=self.op.vcpus,
6722                        hypervisor=self.op.hypervisor,
6723                        )
6724     else:
6725       ial = IAllocator(self,
6726                        mode=self.op.mode,
6727                        name=self.op.name,
6728                        relocate_from=list(self.relocate_from),
6729                        )
6730
6731     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6732       result = ial.in_text
6733     else:
6734       ial.Run(self.op.allocator, validate=False)
6735       result = ial.out_text
6736     return result