Give a sane permission to the known_host file
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: boolean
459   @param status: the should_run status of the instance
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   if status:
472     str_status = "up"
473   else:
474     str_status = "down"
475   env = {
476     "OP_TARGET": name,
477     "INSTANCE_NAME": name,
478     "INSTANCE_PRIMARY": primary_node,
479     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480     "INSTANCE_OS_TYPE": os_type,
481     "INSTANCE_STATUS": str_status,
482     "INSTANCE_MEMORY": memory,
483     "INSTANCE_VCPUS": vcpus,
484   }
485
486   if nics:
487     nic_count = len(nics)
488     for idx, (ip, bridge, mac) in enumerate(nics):
489       if ip is None:
490         ip = ""
491       env["INSTANCE_NIC%d_IP" % idx] = ip
492       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
494   else:
495     nic_count = 0
496
497   env["INSTANCE_NIC_COUNT"] = nic_count
498
499   return env
500
501
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503   """Builds instance related env variables for hooks from an object.
504
505   @type lu: L{LogicalUnit}
506   @param lu: the logical unit on whose behalf we execute
507   @type instance: L{objects.Instance}
508   @param instance: the instance for which we should build the
509       environment
510   @type override: dict
511   @param override: dictionary with key/values that will override
512       our values
513   @rtype: dict
514   @return: the hook environment dictionary
515
516   """
517   bep = lu.cfg.GetClusterInfo().FillBE(instance)
518   args = {
519     'name': instance.name,
520     'primary_node': instance.primary_node,
521     'secondary_nodes': instance.secondary_nodes,
522     'os_type': instance.os,
523     'status': instance.admin_up,
524     'memory': bep[constants.BE_MEMORY],
525     'vcpus': bep[constants.BE_VCPUS],
526     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
527   }
528   if override:
529     args.update(override)
530   return _BuildInstanceHookEnv(**args)
531
532
533 def _AdjustCandidatePool(lu):
534   """Adjust the candidate pool after node operations.
535
536   """
537   mod_list = lu.cfg.MaintainCandidatePool()
538   if mod_list:
539     lu.LogInfo("Promoted nodes to master candidate role: %s",
540                ", ".join(node.name for node in mod_list))
541     for name in mod_list:
542       lu.context.ReaddNode(name)
543   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
544   if mc_now > mc_max:
545     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
546                (mc_now, mc_max))
547
548
549 def _CheckInstanceBridgesExist(lu, instance):
550   """Check that the brigdes needed by an instance exist.
551
552   """
553   # check bridges existance
554   brlist = [nic.bridge for nic in instance.nics]
555   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
556   result.Raise()
557   if not result.data:
558     raise errors.OpPrereqError("One or more target bridges %s does not"
559                                " exist on destination node '%s'" %
560                                (brlist, instance.primary_node))
561
562
563 class LUDestroyCluster(NoHooksLU):
564   """Logical unit for destroying the cluster.
565
566   """
567   _OP_REQP = []
568
569   def CheckPrereq(self):
570     """Check prerequisites.
571
572     This checks whether the cluster is empty.
573
574     Any errors are signalled by raising errors.OpPrereqError.
575
576     """
577     master = self.cfg.GetMasterNode()
578
579     nodelist = self.cfg.GetNodeList()
580     if len(nodelist) != 1 or nodelist[0] != master:
581       raise errors.OpPrereqError("There are still %d node(s) in"
582                                  " this cluster." % (len(nodelist) - 1))
583     instancelist = self.cfg.GetInstanceList()
584     if instancelist:
585       raise errors.OpPrereqError("There are still %d instance(s) in"
586                                  " this cluster." % len(instancelist))
587
588   def Exec(self, feedback_fn):
589     """Destroys the cluster.
590
591     """
592     master = self.cfg.GetMasterNode()
593     result = self.rpc.call_node_stop_master(master, False)
594     result.Raise()
595     if not result.data:
596       raise errors.OpExecError("Could not disable the master role")
597     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598     utils.CreateBackup(priv_key)
599     utils.CreateBackup(pub_key)
600     return master
601
602
603 class LUVerifyCluster(LogicalUnit):
604   """Verifies the cluster status.
605
606   """
607   HPATH = "cluster-verify"
608   HTYPE = constants.HTYPE_CLUSTER
609   _OP_REQP = ["skip_checks"]
610   REQ_BGL = False
611
612   def ExpandNames(self):
613     self.needed_locks = {
614       locking.LEVEL_NODE: locking.ALL_SET,
615       locking.LEVEL_INSTANCE: locking.ALL_SET,
616     }
617     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
618
619   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620                   node_result, feedback_fn, master_files,
621                   drbd_map):
622     """Run multiple tests against a node.
623
624     Test list:
625
626       - compares ganeti version
627       - checks vg existance and size > 20G
628       - checks config file checksum
629       - checks ssh to other nodes
630
631     @type nodeinfo: L{objects.Node}
632     @param nodeinfo: the node to check
633     @param file_list: required list of files
634     @param local_cksum: dictionary of local files and their checksums
635     @param node_result: the results from the node
636     @param feedback_fn: function used to accumulate results
637     @param master_files: list of files that only masters should have
638     @param drbd_map: the useddrbd minors for this node, in
639         form of minor: (instance, must_exist) which correspond to instances
640         and their running status
641
642     """
643     node = nodeinfo.name
644
645     # main result, node_result should be a non-empty dict
646     if not node_result or not isinstance(node_result, dict):
647       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
648       return True
649
650     # compares ganeti version
651     local_version = constants.PROTOCOL_VERSION
652     remote_version = node_result.get('version', None)
653     if not (remote_version and isinstance(remote_version, (list, tuple)) and
654             len(remote_version) == 2):
655       feedback_fn("  - ERROR: connection to %s failed" % (node))
656       return True
657
658     if local_version != remote_version[0]:
659       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
660                   " node %s %s" % (local_version, node, remote_version[0]))
661       return True
662
663     # node seems compatible, we can actually try to look into its results
664
665     bad = False
666
667     # full package version
668     if constants.RELEASE_VERSION != remote_version[1]:
669       feedback_fn("  - WARNING: software version mismatch: master %s,"
670                   " node %s %s" %
671                   (constants.RELEASE_VERSION, node, remote_version[1]))
672
673     # checks vg existence and size > 20G
674
675     vglist = node_result.get(constants.NV_VGLIST, None)
676     if not vglist:
677       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678                       (node,))
679       bad = True
680     else:
681       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
682                                             constants.MIN_VG_SIZE)
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688
689     remote_cksum = node_result.get(constants.NV_FILELIST, None)
690     if not isinstance(remote_cksum, dict):
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       for file_name in file_list:
695         node_is_mc = nodeinfo.master_candidate
696         must_have_file = file_name not in master_files
697         if file_name not in remote_cksum:
698           if node_is_mc or must_have_file:
699             bad = True
700             feedback_fn("  - ERROR: file '%s' missing" % file_name)
701         elif remote_cksum[file_name] != local_cksum[file_name]:
702           if node_is_mc or must_have_file:
703             bad = True
704             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
705           else:
706             # not candidate and this is not a must-have file
707             bad = True
708             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
709                         " '%s'" % file_name)
710         else:
711           # all good, except non-master/non-must have combination
712           if not node_is_mc and not must_have_file:
713             feedback_fn("  - ERROR: file '%s' should not exist on non master"
714                         " candidates" % file_name)
715
716     # checks ssh to any
717
718     if constants.NV_NODELIST not in node_result:
719       bad = True
720       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
721     else:
722       if node_result[constants.NV_NODELIST]:
723         bad = True
724         for node in node_result[constants.NV_NODELIST]:
725           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
726                           (node, node_result[constants.NV_NODELIST][node]))
727
728     if constants.NV_NODENETTEST not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result[constants.NV_NODENETTEST]:
733         bad = True
734         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result[constants.NV_NODENETTEST][node]))
738
739     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
740     if isinstance(hyp_result, dict):
741       for hv_name, hv_result in hyp_result.iteritems():
742         if hv_result is not None:
743           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
744                       (hv_name, hv_result))
745
746     # check used drbd list
747     used_minors = node_result.get(constants.NV_DRBDLIST, [])
748     for minor, (iname, must_exist) in drbd_map.items():
749       if minor not in used_minors and must_exist:
750         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
751                     (minor, iname))
752         bad = True
753     for minor in used_minors:
754       if minor not in drbd_map:
755         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
756         bad = True
757
758     return bad
759
760   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
761                       node_instance, feedback_fn, n_offline):
762     """Verify an instance.
763
764     This function checks to see if the required block devices are
765     available on the instance's node.
766
767     """
768     bad = False
769
770     node_current = instanceconfig.primary_node
771
772     node_vol_should = {}
773     instanceconfig.MapLVsByNode(node_vol_should)
774
775     for node in node_vol_should:
776       if node in n_offline:
777         # ignore missing volumes on offline nodes
778         continue
779       for volume in node_vol_should[node]:
780         if node not in node_vol_is or volume not in node_vol_is[node]:
781           feedback_fn("  - ERROR: volume %s missing on node %s" %
782                           (volume, node))
783           bad = True
784
785     if instanceconfig.admin_up:
786       if ((node_current not in node_instance or
787           not instance in node_instance[node_current]) and
788           node_current not in n_offline):
789         feedback_fn("  - ERROR: instance %s not running on node %s" %
790                         (instance, node_current))
791         bad = True
792
793     for node in node_instance:
794       if (not node == node_current):
795         if instance in node_instance[node]:
796           feedback_fn("  - ERROR: instance %s should not run on node %s" %
797                           (instance, node))
798           bad = True
799
800     return bad
801
802   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
803     """Verify if there are any unknown volumes in the cluster.
804
805     The .os, .swap and backup volumes are ignored. All other volumes are
806     reported as unknown.
807
808     """
809     bad = False
810
811     for node in node_vol_is:
812       for volume in node_vol_is[node]:
813         if node not in node_vol_should or volume not in node_vol_should[node]:
814           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
815                       (volume, node))
816           bad = True
817     return bad
818
819   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
820     """Verify the list of running instances.
821
822     This checks what instances are running but unknown to the cluster.
823
824     """
825     bad = False
826     for node in node_instance:
827       for runninginstance in node_instance[node]:
828         if runninginstance not in instancelist:
829           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
830                           (runninginstance, node))
831           bad = True
832     return bad
833
834   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
835     """Verify N+1 Memory Resilience.
836
837     Check that if one single node dies we can still start all the instances it
838     was primary for.
839
840     """
841     bad = False
842
843     for node, nodeinfo in node_info.iteritems():
844       # This code checks that every node which is now listed as secondary has
845       # enough memory to host all instances it is supposed to should a single
846       # other node in the cluster fail.
847       # FIXME: not ready for failover to an arbitrary node
848       # FIXME: does not support file-backed instances
849       # WARNING: we currently take into account down instances as well as up
850       # ones, considering that even if they're down someone might want to start
851       # them even in the event of a node failure.
852       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
853         needed_mem = 0
854         for instance in instances:
855           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
856           if bep[constants.BE_AUTO_BALANCE]:
857             needed_mem += bep[constants.BE_MEMORY]
858         if nodeinfo['mfree'] < needed_mem:
859           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
860                       " failovers should node %s fail" % (node, prinode))
861           bad = True
862     return bad
863
864   def CheckPrereq(self):
865     """Check prerequisites.
866
867     Transform the list of checks we're going to skip into a set and check that
868     all its members are valid.
869
870     """
871     self.skip_set = frozenset(self.op.skip_checks)
872     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
873       raise errors.OpPrereqError("Invalid checks to be skipped specified")
874
875   def BuildHooksEnv(self):
876     """Build hooks env.
877
878     Cluster-Verify hooks just rone in the post phase and their failure makes
879     the output be logged in the verify output and the verification to fail.
880
881     """
882     all_nodes = self.cfg.GetNodeList()
883     # TODO: populate the environment with useful information for verify hooks
884     env = {}
885     return env, [], all_nodes
886
887   def Exec(self, feedback_fn):
888     """Verify integrity of cluster, performing various test on nodes.
889
890     """
891     bad = False
892     feedback_fn("* Verifying global settings")
893     for msg in self.cfg.VerifyConfig():
894       feedback_fn("  - ERROR: %s" % msg)
895
896     vg_name = self.cfg.GetVGName()
897     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
898     nodelist = utils.NiceSort(self.cfg.GetNodeList())
899     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
900     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
901     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
902                         for iname in instancelist)
903     i_non_redundant = [] # Non redundant instances
904     i_non_a_balanced = [] # Non auto-balanced instances
905     n_offline = [] # List of offline nodes
906     node_volume = {}
907     node_instance = {}
908     node_info = {}
909     instance_cfg = {}
910
911     # FIXME: verify OS list
912     # do local checksums
913     master_files = [constants.CLUSTER_CONF_FILE]
914
915     file_names = ssconf.SimpleStore().GetFileList()
916     file_names.append(constants.SSL_CERT_FILE)
917     file_names.append(constants.RAPI_CERT_FILE)
918     file_names.extend(master_files)
919
920     local_checksums = utils.FingerprintFiles(file_names)
921
922     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
923     node_verify_param = {
924       constants.NV_FILELIST: file_names,
925       constants.NV_NODELIST: [node.name for node in nodeinfo
926                               if not node.offline],
927       constants.NV_HYPERVISOR: hypervisors,
928       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
929                                   node.secondary_ip) for node in nodeinfo
930                                  if not node.offline],
931       constants.NV_LVLIST: vg_name,
932       constants.NV_INSTANCELIST: hypervisors,
933       constants.NV_VGLIST: None,
934       constants.NV_VERSION: None,
935       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
936       constants.NV_DRBDLIST: None,
937       }
938     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
939                                            self.cfg.GetClusterName())
940
941     cluster = self.cfg.GetClusterInfo()
942     master_node = self.cfg.GetMasterNode()
943     all_drbd_map = self.cfg.ComputeDRBDMap()
944
945     for node_i in nodeinfo:
946       node = node_i.name
947       nresult = all_nvinfo[node].data
948
949       if node_i.offline:
950         feedback_fn("* Skipping offline node %s" % (node,))
951         n_offline.append(node)
952         continue
953
954       if node == master_node:
955         ntype = "master"
956       elif node_i.master_candidate:
957         ntype = "master candidate"
958       else:
959         ntype = "regular"
960       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
961
962       if all_nvinfo[node].failed or not isinstance(nresult, dict):
963         feedback_fn("  - ERROR: connection to %s failed" % (node,))
964         bad = True
965         continue
966
967       node_drbd = {}
968       for minor, instance in all_drbd_map[node].items():
969         instance = instanceinfo[instance]
970         node_drbd[minor] = (instance.name, instance.admin_up)
971       result = self._VerifyNode(node_i, file_names, local_checksums,
972                                 nresult, feedback_fn, master_files,
973                                 node_drbd)
974       bad = bad or result
975
976       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
977       if isinstance(lvdata, basestring):
978         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
979                     (node, lvdata.encode('string_escape')))
980         bad = True
981         node_volume[node] = {}
982       elif not isinstance(lvdata, dict):
983         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
984         bad = True
985         continue
986       else:
987         node_volume[node] = lvdata
988
989       # node_instance
990       idata = nresult.get(constants.NV_INSTANCELIST, None)
991       if not isinstance(idata, list):
992         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
993                     (node,))
994         bad = True
995         continue
996
997       node_instance[node] = idata
998
999       # node_info
1000       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1001       if not isinstance(nodeinfo, dict):
1002         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1003         bad = True
1004         continue
1005
1006       try:
1007         node_info[node] = {
1008           "mfree": int(nodeinfo['memory_free']),
1009           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1010           "pinst": [],
1011           "sinst": [],
1012           # dictionary holding all instances this node is secondary for,
1013           # grouped by their primary node. Each key is a cluster node, and each
1014           # value is a list of instances which have the key as primary and the
1015           # current node as secondary.  this is handy to calculate N+1 memory
1016           # availability if you can only failover from a primary to its
1017           # secondary.
1018           "sinst-by-pnode": {},
1019         }
1020       except ValueError:
1021         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1022         bad = True
1023         continue
1024
1025     node_vol_should = {}
1026
1027     for instance in instancelist:
1028       feedback_fn("* Verifying instance %s" % instance)
1029       inst_config = instanceinfo[instance]
1030       result =  self._VerifyInstance(instance, inst_config, node_volume,
1031                                      node_instance, feedback_fn, n_offline)
1032       bad = bad or result
1033       inst_nodes_offline = []
1034
1035       inst_config.MapLVsByNode(node_vol_should)
1036
1037       instance_cfg[instance] = inst_config
1038
1039       pnode = inst_config.primary_node
1040       if pnode in node_info:
1041         node_info[pnode]['pinst'].append(instance)
1042       elif pnode not in n_offline:
1043         feedback_fn("  - ERROR: instance %s, connection to primary node"
1044                     " %s failed" % (instance, pnode))
1045         bad = True
1046
1047       if pnode in n_offline:
1048         inst_nodes_offline.append(pnode)
1049
1050       # If the instance is non-redundant we cannot survive losing its primary
1051       # node, so we are not N+1 compliant. On the other hand we have no disk
1052       # templates with more than one secondary so that situation is not well
1053       # supported either.
1054       # FIXME: does not support file-backed instances
1055       if len(inst_config.secondary_nodes) == 0:
1056         i_non_redundant.append(instance)
1057       elif len(inst_config.secondary_nodes) > 1:
1058         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1059                     % instance)
1060
1061       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1062         i_non_a_balanced.append(instance)
1063
1064       for snode in inst_config.secondary_nodes:
1065         if snode in node_info:
1066           node_info[snode]['sinst'].append(instance)
1067           if pnode not in node_info[snode]['sinst-by-pnode']:
1068             node_info[snode]['sinst-by-pnode'][pnode] = []
1069           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1070         elif snode not in n_offline:
1071           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1072                       " %s failed" % (instance, snode))
1073           bad = True
1074         if snode in n_offline:
1075           inst_nodes_offline.append(snode)
1076
1077       if inst_nodes_offline:
1078         # warn that the instance lives on offline nodes, and set bad=True
1079         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1080                     ", ".join(inst_nodes_offline))
1081         bad = True
1082
1083     feedback_fn("* Verifying orphan volumes")
1084     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1085                                        feedback_fn)
1086     bad = bad or result
1087
1088     feedback_fn("* Verifying remaining instances")
1089     result = self._VerifyOrphanInstances(instancelist, node_instance,
1090                                          feedback_fn)
1091     bad = bad or result
1092
1093     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1094       feedback_fn("* Verifying N+1 Memory redundancy")
1095       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1096       bad = bad or result
1097
1098     feedback_fn("* Other Notes")
1099     if i_non_redundant:
1100       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1101                   % len(i_non_redundant))
1102
1103     if i_non_a_balanced:
1104       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1105                   % len(i_non_a_balanced))
1106
1107     if n_offline:
1108       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1109
1110     return not bad
1111
1112   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1113     """Analize the post-hooks' result
1114
1115     This method analyses the hook result, handles it, and sends some
1116     nicely-formatted feedback back to the user.
1117
1118     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1119         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1120     @param hooks_results: the results of the multi-node hooks rpc call
1121     @param feedback_fn: function used send feedback back to the caller
1122     @param lu_result: previous Exec result
1123     @return: the new Exec result, based on the previous result
1124         and hook results
1125
1126     """
1127     # We only really run POST phase hooks, and are only interested in
1128     # their results
1129     if phase == constants.HOOKS_PHASE_POST:
1130       # Used to change hooks' output to proper indentation
1131       indent_re = re.compile('^', re.M)
1132       feedback_fn("* Hooks Results")
1133       if not hooks_results:
1134         feedback_fn("  - ERROR: general communication failure")
1135         lu_result = 1
1136       else:
1137         for node_name in hooks_results:
1138           show_node_header = True
1139           res = hooks_results[node_name]
1140           if res.failed or res.data is False or not isinstance(res.data, list):
1141             if res.offline:
1142               # no need to warn or set fail return value
1143               continue
1144             feedback_fn("    Communication failure in hooks execution")
1145             lu_result = 1
1146             continue
1147           for script, hkr, output in res.data:
1148             if hkr == constants.HKR_FAIL:
1149               # The node header is only shown once, if there are
1150               # failing hooks on that node
1151               if show_node_header:
1152                 feedback_fn("  Node %s:" % node_name)
1153                 show_node_header = False
1154               feedback_fn("    ERROR: Script %s failed, output:" % script)
1155               output = indent_re.sub('      ', output)
1156               feedback_fn("%s" % output)
1157               lu_result = 1
1158
1159       return lu_result
1160
1161
1162 class LUVerifyDisks(NoHooksLU):
1163   """Verifies the cluster disks status.
1164
1165   """
1166   _OP_REQP = []
1167   REQ_BGL = False
1168
1169   def ExpandNames(self):
1170     self.needed_locks = {
1171       locking.LEVEL_NODE: locking.ALL_SET,
1172       locking.LEVEL_INSTANCE: locking.ALL_SET,
1173     }
1174     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1175
1176   def CheckPrereq(self):
1177     """Check prerequisites.
1178
1179     This has no prerequisites.
1180
1181     """
1182     pass
1183
1184   def Exec(self, feedback_fn):
1185     """Verify integrity of cluster disks.
1186
1187     """
1188     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1189
1190     vg_name = self.cfg.GetVGName()
1191     nodes = utils.NiceSort(self.cfg.GetNodeList())
1192     instances = [self.cfg.GetInstanceInfo(name)
1193                  for name in self.cfg.GetInstanceList()]
1194
1195     nv_dict = {}
1196     for inst in instances:
1197       inst_lvs = {}
1198       if (not inst.admin_up or
1199           inst.disk_template not in constants.DTS_NET_MIRROR):
1200         continue
1201       inst.MapLVsByNode(inst_lvs)
1202       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1203       for node, vol_list in inst_lvs.iteritems():
1204         for vol in vol_list:
1205           nv_dict[(node, vol)] = inst
1206
1207     if not nv_dict:
1208       return result
1209
1210     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1211
1212     to_act = set()
1213     for node in nodes:
1214       # node_volume
1215       lvs = node_lvs[node]
1216       if lvs.failed:
1217         if not lvs.offline:
1218           self.LogWarning("Connection to node %s failed: %s" %
1219                           (node, lvs.data))
1220         continue
1221       lvs = lvs.data
1222       if isinstance(lvs, basestring):
1223         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1224         res_nlvm[node] = lvs
1225       elif not isinstance(lvs, dict):
1226         logging.warning("Connection to node %s failed or invalid data"
1227                         " returned", node)
1228         res_nodes.append(node)
1229         continue
1230
1231       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1232         inst = nv_dict.pop((node, lv_name), None)
1233         if (not lv_online and inst is not None
1234             and inst.name not in res_instances):
1235           res_instances.append(inst.name)
1236
1237     # any leftover items in nv_dict are missing LVs, let's arrange the
1238     # data better
1239     for key, inst in nv_dict.iteritems():
1240       if inst.name not in res_missing:
1241         res_missing[inst.name] = []
1242       res_missing[inst.name].append(key)
1243
1244     return result
1245
1246
1247 class LURenameCluster(LogicalUnit):
1248   """Rename the cluster.
1249
1250   """
1251   HPATH = "cluster-rename"
1252   HTYPE = constants.HTYPE_CLUSTER
1253   _OP_REQP = ["name"]
1254
1255   def BuildHooksEnv(self):
1256     """Build hooks env.
1257
1258     """
1259     env = {
1260       "OP_TARGET": self.cfg.GetClusterName(),
1261       "NEW_NAME": self.op.name,
1262       }
1263     mn = self.cfg.GetMasterNode()
1264     return env, [mn], [mn]
1265
1266   def CheckPrereq(self):
1267     """Verify that the passed name is a valid one.
1268
1269     """
1270     hostname = utils.HostInfo(self.op.name)
1271
1272     new_name = hostname.name
1273     self.ip = new_ip = hostname.ip
1274     old_name = self.cfg.GetClusterName()
1275     old_ip = self.cfg.GetMasterIP()
1276     if new_name == old_name and new_ip == old_ip:
1277       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1278                                  " cluster has changed")
1279     if new_ip != old_ip:
1280       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1281         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1282                                    " reachable on the network. Aborting." %
1283                                    new_ip)
1284
1285     self.op.name = new_name
1286
1287   def Exec(self, feedback_fn):
1288     """Rename the cluster.
1289
1290     """
1291     clustername = self.op.name
1292     ip = self.ip
1293
1294     # shutdown the master IP
1295     master = self.cfg.GetMasterNode()
1296     result = self.rpc.call_node_stop_master(master, False)
1297     if result.failed or not result.data:
1298       raise errors.OpExecError("Could not disable the master role")
1299
1300     try:
1301       cluster = self.cfg.GetClusterInfo()
1302       cluster.cluster_name = clustername
1303       cluster.master_ip = ip
1304       self.cfg.Update(cluster)
1305
1306       # update the known hosts file
1307       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1308       node_list = self.cfg.GetNodeList()
1309       try:
1310         node_list.remove(master)
1311       except ValueError:
1312         pass
1313       result = self.rpc.call_upload_file(node_list,
1314                                          constants.SSH_KNOWN_HOSTS_FILE)
1315       for to_node, to_result in result.iteritems():
1316         if to_result.failed or not to_result.data:
1317           logging.error("Copy of file %s to node %s failed",
1318                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1319
1320     finally:
1321       result = self.rpc.call_node_start_master(master, False)
1322       if result.failed or not result.data:
1323         self.LogWarning("Could not re-enable the master role on"
1324                         " the master, please restart manually.")
1325
1326
1327 def _RecursiveCheckIfLVMBased(disk):
1328   """Check if the given disk or its children are lvm-based.
1329
1330   @type disk: L{objects.Disk}
1331   @param disk: the disk to check
1332   @rtype: booleean
1333   @return: boolean indicating whether a LD_LV dev_type was found or not
1334
1335   """
1336   if disk.children:
1337     for chdisk in disk.children:
1338       if _RecursiveCheckIfLVMBased(chdisk):
1339         return True
1340   return disk.dev_type == constants.LD_LV
1341
1342
1343 class LUSetClusterParams(LogicalUnit):
1344   """Change the parameters of the cluster.
1345
1346   """
1347   HPATH = "cluster-modify"
1348   HTYPE = constants.HTYPE_CLUSTER
1349   _OP_REQP = []
1350   REQ_BGL = False
1351
1352   def CheckParameters(self):
1353     """Check parameters
1354
1355     """
1356     if not hasattr(self.op, "candidate_pool_size"):
1357       self.op.candidate_pool_size = None
1358     if self.op.candidate_pool_size is not None:
1359       try:
1360         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1361       except ValueError, err:
1362         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1363                                    str(err))
1364       if self.op.candidate_pool_size < 1:
1365         raise errors.OpPrereqError("At least one master candidate needed")
1366
1367   def ExpandNames(self):
1368     # FIXME: in the future maybe other cluster params won't require checking on
1369     # all nodes to be modified.
1370     self.needed_locks = {
1371       locking.LEVEL_NODE: locking.ALL_SET,
1372     }
1373     self.share_locks[locking.LEVEL_NODE] = 1
1374
1375   def BuildHooksEnv(self):
1376     """Build hooks env.
1377
1378     """
1379     env = {
1380       "OP_TARGET": self.cfg.GetClusterName(),
1381       "NEW_VG_NAME": self.op.vg_name,
1382       }
1383     mn = self.cfg.GetMasterNode()
1384     return env, [mn], [mn]
1385
1386   def CheckPrereq(self):
1387     """Check prerequisites.
1388
1389     This checks whether the given params don't conflict and
1390     if the given volume group is valid.
1391
1392     """
1393     # FIXME: This only works because there is only one parameter that can be
1394     # changed or removed.
1395     if self.op.vg_name is not None and not self.op.vg_name:
1396       instances = self.cfg.GetAllInstancesInfo().values()
1397       for inst in instances:
1398         for disk in inst.disks:
1399           if _RecursiveCheckIfLVMBased(disk):
1400             raise errors.OpPrereqError("Cannot disable lvm storage while"
1401                                        " lvm-based instances exist")
1402
1403     node_list = self.acquired_locks[locking.LEVEL_NODE]
1404
1405     # if vg_name not None, checks given volume group on all nodes
1406     if self.op.vg_name:
1407       vglist = self.rpc.call_vg_list(node_list)
1408       for node in node_list:
1409         if vglist[node].failed:
1410           # ignoring down node
1411           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1412           continue
1413         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1414                                               self.op.vg_name,
1415                                               constants.MIN_VG_SIZE)
1416         if vgstatus:
1417           raise errors.OpPrereqError("Error on node '%s': %s" %
1418                                      (node, vgstatus))
1419
1420     self.cluster = cluster = self.cfg.GetClusterInfo()
1421     # validate beparams changes
1422     if self.op.beparams:
1423       utils.CheckBEParams(self.op.beparams)
1424       self.new_beparams = cluster.FillDict(
1425         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1426
1427     # hypervisor list/parameters
1428     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1429     if self.op.hvparams:
1430       if not isinstance(self.op.hvparams, dict):
1431         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1432       for hv_name, hv_dict in self.op.hvparams.items():
1433         if hv_name not in self.new_hvparams:
1434           self.new_hvparams[hv_name] = hv_dict
1435         else:
1436           self.new_hvparams[hv_name].update(hv_dict)
1437
1438     if self.op.enabled_hypervisors is not None:
1439       self.hv_list = self.op.enabled_hypervisors
1440     else:
1441       self.hv_list = cluster.enabled_hypervisors
1442
1443     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1444       # either the enabled list has changed, or the parameters have, validate
1445       for hv_name, hv_params in self.new_hvparams.items():
1446         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1447             (self.op.enabled_hypervisors and
1448              hv_name in self.op.enabled_hypervisors)):
1449           # either this is a new hypervisor, or its parameters have changed
1450           hv_class = hypervisor.GetHypervisor(hv_name)
1451           hv_class.CheckParameterSyntax(hv_params)
1452           _CheckHVParams(self, node_list, hv_name, hv_params)
1453
1454   def Exec(self, feedback_fn):
1455     """Change the parameters of the cluster.
1456
1457     """
1458     if self.op.vg_name is not None:
1459       if self.op.vg_name != self.cfg.GetVGName():
1460         self.cfg.SetVGName(self.op.vg_name)
1461       else:
1462         feedback_fn("Cluster LVM configuration already in desired"
1463                     " state, not changing")
1464     if self.op.hvparams:
1465       self.cluster.hvparams = self.new_hvparams
1466     if self.op.enabled_hypervisors is not None:
1467       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1468     if self.op.beparams:
1469       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1470     if self.op.candidate_pool_size is not None:
1471       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1472
1473     self.cfg.Update(self.cluster)
1474
1475     # we want to update nodes after the cluster so that if any errors
1476     # happen, we have recorded and saved the cluster info
1477     if self.op.candidate_pool_size is not None:
1478       _AdjustCandidatePool(self)
1479
1480
1481 class LURedistributeConfig(NoHooksLU):
1482   """Force the redistribution of cluster configuration.
1483
1484   This is a very simple LU.
1485
1486   """
1487   _OP_REQP = []
1488   REQ_BGL = False
1489
1490   def ExpandNames(self):
1491     self.needed_locks = {
1492       locking.LEVEL_NODE: locking.ALL_SET,
1493     }
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     """
1500
1501   def Exec(self, feedback_fn):
1502     """Redistribute the configuration.
1503
1504     """
1505     self.cfg.Update(self.cfg.GetClusterInfo())
1506
1507
1508 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1509   """Sleep and poll for an instance's disk to sync.
1510
1511   """
1512   if not instance.disks:
1513     return True
1514
1515   if not oneshot:
1516     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1517
1518   node = instance.primary_node
1519
1520   for dev in instance.disks:
1521     lu.cfg.SetDiskID(dev, node)
1522
1523   retries = 0
1524   while True:
1525     max_time = 0
1526     done = True
1527     cumul_degraded = False
1528     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1529     if rstats.failed or not rstats.data:
1530       lu.LogWarning("Can't get any data from node %s", node)
1531       retries += 1
1532       if retries >= 10:
1533         raise errors.RemoteError("Can't contact node %s for mirror data,"
1534                                  " aborting." % node)
1535       time.sleep(6)
1536       continue
1537     rstats = rstats.data
1538     retries = 0
1539     for i, mstat in enumerate(rstats):
1540       if mstat is None:
1541         lu.LogWarning("Can't compute data for node %s/%s",
1542                            node, instance.disks[i].iv_name)
1543         continue
1544       # we ignore the ldisk parameter
1545       perc_done, est_time, is_degraded, _ = mstat
1546       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1547       if perc_done is not None:
1548         done = False
1549         if est_time is not None:
1550           rem_time = "%d estimated seconds remaining" % est_time
1551           max_time = est_time
1552         else:
1553           rem_time = "no time estimate"
1554         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1555                         (instance.disks[i].iv_name, perc_done, rem_time))
1556     if done or oneshot:
1557       break
1558
1559     time.sleep(min(60, max_time))
1560
1561   if done:
1562     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1563   return not cumul_degraded
1564
1565
1566 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1567   """Check that mirrors are not degraded.
1568
1569   The ldisk parameter, if True, will change the test from the
1570   is_degraded attribute (which represents overall non-ok status for
1571   the device(s)) to the ldisk (representing the local storage status).
1572
1573   """
1574   lu.cfg.SetDiskID(dev, node)
1575   if ldisk:
1576     idx = 6
1577   else:
1578     idx = 5
1579
1580   result = True
1581   if on_primary or dev.AssembleOnSecondary():
1582     rstats = lu.rpc.call_blockdev_find(node, dev)
1583     if rstats.failed or not rstats.data:
1584       logging.warning("Node %s: disk degraded, not found or node down", node)
1585       result = False
1586     else:
1587       result = result and (not rstats.data[idx])
1588   if dev.children:
1589     for child in dev.children:
1590       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1591
1592   return result
1593
1594
1595 class LUDiagnoseOS(NoHooksLU):
1596   """Logical unit for OS diagnose/query.
1597
1598   """
1599   _OP_REQP = ["output_fields", "names"]
1600   REQ_BGL = False
1601   _FIELDS_STATIC = utils.FieldSet()
1602   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1603
1604   def ExpandNames(self):
1605     if self.op.names:
1606       raise errors.OpPrereqError("Selective OS query not supported")
1607
1608     _CheckOutputFields(static=self._FIELDS_STATIC,
1609                        dynamic=self._FIELDS_DYNAMIC,
1610                        selected=self.op.output_fields)
1611
1612     # Lock all nodes, in shared mode
1613     self.needed_locks = {}
1614     self.share_locks[locking.LEVEL_NODE] = 1
1615     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1616
1617   def CheckPrereq(self):
1618     """Check prerequisites.
1619
1620     """
1621
1622   @staticmethod
1623   def _DiagnoseByOS(node_list, rlist):
1624     """Remaps a per-node return list into an a per-os per-node dictionary
1625
1626     @param node_list: a list with the names of all nodes
1627     @param rlist: a map with node names as keys and OS objects as values
1628
1629     @rtype: dict
1630     @returns: a dictionary with osnames as keys and as value another map, with
1631         nodes as keys and list of OS objects as values, eg::
1632
1633           {"debian-etch": {"node1": [<object>,...],
1634                            "node2": [<object>,]}
1635           }
1636
1637     """
1638     all_os = {}
1639     for node_name, nr in rlist.iteritems():
1640       if nr.failed or not nr.data:
1641         continue
1642       for os_obj in nr.data:
1643         if os_obj.name not in all_os:
1644           # build a list of nodes for this os containing empty lists
1645           # for each node in node_list
1646           all_os[os_obj.name] = {}
1647           for nname in node_list:
1648             all_os[os_obj.name][nname] = []
1649         all_os[os_obj.name][node_name].append(os_obj)
1650     return all_os
1651
1652   def Exec(self, feedback_fn):
1653     """Compute the list of OSes.
1654
1655     """
1656     node_list = self.acquired_locks[locking.LEVEL_NODE]
1657     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1658                    if node in node_list]
1659     node_data = self.rpc.call_os_diagnose(valid_nodes)
1660     if node_data == False:
1661       raise errors.OpExecError("Can't gather the list of OSes")
1662     pol = self._DiagnoseByOS(valid_nodes, node_data)
1663     output = []
1664     for os_name, os_data in pol.iteritems():
1665       row = []
1666       for field in self.op.output_fields:
1667         if field == "name":
1668           val = os_name
1669         elif field == "valid":
1670           val = utils.all([osl and osl[0] for osl in os_data.values()])
1671         elif field == "node_status":
1672           val = {}
1673           for node_name, nos_list in os_data.iteritems():
1674             val[node_name] = [(v.status, v.path) for v in nos_list]
1675         else:
1676           raise errors.ParameterError(field)
1677         row.append(val)
1678       output.append(row)
1679
1680     return output
1681
1682
1683 class LURemoveNode(LogicalUnit):
1684   """Logical unit for removing a node.
1685
1686   """
1687   HPATH = "node-remove"
1688   HTYPE = constants.HTYPE_NODE
1689   _OP_REQP = ["node_name"]
1690
1691   def BuildHooksEnv(self):
1692     """Build hooks env.
1693
1694     This doesn't run on the target node in the pre phase as a failed
1695     node would then be impossible to remove.
1696
1697     """
1698     env = {
1699       "OP_TARGET": self.op.node_name,
1700       "NODE_NAME": self.op.node_name,
1701       }
1702     all_nodes = self.cfg.GetNodeList()
1703     all_nodes.remove(self.op.node_name)
1704     return env, all_nodes, all_nodes
1705
1706   def CheckPrereq(self):
1707     """Check prerequisites.
1708
1709     This checks:
1710      - the node exists in the configuration
1711      - it does not have primary or secondary instances
1712      - it's not the master
1713
1714     Any errors are signalled by raising errors.OpPrereqError.
1715
1716     """
1717     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1718     if node is None:
1719       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1720
1721     instance_list = self.cfg.GetInstanceList()
1722
1723     masternode = self.cfg.GetMasterNode()
1724     if node.name == masternode:
1725       raise errors.OpPrereqError("Node is the master node,"
1726                                  " you need to failover first.")
1727
1728     for instance_name in instance_list:
1729       instance = self.cfg.GetInstanceInfo(instance_name)
1730       if node.name in instance.all_nodes:
1731         raise errors.OpPrereqError("Instance %s is still running on the node,"
1732                                    " please remove first." % instance_name)
1733     self.op.node_name = node.name
1734     self.node = node
1735
1736   def Exec(self, feedback_fn):
1737     """Removes the node from the cluster.
1738
1739     """
1740     node = self.node
1741     logging.info("Stopping the node daemon and removing configs from node %s",
1742                  node.name)
1743
1744     self.context.RemoveNode(node.name)
1745
1746     self.rpc.call_node_leave_cluster(node.name)
1747
1748     # Promote nodes to master candidate as needed
1749     _AdjustCandidatePool(self)
1750
1751
1752 class LUQueryNodes(NoHooksLU):
1753   """Logical unit for querying nodes.
1754
1755   """
1756   _OP_REQP = ["output_fields", "names"]
1757   REQ_BGL = False
1758   _FIELDS_DYNAMIC = utils.FieldSet(
1759     "dtotal", "dfree",
1760     "mtotal", "mnode", "mfree",
1761     "bootid",
1762     "ctotal",
1763     )
1764
1765   _FIELDS_STATIC = utils.FieldSet(
1766     "name", "pinst_cnt", "sinst_cnt",
1767     "pinst_list", "sinst_list",
1768     "pip", "sip", "tags",
1769     "serial_no",
1770     "master_candidate",
1771     "master",
1772     "offline",
1773     )
1774
1775   def ExpandNames(self):
1776     _CheckOutputFields(static=self._FIELDS_STATIC,
1777                        dynamic=self._FIELDS_DYNAMIC,
1778                        selected=self.op.output_fields)
1779
1780     self.needed_locks = {}
1781     self.share_locks[locking.LEVEL_NODE] = 1
1782
1783     if self.op.names:
1784       self.wanted = _GetWantedNodes(self, self.op.names)
1785     else:
1786       self.wanted = locking.ALL_SET
1787
1788     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1789     if self.do_locking:
1790       # if we don't request only static fields, we need to lock the nodes
1791       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1792
1793
1794   def CheckPrereq(self):
1795     """Check prerequisites.
1796
1797     """
1798     # The validation of the node list is done in the _GetWantedNodes,
1799     # if non empty, and if empty, there's no validation to do
1800     pass
1801
1802   def Exec(self, feedback_fn):
1803     """Computes the list of nodes and their attributes.
1804
1805     """
1806     all_info = self.cfg.GetAllNodesInfo()
1807     if self.do_locking:
1808       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1809     elif self.wanted != locking.ALL_SET:
1810       nodenames = self.wanted
1811       missing = set(nodenames).difference(all_info.keys())
1812       if missing:
1813         raise errors.OpExecError(
1814           "Some nodes were removed before retrieving their data: %s" % missing)
1815     else:
1816       nodenames = all_info.keys()
1817
1818     nodenames = utils.NiceSort(nodenames)
1819     nodelist = [all_info[name] for name in nodenames]
1820
1821     # begin data gathering
1822
1823     if self.do_locking:
1824       live_data = {}
1825       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1826                                           self.cfg.GetHypervisorType())
1827       for name in nodenames:
1828         nodeinfo = node_data[name]
1829         if not nodeinfo.failed and nodeinfo.data:
1830           nodeinfo = nodeinfo.data
1831           fn = utils.TryConvert
1832           live_data[name] = {
1833             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1834             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1835             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1836             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1837             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1838             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1839             "bootid": nodeinfo.get('bootid', None),
1840             }
1841         else:
1842           live_data[name] = {}
1843     else:
1844       live_data = dict.fromkeys(nodenames, {})
1845
1846     node_to_primary = dict([(name, set()) for name in nodenames])
1847     node_to_secondary = dict([(name, set()) for name in nodenames])
1848
1849     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1850                              "sinst_cnt", "sinst_list"))
1851     if inst_fields & frozenset(self.op.output_fields):
1852       instancelist = self.cfg.GetInstanceList()
1853
1854       for instance_name in instancelist:
1855         inst = self.cfg.GetInstanceInfo(instance_name)
1856         if inst.primary_node in node_to_primary:
1857           node_to_primary[inst.primary_node].add(inst.name)
1858         for secnode in inst.secondary_nodes:
1859           if secnode in node_to_secondary:
1860             node_to_secondary[secnode].add(inst.name)
1861
1862     master_node = self.cfg.GetMasterNode()
1863
1864     # end data gathering
1865
1866     output = []
1867     for node in nodelist:
1868       node_output = []
1869       for field in self.op.output_fields:
1870         if field == "name":
1871           val = node.name
1872         elif field == "pinst_list":
1873           val = list(node_to_primary[node.name])
1874         elif field == "sinst_list":
1875           val = list(node_to_secondary[node.name])
1876         elif field == "pinst_cnt":
1877           val = len(node_to_primary[node.name])
1878         elif field == "sinst_cnt":
1879           val = len(node_to_secondary[node.name])
1880         elif field == "pip":
1881           val = node.primary_ip
1882         elif field == "sip":
1883           val = node.secondary_ip
1884         elif field == "tags":
1885           val = list(node.GetTags())
1886         elif field == "serial_no":
1887           val = node.serial_no
1888         elif field == "master_candidate":
1889           val = node.master_candidate
1890         elif field == "master":
1891           val = node.name == master_node
1892         elif field == "offline":
1893           val = node.offline
1894         elif self._FIELDS_DYNAMIC.Matches(field):
1895           val = live_data[node.name].get(field, None)
1896         else:
1897           raise errors.ParameterError(field)
1898         node_output.append(val)
1899       output.append(node_output)
1900
1901     return output
1902
1903
1904 class LUQueryNodeVolumes(NoHooksLU):
1905   """Logical unit for getting volumes on node(s).
1906
1907   """
1908   _OP_REQP = ["nodes", "output_fields"]
1909   REQ_BGL = False
1910   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1911   _FIELDS_STATIC = utils.FieldSet("node")
1912
1913   def ExpandNames(self):
1914     _CheckOutputFields(static=self._FIELDS_STATIC,
1915                        dynamic=self._FIELDS_DYNAMIC,
1916                        selected=self.op.output_fields)
1917
1918     self.needed_locks = {}
1919     self.share_locks[locking.LEVEL_NODE] = 1
1920     if not self.op.nodes:
1921       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1922     else:
1923       self.needed_locks[locking.LEVEL_NODE] = \
1924         _GetWantedNodes(self, self.op.nodes)
1925
1926   def CheckPrereq(self):
1927     """Check prerequisites.
1928
1929     This checks that the fields required are valid output fields.
1930
1931     """
1932     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1933
1934   def Exec(self, feedback_fn):
1935     """Computes the list of nodes and their attributes.
1936
1937     """
1938     nodenames = self.nodes
1939     volumes = self.rpc.call_node_volumes(nodenames)
1940
1941     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1942              in self.cfg.GetInstanceList()]
1943
1944     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1945
1946     output = []
1947     for node in nodenames:
1948       if node not in volumes or volumes[node].failed or not volumes[node].data:
1949         continue
1950
1951       node_vols = volumes[node].data[:]
1952       node_vols.sort(key=lambda vol: vol['dev'])
1953
1954       for vol in node_vols:
1955         node_output = []
1956         for field in self.op.output_fields:
1957           if field == "node":
1958             val = node
1959           elif field == "phys":
1960             val = vol['dev']
1961           elif field == "vg":
1962             val = vol['vg']
1963           elif field == "name":
1964             val = vol['name']
1965           elif field == "size":
1966             val = int(float(vol['size']))
1967           elif field == "instance":
1968             for inst in ilist:
1969               if node not in lv_by_node[inst]:
1970                 continue
1971               if vol['name'] in lv_by_node[inst][node]:
1972                 val = inst.name
1973                 break
1974             else:
1975               val = '-'
1976           else:
1977             raise errors.ParameterError(field)
1978           node_output.append(str(val))
1979
1980         output.append(node_output)
1981
1982     return output
1983
1984
1985 class LUAddNode(LogicalUnit):
1986   """Logical unit for adding node to the cluster.
1987
1988   """
1989   HPATH = "node-add"
1990   HTYPE = constants.HTYPE_NODE
1991   _OP_REQP = ["node_name"]
1992
1993   def BuildHooksEnv(self):
1994     """Build hooks env.
1995
1996     This will run on all nodes before, and on all nodes + the new node after.
1997
1998     """
1999     env = {
2000       "OP_TARGET": self.op.node_name,
2001       "NODE_NAME": self.op.node_name,
2002       "NODE_PIP": self.op.primary_ip,
2003       "NODE_SIP": self.op.secondary_ip,
2004       }
2005     nodes_0 = self.cfg.GetNodeList()
2006     nodes_1 = nodes_0 + [self.op.node_name, ]
2007     return env, nodes_0, nodes_1
2008
2009   def CheckPrereq(self):
2010     """Check prerequisites.
2011
2012     This checks:
2013      - the new node is not already in the config
2014      - it is resolvable
2015      - its parameters (single/dual homed) matches the cluster
2016
2017     Any errors are signalled by raising errors.OpPrereqError.
2018
2019     """
2020     node_name = self.op.node_name
2021     cfg = self.cfg
2022
2023     dns_data = utils.HostInfo(node_name)
2024
2025     node = dns_data.name
2026     primary_ip = self.op.primary_ip = dns_data.ip
2027     secondary_ip = getattr(self.op, "secondary_ip", None)
2028     if secondary_ip is None:
2029       secondary_ip = primary_ip
2030     if not utils.IsValidIP(secondary_ip):
2031       raise errors.OpPrereqError("Invalid secondary IP given")
2032     self.op.secondary_ip = secondary_ip
2033
2034     node_list = cfg.GetNodeList()
2035     if not self.op.readd and node in node_list:
2036       raise errors.OpPrereqError("Node %s is already in the configuration" %
2037                                  node)
2038     elif self.op.readd and node not in node_list:
2039       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2040
2041     for existing_node_name in node_list:
2042       existing_node = cfg.GetNodeInfo(existing_node_name)
2043
2044       if self.op.readd and node == existing_node_name:
2045         if (existing_node.primary_ip != primary_ip or
2046             existing_node.secondary_ip != secondary_ip):
2047           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2048                                      " address configuration as before")
2049         continue
2050
2051       if (existing_node.primary_ip == primary_ip or
2052           existing_node.secondary_ip == primary_ip or
2053           existing_node.primary_ip == secondary_ip or
2054           existing_node.secondary_ip == secondary_ip):
2055         raise errors.OpPrereqError("New node ip address(es) conflict with"
2056                                    " existing node %s" % existing_node.name)
2057
2058     # check that the type of the node (single versus dual homed) is the
2059     # same as for the master
2060     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061     master_singlehomed = myself.secondary_ip == myself.primary_ip
2062     newbie_singlehomed = secondary_ip == primary_ip
2063     if master_singlehomed != newbie_singlehomed:
2064       if master_singlehomed:
2065         raise errors.OpPrereqError("The master has no private ip but the"
2066                                    " new node has one")
2067       else:
2068         raise errors.OpPrereqError("The master has a private ip but the"
2069                                    " new node doesn't have one")
2070
2071     # checks reachablity
2072     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2073       raise errors.OpPrereqError("Node not reachable by ping")
2074
2075     if not newbie_singlehomed:
2076       # check reachability from my secondary ip to newbie's secondary ip
2077       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2078                            source=myself.secondary_ip):
2079         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2080                                    " based ping to noded port")
2081
2082     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2083     mc_now, _ = self.cfg.GetMasterCandidateStats()
2084     master_candidate = mc_now < cp_size
2085
2086     self.new_node = objects.Node(name=node,
2087                                  primary_ip=primary_ip,
2088                                  secondary_ip=secondary_ip,
2089                                  master_candidate=master_candidate,
2090                                  offline=False)
2091
2092   def Exec(self, feedback_fn):
2093     """Adds the new node to the cluster.
2094
2095     """
2096     new_node = self.new_node
2097     node = new_node.name
2098
2099     # check connectivity
2100     result = self.rpc.call_version([node])[node]
2101     result.Raise()
2102     if result.data:
2103       if constants.PROTOCOL_VERSION == result.data:
2104         logging.info("Communication to node %s fine, sw version %s match",
2105                      node, result.data)
2106       else:
2107         raise errors.OpExecError("Version mismatch master version %s,"
2108                                  " node version %s" %
2109                                  (constants.PROTOCOL_VERSION, result.data))
2110     else:
2111       raise errors.OpExecError("Cannot get version from the new node")
2112
2113     # setup ssh on node
2114     logging.info("Copy ssh key to node %s", node)
2115     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2116     keyarray = []
2117     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2118                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2119                 priv_key, pub_key]
2120
2121     for i in keyfiles:
2122       f = open(i, 'r')
2123       try:
2124         keyarray.append(f.read())
2125       finally:
2126         f.close()
2127
2128     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2129                                     keyarray[2],
2130                                     keyarray[3], keyarray[4], keyarray[5])
2131
2132     msg = result.RemoteFailMsg()
2133     if msg:
2134       raise errors.OpExecError("Cannot transfer ssh keys to the"
2135                                " new node: %s" % msg)
2136
2137     # Add node to our /etc/hosts, and add key to known_hosts
2138     utils.AddHostToEtcHosts(new_node.name)
2139
2140     if new_node.secondary_ip != new_node.primary_ip:
2141       result = self.rpc.call_node_has_ip_address(new_node.name,
2142                                                  new_node.secondary_ip)
2143       if result.failed or not result.data:
2144         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2145                                  " you gave (%s). Please fix and re-run this"
2146                                  " command." % new_node.secondary_ip)
2147
2148     node_verify_list = [self.cfg.GetMasterNode()]
2149     node_verify_param = {
2150       'nodelist': [node],
2151       # TODO: do a node-net-test as well?
2152     }
2153
2154     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2155                                        self.cfg.GetClusterName())
2156     for verifier in node_verify_list:
2157       if result[verifier].failed or not result[verifier].data:
2158         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2159                                  " for remote verification" % verifier)
2160       if result[verifier].data['nodelist']:
2161         for failed in result[verifier].data['nodelist']:
2162           feedback_fn("ssh/hostname verification failed %s -> %s" %
2163                       (verifier, result[verifier]['nodelist'][failed]))
2164         raise errors.OpExecError("ssh/hostname verification failed.")
2165
2166     # Distribute updated /etc/hosts and known_hosts to all nodes,
2167     # including the node just added
2168     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2169     dist_nodes = self.cfg.GetNodeList()
2170     if not self.op.readd:
2171       dist_nodes.append(node)
2172     if myself.name in dist_nodes:
2173       dist_nodes.remove(myself.name)
2174
2175     logging.debug("Copying hosts and known_hosts to all nodes")
2176     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2177       result = self.rpc.call_upload_file(dist_nodes, fname)
2178       for to_node, to_result in result.iteritems():
2179         if to_result.failed or not to_result.data:
2180           logging.error("Copy of file %s to node %s failed", fname, to_node)
2181
2182     to_copy = []
2183     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2184     if constants.HTS_USE_VNC.intersection(enabled_hypervisors):
2185       to_copy.append(constants.VNC_PASSWORD_FILE)
2186
2187     for fname in to_copy:
2188       result = self.rpc.call_upload_file([node], fname)
2189       if result[node].failed or not result[node]:
2190         logging.error("Could not copy file %s to node %s", fname, node)
2191
2192     if self.op.readd:
2193       self.context.ReaddNode(new_node)
2194     else:
2195       self.context.AddNode(new_node)
2196
2197
2198 class LUSetNodeParams(LogicalUnit):
2199   """Modifies the parameters of a node.
2200
2201   """
2202   HPATH = "node-modify"
2203   HTYPE = constants.HTYPE_NODE
2204   _OP_REQP = ["node_name"]
2205   REQ_BGL = False
2206
2207   def CheckArguments(self):
2208     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2209     if node_name is None:
2210       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2211     self.op.node_name = node_name
2212     _CheckBooleanOpField(self.op, 'master_candidate')
2213     _CheckBooleanOpField(self.op, 'offline')
2214     if self.op.master_candidate is None and self.op.offline is None:
2215       raise errors.OpPrereqError("Please pass at least one modification")
2216     if self.op.offline == True and self.op.master_candidate == True:
2217       raise errors.OpPrereqError("Can't set the node into offline and"
2218                                  " master_candidate at the same time")
2219
2220   def ExpandNames(self):
2221     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2222
2223   def BuildHooksEnv(self):
2224     """Build hooks env.
2225
2226     This runs on the master node.
2227
2228     """
2229     env = {
2230       "OP_TARGET": self.op.node_name,
2231       "MASTER_CANDIDATE": str(self.op.master_candidate),
2232       "OFFLINE": str(self.op.offline),
2233       }
2234     nl = [self.cfg.GetMasterNode(),
2235           self.op.node_name]
2236     return env, nl, nl
2237
2238   def CheckPrereq(self):
2239     """Check prerequisites.
2240
2241     This only checks the instance list against the existing names.
2242
2243     """
2244     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2245
2246     if ((self.op.master_candidate == False or self.op.offline == True)
2247         and node.master_candidate):
2248       # we will demote the node from master_candidate
2249       if self.op.node_name == self.cfg.GetMasterNode():
2250         raise errors.OpPrereqError("The master node has to be a"
2251                                    " master candidate and online")
2252       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2253       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2254       if num_candidates <= cp_size:
2255         msg = ("Not enough master candidates (desired"
2256                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2257         if self.op.force:
2258           self.LogWarning(msg)
2259         else:
2260           raise errors.OpPrereqError(msg)
2261
2262     if (self.op.master_candidate == True and node.offline and
2263         not self.op.offline == False):
2264       raise errors.OpPrereqError("Can't set an offline node to"
2265                                  " master_candidate")
2266
2267     return
2268
2269   def Exec(self, feedback_fn):
2270     """Modifies a node.
2271
2272     """
2273     node = self.node
2274
2275     result = []
2276
2277     if self.op.offline is not None:
2278       node.offline = self.op.offline
2279       result.append(("offline", str(self.op.offline)))
2280       if self.op.offline == True and node.master_candidate:
2281         node.master_candidate = False
2282         result.append(("master_candidate", "auto-demotion due to offline"))
2283
2284     if self.op.master_candidate is not None:
2285       node.master_candidate = self.op.master_candidate
2286       result.append(("master_candidate", str(self.op.master_candidate)))
2287       if self.op.master_candidate == False:
2288         rrc = self.rpc.call_node_demote_from_mc(node.name)
2289         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2290             or len(rrc.data) != 2):
2291           self.LogWarning("Node rpc error: %s" % rrc.error)
2292         elif not rrc.data[0]:
2293           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2294
2295     # this will trigger configuration file update, if needed
2296     self.cfg.Update(node)
2297     # this will trigger job queue propagation or cleanup
2298     if self.op.node_name != self.cfg.GetMasterNode():
2299       self.context.ReaddNode(node)
2300
2301     return result
2302
2303
2304 class LUQueryClusterInfo(NoHooksLU):
2305   """Query cluster configuration.
2306
2307   """
2308   _OP_REQP = []
2309   REQ_BGL = False
2310
2311   def ExpandNames(self):
2312     self.needed_locks = {}
2313
2314   def CheckPrereq(self):
2315     """No prerequsites needed for this LU.
2316
2317     """
2318     pass
2319
2320   def Exec(self, feedback_fn):
2321     """Return cluster config.
2322
2323     """
2324     cluster = self.cfg.GetClusterInfo()
2325     result = {
2326       "software_version": constants.RELEASE_VERSION,
2327       "protocol_version": constants.PROTOCOL_VERSION,
2328       "config_version": constants.CONFIG_VERSION,
2329       "os_api_version": constants.OS_API_VERSION,
2330       "export_version": constants.EXPORT_VERSION,
2331       "architecture": (platform.architecture()[0], platform.machine()),
2332       "name": cluster.cluster_name,
2333       "master": cluster.master_node,
2334       "default_hypervisor": cluster.default_hypervisor,
2335       "enabled_hypervisors": cluster.enabled_hypervisors,
2336       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2337                         for hypervisor in cluster.enabled_hypervisors]),
2338       "beparams": cluster.beparams,
2339       "candidate_pool_size": cluster.candidate_pool_size,
2340       }
2341
2342     return result
2343
2344
2345 class LUQueryConfigValues(NoHooksLU):
2346   """Return configuration values.
2347
2348   """
2349   _OP_REQP = []
2350   REQ_BGL = False
2351   _FIELDS_DYNAMIC = utils.FieldSet()
2352   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2353
2354   def ExpandNames(self):
2355     self.needed_locks = {}
2356
2357     _CheckOutputFields(static=self._FIELDS_STATIC,
2358                        dynamic=self._FIELDS_DYNAMIC,
2359                        selected=self.op.output_fields)
2360
2361   def CheckPrereq(self):
2362     """No prerequisites.
2363
2364     """
2365     pass
2366
2367   def Exec(self, feedback_fn):
2368     """Dump a representation of the cluster config to the standard output.
2369
2370     """
2371     values = []
2372     for field in self.op.output_fields:
2373       if field == "cluster_name":
2374         entry = self.cfg.GetClusterName()
2375       elif field == "master_node":
2376         entry = self.cfg.GetMasterNode()
2377       elif field == "drain_flag":
2378         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2379       else:
2380         raise errors.ParameterError(field)
2381       values.append(entry)
2382     return values
2383
2384
2385 class LUActivateInstanceDisks(NoHooksLU):
2386   """Bring up an instance's disks.
2387
2388   """
2389   _OP_REQP = ["instance_name"]
2390   REQ_BGL = False
2391
2392   def ExpandNames(self):
2393     self._ExpandAndLockInstance()
2394     self.needed_locks[locking.LEVEL_NODE] = []
2395     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2396
2397   def DeclareLocks(self, level):
2398     if level == locking.LEVEL_NODE:
2399       self._LockInstancesNodes()
2400
2401   def CheckPrereq(self):
2402     """Check prerequisites.
2403
2404     This checks that the instance is in the cluster.
2405
2406     """
2407     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2408     assert self.instance is not None, \
2409       "Cannot retrieve locked instance %s" % self.op.instance_name
2410     _CheckNodeOnline(self, self.instance.primary_node)
2411
2412   def Exec(self, feedback_fn):
2413     """Activate the disks.
2414
2415     """
2416     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2417     if not disks_ok:
2418       raise errors.OpExecError("Cannot activate block devices")
2419
2420     return disks_info
2421
2422
2423 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2424   """Prepare the block devices for an instance.
2425
2426   This sets up the block devices on all nodes.
2427
2428   @type lu: L{LogicalUnit}
2429   @param lu: the logical unit on whose behalf we execute
2430   @type instance: L{objects.Instance}
2431   @param instance: the instance for whose disks we assemble
2432   @type ignore_secondaries: boolean
2433   @param ignore_secondaries: if true, errors on secondary nodes
2434       won't result in an error return from the function
2435   @return: False if the operation failed, otherwise a list of
2436       (host, instance_visible_name, node_visible_name)
2437       with the mapping from node devices to instance devices
2438
2439   """
2440   device_info = []
2441   disks_ok = True
2442   iname = instance.name
2443   # With the two passes mechanism we try to reduce the window of
2444   # opportunity for the race condition of switching DRBD to primary
2445   # before handshaking occured, but we do not eliminate it
2446
2447   # The proper fix would be to wait (with some limits) until the
2448   # connection has been made and drbd transitions from WFConnection
2449   # into any other network-connected state (Connected, SyncTarget,
2450   # SyncSource, etc.)
2451
2452   # 1st pass, assemble on all nodes in secondary mode
2453   for inst_disk in instance.disks:
2454     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2455       lu.cfg.SetDiskID(node_disk, node)
2456       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2457       if result.failed or not result:
2458         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2459                            " (is_primary=False, pass=1)",
2460                            inst_disk.iv_name, node)
2461         if not ignore_secondaries:
2462           disks_ok = False
2463
2464   # FIXME: race condition on drbd migration to primary
2465
2466   # 2nd pass, do only the primary node
2467   for inst_disk in instance.disks:
2468     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2469       if node != instance.primary_node:
2470         continue
2471       lu.cfg.SetDiskID(node_disk, node)
2472       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2473       if result.failed or not result:
2474         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2475                            " (is_primary=True, pass=2)",
2476                            inst_disk.iv_name, node)
2477         disks_ok = False
2478     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2479
2480   # leave the disks configured for the primary node
2481   # this is a workaround that would be fixed better by
2482   # improving the logical/physical id handling
2483   for disk in instance.disks:
2484     lu.cfg.SetDiskID(disk, instance.primary_node)
2485
2486   return disks_ok, device_info
2487
2488
2489 def _StartInstanceDisks(lu, instance, force):
2490   """Start the disks of an instance.
2491
2492   """
2493   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2494                                            ignore_secondaries=force)
2495   if not disks_ok:
2496     _ShutdownInstanceDisks(lu, instance)
2497     if force is not None and not force:
2498       lu.proc.LogWarning("", hint="If the message above refers to a"
2499                          " secondary node,"
2500                          " you can retry the operation using '--force'.")
2501     raise errors.OpExecError("Disk consistency error")
2502
2503
2504 class LUDeactivateInstanceDisks(NoHooksLU):
2505   """Shutdown an instance's disks.
2506
2507   """
2508   _OP_REQP = ["instance_name"]
2509   REQ_BGL = False
2510
2511   def ExpandNames(self):
2512     self._ExpandAndLockInstance()
2513     self.needed_locks[locking.LEVEL_NODE] = []
2514     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2515
2516   def DeclareLocks(self, level):
2517     if level == locking.LEVEL_NODE:
2518       self._LockInstancesNodes()
2519
2520   def CheckPrereq(self):
2521     """Check prerequisites.
2522
2523     This checks that the instance is in the cluster.
2524
2525     """
2526     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2527     assert self.instance is not None, \
2528       "Cannot retrieve locked instance %s" % self.op.instance_name
2529
2530   def Exec(self, feedback_fn):
2531     """Deactivate the disks
2532
2533     """
2534     instance = self.instance
2535     _SafeShutdownInstanceDisks(self, instance)
2536
2537
2538 def _SafeShutdownInstanceDisks(lu, instance):
2539   """Shutdown block devices of an instance.
2540
2541   This function checks if an instance is running, before calling
2542   _ShutdownInstanceDisks.
2543
2544   """
2545   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2546                                       [instance.hypervisor])
2547   ins_l = ins_l[instance.primary_node]
2548   if ins_l.failed or not isinstance(ins_l.data, list):
2549     raise errors.OpExecError("Can't contact node '%s'" %
2550                              instance.primary_node)
2551
2552   if instance.name in ins_l.data:
2553     raise errors.OpExecError("Instance is running, can't shutdown"
2554                              " block devices.")
2555
2556   _ShutdownInstanceDisks(lu, instance)
2557
2558
2559 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2560   """Shutdown block devices of an instance.
2561
2562   This does the shutdown on all nodes of the instance.
2563
2564   If the ignore_primary is false, errors on the primary node are
2565   ignored.
2566
2567   """
2568   result = True
2569   for disk in instance.disks:
2570     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2571       lu.cfg.SetDiskID(top_disk, node)
2572       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2573       if result.failed or not result.data:
2574         logging.error("Could not shutdown block device %s on node %s",
2575                       disk.iv_name, node)
2576         if not ignore_primary or node != instance.primary_node:
2577           result = False
2578   return result
2579
2580
2581 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2582   """Checks if a node has enough free memory.
2583
2584   This function check if a given node has the needed amount of free
2585   memory. In case the node has less memory or we cannot get the
2586   information from the node, this function raise an OpPrereqError
2587   exception.
2588
2589   @type lu: C{LogicalUnit}
2590   @param lu: a logical unit from which we get configuration data
2591   @type node: C{str}
2592   @param node: the node to check
2593   @type reason: C{str}
2594   @param reason: string to use in the error message
2595   @type requested: C{int}
2596   @param requested: the amount of memory in MiB to check for
2597   @type hypervisor_name: C{str}
2598   @param hypervisor_name: the hypervisor to ask for memory stats
2599   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2600       we cannot check the node
2601
2602   """
2603   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2604   nodeinfo[node].Raise()
2605   free_mem = nodeinfo[node].data.get('memory_free')
2606   if not isinstance(free_mem, int):
2607     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2608                              " was '%s'" % (node, free_mem))
2609   if requested > free_mem:
2610     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2611                              " needed %s MiB, available %s MiB" %
2612                              (node, reason, requested, free_mem))
2613
2614
2615 class LUStartupInstance(LogicalUnit):
2616   """Starts an instance.
2617
2618   """
2619   HPATH = "instance-start"
2620   HTYPE = constants.HTYPE_INSTANCE
2621   _OP_REQP = ["instance_name", "force"]
2622   REQ_BGL = False
2623
2624   def ExpandNames(self):
2625     self._ExpandAndLockInstance()
2626
2627   def BuildHooksEnv(self):
2628     """Build hooks env.
2629
2630     This runs on master, primary and secondary nodes of the instance.
2631
2632     """
2633     env = {
2634       "FORCE": self.op.force,
2635       }
2636     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2637     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2638     return env, nl, nl
2639
2640   def CheckPrereq(self):
2641     """Check prerequisites.
2642
2643     This checks that the instance is in the cluster.
2644
2645     """
2646     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2647     assert self.instance is not None, \
2648       "Cannot retrieve locked instance %s" % self.op.instance_name
2649
2650     _CheckNodeOnline(self, instance.primary_node)
2651
2652     bep = self.cfg.GetClusterInfo().FillBE(instance)
2653     # check bridges existance
2654     _CheckInstanceBridgesExist(self, instance)
2655
2656     _CheckNodeFreeMemory(self, instance.primary_node,
2657                          "starting instance %s" % instance.name,
2658                          bep[constants.BE_MEMORY], instance.hypervisor)
2659
2660   def Exec(self, feedback_fn):
2661     """Start the instance.
2662
2663     """
2664     instance = self.instance
2665     force = self.op.force
2666     extra_args = getattr(self.op, "extra_args", "")
2667
2668     self.cfg.MarkInstanceUp(instance.name)
2669
2670     node_current = instance.primary_node
2671
2672     _StartInstanceDisks(self, instance, force)
2673
2674     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2675     msg = result.RemoteFailMsg()
2676     if msg:
2677       _ShutdownInstanceDisks(self, instance)
2678       raise errors.OpExecError("Could not start instance: %s" % msg)
2679
2680
2681 class LURebootInstance(LogicalUnit):
2682   """Reboot an instance.
2683
2684   """
2685   HPATH = "instance-reboot"
2686   HTYPE = constants.HTYPE_INSTANCE
2687   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2688   REQ_BGL = False
2689
2690   def ExpandNames(self):
2691     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2692                                    constants.INSTANCE_REBOOT_HARD,
2693                                    constants.INSTANCE_REBOOT_FULL]:
2694       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2695                                   (constants.INSTANCE_REBOOT_SOFT,
2696                                    constants.INSTANCE_REBOOT_HARD,
2697                                    constants.INSTANCE_REBOOT_FULL))
2698     self._ExpandAndLockInstance()
2699
2700   def BuildHooksEnv(self):
2701     """Build hooks env.
2702
2703     This runs on master, primary and secondary nodes of the instance.
2704
2705     """
2706     env = {
2707       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2708       }
2709     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2710     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2711     return env, nl, nl
2712
2713   def CheckPrereq(self):
2714     """Check prerequisites.
2715
2716     This checks that the instance is in the cluster.
2717
2718     """
2719     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2720     assert self.instance is not None, \
2721       "Cannot retrieve locked instance %s" % self.op.instance_name
2722
2723     _CheckNodeOnline(self, instance.primary_node)
2724
2725     # check bridges existance
2726     _CheckInstanceBridgesExist(self, instance)
2727
2728   def Exec(self, feedback_fn):
2729     """Reboot the instance.
2730
2731     """
2732     instance = self.instance
2733     ignore_secondaries = self.op.ignore_secondaries
2734     reboot_type = self.op.reboot_type
2735     extra_args = getattr(self.op, "extra_args", "")
2736
2737     node_current = instance.primary_node
2738
2739     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2740                        constants.INSTANCE_REBOOT_HARD]:
2741       result = self.rpc.call_instance_reboot(node_current, instance,
2742                                              reboot_type, extra_args)
2743       if result.failed or not result.data:
2744         raise errors.OpExecError("Could not reboot instance")
2745     else:
2746       if not self.rpc.call_instance_shutdown(node_current, instance):
2747         raise errors.OpExecError("could not shutdown instance for full reboot")
2748       _ShutdownInstanceDisks(self, instance)
2749       _StartInstanceDisks(self, instance, ignore_secondaries)
2750       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2751       msg = result.RemoteFailMsg()
2752       if msg:
2753         _ShutdownInstanceDisks(self, instance)
2754         raise errors.OpExecError("Could not start instance for"
2755                                  " full reboot: %s" % msg)
2756
2757     self.cfg.MarkInstanceUp(instance.name)
2758
2759
2760 class LUShutdownInstance(LogicalUnit):
2761   """Shutdown an instance.
2762
2763   """
2764   HPATH = "instance-stop"
2765   HTYPE = constants.HTYPE_INSTANCE
2766   _OP_REQP = ["instance_name"]
2767   REQ_BGL = False
2768
2769   def ExpandNames(self):
2770     self._ExpandAndLockInstance()
2771
2772   def BuildHooksEnv(self):
2773     """Build hooks env.
2774
2775     This runs on master, primary and secondary nodes of the instance.
2776
2777     """
2778     env = _BuildInstanceHookEnvByObject(self, self.instance)
2779     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2780     return env, nl, nl
2781
2782   def CheckPrereq(self):
2783     """Check prerequisites.
2784
2785     This checks that the instance is in the cluster.
2786
2787     """
2788     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2789     assert self.instance is not None, \
2790       "Cannot retrieve locked instance %s" % self.op.instance_name
2791     _CheckNodeOnline(self, self.instance.primary_node)
2792
2793   def Exec(self, feedback_fn):
2794     """Shutdown the instance.
2795
2796     """
2797     instance = self.instance
2798     node_current = instance.primary_node
2799     self.cfg.MarkInstanceDown(instance.name)
2800     result = self.rpc.call_instance_shutdown(node_current, instance)
2801     if result.failed or not result.data:
2802       self.proc.LogWarning("Could not shutdown instance")
2803
2804     _ShutdownInstanceDisks(self, instance)
2805
2806
2807 class LUReinstallInstance(LogicalUnit):
2808   """Reinstall an instance.
2809
2810   """
2811   HPATH = "instance-reinstall"
2812   HTYPE = constants.HTYPE_INSTANCE
2813   _OP_REQP = ["instance_name"]
2814   REQ_BGL = False
2815
2816   def ExpandNames(self):
2817     self._ExpandAndLockInstance()
2818
2819   def BuildHooksEnv(self):
2820     """Build hooks env.
2821
2822     This runs on master, primary and secondary nodes of the instance.
2823
2824     """
2825     env = _BuildInstanceHookEnvByObject(self, self.instance)
2826     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2827     return env, nl, nl
2828
2829   def CheckPrereq(self):
2830     """Check prerequisites.
2831
2832     This checks that the instance is in the cluster and is not running.
2833
2834     """
2835     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2836     assert instance is not None, \
2837       "Cannot retrieve locked instance %s" % self.op.instance_name
2838     _CheckNodeOnline(self, instance.primary_node)
2839
2840     if instance.disk_template == constants.DT_DISKLESS:
2841       raise errors.OpPrereqError("Instance '%s' has no disks" %
2842                                  self.op.instance_name)
2843     if instance.admin_up:
2844       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2845                                  self.op.instance_name)
2846     remote_info = self.rpc.call_instance_info(instance.primary_node,
2847                                               instance.name,
2848                                               instance.hypervisor)
2849     if remote_info.failed or remote_info.data:
2850       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2851                                  (self.op.instance_name,
2852                                   instance.primary_node))
2853
2854     self.op.os_type = getattr(self.op, "os_type", None)
2855     if self.op.os_type is not None:
2856       # OS verification
2857       pnode = self.cfg.GetNodeInfo(
2858         self.cfg.ExpandNodeName(instance.primary_node))
2859       if pnode is None:
2860         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2861                                    self.op.pnode)
2862       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2863       result.Raise()
2864       if not isinstance(result.data, objects.OS):
2865         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2866                                    " primary node"  % self.op.os_type)
2867
2868     self.instance = instance
2869
2870   def Exec(self, feedback_fn):
2871     """Reinstall the instance.
2872
2873     """
2874     inst = self.instance
2875
2876     if self.op.os_type is not None:
2877       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2878       inst.os = self.op.os_type
2879       self.cfg.Update(inst)
2880
2881     _StartInstanceDisks(self, inst, None)
2882     try:
2883       feedback_fn("Running the instance OS create scripts...")
2884       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2885       msg = result.RemoteFailMsg()
2886       if msg:
2887         raise errors.OpExecError("Could not install OS for instance %s"
2888                                  " on node %s: %s" %
2889                                  (inst.name, inst.primary_node, msg))
2890     finally:
2891       _ShutdownInstanceDisks(self, inst)
2892
2893
2894 class LURenameInstance(LogicalUnit):
2895   """Rename an instance.
2896
2897   """
2898   HPATH = "instance-rename"
2899   HTYPE = constants.HTYPE_INSTANCE
2900   _OP_REQP = ["instance_name", "new_name"]
2901
2902   def BuildHooksEnv(self):
2903     """Build hooks env.
2904
2905     This runs on master, primary and secondary nodes of the instance.
2906
2907     """
2908     env = _BuildInstanceHookEnvByObject(self, self.instance)
2909     env["INSTANCE_NEW_NAME"] = self.op.new_name
2910     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2911     return env, nl, nl
2912
2913   def CheckPrereq(self):
2914     """Check prerequisites.
2915
2916     This checks that the instance is in the cluster and is not running.
2917
2918     """
2919     instance = self.cfg.GetInstanceInfo(
2920       self.cfg.ExpandInstanceName(self.op.instance_name))
2921     if instance is None:
2922       raise errors.OpPrereqError("Instance '%s' not known" %
2923                                  self.op.instance_name)
2924     _CheckNodeOnline(self, instance.primary_node)
2925
2926     if instance.admin_up:
2927       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2928                                  self.op.instance_name)
2929     remote_info = self.rpc.call_instance_info(instance.primary_node,
2930                                               instance.name,
2931                                               instance.hypervisor)
2932     remote_info.Raise()
2933     if remote_info.data:
2934       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2935                                  (self.op.instance_name,
2936                                   instance.primary_node))
2937     self.instance = instance
2938
2939     # new name verification
2940     name_info = utils.HostInfo(self.op.new_name)
2941
2942     self.op.new_name = new_name = name_info.name
2943     instance_list = self.cfg.GetInstanceList()
2944     if new_name in instance_list:
2945       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2946                                  new_name)
2947
2948     if not getattr(self.op, "ignore_ip", False):
2949       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2950         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2951                                    (name_info.ip, new_name))
2952
2953
2954   def Exec(self, feedback_fn):
2955     """Reinstall the instance.
2956
2957     """
2958     inst = self.instance
2959     old_name = inst.name
2960
2961     if inst.disk_template == constants.DT_FILE:
2962       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2963
2964     self.cfg.RenameInstance(inst.name, self.op.new_name)
2965     # Change the instance lock. This is definitely safe while we hold the BGL
2966     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2967     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2968
2969     # re-read the instance from the configuration after rename
2970     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2971
2972     if inst.disk_template == constants.DT_FILE:
2973       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2974       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2975                                                      old_file_storage_dir,
2976                                                      new_file_storage_dir)
2977       result.Raise()
2978       if not result.data:
2979         raise errors.OpExecError("Could not connect to node '%s' to rename"
2980                                  " directory '%s' to '%s' (but the instance"
2981                                  " has been renamed in Ganeti)" % (
2982                                  inst.primary_node, old_file_storage_dir,
2983                                  new_file_storage_dir))
2984
2985       if not result.data[0]:
2986         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2987                                  " (but the instance has been renamed in"
2988                                  " Ganeti)" % (old_file_storage_dir,
2989                                                new_file_storage_dir))
2990
2991     _StartInstanceDisks(self, inst, None)
2992     try:
2993       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2994                                                  old_name)
2995       msg = result.RemoteFailMsg()
2996       if msg:
2997         msg = ("Could not run OS rename script for instance %s on node %s"
2998                " (but the instance has been renamed in Ganeti): %s" %
2999                (inst.name, inst.primary_node, msg))
3000         self.proc.LogWarning(msg)
3001     finally:
3002       _ShutdownInstanceDisks(self, inst)
3003
3004
3005 class LURemoveInstance(LogicalUnit):
3006   """Remove an instance.
3007
3008   """
3009   HPATH = "instance-remove"
3010   HTYPE = constants.HTYPE_INSTANCE
3011   _OP_REQP = ["instance_name", "ignore_failures"]
3012   REQ_BGL = False
3013
3014   def ExpandNames(self):
3015     self._ExpandAndLockInstance()
3016     self.needed_locks[locking.LEVEL_NODE] = []
3017     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3018
3019   def DeclareLocks(self, level):
3020     if level == locking.LEVEL_NODE:
3021       self._LockInstancesNodes()
3022
3023   def BuildHooksEnv(self):
3024     """Build hooks env.
3025
3026     This runs on master, primary and secondary nodes of the instance.
3027
3028     """
3029     env = _BuildInstanceHookEnvByObject(self, self.instance)
3030     nl = [self.cfg.GetMasterNode()]
3031     return env, nl, nl
3032
3033   def CheckPrereq(self):
3034     """Check prerequisites.
3035
3036     This checks that the instance is in the cluster.
3037
3038     """
3039     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3040     assert self.instance is not None, \
3041       "Cannot retrieve locked instance %s" % self.op.instance_name
3042
3043   def Exec(self, feedback_fn):
3044     """Remove the instance.
3045
3046     """
3047     instance = self.instance
3048     logging.info("Shutting down instance %s on node %s",
3049                  instance.name, instance.primary_node)
3050
3051     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3052     if result.failed or not result.data:
3053       if self.op.ignore_failures:
3054         feedback_fn("Warning: can't shutdown instance")
3055       else:
3056         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3057                                  (instance.name, instance.primary_node))
3058
3059     logging.info("Removing block devices for instance %s", instance.name)
3060
3061     if not _RemoveDisks(self, instance):
3062       if self.op.ignore_failures:
3063         feedback_fn("Warning: can't remove instance's disks")
3064       else:
3065         raise errors.OpExecError("Can't remove instance's disks")
3066
3067     logging.info("Removing instance %s out of cluster config", instance.name)
3068
3069     self.cfg.RemoveInstance(instance.name)
3070     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3071
3072
3073 class LUQueryInstances(NoHooksLU):
3074   """Logical unit for querying instances.
3075
3076   """
3077   _OP_REQP = ["output_fields", "names"]
3078   REQ_BGL = False
3079   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3080                                     "admin_state", "admin_ram",
3081                                     "disk_template", "ip", "mac", "bridge",
3082                                     "sda_size", "sdb_size", "vcpus", "tags",
3083                                     "network_port", "beparams",
3084                                     "(disk).(size)/([0-9]+)",
3085                                     "(disk).(sizes)",
3086                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3087                                     "(nic).(macs|ips|bridges)",
3088                                     "(disk|nic).(count)",
3089                                     "serial_no", "hypervisor", "hvparams",] +
3090                                   ["hv/%s" % name
3091                                    for name in constants.HVS_PARAMETERS] +
3092                                   ["be/%s" % name
3093                                    for name in constants.BES_PARAMETERS])
3094   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3095
3096
3097   def ExpandNames(self):
3098     _CheckOutputFields(static=self._FIELDS_STATIC,
3099                        dynamic=self._FIELDS_DYNAMIC,
3100                        selected=self.op.output_fields)
3101
3102     self.needed_locks = {}
3103     self.share_locks[locking.LEVEL_INSTANCE] = 1
3104     self.share_locks[locking.LEVEL_NODE] = 1
3105
3106     if self.op.names:
3107       self.wanted = _GetWantedInstances(self, self.op.names)
3108     else:
3109       self.wanted = locking.ALL_SET
3110
3111     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3112     if self.do_locking:
3113       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3114       self.needed_locks[locking.LEVEL_NODE] = []
3115       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3116
3117   def DeclareLocks(self, level):
3118     if level == locking.LEVEL_NODE and self.do_locking:
3119       self._LockInstancesNodes()
3120
3121   def CheckPrereq(self):
3122     """Check prerequisites.
3123
3124     """
3125     pass
3126
3127   def Exec(self, feedback_fn):
3128     """Computes the list of nodes and their attributes.
3129
3130     """
3131     all_info = self.cfg.GetAllInstancesInfo()
3132     if self.wanted == locking.ALL_SET:
3133       # caller didn't specify instance names, so ordering is not important
3134       if self.do_locking:
3135         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3136       else:
3137         instance_names = all_info.keys()
3138       instance_names = utils.NiceSort(instance_names)
3139     else:
3140       # caller did specify names, so we must keep the ordering
3141       if self.do_locking:
3142         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3143       else:
3144         tgt_set = all_info.keys()
3145       missing = set(self.wanted).difference(tgt_set)
3146       if missing:
3147         raise errors.OpExecError("Some instances were removed before"
3148                                  " retrieving their data: %s" % missing)
3149       instance_names = self.wanted
3150
3151     instance_list = [all_info[iname] for iname in instance_names]
3152
3153     # begin data gathering
3154
3155     nodes = frozenset([inst.primary_node for inst in instance_list])
3156     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3157
3158     bad_nodes = []
3159     off_nodes = []
3160     if self.do_locking:
3161       live_data = {}
3162       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3163       for name in nodes:
3164         result = node_data[name]
3165         if result.offline:
3166           # offline nodes will be in both lists
3167           off_nodes.append(name)
3168         if result.failed:
3169           bad_nodes.append(name)
3170         else:
3171           if result.data:
3172             live_data.update(result.data)
3173             # else no instance is alive
3174     else:
3175       live_data = dict([(name, {}) for name in instance_names])
3176
3177     # end data gathering
3178
3179     HVPREFIX = "hv/"
3180     BEPREFIX = "be/"
3181     output = []
3182     for instance in instance_list:
3183       iout = []
3184       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3185       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3186       for field in self.op.output_fields:
3187         st_match = self._FIELDS_STATIC.Matches(field)
3188         if field == "name":
3189           val = instance.name
3190         elif field == "os":
3191           val = instance.os
3192         elif field == "pnode":
3193           val = instance.primary_node
3194         elif field == "snodes":
3195           val = list(instance.secondary_nodes)
3196         elif field == "admin_state":
3197           val = instance.admin_up
3198         elif field == "oper_state":
3199           if instance.primary_node in bad_nodes:
3200             val = None
3201           else:
3202             val = bool(live_data.get(instance.name))
3203         elif field == "status":
3204           if instance.primary_node in off_nodes:
3205             val = "ERROR_nodeoffline"
3206           elif instance.primary_node in bad_nodes:
3207             val = "ERROR_nodedown"
3208           else:
3209             running = bool(live_data.get(instance.name))
3210             if running:
3211               if instance.admin_up:
3212                 val = "running"
3213               else:
3214                 val = "ERROR_up"
3215             else:
3216               if instance.admin_up:
3217                 val = "ERROR_down"
3218               else:
3219                 val = "ADMIN_down"
3220         elif field == "oper_ram":
3221           if instance.primary_node in bad_nodes:
3222             val = None
3223           elif instance.name in live_data:
3224             val = live_data[instance.name].get("memory", "?")
3225           else:
3226             val = "-"
3227         elif field == "disk_template":
3228           val = instance.disk_template
3229         elif field == "ip":
3230           val = instance.nics[0].ip
3231         elif field == "bridge":
3232           val = instance.nics[0].bridge
3233         elif field == "mac":
3234           val = instance.nics[0].mac
3235         elif field == "sda_size" or field == "sdb_size":
3236           idx = ord(field[2]) - ord('a')
3237           try:
3238             val = instance.FindDisk(idx).size
3239           except errors.OpPrereqError:
3240             val = None
3241         elif field == "tags":
3242           val = list(instance.GetTags())
3243         elif field == "serial_no":
3244           val = instance.serial_no
3245         elif field == "network_port":
3246           val = instance.network_port
3247         elif field == "hypervisor":
3248           val = instance.hypervisor
3249         elif field == "hvparams":
3250           val = i_hv
3251         elif (field.startswith(HVPREFIX) and
3252               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3253           val = i_hv.get(field[len(HVPREFIX):], None)
3254         elif field == "beparams":
3255           val = i_be
3256         elif (field.startswith(BEPREFIX) and
3257               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3258           val = i_be.get(field[len(BEPREFIX):], None)
3259         elif st_match and st_match.groups():
3260           # matches a variable list
3261           st_groups = st_match.groups()
3262           if st_groups and st_groups[0] == "disk":
3263             if st_groups[1] == "count":
3264               val = len(instance.disks)
3265             elif st_groups[1] == "sizes":
3266               val = [disk.size for disk in instance.disks]
3267             elif st_groups[1] == "size":
3268               try:
3269                 val = instance.FindDisk(st_groups[2]).size
3270               except errors.OpPrereqError:
3271                 val = None
3272             else:
3273               assert False, "Unhandled disk parameter"
3274           elif st_groups[0] == "nic":
3275             if st_groups[1] == "count":
3276               val = len(instance.nics)
3277             elif st_groups[1] == "macs":
3278               val = [nic.mac for nic in instance.nics]
3279             elif st_groups[1] == "ips":
3280               val = [nic.ip for nic in instance.nics]
3281             elif st_groups[1] == "bridges":
3282               val = [nic.bridge for nic in instance.nics]
3283             else:
3284               # index-based item
3285               nic_idx = int(st_groups[2])
3286               if nic_idx >= len(instance.nics):
3287                 val = None
3288               else:
3289                 if st_groups[1] == "mac":
3290                   val = instance.nics[nic_idx].mac
3291                 elif st_groups[1] == "ip":
3292                   val = instance.nics[nic_idx].ip
3293                 elif st_groups[1] == "bridge":
3294                   val = instance.nics[nic_idx].bridge
3295                 else:
3296                   assert False, "Unhandled NIC parameter"
3297           else:
3298             assert False, "Unhandled variable parameter"
3299         else:
3300           raise errors.ParameterError(field)
3301         iout.append(val)
3302       output.append(iout)
3303
3304     return output
3305
3306
3307 class LUFailoverInstance(LogicalUnit):
3308   """Failover an instance.
3309
3310   """
3311   HPATH = "instance-failover"
3312   HTYPE = constants.HTYPE_INSTANCE
3313   _OP_REQP = ["instance_name", "ignore_consistency"]
3314   REQ_BGL = False
3315
3316   def ExpandNames(self):
3317     self._ExpandAndLockInstance()
3318     self.needed_locks[locking.LEVEL_NODE] = []
3319     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3320
3321   def DeclareLocks(self, level):
3322     if level == locking.LEVEL_NODE:
3323       self._LockInstancesNodes()
3324
3325   def BuildHooksEnv(self):
3326     """Build hooks env.
3327
3328     This runs on master, primary and secondary nodes of the instance.
3329
3330     """
3331     env = {
3332       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3333       }
3334     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3335     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3336     return env, nl, nl
3337
3338   def CheckPrereq(self):
3339     """Check prerequisites.
3340
3341     This checks that the instance is in the cluster.
3342
3343     """
3344     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3345     assert self.instance is not None, \
3346       "Cannot retrieve locked instance %s" % self.op.instance_name
3347
3348     bep = self.cfg.GetClusterInfo().FillBE(instance)
3349     if instance.disk_template not in constants.DTS_NET_MIRROR:
3350       raise errors.OpPrereqError("Instance's disk layout is not"
3351                                  " network mirrored, cannot failover.")
3352
3353     secondary_nodes = instance.secondary_nodes
3354     if not secondary_nodes:
3355       raise errors.ProgrammerError("no secondary node but using "
3356                                    "a mirrored disk template")
3357
3358     target_node = secondary_nodes[0]
3359     _CheckNodeOnline(self, target_node)
3360     # check memory requirements on the secondary node
3361     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3362                          instance.name, bep[constants.BE_MEMORY],
3363                          instance.hypervisor)
3364
3365     # check bridge existance
3366     brlist = [nic.bridge for nic in instance.nics]
3367     result = self.rpc.call_bridges_exist(target_node, brlist)
3368     result.Raise()
3369     if not result.data:
3370       raise errors.OpPrereqError("One or more target bridges %s does not"
3371                                  " exist on destination node '%s'" %
3372                                  (brlist, target_node))
3373
3374   def Exec(self, feedback_fn):
3375     """Failover an instance.
3376
3377     The failover is done by shutting it down on its present node and
3378     starting it on the secondary.
3379
3380     """
3381     instance = self.instance
3382
3383     source_node = instance.primary_node
3384     target_node = instance.secondary_nodes[0]
3385
3386     feedback_fn("* checking disk consistency between source and target")
3387     for dev in instance.disks:
3388       # for drbd, these are drbd over lvm
3389       if not _CheckDiskConsistency(self, dev, target_node, False):
3390         if instance.admin_up and not self.op.ignore_consistency:
3391           raise errors.OpExecError("Disk %s is degraded on target node,"
3392                                    " aborting failover." % dev.iv_name)
3393
3394     feedback_fn("* shutting down instance on source node")
3395     logging.info("Shutting down instance %s on node %s",
3396                  instance.name, source_node)
3397
3398     result = self.rpc.call_instance_shutdown(source_node, instance)
3399     if result.failed or not result.data:
3400       if self.op.ignore_consistency:
3401         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3402                              " Proceeding"
3403                              " anyway. Please make sure node %s is down",
3404                              instance.name, source_node, source_node)
3405       else:
3406         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3407                                  (instance.name, source_node))
3408
3409     feedback_fn("* deactivating the instance's disks on source node")
3410     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3411       raise errors.OpExecError("Can't shut down the instance's disks.")
3412
3413     instance.primary_node = target_node
3414     # distribute new instance config to the other nodes
3415     self.cfg.Update(instance)
3416
3417     # Only start the instance if it's marked as up
3418     if instance.admin_up:
3419       feedback_fn("* activating the instance's disks on target node")
3420       logging.info("Starting instance %s on node %s",
3421                    instance.name, target_node)
3422
3423       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3424                                                ignore_secondaries=True)
3425       if not disks_ok:
3426         _ShutdownInstanceDisks(self, instance)
3427         raise errors.OpExecError("Can't activate the instance's disks")
3428
3429       feedback_fn("* starting the instance on the target node")
3430       result = self.rpc.call_instance_start(target_node, instance, None)
3431       msg = result.RemoteFailMsg()
3432       if msg:
3433         _ShutdownInstanceDisks(self, instance)
3434         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3435                                  (instance.name, target_node, msg))
3436
3437
3438 class LUMigrateInstance(LogicalUnit):
3439   """Migrate an instance.
3440
3441   This is migration without shutting down, compared to the failover,
3442   which is done with shutdown.
3443
3444   """
3445   HPATH = "instance-migrate"
3446   HTYPE = constants.HTYPE_INSTANCE
3447   _OP_REQP = ["instance_name", "live", "cleanup"]
3448
3449   REQ_BGL = False
3450
3451   def ExpandNames(self):
3452     self._ExpandAndLockInstance()
3453     self.needed_locks[locking.LEVEL_NODE] = []
3454     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3455
3456   def DeclareLocks(self, level):
3457     if level == locking.LEVEL_NODE:
3458       self._LockInstancesNodes()
3459
3460   def BuildHooksEnv(self):
3461     """Build hooks env.
3462
3463     This runs on master, primary and secondary nodes of the instance.
3464
3465     """
3466     env = _BuildInstanceHookEnvByObject(self, self.instance)
3467     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3468     return env, nl, nl
3469
3470   def CheckPrereq(self):
3471     """Check prerequisites.
3472
3473     This checks that the instance is in the cluster.
3474
3475     """
3476     instance = self.cfg.GetInstanceInfo(
3477       self.cfg.ExpandInstanceName(self.op.instance_name))
3478     if instance is None:
3479       raise errors.OpPrereqError("Instance '%s' not known" %
3480                                  self.op.instance_name)
3481
3482     if instance.disk_template != constants.DT_DRBD8:
3483       raise errors.OpPrereqError("Instance's disk layout is not"
3484                                  " drbd8, cannot migrate.")
3485
3486     secondary_nodes = instance.secondary_nodes
3487     if not secondary_nodes:
3488       raise errors.ProgrammerError("no secondary node but using "
3489                                    "drbd8 disk template")
3490
3491     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3492
3493     target_node = secondary_nodes[0]
3494     # check memory requirements on the secondary node
3495     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3496                          instance.name, i_be[constants.BE_MEMORY],
3497                          instance.hypervisor)
3498
3499     # check bridge existance
3500     brlist = [nic.bridge for nic in instance.nics]
3501     result = self.rpc.call_bridges_exist(target_node, brlist)
3502     if result.failed or not result.data:
3503       raise errors.OpPrereqError("One or more target bridges %s does not"
3504                                  " exist on destination node '%s'" %
3505                                  (brlist, target_node))
3506
3507     if not self.op.cleanup:
3508       result = self.rpc.call_instance_migratable(instance.primary_node,
3509                                                  instance)
3510       msg = result.RemoteFailMsg()
3511       if msg:
3512         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3513                                    msg)
3514
3515     self.instance = instance
3516
3517   def _WaitUntilSync(self):
3518     """Poll with custom rpc for disk sync.
3519
3520     This uses our own step-based rpc call.
3521
3522     """
3523     self.feedback_fn("* wait until resync is done")
3524     all_done = False
3525     while not all_done:
3526       all_done = True
3527       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3528                                             self.nodes_ip,
3529                                             self.instance.disks)
3530       min_percent = 100
3531       for node, nres in result.items():
3532         msg = nres.RemoteFailMsg()
3533         if msg:
3534           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3535                                    (node, msg))
3536         node_done, node_percent = nres.data[1]
3537         all_done = all_done and node_done
3538         if node_percent is not None:
3539           min_percent = min(min_percent, node_percent)
3540       if not all_done:
3541         if min_percent < 100:
3542           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3543         time.sleep(2)
3544
3545   def _EnsureSecondary(self, node):
3546     """Demote a node to secondary.
3547
3548     """
3549     self.feedback_fn("* switching node %s to secondary mode" % node)
3550
3551     for dev in self.instance.disks:
3552       self.cfg.SetDiskID(dev, node)
3553
3554     result = self.rpc.call_blockdev_close(node, self.instance.name,
3555                                           self.instance.disks)
3556     msg = result.RemoteFailMsg()
3557     if msg:
3558       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3559                                " error %s" % (node, msg))
3560
3561   def _GoStandalone(self):
3562     """Disconnect from the network.
3563
3564     """
3565     self.feedback_fn("* changing into standalone mode")
3566     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3567                                                self.instance.disks)
3568     for node, nres in result.items():
3569       msg = nres.RemoteFailMsg()
3570       if msg:
3571         raise errors.OpExecError("Cannot disconnect disks node %s,"
3572                                  " error %s" % (node, msg))
3573
3574   def _GoReconnect(self, multimaster):
3575     """Reconnect to the network.
3576
3577     """
3578     if multimaster:
3579       msg = "dual-master"
3580     else:
3581       msg = "single-master"
3582     self.feedback_fn("* changing disks into %s mode" % msg)
3583     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3584                                            self.instance.disks,
3585                                            self.instance.name, multimaster)
3586     for node, nres in result.items():
3587       msg = nres.RemoteFailMsg()
3588       if msg:
3589         raise errors.OpExecError("Cannot change disks config on node %s,"
3590                                  " error: %s" % (node, msg))
3591
3592   def _ExecCleanup(self):
3593     """Try to cleanup after a failed migration.
3594
3595     The cleanup is done by:
3596       - check that the instance is running only on one node
3597         (and update the config if needed)
3598       - change disks on its secondary node to secondary
3599       - wait until disks are fully synchronized
3600       - disconnect from the network
3601       - change disks into single-master mode
3602       - wait again until disks are fully synchronized
3603
3604     """
3605     instance = self.instance
3606     target_node = self.target_node
3607     source_node = self.source_node
3608
3609     # check running on only one node
3610     self.feedback_fn("* checking where the instance actually runs"
3611                      " (if this hangs, the hypervisor might be in"
3612                      " a bad state)")
3613     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3614     for node, result in ins_l.items():
3615       result.Raise()
3616       if not isinstance(result.data, list):
3617         raise errors.OpExecError("Can't contact node '%s'" % node)
3618
3619     runningon_source = instance.name in ins_l[source_node].data
3620     runningon_target = instance.name in ins_l[target_node].data
3621
3622     if runningon_source and runningon_target:
3623       raise errors.OpExecError("Instance seems to be running on two nodes,"
3624                                " or the hypervisor is confused. You will have"
3625                                " to ensure manually that it runs only on one"
3626                                " and restart this operation.")
3627
3628     if not (runningon_source or runningon_target):
3629       raise errors.OpExecError("Instance does not seem to be running at all."
3630                                " In this case, it's safer to repair by"
3631                                " running 'gnt-instance stop' to ensure disk"
3632                                " shutdown, and then restarting it.")
3633
3634     if runningon_target:
3635       # the migration has actually succeeded, we need to update the config
3636       self.feedback_fn("* instance running on secondary node (%s),"
3637                        " updating config" % target_node)
3638       instance.primary_node = target_node
3639       self.cfg.Update(instance)
3640       demoted_node = source_node
3641     else:
3642       self.feedback_fn("* instance confirmed to be running on its"
3643                        " primary node (%s)" % source_node)
3644       demoted_node = target_node
3645
3646     self._EnsureSecondary(demoted_node)
3647     try:
3648       self._WaitUntilSync()
3649     except errors.OpExecError:
3650       # we ignore here errors, since if the device is standalone, it
3651       # won't be able to sync
3652       pass
3653     self._GoStandalone()
3654     self._GoReconnect(False)
3655     self._WaitUntilSync()
3656
3657     self.feedback_fn("* done")
3658
3659   def _RevertDiskStatus(self):
3660     """Try to revert the disk status after a failed migration.
3661
3662     """
3663     target_node = self.target_node
3664     try:
3665       self._EnsureSecondary(target_node)
3666       self._GoStandalone()
3667       self._GoReconnect(False)
3668       self._WaitUntilSync()
3669     except errors.OpExecError, err:
3670       self.LogWarning("Migration failed and I can't reconnect the"
3671                       " drives: error '%s'\n"
3672                       "Please look and recover the instance status" %
3673                       str(err))
3674
3675   def _AbortMigration(self):
3676     """Call the hypervisor code to abort a started migration.
3677
3678     """
3679     instance = self.instance
3680     target_node = self.target_node
3681     migration_info = self.migration_info
3682
3683     abort_result = self.rpc.call_finalize_migration(target_node,
3684                                                     instance,
3685                                                     migration_info,
3686                                                     False)
3687     abort_msg = abort_result.RemoteFailMsg()
3688     if abort_msg:
3689       logging.error("Aborting migration failed on target node %s: %s" %
3690                     (target_node, abort_msg))
3691       # Don't raise an exception here, as we stil have to try to revert the
3692       # disk status, even if this step failed.
3693
3694   def _ExecMigration(self):
3695     """Migrate an instance.
3696
3697     The migrate is done by:
3698       - change the disks into dual-master mode
3699       - wait until disks are fully synchronized again
3700       - migrate the instance
3701       - change disks on the new secondary node (the old primary) to secondary
3702       - wait until disks are fully synchronized
3703       - change disks into single-master mode
3704
3705     """
3706     instance = self.instance
3707     target_node = self.target_node
3708     source_node = self.source_node
3709
3710     self.feedback_fn("* checking disk consistency between source and target")
3711     for dev in instance.disks:
3712       if not _CheckDiskConsistency(self, dev, target_node, False):
3713         raise errors.OpExecError("Disk %s is degraded or not fully"
3714                                  " synchronized on target node,"
3715                                  " aborting migrate." % dev.iv_name)
3716
3717     # First get the migration information from the remote node
3718     result = self.rpc.call_migration_info(source_node, instance)
3719     msg = result.RemoteFailMsg()
3720     if msg:
3721       log_err = ("Failed fetching source migration information from %s: %s" %
3722                   (source_node, msg))
3723       logging.error(log_err)
3724       raise errors.OpExecError(log_err)
3725
3726     self.migration_info = migration_info = result.data[1]
3727
3728     # Then switch the disks to master/master mode
3729     self._EnsureSecondary(target_node)
3730     self._GoStandalone()
3731     self._GoReconnect(True)
3732     self._WaitUntilSync()
3733
3734     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3735     result = self.rpc.call_accept_instance(target_node,
3736                                            instance,
3737                                            migration_info,
3738                                            self.nodes_ip[target_node])
3739
3740     msg = result.RemoteFailMsg()
3741     if msg:
3742       logging.error("Instance pre-migration failed, trying to revert"
3743                     " disk status: %s", msg)
3744       self._AbortMigration()
3745       self._RevertDiskStatus()
3746       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3747                                (instance.name, msg))
3748
3749     self.feedback_fn("* migrating instance to %s" % target_node)
3750     time.sleep(10)
3751     result = self.rpc.call_instance_migrate(source_node, instance,
3752                                             self.nodes_ip[target_node],
3753                                             self.op.live)
3754     msg = result.RemoteFailMsg()
3755     if msg:
3756       logging.error("Instance migration failed, trying to revert"
3757                     " disk status: %s", msg)
3758       self._AbortMigration()
3759       self._RevertDiskStatus()
3760       raise errors.OpExecError("Could not migrate instance %s: %s" %
3761                                (instance.name, msg))
3762     time.sleep(10)
3763
3764     instance.primary_node = target_node
3765     # distribute new instance config to the other nodes
3766     self.cfg.Update(instance)
3767
3768     result = self.rpc.call_finalize_migration(target_node,
3769                                               instance,
3770                                               migration_info,
3771                                               True)
3772     msg = result.RemoteFailMsg()
3773     if msg:
3774       logging.error("Instance migration succeeded, but finalization failed:"
3775                     " %s" % msg)
3776       raise errors.OpExecError("Could not finalize instance migration: %s" %
3777                                msg)
3778
3779     self._EnsureSecondary(source_node)
3780     self._WaitUntilSync()
3781     self._GoStandalone()
3782     self._GoReconnect(False)
3783     self._WaitUntilSync()
3784
3785     self.feedback_fn("* done")
3786
3787   def Exec(self, feedback_fn):
3788     """Perform the migration.
3789
3790     """
3791     self.feedback_fn = feedback_fn
3792
3793     self.source_node = self.instance.primary_node
3794     self.target_node = self.instance.secondary_nodes[0]
3795     self.all_nodes = [self.source_node, self.target_node]
3796     self.nodes_ip = {
3797       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3798       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3799       }
3800     if self.op.cleanup:
3801       return self._ExecCleanup()
3802     else:
3803       return self._ExecMigration()
3804
3805
3806 def _CreateBlockDev(lu, node, instance, device, force_create,
3807                     info, force_open):
3808   """Create a tree of block devices on a given node.
3809
3810   If this device type has to be created on secondaries, create it and
3811   all its children.
3812
3813   If not, just recurse to children keeping the same 'force' value.
3814
3815   @param lu: the lu on whose behalf we execute
3816   @param node: the node on which to create the device
3817   @type instance: L{objects.Instance}
3818   @param instance: the instance which owns the device
3819   @type device: L{objects.Disk}
3820   @param device: the device to create
3821   @type force_create: boolean
3822   @param force_create: whether to force creation of this device; this
3823       will be change to True whenever we find a device which has
3824       CreateOnSecondary() attribute
3825   @param info: the extra 'metadata' we should attach to the device
3826       (this will be represented as a LVM tag)
3827   @type force_open: boolean
3828   @param force_open: this parameter will be passes to the
3829       L{backend.CreateBlockDevice} function where it specifies
3830       whether we run on primary or not, and it affects both
3831       the child assembly and the device own Open() execution
3832
3833   """
3834   if device.CreateOnSecondary():
3835     force_create = True
3836
3837   if device.children:
3838     for child in device.children:
3839       _CreateBlockDev(lu, node, instance, child, force_create,
3840                       info, force_open)
3841
3842   if not force_create:
3843     return
3844
3845   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3846
3847
3848 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3849   """Create a single block device on a given node.
3850
3851   This will not recurse over children of the device, so they must be
3852   created in advance.
3853
3854   @param lu: the lu on whose behalf we execute
3855   @param node: the node on which to create the device
3856   @type instance: L{objects.Instance}
3857   @param instance: the instance which owns the device
3858   @type device: L{objects.Disk}
3859   @param device: the device to create
3860   @param info: the extra 'metadata' we should attach to the device
3861       (this will be represented as a LVM tag)
3862   @type force_open: boolean
3863   @param force_open: this parameter will be passes to the
3864       L{backend.CreateBlockDevice} function where it specifies
3865       whether we run on primary or not, and it affects both
3866       the child assembly and the device own Open() execution
3867
3868   """
3869   lu.cfg.SetDiskID(device, node)
3870   result = lu.rpc.call_blockdev_create(node, device, device.size,
3871                                        instance.name, force_open, info)
3872   msg = result.RemoteFailMsg()
3873   if msg:
3874     raise errors.OpExecError("Can't create block device %s on"
3875                              " node %s for instance %s: %s" %
3876                              (device, node, instance.name, msg))
3877   if device.physical_id is None:
3878     device.physical_id = result.data[1]
3879
3880
3881 def _GenerateUniqueNames(lu, exts):
3882   """Generate a suitable LV name.
3883
3884   This will generate a logical volume name for the given instance.
3885
3886   """
3887   results = []
3888   for val in exts:
3889     new_id = lu.cfg.GenerateUniqueID()
3890     results.append("%s%s" % (new_id, val))
3891   return results
3892
3893
3894 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3895                          p_minor, s_minor):
3896   """Generate a drbd8 device complete with its children.
3897
3898   """
3899   port = lu.cfg.AllocatePort()
3900   vgname = lu.cfg.GetVGName()
3901   shared_secret = lu.cfg.GenerateDRBDSecret()
3902   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3903                           logical_id=(vgname, names[0]))
3904   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3905                           logical_id=(vgname, names[1]))
3906   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3907                           logical_id=(primary, secondary, port,
3908                                       p_minor, s_minor,
3909                                       shared_secret),
3910                           children=[dev_data, dev_meta],
3911                           iv_name=iv_name)
3912   return drbd_dev
3913
3914
3915 def _GenerateDiskTemplate(lu, template_name,
3916                           instance_name, primary_node,
3917                           secondary_nodes, disk_info,
3918                           file_storage_dir, file_driver,
3919                           base_index):
3920   """Generate the entire disk layout for a given template type.
3921
3922   """
3923   #TODO: compute space requirements
3924
3925   vgname = lu.cfg.GetVGName()
3926   disk_count = len(disk_info)
3927   disks = []
3928   if template_name == constants.DT_DISKLESS:
3929     pass
3930   elif template_name == constants.DT_PLAIN:
3931     if len(secondary_nodes) != 0:
3932       raise errors.ProgrammerError("Wrong template configuration")
3933
3934     names = _GenerateUniqueNames(lu, [".disk%d" % i
3935                                       for i in range(disk_count)])
3936     for idx, disk in enumerate(disk_info):
3937       disk_index = idx + base_index
3938       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3939                               logical_id=(vgname, names[idx]),
3940                               iv_name="disk/%d" % disk_index,
3941                               mode=disk["mode"])
3942       disks.append(disk_dev)
3943   elif template_name == constants.DT_DRBD8:
3944     if len(secondary_nodes) != 1:
3945       raise errors.ProgrammerError("Wrong template configuration")
3946     remote_node = secondary_nodes[0]
3947     minors = lu.cfg.AllocateDRBDMinor(
3948       [primary_node, remote_node] * len(disk_info), instance_name)
3949
3950     names = []
3951     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3952                                                for i in range(disk_count)]):
3953       names.append(lv_prefix + "_data")
3954       names.append(lv_prefix + "_meta")
3955     for idx, disk in enumerate(disk_info):
3956       disk_index = idx + base_index
3957       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3958                                       disk["size"], names[idx*2:idx*2+2],
3959                                       "disk/%d" % disk_index,
3960                                       minors[idx*2], minors[idx*2+1])
3961       disk_dev.mode = disk["mode"]
3962       disks.append(disk_dev)
3963   elif template_name == constants.DT_FILE:
3964     if len(secondary_nodes) != 0:
3965       raise errors.ProgrammerError("Wrong template configuration")
3966
3967     for idx, disk in enumerate(disk_info):
3968       disk_index = idx + base_index
3969       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3970                               iv_name="disk/%d" % disk_index,
3971                               logical_id=(file_driver,
3972                                           "%s/disk%d" % (file_storage_dir,
3973                                                          idx)),
3974                               mode=disk["mode"])
3975       disks.append(disk_dev)
3976   else:
3977     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3978   return disks
3979
3980
3981 def _GetInstanceInfoText(instance):
3982   """Compute that text that should be added to the disk's metadata.
3983
3984   """
3985   return "originstname+%s" % instance.name
3986
3987
3988 def _CreateDisks(lu, instance):
3989   """Create all disks for an instance.
3990
3991   This abstracts away some work from AddInstance.
3992
3993   @type lu: L{LogicalUnit}
3994   @param lu: the logical unit on whose behalf we execute
3995   @type instance: L{objects.Instance}
3996   @param instance: the instance whose disks we should create
3997   @rtype: boolean
3998   @return: the success of the creation
3999
4000   """
4001   info = _GetInstanceInfoText(instance)
4002   pnode = instance.primary_node
4003
4004   if instance.disk_template == constants.DT_FILE:
4005     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4006     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4007
4008     if result.failed or not result.data:
4009       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4010
4011     if not result.data[0]:
4012       raise errors.OpExecError("Failed to create directory '%s'" %
4013                                file_storage_dir)
4014
4015   # Note: this needs to be kept in sync with adding of disks in
4016   # LUSetInstanceParams
4017   for device in instance.disks:
4018     logging.info("Creating volume %s for instance %s",
4019                  device.iv_name, instance.name)
4020     #HARDCODE
4021     for node in instance.all_nodes:
4022       f_create = node == pnode
4023       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4024
4025
4026 def _RemoveDisks(lu, instance):
4027   """Remove all disks for an instance.
4028
4029   This abstracts away some work from `AddInstance()` and
4030   `RemoveInstance()`. Note that in case some of the devices couldn't
4031   be removed, the removal will continue with the other ones (compare
4032   with `_CreateDisks()`).
4033
4034   @type lu: L{LogicalUnit}
4035   @param lu: the logical unit on whose behalf we execute
4036   @type instance: L{objects.Instance}
4037   @param instance: the instance whose disks we should remove
4038   @rtype: boolean
4039   @return: the success of the removal
4040
4041   """
4042   logging.info("Removing block devices for instance %s", instance.name)
4043
4044   result = True
4045   for device in instance.disks:
4046     for node, disk in device.ComputeNodeTree(instance.primary_node):
4047       lu.cfg.SetDiskID(disk, node)
4048       result = lu.rpc.call_blockdev_remove(node, disk)
4049       if result.failed or not result.data:
4050         lu.proc.LogWarning("Could not remove block device %s on node %s,"
4051                            " continuing anyway", device.iv_name, node)
4052         result = False
4053
4054   if instance.disk_template == constants.DT_FILE:
4055     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4056     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4057                                                  file_storage_dir)
4058     if result.failed or not result.data:
4059       logging.error("Could not remove directory '%s'", file_storage_dir)
4060       result = False
4061
4062   return result
4063
4064
4065 def _ComputeDiskSize(disk_template, disks):
4066   """Compute disk size requirements in the volume group
4067
4068   """
4069   # Required free disk space as a function of disk and swap space
4070   req_size_dict = {
4071     constants.DT_DISKLESS: None,
4072     constants.DT_PLAIN: sum(d["size"] for d in disks),
4073     # 128 MB are added for drbd metadata for each disk
4074     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4075     constants.DT_FILE: None,
4076   }
4077
4078   if disk_template not in req_size_dict:
4079     raise errors.ProgrammerError("Disk template '%s' size requirement"
4080                                  " is unknown" %  disk_template)
4081
4082   return req_size_dict[disk_template]
4083
4084
4085 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4086   """Hypervisor parameter validation.
4087
4088   This function abstract the hypervisor parameter validation to be
4089   used in both instance create and instance modify.
4090
4091   @type lu: L{LogicalUnit}
4092   @param lu: the logical unit for which we check
4093   @type nodenames: list
4094   @param nodenames: the list of nodes on which we should check
4095   @type hvname: string
4096   @param hvname: the name of the hypervisor we should use
4097   @type hvparams: dict
4098   @param hvparams: the parameters which we need to check
4099   @raise errors.OpPrereqError: if the parameters are not valid
4100
4101   """
4102   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4103                                                   hvname,
4104                                                   hvparams)
4105   for node in nodenames:
4106     info = hvinfo[node]
4107     if info.offline:
4108       continue
4109     info.Raise()
4110     if not info.data or not isinstance(info.data, (tuple, list)):
4111       raise errors.OpPrereqError("Cannot get current information"
4112                                  " from node '%s' (%s)" % (node, info.data))
4113     if not info.data[0]:
4114       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4115                                  " %s" % info.data[1])
4116
4117
4118 class LUCreateInstance(LogicalUnit):
4119   """Create an instance.
4120
4121   """
4122   HPATH = "instance-add"
4123   HTYPE = constants.HTYPE_INSTANCE
4124   _OP_REQP = ["instance_name", "disks", "disk_template",
4125               "mode", "start",
4126               "wait_for_sync", "ip_check", "nics",
4127               "hvparams", "beparams"]
4128   REQ_BGL = False
4129
4130   def _ExpandNode(self, node):
4131     """Expands and checks one node name.
4132
4133     """
4134     node_full = self.cfg.ExpandNodeName(node)
4135     if node_full is None:
4136       raise errors.OpPrereqError("Unknown node %s" % node)
4137     return node_full
4138
4139   def ExpandNames(self):
4140     """ExpandNames for CreateInstance.
4141
4142     Figure out the right locks for instance creation.
4143
4144     """
4145     self.needed_locks = {}
4146
4147     # set optional parameters to none if they don't exist
4148     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4149       if not hasattr(self.op, attr):
4150         setattr(self.op, attr, None)
4151
4152     # cheap checks, mostly valid constants given
4153
4154     # verify creation mode
4155     if self.op.mode not in (constants.INSTANCE_CREATE,
4156                             constants.INSTANCE_IMPORT):
4157       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4158                                  self.op.mode)
4159
4160     # disk template and mirror node verification
4161     if self.op.disk_template not in constants.DISK_TEMPLATES:
4162       raise errors.OpPrereqError("Invalid disk template name")
4163
4164     if self.op.hypervisor is None:
4165       self.op.hypervisor = self.cfg.GetHypervisorType()
4166
4167     cluster = self.cfg.GetClusterInfo()
4168     enabled_hvs = cluster.enabled_hypervisors
4169     if self.op.hypervisor not in enabled_hvs:
4170       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4171                                  " cluster (%s)" % (self.op.hypervisor,
4172                                   ",".join(enabled_hvs)))
4173
4174     # check hypervisor parameter syntax (locally)
4175
4176     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4177                                   self.op.hvparams)
4178     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4179     hv_type.CheckParameterSyntax(filled_hvp)
4180
4181     # fill and remember the beparams dict
4182     utils.CheckBEParams(self.op.beparams)
4183     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4184                                     self.op.beparams)
4185
4186     #### instance parameters check
4187
4188     # instance name verification
4189     hostname1 = utils.HostInfo(self.op.instance_name)
4190     self.op.instance_name = instance_name = hostname1.name
4191
4192     # this is just a preventive check, but someone might still add this
4193     # instance in the meantime, and creation will fail at lock-add time
4194     if instance_name in self.cfg.GetInstanceList():
4195       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4196                                  instance_name)
4197
4198     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4199
4200     # NIC buildup
4201     self.nics = []
4202     for nic in self.op.nics:
4203       # ip validity checks
4204       ip = nic.get("ip", None)
4205       if ip is None or ip.lower() == "none":
4206         nic_ip = None
4207       elif ip.lower() == constants.VALUE_AUTO:
4208         nic_ip = hostname1.ip
4209       else:
4210         if not utils.IsValidIP(ip):
4211           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4212                                      " like a valid IP" % ip)
4213         nic_ip = ip
4214
4215       # MAC address verification
4216       mac = nic.get("mac", constants.VALUE_AUTO)
4217       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4218         if not utils.IsValidMac(mac.lower()):
4219           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4220                                      mac)
4221       # bridge verification
4222       bridge = nic.get("bridge", None)
4223       if bridge is None:
4224         bridge = self.cfg.GetDefBridge()
4225       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4226
4227     # disk checks/pre-build
4228     self.disks = []
4229     for disk in self.op.disks:
4230       mode = disk.get("mode", constants.DISK_RDWR)
4231       if mode not in constants.DISK_ACCESS_SET:
4232         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4233                                    mode)
4234       size = disk.get("size", None)
4235       if size is None:
4236         raise errors.OpPrereqError("Missing disk size")
4237       try:
4238         size = int(size)
4239       except ValueError:
4240         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4241       self.disks.append({"size": size, "mode": mode})
4242
4243     # used in CheckPrereq for ip ping check
4244     self.check_ip = hostname1.ip
4245
4246     # file storage checks
4247     if (self.op.file_driver and
4248         not self.op.file_driver in constants.FILE_DRIVER):
4249       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4250                                  self.op.file_driver)
4251
4252     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4253       raise errors.OpPrereqError("File storage directory path not absolute")
4254
4255     ### Node/iallocator related checks
4256     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4257       raise errors.OpPrereqError("One and only one of iallocator and primary"
4258                                  " node must be given")
4259
4260     if self.op.iallocator:
4261       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4262     else:
4263       self.op.pnode = self._ExpandNode(self.op.pnode)
4264       nodelist = [self.op.pnode]
4265       if self.op.snode is not None:
4266         self.op.snode = self._ExpandNode(self.op.snode)
4267         nodelist.append(self.op.snode)
4268       self.needed_locks[locking.LEVEL_NODE] = nodelist
4269
4270     # in case of import lock the source node too
4271     if self.op.mode == constants.INSTANCE_IMPORT:
4272       src_node = getattr(self.op, "src_node", None)
4273       src_path = getattr(self.op, "src_path", None)
4274
4275       if src_path is None:
4276         self.op.src_path = src_path = self.op.instance_name
4277
4278       if src_node is None:
4279         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4280         self.op.src_node = None
4281         if os.path.isabs(src_path):
4282           raise errors.OpPrereqError("Importing an instance from an absolute"
4283                                      " path requires a source node option.")
4284       else:
4285         self.op.src_node = src_node = self._ExpandNode(src_node)
4286         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4287           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4288         if not os.path.isabs(src_path):
4289           self.op.src_path = src_path = \
4290             os.path.join(constants.EXPORT_DIR, src_path)
4291
4292     else: # INSTANCE_CREATE
4293       if getattr(self.op, "os_type", None) is None:
4294         raise errors.OpPrereqError("No guest OS specified")
4295
4296   def _RunAllocator(self):
4297     """Run the allocator based on input opcode.
4298
4299     """
4300     nics = [n.ToDict() for n in self.nics]
4301     ial = IAllocator(self,
4302                      mode=constants.IALLOCATOR_MODE_ALLOC,
4303                      name=self.op.instance_name,
4304                      disk_template=self.op.disk_template,
4305                      tags=[],
4306                      os=self.op.os_type,
4307                      vcpus=self.be_full[constants.BE_VCPUS],
4308                      mem_size=self.be_full[constants.BE_MEMORY],
4309                      disks=self.disks,
4310                      nics=nics,
4311                      hypervisor=self.op.hypervisor,
4312                      )
4313
4314     ial.Run(self.op.iallocator)
4315
4316     if not ial.success:
4317       raise errors.OpPrereqError("Can't compute nodes using"
4318                                  " iallocator '%s': %s" % (self.op.iallocator,
4319                                                            ial.info))
4320     if len(ial.nodes) != ial.required_nodes:
4321       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4322                                  " of nodes (%s), required %s" %
4323                                  (self.op.iallocator, len(ial.nodes),
4324                                   ial.required_nodes))
4325     self.op.pnode = ial.nodes[0]
4326     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4327                  self.op.instance_name, self.op.iallocator,
4328                  ", ".join(ial.nodes))
4329     if ial.required_nodes == 2:
4330       self.op.snode = ial.nodes[1]
4331
4332   def BuildHooksEnv(self):
4333     """Build hooks env.
4334
4335     This runs on master, primary and secondary nodes of the instance.
4336
4337     """
4338     env = {
4339       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4340       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4341       "INSTANCE_ADD_MODE": self.op.mode,
4342       }
4343     if self.op.mode == constants.INSTANCE_IMPORT:
4344       env["INSTANCE_SRC_NODE"] = self.op.src_node
4345       env["INSTANCE_SRC_PATH"] = self.op.src_path
4346       env["INSTANCE_SRC_IMAGES"] = self.src_images
4347
4348     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4349       primary_node=self.op.pnode,
4350       secondary_nodes=self.secondaries,
4351       status=self.instance_status,
4352       os_type=self.op.os_type,
4353       memory=self.be_full[constants.BE_MEMORY],
4354       vcpus=self.be_full[constants.BE_VCPUS],
4355       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4356     ))
4357
4358     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4359           self.secondaries)
4360     return env, nl, nl
4361
4362
4363   def CheckPrereq(self):
4364     """Check prerequisites.
4365
4366     """
4367     if (not self.cfg.GetVGName() and
4368         self.op.disk_template not in constants.DTS_NOT_LVM):
4369       raise errors.OpPrereqError("Cluster does not support lvm-based"
4370                                  " instances")
4371
4372
4373     if self.op.mode == constants.INSTANCE_IMPORT:
4374       src_node = self.op.src_node
4375       src_path = self.op.src_path
4376
4377       if src_node is None:
4378         exp_list = self.rpc.call_export_list(
4379           self.acquired_locks[locking.LEVEL_NODE])
4380         found = False
4381         for node in exp_list:
4382           if not exp_list[node].failed and src_path in exp_list[node].data:
4383             found = True
4384             self.op.src_node = src_node = node
4385             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4386                                                        src_path)
4387             break
4388         if not found:
4389           raise errors.OpPrereqError("No export found for relative path %s" %
4390                                       src_path)
4391
4392       _CheckNodeOnline(self, src_node)
4393       result = self.rpc.call_export_info(src_node, src_path)
4394       result.Raise()
4395       if not result.data:
4396         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4397
4398       export_info = result.data
4399       if not export_info.has_section(constants.INISECT_EXP):
4400         raise errors.ProgrammerError("Corrupted export config")
4401
4402       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4403       if (int(ei_version) != constants.EXPORT_VERSION):
4404         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4405                                    (ei_version, constants.EXPORT_VERSION))
4406
4407       # Check that the new instance doesn't have less disks than the export
4408       instance_disks = len(self.disks)
4409       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4410       if instance_disks < export_disks:
4411         raise errors.OpPrereqError("Not enough disks to import."
4412                                    " (instance: %d, export: %d)" %
4413                                    (instance_disks, export_disks))
4414
4415       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4416       disk_images = []
4417       for idx in range(export_disks):
4418         option = 'disk%d_dump' % idx
4419         if export_info.has_option(constants.INISECT_INS, option):
4420           # FIXME: are the old os-es, disk sizes, etc. useful?
4421           export_name = export_info.get(constants.INISECT_INS, option)
4422           image = os.path.join(src_path, export_name)
4423           disk_images.append(image)
4424         else:
4425           disk_images.append(False)
4426
4427       self.src_images = disk_images
4428
4429       old_name = export_info.get(constants.INISECT_INS, 'name')
4430       # FIXME: int() here could throw a ValueError on broken exports
4431       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4432       if self.op.instance_name == old_name:
4433         for idx, nic in enumerate(self.nics):
4434           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4435             nic_mac_ini = 'nic%d_mac' % idx
4436             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4437
4438     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4439     if self.op.start and not self.op.ip_check:
4440       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4441                                  " adding an instance in start mode")
4442
4443     if self.op.ip_check:
4444       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4445         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4446                                    (self.check_ip, self.op.instance_name))
4447
4448     #### allocator run
4449
4450     if self.op.iallocator is not None:
4451       self._RunAllocator()
4452
4453     #### node related checks
4454
4455     # check primary node
4456     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4457     assert self.pnode is not None, \
4458       "Cannot retrieve locked node %s" % self.op.pnode
4459     if pnode.offline:
4460       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4461                                  pnode.name)
4462
4463     self.secondaries = []
4464
4465     # mirror node verification
4466     if self.op.disk_template in constants.DTS_NET_MIRROR:
4467       if self.op.snode is None:
4468         raise errors.OpPrereqError("The networked disk templates need"
4469                                    " a mirror node")
4470       if self.op.snode == pnode.name:
4471         raise errors.OpPrereqError("The secondary node cannot be"
4472                                    " the primary node.")
4473       self.secondaries.append(self.op.snode)
4474       _CheckNodeOnline(self, self.op.snode)
4475
4476     nodenames = [pnode.name] + self.secondaries
4477
4478     req_size = _ComputeDiskSize(self.op.disk_template,
4479                                 self.disks)
4480
4481     # Check lv size requirements
4482     if req_size is not None:
4483       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4484                                          self.op.hypervisor)
4485       for node in nodenames:
4486         info = nodeinfo[node]
4487         info.Raise()
4488         info = info.data
4489         if not info:
4490           raise errors.OpPrereqError("Cannot get current information"
4491                                      " from node '%s'" % node)
4492         vg_free = info.get('vg_free', None)
4493         if not isinstance(vg_free, int):
4494           raise errors.OpPrereqError("Can't compute free disk space on"
4495                                      " node %s" % node)
4496         if req_size > info['vg_free']:
4497           raise errors.OpPrereqError("Not enough disk space on target node %s."
4498                                      " %d MB available, %d MB required" %
4499                                      (node, info['vg_free'], req_size))
4500
4501     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4502
4503     # os verification
4504     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4505     result.Raise()
4506     if not isinstance(result.data, objects.OS):
4507       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4508                                  " primary node"  % self.op.os_type)
4509
4510     # bridge check on primary node
4511     bridges = [n.bridge for n in self.nics]
4512     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4513     result.Raise()
4514     if not result.data:
4515       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4516                                  " exist on destination node '%s'" %
4517                                  (",".join(bridges), pnode.name))
4518
4519     # memory check on primary node
4520     if self.op.start:
4521       _CheckNodeFreeMemory(self, self.pnode.name,
4522                            "creating instance %s" % self.op.instance_name,
4523                            self.be_full[constants.BE_MEMORY],
4524                            self.op.hypervisor)
4525
4526     self.instance_status = self.op.start
4527
4528   def Exec(self, feedback_fn):
4529     """Create and add the instance to the cluster.
4530
4531     """
4532     instance = self.op.instance_name
4533     pnode_name = self.pnode.name
4534
4535     for nic in self.nics:
4536       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4537         nic.mac = self.cfg.GenerateMAC()
4538
4539     ht_kind = self.op.hypervisor
4540     if ht_kind in constants.HTS_REQ_PORT:
4541       network_port = self.cfg.AllocatePort()
4542     else:
4543       network_port = None
4544
4545     ##if self.op.vnc_bind_address is None:
4546     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4547
4548     # this is needed because os.path.join does not accept None arguments
4549     if self.op.file_storage_dir is None:
4550       string_file_storage_dir = ""
4551     else:
4552       string_file_storage_dir = self.op.file_storage_dir
4553
4554     # build the full file storage dir path
4555     file_storage_dir = os.path.normpath(os.path.join(
4556                                         self.cfg.GetFileStorageDir(),
4557                                         string_file_storage_dir, instance))
4558
4559
4560     disks = _GenerateDiskTemplate(self,
4561                                   self.op.disk_template,
4562                                   instance, pnode_name,
4563                                   self.secondaries,
4564                                   self.disks,
4565                                   file_storage_dir,
4566                                   self.op.file_driver,
4567                                   0)
4568
4569     iobj = objects.Instance(name=instance, os=self.op.os_type,
4570                             primary_node=pnode_name,
4571                             nics=self.nics, disks=disks,
4572                             disk_template=self.op.disk_template,
4573                             admin_up=self.instance_status,
4574                             network_port=network_port,
4575                             beparams=self.op.beparams,
4576                             hvparams=self.op.hvparams,
4577                             hypervisor=self.op.hypervisor,
4578                             )
4579
4580     feedback_fn("* creating instance disks...")
4581     try:
4582       _CreateDisks(self, iobj)
4583     except errors.OpExecError:
4584       self.LogWarning("Device creation failed, reverting...")
4585       try:
4586         _RemoveDisks(self, iobj)
4587       finally:
4588         self.cfg.ReleaseDRBDMinors(instance)
4589         raise
4590
4591     feedback_fn("adding instance %s to cluster config" % instance)
4592
4593     self.cfg.AddInstance(iobj)
4594     # Declare that we don't want to remove the instance lock anymore, as we've
4595     # added the instance to the config
4596     del self.remove_locks[locking.LEVEL_INSTANCE]
4597     # Unlock all the nodes
4598     if self.op.mode == constants.INSTANCE_IMPORT:
4599       nodes_keep = [self.op.src_node]
4600       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4601                        if node != self.op.src_node]
4602       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4603       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4604     else:
4605       self.context.glm.release(locking.LEVEL_NODE)
4606       del self.acquired_locks[locking.LEVEL_NODE]
4607
4608     if self.op.wait_for_sync:
4609       disk_abort = not _WaitForSync(self, iobj)
4610     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4611       # make sure the disks are not degraded (still sync-ing is ok)
4612       time.sleep(15)
4613       feedback_fn("* checking mirrors status")
4614       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4615     else:
4616       disk_abort = False
4617
4618     if disk_abort:
4619       _RemoveDisks(self, iobj)
4620       self.cfg.RemoveInstance(iobj.name)
4621       # Make sure the instance lock gets removed
4622       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4623       raise errors.OpExecError("There are some degraded disks for"
4624                                " this instance")
4625
4626     feedback_fn("creating os for instance %s on node %s" %
4627                 (instance, pnode_name))
4628
4629     if iobj.disk_template != constants.DT_DISKLESS:
4630       if self.op.mode == constants.INSTANCE_CREATE:
4631         feedback_fn("* running the instance OS create scripts...")
4632         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4633         msg = result.RemoteFailMsg()
4634         if msg:
4635           raise errors.OpExecError("Could not add os for instance %s"
4636                                    " on node %s: %s" %
4637                                    (instance, pnode_name, msg))
4638
4639       elif self.op.mode == constants.INSTANCE_IMPORT:
4640         feedback_fn("* running the instance OS import scripts...")
4641         src_node = self.op.src_node
4642         src_images = self.src_images
4643         cluster_name = self.cfg.GetClusterName()
4644         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4645                                                          src_node, src_images,
4646                                                          cluster_name)
4647         import_result.Raise()
4648         for idx, result in enumerate(import_result.data):
4649           if not result:
4650             self.LogWarning("Could not import the image %s for instance"
4651                             " %s, disk %d, on node %s" %
4652                             (src_images[idx], instance, idx, pnode_name))
4653       else:
4654         # also checked in the prereq part
4655         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4656                                      % self.op.mode)
4657
4658     if self.op.start:
4659       logging.info("Starting instance %s on node %s", instance, pnode_name)
4660       feedback_fn("* starting instance...")
4661       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4662       msg = result.RemoteFailMsg()
4663       if msg:
4664         raise errors.OpExecError("Could not start instance: %s" % msg)
4665
4666
4667 class LUConnectConsole(NoHooksLU):
4668   """Connect to an instance's console.
4669
4670   This is somewhat special in that it returns the command line that
4671   you need to run on the master node in order to connect to the
4672   console.
4673
4674   """
4675   _OP_REQP = ["instance_name"]
4676   REQ_BGL = False
4677
4678   def ExpandNames(self):
4679     self._ExpandAndLockInstance()
4680
4681   def CheckPrereq(self):
4682     """Check prerequisites.
4683
4684     This checks that the instance is in the cluster.
4685
4686     """
4687     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4688     assert self.instance is not None, \
4689       "Cannot retrieve locked instance %s" % self.op.instance_name
4690     _CheckNodeOnline(self, self.instance.primary_node)
4691
4692   def Exec(self, feedback_fn):
4693     """Connect to the console of an instance
4694
4695     """
4696     instance = self.instance
4697     node = instance.primary_node
4698
4699     node_insts = self.rpc.call_instance_list([node],
4700                                              [instance.hypervisor])[node]
4701     node_insts.Raise()
4702
4703     if instance.name not in node_insts.data:
4704       raise errors.OpExecError("Instance %s is not running." % instance.name)
4705
4706     logging.debug("Connecting to console of %s on %s", instance.name, node)
4707
4708     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4709     cluster = self.cfg.GetClusterInfo()
4710     # beparams and hvparams are passed separately, to avoid editing the
4711     # instance and then saving the defaults in the instance itself.
4712     hvparams = cluster.FillHV(instance)
4713     beparams = cluster.FillBE(instance)
4714     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4715
4716     # build ssh cmdline
4717     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4718
4719
4720 class LUReplaceDisks(LogicalUnit):
4721   """Replace the disks of an instance.
4722
4723   """
4724   HPATH = "mirrors-replace"
4725   HTYPE = constants.HTYPE_INSTANCE
4726   _OP_REQP = ["instance_name", "mode", "disks"]
4727   REQ_BGL = False
4728
4729   def CheckArguments(self):
4730     if not hasattr(self.op, "remote_node"):
4731       self.op.remote_node = None
4732     if not hasattr(self.op, "iallocator"):
4733       self.op.iallocator = None
4734
4735     # check for valid parameter combination
4736     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4737     if self.op.mode == constants.REPLACE_DISK_CHG:
4738       if cnt == 2:
4739         raise errors.OpPrereqError("When changing the secondary either an"
4740                                    " iallocator script must be used or the"
4741                                    " new node given")
4742       elif cnt == 0:
4743         raise errors.OpPrereqError("Give either the iallocator or the new"
4744                                    " secondary, not both")
4745     else: # not replacing the secondary
4746       if cnt != 2:
4747         raise errors.OpPrereqError("The iallocator and new node options can"
4748                                    " be used only when changing the"
4749                                    " secondary node")
4750
4751   def ExpandNames(self):
4752     self._ExpandAndLockInstance()
4753
4754     if self.op.iallocator is not None:
4755       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4756     elif self.op.remote_node is not None:
4757       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4758       if remote_node is None:
4759         raise errors.OpPrereqError("Node '%s' not known" %
4760                                    self.op.remote_node)
4761       self.op.remote_node = remote_node
4762       # Warning: do not remove the locking of the new secondary here
4763       # unless DRBD8.AddChildren is changed to work in parallel;
4764       # currently it doesn't since parallel invocations of
4765       # FindUnusedMinor will conflict
4766       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4767       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4768     else:
4769       self.needed_locks[locking.LEVEL_NODE] = []
4770       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4771
4772   def DeclareLocks(self, level):
4773     # If we're not already locking all nodes in the set we have to declare the
4774     # instance's primary/secondary nodes.
4775     if (level == locking.LEVEL_NODE and
4776         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4777       self._LockInstancesNodes()
4778
4779   def _RunAllocator(self):
4780     """Compute a new secondary node using an IAllocator.
4781
4782     """
4783     ial = IAllocator(self,
4784                      mode=constants.IALLOCATOR_MODE_RELOC,
4785                      name=self.op.instance_name,
4786                      relocate_from=[self.sec_node])
4787
4788     ial.Run(self.op.iallocator)
4789
4790     if not ial.success:
4791       raise errors.OpPrereqError("Can't compute nodes using"
4792                                  " iallocator '%s': %s" % (self.op.iallocator,
4793                                                            ial.info))
4794     if len(ial.nodes) != ial.required_nodes:
4795       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4796                                  " of nodes (%s), required %s" %
4797                                  (len(ial.nodes), ial.required_nodes))
4798     self.op.remote_node = ial.nodes[0]
4799     self.LogInfo("Selected new secondary for the instance: %s",
4800                  self.op.remote_node)
4801
4802   def BuildHooksEnv(self):
4803     """Build hooks env.
4804
4805     This runs on the master, the primary and all the secondaries.
4806
4807     """
4808     env = {
4809       "MODE": self.op.mode,
4810       "NEW_SECONDARY": self.op.remote_node,
4811       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4812       }
4813     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4814     nl = [
4815       self.cfg.GetMasterNode(),
4816       self.instance.primary_node,
4817       ]
4818     if self.op.remote_node is not None:
4819       nl.append(self.op.remote_node)
4820     return env, nl, nl
4821
4822   def CheckPrereq(self):
4823     """Check prerequisites.
4824
4825     This checks that the instance is in the cluster.
4826
4827     """
4828     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4829     assert instance is not None, \
4830       "Cannot retrieve locked instance %s" % self.op.instance_name
4831     self.instance = instance
4832
4833     if instance.disk_template != constants.DT_DRBD8:
4834       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4835                                  " instances")
4836
4837     if len(instance.secondary_nodes) != 1:
4838       raise errors.OpPrereqError("The instance has a strange layout,"
4839                                  " expected one secondary but found %d" %
4840                                  len(instance.secondary_nodes))
4841
4842     self.sec_node = instance.secondary_nodes[0]
4843
4844     if self.op.iallocator is not None:
4845       self._RunAllocator()
4846
4847     remote_node = self.op.remote_node
4848     if remote_node is not None:
4849       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4850       assert self.remote_node_info is not None, \
4851         "Cannot retrieve locked node %s" % remote_node
4852     else:
4853       self.remote_node_info = None
4854     if remote_node == instance.primary_node:
4855       raise errors.OpPrereqError("The specified node is the primary node of"
4856                                  " the instance.")
4857     elif remote_node == self.sec_node:
4858       raise errors.OpPrereqError("The specified node is already the"
4859                                  " secondary node of the instance.")
4860
4861     if self.op.mode == constants.REPLACE_DISK_PRI:
4862       n1 = self.tgt_node = instance.primary_node
4863       n2 = self.oth_node = self.sec_node
4864     elif self.op.mode == constants.REPLACE_DISK_SEC:
4865       n1 = self.tgt_node = self.sec_node
4866       n2 = self.oth_node = instance.primary_node
4867     elif self.op.mode == constants.REPLACE_DISK_CHG:
4868       n1 = self.new_node = remote_node
4869       n2 = self.oth_node = instance.primary_node
4870       self.tgt_node = self.sec_node
4871     else:
4872       raise errors.ProgrammerError("Unhandled disk replace mode")
4873
4874     _CheckNodeOnline(self, n1)
4875     _CheckNodeOnline(self, n2)
4876
4877     if not self.op.disks:
4878       self.op.disks = range(len(instance.disks))
4879
4880     for disk_idx in self.op.disks:
4881       instance.FindDisk(disk_idx)
4882
4883   def _ExecD8DiskOnly(self, feedback_fn):
4884     """Replace a disk on the primary or secondary for dbrd8.
4885
4886     The algorithm for replace is quite complicated:
4887
4888       1. for each disk to be replaced:
4889
4890         1. create new LVs on the target node with unique names
4891         1. detach old LVs from the drbd device
4892         1. rename old LVs to name_replaced.<time_t>
4893         1. rename new LVs to old LVs
4894         1. attach the new LVs (with the old names now) to the drbd device
4895
4896       1. wait for sync across all devices
4897
4898       1. for each modified disk:
4899
4900         1. remove old LVs (which have the name name_replaces.<time_t>)
4901
4902     Failures are not very well handled.
4903
4904     """
4905     steps_total = 6
4906     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4907     instance = self.instance
4908     iv_names = {}
4909     vgname = self.cfg.GetVGName()
4910     # start of work
4911     cfg = self.cfg
4912     tgt_node = self.tgt_node
4913     oth_node = self.oth_node
4914
4915     # Step: check device activation
4916     self.proc.LogStep(1, steps_total, "check device existence")
4917     info("checking volume groups")
4918     my_vg = cfg.GetVGName()
4919     results = self.rpc.call_vg_list([oth_node, tgt_node])
4920     if not results:
4921       raise errors.OpExecError("Can't list volume groups on the nodes")
4922     for node in oth_node, tgt_node:
4923       res = results[node]
4924       if res.failed or not res.data or my_vg not in res.data:
4925         raise errors.OpExecError("Volume group '%s' not found on %s" %
4926                                  (my_vg, node))
4927     for idx, dev in enumerate(instance.disks):
4928       if idx not in self.op.disks:
4929         continue
4930       for node in tgt_node, oth_node:
4931         info("checking disk/%d on %s" % (idx, node))
4932         cfg.SetDiskID(dev, node)
4933         if not self.rpc.call_blockdev_find(node, dev):
4934           raise errors.OpExecError("Can't find disk/%d on node %s" %
4935                                    (idx, node))
4936
4937     # Step: check other node consistency
4938     self.proc.LogStep(2, steps_total, "check peer consistency")
4939     for idx, dev in enumerate(instance.disks):
4940       if idx not in self.op.disks:
4941         continue
4942       info("checking disk/%d consistency on %s" % (idx, oth_node))
4943       if not _CheckDiskConsistency(self, dev, oth_node,
4944                                    oth_node==instance.primary_node):
4945         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4946                                  " to replace disks on this node (%s)" %
4947                                  (oth_node, tgt_node))
4948
4949     # Step: create new storage
4950     self.proc.LogStep(3, steps_total, "allocate new storage")
4951     for idx, dev in enumerate(instance.disks):
4952       if idx not in self.op.disks:
4953         continue
4954       size = dev.size
4955       cfg.SetDiskID(dev, tgt_node)
4956       lv_names = [".disk%d_%s" % (idx, suf)
4957                   for suf in ["data", "meta"]]
4958       names = _GenerateUniqueNames(self, lv_names)
4959       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4960                              logical_id=(vgname, names[0]))
4961       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4962                              logical_id=(vgname, names[1]))
4963       new_lvs = [lv_data, lv_meta]
4964       old_lvs = dev.children
4965       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4966       info("creating new local storage on %s for %s" %
4967            (tgt_node, dev.iv_name))
4968       # we pass force_create=True to force the LVM creation
4969       for new_lv in new_lvs:
4970         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4971                         _GetInstanceInfoText(instance), False)
4972
4973     # Step: for each lv, detach+rename*2+attach
4974     self.proc.LogStep(4, steps_total, "change drbd configuration")
4975     for dev, old_lvs, new_lvs in iv_names.itervalues():
4976       info("detaching %s drbd from local storage" % dev.iv_name)
4977       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4978       result.Raise()
4979       if not result.data:
4980         raise errors.OpExecError("Can't detach drbd from local storage on node"
4981                                  " %s for device %s" % (tgt_node, dev.iv_name))
4982       #dev.children = []
4983       #cfg.Update(instance)
4984
4985       # ok, we created the new LVs, so now we know we have the needed
4986       # storage; as such, we proceed on the target node to rename
4987       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4988       # using the assumption that logical_id == physical_id (which in
4989       # turn is the unique_id on that node)
4990
4991       # FIXME(iustin): use a better name for the replaced LVs
4992       temp_suffix = int(time.time())
4993       ren_fn = lambda d, suff: (d.physical_id[0],
4994                                 d.physical_id[1] + "_replaced-%s" % suff)
4995       # build the rename list based on what LVs exist on the node
4996       rlist = []
4997       for to_ren in old_lvs:
4998         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4999         if not find_res.failed and find_res.data is not None: # device exists
5000           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5001
5002       info("renaming the old LVs on the target node")
5003       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5004       result.Raise()
5005       if not result.data:
5006         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5007       # now we rename the new LVs to the old LVs
5008       info("renaming the new LVs on the target node")
5009       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5010       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5011       result.Raise()
5012       if not result.data:
5013         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5014
5015       for old, new in zip(old_lvs, new_lvs):
5016         new.logical_id = old.logical_id
5017         cfg.SetDiskID(new, tgt_node)
5018
5019       for disk in old_lvs:
5020         disk.logical_id = ren_fn(disk, temp_suffix)
5021         cfg.SetDiskID(disk, tgt_node)
5022
5023       # now that the new lvs have the old name, we can add them to the device
5024       info("adding new mirror component on %s" % tgt_node)
5025       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5026       if result.failed or not result.data:
5027         for new_lv in new_lvs:
5028           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5029           if result.failed or not result.data:
5030             warning("Can't rollback device %s", hint="manually cleanup unused"
5031                     " logical volumes")
5032         raise errors.OpExecError("Can't add local storage to drbd")
5033
5034       dev.children = new_lvs
5035       cfg.Update(instance)
5036
5037     # Step: wait for sync
5038
5039     # this can fail as the old devices are degraded and _WaitForSync
5040     # does a combined result over all disks, so we don't check its
5041     # return value
5042     self.proc.LogStep(5, steps_total, "sync devices")
5043     _WaitForSync(self, instance, unlock=True)
5044
5045     # so check manually all the devices
5046     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5047       cfg.SetDiskID(dev, instance.primary_node)
5048       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5049       if result.failed or result.data[5]:
5050         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5051
5052     # Step: remove old storage
5053     self.proc.LogStep(6, steps_total, "removing old storage")
5054     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5055       info("remove logical volumes for %s" % name)
5056       for lv in old_lvs:
5057         cfg.SetDiskID(lv, tgt_node)
5058         result = self.rpc.call_blockdev_remove(tgt_node, lv)
5059         if result.failed or not result.data:
5060           warning("Can't remove old LV", hint="manually remove unused LVs")
5061           continue
5062
5063   def _ExecD8Secondary(self, feedback_fn):
5064     """Replace the secondary node for drbd8.
5065
5066     The algorithm for replace is quite complicated:
5067       - for all disks of the instance:
5068         - create new LVs on the new node with same names
5069         - shutdown the drbd device on the old secondary
5070         - disconnect the drbd network on the primary
5071         - create the drbd device on the new secondary
5072         - network attach the drbd on the primary, using an artifice:
5073           the drbd code for Attach() will connect to the network if it
5074           finds a device which is connected to the good local disks but
5075           not network enabled
5076       - wait for sync across all devices
5077       - remove all disks from the old secondary
5078
5079     Failures are not very well handled.
5080
5081     """
5082     steps_total = 6
5083     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5084     instance = self.instance
5085     iv_names = {}
5086     # start of work
5087     cfg = self.cfg
5088     old_node = self.tgt_node
5089     new_node = self.new_node
5090     pri_node = instance.primary_node
5091     nodes_ip = {
5092       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5093       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5094       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5095       }
5096
5097     # Step: check device activation
5098     self.proc.LogStep(1, steps_total, "check device existence")
5099     info("checking volume groups")
5100     my_vg = cfg.GetVGName()
5101     results = self.rpc.call_vg_list([pri_node, new_node])
5102     for node in pri_node, new_node:
5103       res = results[node]
5104       if res.failed or not res.data or my_vg not in res.data:
5105         raise errors.OpExecError("Volume group '%s' not found on %s" %
5106                                  (my_vg, node))
5107     for idx, dev in enumerate(instance.disks):
5108       if idx not in self.op.disks:
5109         continue
5110       info("checking disk/%d on %s" % (idx, pri_node))
5111       cfg.SetDiskID(dev, pri_node)
5112       result = self.rpc.call_blockdev_find(pri_node, dev)
5113       result.Raise()
5114       if not result.data:
5115         raise errors.OpExecError("Can't find disk/%d on node %s" %
5116                                  (idx, pri_node))
5117
5118     # Step: check other node consistency
5119     self.proc.LogStep(2, steps_total, "check peer consistency")
5120     for idx, dev in enumerate(instance.disks):
5121       if idx not in self.op.disks:
5122         continue
5123       info("checking disk/%d consistency on %s" % (idx, pri_node))
5124       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5125         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5126                                  " unsafe to replace the secondary" %
5127                                  pri_node)
5128
5129     # Step: create new storage
5130     self.proc.LogStep(3, steps_total, "allocate new storage")
5131     for idx, dev in enumerate(instance.disks):
5132       info("adding new local storage on %s for disk/%d" %
5133            (new_node, idx))
5134       # we pass force_create=True to force LVM creation
5135       for new_lv in dev.children:
5136         _CreateBlockDev(self, new_node, instance, new_lv, True,
5137                         _GetInstanceInfoText(instance), False)
5138
5139     # Step 4: dbrd minors and drbd setups changes
5140     # after this, we must manually remove the drbd minors on both the
5141     # error and the success paths
5142     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5143                                    instance.name)
5144     logging.debug("Allocated minors %s" % (minors,))
5145     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5146     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5147       size = dev.size
5148       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5149       # create new devices on new_node; note that we create two IDs:
5150       # one without port, so the drbd will be activated without
5151       # networking information on the new node at this stage, and one
5152       # with network, for the latter activation in step 4
5153       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5154       if pri_node == o_node1:
5155         p_minor = o_minor1
5156       else:
5157         p_minor = o_minor2
5158
5159       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5160       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5161
5162       iv_names[idx] = (dev, dev.children, new_net_id)
5163       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5164                     new_net_id)
5165       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5166                               logical_id=new_alone_id,
5167                               children=dev.children)
5168       try:
5169         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5170                               _GetInstanceInfoText(instance), False)
5171       except errors.BlockDeviceError:
5172         self.cfg.ReleaseDRBDMinors(instance.name)
5173         raise
5174
5175     for idx, dev in enumerate(instance.disks):
5176       # we have new devices, shutdown the drbd on the old secondary
5177       info("shutting down drbd for disk/%d on old node" % idx)
5178       cfg.SetDiskID(dev, old_node)
5179       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5180       if result.failed or not result.data:
5181         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5182                 hint="Please cleanup this device manually as soon as possible")
5183
5184     info("detaching primary drbds from the network (=> standalone)")
5185     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5186                                                instance.disks)[pri_node]
5187
5188     msg = result.RemoteFailMsg()
5189     if msg:
5190       # detaches didn't succeed (unlikely)
5191       self.cfg.ReleaseDRBDMinors(instance.name)
5192       raise errors.OpExecError("Can't detach the disks from the network on"
5193                                " old node: %s" % (msg,))
5194
5195     # if we managed to detach at least one, we update all the disks of
5196     # the instance to point to the new secondary
5197     info("updating instance configuration")
5198     for dev, _, new_logical_id in iv_names.itervalues():
5199       dev.logical_id = new_logical_id
5200       cfg.SetDiskID(dev, pri_node)
5201     cfg.Update(instance)
5202
5203     # and now perform the drbd attach
5204     info("attaching primary drbds to new secondary (standalone => connected)")
5205     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5206                                            instance.disks, instance.name,
5207                                            False)
5208     for to_node, to_result in result.items():
5209       msg = to_result.RemoteFailMsg()
5210       if msg:
5211         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5212                 hint="please do a gnt-instance info to see the"
5213                 " status of disks")
5214
5215     # this can fail as the old devices are degraded and _WaitForSync
5216     # does a combined result over all disks, so we don't check its
5217     # return value
5218     self.proc.LogStep(5, steps_total, "sync devices")
5219     _WaitForSync(self, instance, unlock=True)
5220
5221     # so check manually all the devices
5222     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5223       cfg.SetDiskID(dev, pri_node)
5224       result = self.rpc.call_blockdev_find(pri_node, dev)
5225       result.Raise()
5226       if result.data[5]:
5227         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5228
5229     self.proc.LogStep(6, steps_total, "removing old storage")
5230     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5231       info("remove logical volumes for disk/%d" % idx)
5232       for lv in old_lvs:
5233         cfg.SetDiskID(lv, old_node)
5234         result = self.rpc.call_blockdev_remove(old_node, lv)
5235         if result.failed or not result.data:
5236           warning("Can't remove LV on old secondary",
5237                   hint="Cleanup stale volumes by hand")
5238
5239   def Exec(self, feedback_fn):
5240     """Execute disk replacement.
5241
5242     This dispatches the disk replacement to the appropriate handler.
5243
5244     """
5245     instance = self.instance
5246
5247     # Activate the instance disks if we're replacing them on a down instance
5248     if not instance.admin_up:
5249       _StartInstanceDisks(self, instance, True)
5250
5251     if self.op.mode == constants.REPLACE_DISK_CHG:
5252       fn = self._ExecD8Secondary
5253     else:
5254       fn = self._ExecD8DiskOnly
5255
5256     ret = fn(feedback_fn)
5257
5258     # Deactivate the instance disks if we're replacing them on a down instance
5259     if not instance.admin_up:
5260       _SafeShutdownInstanceDisks(self, instance)
5261
5262     return ret
5263
5264
5265 class LUGrowDisk(LogicalUnit):
5266   """Grow a disk of an instance.
5267
5268   """
5269   HPATH = "disk-grow"
5270   HTYPE = constants.HTYPE_INSTANCE
5271   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5272   REQ_BGL = False
5273
5274   def ExpandNames(self):
5275     self._ExpandAndLockInstance()
5276     self.needed_locks[locking.LEVEL_NODE] = []
5277     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5278
5279   def DeclareLocks(self, level):
5280     if level == locking.LEVEL_NODE:
5281       self._LockInstancesNodes()
5282
5283   def BuildHooksEnv(self):
5284     """Build hooks env.
5285
5286     This runs on the master, the primary and all the secondaries.
5287
5288     """
5289     env = {
5290       "DISK": self.op.disk,
5291       "AMOUNT": self.op.amount,
5292       }
5293     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5294     nl = [
5295       self.cfg.GetMasterNode(),
5296       self.instance.primary_node,
5297       ]
5298     return env, nl, nl
5299
5300   def CheckPrereq(self):
5301     """Check prerequisites.
5302
5303     This checks that the instance is in the cluster.
5304
5305     """
5306     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5307     assert instance is not None, \
5308       "Cannot retrieve locked instance %s" % self.op.instance_name
5309     nodenames = list(instance.all_nodes)
5310     for node in nodenames:
5311       _CheckNodeOnline(self, node)
5312
5313
5314     self.instance = instance
5315
5316     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5317       raise errors.OpPrereqError("Instance's disk layout does not support"
5318                                  " growing.")
5319
5320     self.disk = instance.FindDisk(self.op.disk)
5321
5322     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5323                                        instance.hypervisor)
5324     for node in nodenames:
5325       info = nodeinfo[node]
5326       if info.failed or not info.data:
5327         raise errors.OpPrereqError("Cannot get current information"
5328                                    " from node '%s'" % node)
5329       vg_free = info.data.get('vg_free', None)
5330       if not isinstance(vg_free, int):
5331         raise errors.OpPrereqError("Can't compute free disk space on"
5332                                    " node %s" % node)
5333       if self.op.amount > vg_free:
5334         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5335                                    " %d MiB available, %d MiB required" %
5336                                    (node, vg_free, self.op.amount))
5337
5338   def Exec(self, feedback_fn):
5339     """Execute disk grow.
5340
5341     """
5342     instance = self.instance
5343     disk = self.disk
5344     for node in instance.all_nodes:
5345       self.cfg.SetDiskID(disk, node)
5346       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5347       result.Raise()
5348       if (not result.data or not isinstance(result.data, (list, tuple)) or
5349           len(result.data) != 2):
5350         raise errors.OpExecError("Grow request failed to node %s" % node)
5351       elif not result.data[0]:
5352         raise errors.OpExecError("Grow request failed to node %s: %s" %
5353                                  (node, result.data[1]))
5354     disk.RecordGrow(self.op.amount)
5355     self.cfg.Update(instance)
5356     if self.op.wait_for_sync:
5357       disk_abort = not _WaitForSync(self, instance)
5358       if disk_abort:
5359         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5360                              " status.\nPlease check the instance.")
5361
5362
5363 class LUQueryInstanceData(NoHooksLU):
5364   """Query runtime instance data.
5365
5366   """
5367   _OP_REQP = ["instances", "static"]
5368   REQ_BGL = False
5369
5370   def ExpandNames(self):
5371     self.needed_locks = {}
5372     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5373
5374     if not isinstance(self.op.instances, list):
5375       raise errors.OpPrereqError("Invalid argument type 'instances'")
5376
5377     if self.op.instances:
5378       self.wanted_names = []
5379       for name in self.op.instances:
5380         full_name = self.cfg.ExpandInstanceName(name)
5381         if full_name is None:
5382           raise errors.OpPrereqError("Instance '%s' not known" % name)
5383         self.wanted_names.append(full_name)
5384       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5385     else:
5386       self.wanted_names = None
5387       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5388
5389     self.needed_locks[locking.LEVEL_NODE] = []
5390     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5391
5392   def DeclareLocks(self, level):
5393     if level == locking.LEVEL_NODE:
5394       self._LockInstancesNodes()
5395
5396   def CheckPrereq(self):
5397     """Check prerequisites.
5398
5399     This only checks the optional instance list against the existing names.
5400
5401     """
5402     if self.wanted_names is None:
5403       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5404
5405     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5406                              in self.wanted_names]
5407     return
5408
5409   def _ComputeDiskStatus(self, instance, snode, dev):
5410     """Compute block device status.
5411
5412     """
5413     static = self.op.static
5414     if not static:
5415       self.cfg.SetDiskID(dev, instance.primary_node)
5416       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5417       dev_pstatus.Raise()
5418       dev_pstatus = dev_pstatus.data
5419     else:
5420       dev_pstatus = None
5421
5422     if dev.dev_type in constants.LDS_DRBD:
5423       # we change the snode then (otherwise we use the one passed in)
5424       if dev.logical_id[0] == instance.primary_node:
5425         snode = dev.logical_id[1]
5426       else:
5427         snode = dev.logical_id[0]
5428
5429     if snode and not static:
5430       self.cfg.SetDiskID(dev, snode)
5431       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5432       dev_sstatus.Raise()
5433       dev_sstatus = dev_sstatus.data
5434     else:
5435       dev_sstatus = None
5436
5437     if dev.children:
5438       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5439                       for child in dev.children]
5440     else:
5441       dev_children = []
5442
5443     data = {
5444       "iv_name": dev.iv_name,
5445       "dev_type": dev.dev_type,
5446       "logical_id": dev.logical_id,
5447       "physical_id": dev.physical_id,
5448       "pstatus": dev_pstatus,
5449       "sstatus": dev_sstatus,
5450       "children": dev_children,
5451       "mode": dev.mode,
5452       }
5453
5454     return data
5455
5456   def Exec(self, feedback_fn):
5457     """Gather and return data"""
5458     result = {}
5459
5460     cluster = self.cfg.GetClusterInfo()
5461
5462     for instance in self.wanted_instances:
5463       if not self.op.static:
5464         remote_info = self.rpc.call_instance_info(instance.primary_node,
5465                                                   instance.name,
5466                                                   instance.hypervisor)
5467         remote_info.Raise()
5468         remote_info = remote_info.data
5469         if remote_info and "state" in remote_info:
5470           remote_state = "up"
5471         else:
5472           remote_state = "down"
5473       else:
5474         remote_state = None
5475       if instance.admin_up:
5476         config_state = "up"
5477       else:
5478         config_state = "down"
5479
5480       disks = [self._ComputeDiskStatus(instance, None, device)
5481                for device in instance.disks]
5482
5483       idict = {
5484         "name": instance.name,
5485         "config_state": config_state,
5486         "run_state": remote_state,
5487         "pnode": instance.primary_node,
5488         "snodes": instance.secondary_nodes,
5489         "os": instance.os,
5490         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5491         "disks": disks,
5492         "hypervisor": instance.hypervisor,
5493         "network_port": instance.network_port,
5494         "hv_instance": instance.hvparams,
5495         "hv_actual": cluster.FillHV(instance),
5496         "be_instance": instance.beparams,
5497         "be_actual": cluster.FillBE(instance),
5498         }
5499
5500       result[instance.name] = idict
5501
5502     return result
5503
5504
5505 class LUSetInstanceParams(LogicalUnit):
5506   """Modifies an instances's parameters.
5507
5508   """
5509   HPATH = "instance-modify"
5510   HTYPE = constants.HTYPE_INSTANCE
5511   _OP_REQP = ["instance_name"]
5512   REQ_BGL = False
5513
5514   def CheckArguments(self):
5515     if not hasattr(self.op, 'nics'):
5516       self.op.nics = []
5517     if not hasattr(self.op, 'disks'):
5518       self.op.disks = []
5519     if not hasattr(self.op, 'beparams'):
5520       self.op.beparams = {}
5521     if not hasattr(self.op, 'hvparams'):
5522       self.op.hvparams = {}
5523     self.op.force = getattr(self.op, "force", False)
5524     if not (self.op.nics or self.op.disks or
5525             self.op.hvparams or self.op.beparams):
5526       raise errors.OpPrereqError("No changes submitted")
5527
5528     utils.CheckBEParams(self.op.beparams)
5529
5530     # Disk validation
5531     disk_addremove = 0
5532     for disk_op, disk_dict in self.op.disks:
5533       if disk_op == constants.DDM_REMOVE:
5534         disk_addremove += 1
5535         continue
5536       elif disk_op == constants.DDM_ADD:
5537         disk_addremove += 1
5538       else:
5539         if not isinstance(disk_op, int):
5540           raise errors.OpPrereqError("Invalid disk index")
5541       if disk_op == constants.DDM_ADD:
5542         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5543         if mode not in constants.DISK_ACCESS_SET:
5544           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5545         size = disk_dict.get('size', None)
5546         if size is None:
5547           raise errors.OpPrereqError("Required disk parameter size missing")
5548         try:
5549           size = int(size)
5550         except ValueError, err:
5551           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5552                                      str(err))
5553         disk_dict['size'] = size
5554       else:
5555         # modification of disk
5556         if 'size' in disk_dict:
5557           raise errors.OpPrereqError("Disk size change not possible, use"
5558                                      " grow-disk")
5559
5560     if disk_addremove > 1:
5561       raise errors.OpPrereqError("Only one disk add or remove operation"
5562                                  " supported at a time")
5563
5564     # NIC validation
5565     nic_addremove = 0
5566     for nic_op, nic_dict in self.op.nics:
5567       if nic_op == constants.DDM_REMOVE:
5568         nic_addremove += 1
5569         continue
5570       elif nic_op == constants.DDM_ADD:
5571         nic_addremove += 1
5572       else:
5573         if not isinstance(nic_op, int):
5574           raise errors.OpPrereqError("Invalid nic index")
5575
5576       # nic_dict should be a dict
5577       nic_ip = nic_dict.get('ip', None)
5578       if nic_ip is not None:
5579         if nic_ip.lower() == "none":
5580           nic_dict['ip'] = None
5581         else:
5582           if not utils.IsValidIP(nic_ip):
5583             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5584       # we can only check None bridges and assign the default one
5585       nic_bridge = nic_dict.get('bridge', None)
5586       if nic_bridge is None:
5587         nic_dict['bridge'] = self.cfg.GetDefBridge()
5588       # but we can validate MACs
5589       nic_mac = nic_dict.get('mac', None)
5590       if nic_mac is not None:
5591         if self.cfg.IsMacInUse(nic_mac):
5592           raise errors.OpPrereqError("MAC address %s already in use"
5593                                      " in cluster" % nic_mac)
5594         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5595           if not utils.IsValidMac(nic_mac):
5596             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5597     if nic_addremove > 1:
5598       raise errors.OpPrereqError("Only one NIC add or remove operation"
5599                                  " supported at a time")
5600
5601   def ExpandNames(self):
5602     self._ExpandAndLockInstance()
5603     self.needed_locks[locking.LEVEL_NODE] = []
5604     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5605
5606   def DeclareLocks(self, level):
5607     if level == locking.LEVEL_NODE:
5608       self._LockInstancesNodes()
5609
5610   def BuildHooksEnv(self):
5611     """Build hooks env.
5612
5613     This runs on the master, primary and secondaries.
5614
5615     """
5616     args = dict()
5617     if constants.BE_MEMORY in self.be_new:
5618       args['memory'] = self.be_new[constants.BE_MEMORY]
5619     if constants.BE_VCPUS in self.be_new:
5620       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5621     # FIXME: readd disk/nic changes
5622     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5623     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5624     return env, nl, nl
5625
5626   def CheckPrereq(self):
5627     """Check prerequisites.
5628
5629     This only checks the instance list against the existing names.
5630
5631     """
5632     force = self.force = self.op.force
5633
5634     # checking the new params on the primary/secondary nodes
5635
5636     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5637     assert self.instance is not None, \
5638       "Cannot retrieve locked instance %s" % self.op.instance_name
5639     pnode = instance.primary_node
5640     nodelist = list(instance.all_nodes)
5641
5642     # hvparams processing
5643     if self.op.hvparams:
5644       i_hvdict = copy.deepcopy(instance.hvparams)
5645       for key, val in self.op.hvparams.iteritems():
5646         if val == constants.VALUE_DEFAULT:
5647           try:
5648             del i_hvdict[key]
5649           except KeyError:
5650             pass
5651         elif val == constants.VALUE_NONE:
5652           i_hvdict[key] = None
5653         else:
5654           i_hvdict[key] = val
5655       cluster = self.cfg.GetClusterInfo()
5656       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5657                                 i_hvdict)
5658       # local check
5659       hypervisor.GetHypervisor(
5660         instance.hypervisor).CheckParameterSyntax(hv_new)
5661       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5662       self.hv_new = hv_new # the new actual values
5663       self.hv_inst = i_hvdict # the new dict (without defaults)
5664     else:
5665       self.hv_new = self.hv_inst = {}
5666
5667     # beparams processing
5668     if self.op.beparams:
5669       i_bedict = copy.deepcopy(instance.beparams)
5670       for key, val in self.op.beparams.iteritems():
5671         if val == constants.VALUE_DEFAULT:
5672           try:
5673             del i_bedict[key]
5674           except KeyError:
5675             pass
5676         else:
5677           i_bedict[key] = val
5678       cluster = self.cfg.GetClusterInfo()
5679       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5680                                 i_bedict)
5681       self.be_new = be_new # the new actual values
5682       self.be_inst = i_bedict # the new dict (without defaults)
5683     else:
5684       self.be_new = self.be_inst = {}
5685
5686     self.warn = []
5687
5688     if constants.BE_MEMORY in self.op.beparams and not self.force:
5689       mem_check_list = [pnode]
5690       if be_new[constants.BE_AUTO_BALANCE]:
5691         # either we changed auto_balance to yes or it was from before
5692         mem_check_list.extend(instance.secondary_nodes)
5693       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5694                                                   instance.hypervisor)
5695       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5696                                          instance.hypervisor)
5697       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5698         # Assume the primary node is unreachable and go ahead
5699         self.warn.append("Can't get info from primary node %s" % pnode)
5700       else:
5701         if not instance_info.failed and instance_info.data:
5702           current_mem = instance_info.data['memory']
5703         else:
5704           # Assume instance not running
5705           # (there is a slight race condition here, but it's not very probable,
5706           # and we have no other way to check)
5707           current_mem = 0
5708         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5709                     nodeinfo[pnode].data['memory_free'])
5710         if miss_mem > 0:
5711           raise errors.OpPrereqError("This change will prevent the instance"
5712                                      " from starting, due to %d MB of memory"
5713                                      " missing on its primary node" % miss_mem)
5714
5715       if be_new[constants.BE_AUTO_BALANCE]:
5716         for node, nres in nodeinfo.iteritems():
5717           if node not in instance.secondary_nodes:
5718             continue
5719           if nres.failed or not isinstance(nres.data, dict):
5720             self.warn.append("Can't get info from secondary node %s" % node)
5721           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5722             self.warn.append("Not enough memory to failover instance to"
5723                              " secondary node %s" % node)
5724
5725     # NIC processing
5726     for nic_op, nic_dict in self.op.nics:
5727       if nic_op == constants.DDM_REMOVE:
5728         if not instance.nics:
5729           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5730         continue
5731       if nic_op != constants.DDM_ADD:
5732         # an existing nic
5733         if nic_op < 0 or nic_op >= len(instance.nics):
5734           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5735                                      " are 0 to %d" %
5736                                      (nic_op, len(instance.nics)))
5737       nic_bridge = nic_dict.get('bridge', None)
5738       if nic_bridge is not None:
5739         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5740           msg = ("Bridge '%s' doesn't exist on one of"
5741                  " the instance nodes" % nic_bridge)
5742           if self.force:
5743             self.warn.append(msg)
5744           else:
5745             raise errors.OpPrereqError(msg)
5746
5747     # DISK processing
5748     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5749       raise errors.OpPrereqError("Disk operations not supported for"
5750                                  " diskless instances")
5751     for disk_op, disk_dict in self.op.disks:
5752       if disk_op == constants.DDM_REMOVE:
5753         if len(instance.disks) == 1:
5754           raise errors.OpPrereqError("Cannot remove the last disk of"
5755                                      " an instance")
5756         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5757         ins_l = ins_l[pnode]
5758         if ins_l.failed or not isinstance(ins_l.data, list):
5759           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5760         if instance.name in ins_l.data:
5761           raise errors.OpPrereqError("Instance is running, can't remove"
5762                                      " disks.")
5763
5764       if (disk_op == constants.DDM_ADD and
5765           len(instance.nics) >= constants.MAX_DISKS):
5766         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5767                                    " add more" % constants.MAX_DISKS)
5768       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5769         # an existing disk
5770         if disk_op < 0 or disk_op >= len(instance.disks):
5771           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5772                                      " are 0 to %d" %
5773                                      (disk_op, len(instance.disks)))
5774
5775     return
5776
5777   def Exec(self, feedback_fn):
5778     """Modifies an instance.
5779
5780     All parameters take effect only at the next restart of the instance.
5781
5782     """
5783     # Process here the warnings from CheckPrereq, as we don't have a
5784     # feedback_fn there.
5785     for warn in self.warn:
5786       feedback_fn("WARNING: %s" % warn)
5787
5788     result = []
5789     instance = self.instance
5790     # disk changes
5791     for disk_op, disk_dict in self.op.disks:
5792       if disk_op == constants.DDM_REMOVE:
5793         # remove the last disk
5794         device = instance.disks.pop()
5795         device_idx = len(instance.disks)
5796         for node, disk in device.ComputeNodeTree(instance.primary_node):
5797           self.cfg.SetDiskID(disk, node)
5798           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5799           if rpc_result.failed or not rpc_result.data:
5800             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5801                                  " continuing anyway", device_idx, node)
5802         result.append(("disk/%d" % device_idx, "remove"))
5803       elif disk_op == constants.DDM_ADD:
5804         # add a new disk
5805         if instance.disk_template == constants.DT_FILE:
5806           file_driver, file_path = instance.disks[0].logical_id
5807           file_path = os.path.dirname(file_path)
5808         else:
5809           file_driver = file_path = None
5810         disk_idx_base = len(instance.disks)
5811         new_disk = _GenerateDiskTemplate(self,
5812                                          instance.disk_template,
5813                                          instance.name, instance.primary_node,
5814                                          instance.secondary_nodes,
5815                                          [disk_dict],
5816                                          file_path,
5817                                          file_driver,
5818                                          disk_idx_base)[0]
5819         instance.disks.append(new_disk)
5820         info = _GetInstanceInfoText(instance)
5821
5822         logging.info("Creating volume %s for instance %s",
5823                      new_disk.iv_name, instance.name)
5824         # Note: this needs to be kept in sync with _CreateDisks
5825         #HARDCODE
5826         for node in instance.all_nodes:
5827           f_create = node == instance.primary_node
5828           try:
5829             _CreateBlockDev(self, node, instance, new_disk,
5830                             f_create, info, f_create)
5831           except errors.OpExecError, err:
5832             self.LogWarning("Failed to create volume %s (%s) on"
5833                             " node %s: %s",
5834                             new_disk.iv_name, new_disk, node, err)
5835         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5836                        (new_disk.size, new_disk.mode)))
5837       else:
5838         # change a given disk
5839         instance.disks[disk_op].mode = disk_dict['mode']
5840         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5841     # NIC changes
5842     for nic_op, nic_dict in self.op.nics:
5843       if nic_op == constants.DDM_REMOVE:
5844         # remove the last nic
5845         del instance.nics[-1]
5846         result.append(("nic.%d" % len(instance.nics), "remove"))
5847       elif nic_op == constants.DDM_ADD:
5848         # add a new nic
5849         if 'mac' not in nic_dict:
5850           mac = constants.VALUE_GENERATE
5851         else:
5852           mac = nic_dict['mac']
5853         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5854           mac = self.cfg.GenerateMAC()
5855         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5856                               bridge=nic_dict.get('bridge', None))
5857         instance.nics.append(new_nic)
5858         result.append(("nic.%d" % (len(instance.nics) - 1),
5859                        "add:mac=%s,ip=%s,bridge=%s" %
5860                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5861       else:
5862         # change a given nic
5863         for key in 'mac', 'ip', 'bridge':
5864           if key in nic_dict:
5865             setattr(instance.nics[nic_op], key, nic_dict[key])
5866             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5867
5868     # hvparams changes
5869     if self.op.hvparams:
5870       instance.hvparams = self.hv_new
5871       for key, val in self.op.hvparams.iteritems():
5872         result.append(("hv/%s" % key, val))
5873
5874     # beparams changes
5875     if self.op.beparams:
5876       instance.beparams = self.be_inst
5877       for key, val in self.op.beparams.iteritems():
5878         result.append(("be/%s" % key, val))
5879
5880     self.cfg.Update(instance)
5881
5882     return result
5883
5884
5885 class LUQueryExports(NoHooksLU):
5886   """Query the exports list
5887
5888   """
5889   _OP_REQP = ['nodes']
5890   REQ_BGL = False
5891
5892   def ExpandNames(self):
5893     self.needed_locks = {}
5894     self.share_locks[locking.LEVEL_NODE] = 1
5895     if not self.op.nodes:
5896       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5897     else:
5898       self.needed_locks[locking.LEVEL_NODE] = \
5899         _GetWantedNodes(self, self.op.nodes)
5900
5901   def CheckPrereq(self):
5902     """Check prerequisites.
5903
5904     """
5905     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5906
5907   def Exec(self, feedback_fn):
5908     """Compute the list of all the exported system images.
5909
5910     @rtype: dict
5911     @return: a dictionary with the structure node->(export-list)
5912         where export-list is a list of the instances exported on
5913         that node.
5914
5915     """
5916     rpcresult = self.rpc.call_export_list(self.nodes)
5917     result = {}
5918     for node in rpcresult:
5919       if rpcresult[node].failed:
5920         result[node] = False
5921       else:
5922         result[node] = rpcresult[node].data
5923
5924     return result
5925
5926
5927 class LUExportInstance(LogicalUnit):
5928   """Export an instance to an image in the cluster.
5929
5930   """
5931   HPATH = "instance-export"
5932   HTYPE = constants.HTYPE_INSTANCE
5933   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5934   REQ_BGL = False
5935
5936   def ExpandNames(self):
5937     self._ExpandAndLockInstance()
5938     # FIXME: lock only instance primary and destination node
5939     #
5940     # Sad but true, for now we have do lock all nodes, as we don't know where
5941     # the previous export might be, and and in this LU we search for it and
5942     # remove it from its current node. In the future we could fix this by:
5943     #  - making a tasklet to search (share-lock all), then create the new one,
5944     #    then one to remove, after
5945     #  - removing the removal operation altoghether
5946     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5947
5948   def DeclareLocks(self, level):
5949     """Last minute lock declaration."""
5950     # All nodes are locked anyway, so nothing to do here.
5951
5952   def BuildHooksEnv(self):
5953     """Build hooks env.
5954
5955     This will run on the master, primary node and target node.
5956
5957     """
5958     env = {
5959       "EXPORT_NODE": self.op.target_node,
5960       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5961       }
5962     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5963     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5964           self.op.target_node]
5965     return env, nl, nl
5966
5967   def CheckPrereq(self):
5968     """Check prerequisites.
5969
5970     This checks that the instance and node names are valid.
5971
5972     """
5973     instance_name = self.op.instance_name
5974     self.instance = self.cfg.GetInstanceInfo(instance_name)
5975     assert self.instance is not None, \
5976           "Cannot retrieve locked instance %s" % self.op.instance_name
5977     _CheckNodeOnline(self, self.instance.primary_node)
5978
5979     self.dst_node = self.cfg.GetNodeInfo(
5980       self.cfg.ExpandNodeName(self.op.target_node))
5981
5982     if self.dst_node is None:
5983       # This is wrong node name, not a non-locked node
5984       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5985     _CheckNodeOnline(self, self.dst_node.name)
5986
5987     # instance disk type verification
5988     for disk in self.instance.disks:
5989       if disk.dev_type == constants.LD_FILE:
5990         raise errors.OpPrereqError("Export not supported for instances with"
5991                                    " file-based disks")
5992
5993   def Exec(self, feedback_fn):
5994     """Export an instance to an image in the cluster.
5995
5996     """
5997     instance = self.instance
5998     dst_node = self.dst_node
5999     src_node = instance.primary_node
6000     if self.op.shutdown:
6001       # shutdown the instance, but not the disks
6002       result = self.rpc.call_instance_shutdown(src_node, instance)
6003       result.Raise()
6004       if not result.data:
6005         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
6006                                  (instance.name, src_node))
6007
6008     vgname = self.cfg.GetVGName()
6009
6010     snap_disks = []
6011
6012     # set the disks ID correctly since call_instance_start needs the
6013     # correct drbd minor to create the symlinks
6014     for disk in instance.disks:
6015       self.cfg.SetDiskID(disk, src_node)
6016
6017     try:
6018       for disk in instance.disks:
6019         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6020         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6021         if new_dev_name.failed or not new_dev_name.data:
6022           self.LogWarning("Could not snapshot block device %s on node %s",
6023                           disk.logical_id[1], src_node)
6024           snap_disks.append(False)
6025         else:
6026           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6027                                  logical_id=(vgname, new_dev_name.data),
6028                                  physical_id=(vgname, new_dev_name.data),
6029                                  iv_name=disk.iv_name)
6030           snap_disks.append(new_dev)
6031
6032     finally:
6033       if self.op.shutdown and instance.admin_up:
6034         result = self.rpc.call_instance_start(src_node, instance, None)
6035         msg = result.RemoteFailMsg()
6036         if msg:
6037           _ShutdownInstanceDisks(self, instance)
6038           raise errors.OpExecError("Could not start instance: %s" % msg)
6039
6040     # TODO: check for size
6041
6042     cluster_name = self.cfg.GetClusterName()
6043     for idx, dev in enumerate(snap_disks):
6044       if dev:
6045         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6046                                                instance, cluster_name, idx)
6047         if result.failed or not result.data:
6048           self.LogWarning("Could not export block device %s from node %s to"
6049                           " node %s", dev.logical_id[1], src_node,
6050                           dst_node.name)
6051         result = self.rpc.call_blockdev_remove(src_node, dev)
6052         if result.failed or not result.data:
6053           self.LogWarning("Could not remove snapshot block device %s from node"
6054                           " %s", dev.logical_id[1], src_node)
6055
6056     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6057     if result.failed or not result.data:
6058       self.LogWarning("Could not finalize export for instance %s on node %s",
6059                       instance.name, dst_node.name)
6060
6061     nodelist = self.cfg.GetNodeList()
6062     nodelist.remove(dst_node.name)
6063
6064     # on one-node clusters nodelist will be empty after the removal
6065     # if we proceed the backup would be removed because OpQueryExports
6066     # substitutes an empty list with the full cluster node list.
6067     if nodelist:
6068       exportlist = self.rpc.call_export_list(nodelist)
6069       for node in exportlist:
6070         if exportlist[node].failed:
6071           continue
6072         if instance.name in exportlist[node].data:
6073           if not self.rpc.call_export_remove(node, instance.name):
6074             self.LogWarning("Could not remove older export for instance %s"
6075                             " on node %s", instance.name, node)
6076
6077
6078 class LURemoveExport(NoHooksLU):
6079   """Remove exports related to the named instance.
6080
6081   """
6082   _OP_REQP = ["instance_name"]
6083   REQ_BGL = False
6084
6085   def ExpandNames(self):
6086     self.needed_locks = {}
6087     # We need all nodes to be locked in order for RemoveExport to work, but we
6088     # don't need to lock the instance itself, as nothing will happen to it (and
6089     # we can remove exports also for a removed instance)
6090     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6091
6092   def CheckPrereq(self):
6093     """Check prerequisites.
6094     """
6095     pass
6096
6097   def Exec(self, feedback_fn):
6098     """Remove any export.
6099
6100     """
6101     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6102     # If the instance was not found we'll try with the name that was passed in.
6103     # This will only work if it was an FQDN, though.
6104     fqdn_warn = False
6105     if not instance_name:
6106       fqdn_warn = True
6107       instance_name = self.op.instance_name
6108
6109     exportlist = self.rpc.call_export_list(self.acquired_locks[
6110       locking.LEVEL_NODE])
6111     found = False
6112     for node in exportlist:
6113       if exportlist[node].failed:
6114         self.LogWarning("Failed to query node %s, continuing" % node)
6115         continue
6116       if instance_name in exportlist[node].data:
6117         found = True
6118         result = self.rpc.call_export_remove(node, instance_name)
6119         if result.failed or not result.data:
6120           logging.error("Could not remove export for instance %s"
6121                         " on node %s", instance_name, node)
6122
6123     if fqdn_warn and not found:
6124       feedback_fn("Export not found. If trying to remove an export belonging"
6125                   " to a deleted instance please use its Fully Qualified"
6126                   " Domain Name.")
6127
6128
6129 class TagsLU(NoHooksLU):
6130   """Generic tags LU.
6131
6132   This is an abstract class which is the parent of all the other tags LUs.
6133
6134   """
6135
6136   def ExpandNames(self):
6137     self.needed_locks = {}
6138     if self.op.kind == constants.TAG_NODE:
6139       name = self.cfg.ExpandNodeName(self.op.name)
6140       if name is None:
6141         raise errors.OpPrereqError("Invalid node name (%s)" %
6142                                    (self.op.name,))
6143       self.op.name = name
6144       self.needed_locks[locking.LEVEL_NODE] = name
6145     elif self.op.kind == constants.TAG_INSTANCE:
6146       name = self.cfg.ExpandInstanceName(self.op.name)
6147       if name is None:
6148         raise errors.OpPrereqError("Invalid instance name (%s)" %
6149                                    (self.op.name,))
6150       self.op.name = name
6151       self.needed_locks[locking.LEVEL_INSTANCE] = name
6152
6153   def CheckPrereq(self):
6154     """Check prerequisites.
6155
6156     """
6157     if self.op.kind == constants.TAG_CLUSTER:
6158       self.target = self.cfg.GetClusterInfo()
6159     elif self.op.kind == constants.TAG_NODE:
6160       self.target = self.cfg.GetNodeInfo(self.op.name)
6161     elif self.op.kind == constants.TAG_INSTANCE:
6162       self.target = self.cfg.GetInstanceInfo(self.op.name)
6163     else:
6164       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6165                                  str(self.op.kind))
6166
6167
6168 class LUGetTags(TagsLU):
6169   """Returns the tags of a given object.
6170
6171   """
6172   _OP_REQP = ["kind", "name"]
6173   REQ_BGL = False
6174
6175   def Exec(self, feedback_fn):
6176     """Returns the tag list.
6177
6178     """
6179     return list(self.target.GetTags())
6180
6181
6182 class LUSearchTags(NoHooksLU):
6183   """Searches the tags for a given pattern.
6184
6185   """
6186   _OP_REQP = ["pattern"]
6187   REQ_BGL = False
6188
6189   def ExpandNames(self):
6190     self.needed_locks = {}
6191
6192   def CheckPrereq(self):
6193     """Check prerequisites.
6194
6195     This checks the pattern passed for validity by compiling it.
6196
6197     """
6198     try:
6199       self.re = re.compile(self.op.pattern)
6200     except re.error, err:
6201       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6202                                  (self.op.pattern, err))
6203
6204   def Exec(self, feedback_fn):
6205     """Returns the tag list.
6206
6207     """
6208     cfg = self.cfg
6209     tgts = [("/cluster", cfg.GetClusterInfo())]
6210     ilist = cfg.GetAllInstancesInfo().values()
6211     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6212     nlist = cfg.GetAllNodesInfo().values()
6213     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6214     results = []
6215     for path, target in tgts:
6216       for tag in target.GetTags():
6217         if self.re.search(tag):
6218           results.append((path, tag))
6219     return results
6220
6221
6222 class LUAddTags(TagsLU):
6223   """Sets a tag on a given object.
6224
6225   """
6226   _OP_REQP = ["kind", "name", "tags"]
6227   REQ_BGL = False
6228
6229   def CheckPrereq(self):
6230     """Check prerequisites.
6231
6232     This checks the type and length of the tag name and value.
6233
6234     """
6235     TagsLU.CheckPrereq(self)
6236     for tag in self.op.tags:
6237       objects.TaggableObject.ValidateTag(tag)
6238
6239   def Exec(self, feedback_fn):
6240     """Sets the tag.
6241
6242     """
6243     try:
6244       for tag in self.op.tags:
6245         self.target.AddTag(tag)
6246     except errors.TagError, err:
6247       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6248     try:
6249       self.cfg.Update(self.target)
6250     except errors.ConfigurationError:
6251       raise errors.OpRetryError("There has been a modification to the"
6252                                 " config file and the operation has been"
6253                                 " aborted. Please retry.")
6254
6255
6256 class LUDelTags(TagsLU):
6257   """Delete a list of tags from a given object.
6258
6259   """
6260   _OP_REQP = ["kind", "name", "tags"]
6261   REQ_BGL = False
6262
6263   def CheckPrereq(self):
6264     """Check prerequisites.
6265
6266     This checks that we have the given tag.
6267
6268     """
6269     TagsLU.CheckPrereq(self)
6270     for tag in self.op.tags:
6271       objects.TaggableObject.ValidateTag(tag)
6272     del_tags = frozenset(self.op.tags)
6273     cur_tags = self.target.GetTags()
6274     if not del_tags <= cur_tags:
6275       diff_tags = del_tags - cur_tags
6276       diff_names = ["'%s'" % tag for tag in diff_tags]
6277       diff_names.sort()
6278       raise errors.OpPrereqError("Tag(s) %s not found" %
6279                                  (",".join(diff_names)))
6280
6281   def Exec(self, feedback_fn):
6282     """Remove the tag from the object.
6283
6284     """
6285     for tag in self.op.tags:
6286       self.target.RemoveTag(tag)
6287     try:
6288       self.cfg.Update(self.target)
6289     except errors.ConfigurationError:
6290       raise errors.OpRetryError("There has been a modification to the"
6291                                 " config file and the operation has been"
6292                                 " aborted. Please retry.")
6293
6294
6295 class LUTestDelay(NoHooksLU):
6296   """Sleep for a specified amount of time.
6297
6298   This LU sleeps on the master and/or nodes for a specified amount of
6299   time.
6300
6301   """
6302   _OP_REQP = ["duration", "on_master", "on_nodes"]
6303   REQ_BGL = False
6304
6305   def ExpandNames(self):
6306     """Expand names and set required locks.
6307
6308     This expands the node list, if any.
6309
6310     """
6311     self.needed_locks = {}
6312     if self.op.on_nodes:
6313       # _GetWantedNodes can be used here, but is not always appropriate to use
6314       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6315       # more information.
6316       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6317       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6318
6319   def CheckPrereq(self):
6320     """Check prerequisites.
6321
6322     """
6323
6324   def Exec(self, feedback_fn):
6325     """Do the actual sleep.
6326
6327     """
6328     if self.op.on_master:
6329       if not utils.TestDelay(self.op.duration):
6330         raise errors.OpExecError("Error during master delay test")
6331     if self.op.on_nodes:
6332       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6333       if not result:
6334         raise errors.OpExecError("Complete failure from rpc call")
6335       for node, node_result in result.items():
6336         node_result.Raise()
6337         if not node_result.data:
6338           raise errors.OpExecError("Failure during rpc call to node %s,"
6339                                    " result: %s" % (node, node_result.data))
6340
6341
6342 class IAllocator(object):
6343   """IAllocator framework.
6344
6345   An IAllocator instance has three sets of attributes:
6346     - cfg that is needed to query the cluster
6347     - input data (all members of the _KEYS class attribute are required)
6348     - four buffer attributes (in|out_data|text), that represent the
6349       input (to the external script) in text and data structure format,
6350       and the output from it, again in two formats
6351     - the result variables from the script (success, info, nodes) for
6352       easy usage
6353
6354   """
6355   _ALLO_KEYS = [
6356     "mem_size", "disks", "disk_template",
6357     "os", "tags", "nics", "vcpus", "hypervisor",
6358     ]
6359   _RELO_KEYS = [
6360     "relocate_from",
6361     ]
6362
6363   def __init__(self, lu, mode, name, **kwargs):
6364     self.lu = lu
6365     # init buffer variables
6366     self.in_text = self.out_text = self.in_data = self.out_data = None
6367     # init all input fields so that pylint is happy
6368     self.mode = mode
6369     self.name = name
6370     self.mem_size = self.disks = self.disk_template = None
6371     self.os = self.tags = self.nics = self.vcpus = None
6372     self.hypervisor = None
6373     self.relocate_from = None
6374     # computed fields
6375     self.required_nodes = None
6376     # init result fields
6377     self.success = self.info = self.nodes = None
6378     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6379       keyset = self._ALLO_KEYS
6380     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6381       keyset = self._RELO_KEYS
6382     else:
6383       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6384                                    " IAllocator" % self.mode)
6385     for key in kwargs:
6386       if key not in keyset:
6387         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6388                                      " IAllocator" % key)
6389       setattr(self, key, kwargs[key])
6390     for key in keyset:
6391       if key not in kwargs:
6392         raise errors.ProgrammerError("Missing input parameter '%s' to"
6393                                      " IAllocator" % key)
6394     self._BuildInputData()
6395
6396   def _ComputeClusterData(self):
6397     """Compute the generic allocator input data.
6398
6399     This is the data that is independent of the actual operation.
6400
6401     """
6402     cfg = self.lu.cfg
6403     cluster_info = cfg.GetClusterInfo()
6404     # cluster data
6405     data = {
6406       "version": 1,
6407       "cluster_name": cfg.GetClusterName(),
6408       "cluster_tags": list(cluster_info.GetTags()),
6409       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6410       # we don't have job IDs
6411       }
6412     iinfo = cfg.GetAllInstancesInfo().values()
6413     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6414
6415     # node data
6416     node_results = {}
6417     node_list = cfg.GetNodeList()
6418
6419     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6420       hypervisor_name = self.hypervisor
6421     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6422       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6423
6424     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6425                                            hypervisor_name)
6426     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6427                        cluster_info.enabled_hypervisors)
6428     for nname, nresult in node_data.items():
6429       # first fill in static (config-based) values
6430       ninfo = cfg.GetNodeInfo(nname)
6431       pnr = {
6432         "tags": list(ninfo.GetTags()),
6433         "primary_ip": ninfo.primary_ip,
6434         "secondary_ip": ninfo.secondary_ip,
6435         "offline": ninfo.offline,
6436         "master_candidate": ninfo.master_candidate,
6437         }
6438
6439       if not ninfo.offline:
6440         nresult.Raise()
6441         if not isinstance(nresult.data, dict):
6442           raise errors.OpExecError("Can't get data for node %s" % nname)
6443         remote_info = nresult.data
6444         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6445                      'vg_size', 'vg_free', 'cpu_total']:
6446           if attr not in remote_info:
6447             raise errors.OpExecError("Node '%s' didn't return attribute"
6448                                      " '%s'" % (nname, attr))
6449           try:
6450             remote_info[attr] = int(remote_info[attr])
6451           except ValueError, err:
6452             raise errors.OpExecError("Node '%s' returned invalid value"
6453                                      " for '%s': %s" % (nname, attr, err))
6454         # compute memory used by primary instances
6455         i_p_mem = i_p_up_mem = 0
6456         for iinfo, beinfo in i_list:
6457           if iinfo.primary_node == nname:
6458             i_p_mem += beinfo[constants.BE_MEMORY]
6459             if iinfo.name not in node_iinfo[nname].data:
6460               i_used_mem = 0
6461             else:
6462               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6463             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6464             remote_info['memory_free'] -= max(0, i_mem_diff)
6465
6466             if iinfo.admin_up:
6467               i_p_up_mem += beinfo[constants.BE_MEMORY]
6468
6469         # compute memory used by instances
6470         pnr_dyn = {
6471           "total_memory": remote_info['memory_total'],
6472           "reserved_memory": remote_info['memory_dom0'],
6473           "free_memory": remote_info['memory_free'],
6474           "total_disk": remote_info['vg_size'],
6475           "free_disk": remote_info['vg_free'],
6476           "total_cpus": remote_info['cpu_total'],
6477           "i_pri_memory": i_p_mem,
6478           "i_pri_up_memory": i_p_up_mem,
6479           }
6480         pnr.update(pnr_dyn)
6481
6482       node_results[nname] = pnr
6483     data["nodes"] = node_results
6484
6485     # instance data
6486     instance_data = {}
6487     for iinfo, beinfo in i_list:
6488       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6489                   for n in iinfo.nics]
6490       pir = {
6491         "tags": list(iinfo.GetTags()),
6492         "admin_up": iinfo.admin_up,
6493         "vcpus": beinfo[constants.BE_VCPUS],
6494         "memory": beinfo[constants.BE_MEMORY],
6495         "os": iinfo.os,
6496         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6497         "nics": nic_data,
6498         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6499         "disk_template": iinfo.disk_template,
6500         "hypervisor": iinfo.hypervisor,
6501         }
6502       instance_data[iinfo.name] = pir
6503
6504     data["instances"] = instance_data
6505
6506     self.in_data = data
6507
6508   def _AddNewInstance(self):
6509     """Add new instance data to allocator structure.
6510
6511     This in combination with _AllocatorGetClusterData will create the
6512     correct structure needed as input for the allocator.
6513
6514     The checks for the completeness of the opcode must have already been
6515     done.
6516
6517     """
6518     data = self.in_data
6519
6520     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6521
6522     if self.disk_template in constants.DTS_NET_MIRROR:
6523       self.required_nodes = 2
6524     else:
6525       self.required_nodes = 1
6526     request = {
6527       "type": "allocate",
6528       "name": self.name,
6529       "disk_template": self.disk_template,
6530       "tags": self.tags,
6531       "os": self.os,
6532       "vcpus": self.vcpus,
6533       "memory": self.mem_size,
6534       "disks": self.disks,
6535       "disk_space_total": disk_space,
6536       "nics": self.nics,
6537       "required_nodes": self.required_nodes,
6538       }
6539     data["request"] = request
6540
6541   def _AddRelocateInstance(self):
6542     """Add relocate instance data to allocator structure.
6543
6544     This in combination with _IAllocatorGetClusterData will create the
6545     correct structure needed as input for the allocator.
6546
6547     The checks for the completeness of the opcode must have already been
6548     done.
6549
6550     """
6551     instance = self.lu.cfg.GetInstanceInfo(self.name)
6552     if instance is None:
6553       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6554                                    " IAllocator" % self.name)
6555
6556     if instance.disk_template not in constants.DTS_NET_MIRROR:
6557       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6558
6559     if len(instance.secondary_nodes) != 1:
6560       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6561
6562     self.required_nodes = 1
6563     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6564     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6565
6566     request = {
6567       "type": "relocate",
6568       "name": self.name,
6569       "disk_space_total": disk_space,
6570       "required_nodes": self.required_nodes,
6571       "relocate_from": self.relocate_from,
6572       }
6573     self.in_data["request"] = request
6574
6575   def _BuildInputData(self):
6576     """Build input data structures.
6577
6578     """
6579     self._ComputeClusterData()
6580
6581     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6582       self._AddNewInstance()
6583     else:
6584       self._AddRelocateInstance()
6585
6586     self.in_text = serializer.Dump(self.in_data)
6587
6588   def Run(self, name, validate=True, call_fn=None):
6589     """Run an instance allocator and return the results.
6590
6591     """
6592     if call_fn is None:
6593       call_fn = self.lu.rpc.call_iallocator_runner
6594     data = self.in_text
6595
6596     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6597     result.Raise()
6598
6599     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6600       raise errors.OpExecError("Invalid result from master iallocator runner")
6601
6602     rcode, stdout, stderr, fail = result.data
6603
6604     if rcode == constants.IARUN_NOTFOUND:
6605       raise errors.OpExecError("Can't find allocator '%s'" % name)
6606     elif rcode == constants.IARUN_FAILURE:
6607       raise errors.OpExecError("Instance allocator call failed: %s,"
6608                                " output: %s" % (fail, stdout+stderr))
6609     self.out_text = stdout
6610     if validate:
6611       self._ValidateResult()
6612
6613   def _ValidateResult(self):
6614     """Process the allocator results.
6615
6616     This will process and if successful save the result in
6617     self.out_data and the other parameters.
6618
6619     """
6620     try:
6621       rdict = serializer.Load(self.out_text)
6622     except Exception, err:
6623       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6624
6625     if not isinstance(rdict, dict):
6626       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6627
6628     for key in "success", "info", "nodes":
6629       if key not in rdict:
6630         raise errors.OpExecError("Can't parse iallocator results:"
6631                                  " missing key '%s'" % key)
6632       setattr(self, key, rdict[key])
6633
6634     if not isinstance(rdict["nodes"], list):
6635       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6636                                " is not a list")
6637     self.out_data = rdict
6638
6639
6640 class LUTestAllocator(NoHooksLU):
6641   """Run allocator tests.
6642
6643   This LU runs the allocator tests
6644
6645   """
6646   _OP_REQP = ["direction", "mode", "name"]
6647
6648   def CheckPrereq(self):
6649     """Check prerequisites.
6650
6651     This checks the opcode parameters depending on the director and mode test.
6652
6653     """
6654     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6655       for attr in ["name", "mem_size", "disks", "disk_template",
6656                    "os", "tags", "nics", "vcpus"]:
6657         if not hasattr(self.op, attr):
6658           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6659                                      attr)
6660       iname = self.cfg.ExpandInstanceName(self.op.name)
6661       if iname is not None:
6662         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6663                                    iname)
6664       if not isinstance(self.op.nics, list):
6665         raise errors.OpPrereqError("Invalid parameter 'nics'")
6666       for row in self.op.nics:
6667         if (not isinstance(row, dict) or
6668             "mac" not in row or
6669             "ip" not in row or
6670             "bridge" not in row):
6671           raise errors.OpPrereqError("Invalid contents of the"
6672                                      " 'nics' parameter")
6673       if not isinstance(self.op.disks, list):
6674         raise errors.OpPrereqError("Invalid parameter 'disks'")
6675       for row in self.op.disks:
6676         if (not isinstance(row, dict) or
6677             "size" not in row or
6678             not isinstance(row["size"], int) or
6679             "mode" not in row or
6680             row["mode"] not in ['r', 'w']):
6681           raise errors.OpPrereqError("Invalid contents of the"
6682                                      " 'disks' parameter")
6683       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6684         self.op.hypervisor = self.cfg.GetHypervisorType()
6685     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6686       if not hasattr(self.op, "name"):
6687         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6688       fname = self.cfg.ExpandInstanceName(self.op.name)
6689       if fname is None:
6690         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6691                                    self.op.name)
6692       self.op.name = fname
6693       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6694     else:
6695       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6696                                  self.op.mode)
6697
6698     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6699       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6700         raise errors.OpPrereqError("Missing allocator name")
6701     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6702       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6703                                  self.op.direction)
6704
6705   def Exec(self, feedback_fn):
6706     """Run the allocator test.
6707
6708     """
6709     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6710       ial = IAllocator(self,
6711                        mode=self.op.mode,
6712                        name=self.op.name,
6713                        mem_size=self.op.mem_size,
6714                        disks=self.op.disks,
6715                        disk_template=self.op.disk_template,
6716                        os=self.op.os,
6717                        tags=self.op.tags,
6718                        nics=self.op.nics,
6719                        vcpus=self.op.vcpus,
6720                        hypervisor=self.op.hypervisor,
6721                        )
6722     else:
6723       ial = IAllocator(self,
6724                        mode=self.op.mode,
6725                        name=self.op.name,
6726                        relocate_from=list(self.relocate_from),
6727                        )
6728
6729     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6730       result = ial.in_text
6731     else:
6732       ial.Run(self.op.allocator, validate=False)
6733       result = ial.out_text
6734     return result