9db397bb8170a904ede35d194f254058a79a374b
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396   return wanted
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: boolean
459   @param status: the should_run status of the instance
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   if status:
472     str_status = "up"
473   else:
474     str_status = "down"
475   env = {
476     "OP_TARGET": name,
477     "INSTANCE_NAME": name,
478     "INSTANCE_PRIMARY": primary_node,
479     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480     "INSTANCE_OS_TYPE": os_type,
481     "INSTANCE_STATUS": str_status,
482     "INSTANCE_MEMORY": memory,
483     "INSTANCE_VCPUS": vcpus,
484   }
485
486   if nics:
487     nic_count = len(nics)
488     for idx, (ip, bridge, mac) in enumerate(nics):
489       if ip is None:
490         ip = ""
491       env["INSTANCE_NIC%d_IP" % idx] = ip
492       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
494   else:
495     nic_count = 0
496
497   env["INSTANCE_NIC_COUNT"] = nic_count
498
499   return env
500
501
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503   """Builds instance related env variables for hooks from an object.
504
505   @type lu: L{LogicalUnit}
506   @param lu: the logical unit on whose behalf we execute
507   @type instance: L{objects.Instance}
508   @param instance: the instance for which we should build the
509       environment
510   @type override: dict
511   @param override: dictionary with key/values that will override
512       our values
513   @rtype: dict
514   @return: the hook environment dictionary
515
516   """
517   bep = lu.cfg.GetClusterInfo().FillBE(instance)
518   args = {
519     'name': instance.name,
520     'primary_node': instance.primary_node,
521     'secondary_nodes': instance.secondary_nodes,
522     'os_type': instance.os,
523     'status': instance.admin_up,
524     'memory': bep[constants.BE_MEMORY],
525     'vcpus': bep[constants.BE_VCPUS],
526     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
527   }
528   if override:
529     args.update(override)
530   return _BuildInstanceHookEnv(**args)
531
532
533 def _AdjustCandidatePool(lu):
534   """Adjust the candidate pool after node operations.
535
536   """
537   mod_list = lu.cfg.MaintainCandidatePool()
538   if mod_list:
539     lu.LogInfo("Promoted nodes to master candidate role: %s",
540                ", ".join(node.name for node in mod_list))
541     for name in mod_list:
542       lu.context.ReaddNode(name)
543   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
544   if mc_now > mc_max:
545     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
546                (mc_now, mc_max))
547
548
549 def _CheckInstanceBridgesExist(lu, instance):
550   """Check that the brigdes needed by an instance exist.
551
552   """
553   # check bridges existance
554   brlist = [nic.bridge for nic in instance.nics]
555   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
556   result.Raise()
557   if not result.data:
558     raise errors.OpPrereqError("One or more target bridges %s does not"
559                                " exist on destination node '%s'" %
560                                (brlist, instance.primary_node))
561
562
563 class LUDestroyCluster(NoHooksLU):
564   """Logical unit for destroying the cluster.
565
566   """
567   _OP_REQP = []
568
569   def CheckPrereq(self):
570     """Check prerequisites.
571
572     This checks whether the cluster is empty.
573
574     Any errors are signalled by raising errors.OpPrereqError.
575
576     """
577     master = self.cfg.GetMasterNode()
578
579     nodelist = self.cfg.GetNodeList()
580     if len(nodelist) != 1 or nodelist[0] != master:
581       raise errors.OpPrereqError("There are still %d node(s) in"
582                                  " this cluster." % (len(nodelist) - 1))
583     instancelist = self.cfg.GetInstanceList()
584     if instancelist:
585       raise errors.OpPrereqError("There are still %d instance(s) in"
586                                  " this cluster." % len(instancelist))
587
588   def Exec(self, feedback_fn):
589     """Destroys the cluster.
590
591     """
592     master = self.cfg.GetMasterNode()
593     result = self.rpc.call_node_stop_master(master, False)
594     result.Raise()
595     if not result.data:
596       raise errors.OpExecError("Could not disable the master role")
597     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598     utils.CreateBackup(priv_key)
599     utils.CreateBackup(pub_key)
600     return master
601
602
603 class LUVerifyCluster(LogicalUnit):
604   """Verifies the cluster status.
605
606   """
607   HPATH = "cluster-verify"
608   HTYPE = constants.HTYPE_CLUSTER
609   _OP_REQP = ["skip_checks"]
610   REQ_BGL = False
611
612   def ExpandNames(self):
613     self.needed_locks = {
614       locking.LEVEL_NODE: locking.ALL_SET,
615       locking.LEVEL_INSTANCE: locking.ALL_SET,
616     }
617     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
618
619   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620                   node_result, feedback_fn, master_files,
621                   drbd_map):
622     """Run multiple tests against a node.
623
624     Test list:
625
626       - compares ganeti version
627       - checks vg existance and size > 20G
628       - checks config file checksum
629       - checks ssh to other nodes
630
631     @type nodeinfo: L{objects.Node}
632     @param nodeinfo: the node to check
633     @param file_list: required list of files
634     @param local_cksum: dictionary of local files and their checksums
635     @param node_result: the results from the node
636     @param feedback_fn: function used to accumulate results
637     @param master_files: list of files that only masters should have
638     @param drbd_map: the useddrbd minors for this node, in
639         form of minor: (instance, must_exist) which correspond to instances
640         and their running status
641
642     """
643     node = nodeinfo.name
644
645     # main result, node_result should be a non-empty dict
646     if not node_result or not isinstance(node_result, dict):
647       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
648       return True
649
650     # compares ganeti version
651     local_version = constants.PROTOCOL_VERSION
652     remote_version = node_result.get('version', None)
653     if not (remote_version and isinstance(remote_version, (list, tuple)) and
654             len(remote_version) == 2):
655       feedback_fn("  - ERROR: connection to %s failed" % (node))
656       return True
657
658     if local_version != remote_version[0]:
659       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
660                   " node %s %s" % (local_version, node, remote_version[0]))
661       return True
662
663     # node seems compatible, we can actually try to look into its results
664
665     bad = False
666
667     # full package version
668     if constants.RELEASE_VERSION != remote_version[1]:
669       feedback_fn("  - WARNING: software version mismatch: master %s,"
670                   " node %s %s" %
671                   (constants.RELEASE_VERSION, node, remote_version[1]))
672
673     # checks vg existence and size > 20G
674
675     vglist = node_result.get(constants.NV_VGLIST, None)
676     if not vglist:
677       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678                       (node,))
679       bad = True
680     else:
681       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
682                                             constants.MIN_VG_SIZE)
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688
689     remote_cksum = node_result.get(constants.NV_FILELIST, None)
690     if not isinstance(remote_cksum, dict):
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       for file_name in file_list:
695         node_is_mc = nodeinfo.master_candidate
696         must_have_file = file_name not in master_files
697         if file_name not in remote_cksum:
698           if node_is_mc or must_have_file:
699             bad = True
700             feedback_fn("  - ERROR: file '%s' missing" % file_name)
701         elif remote_cksum[file_name] != local_cksum[file_name]:
702           if node_is_mc or must_have_file:
703             bad = True
704             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
705           else:
706             # not candidate and this is not a must-have file
707             bad = True
708             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
709                         " '%s'" % file_name)
710         else:
711           # all good, except non-master/non-must have combination
712           if not node_is_mc and not must_have_file:
713             feedback_fn("  - ERROR: file '%s' should not exist on non master"
714                         " candidates" % file_name)
715
716     # checks ssh to any
717
718     if constants.NV_NODELIST not in node_result:
719       bad = True
720       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
721     else:
722       if node_result[constants.NV_NODELIST]:
723         bad = True
724         for node in node_result[constants.NV_NODELIST]:
725           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
726                           (node, node_result[constants.NV_NODELIST][node]))
727
728     if constants.NV_NODENETTEST not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result[constants.NV_NODENETTEST]:
733         bad = True
734         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result[constants.NV_NODENETTEST][node]))
738
739     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
740     if isinstance(hyp_result, dict):
741       for hv_name, hv_result in hyp_result.iteritems():
742         if hv_result is not None:
743           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
744                       (hv_name, hv_result))
745
746     # check used drbd list
747     used_minors = node_result.get(constants.NV_DRBDLIST, [])
748     for minor, (iname, must_exist) in drbd_map.items():
749       if minor not in used_minors and must_exist:
750         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
751                     (minor, iname))
752         bad = True
753     for minor in used_minors:
754       if minor not in drbd_map:
755         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
756         bad = True
757
758     return bad
759
760   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
761                       node_instance, feedback_fn, n_offline):
762     """Verify an instance.
763
764     This function checks to see if the required block devices are
765     available on the instance's node.
766
767     """
768     bad = False
769
770     node_current = instanceconfig.primary_node
771
772     node_vol_should = {}
773     instanceconfig.MapLVsByNode(node_vol_should)
774
775     for node in node_vol_should:
776       if node in n_offline:
777         # ignore missing volumes on offline nodes
778         continue
779       for volume in node_vol_should[node]:
780         if node not in node_vol_is or volume not in node_vol_is[node]:
781           feedback_fn("  - ERROR: volume %s missing on node %s" %
782                           (volume, node))
783           bad = True
784
785     if instanceconfig.admin_up:
786       if ((node_current not in node_instance or
787           not instance in node_instance[node_current]) and
788           node_current not in n_offline):
789         feedback_fn("  - ERROR: instance %s not running on node %s" %
790                         (instance, node_current))
791         bad = True
792
793     for node in node_instance:
794       if (not node == node_current):
795         if instance in node_instance[node]:
796           feedback_fn("  - ERROR: instance %s should not run on node %s" %
797                           (instance, node))
798           bad = True
799
800     return bad
801
802   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
803     """Verify if there are any unknown volumes in the cluster.
804
805     The .os, .swap and backup volumes are ignored. All other volumes are
806     reported as unknown.
807
808     """
809     bad = False
810
811     for node in node_vol_is:
812       for volume in node_vol_is[node]:
813         if node not in node_vol_should or volume not in node_vol_should[node]:
814           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
815                       (volume, node))
816           bad = True
817     return bad
818
819   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
820     """Verify the list of running instances.
821
822     This checks what instances are running but unknown to the cluster.
823
824     """
825     bad = False
826     for node in node_instance:
827       for runninginstance in node_instance[node]:
828         if runninginstance not in instancelist:
829           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
830                           (runninginstance, node))
831           bad = True
832     return bad
833
834   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
835     """Verify N+1 Memory Resilience.
836
837     Check that if one single node dies we can still start all the instances it
838     was primary for.
839
840     """
841     bad = False
842
843     for node, nodeinfo in node_info.iteritems():
844       # This code checks that every node which is now listed as secondary has
845       # enough memory to host all instances it is supposed to should a single
846       # other node in the cluster fail.
847       # FIXME: not ready for failover to an arbitrary node
848       # FIXME: does not support file-backed instances
849       # WARNING: we currently take into account down instances as well as up
850       # ones, considering that even if they're down someone might want to start
851       # them even in the event of a node failure.
852       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
853         needed_mem = 0
854         for instance in instances:
855           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
856           if bep[constants.BE_AUTO_BALANCE]:
857             needed_mem += bep[constants.BE_MEMORY]
858         if nodeinfo['mfree'] < needed_mem:
859           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
860                       " failovers should node %s fail" % (node, prinode))
861           bad = True
862     return bad
863
864   def CheckPrereq(self):
865     """Check prerequisites.
866
867     Transform the list of checks we're going to skip into a set and check that
868     all its members are valid.
869
870     """
871     self.skip_set = frozenset(self.op.skip_checks)
872     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
873       raise errors.OpPrereqError("Invalid checks to be skipped specified")
874
875   def BuildHooksEnv(self):
876     """Build hooks env.
877
878     Cluster-Verify hooks just rone in the post phase and their failure makes
879     the output be logged in the verify output and the verification to fail.
880
881     """
882     all_nodes = self.cfg.GetNodeList()
883     # TODO: populate the environment with useful information for verify hooks
884     env = {}
885     return env, [], all_nodes
886
887   def Exec(self, feedback_fn):
888     """Verify integrity of cluster, performing various test on nodes.
889
890     """
891     bad = False
892     feedback_fn("* Verifying global settings")
893     for msg in self.cfg.VerifyConfig():
894       feedback_fn("  - ERROR: %s" % msg)
895
896     vg_name = self.cfg.GetVGName()
897     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
898     nodelist = utils.NiceSort(self.cfg.GetNodeList())
899     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
900     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
901     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
902                         for iname in instancelist)
903     i_non_redundant = [] # Non redundant instances
904     i_non_a_balanced = [] # Non auto-balanced instances
905     n_offline = [] # List of offline nodes
906     node_volume = {}
907     node_instance = {}
908     node_info = {}
909     instance_cfg = {}
910
911     # FIXME: verify OS list
912     # do local checksums
913     master_files = [constants.CLUSTER_CONF_FILE]
914
915     file_names = ssconf.SimpleStore().GetFileList()
916     file_names.append(constants.SSL_CERT_FILE)
917     file_names.append(constants.RAPI_CERT_FILE)
918     file_names.extend(master_files)
919
920     local_checksums = utils.FingerprintFiles(file_names)
921
922     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
923     node_verify_param = {
924       constants.NV_FILELIST: file_names,
925       constants.NV_NODELIST: [node.name for node in nodeinfo
926                               if not node.offline],
927       constants.NV_HYPERVISOR: hypervisors,
928       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
929                                   node.secondary_ip) for node in nodeinfo
930                                  if not node.offline],
931       constants.NV_LVLIST: vg_name,
932       constants.NV_INSTANCELIST: hypervisors,
933       constants.NV_VGLIST: None,
934       constants.NV_VERSION: None,
935       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
936       constants.NV_DRBDLIST: None,
937       }
938     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
939                                            self.cfg.GetClusterName())
940
941     cluster = self.cfg.GetClusterInfo()
942     master_node = self.cfg.GetMasterNode()
943     all_drbd_map = self.cfg.ComputeDRBDMap()
944
945     for node_i in nodeinfo:
946       node = node_i.name
947       nresult = all_nvinfo[node].data
948
949       if node_i.offline:
950         feedback_fn("* Skipping offline node %s" % (node,))
951         n_offline.append(node)
952         continue
953
954       if node == master_node:
955         ntype = "master"
956       elif node_i.master_candidate:
957         ntype = "master candidate"
958       else:
959         ntype = "regular"
960       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
961
962       if all_nvinfo[node].failed or not isinstance(nresult, dict):
963         feedback_fn("  - ERROR: connection to %s failed" % (node,))
964         bad = True
965         continue
966
967       node_drbd = {}
968       for minor, instance in all_drbd_map[node].items():
969         instance = instanceinfo[instance]
970         node_drbd[minor] = (instance.name, instance.admin_up)
971       result = self._VerifyNode(node_i, file_names, local_checksums,
972                                 nresult, feedback_fn, master_files,
973                                 node_drbd)
974       bad = bad or result
975
976       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
977       if isinstance(lvdata, basestring):
978         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
979                     (node, lvdata.encode('string_escape')))
980         bad = True
981         node_volume[node] = {}
982       elif not isinstance(lvdata, dict):
983         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
984         bad = True
985         continue
986       else:
987         node_volume[node] = lvdata
988
989       # node_instance
990       idata = nresult.get(constants.NV_INSTANCELIST, None)
991       if not isinstance(idata, list):
992         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
993                     (node,))
994         bad = True
995         continue
996
997       node_instance[node] = idata
998
999       # node_info
1000       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1001       if not isinstance(nodeinfo, dict):
1002         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1003         bad = True
1004         continue
1005
1006       try:
1007         node_info[node] = {
1008           "mfree": int(nodeinfo['memory_free']),
1009           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1010           "pinst": [],
1011           "sinst": [],
1012           # dictionary holding all instances this node is secondary for,
1013           # grouped by their primary node. Each key is a cluster node, and each
1014           # value is a list of instances which have the key as primary and the
1015           # current node as secondary.  this is handy to calculate N+1 memory
1016           # availability if you can only failover from a primary to its
1017           # secondary.
1018           "sinst-by-pnode": {},
1019         }
1020       except ValueError:
1021         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1022         bad = True
1023         continue
1024
1025     node_vol_should = {}
1026
1027     for instance in instancelist:
1028       feedback_fn("* Verifying instance %s" % instance)
1029       inst_config = instanceinfo[instance]
1030       result =  self._VerifyInstance(instance, inst_config, node_volume,
1031                                      node_instance, feedback_fn, n_offline)
1032       bad = bad or result
1033       inst_nodes_offline = []
1034
1035       inst_config.MapLVsByNode(node_vol_should)
1036
1037       instance_cfg[instance] = inst_config
1038
1039       pnode = inst_config.primary_node
1040       if pnode in node_info:
1041         node_info[pnode]['pinst'].append(instance)
1042       elif pnode not in n_offline:
1043         feedback_fn("  - ERROR: instance %s, connection to primary node"
1044                     " %s failed" % (instance, pnode))
1045         bad = True
1046
1047       if pnode in n_offline:
1048         inst_nodes_offline.append(pnode)
1049
1050       # If the instance is non-redundant we cannot survive losing its primary
1051       # node, so we are not N+1 compliant. On the other hand we have no disk
1052       # templates with more than one secondary so that situation is not well
1053       # supported either.
1054       # FIXME: does not support file-backed instances
1055       if len(inst_config.secondary_nodes) == 0:
1056         i_non_redundant.append(instance)
1057       elif len(inst_config.secondary_nodes) > 1:
1058         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1059                     % instance)
1060
1061       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1062         i_non_a_balanced.append(instance)
1063
1064       for snode in inst_config.secondary_nodes:
1065         if snode in node_info:
1066           node_info[snode]['sinst'].append(instance)
1067           if pnode not in node_info[snode]['sinst-by-pnode']:
1068             node_info[snode]['sinst-by-pnode'][pnode] = []
1069           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1070         elif snode not in n_offline:
1071           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1072                       " %s failed" % (instance, snode))
1073           bad = True
1074         if snode in n_offline:
1075           inst_nodes_offline.append(snode)
1076
1077       if inst_nodes_offline:
1078         # warn that the instance lives on offline nodes, and set bad=True
1079         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1080                     ", ".join(inst_nodes_offline))
1081         bad = True
1082
1083     feedback_fn("* Verifying orphan volumes")
1084     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1085                                        feedback_fn)
1086     bad = bad or result
1087
1088     feedback_fn("* Verifying remaining instances")
1089     result = self._VerifyOrphanInstances(instancelist, node_instance,
1090                                          feedback_fn)
1091     bad = bad or result
1092
1093     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1094       feedback_fn("* Verifying N+1 Memory redundancy")
1095       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1096       bad = bad or result
1097
1098     feedback_fn("* Other Notes")
1099     if i_non_redundant:
1100       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1101                   % len(i_non_redundant))
1102
1103     if i_non_a_balanced:
1104       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1105                   % len(i_non_a_balanced))
1106
1107     if n_offline:
1108       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1109
1110     return not bad
1111
1112   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1113     """Analize the post-hooks' result
1114
1115     This method analyses the hook result, handles it, and sends some
1116     nicely-formatted feedback back to the user.
1117
1118     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1119         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1120     @param hooks_results: the results of the multi-node hooks rpc call
1121     @param feedback_fn: function used send feedback back to the caller
1122     @param lu_result: previous Exec result
1123     @return: the new Exec result, based on the previous result
1124         and hook results
1125
1126     """
1127     # We only really run POST phase hooks, and are only interested in
1128     # their results
1129     if phase == constants.HOOKS_PHASE_POST:
1130       # Used to change hooks' output to proper indentation
1131       indent_re = re.compile('^', re.M)
1132       feedback_fn("* Hooks Results")
1133       if not hooks_results:
1134         feedback_fn("  - ERROR: general communication failure")
1135         lu_result = 1
1136       else:
1137         for node_name in hooks_results:
1138           show_node_header = True
1139           res = hooks_results[node_name]
1140           if res.failed or res.data is False or not isinstance(res.data, list):
1141             if res.offline:
1142               # no need to warn or set fail return value
1143               continue
1144             feedback_fn("    Communication failure in hooks execution")
1145             lu_result = 1
1146             continue
1147           for script, hkr, output in res.data:
1148             if hkr == constants.HKR_FAIL:
1149               # The node header is only shown once, if there are
1150               # failing hooks on that node
1151               if show_node_header:
1152                 feedback_fn("  Node %s:" % node_name)
1153                 show_node_header = False
1154               feedback_fn("    ERROR: Script %s failed, output:" % script)
1155               output = indent_re.sub('      ', output)
1156               feedback_fn("%s" % output)
1157               lu_result = 1
1158
1159       return lu_result
1160
1161
1162 class LUVerifyDisks(NoHooksLU):
1163   """Verifies the cluster disks status.
1164
1165   """
1166   _OP_REQP = []
1167   REQ_BGL = False
1168
1169   def ExpandNames(self):
1170     self.needed_locks = {
1171       locking.LEVEL_NODE: locking.ALL_SET,
1172       locking.LEVEL_INSTANCE: locking.ALL_SET,
1173     }
1174     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1175
1176   def CheckPrereq(self):
1177     """Check prerequisites.
1178
1179     This has no prerequisites.
1180
1181     """
1182     pass
1183
1184   def Exec(self, feedback_fn):
1185     """Verify integrity of cluster disks.
1186
1187     """
1188     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1189
1190     vg_name = self.cfg.GetVGName()
1191     nodes = utils.NiceSort(self.cfg.GetNodeList())
1192     instances = [self.cfg.GetInstanceInfo(name)
1193                  for name in self.cfg.GetInstanceList()]
1194
1195     nv_dict = {}
1196     for inst in instances:
1197       inst_lvs = {}
1198       if (not inst.admin_up or
1199           inst.disk_template not in constants.DTS_NET_MIRROR):
1200         continue
1201       inst.MapLVsByNode(inst_lvs)
1202       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1203       for node, vol_list in inst_lvs.iteritems():
1204         for vol in vol_list:
1205           nv_dict[(node, vol)] = inst
1206
1207     if not nv_dict:
1208       return result
1209
1210     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1211
1212     to_act = set()
1213     for node in nodes:
1214       # node_volume
1215       lvs = node_lvs[node]
1216       if lvs.failed:
1217         if not lvs.offline:
1218           self.LogWarning("Connection to node %s failed: %s" %
1219                           (node, lvs.data))
1220         continue
1221       lvs = lvs.data
1222       if isinstance(lvs, basestring):
1223         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1224         res_nlvm[node] = lvs
1225       elif not isinstance(lvs, dict):
1226         logging.warning("Connection to node %s failed or invalid data"
1227                         " returned", node)
1228         res_nodes.append(node)
1229         continue
1230
1231       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1232         inst = nv_dict.pop((node, lv_name), None)
1233         if (not lv_online and inst is not None
1234             and inst.name not in res_instances):
1235           res_instances.append(inst.name)
1236
1237     # any leftover items in nv_dict are missing LVs, let's arrange the
1238     # data better
1239     for key, inst in nv_dict.iteritems():
1240       if inst.name not in res_missing:
1241         res_missing[inst.name] = []
1242       res_missing[inst.name].append(key)
1243
1244     return result
1245
1246
1247 class LURenameCluster(LogicalUnit):
1248   """Rename the cluster.
1249
1250   """
1251   HPATH = "cluster-rename"
1252   HTYPE = constants.HTYPE_CLUSTER
1253   _OP_REQP = ["name"]
1254
1255   def BuildHooksEnv(self):
1256     """Build hooks env.
1257
1258     """
1259     env = {
1260       "OP_TARGET": self.cfg.GetClusterName(),
1261       "NEW_NAME": self.op.name,
1262       }
1263     mn = self.cfg.GetMasterNode()
1264     return env, [mn], [mn]
1265
1266   def CheckPrereq(self):
1267     """Verify that the passed name is a valid one.
1268
1269     """
1270     hostname = utils.HostInfo(self.op.name)
1271
1272     new_name = hostname.name
1273     self.ip = new_ip = hostname.ip
1274     old_name = self.cfg.GetClusterName()
1275     old_ip = self.cfg.GetMasterIP()
1276     if new_name == old_name and new_ip == old_ip:
1277       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1278                                  " cluster has changed")
1279     if new_ip != old_ip:
1280       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1281         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1282                                    " reachable on the network. Aborting." %
1283                                    new_ip)
1284
1285     self.op.name = new_name
1286
1287   def Exec(self, feedback_fn):
1288     """Rename the cluster.
1289
1290     """
1291     clustername = self.op.name
1292     ip = self.ip
1293
1294     # shutdown the master IP
1295     master = self.cfg.GetMasterNode()
1296     result = self.rpc.call_node_stop_master(master, False)
1297     if result.failed or not result.data:
1298       raise errors.OpExecError("Could not disable the master role")
1299
1300     try:
1301       cluster = self.cfg.GetClusterInfo()
1302       cluster.cluster_name = clustername
1303       cluster.master_ip = ip
1304       self.cfg.Update(cluster)
1305
1306       # update the known hosts file
1307       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1308       node_list = self.cfg.GetNodeList()
1309       try:
1310         node_list.remove(master)
1311       except ValueError:
1312         pass
1313       result = self.rpc.call_upload_file(node_list,
1314                                          constants.SSH_KNOWN_HOSTS_FILE)
1315       for to_node, to_result in result.iteritems():
1316         if to_result.failed or not to_result.data:
1317           logging.error("Copy of file %s to node %s failed",
1318                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1319
1320     finally:
1321       result = self.rpc.call_node_start_master(master, False)
1322       if result.failed or not result.data:
1323         self.LogWarning("Could not re-enable the master role on"
1324                         " the master, please restart manually.")
1325
1326
1327 def _RecursiveCheckIfLVMBased(disk):
1328   """Check if the given disk or its children are lvm-based.
1329
1330   @type disk: L{objects.Disk}
1331   @param disk: the disk to check
1332   @rtype: booleean
1333   @return: boolean indicating whether a LD_LV dev_type was found or not
1334
1335   """
1336   if disk.children:
1337     for chdisk in disk.children:
1338       if _RecursiveCheckIfLVMBased(chdisk):
1339         return True
1340   return disk.dev_type == constants.LD_LV
1341
1342
1343 class LUSetClusterParams(LogicalUnit):
1344   """Change the parameters of the cluster.
1345
1346   """
1347   HPATH = "cluster-modify"
1348   HTYPE = constants.HTYPE_CLUSTER
1349   _OP_REQP = []
1350   REQ_BGL = False
1351
1352   def CheckParameters(self):
1353     """Check parameters
1354
1355     """
1356     if not hasattr(self.op, "candidate_pool_size"):
1357       self.op.candidate_pool_size = None
1358     if self.op.candidate_pool_size is not None:
1359       try:
1360         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1361       except ValueError, err:
1362         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1363                                    str(err))
1364       if self.op.candidate_pool_size < 1:
1365         raise errors.OpPrereqError("At least one master candidate needed")
1366
1367   def ExpandNames(self):
1368     # FIXME: in the future maybe other cluster params won't require checking on
1369     # all nodes to be modified.
1370     self.needed_locks = {
1371       locking.LEVEL_NODE: locking.ALL_SET,
1372     }
1373     self.share_locks[locking.LEVEL_NODE] = 1
1374
1375   def BuildHooksEnv(self):
1376     """Build hooks env.
1377
1378     """
1379     env = {
1380       "OP_TARGET": self.cfg.GetClusterName(),
1381       "NEW_VG_NAME": self.op.vg_name,
1382       }
1383     mn = self.cfg.GetMasterNode()
1384     return env, [mn], [mn]
1385
1386   def CheckPrereq(self):
1387     """Check prerequisites.
1388
1389     This checks whether the given params don't conflict and
1390     if the given volume group is valid.
1391
1392     """
1393     # FIXME: This only works because there is only one parameter that can be
1394     # changed or removed.
1395     if self.op.vg_name is not None and not self.op.vg_name:
1396       instances = self.cfg.GetAllInstancesInfo().values()
1397       for inst in instances:
1398         for disk in inst.disks:
1399           if _RecursiveCheckIfLVMBased(disk):
1400             raise errors.OpPrereqError("Cannot disable lvm storage while"
1401                                        " lvm-based instances exist")
1402
1403     node_list = self.acquired_locks[locking.LEVEL_NODE]
1404
1405     # if vg_name not None, checks given volume group on all nodes
1406     if self.op.vg_name:
1407       vglist = self.rpc.call_vg_list(node_list)
1408       for node in node_list:
1409         if vglist[node].failed:
1410           # ignoring down node
1411           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1412           continue
1413         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1414                                               self.op.vg_name,
1415                                               constants.MIN_VG_SIZE)
1416         if vgstatus:
1417           raise errors.OpPrereqError("Error on node '%s': %s" %
1418                                      (node, vgstatus))
1419
1420     self.cluster = cluster = self.cfg.GetClusterInfo()
1421     # validate beparams changes
1422     if self.op.beparams:
1423       utils.CheckBEParams(self.op.beparams)
1424       self.new_beparams = cluster.FillDict(
1425         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1426
1427     # hypervisor list/parameters
1428     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1429     if self.op.hvparams:
1430       if not isinstance(self.op.hvparams, dict):
1431         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1432       for hv_name, hv_dict in self.op.hvparams.items():
1433         if hv_name not in self.new_hvparams:
1434           self.new_hvparams[hv_name] = hv_dict
1435         else:
1436           self.new_hvparams[hv_name].update(hv_dict)
1437
1438     if self.op.enabled_hypervisors is not None:
1439       self.hv_list = self.op.enabled_hypervisors
1440     else:
1441       self.hv_list = cluster.enabled_hypervisors
1442
1443     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1444       # either the enabled list has changed, or the parameters have, validate
1445       for hv_name, hv_params in self.new_hvparams.items():
1446         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1447             (self.op.enabled_hypervisors and
1448              hv_name in self.op.enabled_hypervisors)):
1449           # either this is a new hypervisor, or its parameters have changed
1450           hv_class = hypervisor.GetHypervisor(hv_name)
1451           hv_class.CheckParameterSyntax(hv_params)
1452           _CheckHVParams(self, node_list, hv_name, hv_params)
1453
1454   def Exec(self, feedback_fn):
1455     """Change the parameters of the cluster.
1456
1457     """
1458     if self.op.vg_name is not None:
1459       if self.op.vg_name != self.cfg.GetVGName():
1460         self.cfg.SetVGName(self.op.vg_name)
1461       else:
1462         feedback_fn("Cluster LVM configuration already in desired"
1463                     " state, not changing")
1464     if self.op.hvparams:
1465       self.cluster.hvparams = self.new_hvparams
1466     if self.op.enabled_hypervisors is not None:
1467       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1468     if self.op.beparams:
1469       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1470     if self.op.candidate_pool_size is not None:
1471       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1472
1473     self.cfg.Update(self.cluster)
1474
1475     # we want to update nodes after the cluster so that if any errors
1476     # happen, we have recorded and saved the cluster info
1477     if self.op.candidate_pool_size is not None:
1478       _AdjustCandidatePool(self)
1479
1480
1481 class LURedistributeConfig(NoHooksLU):
1482   """Force the redistribution of cluster configuration.
1483
1484   This is a very simple LU.
1485
1486   """
1487   _OP_REQP = []
1488   REQ_BGL = False
1489
1490   def ExpandNames(self):
1491     self.needed_locks = {
1492       locking.LEVEL_NODE: locking.ALL_SET,
1493     }
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     """
1500
1501   def Exec(self, feedback_fn):
1502     """Redistribute the configuration.
1503
1504     """
1505     self.cfg.Update(self.cfg.GetClusterInfo())
1506
1507
1508 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1509   """Sleep and poll for an instance's disk to sync.
1510
1511   """
1512   if not instance.disks:
1513     return True
1514
1515   if not oneshot:
1516     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1517
1518   node = instance.primary_node
1519
1520   for dev in instance.disks:
1521     lu.cfg.SetDiskID(dev, node)
1522
1523   retries = 0
1524   while True:
1525     max_time = 0
1526     done = True
1527     cumul_degraded = False
1528     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1529     if rstats.failed or not rstats.data:
1530       lu.LogWarning("Can't get any data from node %s", node)
1531       retries += 1
1532       if retries >= 10:
1533         raise errors.RemoteError("Can't contact node %s for mirror data,"
1534                                  " aborting." % node)
1535       time.sleep(6)
1536       continue
1537     rstats = rstats.data
1538     retries = 0
1539     for i, mstat in enumerate(rstats):
1540       if mstat is None:
1541         lu.LogWarning("Can't compute data for node %s/%s",
1542                            node, instance.disks[i].iv_name)
1543         continue
1544       # we ignore the ldisk parameter
1545       perc_done, est_time, is_degraded, _ = mstat
1546       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1547       if perc_done is not None:
1548         done = False
1549         if est_time is not None:
1550           rem_time = "%d estimated seconds remaining" % est_time
1551           max_time = est_time
1552         else:
1553           rem_time = "no time estimate"
1554         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1555                         (instance.disks[i].iv_name, perc_done, rem_time))
1556     if done or oneshot:
1557       break
1558
1559     time.sleep(min(60, max_time))
1560
1561   if done:
1562     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1563   return not cumul_degraded
1564
1565
1566 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1567   """Check that mirrors are not degraded.
1568
1569   The ldisk parameter, if True, will change the test from the
1570   is_degraded attribute (which represents overall non-ok status for
1571   the device(s)) to the ldisk (representing the local storage status).
1572
1573   """
1574   lu.cfg.SetDiskID(dev, node)
1575   if ldisk:
1576     idx = 6
1577   else:
1578     idx = 5
1579
1580   result = True
1581   if on_primary or dev.AssembleOnSecondary():
1582     rstats = lu.rpc.call_blockdev_find(node, dev)
1583     if rstats.failed or not rstats.data:
1584       logging.warning("Node %s: disk degraded, not found or node down", node)
1585       result = False
1586     else:
1587       result = result and (not rstats.data[idx])
1588   if dev.children:
1589     for child in dev.children:
1590       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1591
1592   return result
1593
1594
1595 class LUDiagnoseOS(NoHooksLU):
1596   """Logical unit for OS diagnose/query.
1597
1598   """
1599   _OP_REQP = ["output_fields", "names"]
1600   REQ_BGL = False
1601   _FIELDS_STATIC = utils.FieldSet()
1602   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1603
1604   def ExpandNames(self):
1605     if self.op.names:
1606       raise errors.OpPrereqError("Selective OS query not supported")
1607
1608     _CheckOutputFields(static=self._FIELDS_STATIC,
1609                        dynamic=self._FIELDS_DYNAMIC,
1610                        selected=self.op.output_fields)
1611
1612     # Lock all nodes, in shared mode
1613     self.needed_locks = {}
1614     self.share_locks[locking.LEVEL_NODE] = 1
1615     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1616
1617   def CheckPrereq(self):
1618     """Check prerequisites.
1619
1620     """
1621
1622   @staticmethod
1623   def _DiagnoseByOS(node_list, rlist):
1624     """Remaps a per-node return list into an a per-os per-node dictionary
1625
1626     @param node_list: a list with the names of all nodes
1627     @param rlist: a map with node names as keys and OS objects as values
1628
1629     @rtype: dict
1630     @returns: a dictionary with osnames as keys and as value another map, with
1631         nodes as keys and list of OS objects as values, eg::
1632
1633           {"debian-etch": {"node1": [<object>,...],
1634                            "node2": [<object>,]}
1635           }
1636
1637     """
1638     all_os = {}
1639     for node_name, nr in rlist.iteritems():
1640       if nr.failed or not nr.data:
1641         continue
1642       for os_obj in nr.data:
1643         if os_obj.name not in all_os:
1644           # build a list of nodes for this os containing empty lists
1645           # for each node in node_list
1646           all_os[os_obj.name] = {}
1647           for nname in node_list:
1648             all_os[os_obj.name][nname] = []
1649         all_os[os_obj.name][node_name].append(os_obj)
1650     return all_os
1651
1652   def Exec(self, feedback_fn):
1653     """Compute the list of OSes.
1654
1655     """
1656     node_list = self.acquired_locks[locking.LEVEL_NODE]
1657     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1658                    if node in node_list]
1659     node_data = self.rpc.call_os_diagnose(valid_nodes)
1660     if node_data == False:
1661       raise errors.OpExecError("Can't gather the list of OSes")
1662     pol = self._DiagnoseByOS(valid_nodes, node_data)
1663     output = []
1664     for os_name, os_data in pol.iteritems():
1665       row = []
1666       for field in self.op.output_fields:
1667         if field == "name":
1668           val = os_name
1669         elif field == "valid":
1670           val = utils.all([osl and osl[0] for osl in os_data.values()])
1671         elif field == "node_status":
1672           val = {}
1673           for node_name, nos_list in os_data.iteritems():
1674             val[node_name] = [(v.status, v.path) for v in nos_list]
1675         else:
1676           raise errors.ParameterError(field)
1677         row.append(val)
1678       output.append(row)
1679
1680     return output
1681
1682
1683 class LURemoveNode(LogicalUnit):
1684   """Logical unit for removing a node.
1685
1686   """
1687   HPATH = "node-remove"
1688   HTYPE = constants.HTYPE_NODE
1689   _OP_REQP = ["node_name"]
1690
1691   def BuildHooksEnv(self):
1692     """Build hooks env.
1693
1694     This doesn't run on the target node in the pre phase as a failed
1695     node would then be impossible to remove.
1696
1697     """
1698     env = {
1699       "OP_TARGET": self.op.node_name,
1700       "NODE_NAME": self.op.node_name,
1701       }
1702     all_nodes = self.cfg.GetNodeList()
1703     all_nodes.remove(self.op.node_name)
1704     return env, all_nodes, all_nodes
1705
1706   def CheckPrereq(self):
1707     """Check prerequisites.
1708
1709     This checks:
1710      - the node exists in the configuration
1711      - it does not have primary or secondary instances
1712      - it's not the master
1713
1714     Any errors are signalled by raising errors.OpPrereqError.
1715
1716     """
1717     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1718     if node is None:
1719       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1720
1721     instance_list = self.cfg.GetInstanceList()
1722
1723     masternode = self.cfg.GetMasterNode()
1724     if node.name == masternode:
1725       raise errors.OpPrereqError("Node is the master node,"
1726                                  " you need to failover first.")
1727
1728     for instance_name in instance_list:
1729       instance = self.cfg.GetInstanceInfo(instance_name)
1730       if node.name in instance.all_nodes:
1731         raise errors.OpPrereqError("Instance %s is still running on the node,"
1732                                    " please remove first." % instance_name)
1733     self.op.node_name = node.name
1734     self.node = node
1735
1736   def Exec(self, feedback_fn):
1737     """Removes the node from the cluster.
1738
1739     """
1740     node = self.node
1741     logging.info("Stopping the node daemon and removing configs from node %s",
1742                  node.name)
1743
1744     self.context.RemoveNode(node.name)
1745
1746     self.rpc.call_node_leave_cluster(node.name)
1747
1748     # Promote nodes to master candidate as needed
1749     _AdjustCandidatePool(self)
1750
1751
1752 class LUQueryNodes(NoHooksLU):
1753   """Logical unit for querying nodes.
1754
1755   """
1756   _OP_REQP = ["output_fields", "names"]
1757   REQ_BGL = False
1758   _FIELDS_DYNAMIC = utils.FieldSet(
1759     "dtotal", "dfree",
1760     "mtotal", "mnode", "mfree",
1761     "bootid",
1762     "ctotal",
1763     )
1764
1765   _FIELDS_STATIC = utils.FieldSet(
1766     "name", "pinst_cnt", "sinst_cnt",
1767     "pinst_list", "sinst_list",
1768     "pip", "sip", "tags",
1769     "serial_no",
1770     "master_candidate",
1771     "master",
1772     "offline",
1773     )
1774
1775   def ExpandNames(self):
1776     _CheckOutputFields(static=self._FIELDS_STATIC,
1777                        dynamic=self._FIELDS_DYNAMIC,
1778                        selected=self.op.output_fields)
1779
1780     self.needed_locks = {}
1781     self.share_locks[locking.LEVEL_NODE] = 1
1782
1783     if self.op.names:
1784       self.wanted = _GetWantedNodes(self, self.op.names)
1785     else:
1786       self.wanted = locking.ALL_SET
1787
1788     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1789     if self.do_locking:
1790       # if we don't request only static fields, we need to lock the nodes
1791       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1792
1793
1794   def CheckPrereq(self):
1795     """Check prerequisites.
1796
1797     """
1798     # The validation of the node list is done in the _GetWantedNodes,
1799     # if non empty, and if empty, there's no validation to do
1800     pass
1801
1802   def Exec(self, feedback_fn):
1803     """Computes the list of nodes and their attributes.
1804
1805     """
1806     all_info = self.cfg.GetAllNodesInfo()
1807     if self.do_locking:
1808       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1809     elif self.wanted != locking.ALL_SET:
1810       nodenames = self.wanted
1811       missing = set(nodenames).difference(all_info.keys())
1812       if missing:
1813         raise errors.OpExecError(
1814           "Some nodes were removed before retrieving their data: %s" % missing)
1815     else:
1816       nodenames = all_info.keys()
1817
1818     nodenames = utils.NiceSort(nodenames)
1819     nodelist = [all_info[name] for name in nodenames]
1820
1821     # begin data gathering
1822
1823     if self.do_locking:
1824       live_data = {}
1825       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1826                                           self.cfg.GetHypervisorType())
1827       for name in nodenames:
1828         nodeinfo = node_data[name]
1829         if not nodeinfo.failed and nodeinfo.data:
1830           nodeinfo = nodeinfo.data
1831           fn = utils.TryConvert
1832           live_data[name] = {
1833             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1834             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1835             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1836             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1837             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1838             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1839             "bootid": nodeinfo.get('bootid', None),
1840             }
1841         else:
1842           live_data[name] = {}
1843     else:
1844       live_data = dict.fromkeys(nodenames, {})
1845
1846     node_to_primary = dict([(name, set()) for name in nodenames])
1847     node_to_secondary = dict([(name, set()) for name in nodenames])
1848
1849     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1850                              "sinst_cnt", "sinst_list"))
1851     if inst_fields & frozenset(self.op.output_fields):
1852       instancelist = self.cfg.GetInstanceList()
1853
1854       for instance_name in instancelist:
1855         inst = self.cfg.GetInstanceInfo(instance_name)
1856         if inst.primary_node in node_to_primary:
1857           node_to_primary[inst.primary_node].add(inst.name)
1858         for secnode in inst.secondary_nodes:
1859           if secnode in node_to_secondary:
1860             node_to_secondary[secnode].add(inst.name)
1861
1862     master_node = self.cfg.GetMasterNode()
1863
1864     # end data gathering
1865
1866     output = []
1867     for node in nodelist:
1868       node_output = []
1869       for field in self.op.output_fields:
1870         if field == "name":
1871           val = node.name
1872         elif field == "pinst_list":
1873           val = list(node_to_primary[node.name])
1874         elif field == "sinst_list":
1875           val = list(node_to_secondary[node.name])
1876         elif field == "pinst_cnt":
1877           val = len(node_to_primary[node.name])
1878         elif field == "sinst_cnt":
1879           val = len(node_to_secondary[node.name])
1880         elif field == "pip":
1881           val = node.primary_ip
1882         elif field == "sip":
1883           val = node.secondary_ip
1884         elif field == "tags":
1885           val = list(node.GetTags())
1886         elif field == "serial_no":
1887           val = node.serial_no
1888         elif field == "master_candidate":
1889           val = node.master_candidate
1890         elif field == "master":
1891           val = node.name == master_node
1892         elif field == "offline":
1893           val = node.offline
1894         elif self._FIELDS_DYNAMIC.Matches(field):
1895           val = live_data[node.name].get(field, None)
1896         else:
1897           raise errors.ParameterError(field)
1898         node_output.append(val)
1899       output.append(node_output)
1900
1901     return output
1902
1903
1904 class LUQueryNodeVolumes(NoHooksLU):
1905   """Logical unit for getting volumes on node(s).
1906
1907   """
1908   _OP_REQP = ["nodes", "output_fields"]
1909   REQ_BGL = False
1910   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1911   _FIELDS_STATIC = utils.FieldSet("node")
1912
1913   def ExpandNames(self):
1914     _CheckOutputFields(static=self._FIELDS_STATIC,
1915                        dynamic=self._FIELDS_DYNAMIC,
1916                        selected=self.op.output_fields)
1917
1918     self.needed_locks = {}
1919     self.share_locks[locking.LEVEL_NODE] = 1
1920     if not self.op.nodes:
1921       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1922     else:
1923       self.needed_locks[locking.LEVEL_NODE] = \
1924         _GetWantedNodes(self, self.op.nodes)
1925
1926   def CheckPrereq(self):
1927     """Check prerequisites.
1928
1929     This checks that the fields required are valid output fields.
1930
1931     """
1932     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1933
1934   def Exec(self, feedback_fn):
1935     """Computes the list of nodes and their attributes.
1936
1937     """
1938     nodenames = self.nodes
1939     volumes = self.rpc.call_node_volumes(nodenames)
1940
1941     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1942              in self.cfg.GetInstanceList()]
1943
1944     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1945
1946     output = []
1947     for node in nodenames:
1948       if node not in volumes or volumes[node].failed or not volumes[node].data:
1949         continue
1950
1951       node_vols = volumes[node].data[:]
1952       node_vols.sort(key=lambda vol: vol['dev'])
1953
1954       for vol in node_vols:
1955         node_output = []
1956         for field in self.op.output_fields:
1957           if field == "node":
1958             val = node
1959           elif field == "phys":
1960             val = vol['dev']
1961           elif field == "vg":
1962             val = vol['vg']
1963           elif field == "name":
1964             val = vol['name']
1965           elif field == "size":
1966             val = int(float(vol['size']))
1967           elif field == "instance":
1968             for inst in ilist:
1969               if node not in lv_by_node[inst]:
1970                 continue
1971               if vol['name'] in lv_by_node[inst][node]:
1972                 val = inst.name
1973                 break
1974             else:
1975               val = '-'
1976           else:
1977             raise errors.ParameterError(field)
1978           node_output.append(str(val))
1979
1980         output.append(node_output)
1981
1982     return output
1983
1984
1985 class LUAddNode(LogicalUnit):
1986   """Logical unit for adding node to the cluster.
1987
1988   """
1989   HPATH = "node-add"
1990   HTYPE = constants.HTYPE_NODE
1991   _OP_REQP = ["node_name"]
1992
1993   def BuildHooksEnv(self):
1994     """Build hooks env.
1995
1996     This will run on all nodes before, and on all nodes + the new node after.
1997
1998     """
1999     env = {
2000       "OP_TARGET": self.op.node_name,
2001       "NODE_NAME": self.op.node_name,
2002       "NODE_PIP": self.op.primary_ip,
2003       "NODE_SIP": self.op.secondary_ip,
2004       }
2005     nodes_0 = self.cfg.GetNodeList()
2006     nodes_1 = nodes_0 + [self.op.node_name, ]
2007     return env, nodes_0, nodes_1
2008
2009   def CheckPrereq(self):
2010     """Check prerequisites.
2011
2012     This checks:
2013      - the new node is not already in the config
2014      - it is resolvable
2015      - its parameters (single/dual homed) matches the cluster
2016
2017     Any errors are signalled by raising errors.OpPrereqError.
2018
2019     """
2020     node_name = self.op.node_name
2021     cfg = self.cfg
2022
2023     dns_data = utils.HostInfo(node_name)
2024
2025     node = dns_data.name
2026     primary_ip = self.op.primary_ip = dns_data.ip
2027     secondary_ip = getattr(self.op, "secondary_ip", None)
2028     if secondary_ip is None:
2029       secondary_ip = primary_ip
2030     if not utils.IsValidIP(secondary_ip):
2031       raise errors.OpPrereqError("Invalid secondary IP given")
2032     self.op.secondary_ip = secondary_ip
2033
2034     node_list = cfg.GetNodeList()
2035     if not self.op.readd and node in node_list:
2036       raise errors.OpPrereqError("Node %s is already in the configuration" %
2037                                  node)
2038     elif self.op.readd and node not in node_list:
2039       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2040
2041     for existing_node_name in node_list:
2042       existing_node = cfg.GetNodeInfo(existing_node_name)
2043
2044       if self.op.readd and node == existing_node_name:
2045         if (existing_node.primary_ip != primary_ip or
2046             existing_node.secondary_ip != secondary_ip):
2047           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2048                                      " address configuration as before")
2049         continue
2050
2051       if (existing_node.primary_ip == primary_ip or
2052           existing_node.secondary_ip == primary_ip or
2053           existing_node.primary_ip == secondary_ip or
2054           existing_node.secondary_ip == secondary_ip):
2055         raise errors.OpPrereqError("New node ip address(es) conflict with"
2056                                    " existing node %s" % existing_node.name)
2057
2058     # check that the type of the node (single versus dual homed) is the
2059     # same as for the master
2060     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061     master_singlehomed = myself.secondary_ip == myself.primary_ip
2062     newbie_singlehomed = secondary_ip == primary_ip
2063     if master_singlehomed != newbie_singlehomed:
2064       if master_singlehomed:
2065         raise errors.OpPrereqError("The master has no private ip but the"
2066                                    " new node has one")
2067       else:
2068         raise errors.OpPrereqError("The master has a private ip but the"
2069                                    " new node doesn't have one")
2070
2071     # checks reachablity
2072     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2073       raise errors.OpPrereqError("Node not reachable by ping")
2074
2075     if not newbie_singlehomed:
2076       # check reachability from my secondary ip to newbie's secondary ip
2077       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2078                            source=myself.secondary_ip):
2079         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2080                                    " based ping to noded port")
2081
2082     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2083     mc_now, _ = self.cfg.GetMasterCandidateStats()
2084     master_candidate = mc_now < cp_size
2085
2086     self.new_node = objects.Node(name=node,
2087                                  primary_ip=primary_ip,
2088                                  secondary_ip=secondary_ip,
2089                                  master_candidate=master_candidate,
2090                                  offline=False)
2091
2092   def Exec(self, feedback_fn):
2093     """Adds the new node to the cluster.
2094
2095     """
2096     new_node = self.new_node
2097     node = new_node.name
2098
2099     # check connectivity
2100     result = self.rpc.call_version([node])[node]
2101     result.Raise()
2102     if result.data:
2103       if constants.PROTOCOL_VERSION == result.data:
2104         logging.info("Communication to node %s fine, sw version %s match",
2105                      node, result.data)
2106       else:
2107         raise errors.OpExecError("Version mismatch master version %s,"
2108                                  " node version %s" %
2109                                  (constants.PROTOCOL_VERSION, result.data))
2110     else:
2111       raise errors.OpExecError("Cannot get version from the new node")
2112
2113     # setup ssh on node
2114     logging.info("Copy ssh key to node %s", node)
2115     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2116     keyarray = []
2117     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2118                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2119                 priv_key, pub_key]
2120
2121     for i in keyfiles:
2122       f = open(i, 'r')
2123       try:
2124         keyarray.append(f.read())
2125       finally:
2126         f.close()
2127
2128     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2129                                     keyarray[2],
2130                                     keyarray[3], keyarray[4], keyarray[5])
2131
2132     if result.failed or not result.data:
2133       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2134
2135     # Add node to our /etc/hosts, and add key to known_hosts
2136     utils.AddHostToEtcHosts(new_node.name)
2137
2138     if new_node.secondary_ip != new_node.primary_ip:
2139       result = self.rpc.call_node_has_ip_address(new_node.name,
2140                                                  new_node.secondary_ip)
2141       if result.failed or not result.data:
2142         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2143                                  " you gave (%s). Please fix and re-run this"
2144                                  " command." % new_node.secondary_ip)
2145
2146     node_verify_list = [self.cfg.GetMasterNode()]
2147     node_verify_param = {
2148       'nodelist': [node],
2149       # TODO: do a node-net-test as well?
2150     }
2151
2152     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2153                                        self.cfg.GetClusterName())
2154     for verifier in node_verify_list:
2155       if result[verifier].failed or not result[verifier].data:
2156         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2157                                  " for remote verification" % verifier)
2158       if result[verifier].data['nodelist']:
2159         for failed in result[verifier].data['nodelist']:
2160           feedback_fn("ssh/hostname verification failed %s -> %s" %
2161                       (verifier, result[verifier]['nodelist'][failed]))
2162         raise errors.OpExecError("ssh/hostname verification failed.")
2163
2164     # Distribute updated /etc/hosts and known_hosts to all nodes,
2165     # including the node just added
2166     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2167     dist_nodes = self.cfg.GetNodeList()
2168     if not self.op.readd:
2169       dist_nodes.append(node)
2170     if myself.name in dist_nodes:
2171       dist_nodes.remove(myself.name)
2172
2173     logging.debug("Copying hosts and known_hosts to all nodes")
2174     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2175       result = self.rpc.call_upload_file(dist_nodes, fname)
2176       for to_node, to_result in result.iteritems():
2177         if to_result.failed or not to_result.data:
2178           logging.error("Copy of file %s to node %s failed", fname, to_node)
2179
2180     to_copy = []
2181     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2182       to_copy.append(constants.VNC_PASSWORD_FILE)
2183     for fname in to_copy:
2184       result = self.rpc.call_upload_file([node], fname)
2185       if result[node].failed or not result[node]:
2186         logging.error("Could not copy file %s to node %s", fname, node)
2187
2188     if self.op.readd:
2189       self.context.ReaddNode(new_node)
2190     else:
2191       self.context.AddNode(new_node)
2192
2193
2194 class LUSetNodeParams(LogicalUnit):
2195   """Modifies the parameters of a node.
2196
2197   """
2198   HPATH = "node-modify"
2199   HTYPE = constants.HTYPE_NODE
2200   _OP_REQP = ["node_name"]
2201   REQ_BGL = False
2202
2203   def CheckArguments(self):
2204     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2205     if node_name is None:
2206       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2207     self.op.node_name = node_name
2208     _CheckBooleanOpField(self.op, 'master_candidate')
2209     _CheckBooleanOpField(self.op, 'offline')
2210     if self.op.master_candidate is None and self.op.offline is None:
2211       raise errors.OpPrereqError("Please pass at least one modification")
2212     if self.op.offline == True and self.op.master_candidate == True:
2213       raise errors.OpPrereqError("Can't set the node into offline and"
2214                                  " master_candidate at the same time")
2215
2216   def ExpandNames(self):
2217     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2218
2219   def BuildHooksEnv(self):
2220     """Build hooks env.
2221
2222     This runs on the master node.
2223
2224     """
2225     env = {
2226       "OP_TARGET": self.op.node_name,
2227       "MASTER_CANDIDATE": str(self.op.master_candidate),
2228       "OFFLINE": str(self.op.offline),
2229       }
2230     nl = [self.cfg.GetMasterNode(),
2231           self.op.node_name]
2232     return env, nl, nl
2233
2234   def CheckPrereq(self):
2235     """Check prerequisites.
2236
2237     This only checks the instance list against the existing names.
2238
2239     """
2240     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2241
2242     if ((self.op.master_candidate == False or self.op.offline == True)
2243         and node.master_candidate):
2244       # we will demote the node from master_candidate
2245       if self.op.node_name == self.cfg.GetMasterNode():
2246         raise errors.OpPrereqError("The master node has to be a"
2247                                    " master candidate and online")
2248       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2249       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2250       if num_candidates <= cp_size:
2251         msg = ("Not enough master candidates (desired"
2252                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2253         if self.op.force:
2254           self.LogWarning(msg)
2255         else:
2256           raise errors.OpPrereqError(msg)
2257
2258     if (self.op.master_candidate == True and node.offline and
2259         not self.op.offline == False):
2260       raise errors.OpPrereqError("Can't set an offline node to"
2261                                  " master_candidate")
2262
2263     return
2264
2265   def Exec(self, feedback_fn):
2266     """Modifies a node.
2267
2268     """
2269     node = self.node
2270
2271     result = []
2272
2273     if self.op.offline is not None:
2274       node.offline = self.op.offline
2275       result.append(("offline", str(self.op.offline)))
2276       if self.op.offline == True and node.master_candidate:
2277         node.master_candidate = False
2278         result.append(("master_candidate", "auto-demotion due to offline"))
2279
2280     if self.op.master_candidate is not None:
2281       node.master_candidate = self.op.master_candidate
2282       result.append(("master_candidate", str(self.op.master_candidate)))
2283       if self.op.master_candidate == False:
2284         rrc = self.rpc.call_node_demote_from_mc(node.name)
2285         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2286             or len(rrc.data) != 2):
2287           self.LogWarning("Node rpc error: %s" % rrc.error)
2288         elif not rrc.data[0]:
2289           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2290
2291     # this will trigger configuration file update, if needed
2292     self.cfg.Update(node)
2293     # this will trigger job queue propagation or cleanup
2294     if self.op.node_name != self.cfg.GetMasterNode():
2295       self.context.ReaddNode(node)
2296
2297     return result
2298
2299
2300 class LUQueryClusterInfo(NoHooksLU):
2301   """Query cluster configuration.
2302
2303   """
2304   _OP_REQP = []
2305   REQ_BGL = False
2306
2307   def ExpandNames(self):
2308     self.needed_locks = {}
2309
2310   def CheckPrereq(self):
2311     """No prerequsites needed for this LU.
2312
2313     """
2314     pass
2315
2316   def Exec(self, feedback_fn):
2317     """Return cluster config.
2318
2319     """
2320     cluster = self.cfg.GetClusterInfo()
2321     result = {
2322       "software_version": constants.RELEASE_VERSION,
2323       "protocol_version": constants.PROTOCOL_VERSION,
2324       "config_version": constants.CONFIG_VERSION,
2325       "os_api_version": constants.OS_API_VERSION,
2326       "export_version": constants.EXPORT_VERSION,
2327       "architecture": (platform.architecture()[0], platform.machine()),
2328       "name": cluster.cluster_name,
2329       "master": cluster.master_node,
2330       "default_hypervisor": cluster.default_hypervisor,
2331       "enabled_hypervisors": cluster.enabled_hypervisors,
2332       "hvparams": cluster.hvparams,
2333       "beparams": cluster.beparams,
2334       "candidate_pool_size": cluster.candidate_pool_size,
2335       }
2336
2337     return result
2338
2339
2340 class LUQueryConfigValues(NoHooksLU):
2341   """Return configuration values.
2342
2343   """
2344   _OP_REQP = []
2345   REQ_BGL = False
2346   _FIELDS_DYNAMIC = utils.FieldSet()
2347   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2348
2349   def ExpandNames(self):
2350     self.needed_locks = {}
2351
2352     _CheckOutputFields(static=self._FIELDS_STATIC,
2353                        dynamic=self._FIELDS_DYNAMIC,
2354                        selected=self.op.output_fields)
2355
2356   def CheckPrereq(self):
2357     """No prerequisites.
2358
2359     """
2360     pass
2361
2362   def Exec(self, feedback_fn):
2363     """Dump a representation of the cluster config to the standard output.
2364
2365     """
2366     values = []
2367     for field in self.op.output_fields:
2368       if field == "cluster_name":
2369         entry = self.cfg.GetClusterName()
2370       elif field == "master_node":
2371         entry = self.cfg.GetMasterNode()
2372       elif field == "drain_flag":
2373         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2374       else:
2375         raise errors.ParameterError(field)
2376       values.append(entry)
2377     return values
2378
2379
2380 class LUActivateInstanceDisks(NoHooksLU):
2381   """Bring up an instance's disks.
2382
2383   """
2384   _OP_REQP = ["instance_name"]
2385   REQ_BGL = False
2386
2387   def ExpandNames(self):
2388     self._ExpandAndLockInstance()
2389     self.needed_locks[locking.LEVEL_NODE] = []
2390     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2391
2392   def DeclareLocks(self, level):
2393     if level == locking.LEVEL_NODE:
2394       self._LockInstancesNodes()
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This checks that the instance is in the cluster.
2400
2401     """
2402     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2403     assert self.instance is not None, \
2404       "Cannot retrieve locked instance %s" % self.op.instance_name
2405     _CheckNodeOnline(self, self.instance.primary_node)
2406
2407   def Exec(self, feedback_fn):
2408     """Activate the disks.
2409
2410     """
2411     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2412     if not disks_ok:
2413       raise errors.OpExecError("Cannot activate block devices")
2414
2415     return disks_info
2416
2417
2418 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2419   """Prepare the block devices for an instance.
2420
2421   This sets up the block devices on all nodes.
2422
2423   @type lu: L{LogicalUnit}
2424   @param lu: the logical unit on whose behalf we execute
2425   @type instance: L{objects.Instance}
2426   @param instance: the instance for whose disks we assemble
2427   @type ignore_secondaries: boolean
2428   @param ignore_secondaries: if true, errors on secondary nodes
2429       won't result in an error return from the function
2430   @return: False if the operation failed, otherwise a list of
2431       (host, instance_visible_name, node_visible_name)
2432       with the mapping from node devices to instance devices
2433
2434   """
2435   device_info = []
2436   disks_ok = True
2437   iname = instance.name
2438   # With the two passes mechanism we try to reduce the window of
2439   # opportunity for the race condition of switching DRBD to primary
2440   # before handshaking occured, but we do not eliminate it
2441
2442   # The proper fix would be to wait (with some limits) until the
2443   # connection has been made and drbd transitions from WFConnection
2444   # into any other network-connected state (Connected, SyncTarget,
2445   # SyncSource, etc.)
2446
2447   # 1st pass, assemble on all nodes in secondary mode
2448   for inst_disk in instance.disks:
2449     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2450       lu.cfg.SetDiskID(node_disk, node)
2451       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2452       if result.failed or not result:
2453         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2454                            " (is_primary=False, pass=1)",
2455                            inst_disk.iv_name, node)
2456         if not ignore_secondaries:
2457           disks_ok = False
2458
2459   # FIXME: race condition on drbd migration to primary
2460
2461   # 2nd pass, do only the primary node
2462   for inst_disk in instance.disks:
2463     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2464       if node != instance.primary_node:
2465         continue
2466       lu.cfg.SetDiskID(node_disk, node)
2467       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2468       if result.failed or not result:
2469         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2470                            " (is_primary=True, pass=2)",
2471                            inst_disk.iv_name, node)
2472         disks_ok = False
2473     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2474
2475   # leave the disks configured for the primary node
2476   # this is a workaround that would be fixed better by
2477   # improving the logical/physical id handling
2478   for disk in instance.disks:
2479     lu.cfg.SetDiskID(disk, instance.primary_node)
2480
2481   return disks_ok, device_info
2482
2483
2484 def _StartInstanceDisks(lu, instance, force):
2485   """Start the disks of an instance.
2486
2487   """
2488   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2489                                            ignore_secondaries=force)
2490   if not disks_ok:
2491     _ShutdownInstanceDisks(lu, instance)
2492     if force is not None and not force:
2493       lu.proc.LogWarning("", hint="If the message above refers to a"
2494                          " secondary node,"
2495                          " you can retry the operation using '--force'.")
2496     raise errors.OpExecError("Disk consistency error")
2497
2498
2499 class LUDeactivateInstanceDisks(NoHooksLU):
2500   """Shutdown an instance's disks.
2501
2502   """
2503   _OP_REQP = ["instance_name"]
2504   REQ_BGL = False
2505
2506   def ExpandNames(self):
2507     self._ExpandAndLockInstance()
2508     self.needed_locks[locking.LEVEL_NODE] = []
2509     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2510
2511   def DeclareLocks(self, level):
2512     if level == locking.LEVEL_NODE:
2513       self._LockInstancesNodes()
2514
2515   def CheckPrereq(self):
2516     """Check prerequisites.
2517
2518     This checks that the instance is in the cluster.
2519
2520     """
2521     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2522     assert self.instance is not None, \
2523       "Cannot retrieve locked instance %s" % self.op.instance_name
2524
2525   def Exec(self, feedback_fn):
2526     """Deactivate the disks
2527
2528     """
2529     instance = self.instance
2530     _SafeShutdownInstanceDisks(self, instance)
2531
2532
2533 def _SafeShutdownInstanceDisks(lu, instance):
2534   """Shutdown block devices of an instance.
2535
2536   This function checks if an instance is running, before calling
2537   _ShutdownInstanceDisks.
2538
2539   """
2540   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2541                                       [instance.hypervisor])
2542   ins_l = ins_l[instance.primary_node]
2543   if ins_l.failed or not isinstance(ins_l.data, list):
2544     raise errors.OpExecError("Can't contact node '%s'" %
2545                              instance.primary_node)
2546
2547   if instance.name in ins_l.data:
2548     raise errors.OpExecError("Instance is running, can't shutdown"
2549                              " block devices.")
2550
2551   _ShutdownInstanceDisks(lu, instance)
2552
2553
2554 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2555   """Shutdown block devices of an instance.
2556
2557   This does the shutdown on all nodes of the instance.
2558
2559   If the ignore_primary is false, errors on the primary node are
2560   ignored.
2561
2562   """
2563   result = True
2564   for disk in instance.disks:
2565     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2566       lu.cfg.SetDiskID(top_disk, node)
2567       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2568       if result.failed or not result.data:
2569         logging.error("Could not shutdown block device %s on node %s",
2570                       disk.iv_name, node)
2571         if not ignore_primary or node != instance.primary_node:
2572           result = False
2573   return result
2574
2575
2576 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2577   """Checks if a node has enough free memory.
2578
2579   This function check if a given node has the needed amount of free
2580   memory. In case the node has less memory or we cannot get the
2581   information from the node, this function raise an OpPrereqError
2582   exception.
2583
2584   @type lu: C{LogicalUnit}
2585   @param lu: a logical unit from which we get configuration data
2586   @type node: C{str}
2587   @param node: the node to check
2588   @type reason: C{str}
2589   @param reason: string to use in the error message
2590   @type requested: C{int}
2591   @param requested: the amount of memory in MiB to check for
2592   @type hypervisor_name: C{str}
2593   @param hypervisor_name: the hypervisor to ask for memory stats
2594   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2595       we cannot check the node
2596
2597   """
2598   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2599   nodeinfo[node].Raise()
2600   free_mem = nodeinfo[node].data.get('memory_free')
2601   if not isinstance(free_mem, int):
2602     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2603                              " was '%s'" % (node, free_mem))
2604   if requested > free_mem:
2605     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2606                              " needed %s MiB, available %s MiB" %
2607                              (node, reason, requested, free_mem))
2608
2609
2610 class LUStartupInstance(LogicalUnit):
2611   """Starts an instance.
2612
2613   """
2614   HPATH = "instance-start"
2615   HTYPE = constants.HTYPE_INSTANCE
2616   _OP_REQP = ["instance_name", "force"]
2617   REQ_BGL = False
2618
2619   def ExpandNames(self):
2620     self._ExpandAndLockInstance()
2621
2622   def BuildHooksEnv(self):
2623     """Build hooks env.
2624
2625     This runs on master, primary and secondary nodes of the instance.
2626
2627     """
2628     env = {
2629       "FORCE": self.op.force,
2630       }
2631     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2632     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2633     return env, nl, nl
2634
2635   def CheckPrereq(self):
2636     """Check prerequisites.
2637
2638     This checks that the instance is in the cluster.
2639
2640     """
2641     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2642     assert self.instance is not None, \
2643       "Cannot retrieve locked instance %s" % self.op.instance_name
2644
2645     _CheckNodeOnline(self, instance.primary_node)
2646
2647     bep = self.cfg.GetClusterInfo().FillBE(instance)
2648     # check bridges existance
2649     _CheckInstanceBridgesExist(self, instance)
2650
2651     _CheckNodeFreeMemory(self, instance.primary_node,
2652                          "starting instance %s" % instance.name,
2653                          bep[constants.BE_MEMORY], instance.hypervisor)
2654
2655   def Exec(self, feedback_fn):
2656     """Start the instance.
2657
2658     """
2659     instance = self.instance
2660     force = self.op.force
2661     extra_args = getattr(self.op, "extra_args", "")
2662
2663     self.cfg.MarkInstanceUp(instance.name)
2664
2665     node_current = instance.primary_node
2666
2667     _StartInstanceDisks(self, instance, force)
2668
2669     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2670     msg = result.RemoteFailMsg()
2671     if msg:
2672       _ShutdownInstanceDisks(self, instance)
2673       raise errors.OpExecError("Could not start instance: %s" % msg)
2674
2675
2676 class LURebootInstance(LogicalUnit):
2677   """Reboot an instance.
2678
2679   """
2680   HPATH = "instance-reboot"
2681   HTYPE = constants.HTYPE_INSTANCE
2682   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2683   REQ_BGL = False
2684
2685   def ExpandNames(self):
2686     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2687                                    constants.INSTANCE_REBOOT_HARD,
2688                                    constants.INSTANCE_REBOOT_FULL]:
2689       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2690                                   (constants.INSTANCE_REBOOT_SOFT,
2691                                    constants.INSTANCE_REBOOT_HARD,
2692                                    constants.INSTANCE_REBOOT_FULL))
2693     self._ExpandAndLockInstance()
2694
2695   def BuildHooksEnv(self):
2696     """Build hooks env.
2697
2698     This runs on master, primary and secondary nodes of the instance.
2699
2700     """
2701     env = {
2702       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2703       }
2704     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2705     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2706     return env, nl, nl
2707
2708   def CheckPrereq(self):
2709     """Check prerequisites.
2710
2711     This checks that the instance is in the cluster.
2712
2713     """
2714     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715     assert self.instance is not None, \
2716       "Cannot retrieve locked instance %s" % self.op.instance_name
2717
2718     _CheckNodeOnline(self, instance.primary_node)
2719
2720     # check bridges existance
2721     _CheckInstanceBridgesExist(self, instance)
2722
2723   def Exec(self, feedback_fn):
2724     """Reboot the instance.
2725
2726     """
2727     instance = self.instance
2728     ignore_secondaries = self.op.ignore_secondaries
2729     reboot_type = self.op.reboot_type
2730     extra_args = getattr(self.op, "extra_args", "")
2731
2732     node_current = instance.primary_node
2733
2734     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2735                        constants.INSTANCE_REBOOT_HARD]:
2736       result = self.rpc.call_instance_reboot(node_current, instance,
2737                                              reboot_type, extra_args)
2738       if result.failed or not result.data:
2739         raise errors.OpExecError("Could not reboot instance")
2740     else:
2741       if not self.rpc.call_instance_shutdown(node_current, instance):
2742         raise errors.OpExecError("could not shutdown instance for full reboot")
2743       _ShutdownInstanceDisks(self, instance)
2744       _StartInstanceDisks(self, instance, ignore_secondaries)
2745       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2746       msg = result.RemoteFailMsg()
2747       if msg:
2748         _ShutdownInstanceDisks(self, instance)
2749         raise errors.OpExecError("Could not start instance for"
2750                                  " full reboot: %s" % msg)
2751
2752     self.cfg.MarkInstanceUp(instance.name)
2753
2754
2755 class LUShutdownInstance(LogicalUnit):
2756   """Shutdown an instance.
2757
2758   """
2759   HPATH = "instance-stop"
2760   HTYPE = constants.HTYPE_INSTANCE
2761   _OP_REQP = ["instance_name"]
2762   REQ_BGL = False
2763
2764   def ExpandNames(self):
2765     self._ExpandAndLockInstance()
2766
2767   def BuildHooksEnv(self):
2768     """Build hooks env.
2769
2770     This runs on master, primary and secondary nodes of the instance.
2771
2772     """
2773     env = _BuildInstanceHookEnvByObject(self, self.instance)
2774     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2775     return env, nl, nl
2776
2777   def CheckPrereq(self):
2778     """Check prerequisites.
2779
2780     This checks that the instance is in the cluster.
2781
2782     """
2783     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2784     assert self.instance is not None, \
2785       "Cannot retrieve locked instance %s" % self.op.instance_name
2786     _CheckNodeOnline(self, self.instance.primary_node)
2787
2788   def Exec(self, feedback_fn):
2789     """Shutdown the instance.
2790
2791     """
2792     instance = self.instance
2793     node_current = instance.primary_node
2794     self.cfg.MarkInstanceDown(instance.name)
2795     result = self.rpc.call_instance_shutdown(node_current, instance)
2796     if result.failed or not result.data:
2797       self.proc.LogWarning("Could not shutdown instance")
2798
2799     _ShutdownInstanceDisks(self, instance)
2800
2801
2802 class LUReinstallInstance(LogicalUnit):
2803   """Reinstall an instance.
2804
2805   """
2806   HPATH = "instance-reinstall"
2807   HTYPE = constants.HTYPE_INSTANCE
2808   _OP_REQP = ["instance_name"]
2809   REQ_BGL = False
2810
2811   def ExpandNames(self):
2812     self._ExpandAndLockInstance()
2813
2814   def BuildHooksEnv(self):
2815     """Build hooks env.
2816
2817     This runs on master, primary and secondary nodes of the instance.
2818
2819     """
2820     env = _BuildInstanceHookEnvByObject(self, self.instance)
2821     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2822     return env, nl, nl
2823
2824   def CheckPrereq(self):
2825     """Check prerequisites.
2826
2827     This checks that the instance is in the cluster and is not running.
2828
2829     """
2830     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2831     assert instance is not None, \
2832       "Cannot retrieve locked instance %s" % self.op.instance_name
2833     _CheckNodeOnline(self, instance.primary_node)
2834
2835     if instance.disk_template == constants.DT_DISKLESS:
2836       raise errors.OpPrereqError("Instance '%s' has no disks" %
2837                                  self.op.instance_name)
2838     if instance.admin_up:
2839       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2840                                  self.op.instance_name)
2841     remote_info = self.rpc.call_instance_info(instance.primary_node,
2842                                               instance.name,
2843                                               instance.hypervisor)
2844     if remote_info.failed or remote_info.data:
2845       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2846                                  (self.op.instance_name,
2847                                   instance.primary_node))
2848
2849     self.op.os_type = getattr(self.op, "os_type", None)
2850     if self.op.os_type is not None:
2851       # OS verification
2852       pnode = self.cfg.GetNodeInfo(
2853         self.cfg.ExpandNodeName(instance.primary_node))
2854       if pnode is None:
2855         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2856                                    self.op.pnode)
2857       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2858       result.Raise()
2859       if not isinstance(result.data, objects.OS):
2860         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2861                                    " primary node"  % self.op.os_type)
2862
2863     self.instance = instance
2864
2865   def Exec(self, feedback_fn):
2866     """Reinstall the instance.
2867
2868     """
2869     inst = self.instance
2870
2871     if self.op.os_type is not None:
2872       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2873       inst.os = self.op.os_type
2874       self.cfg.Update(inst)
2875
2876     _StartInstanceDisks(self, inst, None)
2877     try:
2878       feedback_fn("Running the instance OS create scripts...")
2879       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2880       msg = result.RemoteFailMsg()
2881       if msg:
2882         raise errors.OpExecError("Could not install OS for instance %s"
2883                                  " on node %s: %s" %
2884                                  (inst.name, inst.primary_node, msg))
2885     finally:
2886       _ShutdownInstanceDisks(self, inst)
2887
2888
2889 class LURenameInstance(LogicalUnit):
2890   """Rename an instance.
2891
2892   """
2893   HPATH = "instance-rename"
2894   HTYPE = constants.HTYPE_INSTANCE
2895   _OP_REQP = ["instance_name", "new_name"]
2896
2897   def BuildHooksEnv(self):
2898     """Build hooks env.
2899
2900     This runs on master, primary and secondary nodes of the instance.
2901
2902     """
2903     env = _BuildInstanceHookEnvByObject(self, self.instance)
2904     env["INSTANCE_NEW_NAME"] = self.op.new_name
2905     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2906     return env, nl, nl
2907
2908   def CheckPrereq(self):
2909     """Check prerequisites.
2910
2911     This checks that the instance is in the cluster and is not running.
2912
2913     """
2914     instance = self.cfg.GetInstanceInfo(
2915       self.cfg.ExpandInstanceName(self.op.instance_name))
2916     if instance is None:
2917       raise errors.OpPrereqError("Instance '%s' not known" %
2918                                  self.op.instance_name)
2919     _CheckNodeOnline(self, instance.primary_node)
2920
2921     if instance.admin_up:
2922       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2923                                  self.op.instance_name)
2924     remote_info = self.rpc.call_instance_info(instance.primary_node,
2925                                               instance.name,
2926                                               instance.hypervisor)
2927     remote_info.Raise()
2928     if remote_info.data:
2929       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2930                                  (self.op.instance_name,
2931                                   instance.primary_node))
2932     self.instance = instance
2933
2934     # new name verification
2935     name_info = utils.HostInfo(self.op.new_name)
2936
2937     self.op.new_name = new_name = name_info.name
2938     instance_list = self.cfg.GetInstanceList()
2939     if new_name in instance_list:
2940       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2941                                  new_name)
2942
2943     if not getattr(self.op, "ignore_ip", False):
2944       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2945         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2946                                    (name_info.ip, new_name))
2947
2948
2949   def Exec(self, feedback_fn):
2950     """Reinstall the instance.
2951
2952     """
2953     inst = self.instance
2954     old_name = inst.name
2955
2956     if inst.disk_template == constants.DT_FILE:
2957       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2958
2959     self.cfg.RenameInstance(inst.name, self.op.new_name)
2960     # Change the instance lock. This is definitely safe while we hold the BGL
2961     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2962     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2963
2964     # re-read the instance from the configuration after rename
2965     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2966
2967     if inst.disk_template == constants.DT_FILE:
2968       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2969       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2970                                                      old_file_storage_dir,
2971                                                      new_file_storage_dir)
2972       result.Raise()
2973       if not result.data:
2974         raise errors.OpExecError("Could not connect to node '%s' to rename"
2975                                  " directory '%s' to '%s' (but the instance"
2976                                  " has been renamed in Ganeti)" % (
2977                                  inst.primary_node, old_file_storage_dir,
2978                                  new_file_storage_dir))
2979
2980       if not result.data[0]:
2981         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2982                                  " (but the instance has been renamed in"
2983                                  " Ganeti)" % (old_file_storage_dir,
2984                                                new_file_storage_dir))
2985
2986     _StartInstanceDisks(self, inst, None)
2987     try:
2988       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2989                                                  old_name)
2990       msg = result.RemoteFailMsg()
2991       if msg:
2992         msg = ("Could not run OS rename script for instance %s on node %s"
2993                " (but the instance has been renamed in Ganeti): %s" %
2994                (inst.name, inst.primary_node, msg))
2995         self.proc.LogWarning(msg)
2996     finally:
2997       _ShutdownInstanceDisks(self, inst)
2998
2999
3000 class LURemoveInstance(LogicalUnit):
3001   """Remove an instance.
3002
3003   """
3004   HPATH = "instance-remove"
3005   HTYPE = constants.HTYPE_INSTANCE
3006   _OP_REQP = ["instance_name", "ignore_failures"]
3007   REQ_BGL = False
3008
3009   def ExpandNames(self):
3010     self._ExpandAndLockInstance()
3011     self.needed_locks[locking.LEVEL_NODE] = []
3012     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3013
3014   def DeclareLocks(self, level):
3015     if level == locking.LEVEL_NODE:
3016       self._LockInstancesNodes()
3017
3018   def BuildHooksEnv(self):
3019     """Build hooks env.
3020
3021     This runs on master, primary and secondary nodes of the instance.
3022
3023     """
3024     env = _BuildInstanceHookEnvByObject(self, self.instance)
3025     nl = [self.cfg.GetMasterNode()]
3026     return env, nl, nl
3027
3028   def CheckPrereq(self):
3029     """Check prerequisites.
3030
3031     This checks that the instance is in the cluster.
3032
3033     """
3034     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3035     assert self.instance is not None, \
3036       "Cannot retrieve locked instance %s" % self.op.instance_name
3037
3038   def Exec(self, feedback_fn):
3039     """Remove the instance.
3040
3041     """
3042     instance = self.instance
3043     logging.info("Shutting down instance %s on node %s",
3044                  instance.name, instance.primary_node)
3045
3046     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3047     if result.failed or not result.data:
3048       if self.op.ignore_failures:
3049         feedback_fn("Warning: can't shutdown instance")
3050       else:
3051         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3052                                  (instance.name, instance.primary_node))
3053
3054     logging.info("Removing block devices for instance %s", instance.name)
3055
3056     if not _RemoveDisks(self, instance):
3057       if self.op.ignore_failures:
3058         feedback_fn("Warning: can't remove instance's disks")
3059       else:
3060         raise errors.OpExecError("Can't remove instance's disks")
3061
3062     logging.info("Removing instance %s out of cluster config", instance.name)
3063
3064     self.cfg.RemoveInstance(instance.name)
3065     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3066
3067
3068 class LUQueryInstances(NoHooksLU):
3069   """Logical unit for querying instances.
3070
3071   """
3072   _OP_REQP = ["output_fields", "names"]
3073   REQ_BGL = False
3074   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3075                                     "admin_state", "admin_ram",
3076                                     "disk_template", "ip", "mac", "bridge",
3077                                     "sda_size", "sdb_size", "vcpus", "tags",
3078                                     "network_port", "beparams",
3079                                     "(disk).(size)/([0-9]+)",
3080                                     "(disk).(sizes)",
3081                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3082                                     "(nic).(macs|ips|bridges)",
3083                                     "(disk|nic).(count)",
3084                                     "serial_no", "hypervisor", "hvparams",] +
3085                                   ["hv/%s" % name
3086                                    for name in constants.HVS_PARAMETERS] +
3087                                   ["be/%s" % name
3088                                    for name in constants.BES_PARAMETERS])
3089   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3090
3091
3092   def ExpandNames(self):
3093     _CheckOutputFields(static=self._FIELDS_STATIC,
3094                        dynamic=self._FIELDS_DYNAMIC,
3095                        selected=self.op.output_fields)
3096
3097     self.needed_locks = {}
3098     self.share_locks[locking.LEVEL_INSTANCE] = 1
3099     self.share_locks[locking.LEVEL_NODE] = 1
3100
3101     if self.op.names:
3102       self.wanted = _GetWantedInstances(self, self.op.names)
3103     else:
3104       self.wanted = locking.ALL_SET
3105
3106     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3107     if self.do_locking:
3108       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3109       self.needed_locks[locking.LEVEL_NODE] = []
3110       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3111
3112   def DeclareLocks(self, level):
3113     if level == locking.LEVEL_NODE and self.do_locking:
3114       self._LockInstancesNodes()
3115
3116   def CheckPrereq(self):
3117     """Check prerequisites.
3118
3119     """
3120     pass
3121
3122   def Exec(self, feedback_fn):
3123     """Computes the list of nodes and their attributes.
3124
3125     """
3126     all_info = self.cfg.GetAllInstancesInfo()
3127     if self.wanted == locking.ALL_SET:
3128       # caller didn't specify instance names, so ordering is not important
3129       if self.do_locking:
3130         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3131       else:
3132         instance_names = all_info.keys()
3133       instance_names = utils.NiceSort(instance_names)
3134     else:
3135       # caller did specify names, so we must keep the ordering
3136       if self.do_locking:
3137         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3138       else:
3139         tgt_set = all_info.keys()
3140       missing = set(self.wanted).difference(tgt_set)
3141       if missing:
3142         raise errors.OpExecError("Some instances were removed before"
3143                                  " retrieving their data: %s" % missing)
3144       instance_names = self.wanted
3145
3146     instance_list = [all_info[iname] for iname in instance_names]
3147
3148     # begin data gathering
3149
3150     nodes = frozenset([inst.primary_node for inst in instance_list])
3151     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3152
3153     bad_nodes = []
3154     off_nodes = []
3155     if self.do_locking:
3156       live_data = {}
3157       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3158       for name in nodes:
3159         result = node_data[name]
3160         if result.offline:
3161           # offline nodes will be in both lists
3162           off_nodes.append(name)
3163         if result.failed:
3164           bad_nodes.append(name)
3165         else:
3166           if result.data:
3167             live_data.update(result.data)
3168             # else no instance is alive
3169     else:
3170       live_data = dict([(name, {}) for name in instance_names])
3171
3172     # end data gathering
3173
3174     HVPREFIX = "hv/"
3175     BEPREFIX = "be/"
3176     output = []
3177     for instance in instance_list:
3178       iout = []
3179       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3180       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3181       for field in self.op.output_fields:
3182         st_match = self._FIELDS_STATIC.Matches(field)
3183         if field == "name":
3184           val = instance.name
3185         elif field == "os":
3186           val = instance.os
3187         elif field == "pnode":
3188           val = instance.primary_node
3189         elif field == "snodes":
3190           val = list(instance.secondary_nodes)
3191         elif field == "admin_state":
3192           val = instance.admin_up
3193         elif field == "oper_state":
3194           if instance.primary_node in bad_nodes:
3195             val = None
3196           else:
3197             val = bool(live_data.get(instance.name))
3198         elif field == "status":
3199           if instance.primary_node in off_nodes:
3200             val = "ERROR_nodeoffline"
3201           elif instance.primary_node in bad_nodes:
3202             val = "ERROR_nodedown"
3203           else:
3204             running = bool(live_data.get(instance.name))
3205             if running:
3206               if instance.admin_up:
3207                 val = "running"
3208               else:
3209                 val = "ERROR_up"
3210             else:
3211               if instance.admin_up:
3212                 val = "ERROR_down"
3213               else:
3214                 val = "ADMIN_down"
3215         elif field == "oper_ram":
3216           if instance.primary_node in bad_nodes:
3217             val = None
3218           elif instance.name in live_data:
3219             val = live_data[instance.name].get("memory", "?")
3220           else:
3221             val = "-"
3222         elif field == "disk_template":
3223           val = instance.disk_template
3224         elif field == "ip":
3225           val = instance.nics[0].ip
3226         elif field == "bridge":
3227           val = instance.nics[0].bridge
3228         elif field == "mac":
3229           val = instance.nics[0].mac
3230         elif field == "sda_size" or field == "sdb_size":
3231           idx = ord(field[2]) - ord('a')
3232           try:
3233             val = instance.FindDisk(idx).size
3234           except errors.OpPrereqError:
3235             val = None
3236         elif field == "tags":
3237           val = list(instance.GetTags())
3238         elif field == "serial_no":
3239           val = instance.serial_no
3240         elif field == "network_port":
3241           val = instance.network_port
3242         elif field == "hypervisor":
3243           val = instance.hypervisor
3244         elif field == "hvparams":
3245           val = i_hv
3246         elif (field.startswith(HVPREFIX) and
3247               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3248           val = i_hv.get(field[len(HVPREFIX):], None)
3249         elif field == "beparams":
3250           val = i_be
3251         elif (field.startswith(BEPREFIX) and
3252               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3253           val = i_be.get(field[len(BEPREFIX):], None)
3254         elif st_match and st_match.groups():
3255           # matches a variable list
3256           st_groups = st_match.groups()
3257           if st_groups and st_groups[0] == "disk":
3258             if st_groups[1] == "count":
3259               val = len(instance.disks)
3260             elif st_groups[1] == "sizes":
3261               val = [disk.size for disk in instance.disks]
3262             elif st_groups[1] == "size":
3263               try:
3264                 val = instance.FindDisk(st_groups[2]).size
3265               except errors.OpPrereqError:
3266                 val = None
3267             else:
3268               assert False, "Unhandled disk parameter"
3269           elif st_groups[0] == "nic":
3270             if st_groups[1] == "count":
3271               val = len(instance.nics)
3272             elif st_groups[1] == "macs":
3273               val = [nic.mac for nic in instance.nics]
3274             elif st_groups[1] == "ips":
3275               val = [nic.ip for nic in instance.nics]
3276             elif st_groups[1] == "bridges":
3277               val = [nic.bridge for nic in instance.nics]
3278             else:
3279               # index-based item
3280               nic_idx = int(st_groups[2])
3281               if nic_idx >= len(instance.nics):
3282                 val = None
3283               else:
3284                 if st_groups[1] == "mac":
3285                   val = instance.nics[nic_idx].mac
3286                 elif st_groups[1] == "ip":
3287                   val = instance.nics[nic_idx].ip
3288                 elif st_groups[1] == "bridge":
3289                   val = instance.nics[nic_idx].bridge
3290                 else:
3291                   assert False, "Unhandled NIC parameter"
3292           else:
3293             assert False, "Unhandled variable parameter"
3294         else:
3295           raise errors.ParameterError(field)
3296         iout.append(val)
3297       output.append(iout)
3298
3299     return output
3300
3301
3302 class LUFailoverInstance(LogicalUnit):
3303   """Failover an instance.
3304
3305   """
3306   HPATH = "instance-failover"
3307   HTYPE = constants.HTYPE_INSTANCE
3308   _OP_REQP = ["instance_name", "ignore_consistency"]
3309   REQ_BGL = False
3310
3311   def ExpandNames(self):
3312     self._ExpandAndLockInstance()
3313     self.needed_locks[locking.LEVEL_NODE] = []
3314     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3315
3316   def DeclareLocks(self, level):
3317     if level == locking.LEVEL_NODE:
3318       self._LockInstancesNodes()
3319
3320   def BuildHooksEnv(self):
3321     """Build hooks env.
3322
3323     This runs on master, primary and secondary nodes of the instance.
3324
3325     """
3326     env = {
3327       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3328       }
3329     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3330     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3331     return env, nl, nl
3332
3333   def CheckPrereq(self):
3334     """Check prerequisites.
3335
3336     This checks that the instance is in the cluster.
3337
3338     """
3339     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3340     assert self.instance is not None, \
3341       "Cannot retrieve locked instance %s" % self.op.instance_name
3342
3343     bep = self.cfg.GetClusterInfo().FillBE(instance)
3344     if instance.disk_template not in constants.DTS_NET_MIRROR:
3345       raise errors.OpPrereqError("Instance's disk layout is not"
3346                                  " network mirrored, cannot failover.")
3347
3348     secondary_nodes = instance.secondary_nodes
3349     if not secondary_nodes:
3350       raise errors.ProgrammerError("no secondary node but using "
3351                                    "a mirrored disk template")
3352
3353     target_node = secondary_nodes[0]
3354     _CheckNodeOnline(self, target_node)
3355     # check memory requirements on the secondary node
3356     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3357                          instance.name, bep[constants.BE_MEMORY],
3358                          instance.hypervisor)
3359
3360     # check bridge existance
3361     brlist = [nic.bridge for nic in instance.nics]
3362     result = self.rpc.call_bridges_exist(target_node, brlist)
3363     result.Raise()
3364     if not result.data:
3365       raise errors.OpPrereqError("One or more target bridges %s does not"
3366                                  " exist on destination node '%s'" %
3367                                  (brlist, target_node))
3368
3369   def Exec(self, feedback_fn):
3370     """Failover an instance.
3371
3372     The failover is done by shutting it down on its present node and
3373     starting it on the secondary.
3374
3375     """
3376     instance = self.instance
3377
3378     source_node = instance.primary_node
3379     target_node = instance.secondary_nodes[0]
3380
3381     feedback_fn("* checking disk consistency between source and target")
3382     for dev in instance.disks:
3383       # for drbd, these are drbd over lvm
3384       if not _CheckDiskConsistency(self, dev, target_node, False):
3385         if instance.admin_up and not self.op.ignore_consistency:
3386           raise errors.OpExecError("Disk %s is degraded on target node,"
3387                                    " aborting failover." % dev.iv_name)
3388
3389     feedback_fn("* shutting down instance on source node")
3390     logging.info("Shutting down instance %s on node %s",
3391                  instance.name, source_node)
3392
3393     result = self.rpc.call_instance_shutdown(source_node, instance)
3394     if result.failed or not result.data:
3395       if self.op.ignore_consistency:
3396         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3397                              " Proceeding"
3398                              " anyway. Please make sure node %s is down",
3399                              instance.name, source_node, source_node)
3400       else:
3401         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3402                                  (instance.name, source_node))
3403
3404     feedback_fn("* deactivating the instance's disks on source node")
3405     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3406       raise errors.OpExecError("Can't shut down the instance's disks.")
3407
3408     instance.primary_node = target_node
3409     # distribute new instance config to the other nodes
3410     self.cfg.Update(instance)
3411
3412     # Only start the instance if it's marked as up
3413     if instance.admin_up:
3414       feedback_fn("* activating the instance's disks on target node")
3415       logging.info("Starting instance %s on node %s",
3416                    instance.name, target_node)
3417
3418       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3419                                                ignore_secondaries=True)
3420       if not disks_ok:
3421         _ShutdownInstanceDisks(self, instance)
3422         raise errors.OpExecError("Can't activate the instance's disks")
3423
3424       feedback_fn("* starting the instance on the target node")
3425       result = self.rpc.call_instance_start(target_node, instance, None)
3426       msg = result.RemoteFailMsg()
3427       if msg:
3428         _ShutdownInstanceDisks(self, instance)
3429         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3430                                  (instance.name, target_node, msg))
3431
3432
3433 class LUMigrateInstance(LogicalUnit):
3434   """Migrate an instance.
3435
3436   This is migration without shutting down, compared to the failover,
3437   which is done with shutdown.
3438
3439   """
3440   HPATH = "instance-migrate"
3441   HTYPE = constants.HTYPE_INSTANCE
3442   _OP_REQP = ["instance_name", "live", "cleanup"]
3443
3444   REQ_BGL = False
3445
3446   def ExpandNames(self):
3447     self._ExpandAndLockInstance()
3448     self.needed_locks[locking.LEVEL_NODE] = []
3449     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3450
3451   def DeclareLocks(self, level):
3452     if level == locking.LEVEL_NODE:
3453       self._LockInstancesNodes()
3454
3455   def BuildHooksEnv(self):
3456     """Build hooks env.
3457
3458     This runs on master, primary and secondary nodes of the instance.
3459
3460     """
3461     env = _BuildInstanceHookEnvByObject(self, self.instance)
3462     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3463     return env, nl, nl
3464
3465   def CheckPrereq(self):
3466     """Check prerequisites.
3467
3468     This checks that the instance is in the cluster.
3469
3470     """
3471     instance = self.cfg.GetInstanceInfo(
3472       self.cfg.ExpandInstanceName(self.op.instance_name))
3473     if instance is None:
3474       raise errors.OpPrereqError("Instance '%s' not known" %
3475                                  self.op.instance_name)
3476
3477     if instance.disk_template != constants.DT_DRBD8:
3478       raise errors.OpPrereqError("Instance's disk layout is not"
3479                                  " drbd8, cannot migrate.")
3480
3481     secondary_nodes = instance.secondary_nodes
3482     if not secondary_nodes:
3483       raise errors.ProgrammerError("no secondary node but using "
3484                                    "drbd8 disk template")
3485
3486     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3487
3488     target_node = secondary_nodes[0]
3489     # check memory requirements on the secondary node
3490     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3491                          instance.name, i_be[constants.BE_MEMORY],
3492                          instance.hypervisor)
3493
3494     # check bridge existance
3495     brlist = [nic.bridge for nic in instance.nics]
3496     result = self.rpc.call_bridges_exist(target_node, brlist)
3497     if result.failed or not result.data:
3498       raise errors.OpPrereqError("One or more target bridges %s does not"
3499                                  " exist on destination node '%s'" %
3500                                  (brlist, target_node))
3501
3502     if not self.op.cleanup:
3503       result = self.rpc.call_instance_migratable(instance.primary_node,
3504                                                  instance)
3505       msg = result.RemoteFailMsg()
3506       if msg:
3507         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3508                                    msg)
3509
3510     self.instance = instance
3511
3512   def _WaitUntilSync(self):
3513     """Poll with custom rpc for disk sync.
3514
3515     This uses our own step-based rpc call.
3516
3517     """
3518     self.feedback_fn("* wait until resync is done")
3519     all_done = False
3520     while not all_done:
3521       all_done = True
3522       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3523                                             self.nodes_ip,
3524                                             self.instance.disks)
3525       min_percent = 100
3526       for node, nres in result.items():
3527         msg = nres.RemoteFailMsg()
3528         if msg:
3529           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3530                                    (node, msg))
3531         node_done, node_percent = nres.data[1]
3532         all_done = all_done and node_done
3533         if node_percent is not None:
3534           min_percent = min(min_percent, node_percent)
3535       if not all_done:
3536         if min_percent < 100:
3537           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3538         time.sleep(2)
3539
3540   def _EnsureSecondary(self, node):
3541     """Demote a node to secondary.
3542
3543     """
3544     self.feedback_fn("* switching node %s to secondary mode" % node)
3545
3546     for dev in self.instance.disks:
3547       self.cfg.SetDiskID(dev, node)
3548
3549     result = self.rpc.call_blockdev_close(node, self.instance.name,
3550                                           self.instance.disks)
3551     msg = result.RemoteFailMsg()
3552     if msg:
3553       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3554                                " error %s" % (node, msg))
3555
3556   def _GoStandalone(self):
3557     """Disconnect from the network.
3558
3559     """
3560     self.feedback_fn("* changing into standalone mode")
3561     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3562                                                self.instance.disks)
3563     for node, nres in result.items():
3564       msg = nres.RemoteFailMsg()
3565       if msg:
3566         raise errors.OpExecError("Cannot disconnect disks node %s,"
3567                                  " error %s" % (node, msg))
3568
3569   def _GoReconnect(self, multimaster):
3570     """Reconnect to the network.
3571
3572     """
3573     if multimaster:
3574       msg = "dual-master"
3575     else:
3576       msg = "single-master"
3577     self.feedback_fn("* changing disks into %s mode" % msg)
3578     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3579                                            self.instance.disks,
3580                                            self.instance.name, multimaster)
3581     for node, nres in result.items():
3582       msg = nres.RemoteFailMsg()
3583       if msg:
3584         raise errors.OpExecError("Cannot change disks config on node %s,"
3585                                  " error: %s" % (node, msg))
3586
3587   def _ExecCleanup(self):
3588     """Try to cleanup after a failed migration.
3589
3590     The cleanup is done by:
3591       - check that the instance is running only on one node
3592         (and update the config if needed)
3593       - change disks on its secondary node to secondary
3594       - wait until disks are fully synchronized
3595       - disconnect from the network
3596       - change disks into single-master mode
3597       - wait again until disks are fully synchronized
3598
3599     """
3600     instance = self.instance
3601     target_node = self.target_node
3602     source_node = self.source_node
3603
3604     # check running on only one node
3605     self.feedback_fn("* checking where the instance actually runs"
3606                      " (if this hangs, the hypervisor might be in"
3607                      " a bad state)")
3608     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3609     for node, result in ins_l.items():
3610       result.Raise()
3611       if not isinstance(result.data, list):
3612         raise errors.OpExecError("Can't contact node '%s'" % node)
3613
3614     runningon_source = instance.name in ins_l[source_node].data
3615     runningon_target = instance.name in ins_l[target_node].data
3616
3617     if runningon_source and runningon_target:
3618       raise errors.OpExecError("Instance seems to be running on two nodes,"
3619                                " or the hypervisor is confused. You will have"
3620                                " to ensure manually that it runs only on one"
3621                                " and restart this operation.")
3622
3623     if not (runningon_source or runningon_target):
3624       raise errors.OpExecError("Instance does not seem to be running at all."
3625                                " In this case, it's safer to repair by"
3626                                " running 'gnt-instance stop' to ensure disk"
3627                                " shutdown, and then restarting it.")
3628
3629     if runningon_target:
3630       # the migration has actually succeeded, we need to update the config
3631       self.feedback_fn("* instance running on secondary node (%s),"
3632                        " updating config" % target_node)
3633       instance.primary_node = target_node
3634       self.cfg.Update(instance)
3635       demoted_node = source_node
3636     else:
3637       self.feedback_fn("* instance confirmed to be running on its"
3638                        " primary node (%s)" % source_node)
3639       demoted_node = target_node
3640
3641     self._EnsureSecondary(demoted_node)
3642     try:
3643       self._WaitUntilSync()
3644     except errors.OpExecError:
3645       # we ignore here errors, since if the device is standalone, it
3646       # won't be able to sync
3647       pass
3648     self._GoStandalone()
3649     self._GoReconnect(False)
3650     self._WaitUntilSync()
3651
3652     self.feedback_fn("* done")
3653
3654   def _RevertDiskStatus(self):
3655     """Try to revert the disk status after a failed migration.
3656
3657     """
3658     target_node = self.target_node
3659     try:
3660       self._EnsureSecondary(target_node)
3661       self._GoStandalone()
3662       self._GoReconnect(False)
3663       self._WaitUntilSync()
3664     except errors.OpExecError, err:
3665       self.LogWarning("Migration failed and I can't reconnect the"
3666                       " drives: error '%s'\n"
3667                       "Please look and recover the instance status" %
3668                       str(err))
3669
3670   def _AbortMigration(self):
3671     """Call the hypervisor code to abort a started migration.
3672
3673     """
3674     instance = self.instance
3675     target_node = self.target_node
3676     migration_info = self.migration_info
3677
3678     abort_result = self.rpc.call_finalize_migration(target_node,
3679                                                     instance,
3680                                                     migration_info,
3681                                                     False)
3682     abort_msg = abort_result.RemoteFailMsg()
3683     if abort_msg:
3684       logging.error("Aborting migration failed on target node %s: %s" %
3685                     (target_node, abort_msg))
3686       # Don't raise an exception here, as we stil have to try to revert the
3687       # disk status, even if this step failed.
3688
3689   def _ExecMigration(self):
3690     """Migrate an instance.
3691
3692     The migrate is done by:
3693       - change the disks into dual-master mode
3694       - wait until disks are fully synchronized again
3695       - migrate the instance
3696       - change disks on the new secondary node (the old primary) to secondary
3697       - wait until disks are fully synchronized
3698       - change disks into single-master mode
3699
3700     """
3701     instance = self.instance
3702     target_node = self.target_node
3703     source_node = self.source_node
3704
3705     self.feedback_fn("* checking disk consistency between source and target")
3706     for dev in instance.disks:
3707       if not _CheckDiskConsistency(self, dev, target_node, False):
3708         raise errors.OpExecError("Disk %s is degraded or not fully"
3709                                  " synchronized on target node,"
3710                                  " aborting migrate." % dev.iv_name)
3711
3712     # First get the migration information from the remote node
3713     result = self.rpc.call_migration_info(source_node, instance)
3714     msg = result.RemoteFailMsg()
3715     if msg:
3716       log_err = ("Failed fetching source migration information from %s: %s" %
3717                   (source_node, msg))
3718       logging.error(log_err)
3719       raise errors.OpExecError(log_err)
3720
3721     self.migration_info = migration_info = result.data[1]
3722
3723     # Then switch the disks to master/master mode
3724     self._EnsureSecondary(target_node)
3725     self._GoStandalone()
3726     self._GoReconnect(True)
3727     self._WaitUntilSync()
3728
3729     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3730     result = self.rpc.call_accept_instance(target_node,
3731                                            instance,
3732                                            migration_info,
3733                                            self.nodes_ip[target_node])
3734
3735     msg = result.RemoteFailMsg()
3736     if msg:
3737       logging.error("Instance pre-migration failed, trying to revert"
3738                     " disk status: %s", msg)
3739       self._AbortMigration()
3740       self._RevertDiskStatus()
3741       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3742                                (instance.name, msg))
3743
3744     self.feedback_fn("* migrating instance to %s" % target_node)
3745     time.sleep(10)
3746     result = self.rpc.call_instance_migrate(source_node, instance,
3747                                             self.nodes_ip[target_node],
3748                                             self.op.live)
3749     msg = result.RemoteFailMsg()
3750     if msg:
3751       logging.error("Instance migration failed, trying to revert"
3752                     " disk status: %s", msg)
3753       self._AbortMigration()
3754       self._RevertDiskStatus()
3755       raise errors.OpExecError("Could not migrate instance %s: %s" %
3756                                (instance.name, msg))
3757     time.sleep(10)
3758
3759     instance.primary_node = target_node
3760     # distribute new instance config to the other nodes
3761     self.cfg.Update(instance)
3762
3763     result = self.rpc.call_finalize_migration(target_node,
3764                                               instance,
3765                                               migration_info,
3766                                               True)
3767     msg = result.RemoteFailMsg()
3768     if msg:
3769       logging.error("Instance migration succeeded, but finalization failed:"
3770                     " %s" % msg)
3771       raise errors.OpExecError("Could not finalize instance migration: %s" %
3772                                msg)
3773
3774     self._EnsureSecondary(source_node)
3775     self._WaitUntilSync()
3776     self._GoStandalone()
3777     self._GoReconnect(False)
3778     self._WaitUntilSync()
3779
3780     self.feedback_fn("* done")
3781
3782   def Exec(self, feedback_fn):
3783     """Perform the migration.
3784
3785     """
3786     self.feedback_fn = feedback_fn
3787
3788     self.source_node = self.instance.primary_node
3789     self.target_node = self.instance.secondary_nodes[0]
3790     self.all_nodes = [self.source_node, self.target_node]
3791     self.nodes_ip = {
3792       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3793       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3794       }
3795     if self.op.cleanup:
3796       return self._ExecCleanup()
3797     else:
3798       return self._ExecMigration()
3799
3800
3801 def _CreateBlockDev(lu, node, instance, device, force_create,
3802                     info, force_open):
3803   """Create a tree of block devices on a given node.
3804
3805   If this device type has to be created on secondaries, create it and
3806   all its children.
3807
3808   If not, just recurse to children keeping the same 'force' value.
3809
3810   @param lu: the lu on whose behalf we execute
3811   @param node: the node on which to create the device
3812   @type instance: L{objects.Instance}
3813   @param instance: the instance which owns the device
3814   @type device: L{objects.Disk}
3815   @param device: the device to create
3816   @type force_create: boolean
3817   @param force_create: whether to force creation of this device; this
3818       will be change to True whenever we find a device which has
3819       CreateOnSecondary() attribute
3820   @param info: the extra 'metadata' we should attach to the device
3821       (this will be represented as a LVM tag)
3822   @type force_open: boolean
3823   @param force_open: this parameter will be passes to the
3824       L{backend.CreateBlockDevice} function where it specifies
3825       whether we run on primary or not, and it affects both
3826       the child assembly and the device own Open() execution
3827
3828   """
3829   if device.CreateOnSecondary():
3830     force_create = True
3831
3832   if device.children:
3833     for child in device.children:
3834       _CreateBlockDev(lu, node, instance, child, force_create,
3835                       info, force_open)
3836
3837   if not force_create:
3838     return
3839
3840   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3841
3842
3843 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3844   """Create a single block device on a given node.
3845
3846   This will not recurse over children of the device, so they must be
3847   created in advance.
3848
3849   @param lu: the lu on whose behalf we execute
3850   @param node: the node on which to create the device
3851   @type instance: L{objects.Instance}
3852   @param instance: the instance which owns the device
3853   @type device: L{objects.Disk}
3854   @param device: the device to create
3855   @param info: the extra 'metadata' we should attach to the device
3856       (this will be represented as a LVM tag)
3857   @type force_open: boolean
3858   @param force_open: this parameter will be passes to the
3859       L{backend.CreateBlockDevice} function where it specifies
3860       whether we run on primary or not, and it affects both
3861       the child assembly and the device own Open() execution
3862
3863   """
3864   lu.cfg.SetDiskID(device, node)
3865   result = lu.rpc.call_blockdev_create(node, device, device.size,
3866                                        instance.name, force_open, info)
3867   msg = result.RemoteFailMsg()
3868   if msg:
3869     raise errors.OpExecError("Can't create block device %s on"
3870                              " node %s for instance %s: %s" %
3871                              (device, node, instance.name, msg))
3872   if device.physical_id is None:
3873     device.physical_id = result.data[1]
3874
3875
3876 def _GenerateUniqueNames(lu, exts):
3877   """Generate a suitable LV name.
3878
3879   This will generate a logical volume name for the given instance.
3880
3881   """
3882   results = []
3883   for val in exts:
3884     new_id = lu.cfg.GenerateUniqueID()
3885     results.append("%s%s" % (new_id, val))
3886   return results
3887
3888
3889 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3890                          p_minor, s_minor):
3891   """Generate a drbd8 device complete with its children.
3892
3893   """
3894   port = lu.cfg.AllocatePort()
3895   vgname = lu.cfg.GetVGName()
3896   shared_secret = lu.cfg.GenerateDRBDSecret()
3897   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3898                           logical_id=(vgname, names[0]))
3899   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3900                           logical_id=(vgname, names[1]))
3901   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3902                           logical_id=(primary, secondary, port,
3903                                       p_minor, s_minor,
3904                                       shared_secret),
3905                           children=[dev_data, dev_meta],
3906                           iv_name=iv_name)
3907   return drbd_dev
3908
3909
3910 def _GenerateDiskTemplate(lu, template_name,
3911                           instance_name, primary_node,
3912                           secondary_nodes, disk_info,
3913                           file_storage_dir, file_driver,
3914                           base_index):
3915   """Generate the entire disk layout for a given template type.
3916
3917   """
3918   #TODO: compute space requirements
3919
3920   vgname = lu.cfg.GetVGName()
3921   disk_count = len(disk_info)
3922   disks = []
3923   if template_name == constants.DT_DISKLESS:
3924     pass
3925   elif template_name == constants.DT_PLAIN:
3926     if len(secondary_nodes) != 0:
3927       raise errors.ProgrammerError("Wrong template configuration")
3928
3929     names = _GenerateUniqueNames(lu, [".disk%d" % i
3930                                       for i in range(disk_count)])
3931     for idx, disk in enumerate(disk_info):
3932       disk_index = idx + base_index
3933       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3934                               logical_id=(vgname, names[idx]),
3935                               iv_name="disk/%d" % disk_index,
3936                               mode=disk["mode"])
3937       disks.append(disk_dev)
3938   elif template_name == constants.DT_DRBD8:
3939     if len(secondary_nodes) != 1:
3940       raise errors.ProgrammerError("Wrong template configuration")
3941     remote_node = secondary_nodes[0]
3942     minors = lu.cfg.AllocateDRBDMinor(
3943       [primary_node, remote_node] * len(disk_info), instance_name)
3944
3945     names = []
3946     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3947                                                for i in range(disk_count)]):
3948       names.append(lv_prefix + "_data")
3949       names.append(lv_prefix + "_meta")
3950     for idx, disk in enumerate(disk_info):
3951       disk_index = idx + base_index
3952       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3953                                       disk["size"], names[idx*2:idx*2+2],
3954                                       "disk/%d" % disk_index,
3955                                       minors[idx*2], minors[idx*2+1])
3956       disk_dev.mode = disk["mode"]
3957       disks.append(disk_dev)
3958   elif template_name == constants.DT_FILE:
3959     if len(secondary_nodes) != 0:
3960       raise errors.ProgrammerError("Wrong template configuration")
3961
3962     for idx, disk in enumerate(disk_info):
3963       disk_index = idx + base_index
3964       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3965                               iv_name="disk/%d" % disk_index,
3966                               logical_id=(file_driver,
3967                                           "%s/disk%d" % (file_storage_dir,
3968                                                          idx)),
3969                               mode=disk["mode"])
3970       disks.append(disk_dev)
3971   else:
3972     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3973   return disks
3974
3975
3976 def _GetInstanceInfoText(instance):
3977   """Compute that text that should be added to the disk's metadata.
3978
3979   """
3980   return "originstname+%s" % instance.name
3981
3982
3983 def _CreateDisks(lu, instance):
3984   """Create all disks for an instance.
3985
3986   This abstracts away some work from AddInstance.
3987
3988   @type lu: L{LogicalUnit}
3989   @param lu: the logical unit on whose behalf we execute
3990   @type instance: L{objects.Instance}
3991   @param instance: the instance whose disks we should create
3992   @rtype: boolean
3993   @return: the success of the creation
3994
3995   """
3996   info = _GetInstanceInfoText(instance)
3997   pnode = instance.primary_node
3998
3999   if instance.disk_template == constants.DT_FILE:
4000     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4001     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4002
4003     if result.failed or not result.data:
4004       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4005
4006     if not result.data[0]:
4007       raise errors.OpExecError("Failed to create directory '%s'" %
4008                                file_storage_dir)
4009
4010   # Note: this needs to be kept in sync with adding of disks in
4011   # LUSetInstanceParams
4012   for device in instance.disks:
4013     logging.info("Creating volume %s for instance %s",
4014                  device.iv_name, instance.name)
4015     #HARDCODE
4016     for node in instance.all_nodes:
4017       f_create = node == pnode
4018       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4019
4020
4021 def _RemoveDisks(lu, instance):
4022   """Remove all disks for an instance.
4023
4024   This abstracts away some work from `AddInstance()` and
4025   `RemoveInstance()`. Note that in case some of the devices couldn't
4026   be removed, the removal will continue with the other ones (compare
4027   with `_CreateDisks()`).
4028
4029   @type lu: L{LogicalUnit}
4030   @param lu: the logical unit on whose behalf we execute
4031   @type instance: L{objects.Instance}
4032   @param instance: the instance whose disks we should remove
4033   @rtype: boolean
4034   @return: the success of the removal
4035
4036   """
4037   logging.info("Removing block devices for instance %s", instance.name)
4038
4039   result = True
4040   for device in instance.disks:
4041     for node, disk in device.ComputeNodeTree(instance.primary_node):
4042       lu.cfg.SetDiskID(disk, node)
4043       result = lu.rpc.call_blockdev_remove(node, disk)
4044       if result.failed or not result.data:
4045         lu.proc.LogWarning("Could not remove block device %s on node %s,"
4046                            " continuing anyway", device.iv_name, node)
4047         result = False
4048
4049   if instance.disk_template == constants.DT_FILE:
4050     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4051     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4052                                                  file_storage_dir)
4053     if result.failed or not result.data:
4054       logging.error("Could not remove directory '%s'", file_storage_dir)
4055       result = False
4056
4057   return result
4058
4059
4060 def _ComputeDiskSize(disk_template, disks):
4061   """Compute disk size requirements in the volume group
4062
4063   """
4064   # Required free disk space as a function of disk and swap space
4065   req_size_dict = {
4066     constants.DT_DISKLESS: None,
4067     constants.DT_PLAIN: sum(d["size"] for d in disks),
4068     # 128 MB are added for drbd metadata for each disk
4069     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4070     constants.DT_FILE: None,
4071   }
4072
4073   if disk_template not in req_size_dict:
4074     raise errors.ProgrammerError("Disk template '%s' size requirement"
4075                                  " is unknown" %  disk_template)
4076
4077   return req_size_dict[disk_template]
4078
4079
4080 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4081   """Hypervisor parameter validation.
4082
4083   This function abstract the hypervisor parameter validation to be
4084   used in both instance create and instance modify.
4085
4086   @type lu: L{LogicalUnit}
4087   @param lu: the logical unit for which we check
4088   @type nodenames: list
4089   @param nodenames: the list of nodes on which we should check
4090   @type hvname: string
4091   @param hvname: the name of the hypervisor we should use
4092   @type hvparams: dict
4093   @param hvparams: the parameters which we need to check
4094   @raise errors.OpPrereqError: if the parameters are not valid
4095
4096   """
4097   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4098                                                   hvname,
4099                                                   hvparams)
4100   for node in nodenames:
4101     info = hvinfo[node]
4102     if info.offline:
4103       continue
4104     info.Raise()
4105     if not info.data or not isinstance(info.data, (tuple, list)):
4106       raise errors.OpPrereqError("Cannot get current information"
4107                                  " from node '%s' (%s)" % (node, info.data))
4108     if not info.data[0]:
4109       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4110                                  " %s" % info.data[1])
4111
4112
4113 class LUCreateInstance(LogicalUnit):
4114   """Create an instance.
4115
4116   """
4117   HPATH = "instance-add"
4118   HTYPE = constants.HTYPE_INSTANCE
4119   _OP_REQP = ["instance_name", "disks", "disk_template",
4120               "mode", "start",
4121               "wait_for_sync", "ip_check", "nics",
4122               "hvparams", "beparams"]
4123   REQ_BGL = False
4124
4125   def _ExpandNode(self, node):
4126     """Expands and checks one node name.
4127
4128     """
4129     node_full = self.cfg.ExpandNodeName(node)
4130     if node_full is None:
4131       raise errors.OpPrereqError("Unknown node %s" % node)
4132     return node_full
4133
4134   def ExpandNames(self):
4135     """ExpandNames for CreateInstance.
4136
4137     Figure out the right locks for instance creation.
4138
4139     """
4140     self.needed_locks = {}
4141
4142     # set optional parameters to none if they don't exist
4143     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4144       if not hasattr(self.op, attr):
4145         setattr(self.op, attr, None)
4146
4147     # cheap checks, mostly valid constants given
4148
4149     # verify creation mode
4150     if self.op.mode not in (constants.INSTANCE_CREATE,
4151                             constants.INSTANCE_IMPORT):
4152       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4153                                  self.op.mode)
4154
4155     # disk template and mirror node verification
4156     if self.op.disk_template not in constants.DISK_TEMPLATES:
4157       raise errors.OpPrereqError("Invalid disk template name")
4158
4159     if self.op.hypervisor is None:
4160       self.op.hypervisor = self.cfg.GetHypervisorType()
4161
4162     cluster = self.cfg.GetClusterInfo()
4163     enabled_hvs = cluster.enabled_hypervisors
4164     if self.op.hypervisor not in enabled_hvs:
4165       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4166                                  " cluster (%s)" % (self.op.hypervisor,
4167                                   ",".join(enabled_hvs)))
4168
4169     # check hypervisor parameter syntax (locally)
4170
4171     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4172                                   self.op.hvparams)
4173     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4174     hv_type.CheckParameterSyntax(filled_hvp)
4175
4176     # fill and remember the beparams dict
4177     utils.CheckBEParams(self.op.beparams)
4178     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4179                                     self.op.beparams)
4180
4181     #### instance parameters check
4182
4183     # instance name verification
4184     hostname1 = utils.HostInfo(self.op.instance_name)
4185     self.op.instance_name = instance_name = hostname1.name
4186
4187     # this is just a preventive check, but someone might still add this
4188     # instance in the meantime, and creation will fail at lock-add time
4189     if instance_name in self.cfg.GetInstanceList():
4190       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4191                                  instance_name)
4192
4193     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4194
4195     # NIC buildup
4196     self.nics = []
4197     for nic in self.op.nics:
4198       # ip validity checks
4199       ip = nic.get("ip", None)
4200       if ip is None or ip.lower() == "none":
4201         nic_ip = None
4202       elif ip.lower() == constants.VALUE_AUTO:
4203         nic_ip = hostname1.ip
4204       else:
4205         if not utils.IsValidIP(ip):
4206           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4207                                      " like a valid IP" % ip)
4208         nic_ip = ip
4209
4210       # MAC address verification
4211       mac = nic.get("mac", constants.VALUE_AUTO)
4212       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4213         if not utils.IsValidMac(mac.lower()):
4214           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4215                                      mac)
4216       # bridge verification
4217       bridge = nic.get("bridge", None)
4218       if bridge is None:
4219         bridge = self.cfg.GetDefBridge()
4220       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4221
4222     # disk checks/pre-build
4223     self.disks = []
4224     for disk in self.op.disks:
4225       mode = disk.get("mode", constants.DISK_RDWR)
4226       if mode not in constants.DISK_ACCESS_SET:
4227         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4228                                    mode)
4229       size = disk.get("size", None)
4230       if size is None:
4231         raise errors.OpPrereqError("Missing disk size")
4232       try:
4233         size = int(size)
4234       except ValueError:
4235         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4236       self.disks.append({"size": size, "mode": mode})
4237
4238     # used in CheckPrereq for ip ping check
4239     self.check_ip = hostname1.ip
4240
4241     # file storage checks
4242     if (self.op.file_driver and
4243         not self.op.file_driver in constants.FILE_DRIVER):
4244       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4245                                  self.op.file_driver)
4246
4247     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4248       raise errors.OpPrereqError("File storage directory path not absolute")
4249
4250     ### Node/iallocator related checks
4251     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4252       raise errors.OpPrereqError("One and only one of iallocator and primary"
4253                                  " node must be given")
4254
4255     if self.op.iallocator:
4256       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4257     else:
4258       self.op.pnode = self._ExpandNode(self.op.pnode)
4259       nodelist = [self.op.pnode]
4260       if self.op.snode is not None:
4261         self.op.snode = self._ExpandNode(self.op.snode)
4262         nodelist.append(self.op.snode)
4263       self.needed_locks[locking.LEVEL_NODE] = nodelist
4264
4265     # in case of import lock the source node too
4266     if self.op.mode == constants.INSTANCE_IMPORT:
4267       src_node = getattr(self.op, "src_node", None)
4268       src_path = getattr(self.op, "src_path", None)
4269
4270       if src_path is None:
4271         self.op.src_path = src_path = self.op.instance_name
4272
4273       if src_node is None:
4274         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4275         self.op.src_node = None
4276         if os.path.isabs(src_path):
4277           raise errors.OpPrereqError("Importing an instance from an absolute"
4278                                      " path requires a source node option.")
4279       else:
4280         self.op.src_node = src_node = self._ExpandNode(src_node)
4281         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4282           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4283         if not os.path.isabs(src_path):
4284           self.op.src_path = src_path = \
4285             os.path.join(constants.EXPORT_DIR, src_path)
4286
4287     else: # INSTANCE_CREATE
4288       if getattr(self.op, "os_type", None) is None:
4289         raise errors.OpPrereqError("No guest OS specified")
4290
4291   def _RunAllocator(self):
4292     """Run the allocator based on input opcode.
4293
4294     """
4295     nics = [n.ToDict() for n in self.nics]
4296     ial = IAllocator(self,
4297                      mode=constants.IALLOCATOR_MODE_ALLOC,
4298                      name=self.op.instance_name,
4299                      disk_template=self.op.disk_template,
4300                      tags=[],
4301                      os=self.op.os_type,
4302                      vcpus=self.be_full[constants.BE_VCPUS],
4303                      mem_size=self.be_full[constants.BE_MEMORY],
4304                      disks=self.disks,
4305                      nics=nics,
4306                      hypervisor=self.op.hypervisor,
4307                      )
4308
4309     ial.Run(self.op.iallocator)
4310
4311     if not ial.success:
4312       raise errors.OpPrereqError("Can't compute nodes using"
4313                                  " iallocator '%s': %s" % (self.op.iallocator,
4314                                                            ial.info))
4315     if len(ial.nodes) != ial.required_nodes:
4316       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4317                                  " of nodes (%s), required %s" %
4318                                  (self.op.iallocator, len(ial.nodes),
4319                                   ial.required_nodes))
4320     self.op.pnode = ial.nodes[0]
4321     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4322                  self.op.instance_name, self.op.iallocator,
4323                  ", ".join(ial.nodes))
4324     if ial.required_nodes == 2:
4325       self.op.snode = ial.nodes[1]
4326
4327   def BuildHooksEnv(self):
4328     """Build hooks env.
4329
4330     This runs on master, primary and secondary nodes of the instance.
4331
4332     """
4333     env = {
4334       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4335       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4336       "INSTANCE_ADD_MODE": self.op.mode,
4337       }
4338     if self.op.mode == constants.INSTANCE_IMPORT:
4339       env["INSTANCE_SRC_NODE"] = self.op.src_node
4340       env["INSTANCE_SRC_PATH"] = self.op.src_path
4341       env["INSTANCE_SRC_IMAGES"] = self.src_images
4342
4343     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4344       primary_node=self.op.pnode,
4345       secondary_nodes=self.secondaries,
4346       status=self.instance_status,
4347       os_type=self.op.os_type,
4348       memory=self.be_full[constants.BE_MEMORY],
4349       vcpus=self.be_full[constants.BE_VCPUS],
4350       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4351     ))
4352
4353     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4354           self.secondaries)
4355     return env, nl, nl
4356
4357
4358   def CheckPrereq(self):
4359     """Check prerequisites.
4360
4361     """
4362     if (not self.cfg.GetVGName() and
4363         self.op.disk_template not in constants.DTS_NOT_LVM):
4364       raise errors.OpPrereqError("Cluster does not support lvm-based"
4365                                  " instances")
4366
4367
4368     if self.op.mode == constants.INSTANCE_IMPORT:
4369       src_node = self.op.src_node
4370       src_path = self.op.src_path
4371
4372       if src_node is None:
4373         exp_list = self.rpc.call_export_list(
4374           self.acquired_locks[locking.LEVEL_NODE])
4375         found = False
4376         for node in exp_list:
4377           if not exp_list[node].failed and src_path in exp_list[node].data:
4378             found = True
4379             self.op.src_node = src_node = node
4380             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4381                                                        src_path)
4382             break
4383         if not found:
4384           raise errors.OpPrereqError("No export found for relative path %s" %
4385                                       src_path)
4386
4387       _CheckNodeOnline(self, src_node)
4388       result = self.rpc.call_export_info(src_node, src_path)
4389       result.Raise()
4390       if not result.data:
4391         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4392
4393       export_info = result.data
4394       if not export_info.has_section(constants.INISECT_EXP):
4395         raise errors.ProgrammerError("Corrupted export config")
4396
4397       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4398       if (int(ei_version) != constants.EXPORT_VERSION):
4399         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4400                                    (ei_version, constants.EXPORT_VERSION))
4401
4402       # Check that the new instance doesn't have less disks than the export
4403       instance_disks = len(self.disks)
4404       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4405       if instance_disks < export_disks:
4406         raise errors.OpPrereqError("Not enough disks to import."
4407                                    " (instance: %d, export: %d)" %
4408                                    (instance_disks, export_disks))
4409
4410       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4411       disk_images = []
4412       for idx in range(export_disks):
4413         option = 'disk%d_dump' % idx
4414         if export_info.has_option(constants.INISECT_INS, option):
4415           # FIXME: are the old os-es, disk sizes, etc. useful?
4416           export_name = export_info.get(constants.INISECT_INS, option)
4417           image = os.path.join(src_path, export_name)
4418           disk_images.append(image)
4419         else:
4420           disk_images.append(False)
4421
4422       self.src_images = disk_images
4423
4424       old_name = export_info.get(constants.INISECT_INS, 'name')
4425       # FIXME: int() here could throw a ValueError on broken exports
4426       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4427       if self.op.instance_name == old_name:
4428         for idx, nic in enumerate(self.nics):
4429           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4430             nic_mac_ini = 'nic%d_mac' % idx
4431             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4432
4433     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4434     if self.op.start and not self.op.ip_check:
4435       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4436                                  " adding an instance in start mode")
4437
4438     if self.op.ip_check:
4439       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4440         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4441                                    (self.check_ip, self.op.instance_name))
4442
4443     #### allocator run
4444
4445     if self.op.iallocator is not None:
4446       self._RunAllocator()
4447
4448     #### node related checks
4449
4450     # check primary node
4451     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4452     assert self.pnode is not None, \
4453       "Cannot retrieve locked node %s" % self.op.pnode
4454     if pnode.offline:
4455       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4456                                  pnode.name)
4457
4458     self.secondaries = []
4459
4460     # mirror node verification
4461     if self.op.disk_template in constants.DTS_NET_MIRROR:
4462       if self.op.snode is None:
4463         raise errors.OpPrereqError("The networked disk templates need"
4464                                    " a mirror node")
4465       if self.op.snode == pnode.name:
4466         raise errors.OpPrereqError("The secondary node cannot be"
4467                                    " the primary node.")
4468       self.secondaries.append(self.op.snode)
4469       _CheckNodeOnline(self, self.op.snode)
4470
4471     nodenames = [pnode.name] + self.secondaries
4472
4473     req_size = _ComputeDiskSize(self.op.disk_template,
4474                                 self.disks)
4475
4476     # Check lv size requirements
4477     if req_size is not None:
4478       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4479                                          self.op.hypervisor)
4480       for node in nodenames:
4481         info = nodeinfo[node]
4482         info.Raise()
4483         info = info.data
4484         if not info:
4485           raise errors.OpPrereqError("Cannot get current information"
4486                                      " from node '%s'" % node)
4487         vg_free = info.get('vg_free', None)
4488         if not isinstance(vg_free, int):
4489           raise errors.OpPrereqError("Can't compute free disk space on"
4490                                      " node %s" % node)
4491         if req_size > info['vg_free']:
4492           raise errors.OpPrereqError("Not enough disk space on target node %s."
4493                                      " %d MB available, %d MB required" %
4494                                      (node, info['vg_free'], req_size))
4495
4496     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4497
4498     # os verification
4499     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4500     result.Raise()
4501     if not isinstance(result.data, objects.OS):
4502       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4503                                  " primary node"  % self.op.os_type)
4504
4505     # bridge check on primary node
4506     bridges = [n.bridge for n in self.nics]
4507     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4508     result.Raise()
4509     if not result.data:
4510       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4511                                  " exist on destination node '%s'" %
4512                                  (",".join(bridges), pnode.name))
4513
4514     # memory check on primary node
4515     if self.op.start:
4516       _CheckNodeFreeMemory(self, self.pnode.name,
4517                            "creating instance %s" % self.op.instance_name,
4518                            self.be_full[constants.BE_MEMORY],
4519                            self.op.hypervisor)
4520
4521     self.instance_status = self.op.start
4522
4523   def Exec(self, feedback_fn):
4524     """Create and add the instance to the cluster.
4525
4526     """
4527     instance = self.op.instance_name
4528     pnode_name = self.pnode.name
4529
4530     for nic in self.nics:
4531       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4532         nic.mac = self.cfg.GenerateMAC()
4533
4534     ht_kind = self.op.hypervisor
4535     if ht_kind in constants.HTS_REQ_PORT:
4536       network_port = self.cfg.AllocatePort()
4537     else:
4538       network_port = None
4539
4540     ##if self.op.vnc_bind_address is None:
4541     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4542
4543     # this is needed because os.path.join does not accept None arguments
4544     if self.op.file_storage_dir is None:
4545       string_file_storage_dir = ""
4546     else:
4547       string_file_storage_dir = self.op.file_storage_dir
4548
4549     # build the full file storage dir path
4550     file_storage_dir = os.path.normpath(os.path.join(
4551                                         self.cfg.GetFileStorageDir(),
4552                                         string_file_storage_dir, instance))
4553
4554
4555     disks = _GenerateDiskTemplate(self,
4556                                   self.op.disk_template,
4557                                   instance, pnode_name,
4558                                   self.secondaries,
4559                                   self.disks,
4560                                   file_storage_dir,
4561                                   self.op.file_driver,
4562                                   0)
4563
4564     iobj = objects.Instance(name=instance, os=self.op.os_type,
4565                             primary_node=pnode_name,
4566                             nics=self.nics, disks=disks,
4567                             disk_template=self.op.disk_template,
4568                             admin_up=self.instance_status,
4569                             network_port=network_port,
4570                             beparams=self.op.beparams,
4571                             hvparams=self.op.hvparams,
4572                             hypervisor=self.op.hypervisor,
4573                             )
4574
4575     feedback_fn("* creating instance disks...")
4576     try:
4577       _CreateDisks(self, iobj)
4578     except errors.OpExecError:
4579       self.LogWarning("Device creation failed, reverting...")
4580       try:
4581         _RemoveDisks(self, iobj)
4582       finally:
4583         self.cfg.ReleaseDRBDMinors(instance)
4584         raise
4585
4586     feedback_fn("adding instance %s to cluster config" % instance)
4587
4588     self.cfg.AddInstance(iobj)
4589     # Declare that we don't want to remove the instance lock anymore, as we've
4590     # added the instance to the config
4591     del self.remove_locks[locking.LEVEL_INSTANCE]
4592     # Unlock all the nodes
4593     if self.op.mode == constants.INSTANCE_IMPORT:
4594       nodes_keep = [self.op.src_node]
4595       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4596                        if node != self.op.src_node]
4597       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4598       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4599     else:
4600       self.context.glm.release(locking.LEVEL_NODE)
4601       del self.acquired_locks[locking.LEVEL_NODE]
4602
4603     if self.op.wait_for_sync:
4604       disk_abort = not _WaitForSync(self, iobj)
4605     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4606       # make sure the disks are not degraded (still sync-ing is ok)
4607       time.sleep(15)
4608       feedback_fn("* checking mirrors status")
4609       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4610     else:
4611       disk_abort = False
4612
4613     if disk_abort:
4614       _RemoveDisks(self, iobj)
4615       self.cfg.RemoveInstance(iobj.name)
4616       # Make sure the instance lock gets removed
4617       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4618       raise errors.OpExecError("There are some degraded disks for"
4619                                " this instance")
4620
4621     feedback_fn("creating os for instance %s on node %s" %
4622                 (instance, pnode_name))
4623
4624     if iobj.disk_template != constants.DT_DISKLESS:
4625       if self.op.mode == constants.INSTANCE_CREATE:
4626         feedback_fn("* running the instance OS create scripts...")
4627         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4628         msg = result.RemoteFailMsg()
4629         if msg:
4630           raise errors.OpExecError("Could not add os for instance %s"
4631                                    " on node %s: %s" %
4632                                    (instance, pnode_name, msg))
4633
4634       elif self.op.mode == constants.INSTANCE_IMPORT:
4635         feedback_fn("* running the instance OS import scripts...")
4636         src_node = self.op.src_node
4637         src_images = self.src_images
4638         cluster_name = self.cfg.GetClusterName()
4639         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4640                                                          src_node, src_images,
4641                                                          cluster_name)
4642         import_result.Raise()
4643         for idx, result in enumerate(import_result.data):
4644           if not result:
4645             self.LogWarning("Could not import the image %s for instance"
4646                             " %s, disk %d, on node %s" %
4647                             (src_images[idx], instance, idx, pnode_name))
4648       else:
4649         # also checked in the prereq part
4650         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4651                                      % self.op.mode)
4652
4653     if self.op.start:
4654       logging.info("Starting instance %s on node %s", instance, pnode_name)
4655       feedback_fn("* starting instance...")
4656       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4657       msg = result.RemoteFailMsg()
4658       if msg:
4659         raise errors.OpExecError("Could not start instance: %s" % msg)
4660
4661
4662 class LUConnectConsole(NoHooksLU):
4663   """Connect to an instance's console.
4664
4665   This is somewhat special in that it returns the command line that
4666   you need to run on the master node in order to connect to the
4667   console.
4668
4669   """
4670   _OP_REQP = ["instance_name"]
4671   REQ_BGL = False
4672
4673   def ExpandNames(self):
4674     self._ExpandAndLockInstance()
4675
4676   def CheckPrereq(self):
4677     """Check prerequisites.
4678
4679     This checks that the instance is in the cluster.
4680
4681     """
4682     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4683     assert self.instance is not None, \
4684       "Cannot retrieve locked instance %s" % self.op.instance_name
4685     _CheckNodeOnline(self, self.instance.primary_node)
4686
4687   def Exec(self, feedback_fn):
4688     """Connect to the console of an instance
4689
4690     """
4691     instance = self.instance
4692     node = instance.primary_node
4693
4694     node_insts = self.rpc.call_instance_list([node],
4695                                              [instance.hypervisor])[node]
4696     node_insts.Raise()
4697
4698     if instance.name not in node_insts.data:
4699       raise errors.OpExecError("Instance %s is not running." % instance.name)
4700
4701     logging.debug("Connecting to console of %s on %s", instance.name, node)
4702
4703     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4704     cluster = self.cfg.GetClusterInfo()
4705     # beparams and hvparams are passed separately, to avoid editing the
4706     # instance and then saving the defaults in the instance itself.
4707     hvparams = cluster.FillHV(instance)
4708     beparams = cluster.FillBE(instance)
4709     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4710
4711     # build ssh cmdline
4712     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4713
4714
4715 class LUReplaceDisks(LogicalUnit):
4716   """Replace the disks of an instance.
4717
4718   """
4719   HPATH = "mirrors-replace"
4720   HTYPE = constants.HTYPE_INSTANCE
4721   _OP_REQP = ["instance_name", "mode", "disks"]
4722   REQ_BGL = False
4723
4724   def CheckArguments(self):
4725     if not hasattr(self.op, "remote_node"):
4726       self.op.remote_node = None
4727     if not hasattr(self.op, "iallocator"):
4728       self.op.iallocator = None
4729
4730     # check for valid parameter combination
4731     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4732     if self.op.mode == constants.REPLACE_DISK_CHG:
4733       if cnt == 2:
4734         raise errors.OpPrereqError("When changing the secondary either an"
4735                                    " iallocator script must be used or the"
4736                                    " new node given")
4737       elif cnt == 0:
4738         raise errors.OpPrereqError("Give either the iallocator or the new"
4739                                    " secondary, not both")
4740     else: # not replacing the secondary
4741       if cnt != 2:
4742         raise errors.OpPrereqError("The iallocator and new node options can"
4743                                    " be used only when changing the"
4744                                    " secondary node")
4745
4746   def ExpandNames(self):
4747     self._ExpandAndLockInstance()
4748
4749     if self.op.iallocator is not None:
4750       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4751     elif self.op.remote_node is not None:
4752       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4753       if remote_node is None:
4754         raise errors.OpPrereqError("Node '%s' not known" %
4755                                    self.op.remote_node)
4756       self.op.remote_node = remote_node
4757       # Warning: do not remove the locking of the new secondary here
4758       # unless DRBD8.AddChildren is changed to work in parallel;
4759       # currently it doesn't since parallel invocations of
4760       # FindUnusedMinor will conflict
4761       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4762       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4763     else:
4764       self.needed_locks[locking.LEVEL_NODE] = []
4765       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4766
4767   def DeclareLocks(self, level):
4768     # If we're not already locking all nodes in the set we have to declare the
4769     # instance's primary/secondary nodes.
4770     if (level == locking.LEVEL_NODE and
4771         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4772       self._LockInstancesNodes()
4773
4774   def _RunAllocator(self):
4775     """Compute a new secondary node using an IAllocator.
4776
4777     """
4778     ial = IAllocator(self,
4779                      mode=constants.IALLOCATOR_MODE_RELOC,
4780                      name=self.op.instance_name,
4781                      relocate_from=[self.sec_node])
4782
4783     ial.Run(self.op.iallocator)
4784
4785     if not ial.success:
4786       raise errors.OpPrereqError("Can't compute nodes using"
4787                                  " iallocator '%s': %s" % (self.op.iallocator,
4788                                                            ial.info))
4789     if len(ial.nodes) != ial.required_nodes:
4790       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4791                                  " of nodes (%s), required %s" %
4792                                  (len(ial.nodes), ial.required_nodes))
4793     self.op.remote_node = ial.nodes[0]
4794     self.LogInfo("Selected new secondary for the instance: %s",
4795                  self.op.remote_node)
4796
4797   def BuildHooksEnv(self):
4798     """Build hooks env.
4799
4800     This runs on the master, the primary and all the secondaries.
4801
4802     """
4803     env = {
4804       "MODE": self.op.mode,
4805       "NEW_SECONDARY": self.op.remote_node,
4806       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4807       }
4808     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4809     nl = [
4810       self.cfg.GetMasterNode(),
4811       self.instance.primary_node,
4812       ]
4813     if self.op.remote_node is not None:
4814       nl.append(self.op.remote_node)
4815     return env, nl, nl
4816
4817   def CheckPrereq(self):
4818     """Check prerequisites.
4819
4820     This checks that the instance is in the cluster.
4821
4822     """
4823     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4824     assert instance is not None, \
4825       "Cannot retrieve locked instance %s" % self.op.instance_name
4826     self.instance = instance
4827
4828     if instance.disk_template != constants.DT_DRBD8:
4829       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4830                                  " instances")
4831
4832     if len(instance.secondary_nodes) != 1:
4833       raise errors.OpPrereqError("The instance has a strange layout,"
4834                                  " expected one secondary but found %d" %
4835                                  len(instance.secondary_nodes))
4836
4837     self.sec_node = instance.secondary_nodes[0]
4838
4839     if self.op.iallocator is not None:
4840       self._RunAllocator()
4841
4842     remote_node = self.op.remote_node
4843     if remote_node is not None:
4844       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4845       assert self.remote_node_info is not None, \
4846         "Cannot retrieve locked node %s" % remote_node
4847     else:
4848       self.remote_node_info = None
4849     if remote_node == instance.primary_node:
4850       raise errors.OpPrereqError("The specified node is the primary node of"
4851                                  " the instance.")
4852     elif remote_node == self.sec_node:
4853       raise errors.OpPrereqError("The specified node is already the"
4854                                  " secondary node of the instance.")
4855
4856     if self.op.mode == constants.REPLACE_DISK_PRI:
4857       n1 = self.tgt_node = instance.primary_node
4858       n2 = self.oth_node = self.sec_node
4859     elif self.op.mode == constants.REPLACE_DISK_SEC:
4860       n1 = self.tgt_node = self.sec_node
4861       n2 = self.oth_node = instance.primary_node
4862     elif self.op.mode == constants.REPLACE_DISK_CHG:
4863       n1 = self.new_node = remote_node
4864       n2 = self.oth_node = instance.primary_node
4865       self.tgt_node = self.sec_node
4866     else:
4867       raise errors.ProgrammerError("Unhandled disk replace mode")
4868
4869     _CheckNodeOnline(self, n1)
4870     _CheckNodeOnline(self, n2)
4871
4872     if not self.op.disks:
4873       self.op.disks = range(len(instance.disks))
4874
4875     for disk_idx in self.op.disks:
4876       instance.FindDisk(disk_idx)
4877
4878   def _ExecD8DiskOnly(self, feedback_fn):
4879     """Replace a disk on the primary or secondary for dbrd8.
4880
4881     The algorithm for replace is quite complicated:
4882
4883       1. for each disk to be replaced:
4884
4885         1. create new LVs on the target node with unique names
4886         1. detach old LVs from the drbd device
4887         1. rename old LVs to name_replaced.<time_t>
4888         1. rename new LVs to old LVs
4889         1. attach the new LVs (with the old names now) to the drbd device
4890
4891       1. wait for sync across all devices
4892
4893       1. for each modified disk:
4894
4895         1. remove old LVs (which have the name name_replaces.<time_t>)
4896
4897     Failures are not very well handled.
4898
4899     """
4900     steps_total = 6
4901     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4902     instance = self.instance
4903     iv_names = {}
4904     vgname = self.cfg.GetVGName()
4905     # start of work
4906     cfg = self.cfg
4907     tgt_node = self.tgt_node
4908     oth_node = self.oth_node
4909
4910     # Step: check device activation
4911     self.proc.LogStep(1, steps_total, "check device existence")
4912     info("checking volume groups")
4913     my_vg = cfg.GetVGName()
4914     results = self.rpc.call_vg_list([oth_node, tgt_node])
4915     if not results:
4916       raise errors.OpExecError("Can't list volume groups on the nodes")
4917     for node in oth_node, tgt_node:
4918       res = results[node]
4919       if res.failed or not res.data or my_vg not in res.data:
4920         raise errors.OpExecError("Volume group '%s' not found on %s" %
4921                                  (my_vg, node))
4922     for idx, dev in enumerate(instance.disks):
4923       if idx not in self.op.disks:
4924         continue
4925       for node in tgt_node, oth_node:
4926         info("checking disk/%d on %s" % (idx, node))
4927         cfg.SetDiskID(dev, node)
4928         if not self.rpc.call_blockdev_find(node, dev):
4929           raise errors.OpExecError("Can't find disk/%d on node %s" %
4930                                    (idx, node))
4931
4932     # Step: check other node consistency
4933     self.proc.LogStep(2, steps_total, "check peer consistency")
4934     for idx, dev in enumerate(instance.disks):
4935       if idx not in self.op.disks:
4936         continue
4937       info("checking disk/%d consistency on %s" % (idx, oth_node))
4938       if not _CheckDiskConsistency(self, dev, oth_node,
4939                                    oth_node==instance.primary_node):
4940         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4941                                  " to replace disks on this node (%s)" %
4942                                  (oth_node, tgt_node))
4943
4944     # Step: create new storage
4945     self.proc.LogStep(3, steps_total, "allocate new storage")
4946     for idx, dev in enumerate(instance.disks):
4947       if idx not in self.op.disks:
4948         continue
4949       size = dev.size
4950       cfg.SetDiskID(dev, tgt_node)
4951       lv_names = [".disk%d_%s" % (idx, suf)
4952                   for suf in ["data", "meta"]]
4953       names = _GenerateUniqueNames(self, lv_names)
4954       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4955                              logical_id=(vgname, names[0]))
4956       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4957                              logical_id=(vgname, names[1]))
4958       new_lvs = [lv_data, lv_meta]
4959       old_lvs = dev.children
4960       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4961       info("creating new local storage on %s for %s" %
4962            (tgt_node, dev.iv_name))
4963       # we pass force_create=True to force the LVM creation
4964       for new_lv in new_lvs:
4965         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4966                         _GetInstanceInfoText(instance), False)
4967
4968     # Step: for each lv, detach+rename*2+attach
4969     self.proc.LogStep(4, steps_total, "change drbd configuration")
4970     for dev, old_lvs, new_lvs in iv_names.itervalues():
4971       info("detaching %s drbd from local storage" % dev.iv_name)
4972       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4973       result.Raise()
4974       if not result.data:
4975         raise errors.OpExecError("Can't detach drbd from local storage on node"
4976                                  " %s for device %s" % (tgt_node, dev.iv_name))
4977       #dev.children = []
4978       #cfg.Update(instance)
4979
4980       # ok, we created the new LVs, so now we know we have the needed
4981       # storage; as such, we proceed on the target node to rename
4982       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4983       # using the assumption that logical_id == physical_id (which in
4984       # turn is the unique_id on that node)
4985
4986       # FIXME(iustin): use a better name for the replaced LVs
4987       temp_suffix = int(time.time())
4988       ren_fn = lambda d, suff: (d.physical_id[0],
4989                                 d.physical_id[1] + "_replaced-%s" % suff)
4990       # build the rename list based on what LVs exist on the node
4991       rlist = []
4992       for to_ren in old_lvs:
4993         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4994         if not find_res.failed and find_res.data is not None: # device exists
4995           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4996
4997       info("renaming the old LVs on the target node")
4998       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4999       result.Raise()
5000       if not result.data:
5001         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5002       # now we rename the new LVs to the old LVs
5003       info("renaming the new LVs on the target node")
5004       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5005       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5006       result.Raise()
5007       if not result.data:
5008         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5009
5010       for old, new in zip(old_lvs, new_lvs):
5011         new.logical_id = old.logical_id
5012         cfg.SetDiskID(new, tgt_node)
5013
5014       for disk in old_lvs:
5015         disk.logical_id = ren_fn(disk, temp_suffix)
5016         cfg.SetDiskID(disk, tgt_node)
5017
5018       # now that the new lvs have the old name, we can add them to the device
5019       info("adding new mirror component on %s" % tgt_node)
5020       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5021       if result.failed or not result.data:
5022         for new_lv in new_lvs:
5023           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5024           if result.failed or not result.data:
5025             warning("Can't rollback device %s", hint="manually cleanup unused"
5026                     " logical volumes")
5027         raise errors.OpExecError("Can't add local storage to drbd")
5028
5029       dev.children = new_lvs
5030       cfg.Update(instance)
5031
5032     # Step: wait for sync
5033
5034     # this can fail as the old devices are degraded and _WaitForSync
5035     # does a combined result over all disks, so we don't check its
5036     # return value
5037     self.proc.LogStep(5, steps_total, "sync devices")
5038     _WaitForSync(self, instance, unlock=True)
5039
5040     # so check manually all the devices
5041     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5042       cfg.SetDiskID(dev, instance.primary_node)
5043       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5044       if result.failed or result.data[5]:
5045         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5046
5047     # Step: remove old storage
5048     self.proc.LogStep(6, steps_total, "removing old storage")
5049     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5050       info("remove logical volumes for %s" % name)
5051       for lv in old_lvs:
5052         cfg.SetDiskID(lv, tgt_node)
5053         result = self.rpc.call_blockdev_remove(tgt_node, lv)
5054         if result.failed or not result.data:
5055           warning("Can't remove old LV", hint="manually remove unused LVs")
5056           continue
5057
5058   def _ExecD8Secondary(self, feedback_fn):
5059     """Replace the secondary node for drbd8.
5060
5061     The algorithm for replace is quite complicated:
5062       - for all disks of the instance:
5063         - create new LVs on the new node with same names
5064         - shutdown the drbd device on the old secondary
5065         - disconnect the drbd network on the primary
5066         - create the drbd device on the new secondary
5067         - network attach the drbd on the primary, using an artifice:
5068           the drbd code for Attach() will connect to the network if it
5069           finds a device which is connected to the good local disks but
5070           not network enabled
5071       - wait for sync across all devices
5072       - remove all disks from the old secondary
5073
5074     Failures are not very well handled.
5075
5076     """
5077     steps_total = 6
5078     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5079     instance = self.instance
5080     iv_names = {}
5081     # start of work
5082     cfg = self.cfg
5083     old_node = self.tgt_node
5084     new_node = self.new_node
5085     pri_node = instance.primary_node
5086     nodes_ip = {
5087       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5088       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5089       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5090       }
5091
5092     # Step: check device activation
5093     self.proc.LogStep(1, steps_total, "check device existence")
5094     info("checking volume groups")
5095     my_vg = cfg.GetVGName()
5096     results = self.rpc.call_vg_list([pri_node, new_node])
5097     for node in pri_node, new_node:
5098       res = results[node]
5099       if res.failed or not res.data or my_vg not in res.data:
5100         raise errors.OpExecError("Volume group '%s' not found on %s" %
5101                                  (my_vg, node))
5102     for idx, dev in enumerate(instance.disks):
5103       if idx not in self.op.disks:
5104         continue
5105       info("checking disk/%d on %s" % (idx, pri_node))
5106       cfg.SetDiskID(dev, pri_node)
5107       result = self.rpc.call_blockdev_find(pri_node, dev)
5108       result.Raise()
5109       if not result.data:
5110         raise errors.OpExecError("Can't find disk/%d on node %s" %
5111                                  (idx, pri_node))
5112
5113     # Step: check other node consistency
5114     self.proc.LogStep(2, steps_total, "check peer consistency")
5115     for idx, dev in enumerate(instance.disks):
5116       if idx not in self.op.disks:
5117         continue
5118       info("checking disk/%d consistency on %s" % (idx, pri_node))
5119       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5120         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5121                                  " unsafe to replace the secondary" %
5122                                  pri_node)
5123
5124     # Step: create new storage
5125     self.proc.LogStep(3, steps_total, "allocate new storage")
5126     for idx, dev in enumerate(instance.disks):
5127       info("adding new local storage on %s for disk/%d" %
5128            (new_node, idx))
5129       # we pass force_create=True to force LVM creation
5130       for new_lv in dev.children:
5131         _CreateBlockDev(self, new_node, instance, new_lv, True,
5132                         _GetInstanceInfoText(instance), False)
5133
5134     # Step 4: dbrd minors and drbd setups changes
5135     # after this, we must manually remove the drbd minors on both the
5136     # error and the success paths
5137     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5138                                    instance.name)
5139     logging.debug("Allocated minors %s" % (minors,))
5140     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5141     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5142       size = dev.size
5143       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5144       # create new devices on new_node; note that we create two IDs:
5145       # one without port, so the drbd will be activated without
5146       # networking information on the new node at this stage, and one
5147       # with network, for the latter activation in step 4
5148       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5149       if pri_node == o_node1:
5150         p_minor = o_minor1
5151       else:
5152         p_minor = o_minor2
5153
5154       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5155       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5156
5157       iv_names[idx] = (dev, dev.children, new_net_id)
5158       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5159                     new_net_id)
5160       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5161                               logical_id=new_alone_id,
5162                               children=dev.children)
5163       try:
5164         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5165                               _GetInstanceInfoText(instance), False)
5166       except errors.BlockDeviceError:
5167         self.cfg.ReleaseDRBDMinors(instance.name)
5168         raise
5169
5170     for idx, dev in enumerate(instance.disks):
5171       # we have new devices, shutdown the drbd on the old secondary
5172       info("shutting down drbd for disk/%d on old node" % idx)
5173       cfg.SetDiskID(dev, old_node)
5174       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5175       if result.failed or not result.data:
5176         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5177                 hint="Please cleanup this device manually as soon as possible")
5178
5179     info("detaching primary drbds from the network (=> standalone)")
5180     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5181                                                instance.disks)[pri_node]
5182
5183     msg = result.RemoteFailMsg()
5184     if msg:
5185       # detaches didn't succeed (unlikely)
5186       self.cfg.ReleaseDRBDMinors(instance.name)
5187       raise errors.OpExecError("Can't detach the disks from the network on"
5188                                " old node: %s" % (msg,))
5189
5190     # if we managed to detach at least one, we update all the disks of
5191     # the instance to point to the new secondary
5192     info("updating instance configuration")
5193     for dev, _, new_logical_id in iv_names.itervalues():
5194       dev.logical_id = new_logical_id
5195       cfg.SetDiskID(dev, pri_node)
5196     cfg.Update(instance)
5197
5198     # and now perform the drbd attach
5199     info("attaching primary drbds to new secondary (standalone => connected)")
5200     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5201                                            instance.disks, instance.name,
5202                                            False)
5203     for to_node, to_result in result.items():
5204       msg = to_result.RemoteFailMsg()
5205       if msg:
5206         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5207                 hint="please do a gnt-instance info to see the"
5208                 " status of disks")
5209
5210     # this can fail as the old devices are degraded and _WaitForSync
5211     # does a combined result over all disks, so we don't check its
5212     # return value
5213     self.proc.LogStep(5, steps_total, "sync devices")
5214     _WaitForSync(self, instance, unlock=True)
5215
5216     # so check manually all the devices
5217     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5218       cfg.SetDiskID(dev, pri_node)
5219       result = self.rpc.call_blockdev_find(pri_node, dev)
5220       result.Raise()
5221       if result.data[5]:
5222         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5223
5224     self.proc.LogStep(6, steps_total, "removing old storage")
5225     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5226       info("remove logical volumes for disk/%d" % idx)
5227       for lv in old_lvs:
5228         cfg.SetDiskID(lv, old_node)
5229         result = self.rpc.call_blockdev_remove(old_node, lv)
5230         if result.failed or not result.data:
5231           warning("Can't remove LV on old secondary",
5232                   hint="Cleanup stale volumes by hand")
5233
5234   def Exec(self, feedback_fn):
5235     """Execute disk replacement.
5236
5237     This dispatches the disk replacement to the appropriate handler.
5238
5239     """
5240     instance = self.instance
5241
5242     # Activate the instance disks if we're replacing them on a down instance
5243     if not instance.admin_up:
5244       _StartInstanceDisks(self, instance, True)
5245
5246     if self.op.mode == constants.REPLACE_DISK_CHG:
5247       fn = self._ExecD8Secondary
5248     else:
5249       fn = self._ExecD8DiskOnly
5250
5251     ret = fn(feedback_fn)
5252
5253     # Deactivate the instance disks if we're replacing them on a down instance
5254     if not instance.admin_up:
5255       _SafeShutdownInstanceDisks(self, instance)
5256
5257     return ret
5258
5259
5260 class LUGrowDisk(LogicalUnit):
5261   """Grow a disk of an instance.
5262
5263   """
5264   HPATH = "disk-grow"
5265   HTYPE = constants.HTYPE_INSTANCE
5266   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5267   REQ_BGL = False
5268
5269   def ExpandNames(self):
5270     self._ExpandAndLockInstance()
5271     self.needed_locks[locking.LEVEL_NODE] = []
5272     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5273
5274   def DeclareLocks(self, level):
5275     if level == locking.LEVEL_NODE:
5276       self._LockInstancesNodes()
5277
5278   def BuildHooksEnv(self):
5279     """Build hooks env.
5280
5281     This runs on the master, the primary and all the secondaries.
5282
5283     """
5284     env = {
5285       "DISK": self.op.disk,
5286       "AMOUNT": self.op.amount,
5287       }
5288     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5289     nl = [
5290       self.cfg.GetMasterNode(),
5291       self.instance.primary_node,
5292       ]
5293     return env, nl, nl
5294
5295   def CheckPrereq(self):
5296     """Check prerequisites.
5297
5298     This checks that the instance is in the cluster.
5299
5300     """
5301     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5302     assert instance is not None, \
5303       "Cannot retrieve locked instance %s" % self.op.instance_name
5304     nodenames = list(instance.all_nodes)
5305     for node in nodenames:
5306       _CheckNodeOnline(self, node)
5307
5308
5309     self.instance = instance
5310
5311     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5312       raise errors.OpPrereqError("Instance's disk layout does not support"
5313                                  " growing.")
5314
5315     self.disk = instance.FindDisk(self.op.disk)
5316
5317     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5318                                        instance.hypervisor)
5319     for node in nodenames:
5320       info = nodeinfo[node]
5321       if info.failed or not info.data:
5322         raise errors.OpPrereqError("Cannot get current information"
5323                                    " from node '%s'" % node)
5324       vg_free = info.data.get('vg_free', None)
5325       if not isinstance(vg_free, int):
5326         raise errors.OpPrereqError("Can't compute free disk space on"
5327                                    " node %s" % node)
5328       if self.op.amount > vg_free:
5329         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5330                                    " %d MiB available, %d MiB required" %
5331                                    (node, vg_free, self.op.amount))
5332
5333   def Exec(self, feedback_fn):
5334     """Execute disk grow.
5335
5336     """
5337     instance = self.instance
5338     disk = self.disk
5339     for node in instance.all_nodes:
5340       self.cfg.SetDiskID(disk, node)
5341       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5342       result.Raise()
5343       if (not result.data or not isinstance(result.data, (list, tuple)) or
5344           len(result.data) != 2):
5345         raise errors.OpExecError("Grow request failed to node %s" % node)
5346       elif not result.data[0]:
5347         raise errors.OpExecError("Grow request failed to node %s: %s" %
5348                                  (node, result.data[1]))
5349     disk.RecordGrow(self.op.amount)
5350     self.cfg.Update(instance)
5351     if self.op.wait_for_sync:
5352       disk_abort = not _WaitForSync(self, instance)
5353       if disk_abort:
5354         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5355                              " status.\nPlease check the instance.")
5356
5357
5358 class LUQueryInstanceData(NoHooksLU):
5359   """Query runtime instance data.
5360
5361   """
5362   _OP_REQP = ["instances", "static"]
5363   REQ_BGL = False
5364
5365   def ExpandNames(self):
5366     self.needed_locks = {}
5367     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5368
5369     if not isinstance(self.op.instances, list):
5370       raise errors.OpPrereqError("Invalid argument type 'instances'")
5371
5372     if self.op.instances:
5373       self.wanted_names = []
5374       for name in self.op.instances:
5375         full_name = self.cfg.ExpandInstanceName(name)
5376         if full_name is None:
5377           raise errors.OpPrereqError("Instance '%s' not known" % name)
5378         self.wanted_names.append(full_name)
5379       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5380     else:
5381       self.wanted_names = None
5382       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5383
5384     self.needed_locks[locking.LEVEL_NODE] = []
5385     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5386
5387   def DeclareLocks(self, level):
5388     if level == locking.LEVEL_NODE:
5389       self._LockInstancesNodes()
5390
5391   def CheckPrereq(self):
5392     """Check prerequisites.
5393
5394     This only checks the optional instance list against the existing names.
5395
5396     """
5397     if self.wanted_names is None:
5398       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5399
5400     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5401                              in self.wanted_names]
5402     return
5403
5404   def _ComputeDiskStatus(self, instance, snode, dev):
5405     """Compute block device status.
5406
5407     """
5408     static = self.op.static
5409     if not static:
5410       self.cfg.SetDiskID(dev, instance.primary_node)
5411       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5412       dev_pstatus.Raise()
5413       dev_pstatus = dev_pstatus.data
5414     else:
5415       dev_pstatus = None
5416
5417     if dev.dev_type in constants.LDS_DRBD:
5418       # we change the snode then (otherwise we use the one passed in)
5419       if dev.logical_id[0] == instance.primary_node:
5420         snode = dev.logical_id[1]
5421       else:
5422         snode = dev.logical_id[0]
5423
5424     if snode and not static:
5425       self.cfg.SetDiskID(dev, snode)
5426       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5427       dev_sstatus.Raise()
5428       dev_sstatus = dev_sstatus.data
5429     else:
5430       dev_sstatus = None
5431
5432     if dev.children:
5433       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5434                       for child in dev.children]
5435     else:
5436       dev_children = []
5437
5438     data = {
5439       "iv_name": dev.iv_name,
5440       "dev_type": dev.dev_type,
5441       "logical_id": dev.logical_id,
5442       "physical_id": dev.physical_id,
5443       "pstatus": dev_pstatus,
5444       "sstatus": dev_sstatus,
5445       "children": dev_children,
5446       "mode": dev.mode,
5447       }
5448
5449     return data
5450
5451   def Exec(self, feedback_fn):
5452     """Gather and return data"""
5453     result = {}
5454
5455     cluster = self.cfg.GetClusterInfo()
5456
5457     for instance in self.wanted_instances:
5458       if not self.op.static:
5459         remote_info = self.rpc.call_instance_info(instance.primary_node,
5460                                                   instance.name,
5461                                                   instance.hypervisor)
5462         remote_info.Raise()
5463         remote_info = remote_info.data
5464         if remote_info and "state" in remote_info:
5465           remote_state = "up"
5466         else:
5467           remote_state = "down"
5468       else:
5469         remote_state = None
5470       if instance.admin_up:
5471         config_state = "up"
5472       else:
5473         config_state = "down"
5474
5475       disks = [self._ComputeDiskStatus(instance, None, device)
5476                for device in instance.disks]
5477
5478       idict = {
5479         "name": instance.name,
5480         "config_state": config_state,
5481         "run_state": remote_state,
5482         "pnode": instance.primary_node,
5483         "snodes": instance.secondary_nodes,
5484         "os": instance.os,
5485         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5486         "disks": disks,
5487         "hypervisor": instance.hypervisor,
5488         "network_port": instance.network_port,
5489         "hv_instance": instance.hvparams,
5490         "hv_actual": cluster.FillHV(instance),
5491         "be_instance": instance.beparams,
5492         "be_actual": cluster.FillBE(instance),
5493         }
5494
5495       result[instance.name] = idict
5496
5497     return result
5498
5499
5500 class LUSetInstanceParams(LogicalUnit):
5501   """Modifies an instances's parameters.
5502
5503   """
5504   HPATH = "instance-modify"
5505   HTYPE = constants.HTYPE_INSTANCE
5506   _OP_REQP = ["instance_name"]
5507   REQ_BGL = False
5508
5509   def CheckArguments(self):
5510     if not hasattr(self.op, 'nics'):
5511       self.op.nics = []
5512     if not hasattr(self.op, 'disks'):
5513       self.op.disks = []
5514     if not hasattr(self.op, 'beparams'):
5515       self.op.beparams = {}
5516     if not hasattr(self.op, 'hvparams'):
5517       self.op.hvparams = {}
5518     self.op.force = getattr(self.op, "force", False)
5519     if not (self.op.nics or self.op.disks or
5520             self.op.hvparams or self.op.beparams):
5521       raise errors.OpPrereqError("No changes submitted")
5522
5523     utils.CheckBEParams(self.op.beparams)
5524
5525     # Disk validation
5526     disk_addremove = 0
5527     for disk_op, disk_dict in self.op.disks:
5528       if disk_op == constants.DDM_REMOVE:
5529         disk_addremove += 1
5530         continue
5531       elif disk_op == constants.DDM_ADD:
5532         disk_addremove += 1
5533       else:
5534         if not isinstance(disk_op, int):
5535           raise errors.OpPrereqError("Invalid disk index")
5536       if disk_op == constants.DDM_ADD:
5537         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5538         if mode not in constants.DISK_ACCESS_SET:
5539           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5540         size = disk_dict.get('size', None)
5541         if size is None:
5542           raise errors.OpPrereqError("Required disk parameter size missing")
5543         try:
5544           size = int(size)
5545         except ValueError, err:
5546           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5547                                      str(err))
5548         disk_dict['size'] = size
5549       else:
5550         # modification of disk
5551         if 'size' in disk_dict:
5552           raise errors.OpPrereqError("Disk size change not possible, use"
5553                                      " grow-disk")
5554
5555     if disk_addremove > 1:
5556       raise errors.OpPrereqError("Only one disk add or remove operation"
5557                                  " supported at a time")
5558
5559     # NIC validation
5560     nic_addremove = 0
5561     for nic_op, nic_dict in self.op.nics:
5562       if nic_op == constants.DDM_REMOVE:
5563         nic_addremove += 1
5564         continue
5565       elif nic_op == constants.DDM_ADD:
5566         nic_addremove += 1
5567       else:
5568         if not isinstance(nic_op, int):
5569           raise errors.OpPrereqError("Invalid nic index")
5570
5571       # nic_dict should be a dict
5572       nic_ip = nic_dict.get('ip', None)
5573       if nic_ip is not None:
5574         if nic_ip.lower() == "none":
5575           nic_dict['ip'] = None
5576         else:
5577           if not utils.IsValidIP(nic_ip):
5578             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5579       # we can only check None bridges and assign the default one
5580       nic_bridge = nic_dict.get('bridge', None)
5581       if nic_bridge is None:
5582         nic_dict['bridge'] = self.cfg.GetDefBridge()
5583       # but we can validate MACs
5584       nic_mac = nic_dict.get('mac', None)
5585       if nic_mac is not None:
5586         if self.cfg.IsMacInUse(nic_mac):
5587           raise errors.OpPrereqError("MAC address %s already in use"
5588                                      " in cluster" % nic_mac)
5589         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5590           if not utils.IsValidMac(nic_mac):
5591             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5592     if nic_addremove > 1:
5593       raise errors.OpPrereqError("Only one NIC add or remove operation"
5594                                  " supported at a time")
5595
5596   def ExpandNames(self):
5597     self._ExpandAndLockInstance()
5598     self.needed_locks[locking.LEVEL_NODE] = []
5599     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5600
5601   def DeclareLocks(self, level):
5602     if level == locking.LEVEL_NODE:
5603       self._LockInstancesNodes()
5604
5605   def BuildHooksEnv(self):
5606     """Build hooks env.
5607
5608     This runs on the master, primary and secondaries.
5609
5610     """
5611     args = dict()
5612     if constants.BE_MEMORY in self.be_new:
5613       args['memory'] = self.be_new[constants.BE_MEMORY]
5614     if constants.BE_VCPUS in self.be_new:
5615       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5616     # FIXME: readd disk/nic changes
5617     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5618     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5619     return env, nl, nl
5620
5621   def CheckPrereq(self):
5622     """Check prerequisites.
5623
5624     This only checks the instance list against the existing names.
5625
5626     """
5627     force = self.force = self.op.force
5628
5629     # checking the new params on the primary/secondary nodes
5630
5631     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5632     assert self.instance is not None, \
5633       "Cannot retrieve locked instance %s" % self.op.instance_name
5634     pnode = instance.primary_node
5635     nodelist = list(instance.all_nodes)
5636
5637     # hvparams processing
5638     if self.op.hvparams:
5639       i_hvdict = copy.deepcopy(instance.hvparams)
5640       for key, val in self.op.hvparams.iteritems():
5641         if val == constants.VALUE_DEFAULT:
5642           try:
5643             del i_hvdict[key]
5644           except KeyError:
5645             pass
5646         elif val == constants.VALUE_NONE:
5647           i_hvdict[key] = None
5648         else:
5649           i_hvdict[key] = val
5650       cluster = self.cfg.GetClusterInfo()
5651       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5652                                 i_hvdict)
5653       # local check
5654       hypervisor.GetHypervisor(
5655         instance.hypervisor).CheckParameterSyntax(hv_new)
5656       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5657       self.hv_new = hv_new # the new actual values
5658       self.hv_inst = i_hvdict # the new dict (without defaults)
5659     else:
5660       self.hv_new = self.hv_inst = {}
5661
5662     # beparams processing
5663     if self.op.beparams:
5664       i_bedict = copy.deepcopy(instance.beparams)
5665       for key, val in self.op.beparams.iteritems():
5666         if val == constants.VALUE_DEFAULT:
5667           try:
5668             del i_bedict[key]
5669           except KeyError:
5670             pass
5671         else:
5672           i_bedict[key] = val
5673       cluster = self.cfg.GetClusterInfo()
5674       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5675                                 i_bedict)
5676       self.be_new = be_new # the new actual values
5677       self.be_inst = i_bedict # the new dict (without defaults)
5678     else:
5679       self.be_new = self.be_inst = {}
5680
5681     self.warn = []
5682
5683     if constants.BE_MEMORY in self.op.beparams and not self.force:
5684       mem_check_list = [pnode]
5685       if be_new[constants.BE_AUTO_BALANCE]:
5686         # either we changed auto_balance to yes or it was from before
5687         mem_check_list.extend(instance.secondary_nodes)
5688       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5689                                                   instance.hypervisor)
5690       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5691                                          instance.hypervisor)
5692       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5693         # Assume the primary node is unreachable and go ahead
5694         self.warn.append("Can't get info from primary node %s" % pnode)
5695       else:
5696         if not instance_info.failed and instance_info.data:
5697           current_mem = instance_info.data['memory']
5698         else:
5699           # Assume instance not running
5700           # (there is a slight race condition here, but it's not very probable,
5701           # and we have no other way to check)
5702           current_mem = 0
5703         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5704                     nodeinfo[pnode].data['memory_free'])
5705         if miss_mem > 0:
5706           raise errors.OpPrereqError("This change will prevent the instance"
5707                                      " from starting, due to %d MB of memory"
5708                                      " missing on its primary node" % miss_mem)
5709
5710       if be_new[constants.BE_AUTO_BALANCE]:
5711         for node, nres in nodeinfo.iteritems():
5712           if node not in instance.secondary_nodes:
5713             continue
5714           if nres.failed or not isinstance(nres.data, dict):
5715             self.warn.append("Can't get info from secondary node %s" % node)
5716           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5717             self.warn.append("Not enough memory to failover instance to"
5718                              " secondary node %s" % node)
5719
5720     # NIC processing
5721     for nic_op, nic_dict in self.op.nics:
5722       if nic_op == constants.DDM_REMOVE:
5723         if not instance.nics:
5724           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5725         continue
5726       if nic_op != constants.DDM_ADD:
5727         # an existing nic
5728         if nic_op < 0 or nic_op >= len(instance.nics):
5729           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5730                                      " are 0 to %d" %
5731                                      (nic_op, len(instance.nics)))
5732       nic_bridge = nic_dict.get('bridge', None)
5733       if nic_bridge is not None:
5734         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5735           msg = ("Bridge '%s' doesn't exist on one of"
5736                  " the instance nodes" % nic_bridge)
5737           if self.force:
5738             self.warn.append(msg)
5739           else:
5740             raise errors.OpPrereqError(msg)
5741
5742     # DISK processing
5743     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5744       raise errors.OpPrereqError("Disk operations not supported for"
5745                                  " diskless instances")
5746     for disk_op, disk_dict in self.op.disks:
5747       if disk_op == constants.DDM_REMOVE:
5748         if len(instance.disks) == 1:
5749           raise errors.OpPrereqError("Cannot remove the last disk of"
5750                                      " an instance")
5751         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5752         ins_l = ins_l[pnode]
5753         if ins_l.failed or not isinstance(ins_l.data, list):
5754           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5755         if instance.name in ins_l.data:
5756           raise errors.OpPrereqError("Instance is running, can't remove"
5757                                      " disks.")
5758
5759       if (disk_op == constants.DDM_ADD and
5760           len(instance.nics) >= constants.MAX_DISKS):
5761         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5762                                    " add more" % constants.MAX_DISKS)
5763       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5764         # an existing disk
5765         if disk_op < 0 or disk_op >= len(instance.disks):
5766           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5767                                      " are 0 to %d" %
5768                                      (disk_op, len(instance.disks)))
5769
5770     return
5771
5772   def Exec(self, feedback_fn):
5773     """Modifies an instance.
5774
5775     All parameters take effect only at the next restart of the instance.
5776
5777     """
5778     # Process here the warnings from CheckPrereq, as we don't have a
5779     # feedback_fn there.
5780     for warn in self.warn:
5781       feedback_fn("WARNING: %s" % warn)
5782
5783     result = []
5784     instance = self.instance
5785     # disk changes
5786     for disk_op, disk_dict in self.op.disks:
5787       if disk_op == constants.DDM_REMOVE:
5788         # remove the last disk
5789         device = instance.disks.pop()
5790         device_idx = len(instance.disks)
5791         for node, disk in device.ComputeNodeTree(instance.primary_node):
5792           self.cfg.SetDiskID(disk, node)
5793           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5794           if rpc_result.failed or not rpc_result.data:
5795             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5796                                  " continuing anyway", device_idx, node)
5797         result.append(("disk/%d" % device_idx, "remove"))
5798       elif disk_op == constants.DDM_ADD:
5799         # add a new disk
5800         if instance.disk_template == constants.DT_FILE:
5801           file_driver, file_path = instance.disks[0].logical_id
5802           file_path = os.path.dirname(file_path)
5803         else:
5804           file_driver = file_path = None
5805         disk_idx_base = len(instance.disks)
5806         new_disk = _GenerateDiskTemplate(self,
5807                                          instance.disk_template,
5808                                          instance.name, instance.primary_node,
5809                                          instance.secondary_nodes,
5810                                          [disk_dict],
5811                                          file_path,
5812                                          file_driver,
5813                                          disk_idx_base)[0]
5814         instance.disks.append(new_disk)
5815         info = _GetInstanceInfoText(instance)
5816
5817         logging.info("Creating volume %s for instance %s",
5818                      new_disk.iv_name, instance.name)
5819         # Note: this needs to be kept in sync with _CreateDisks
5820         #HARDCODE
5821         for node in instance.all_nodes:
5822           f_create = node == instance.primary_node
5823           try:
5824             _CreateBlockDev(self, node, instance, new_disk,
5825                             f_create, info, f_create)
5826           except errors.OpExecError, err:
5827             self.LogWarning("Failed to create volume %s (%s) on"
5828                             " node %s: %s",
5829                             new_disk.iv_name, new_disk, node, err)
5830         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5831                        (new_disk.size, new_disk.mode)))
5832       else:
5833         # change a given disk
5834         instance.disks[disk_op].mode = disk_dict['mode']
5835         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5836     # NIC changes
5837     for nic_op, nic_dict in self.op.nics:
5838       if nic_op == constants.DDM_REMOVE:
5839         # remove the last nic
5840         del instance.nics[-1]
5841         result.append(("nic.%d" % len(instance.nics), "remove"))
5842       elif nic_op == constants.DDM_ADD:
5843         # add a new nic
5844         if 'mac' not in nic_dict:
5845           mac = constants.VALUE_GENERATE
5846         else:
5847           mac = nic_dict['mac']
5848         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5849           mac = self.cfg.GenerateMAC()
5850         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5851                               bridge=nic_dict.get('bridge', None))
5852         instance.nics.append(new_nic)
5853         result.append(("nic.%d" % (len(instance.nics) - 1),
5854                        "add:mac=%s,ip=%s,bridge=%s" %
5855                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5856       else:
5857         # change a given nic
5858         for key in 'mac', 'ip', 'bridge':
5859           if key in nic_dict:
5860             setattr(instance.nics[nic_op], key, nic_dict[key])
5861             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5862
5863     # hvparams changes
5864     if self.op.hvparams:
5865       instance.hvparams = self.hv_new
5866       for key, val in self.op.hvparams.iteritems():
5867         result.append(("hv/%s" % key, val))
5868
5869     # beparams changes
5870     if self.op.beparams:
5871       instance.beparams = self.be_inst
5872       for key, val in self.op.beparams.iteritems():
5873         result.append(("be/%s" % key, val))
5874
5875     self.cfg.Update(instance)
5876
5877     return result
5878
5879
5880 class LUQueryExports(NoHooksLU):
5881   """Query the exports list
5882
5883   """
5884   _OP_REQP = ['nodes']
5885   REQ_BGL = False
5886
5887   def ExpandNames(self):
5888     self.needed_locks = {}
5889     self.share_locks[locking.LEVEL_NODE] = 1
5890     if not self.op.nodes:
5891       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5892     else:
5893       self.needed_locks[locking.LEVEL_NODE] = \
5894         _GetWantedNodes(self, self.op.nodes)
5895
5896   def CheckPrereq(self):
5897     """Check prerequisites.
5898
5899     """
5900     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5901
5902   def Exec(self, feedback_fn):
5903     """Compute the list of all the exported system images.
5904
5905     @rtype: dict
5906     @return: a dictionary with the structure node->(export-list)
5907         where export-list is a list of the instances exported on
5908         that node.
5909
5910     """
5911     rpcresult = self.rpc.call_export_list(self.nodes)
5912     result = {}
5913     for node in rpcresult:
5914       if rpcresult[node].failed:
5915         result[node] = False
5916       else:
5917         result[node] = rpcresult[node].data
5918
5919     return result
5920
5921
5922 class LUExportInstance(LogicalUnit):
5923   """Export an instance to an image in the cluster.
5924
5925   """
5926   HPATH = "instance-export"
5927   HTYPE = constants.HTYPE_INSTANCE
5928   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5929   REQ_BGL = False
5930
5931   def ExpandNames(self):
5932     self._ExpandAndLockInstance()
5933     # FIXME: lock only instance primary and destination node
5934     #
5935     # Sad but true, for now we have do lock all nodes, as we don't know where
5936     # the previous export might be, and and in this LU we search for it and
5937     # remove it from its current node. In the future we could fix this by:
5938     #  - making a tasklet to search (share-lock all), then create the new one,
5939     #    then one to remove, after
5940     #  - removing the removal operation altoghether
5941     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5942
5943   def DeclareLocks(self, level):
5944     """Last minute lock declaration."""
5945     # All nodes are locked anyway, so nothing to do here.
5946
5947   def BuildHooksEnv(self):
5948     """Build hooks env.
5949
5950     This will run on the master, primary node and target node.
5951
5952     """
5953     env = {
5954       "EXPORT_NODE": self.op.target_node,
5955       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5956       }
5957     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5958     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5959           self.op.target_node]
5960     return env, nl, nl
5961
5962   def CheckPrereq(self):
5963     """Check prerequisites.
5964
5965     This checks that the instance and node names are valid.
5966
5967     """
5968     instance_name = self.op.instance_name
5969     self.instance = self.cfg.GetInstanceInfo(instance_name)
5970     assert self.instance is not None, \
5971           "Cannot retrieve locked instance %s" % self.op.instance_name
5972     _CheckNodeOnline(self, self.instance.primary_node)
5973
5974     self.dst_node = self.cfg.GetNodeInfo(
5975       self.cfg.ExpandNodeName(self.op.target_node))
5976
5977     if self.dst_node is None:
5978       # This is wrong node name, not a non-locked node
5979       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5980     _CheckNodeOnline(self, self.dst_node.name)
5981
5982     # instance disk type verification
5983     for disk in self.instance.disks:
5984       if disk.dev_type == constants.LD_FILE:
5985         raise errors.OpPrereqError("Export not supported for instances with"
5986                                    " file-based disks")
5987
5988   def Exec(self, feedback_fn):
5989     """Export an instance to an image in the cluster.
5990
5991     """
5992     instance = self.instance
5993     dst_node = self.dst_node
5994     src_node = instance.primary_node
5995     if self.op.shutdown:
5996       # shutdown the instance, but not the disks
5997       result = self.rpc.call_instance_shutdown(src_node, instance)
5998       result.Raise()
5999       if not result.data:
6000         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
6001                                  (instance.name, src_node))
6002
6003     vgname = self.cfg.GetVGName()
6004
6005     snap_disks = []
6006
6007     # set the disks ID correctly since call_instance_start needs the
6008     # correct drbd minor to create the symlinks
6009     for disk in instance.disks:
6010       self.cfg.SetDiskID(disk, src_node)
6011
6012     try:
6013       for disk in instance.disks:
6014         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6015         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6016         if new_dev_name.failed or not new_dev_name.data:
6017           self.LogWarning("Could not snapshot block device %s on node %s",
6018                           disk.logical_id[1], src_node)
6019           snap_disks.append(False)
6020         else:
6021           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6022                                  logical_id=(vgname, new_dev_name.data),
6023                                  physical_id=(vgname, new_dev_name.data),
6024                                  iv_name=disk.iv_name)
6025           snap_disks.append(new_dev)
6026
6027     finally:
6028       if self.op.shutdown and instance.admin_up:
6029         result = self.rpc.call_instance_start(src_node, instance, None)
6030         msg = result.RemoteFailMsg()
6031         if msg:
6032           _ShutdownInstanceDisks(self, instance)
6033           raise errors.OpExecError("Could not start instance: %s" % msg)
6034
6035     # TODO: check for size
6036
6037     cluster_name = self.cfg.GetClusterName()
6038     for idx, dev in enumerate(snap_disks):
6039       if dev:
6040         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6041                                                instance, cluster_name, idx)
6042         if result.failed or not result.data:
6043           self.LogWarning("Could not export block device %s from node %s to"
6044                           " node %s", dev.logical_id[1], src_node,
6045                           dst_node.name)
6046         result = self.rpc.call_blockdev_remove(src_node, dev)
6047         if result.failed or not result.data:
6048           self.LogWarning("Could not remove snapshot block device %s from node"
6049                           " %s", dev.logical_id[1], src_node)
6050
6051     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6052     if result.failed or not result.data:
6053       self.LogWarning("Could not finalize export for instance %s on node %s",
6054                       instance.name, dst_node.name)
6055
6056     nodelist = self.cfg.GetNodeList()
6057     nodelist.remove(dst_node.name)
6058
6059     # on one-node clusters nodelist will be empty after the removal
6060     # if we proceed the backup would be removed because OpQueryExports
6061     # substitutes an empty list with the full cluster node list.
6062     if nodelist:
6063       exportlist = self.rpc.call_export_list(nodelist)
6064       for node in exportlist:
6065         if exportlist[node].failed:
6066           continue
6067         if instance.name in exportlist[node].data:
6068           if not self.rpc.call_export_remove(node, instance.name):
6069             self.LogWarning("Could not remove older export for instance %s"
6070                             " on node %s", instance.name, node)
6071
6072
6073 class LURemoveExport(NoHooksLU):
6074   """Remove exports related to the named instance.
6075
6076   """
6077   _OP_REQP = ["instance_name"]
6078   REQ_BGL = False
6079
6080   def ExpandNames(self):
6081     self.needed_locks = {}
6082     # We need all nodes to be locked in order for RemoveExport to work, but we
6083     # don't need to lock the instance itself, as nothing will happen to it (and
6084     # we can remove exports also for a removed instance)
6085     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6086
6087   def CheckPrereq(self):
6088     """Check prerequisites.
6089     """
6090     pass
6091
6092   def Exec(self, feedback_fn):
6093     """Remove any export.
6094
6095     """
6096     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6097     # If the instance was not found we'll try with the name that was passed in.
6098     # This will only work if it was an FQDN, though.
6099     fqdn_warn = False
6100     if not instance_name:
6101       fqdn_warn = True
6102       instance_name = self.op.instance_name
6103
6104     exportlist = self.rpc.call_export_list(self.acquired_locks[
6105       locking.LEVEL_NODE])
6106     found = False
6107     for node in exportlist:
6108       if exportlist[node].failed:
6109         self.LogWarning("Failed to query node %s, continuing" % node)
6110         continue
6111       if instance_name in exportlist[node].data:
6112         found = True
6113         result = self.rpc.call_export_remove(node, instance_name)
6114         if result.failed or not result.data:
6115           logging.error("Could not remove export for instance %s"
6116                         " on node %s", instance_name, node)
6117
6118     if fqdn_warn and not found:
6119       feedback_fn("Export not found. If trying to remove an export belonging"
6120                   " to a deleted instance please use its Fully Qualified"
6121                   " Domain Name.")
6122
6123
6124 class TagsLU(NoHooksLU):
6125   """Generic tags LU.
6126
6127   This is an abstract class which is the parent of all the other tags LUs.
6128
6129   """
6130
6131   def ExpandNames(self):
6132     self.needed_locks = {}
6133     if self.op.kind == constants.TAG_NODE:
6134       name = self.cfg.ExpandNodeName(self.op.name)
6135       if name is None:
6136         raise errors.OpPrereqError("Invalid node name (%s)" %
6137                                    (self.op.name,))
6138       self.op.name = name
6139       self.needed_locks[locking.LEVEL_NODE] = name
6140     elif self.op.kind == constants.TAG_INSTANCE:
6141       name = self.cfg.ExpandInstanceName(self.op.name)
6142       if name is None:
6143         raise errors.OpPrereqError("Invalid instance name (%s)" %
6144                                    (self.op.name,))
6145       self.op.name = name
6146       self.needed_locks[locking.LEVEL_INSTANCE] = name
6147
6148   def CheckPrereq(self):
6149     """Check prerequisites.
6150
6151     """
6152     if self.op.kind == constants.TAG_CLUSTER:
6153       self.target = self.cfg.GetClusterInfo()
6154     elif self.op.kind == constants.TAG_NODE:
6155       self.target = self.cfg.GetNodeInfo(self.op.name)
6156     elif self.op.kind == constants.TAG_INSTANCE:
6157       self.target = self.cfg.GetInstanceInfo(self.op.name)
6158     else:
6159       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6160                                  str(self.op.kind))
6161
6162
6163 class LUGetTags(TagsLU):
6164   """Returns the tags of a given object.
6165
6166   """
6167   _OP_REQP = ["kind", "name"]
6168   REQ_BGL = False
6169
6170   def Exec(self, feedback_fn):
6171     """Returns the tag list.
6172
6173     """
6174     return list(self.target.GetTags())
6175
6176
6177 class LUSearchTags(NoHooksLU):
6178   """Searches the tags for a given pattern.
6179
6180   """
6181   _OP_REQP = ["pattern"]
6182   REQ_BGL = False
6183
6184   def ExpandNames(self):
6185     self.needed_locks = {}
6186
6187   def CheckPrereq(self):
6188     """Check prerequisites.
6189
6190     This checks the pattern passed for validity by compiling it.
6191
6192     """
6193     try:
6194       self.re = re.compile(self.op.pattern)
6195     except re.error, err:
6196       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6197                                  (self.op.pattern, err))
6198
6199   def Exec(self, feedback_fn):
6200     """Returns the tag list.
6201
6202     """
6203     cfg = self.cfg
6204     tgts = [("/cluster", cfg.GetClusterInfo())]
6205     ilist = cfg.GetAllInstancesInfo().values()
6206     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6207     nlist = cfg.GetAllNodesInfo().values()
6208     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6209     results = []
6210     for path, target in tgts:
6211       for tag in target.GetTags():
6212         if self.re.search(tag):
6213           results.append((path, tag))
6214     return results
6215
6216
6217 class LUAddTags(TagsLU):
6218   """Sets a tag on a given object.
6219
6220   """
6221   _OP_REQP = ["kind", "name", "tags"]
6222   REQ_BGL = False
6223
6224   def CheckPrereq(self):
6225     """Check prerequisites.
6226
6227     This checks the type and length of the tag name and value.
6228
6229     """
6230     TagsLU.CheckPrereq(self)
6231     for tag in self.op.tags:
6232       objects.TaggableObject.ValidateTag(tag)
6233
6234   def Exec(self, feedback_fn):
6235     """Sets the tag.
6236
6237     """
6238     try:
6239       for tag in self.op.tags:
6240         self.target.AddTag(tag)
6241     except errors.TagError, err:
6242       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6243     try:
6244       self.cfg.Update(self.target)
6245     except errors.ConfigurationError:
6246       raise errors.OpRetryError("There has been a modification to the"
6247                                 " config file and the operation has been"
6248                                 " aborted. Please retry.")
6249
6250
6251 class LUDelTags(TagsLU):
6252   """Delete a list of tags from a given object.
6253
6254   """
6255   _OP_REQP = ["kind", "name", "tags"]
6256   REQ_BGL = False
6257
6258   def CheckPrereq(self):
6259     """Check prerequisites.
6260
6261     This checks that we have the given tag.
6262
6263     """
6264     TagsLU.CheckPrereq(self)
6265     for tag in self.op.tags:
6266       objects.TaggableObject.ValidateTag(tag)
6267     del_tags = frozenset(self.op.tags)
6268     cur_tags = self.target.GetTags()
6269     if not del_tags <= cur_tags:
6270       diff_tags = del_tags - cur_tags
6271       diff_names = ["'%s'" % tag for tag in diff_tags]
6272       diff_names.sort()
6273       raise errors.OpPrereqError("Tag(s) %s not found" %
6274                                  (",".join(diff_names)))
6275
6276   def Exec(self, feedback_fn):
6277     """Remove the tag from the object.
6278
6279     """
6280     for tag in self.op.tags:
6281       self.target.RemoveTag(tag)
6282     try:
6283       self.cfg.Update(self.target)
6284     except errors.ConfigurationError:
6285       raise errors.OpRetryError("There has been a modification to the"
6286                                 " config file and the operation has been"
6287                                 " aborted. Please retry.")
6288
6289
6290 class LUTestDelay(NoHooksLU):
6291   """Sleep for a specified amount of time.
6292
6293   This LU sleeps on the master and/or nodes for a specified amount of
6294   time.
6295
6296   """
6297   _OP_REQP = ["duration", "on_master", "on_nodes"]
6298   REQ_BGL = False
6299
6300   def ExpandNames(self):
6301     """Expand names and set required locks.
6302
6303     This expands the node list, if any.
6304
6305     """
6306     self.needed_locks = {}
6307     if self.op.on_nodes:
6308       # _GetWantedNodes can be used here, but is not always appropriate to use
6309       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6310       # more information.
6311       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6312       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6313
6314   def CheckPrereq(self):
6315     """Check prerequisites.
6316
6317     """
6318
6319   def Exec(self, feedback_fn):
6320     """Do the actual sleep.
6321
6322     """
6323     if self.op.on_master:
6324       if not utils.TestDelay(self.op.duration):
6325         raise errors.OpExecError("Error during master delay test")
6326     if self.op.on_nodes:
6327       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6328       if not result:
6329         raise errors.OpExecError("Complete failure from rpc call")
6330       for node, node_result in result.items():
6331         node_result.Raise()
6332         if not node_result.data:
6333           raise errors.OpExecError("Failure during rpc call to node %s,"
6334                                    " result: %s" % (node, node_result.data))
6335
6336
6337 class IAllocator(object):
6338   """IAllocator framework.
6339
6340   An IAllocator instance has three sets of attributes:
6341     - cfg that is needed to query the cluster
6342     - input data (all members of the _KEYS class attribute are required)
6343     - four buffer attributes (in|out_data|text), that represent the
6344       input (to the external script) in text and data structure format,
6345       and the output from it, again in two formats
6346     - the result variables from the script (success, info, nodes) for
6347       easy usage
6348
6349   """
6350   _ALLO_KEYS = [
6351     "mem_size", "disks", "disk_template",
6352     "os", "tags", "nics", "vcpus", "hypervisor",
6353     ]
6354   _RELO_KEYS = [
6355     "relocate_from",
6356     ]
6357
6358   def __init__(self, lu, mode, name, **kwargs):
6359     self.lu = lu
6360     # init buffer variables
6361     self.in_text = self.out_text = self.in_data = self.out_data = None
6362     # init all input fields so that pylint is happy
6363     self.mode = mode
6364     self.name = name
6365     self.mem_size = self.disks = self.disk_template = None
6366     self.os = self.tags = self.nics = self.vcpus = None
6367     self.hypervisor = None
6368     self.relocate_from = None
6369     # computed fields
6370     self.required_nodes = None
6371     # init result fields
6372     self.success = self.info = self.nodes = None
6373     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6374       keyset = self._ALLO_KEYS
6375     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6376       keyset = self._RELO_KEYS
6377     else:
6378       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6379                                    " IAllocator" % self.mode)
6380     for key in kwargs:
6381       if key not in keyset:
6382         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6383                                      " IAllocator" % key)
6384       setattr(self, key, kwargs[key])
6385     for key in keyset:
6386       if key not in kwargs:
6387         raise errors.ProgrammerError("Missing input parameter '%s' to"
6388                                      " IAllocator" % key)
6389     self._BuildInputData()
6390
6391   def _ComputeClusterData(self):
6392     """Compute the generic allocator input data.
6393
6394     This is the data that is independent of the actual operation.
6395
6396     """
6397     cfg = self.lu.cfg
6398     cluster_info = cfg.GetClusterInfo()
6399     # cluster data
6400     data = {
6401       "version": 1,
6402       "cluster_name": cfg.GetClusterName(),
6403       "cluster_tags": list(cluster_info.GetTags()),
6404       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6405       # we don't have job IDs
6406       }
6407     iinfo = cfg.GetAllInstancesInfo().values()
6408     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6409
6410     # node data
6411     node_results = {}
6412     node_list = cfg.GetNodeList()
6413
6414     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6415       hypervisor_name = self.hypervisor
6416     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6417       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6418
6419     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6420                                            hypervisor_name)
6421     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6422                        cluster_info.enabled_hypervisors)
6423     for nname, nresult in node_data.items():
6424       # first fill in static (config-based) values
6425       ninfo = cfg.GetNodeInfo(nname)
6426       pnr = {
6427         "tags": list(ninfo.GetTags()),
6428         "primary_ip": ninfo.primary_ip,
6429         "secondary_ip": ninfo.secondary_ip,
6430         "offline": ninfo.offline,
6431         "master_candidate": ninfo.master_candidate,
6432         }
6433
6434       if not ninfo.offline:
6435         nresult.Raise()
6436         if not isinstance(nresult.data, dict):
6437           raise errors.OpExecError("Can't get data for node %s" % nname)
6438         remote_info = nresult.data
6439         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6440                      'vg_size', 'vg_free', 'cpu_total']:
6441           if attr not in remote_info:
6442             raise errors.OpExecError("Node '%s' didn't return attribute"
6443                                      " '%s'" % (nname, attr))
6444           try:
6445             remote_info[attr] = int(remote_info[attr])
6446           except ValueError, err:
6447             raise errors.OpExecError("Node '%s' returned invalid value"
6448                                      " for '%s': %s" % (nname, attr, err))
6449         # compute memory used by primary instances
6450         i_p_mem = i_p_up_mem = 0
6451         for iinfo, beinfo in i_list:
6452           if iinfo.primary_node == nname:
6453             i_p_mem += beinfo[constants.BE_MEMORY]
6454             if iinfo.name not in node_iinfo[nname].data:
6455               i_used_mem = 0
6456             else:
6457               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6458             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6459             remote_info['memory_free'] -= max(0, i_mem_diff)
6460
6461             if iinfo.admin_up:
6462               i_p_up_mem += beinfo[constants.BE_MEMORY]
6463
6464         # compute memory used by instances
6465         pnr_dyn = {
6466           "total_memory": remote_info['memory_total'],
6467           "reserved_memory": remote_info['memory_dom0'],
6468           "free_memory": remote_info['memory_free'],
6469           "total_disk": remote_info['vg_size'],
6470           "free_disk": remote_info['vg_free'],
6471           "total_cpus": remote_info['cpu_total'],
6472           "i_pri_memory": i_p_mem,
6473           "i_pri_up_memory": i_p_up_mem,
6474           }
6475         pnr.update(pnr_dyn)
6476
6477       node_results[nname] = pnr
6478     data["nodes"] = node_results
6479
6480     # instance data
6481     instance_data = {}
6482     for iinfo, beinfo in i_list:
6483       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6484                   for n in iinfo.nics]
6485       pir = {
6486         "tags": list(iinfo.GetTags()),
6487         "admin_up": iinfo.admin_up,
6488         "vcpus": beinfo[constants.BE_VCPUS],
6489         "memory": beinfo[constants.BE_MEMORY],
6490         "os": iinfo.os,
6491         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6492         "nics": nic_data,
6493         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6494         "disk_template": iinfo.disk_template,
6495         "hypervisor": iinfo.hypervisor,
6496         }
6497       instance_data[iinfo.name] = pir
6498
6499     data["instances"] = instance_data
6500
6501     self.in_data = data
6502
6503   def _AddNewInstance(self):
6504     """Add new instance data to allocator structure.
6505
6506     This in combination with _AllocatorGetClusterData will create the
6507     correct structure needed as input for the allocator.
6508
6509     The checks for the completeness of the opcode must have already been
6510     done.
6511
6512     """
6513     data = self.in_data
6514
6515     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6516
6517     if self.disk_template in constants.DTS_NET_MIRROR:
6518       self.required_nodes = 2
6519     else:
6520       self.required_nodes = 1
6521     request = {
6522       "type": "allocate",
6523       "name": self.name,
6524       "disk_template": self.disk_template,
6525       "tags": self.tags,
6526       "os": self.os,
6527       "vcpus": self.vcpus,
6528       "memory": self.mem_size,
6529       "disks": self.disks,
6530       "disk_space_total": disk_space,
6531       "nics": self.nics,
6532       "required_nodes": self.required_nodes,
6533       }
6534     data["request"] = request
6535
6536   def _AddRelocateInstance(self):
6537     """Add relocate instance data to allocator structure.
6538
6539     This in combination with _IAllocatorGetClusterData will create the
6540     correct structure needed as input for the allocator.
6541
6542     The checks for the completeness of the opcode must have already been
6543     done.
6544
6545     """
6546     instance = self.lu.cfg.GetInstanceInfo(self.name)
6547     if instance is None:
6548       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6549                                    " IAllocator" % self.name)
6550
6551     if instance.disk_template not in constants.DTS_NET_MIRROR:
6552       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6553
6554     if len(instance.secondary_nodes) != 1:
6555       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6556
6557     self.required_nodes = 1
6558     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6559     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6560
6561     request = {
6562       "type": "relocate",
6563       "name": self.name,
6564       "disk_space_total": disk_space,
6565       "required_nodes": self.required_nodes,
6566       "relocate_from": self.relocate_from,
6567       }
6568     self.in_data["request"] = request
6569
6570   def _BuildInputData(self):
6571     """Build input data structures.
6572
6573     """
6574     self._ComputeClusterData()
6575
6576     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6577       self._AddNewInstance()
6578     else:
6579       self._AddRelocateInstance()
6580
6581     self.in_text = serializer.Dump(self.in_data)
6582
6583   def Run(self, name, validate=True, call_fn=None):
6584     """Run an instance allocator and return the results.
6585
6586     """
6587     if call_fn is None:
6588       call_fn = self.lu.rpc.call_iallocator_runner
6589     data = self.in_text
6590
6591     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6592     result.Raise()
6593
6594     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6595       raise errors.OpExecError("Invalid result from master iallocator runner")
6596
6597     rcode, stdout, stderr, fail = result.data
6598
6599     if rcode == constants.IARUN_NOTFOUND:
6600       raise errors.OpExecError("Can't find allocator '%s'" % name)
6601     elif rcode == constants.IARUN_FAILURE:
6602       raise errors.OpExecError("Instance allocator call failed: %s,"
6603                                " output: %s" % (fail, stdout+stderr))
6604     self.out_text = stdout
6605     if validate:
6606       self._ValidateResult()
6607
6608   def _ValidateResult(self):
6609     """Process the allocator results.
6610
6611     This will process and if successful save the result in
6612     self.out_data and the other parameters.
6613
6614     """
6615     try:
6616       rdict = serializer.Load(self.out_text)
6617     except Exception, err:
6618       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6619
6620     if not isinstance(rdict, dict):
6621       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6622
6623     for key in "success", "info", "nodes":
6624       if key not in rdict:
6625         raise errors.OpExecError("Can't parse iallocator results:"
6626                                  " missing key '%s'" % key)
6627       setattr(self, key, rdict[key])
6628
6629     if not isinstance(rdict["nodes"], list):
6630       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6631                                " is not a list")
6632     self.out_data = rdict
6633
6634
6635 class LUTestAllocator(NoHooksLU):
6636   """Run allocator tests.
6637
6638   This LU runs the allocator tests
6639
6640   """
6641   _OP_REQP = ["direction", "mode", "name"]
6642
6643   def CheckPrereq(self):
6644     """Check prerequisites.
6645
6646     This checks the opcode parameters depending on the director and mode test.
6647
6648     """
6649     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6650       for attr in ["name", "mem_size", "disks", "disk_template",
6651                    "os", "tags", "nics", "vcpus"]:
6652         if not hasattr(self.op, attr):
6653           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6654                                      attr)
6655       iname = self.cfg.ExpandInstanceName(self.op.name)
6656       if iname is not None:
6657         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6658                                    iname)
6659       if not isinstance(self.op.nics, list):
6660         raise errors.OpPrereqError("Invalid parameter 'nics'")
6661       for row in self.op.nics:
6662         if (not isinstance(row, dict) or
6663             "mac" not in row or
6664             "ip" not in row or
6665             "bridge" not in row):
6666           raise errors.OpPrereqError("Invalid contents of the"
6667                                      " 'nics' parameter")
6668       if not isinstance(self.op.disks, list):
6669         raise errors.OpPrereqError("Invalid parameter 'disks'")
6670       for row in self.op.disks:
6671         if (not isinstance(row, dict) or
6672             "size" not in row or
6673             not isinstance(row["size"], int) or
6674             "mode" not in row or
6675             row["mode"] not in ['r', 'w']):
6676           raise errors.OpPrereqError("Invalid contents of the"
6677                                      " 'disks' parameter")
6678       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6679         self.op.hypervisor = self.cfg.GetHypervisorType()
6680     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6681       if not hasattr(self.op, "name"):
6682         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6683       fname = self.cfg.ExpandInstanceName(self.op.name)
6684       if fname is None:
6685         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6686                                    self.op.name)
6687       self.op.name = fname
6688       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6689     else:
6690       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6691                                  self.op.mode)
6692
6693     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6694       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6695         raise errors.OpPrereqError("Missing allocator name")
6696     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6697       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6698                                  self.op.direction)
6699
6700   def Exec(self, feedback_fn):
6701     """Run the allocator test.
6702
6703     """
6704     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6705       ial = IAllocator(self,
6706                        mode=self.op.mode,
6707                        name=self.op.name,
6708                        mem_size=self.op.mem_size,
6709                        disks=self.op.disks,
6710                        disk_template=self.op.disk_template,
6711                        os=self.op.os,
6712                        tags=self.op.tags,
6713                        nics=self.op.nics,
6714                        vcpus=self.op.vcpus,
6715                        hypervisor=self.op.hypervisor,
6716                        )
6717     else:
6718       ial = IAllocator(self,
6719                        mode=self.op.mode,
6720                        name=self.op.name,
6721                        relocate_from=list(self.relocate_from),
6722                        )
6723
6724     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6725       result = ial.in_text
6726     else:
6727       ial.Run(self.op.allocator, validate=False)
6728       result = ial.out_text
6729     return result