cmdlib: check node stats in prereqs
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: string
459   @param status: the desired status of the instances
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   env = {
472     "OP_TARGET": name,
473     "INSTANCE_NAME": name,
474     "INSTANCE_PRIMARY": primary_node,
475     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476     "INSTANCE_OS_TYPE": os_type,
477     "INSTANCE_STATUS": status,
478     "INSTANCE_MEMORY": memory,
479     "INSTANCE_VCPUS": vcpus,
480   }
481
482   if nics:
483     nic_count = len(nics)
484     for idx, (ip, bridge, mac) in enumerate(nics):
485       if ip is None:
486         ip = ""
487       env["INSTANCE_NIC%d_IP" % idx] = ip
488       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490   else:
491     nic_count = 0
492
493   env["INSTANCE_NIC_COUNT"] = nic_count
494
495   return env
496
497
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499   """Builds instance related env variables for hooks from an object.
500
501   @type lu: L{LogicalUnit}
502   @param lu: the logical unit on whose behalf we execute
503   @type instance: L{objects.Instance}
504   @param instance: the instance for which we should build the
505       environment
506   @type override: dict
507   @param override: dictionary with key/values that will override
508       our values
509   @rtype: dict
510   @return: the hook environment dictionary
511
512   """
513   bep = lu.cfg.GetClusterInfo().FillBE(instance)
514   args = {
515     'name': instance.name,
516     'primary_node': instance.primary_node,
517     'secondary_nodes': instance.secondary_nodes,
518     'os_type': instance.os,
519     'status': instance.os,
520     'memory': bep[constants.BE_MEMORY],
521     'vcpus': bep[constants.BE_VCPUS],
522     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523   }
524   if override:
525     args.update(override)
526   return _BuildInstanceHookEnv(**args)
527
528
529 def _AdjustCandidatePool(lu):
530   """Adjust the candidate pool after node operations.
531
532   """
533   mod_list = lu.cfg.MaintainCandidatePool()
534   if mod_list:
535     lu.LogInfo("Promoted nodes to master candidate role: %s",
536                ", ".join(mod_list))
537     for name in mod_list:
538       lu.context.ReaddNode(name)
539   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540   if mc_now > mc_max:
541     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542                (mc_now, mc_max))
543
544
545 def _CheckInstanceBridgesExist(lu, instance):
546   """Check that the brigdes needed by an instance exist.
547
548   """
549   # check bridges existance
550   brlist = [nic.bridge for nic in instance.nics]
551   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552   result.Raise()
553   if not result.data:
554     raise errors.OpPrereqError("One or more target bridges %s does not"
555                                " exist on destination node '%s'" %
556                                (brlist, instance.primary_node))
557
558
559 class LUDestroyCluster(NoHooksLU):
560   """Logical unit for destroying the cluster.
561
562   """
563   _OP_REQP = []
564
565   def CheckPrereq(self):
566     """Check prerequisites.
567
568     This checks whether the cluster is empty.
569
570     Any errors are signalled by raising errors.OpPrereqError.
571
572     """
573     master = self.cfg.GetMasterNode()
574
575     nodelist = self.cfg.GetNodeList()
576     if len(nodelist) != 1 or nodelist[0] != master:
577       raise errors.OpPrereqError("There are still %d node(s) in"
578                                  " this cluster." % (len(nodelist) - 1))
579     instancelist = self.cfg.GetInstanceList()
580     if instancelist:
581       raise errors.OpPrereqError("There are still %d instance(s) in"
582                                  " this cluster." % len(instancelist))
583
584   def Exec(self, feedback_fn):
585     """Destroys the cluster.
586
587     """
588     master = self.cfg.GetMasterNode()
589     result = self.rpc.call_node_stop_master(master, False)
590     result.Raise()
591     if not result.data:
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     return master
597
598
599 class LUVerifyCluster(LogicalUnit):
600   """Verifies the cluster status.
601
602   """
603   HPATH = "cluster-verify"
604   HTYPE = constants.HTYPE_CLUSTER
605   _OP_REQP = ["skip_checks"]
606   REQ_BGL = False
607
608   def ExpandNames(self):
609     self.needed_locks = {
610       locking.LEVEL_NODE: locking.ALL_SET,
611       locking.LEVEL_INSTANCE: locking.ALL_SET,
612     }
613     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614
615   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616                   node_result, feedback_fn, master_files):
617     """Run multiple tests against a node.
618
619     Test list:
620
621       - compares ganeti version
622       - checks vg existance and size > 20G
623       - checks config file checksum
624       - checks ssh to other nodes
625
626     @type nodeinfo: L{objects.Node}
627     @param nodeinfo: the node to check
628     @param file_list: required list of files
629     @param local_cksum: dictionary of local files and their checksums
630     @param node_result: the results from the node
631     @param feedback_fn: function used to accumulate results
632     @param master_files: list of files that only masters should have
633
634     """
635     node = nodeinfo.name
636
637     # main result, node_result should be a non-empty dict
638     if not node_result or not isinstance(node_result, dict):
639       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640       return True
641
642     # compares ganeti version
643     local_version = constants.PROTOCOL_VERSION
644     remote_version = node_result.get('version', None)
645     if not remote_version:
646       feedback_fn("  - ERROR: connection to %s failed" % (node))
647       return True
648
649     if local_version != remote_version:
650       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651                       (local_version, node, remote_version))
652       return True
653
654     # checks vg existance and size > 20G
655
656     bad = False
657     vglist = node_result.get(constants.NV_VGLIST, None)
658     if not vglist:
659       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660                       (node,))
661       bad = True
662     else:
663       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664                                             constants.MIN_VG_SIZE)
665       if vgstatus:
666         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667         bad = True
668
669     # checks config file checksum
670
671     remote_cksum = node_result.get(constants.NV_FILELIST, None)
672     if not isinstance(remote_cksum, dict):
673       bad = True
674       feedback_fn("  - ERROR: node hasn't returned file checksum data")
675     else:
676       for file_name in file_list:
677         node_is_mc = nodeinfo.master_candidate
678         must_have_file = file_name not in master_files
679         if file_name not in remote_cksum:
680           if node_is_mc or must_have_file:
681             bad = True
682             feedback_fn("  - ERROR: file '%s' missing" % file_name)
683         elif remote_cksum[file_name] != local_cksum[file_name]:
684           if node_is_mc or must_have_file:
685             bad = True
686             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687           else:
688             # not candidate and this is not a must-have file
689             bad = True
690             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691                         " '%s'" % file_name)
692         else:
693           # all good, except non-master/non-must have combination
694           if not node_is_mc and not must_have_file:
695             feedback_fn("  - ERROR: file '%s' should not exist on non master"
696                         " candidates" % file_name)
697
698     # checks ssh to any
699
700     if constants.NV_NODELIST not in node_result:
701       bad = True
702       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703     else:
704       if node_result[constants.NV_NODELIST]:
705         bad = True
706         for node in node_result[constants.NV_NODELIST]:
707           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708                           (node, node_result[constants.NV_NODELIST][node]))
709
710     if constants.NV_NODENETTEST not in node_result:
711       bad = True
712       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713     else:
714       if node_result[constants.NV_NODENETTEST]:
715         bad = True
716         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717         for node in nlist:
718           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719                           (node, node_result[constants.NV_NODENETTEST][node]))
720
721     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722     if isinstance(hyp_result, dict):
723       for hv_name, hv_result in hyp_result.iteritems():
724         if hv_result is not None:
725           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726                       (hv_name, hv_result))
727     return bad
728
729   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730                       node_instance, feedback_fn):
731     """Verify an instance.
732
733     This function checks to see if the required block devices are
734     available on the instance's node.
735
736     """
737     bad = False
738
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       for volume in node_vol_should[node]:
746         if node not in node_vol_is or volume not in node_vol_is[node]:
747           feedback_fn("  - ERROR: volume %s missing on node %s" %
748                           (volume, node))
749           bad = True
750
751     if not instanceconfig.status == 'down':
752       if (node_current not in node_instance or
753           not instance in node_instance[node_current]):
754         feedback_fn("  - ERROR: instance %s not running on node %s" %
755                         (instance, node_current))
756         bad = True
757
758     for node in node_instance:
759       if (not node == node_current):
760         if instance in node_instance[node]:
761           feedback_fn("  - ERROR: instance %s should not run on node %s" %
762                           (instance, node))
763           bad = True
764
765     return bad
766
767   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
768     """Verify if there are any unknown volumes in the cluster.
769
770     The .os, .swap and backup volumes are ignored. All other volumes are
771     reported as unknown.
772
773     """
774     bad = False
775
776     for node in node_vol_is:
777       for volume in node_vol_is[node]:
778         if node not in node_vol_should or volume not in node_vol_should[node]:
779           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
780                       (volume, node))
781           bad = True
782     return bad
783
784   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
785     """Verify the list of running instances.
786
787     This checks what instances are running but unknown to the cluster.
788
789     """
790     bad = False
791     for node in node_instance:
792       for runninginstance in node_instance[node]:
793         if runninginstance not in instancelist:
794           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
795                           (runninginstance, node))
796           bad = True
797     return bad
798
799   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
800     """Verify N+1 Memory Resilience.
801
802     Check that if one single node dies we can still start all the instances it
803     was primary for.
804
805     """
806     bad = False
807
808     for node, nodeinfo in node_info.iteritems():
809       # This code checks that every node which is now listed as secondary has
810       # enough memory to host all instances it is supposed to should a single
811       # other node in the cluster fail.
812       # FIXME: not ready for failover to an arbitrary node
813       # FIXME: does not support file-backed instances
814       # WARNING: we currently take into account down instances as well as up
815       # ones, considering that even if they're down someone might want to start
816       # them even in the event of a node failure.
817       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
818         needed_mem = 0
819         for instance in instances:
820           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
821           if bep[constants.BE_AUTO_BALANCE]:
822             needed_mem += bep[constants.BE_MEMORY]
823         if nodeinfo['mfree'] < needed_mem:
824           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
825                       " failovers should node %s fail" % (node, prinode))
826           bad = True
827     return bad
828
829   def CheckPrereq(self):
830     """Check prerequisites.
831
832     Transform the list of checks we're going to skip into a set and check that
833     all its members are valid.
834
835     """
836     self.skip_set = frozenset(self.op.skip_checks)
837     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
838       raise errors.OpPrereqError("Invalid checks to be skipped specified")
839
840   def BuildHooksEnv(self):
841     """Build hooks env.
842
843     Cluster-Verify hooks just rone in the post phase and their failure makes
844     the output be logged in the verify output and the verification to fail.
845
846     """
847     all_nodes = self.cfg.GetNodeList()
848     # TODO: populate the environment with useful information for verify hooks
849     env = {}
850     return env, [], all_nodes
851
852   def Exec(self, feedback_fn):
853     """Verify integrity of cluster, performing various test on nodes.
854
855     """
856     bad = False
857     feedback_fn("* Verifying global settings")
858     for msg in self.cfg.VerifyConfig():
859       feedback_fn("  - ERROR: %s" % msg)
860
861     vg_name = self.cfg.GetVGName()
862     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
863     nodelist = utils.NiceSort(self.cfg.GetNodeList())
864     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
865     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866     i_non_redundant = [] # Non redundant instances
867     i_non_a_balanced = [] # Non auto-balanced instances
868     node_volume = {}
869     node_instance = {}
870     node_info = {}
871     instance_cfg = {}
872
873     # FIXME: verify OS list
874     # do local checksums
875     master_files = [constants.CLUSTER_CONF_FILE]
876
877     file_names = ssconf.SimpleStore().GetFileList()
878     file_names.append(constants.SSL_CERT_FILE)
879     file_names.extend(master_files)
880
881     local_checksums = utils.FingerprintFiles(file_names)
882
883     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
884     node_verify_param = {
885       constants.NV_FILELIST: file_names,
886       constants.NV_NODELIST: nodelist,
887       constants.NV_HYPERVISOR: hypervisors,
888       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
889                                   node.secondary_ip) for node in nodeinfo],
890       constants.NV_LVLIST: vg_name,
891       constants.NV_INSTANCELIST: hypervisors,
892       constants.NV_VGLIST: None,
893       constants.NV_VERSION: None,
894       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
895       }
896     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
897                                            self.cfg.GetClusterName())
898
899     cluster = self.cfg.GetClusterInfo()
900     master_node = self.cfg.GetMasterNode()
901     for node_i in nodeinfo:
902       node = node_i.name
903       nresult = all_nvinfo[node].data
904
905       if node == master_node:
906         ntype = "master"
907       elif node_i.master_candidate:
908         ntype = "master candidate"
909       else:
910         ntype = "regular"
911       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
912
913       if all_nvinfo[node].failed or not isinstance(nresult, dict):
914         feedback_fn("  - ERROR: connection to %s failed" % (node,))
915         bad = True
916         continue
917
918       result = self._VerifyNode(node_i, file_names, local_checksums,
919                                 nresult, feedback_fn, master_files)
920       bad = bad or result
921
922       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
923       if isinstance(lvdata, basestring):
924         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
925                     (node, lvdata.encode('string_escape')))
926         bad = True
927         node_volume[node] = {}
928       elif not isinstance(lvdata, dict):
929         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
930         bad = True
931         continue
932       else:
933         node_volume[node] = lvdata
934
935       # node_instance
936       idata = nresult.get(constants.NV_INSTANCELIST, None)
937       if not isinstance(idata, list):
938         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
939                     (node,))
940         bad = True
941         continue
942
943       node_instance[node] = idata
944
945       # node_info
946       nodeinfo = nresult.get(constants.NV_HVINFO, None)
947       if not isinstance(nodeinfo, dict):
948         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
949         bad = True
950         continue
951
952       try:
953         node_info[node] = {
954           "mfree": int(nodeinfo['memory_free']),
955           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
956           "pinst": [],
957           "sinst": [],
958           # dictionary holding all instances this node is secondary for,
959           # grouped by their primary node. Each key is a cluster node, and each
960           # value is a list of instances which have the key as primary and the
961           # current node as secondary.  this is handy to calculate N+1 memory
962           # availability if you can only failover from a primary to its
963           # secondary.
964           "sinst-by-pnode": {},
965         }
966       except ValueError:
967         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968         bad = True
969         continue
970
971     node_vol_should = {}
972
973     for instance in instancelist:
974       feedback_fn("* Verifying instance %s" % instance)
975       inst_config = self.cfg.GetInstanceInfo(instance)
976       result =  self._VerifyInstance(instance, inst_config, node_volume,
977                                      node_instance, feedback_fn)
978       bad = bad or result
979
980       inst_config.MapLVsByNode(node_vol_should)
981
982       instance_cfg[instance] = inst_config
983
984       pnode = inst_config.primary_node
985       if pnode in node_info:
986         node_info[pnode]['pinst'].append(instance)
987       else:
988         feedback_fn("  - ERROR: instance %s, connection to primary node"
989                     " %s failed" % (instance, pnode))
990         bad = True
991
992       # If the instance is non-redundant we cannot survive losing its primary
993       # node, so we are not N+1 compliant. On the other hand we have no disk
994       # templates with more than one secondary so that situation is not well
995       # supported either.
996       # FIXME: does not support file-backed instances
997       if len(inst_config.secondary_nodes) == 0:
998         i_non_redundant.append(instance)
999       elif len(inst_config.secondary_nodes) > 1:
1000         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1001                     % instance)
1002
1003       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1004         i_non_a_balanced.append(instance)
1005
1006       for snode in inst_config.secondary_nodes:
1007         if snode in node_info:
1008           node_info[snode]['sinst'].append(instance)
1009           if pnode not in node_info[snode]['sinst-by-pnode']:
1010             node_info[snode]['sinst-by-pnode'][pnode] = []
1011           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1012         else:
1013           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1014                       " %s failed" % (instance, snode))
1015
1016     feedback_fn("* Verifying orphan volumes")
1017     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1018                                        feedback_fn)
1019     bad = bad or result
1020
1021     feedback_fn("* Verifying remaining instances")
1022     result = self._VerifyOrphanInstances(instancelist, node_instance,
1023                                          feedback_fn)
1024     bad = bad or result
1025
1026     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1027       feedback_fn("* Verifying N+1 Memory redundancy")
1028       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1029       bad = bad or result
1030
1031     feedback_fn("* Other Notes")
1032     if i_non_redundant:
1033       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1034                   % len(i_non_redundant))
1035
1036     if i_non_a_balanced:
1037       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1038                   % len(i_non_a_balanced))
1039
1040     return not bad
1041
1042   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1043     """Analize the post-hooks' result
1044
1045     This method analyses the hook result, handles it, and sends some
1046     nicely-formatted feedback back to the user.
1047
1048     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1049         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1050     @param hooks_results: the results of the multi-node hooks rpc call
1051     @param feedback_fn: function used send feedback back to the caller
1052     @param lu_result: previous Exec result
1053     @return: the new Exec result, based on the previous result
1054         and hook results
1055
1056     """
1057     # We only really run POST phase hooks, and are only interested in
1058     # their results
1059     if phase == constants.HOOKS_PHASE_POST:
1060       # Used to change hooks' output to proper indentation
1061       indent_re = re.compile('^', re.M)
1062       feedback_fn("* Hooks Results")
1063       if not hooks_results:
1064         feedback_fn("  - ERROR: general communication failure")
1065         lu_result = 1
1066       else:
1067         for node_name in hooks_results:
1068           show_node_header = True
1069           res = hooks_results[node_name]
1070           if res.failed or res.data is False or not isinstance(res.data, list):
1071             feedback_fn("    Communication failure in hooks execution")
1072             lu_result = 1
1073             continue
1074           for script, hkr, output in res.data:
1075             if hkr == constants.HKR_FAIL:
1076               # The node header is only shown once, if there are
1077               # failing hooks on that node
1078               if show_node_header:
1079                 feedback_fn("  Node %s:" % node_name)
1080                 show_node_header = False
1081               feedback_fn("    ERROR: Script %s failed, output:" % script)
1082               output = indent_re.sub('      ', output)
1083               feedback_fn("%s" % output)
1084               lu_result = 1
1085
1086       return lu_result
1087
1088
1089 class LUVerifyDisks(NoHooksLU):
1090   """Verifies the cluster disks status.
1091
1092   """
1093   _OP_REQP = []
1094   REQ_BGL = False
1095
1096   def ExpandNames(self):
1097     self.needed_locks = {
1098       locking.LEVEL_NODE: locking.ALL_SET,
1099       locking.LEVEL_INSTANCE: locking.ALL_SET,
1100     }
1101     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1102
1103   def CheckPrereq(self):
1104     """Check prerequisites.
1105
1106     This has no prerequisites.
1107
1108     """
1109     pass
1110
1111   def Exec(self, feedback_fn):
1112     """Verify integrity of cluster disks.
1113
1114     """
1115     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1116
1117     vg_name = self.cfg.GetVGName()
1118     nodes = utils.NiceSort(self.cfg.GetNodeList())
1119     instances = [self.cfg.GetInstanceInfo(name)
1120                  for name in self.cfg.GetInstanceList()]
1121
1122     nv_dict = {}
1123     for inst in instances:
1124       inst_lvs = {}
1125       if (inst.status != "up" or
1126           inst.disk_template not in constants.DTS_NET_MIRROR):
1127         continue
1128       inst.MapLVsByNode(inst_lvs)
1129       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1130       for node, vol_list in inst_lvs.iteritems():
1131         for vol in vol_list:
1132           nv_dict[(node, vol)] = inst
1133
1134     if not nv_dict:
1135       return result
1136
1137     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1138
1139     to_act = set()
1140     for node in nodes:
1141       # node_volume
1142       lvs = node_lvs[node]
1143       if lvs.failed:
1144         self.LogWarning("Connection to node %s failed: %s" %
1145                         (node, lvs.data))
1146         continue
1147       lvs = lvs.data
1148       if isinstance(lvs, basestring):
1149         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1150         res_nlvm[node] = lvs
1151       elif not isinstance(lvs, dict):
1152         logging.warning("Connection to node %s failed or invalid data"
1153                         " returned", node)
1154         res_nodes.append(node)
1155         continue
1156
1157       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1158         inst = nv_dict.pop((node, lv_name), None)
1159         if (not lv_online and inst is not None
1160             and inst.name not in res_instances):
1161           res_instances.append(inst.name)
1162
1163     # any leftover items in nv_dict are missing LVs, let's arrange the
1164     # data better
1165     for key, inst in nv_dict.iteritems():
1166       if inst.name not in res_missing:
1167         res_missing[inst.name] = []
1168       res_missing[inst.name].append(key)
1169
1170     return result
1171
1172
1173 class LURenameCluster(LogicalUnit):
1174   """Rename the cluster.
1175
1176   """
1177   HPATH = "cluster-rename"
1178   HTYPE = constants.HTYPE_CLUSTER
1179   _OP_REQP = ["name"]
1180
1181   def BuildHooksEnv(self):
1182     """Build hooks env.
1183
1184     """
1185     env = {
1186       "OP_TARGET": self.cfg.GetClusterName(),
1187       "NEW_NAME": self.op.name,
1188       }
1189     mn = self.cfg.GetMasterNode()
1190     return env, [mn], [mn]
1191
1192   def CheckPrereq(self):
1193     """Verify that the passed name is a valid one.
1194
1195     """
1196     hostname = utils.HostInfo(self.op.name)
1197
1198     new_name = hostname.name
1199     self.ip = new_ip = hostname.ip
1200     old_name = self.cfg.GetClusterName()
1201     old_ip = self.cfg.GetMasterIP()
1202     if new_name == old_name and new_ip == old_ip:
1203       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1204                                  " cluster has changed")
1205     if new_ip != old_ip:
1206       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1207         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1208                                    " reachable on the network. Aborting." %
1209                                    new_ip)
1210
1211     self.op.name = new_name
1212
1213   def Exec(self, feedback_fn):
1214     """Rename the cluster.
1215
1216     """
1217     clustername = self.op.name
1218     ip = self.ip
1219
1220     # shutdown the master IP
1221     master = self.cfg.GetMasterNode()
1222     result = self.rpc.call_node_stop_master(master, False)
1223     if result.failed or not result.data:
1224       raise errors.OpExecError("Could not disable the master role")
1225
1226     try:
1227       cluster = self.cfg.GetClusterInfo()
1228       cluster.cluster_name = clustername
1229       cluster.master_ip = ip
1230       self.cfg.Update(cluster)
1231
1232       # update the known hosts file
1233       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1234       node_list = self.cfg.GetNodeList()
1235       try:
1236         node_list.remove(master)
1237       except ValueError:
1238         pass
1239       result = self.rpc.call_upload_file(node_list,
1240                                          constants.SSH_KNOWN_HOSTS_FILE)
1241       for to_node, to_result in result.iteritems():
1242         if to_result.failed or not to_result.data:
1243           logging.error("Copy of file %s to node %s failed", fname, to_node)
1244
1245     finally:
1246       result = self.rpc.call_node_start_master(master, False)
1247       if result.failed or not result.data:
1248         self.LogWarning("Could not re-enable the master role on"
1249                         " the master, please restart manually.")
1250
1251
1252 def _RecursiveCheckIfLVMBased(disk):
1253   """Check if the given disk or its children are lvm-based.
1254
1255   @type disk: L{objects.Disk}
1256   @param disk: the disk to check
1257   @rtype: booleean
1258   @return: boolean indicating whether a LD_LV dev_type was found or not
1259
1260   """
1261   if disk.children:
1262     for chdisk in disk.children:
1263       if _RecursiveCheckIfLVMBased(chdisk):
1264         return True
1265   return disk.dev_type == constants.LD_LV
1266
1267
1268 class LUSetClusterParams(LogicalUnit):
1269   """Change the parameters of the cluster.
1270
1271   """
1272   HPATH = "cluster-modify"
1273   HTYPE = constants.HTYPE_CLUSTER
1274   _OP_REQP = []
1275   REQ_BGL = False
1276
1277   def CheckParameters(self):
1278     """Check parameters
1279
1280     """
1281     if not hasattr(self.op, "candidate_pool_size"):
1282       self.op.candidate_pool_size = None
1283     if self.op.candidate_pool_size is not None:
1284       try:
1285         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1286       except ValueError, err:
1287         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1288                                    str(err))
1289       if self.op.candidate_pool_size < 1:
1290         raise errors.OpPrereqError("At least one master candidate needed")
1291
1292   def ExpandNames(self):
1293     # FIXME: in the future maybe other cluster params won't require checking on
1294     # all nodes to be modified.
1295     self.needed_locks = {
1296       locking.LEVEL_NODE: locking.ALL_SET,
1297     }
1298     self.share_locks[locking.LEVEL_NODE] = 1
1299
1300   def BuildHooksEnv(self):
1301     """Build hooks env.
1302
1303     """
1304     env = {
1305       "OP_TARGET": self.cfg.GetClusterName(),
1306       "NEW_VG_NAME": self.op.vg_name,
1307       }
1308     mn = self.cfg.GetMasterNode()
1309     return env, [mn], [mn]
1310
1311   def CheckPrereq(self):
1312     """Check prerequisites.
1313
1314     This checks whether the given params don't conflict and
1315     if the given volume group is valid.
1316
1317     """
1318     # FIXME: This only works because there is only one parameter that can be
1319     # changed or removed.
1320     if self.op.vg_name is not None and not self.op.vg_name:
1321       instances = self.cfg.GetAllInstancesInfo().values()
1322       for inst in instances:
1323         for disk in inst.disks:
1324           if _RecursiveCheckIfLVMBased(disk):
1325             raise errors.OpPrereqError("Cannot disable lvm storage while"
1326                                        " lvm-based instances exist")
1327
1328     node_list = self.acquired_locks[locking.LEVEL_NODE]
1329
1330     # if vg_name not None, checks given volume group on all nodes
1331     if self.op.vg_name:
1332       vglist = self.rpc.call_vg_list(node_list)
1333       for node in node_list:
1334         if vglist[node].failed:
1335           # ignoring down node
1336           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1337           continue
1338         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1339                                               self.op.vg_name,
1340                                               constants.MIN_VG_SIZE)
1341         if vgstatus:
1342           raise errors.OpPrereqError("Error on node '%s': %s" %
1343                                      (node, vgstatus))
1344
1345     self.cluster = cluster = self.cfg.GetClusterInfo()
1346     # validate beparams changes
1347     if self.op.beparams:
1348       utils.CheckBEParams(self.op.beparams)
1349       self.new_beparams = cluster.FillDict(
1350         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1351
1352     # hypervisor list/parameters
1353     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1354     if self.op.hvparams:
1355       if not isinstance(self.op.hvparams, dict):
1356         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1357       for hv_name, hv_dict in self.op.hvparams.items():
1358         if hv_name not in self.new_hvparams:
1359           self.new_hvparams[hv_name] = hv_dict
1360         else:
1361           self.new_hvparams[hv_name].update(hv_dict)
1362
1363     if self.op.enabled_hypervisors is not None:
1364       self.hv_list = self.op.enabled_hypervisors
1365     else:
1366       self.hv_list = cluster.enabled_hypervisors
1367
1368     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1369       # either the enabled list has changed, or the parameters have, validate
1370       for hv_name, hv_params in self.new_hvparams.items():
1371         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1372             (self.op.enabled_hypervisors and
1373              hv_name in self.op.enabled_hypervisors)):
1374           # either this is a new hypervisor, or its parameters have changed
1375           hv_class = hypervisor.GetHypervisor(hv_name)
1376           hv_class.CheckParameterSyntax(hv_params)
1377           _CheckHVParams(self, node_list, hv_name, hv_params)
1378
1379   def Exec(self, feedback_fn):
1380     """Change the parameters of the cluster.
1381
1382     """
1383     if self.op.vg_name is not None:
1384       if self.op.vg_name != self.cfg.GetVGName():
1385         self.cfg.SetVGName(self.op.vg_name)
1386       else:
1387         feedback_fn("Cluster LVM configuration already in desired"
1388                     " state, not changing")
1389     if self.op.hvparams:
1390       self.cluster.hvparams = self.new_hvparams
1391     if self.op.enabled_hypervisors is not None:
1392       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1393     if self.op.beparams:
1394       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1395     if self.op.candidate_pool_size is not None:
1396       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1397
1398     self.cfg.Update(self.cluster)
1399
1400     # we want to update nodes after the cluster so that if any errors
1401     # happen, we have recorded and saved the cluster info
1402     if self.op.candidate_pool_size is not None:
1403       _AdjustCandidatePool(self)
1404
1405
1406 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1407   """Sleep and poll for an instance's disk to sync.
1408
1409   """
1410   if not instance.disks:
1411     return True
1412
1413   if not oneshot:
1414     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1415
1416   node = instance.primary_node
1417
1418   for dev in instance.disks:
1419     lu.cfg.SetDiskID(dev, node)
1420
1421   retries = 0
1422   while True:
1423     max_time = 0
1424     done = True
1425     cumul_degraded = False
1426     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1427     if rstats.failed or not rstats.data:
1428       lu.LogWarning("Can't get any data from node %s", node)
1429       retries += 1
1430       if retries >= 10:
1431         raise errors.RemoteError("Can't contact node %s for mirror data,"
1432                                  " aborting." % node)
1433       time.sleep(6)
1434       continue
1435     rstats = rstats.data
1436     retries = 0
1437     for i in range(len(rstats)):
1438       mstat = rstats[i]
1439       if mstat is None:
1440         lu.LogWarning("Can't compute data for node %s/%s",
1441                            node, instance.disks[i].iv_name)
1442         continue
1443       # we ignore the ldisk parameter
1444       perc_done, est_time, is_degraded, _ = mstat
1445       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1446       if perc_done is not None:
1447         done = False
1448         if est_time is not None:
1449           rem_time = "%d estimated seconds remaining" % est_time
1450           max_time = est_time
1451         else:
1452           rem_time = "no time estimate"
1453         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1454                         (instance.disks[i].iv_name, perc_done, rem_time))
1455     if done or oneshot:
1456       break
1457
1458     time.sleep(min(60, max_time))
1459
1460   if done:
1461     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1462   return not cumul_degraded
1463
1464
1465 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1466   """Check that mirrors are not degraded.
1467
1468   The ldisk parameter, if True, will change the test from the
1469   is_degraded attribute (which represents overall non-ok status for
1470   the device(s)) to the ldisk (representing the local storage status).
1471
1472   """
1473   lu.cfg.SetDiskID(dev, node)
1474   if ldisk:
1475     idx = 6
1476   else:
1477     idx = 5
1478
1479   result = True
1480   if on_primary or dev.AssembleOnSecondary():
1481     rstats = lu.rpc.call_blockdev_find(node, dev)
1482     if rstats.failed or not rstats.data:
1483       logging.warning("Node %s: disk degraded, not found or node down", node)
1484       result = False
1485     else:
1486       result = result and (not rstats.data[idx])
1487   if dev.children:
1488     for child in dev.children:
1489       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1490
1491   return result
1492
1493
1494 class LUDiagnoseOS(NoHooksLU):
1495   """Logical unit for OS diagnose/query.
1496
1497   """
1498   _OP_REQP = ["output_fields", "names"]
1499   REQ_BGL = False
1500   _FIELDS_STATIC = utils.FieldSet()
1501   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1502
1503   def ExpandNames(self):
1504     if self.op.names:
1505       raise errors.OpPrereqError("Selective OS query not supported")
1506
1507     _CheckOutputFields(static=self._FIELDS_STATIC,
1508                        dynamic=self._FIELDS_DYNAMIC,
1509                        selected=self.op.output_fields)
1510
1511     # Lock all nodes, in shared mode
1512     self.needed_locks = {}
1513     self.share_locks[locking.LEVEL_NODE] = 1
1514     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1515
1516   def CheckPrereq(self):
1517     """Check prerequisites.
1518
1519     """
1520
1521   @staticmethod
1522   def _DiagnoseByOS(node_list, rlist):
1523     """Remaps a per-node return list into an a per-os per-node dictionary
1524
1525     @param node_list: a list with the names of all nodes
1526     @param rlist: a map with node names as keys and OS objects as values
1527
1528     @rtype: dict
1529     @returns: a dictionary with osnames as keys and as value another map, with
1530         nodes as keys and list of OS objects as values, eg::
1531
1532           {"debian-etch": {"node1": [<object>,...],
1533                            "node2": [<object>,]}
1534           }
1535
1536     """
1537     all_os = {}
1538     for node_name, nr in rlist.iteritems():
1539       if nr.failed or not nr.data:
1540         continue
1541       for os_obj in nr.data:
1542         if os_obj.name not in all_os:
1543           # build a list of nodes for this os containing empty lists
1544           # for each node in node_list
1545           all_os[os_obj.name] = {}
1546           for nname in node_list:
1547             all_os[os_obj.name][nname] = []
1548         all_os[os_obj.name][node_name].append(os_obj)
1549     return all_os
1550
1551   def Exec(self, feedback_fn):
1552     """Compute the list of OSes.
1553
1554     """
1555     node_list = self.acquired_locks[locking.LEVEL_NODE]
1556     node_data = self.rpc.call_os_diagnose(node_list)
1557     if node_data == False:
1558       raise errors.OpExecError("Can't gather the list of OSes")
1559     pol = self._DiagnoseByOS(node_list, node_data)
1560     output = []
1561     for os_name, os_data in pol.iteritems():
1562       row = []
1563       for field in self.op.output_fields:
1564         if field == "name":
1565           val = os_name
1566         elif field == "valid":
1567           val = utils.all([osl and osl[0] for osl in os_data.values()])
1568         elif field == "node_status":
1569           val = {}
1570           for node_name, nos_list in os_data.iteritems():
1571             val[node_name] = [(v.status, v.path) for v in nos_list]
1572         else:
1573           raise errors.ParameterError(field)
1574         row.append(val)
1575       output.append(row)
1576
1577     return output
1578
1579
1580 class LURemoveNode(LogicalUnit):
1581   """Logical unit for removing a node.
1582
1583   """
1584   HPATH = "node-remove"
1585   HTYPE = constants.HTYPE_NODE
1586   _OP_REQP = ["node_name"]
1587
1588   def BuildHooksEnv(self):
1589     """Build hooks env.
1590
1591     This doesn't run on the target node in the pre phase as a failed
1592     node would then be impossible to remove.
1593
1594     """
1595     env = {
1596       "OP_TARGET": self.op.node_name,
1597       "NODE_NAME": self.op.node_name,
1598       }
1599     all_nodes = self.cfg.GetNodeList()
1600     all_nodes.remove(self.op.node_name)
1601     return env, all_nodes, all_nodes
1602
1603   def CheckPrereq(self):
1604     """Check prerequisites.
1605
1606     This checks:
1607      - the node exists in the configuration
1608      - it does not have primary or secondary instances
1609      - it's not the master
1610
1611     Any errors are signalled by raising errors.OpPrereqError.
1612
1613     """
1614     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1615     if node is None:
1616       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1617
1618     instance_list = self.cfg.GetInstanceList()
1619
1620     masternode = self.cfg.GetMasterNode()
1621     if node.name == masternode:
1622       raise errors.OpPrereqError("Node is the master node,"
1623                                  " you need to failover first.")
1624
1625     for instance_name in instance_list:
1626       instance = self.cfg.GetInstanceInfo(instance_name)
1627       if node.name == instance.primary_node:
1628         raise errors.OpPrereqError("Instance %s still running on the node,"
1629                                    " please remove first." % instance_name)
1630       if node.name in instance.secondary_nodes:
1631         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1632                                    " please remove first." % instance_name)
1633     self.op.node_name = node.name
1634     self.node = node
1635
1636   def Exec(self, feedback_fn):
1637     """Removes the node from the cluster.
1638
1639     """
1640     node = self.node
1641     logging.info("Stopping the node daemon and removing configs from node %s",
1642                  node.name)
1643
1644     self.context.RemoveNode(node.name)
1645
1646     self.rpc.call_node_leave_cluster(node.name)
1647
1648     # Promote nodes to master candidate as needed
1649     _AdjustCandidatePool(self)
1650
1651
1652 class LUQueryNodes(NoHooksLU):
1653   """Logical unit for querying nodes.
1654
1655   """
1656   _OP_REQP = ["output_fields", "names"]
1657   REQ_BGL = False
1658   _FIELDS_DYNAMIC = utils.FieldSet(
1659     "dtotal", "dfree",
1660     "mtotal", "mnode", "mfree",
1661     "bootid",
1662     "ctotal",
1663     )
1664
1665   _FIELDS_STATIC = utils.FieldSet(
1666     "name", "pinst_cnt", "sinst_cnt",
1667     "pinst_list", "sinst_list",
1668     "pip", "sip", "tags",
1669     "serial_no",
1670     "master_candidate",
1671     "master",
1672     "offline",
1673     )
1674
1675   def ExpandNames(self):
1676     _CheckOutputFields(static=self._FIELDS_STATIC,
1677                        dynamic=self._FIELDS_DYNAMIC,
1678                        selected=self.op.output_fields)
1679
1680     self.needed_locks = {}
1681     self.share_locks[locking.LEVEL_NODE] = 1
1682
1683     if self.op.names:
1684       self.wanted = _GetWantedNodes(self, self.op.names)
1685     else:
1686       self.wanted = locking.ALL_SET
1687
1688     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1689     if self.do_locking:
1690       # if we don't request only static fields, we need to lock the nodes
1691       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1692
1693
1694   def CheckPrereq(self):
1695     """Check prerequisites.
1696
1697     """
1698     # The validation of the node list is done in the _GetWantedNodes,
1699     # if non empty, and if empty, there's no validation to do
1700     pass
1701
1702   def Exec(self, feedback_fn):
1703     """Computes the list of nodes and their attributes.
1704
1705     """
1706     all_info = self.cfg.GetAllNodesInfo()
1707     if self.do_locking:
1708       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1709     elif self.wanted != locking.ALL_SET:
1710       nodenames = self.wanted
1711       missing = set(nodenames).difference(all_info.keys())
1712       if missing:
1713         raise errors.OpExecError(
1714           "Some nodes were removed before retrieving their data: %s" % missing)
1715     else:
1716       nodenames = all_info.keys()
1717
1718     nodenames = utils.NiceSort(nodenames)
1719     nodelist = [all_info[name] for name in nodenames]
1720
1721     # begin data gathering
1722
1723     if self.do_locking:
1724       live_data = {}
1725       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1726                                           self.cfg.GetHypervisorType())
1727       for name in nodenames:
1728         nodeinfo = node_data[name]
1729         if not nodeinfo.failed and nodeinfo.data:
1730           nodeinfo = nodeinfo.data
1731           fn = utils.TryConvert
1732           live_data[name] = {
1733             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1734             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1735             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1736             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1737             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1738             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1739             "bootid": nodeinfo.get('bootid', None),
1740             }
1741         else:
1742           live_data[name] = {}
1743     else:
1744       live_data = dict.fromkeys(nodenames, {})
1745
1746     node_to_primary = dict([(name, set()) for name in nodenames])
1747     node_to_secondary = dict([(name, set()) for name in nodenames])
1748
1749     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1750                              "sinst_cnt", "sinst_list"))
1751     if inst_fields & frozenset(self.op.output_fields):
1752       instancelist = self.cfg.GetInstanceList()
1753
1754       for instance_name in instancelist:
1755         inst = self.cfg.GetInstanceInfo(instance_name)
1756         if inst.primary_node in node_to_primary:
1757           node_to_primary[inst.primary_node].add(inst.name)
1758         for secnode in inst.secondary_nodes:
1759           if secnode in node_to_secondary:
1760             node_to_secondary[secnode].add(inst.name)
1761
1762     master_node = self.cfg.GetMasterNode()
1763
1764     # end data gathering
1765
1766     output = []
1767     for node in nodelist:
1768       node_output = []
1769       for field in self.op.output_fields:
1770         if field == "name":
1771           val = node.name
1772         elif field == "pinst_list":
1773           val = list(node_to_primary[node.name])
1774         elif field == "sinst_list":
1775           val = list(node_to_secondary[node.name])
1776         elif field == "pinst_cnt":
1777           val = len(node_to_primary[node.name])
1778         elif field == "sinst_cnt":
1779           val = len(node_to_secondary[node.name])
1780         elif field == "pip":
1781           val = node.primary_ip
1782         elif field == "sip":
1783           val = node.secondary_ip
1784         elif field == "tags":
1785           val = list(node.GetTags())
1786         elif field == "serial_no":
1787           val = node.serial_no
1788         elif field == "master_candidate":
1789           val = node.master_candidate
1790         elif field == "master":
1791           val = node.name == master_node
1792         elif field == "offline":
1793           val = node.offline
1794         elif self._FIELDS_DYNAMIC.Matches(field):
1795           val = live_data[node.name].get(field, None)
1796         else:
1797           raise errors.ParameterError(field)
1798         node_output.append(val)
1799       output.append(node_output)
1800
1801     return output
1802
1803
1804 class LUQueryNodeVolumes(NoHooksLU):
1805   """Logical unit for getting volumes on node(s).
1806
1807   """
1808   _OP_REQP = ["nodes", "output_fields"]
1809   REQ_BGL = False
1810   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1811   _FIELDS_STATIC = utils.FieldSet("node")
1812
1813   def ExpandNames(self):
1814     _CheckOutputFields(static=self._FIELDS_STATIC,
1815                        dynamic=self._FIELDS_DYNAMIC,
1816                        selected=self.op.output_fields)
1817
1818     self.needed_locks = {}
1819     self.share_locks[locking.LEVEL_NODE] = 1
1820     if not self.op.nodes:
1821       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1822     else:
1823       self.needed_locks[locking.LEVEL_NODE] = \
1824         _GetWantedNodes(self, self.op.nodes)
1825
1826   def CheckPrereq(self):
1827     """Check prerequisites.
1828
1829     This checks that the fields required are valid output fields.
1830
1831     """
1832     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1833
1834   def Exec(self, feedback_fn):
1835     """Computes the list of nodes and their attributes.
1836
1837     """
1838     nodenames = self.nodes
1839     volumes = self.rpc.call_node_volumes(nodenames)
1840
1841     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1842              in self.cfg.GetInstanceList()]
1843
1844     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1845
1846     output = []
1847     for node in nodenames:
1848       if node not in volumes or volumes[node].failed or not volumes[node].data:
1849         continue
1850
1851       node_vols = volumes[node].data[:]
1852       node_vols.sort(key=lambda vol: vol['dev'])
1853
1854       for vol in node_vols:
1855         node_output = []
1856         for field in self.op.output_fields:
1857           if field == "node":
1858             val = node
1859           elif field == "phys":
1860             val = vol['dev']
1861           elif field == "vg":
1862             val = vol['vg']
1863           elif field == "name":
1864             val = vol['name']
1865           elif field == "size":
1866             val = int(float(vol['size']))
1867           elif field == "instance":
1868             for inst in ilist:
1869               if node not in lv_by_node[inst]:
1870                 continue
1871               if vol['name'] in lv_by_node[inst][node]:
1872                 val = inst.name
1873                 break
1874             else:
1875               val = '-'
1876           else:
1877             raise errors.ParameterError(field)
1878           node_output.append(str(val))
1879
1880         output.append(node_output)
1881
1882     return output
1883
1884
1885 class LUAddNode(LogicalUnit):
1886   """Logical unit for adding node to the cluster.
1887
1888   """
1889   HPATH = "node-add"
1890   HTYPE = constants.HTYPE_NODE
1891   _OP_REQP = ["node_name"]
1892
1893   def BuildHooksEnv(self):
1894     """Build hooks env.
1895
1896     This will run on all nodes before, and on all nodes + the new node after.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.op.node_name,
1901       "NODE_NAME": self.op.node_name,
1902       "NODE_PIP": self.op.primary_ip,
1903       "NODE_SIP": self.op.secondary_ip,
1904       }
1905     nodes_0 = self.cfg.GetNodeList()
1906     nodes_1 = nodes_0 + [self.op.node_name, ]
1907     return env, nodes_0, nodes_1
1908
1909   def CheckPrereq(self):
1910     """Check prerequisites.
1911
1912     This checks:
1913      - the new node is not already in the config
1914      - it is resolvable
1915      - its parameters (single/dual homed) matches the cluster
1916
1917     Any errors are signalled by raising errors.OpPrereqError.
1918
1919     """
1920     node_name = self.op.node_name
1921     cfg = self.cfg
1922
1923     dns_data = utils.HostInfo(node_name)
1924
1925     node = dns_data.name
1926     primary_ip = self.op.primary_ip = dns_data.ip
1927     secondary_ip = getattr(self.op, "secondary_ip", None)
1928     if secondary_ip is None:
1929       secondary_ip = primary_ip
1930     if not utils.IsValidIP(secondary_ip):
1931       raise errors.OpPrereqError("Invalid secondary IP given")
1932     self.op.secondary_ip = secondary_ip
1933
1934     node_list = cfg.GetNodeList()
1935     if not self.op.readd and node in node_list:
1936       raise errors.OpPrereqError("Node %s is already in the configuration" %
1937                                  node)
1938     elif self.op.readd and node not in node_list:
1939       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1940
1941     for existing_node_name in node_list:
1942       existing_node = cfg.GetNodeInfo(existing_node_name)
1943
1944       if self.op.readd and node == existing_node_name:
1945         if (existing_node.primary_ip != primary_ip or
1946             existing_node.secondary_ip != secondary_ip):
1947           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1948                                      " address configuration as before")
1949         continue
1950
1951       if (existing_node.primary_ip == primary_ip or
1952           existing_node.secondary_ip == primary_ip or
1953           existing_node.primary_ip == secondary_ip or
1954           existing_node.secondary_ip == secondary_ip):
1955         raise errors.OpPrereqError("New node ip address(es) conflict with"
1956                                    " existing node %s" % existing_node.name)
1957
1958     # check that the type of the node (single versus dual homed) is the
1959     # same as for the master
1960     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1961     master_singlehomed = myself.secondary_ip == myself.primary_ip
1962     newbie_singlehomed = secondary_ip == primary_ip
1963     if master_singlehomed != newbie_singlehomed:
1964       if master_singlehomed:
1965         raise errors.OpPrereqError("The master has no private ip but the"
1966                                    " new node has one")
1967       else:
1968         raise errors.OpPrereqError("The master has a private ip but the"
1969                                    " new node doesn't have one")
1970
1971     # checks reachablity
1972     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1973       raise errors.OpPrereqError("Node not reachable by ping")
1974
1975     if not newbie_singlehomed:
1976       # check reachability from my secondary ip to newbie's secondary ip
1977       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1978                            source=myself.secondary_ip):
1979         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1980                                    " based ping to noded port")
1981
1982     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1983     node_info = self.cfg.GetAllNodesInfo().values()
1984     mc_now, _ = self.cfg.GetMasterCandidateStats()
1985     master_candidate = mc_now < cp_size
1986
1987     self.new_node = objects.Node(name=node,
1988                                  primary_ip=primary_ip,
1989                                  secondary_ip=secondary_ip,
1990                                  master_candidate=master_candidate,
1991                                  offline=False)
1992
1993   def Exec(self, feedback_fn):
1994     """Adds the new node to the cluster.
1995
1996     """
1997     new_node = self.new_node
1998     node = new_node.name
1999
2000     # check connectivity
2001     result = self.rpc.call_version([node])[node]
2002     result.Raise()
2003     if result.data:
2004       if constants.PROTOCOL_VERSION == result.data:
2005         logging.info("Communication to node %s fine, sw version %s match",
2006                      node, result.data)
2007       else:
2008         raise errors.OpExecError("Version mismatch master version %s,"
2009                                  " node version %s" %
2010                                  (constants.PROTOCOL_VERSION, result.data))
2011     else:
2012       raise errors.OpExecError("Cannot get version from the new node")
2013
2014     # setup ssh on node
2015     logging.info("Copy ssh key to node %s", node)
2016     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2017     keyarray = []
2018     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2019                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2020                 priv_key, pub_key]
2021
2022     for i in keyfiles:
2023       f = open(i, 'r')
2024       try:
2025         keyarray.append(f.read())
2026       finally:
2027         f.close()
2028
2029     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2030                                     keyarray[2],
2031                                     keyarray[3], keyarray[4], keyarray[5])
2032
2033     if result.failed or not result.data:
2034       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2035
2036     # Add node to our /etc/hosts, and add key to known_hosts
2037     utils.AddHostToEtcHosts(new_node.name)
2038
2039     if new_node.secondary_ip != new_node.primary_ip:
2040       result = self.rpc.call_node_has_ip_address(new_node.name,
2041                                                  new_node.secondary_ip)
2042       if result.failed or not result.data:
2043         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2044                                  " you gave (%s). Please fix and re-run this"
2045                                  " command." % new_node.secondary_ip)
2046
2047     node_verify_list = [self.cfg.GetMasterNode()]
2048     node_verify_param = {
2049       'nodelist': [node],
2050       # TODO: do a node-net-test as well?
2051     }
2052
2053     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2054                                        self.cfg.GetClusterName())
2055     for verifier in node_verify_list:
2056       if result[verifier].failed or not result[verifier].data:
2057         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2058                                  " for remote verification" % verifier)
2059       if result[verifier].data['nodelist']:
2060         for failed in result[verifier].data['nodelist']:
2061           feedback_fn("ssh/hostname verification failed %s -> %s" %
2062                       (verifier, result[verifier]['nodelist'][failed]))
2063         raise errors.OpExecError("ssh/hostname verification failed.")
2064
2065     # Distribute updated /etc/hosts and known_hosts to all nodes,
2066     # including the node just added
2067     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2068     dist_nodes = self.cfg.GetNodeList()
2069     if not self.op.readd:
2070       dist_nodes.append(node)
2071     if myself.name in dist_nodes:
2072       dist_nodes.remove(myself.name)
2073
2074     logging.debug("Copying hosts and known_hosts to all nodes")
2075     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2076       result = self.rpc.call_upload_file(dist_nodes, fname)
2077       for to_node, to_result in result.iteritems():
2078         if to_result.failed or not to_result.data:
2079           logging.error("Copy of file %s to node %s failed", fname, to_node)
2080
2081     to_copy = []
2082     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2083       to_copy.append(constants.VNC_PASSWORD_FILE)
2084     for fname in to_copy:
2085       result = self.rpc.call_upload_file([node], fname)
2086       if result[node].failed or not result[node]:
2087         logging.error("Could not copy file %s to node %s", fname, node)
2088
2089     if self.op.readd:
2090       self.context.ReaddNode(new_node)
2091     else:
2092       self.context.AddNode(new_node)
2093
2094
2095 class LUSetNodeParams(LogicalUnit):
2096   """Modifies the parameters of a node.
2097
2098   """
2099   HPATH = "node-modify"
2100   HTYPE = constants.HTYPE_NODE
2101   _OP_REQP = ["node_name"]
2102   REQ_BGL = False
2103
2104   def CheckArguments(self):
2105     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2106     if node_name is None:
2107       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2108     self.op.node_name = node_name
2109     if not hasattr(self.op, 'master_candidate'):
2110       raise errors.OpPrereqError("Please pass at least one modification")
2111     self.op.master_candidate = bool(self.op.master_candidate)
2112
2113   def ExpandNames(self):
2114     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2115
2116   def BuildHooksEnv(self):
2117     """Build hooks env.
2118
2119     This runs on the master node.
2120
2121     """
2122     env = {
2123       "OP_TARGET": self.op.node_name,
2124       "MASTER_CANDIDATE": str(self.op.master_candidate),
2125       }
2126     nl = [self.cfg.GetMasterNode(),
2127           self.op.node_name]
2128     return env, nl, nl
2129
2130   def CheckPrereq(self):
2131     """Check prerequisites.
2132
2133     This only checks the instance list against the existing names.
2134
2135     """
2136     force = self.force = self.op.force
2137
2138     if self.op.master_candidate == False:
2139       if self.op.node_name == self.cfg.GetMasterNode():
2140         raise errors.OpPrereqError("The master node has to be a"
2141                                    " master candidate")
2142       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2143       node_info = self.cfg.GetAllNodesInfo().values()
2144       num_candidates = len([node for node in node_info
2145                             if node.master_candidate])
2146       if num_candidates <= cp_size:
2147         msg = ("Not enough master candidates (desired"
2148                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2149         if force:
2150           self.LogWarning(msg)
2151         else:
2152           raise errors.OpPrereqError(msg)
2153
2154     return
2155
2156   def Exec(self, feedback_fn):
2157     """Modifies a node.
2158
2159     """
2160     node = self.cfg.GetNodeInfo(self.op.node_name)
2161
2162     result = []
2163
2164     if self.op.master_candidate is not None:
2165       node.master_candidate = self.op.master_candidate
2166       result.append(("master_candidate", str(self.op.master_candidate)))
2167       if self.op.master_candidate == False:
2168         rrc = self.rpc.call_node_demote_from_mc(node.name)
2169         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2170             or len(rrc.data) != 2):
2171           self.LogWarning("Node rpc error: %s" % rrc.error)
2172         elif not rrc.data[0]:
2173           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2174
2175     # this will trigger configuration file update, if needed
2176     self.cfg.Update(node)
2177     # this will trigger job queue propagation or cleanup
2178     if self.op.node_name != self.cfg.GetMasterNode():
2179       self.context.ReaddNode(node)
2180
2181     return result
2182
2183
2184 class LUQueryClusterInfo(NoHooksLU):
2185   """Query cluster configuration.
2186
2187   """
2188   _OP_REQP = []
2189   REQ_BGL = False
2190
2191   def ExpandNames(self):
2192     self.needed_locks = {}
2193
2194   def CheckPrereq(self):
2195     """No prerequsites needed for this LU.
2196
2197     """
2198     pass
2199
2200   def Exec(self, feedback_fn):
2201     """Return cluster config.
2202
2203     """
2204     cluster = self.cfg.GetClusterInfo()
2205     result = {
2206       "software_version": constants.RELEASE_VERSION,
2207       "protocol_version": constants.PROTOCOL_VERSION,
2208       "config_version": constants.CONFIG_VERSION,
2209       "os_api_version": constants.OS_API_VERSION,
2210       "export_version": constants.EXPORT_VERSION,
2211       "architecture": (platform.architecture()[0], platform.machine()),
2212       "name": cluster.cluster_name,
2213       "master": cluster.master_node,
2214       "default_hypervisor": cluster.default_hypervisor,
2215       "enabled_hypervisors": cluster.enabled_hypervisors,
2216       "hvparams": cluster.hvparams,
2217       "beparams": cluster.beparams,
2218       "candidate_pool_size": cluster.candidate_pool_size,
2219       }
2220
2221     return result
2222
2223
2224 class LUQueryConfigValues(NoHooksLU):
2225   """Return configuration values.
2226
2227   """
2228   _OP_REQP = []
2229   REQ_BGL = False
2230   _FIELDS_DYNAMIC = utils.FieldSet()
2231   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2232
2233   def ExpandNames(self):
2234     self.needed_locks = {}
2235
2236     _CheckOutputFields(static=self._FIELDS_STATIC,
2237                        dynamic=self._FIELDS_DYNAMIC,
2238                        selected=self.op.output_fields)
2239
2240   def CheckPrereq(self):
2241     """No prerequisites.
2242
2243     """
2244     pass
2245
2246   def Exec(self, feedback_fn):
2247     """Dump a representation of the cluster config to the standard output.
2248
2249     """
2250     values = []
2251     for field in self.op.output_fields:
2252       if field == "cluster_name":
2253         entry = self.cfg.GetClusterName()
2254       elif field == "master_node":
2255         entry = self.cfg.GetMasterNode()
2256       elif field == "drain_flag":
2257         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2258       else:
2259         raise errors.ParameterError(field)
2260       values.append(entry)
2261     return values
2262
2263
2264 class LUActivateInstanceDisks(NoHooksLU):
2265   """Bring up an instance's disks.
2266
2267   """
2268   _OP_REQP = ["instance_name"]
2269   REQ_BGL = False
2270
2271   def ExpandNames(self):
2272     self._ExpandAndLockInstance()
2273     self.needed_locks[locking.LEVEL_NODE] = []
2274     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2275
2276   def DeclareLocks(self, level):
2277     if level == locking.LEVEL_NODE:
2278       self._LockInstancesNodes()
2279
2280   def CheckPrereq(self):
2281     """Check prerequisites.
2282
2283     This checks that the instance is in the cluster.
2284
2285     """
2286     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2287     assert self.instance is not None, \
2288       "Cannot retrieve locked instance %s" % self.op.instance_name
2289     _CheckNodeOnline(self, instance.primary_node)
2290
2291   def Exec(self, feedback_fn):
2292     """Activate the disks.
2293
2294     """
2295     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2296     if not disks_ok:
2297       raise errors.OpExecError("Cannot activate block devices")
2298
2299     return disks_info
2300
2301
2302 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2303   """Prepare the block devices for an instance.
2304
2305   This sets up the block devices on all nodes.
2306
2307   @type lu: L{LogicalUnit}
2308   @param lu: the logical unit on whose behalf we execute
2309   @type instance: L{objects.Instance}
2310   @param instance: the instance for whose disks we assemble
2311   @type ignore_secondaries: boolean
2312   @param ignore_secondaries: if true, errors on secondary nodes
2313       won't result in an error return from the function
2314   @return: False if the operation failed, otherwise a list of
2315       (host, instance_visible_name, node_visible_name)
2316       with the mapping from node devices to instance devices
2317
2318   """
2319   device_info = []
2320   disks_ok = True
2321   iname = instance.name
2322   # With the two passes mechanism we try to reduce the window of
2323   # opportunity for the race condition of switching DRBD to primary
2324   # before handshaking occured, but we do not eliminate it
2325
2326   # The proper fix would be to wait (with some limits) until the
2327   # connection has been made and drbd transitions from WFConnection
2328   # into any other network-connected state (Connected, SyncTarget,
2329   # SyncSource, etc.)
2330
2331   # 1st pass, assemble on all nodes in secondary mode
2332   for inst_disk in instance.disks:
2333     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2334       lu.cfg.SetDiskID(node_disk, node)
2335       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2336       if result.failed or not result:
2337         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2338                            " (is_primary=False, pass=1)",
2339                            inst_disk.iv_name, node)
2340         if not ignore_secondaries:
2341           disks_ok = False
2342
2343   # FIXME: race condition on drbd migration to primary
2344
2345   # 2nd pass, do only the primary node
2346   for inst_disk in instance.disks:
2347     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2348       if node != instance.primary_node:
2349         continue
2350       lu.cfg.SetDiskID(node_disk, node)
2351       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2352       if result.failed or not result:
2353         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2354                            " (is_primary=True, pass=2)",
2355                            inst_disk.iv_name, node)
2356         disks_ok = False
2357     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2358
2359   # leave the disks configured for the primary node
2360   # this is a workaround that would be fixed better by
2361   # improving the logical/physical id handling
2362   for disk in instance.disks:
2363     lu.cfg.SetDiskID(disk, instance.primary_node)
2364
2365   return disks_ok, device_info
2366
2367
2368 def _StartInstanceDisks(lu, instance, force):
2369   """Start the disks of an instance.
2370
2371   """
2372   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2373                                            ignore_secondaries=force)
2374   if not disks_ok:
2375     _ShutdownInstanceDisks(lu, instance)
2376     if force is not None and not force:
2377       lu.proc.LogWarning("", hint="If the message above refers to a"
2378                          " secondary node,"
2379                          " you can retry the operation using '--force'.")
2380     raise errors.OpExecError("Disk consistency error")
2381
2382
2383 class LUDeactivateInstanceDisks(NoHooksLU):
2384   """Shutdown an instance's disks.
2385
2386   """
2387   _OP_REQP = ["instance_name"]
2388   REQ_BGL = False
2389
2390   def ExpandNames(self):
2391     self._ExpandAndLockInstance()
2392     self.needed_locks[locking.LEVEL_NODE] = []
2393     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2394
2395   def DeclareLocks(self, level):
2396     if level == locking.LEVEL_NODE:
2397       self._LockInstancesNodes()
2398
2399   def CheckPrereq(self):
2400     """Check prerequisites.
2401
2402     This checks that the instance is in the cluster.
2403
2404     """
2405     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2406     assert self.instance is not None, \
2407       "Cannot retrieve locked instance %s" % self.op.instance_name
2408
2409   def Exec(self, feedback_fn):
2410     """Deactivate the disks
2411
2412     """
2413     instance = self.instance
2414     _SafeShutdownInstanceDisks(self, instance)
2415
2416
2417 def _SafeShutdownInstanceDisks(lu, instance):
2418   """Shutdown block devices of an instance.
2419
2420   This function checks if an instance is running, before calling
2421   _ShutdownInstanceDisks.
2422
2423   """
2424   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2425                                       [instance.hypervisor])
2426   ins_l = ins_l[instance.primary_node]
2427   if ins_l.failed or not isinstance(ins_l.data, list):
2428     raise errors.OpExecError("Can't contact node '%s'" %
2429                              instance.primary_node)
2430
2431   if instance.name in ins_l.data:
2432     raise errors.OpExecError("Instance is running, can't shutdown"
2433                              " block devices.")
2434
2435   _ShutdownInstanceDisks(lu, instance)
2436
2437
2438 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2439   """Shutdown block devices of an instance.
2440
2441   This does the shutdown on all nodes of the instance.
2442
2443   If the ignore_primary is false, errors on the primary node are
2444   ignored.
2445
2446   """
2447   result = True
2448   for disk in instance.disks:
2449     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2450       lu.cfg.SetDiskID(top_disk, node)
2451       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2452       if result.failed or not result.data:
2453         logging.error("Could not shutdown block device %s on node %s",
2454                       disk.iv_name, node)
2455         if not ignore_primary or node != instance.primary_node:
2456           result = False
2457   return result
2458
2459
2460 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2461   """Checks if a node has enough free memory.
2462
2463   This function check if a given node has the needed amount of free
2464   memory. In case the node has less memory or we cannot get the
2465   information from the node, this function raise an OpPrereqError
2466   exception.
2467
2468   @type lu: C{LogicalUnit}
2469   @param lu: a logical unit from which we get configuration data
2470   @type node: C{str}
2471   @param node: the node to check
2472   @type reason: C{str}
2473   @param reason: string to use in the error message
2474   @type requested: C{int}
2475   @param requested: the amount of memory in MiB to check for
2476   @type hypervisor: C{str}
2477   @param hypervisor: the hypervisor to ask for memory stats
2478   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2479       we cannot check the node
2480
2481   """
2482   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2483   nodeinfo[node].Raise()
2484   free_mem = nodeinfo[node].data.get('memory_free')
2485   if not isinstance(free_mem, int):
2486     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2487                              " was '%s'" % (node, free_mem))
2488   if requested > free_mem:
2489     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2490                              " needed %s MiB, available %s MiB" %
2491                              (node, reason, requested, free_mem))
2492
2493
2494 class LUStartupInstance(LogicalUnit):
2495   """Starts an instance.
2496
2497   """
2498   HPATH = "instance-start"
2499   HTYPE = constants.HTYPE_INSTANCE
2500   _OP_REQP = ["instance_name", "force"]
2501   REQ_BGL = False
2502
2503   def ExpandNames(self):
2504     self._ExpandAndLockInstance()
2505
2506   def BuildHooksEnv(self):
2507     """Build hooks env.
2508
2509     This runs on master, primary and secondary nodes of the instance.
2510
2511     """
2512     env = {
2513       "FORCE": self.op.force,
2514       }
2515     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2516     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2517           list(self.instance.secondary_nodes))
2518     return env, nl, nl
2519
2520   def CheckPrereq(self):
2521     """Check prerequisites.
2522
2523     This checks that the instance is in the cluster.
2524
2525     """
2526     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2527     assert self.instance is not None, \
2528       "Cannot retrieve locked instance %s" % self.op.instance_name
2529
2530     _CheckNodeOnline(self, instance.primary_node)
2531
2532     bep = self.cfg.GetClusterInfo().FillBE(instance)
2533     # check bridges existance
2534     _CheckInstanceBridgesExist(self, instance)
2535
2536     _CheckNodeFreeMemory(self, instance.primary_node,
2537                          "starting instance %s" % instance.name,
2538                          bep[constants.BE_MEMORY], instance.hypervisor)
2539
2540   def Exec(self, feedback_fn):
2541     """Start the instance.
2542
2543     """
2544     instance = self.instance
2545     force = self.op.force
2546     extra_args = getattr(self.op, "extra_args", "")
2547
2548     self.cfg.MarkInstanceUp(instance.name)
2549
2550     node_current = instance.primary_node
2551
2552     _StartInstanceDisks(self, instance, force)
2553
2554     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2555     if result.failed or not result.data:
2556       _ShutdownInstanceDisks(self, instance)
2557       raise errors.OpExecError("Could not start instance")
2558
2559
2560 class LURebootInstance(LogicalUnit):
2561   """Reboot an instance.
2562
2563   """
2564   HPATH = "instance-reboot"
2565   HTYPE = constants.HTYPE_INSTANCE
2566   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2567   REQ_BGL = False
2568
2569   def ExpandNames(self):
2570     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2571                                    constants.INSTANCE_REBOOT_HARD,
2572                                    constants.INSTANCE_REBOOT_FULL]:
2573       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2574                                   (constants.INSTANCE_REBOOT_SOFT,
2575                                    constants.INSTANCE_REBOOT_HARD,
2576                                    constants.INSTANCE_REBOOT_FULL))
2577     self._ExpandAndLockInstance()
2578
2579   def BuildHooksEnv(self):
2580     """Build hooks env.
2581
2582     This runs on master, primary and secondary nodes of the instance.
2583
2584     """
2585     env = {
2586       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2587       }
2588     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2589     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2590           list(self.instance.secondary_nodes))
2591     return env, nl, nl
2592
2593   def CheckPrereq(self):
2594     """Check prerequisites.
2595
2596     This checks that the instance is in the cluster.
2597
2598     """
2599     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2600     assert self.instance is not None, \
2601       "Cannot retrieve locked instance %s" % self.op.instance_name
2602
2603     _CheckNodeOnline(self, instance.primary_node)
2604
2605     # check bridges existance
2606     _CheckInstanceBridgesExist(self, instance)
2607
2608   def Exec(self, feedback_fn):
2609     """Reboot the instance.
2610
2611     """
2612     instance = self.instance
2613     ignore_secondaries = self.op.ignore_secondaries
2614     reboot_type = self.op.reboot_type
2615     extra_args = getattr(self.op, "extra_args", "")
2616
2617     node_current = instance.primary_node
2618
2619     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2620                        constants.INSTANCE_REBOOT_HARD]:
2621       result = self.rpc.call_instance_reboot(node_current, instance,
2622                                              reboot_type, extra_args)
2623       if result.failed or not result.data:
2624         raise errors.OpExecError("Could not reboot instance")
2625     else:
2626       if not self.rpc.call_instance_shutdown(node_current, instance):
2627         raise errors.OpExecError("could not shutdown instance for full reboot")
2628       _ShutdownInstanceDisks(self, instance)
2629       _StartInstanceDisks(self, instance, ignore_secondaries)
2630       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2631       if result.failed or not result.data:
2632         _ShutdownInstanceDisks(self, instance)
2633         raise errors.OpExecError("Could not start instance for full reboot")
2634
2635     self.cfg.MarkInstanceUp(instance.name)
2636
2637
2638 class LUShutdownInstance(LogicalUnit):
2639   """Shutdown an instance.
2640
2641   """
2642   HPATH = "instance-stop"
2643   HTYPE = constants.HTYPE_INSTANCE
2644   _OP_REQP = ["instance_name"]
2645   REQ_BGL = False
2646
2647   def ExpandNames(self):
2648     self._ExpandAndLockInstance()
2649
2650   def BuildHooksEnv(self):
2651     """Build hooks env.
2652
2653     This runs on master, primary and secondary nodes of the instance.
2654
2655     """
2656     env = _BuildInstanceHookEnvByObject(self, self.instance)
2657     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2658           list(self.instance.secondary_nodes))
2659     return env, nl, nl
2660
2661   def CheckPrereq(self):
2662     """Check prerequisites.
2663
2664     This checks that the instance is in the cluster.
2665
2666     """
2667     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2668     assert self.instance is not None, \
2669       "Cannot retrieve locked instance %s" % self.op.instance_name
2670     _CheckNodeOnline(self, instance.primary_node)
2671
2672   def Exec(self, feedback_fn):
2673     """Shutdown the instance.
2674
2675     """
2676     instance = self.instance
2677     node_current = instance.primary_node
2678     self.cfg.MarkInstanceDown(instance.name)
2679     result = self.rpc.call_instance_shutdown(node_current, instance)
2680     if result.failed or not result.data:
2681       self.proc.LogWarning("Could not shutdown instance")
2682
2683     _ShutdownInstanceDisks(self, instance)
2684
2685
2686 class LUReinstallInstance(LogicalUnit):
2687   """Reinstall an instance.
2688
2689   """
2690   HPATH = "instance-reinstall"
2691   HTYPE = constants.HTYPE_INSTANCE
2692   _OP_REQP = ["instance_name"]
2693   REQ_BGL = False
2694
2695   def ExpandNames(self):
2696     self._ExpandAndLockInstance()
2697
2698   def BuildHooksEnv(self):
2699     """Build hooks env.
2700
2701     This runs on master, primary and secondary nodes of the instance.
2702
2703     """
2704     env = _BuildInstanceHookEnvByObject(self, self.instance)
2705     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2706           list(self.instance.secondary_nodes))
2707     return env, nl, nl
2708
2709   def CheckPrereq(self):
2710     """Check prerequisites.
2711
2712     This checks that the instance is in the cluster and is not running.
2713
2714     """
2715     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716     assert instance is not None, \
2717       "Cannot retrieve locked instance %s" % self.op.instance_name
2718     _CheckNodeOnline(self, instance.primary_node)
2719
2720     if instance.disk_template == constants.DT_DISKLESS:
2721       raise errors.OpPrereqError("Instance '%s' has no disks" %
2722                                  self.op.instance_name)
2723     if instance.status != "down":
2724       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2725                                  self.op.instance_name)
2726     remote_info = self.rpc.call_instance_info(instance.primary_node,
2727                                               instance.name,
2728                                               instance.hypervisor)
2729     if remote_info.failed or remote_info.data:
2730       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2731                                  (self.op.instance_name,
2732                                   instance.primary_node))
2733
2734     self.op.os_type = getattr(self.op, "os_type", None)
2735     if self.op.os_type is not None:
2736       # OS verification
2737       pnode = self.cfg.GetNodeInfo(
2738         self.cfg.ExpandNodeName(instance.primary_node))
2739       if pnode is None:
2740         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2741                                    self.op.pnode)
2742       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2743       result.Raise()
2744       if not isinstance(result.data, objects.OS):
2745         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2746                                    " primary node"  % self.op.os_type)
2747
2748     self.instance = instance
2749
2750   def Exec(self, feedback_fn):
2751     """Reinstall the instance.
2752
2753     """
2754     inst = self.instance
2755
2756     if self.op.os_type is not None:
2757       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2758       inst.os = self.op.os_type
2759       self.cfg.Update(inst)
2760
2761     _StartInstanceDisks(self, inst, None)
2762     try:
2763       feedback_fn("Running the instance OS create scripts...")
2764       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2765       result.Raise()
2766       if not result.data:
2767         raise errors.OpExecError("Could not install OS for instance %s"
2768                                  " on node %s" %
2769                                  (inst.name, inst.primary_node))
2770     finally:
2771       _ShutdownInstanceDisks(self, inst)
2772
2773
2774 class LURenameInstance(LogicalUnit):
2775   """Rename an instance.
2776
2777   """
2778   HPATH = "instance-rename"
2779   HTYPE = constants.HTYPE_INSTANCE
2780   _OP_REQP = ["instance_name", "new_name"]
2781
2782   def BuildHooksEnv(self):
2783     """Build hooks env.
2784
2785     This runs on master, primary and secondary nodes of the instance.
2786
2787     """
2788     env = _BuildInstanceHookEnvByObject(self, self.instance)
2789     env["INSTANCE_NEW_NAME"] = self.op.new_name
2790     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2791           list(self.instance.secondary_nodes))
2792     return env, nl, nl
2793
2794   def CheckPrereq(self):
2795     """Check prerequisites.
2796
2797     This checks that the instance is in the cluster and is not running.
2798
2799     """
2800     instance = self.cfg.GetInstanceInfo(
2801       self.cfg.ExpandInstanceName(self.op.instance_name))
2802     if instance is None:
2803       raise errors.OpPrereqError("Instance '%s' not known" %
2804                                  self.op.instance_name)
2805     _CheckNodeOnline(self, instance.primary_node)
2806
2807     if instance.status != "down":
2808       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2809                                  self.op.instance_name)
2810     remote_info = self.rpc.call_instance_info(instance.primary_node,
2811                                               instance.name,
2812                                               instance.hypervisor)
2813     remote_info.Raise()
2814     if remote_info.data:
2815       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2816                                  (self.op.instance_name,
2817                                   instance.primary_node))
2818     self.instance = instance
2819
2820     # new name verification
2821     name_info = utils.HostInfo(self.op.new_name)
2822
2823     self.op.new_name = new_name = name_info.name
2824     instance_list = self.cfg.GetInstanceList()
2825     if new_name in instance_list:
2826       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2827                                  new_name)
2828
2829     if not getattr(self.op, "ignore_ip", False):
2830       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2831         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2832                                    (name_info.ip, new_name))
2833
2834
2835   def Exec(self, feedback_fn):
2836     """Reinstall the instance.
2837
2838     """
2839     inst = self.instance
2840     old_name = inst.name
2841
2842     if inst.disk_template == constants.DT_FILE:
2843       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2844
2845     self.cfg.RenameInstance(inst.name, self.op.new_name)
2846     # Change the instance lock. This is definitely safe while we hold the BGL
2847     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2848     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2849
2850     # re-read the instance from the configuration after rename
2851     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2852
2853     if inst.disk_template == constants.DT_FILE:
2854       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2855       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2856                                                      old_file_storage_dir,
2857                                                      new_file_storage_dir)
2858       result.Raise()
2859       if not result.data:
2860         raise errors.OpExecError("Could not connect to node '%s' to rename"
2861                                  " directory '%s' to '%s' (but the instance"
2862                                  " has been renamed in Ganeti)" % (
2863                                  inst.primary_node, old_file_storage_dir,
2864                                  new_file_storage_dir))
2865
2866       if not result.data[0]:
2867         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2868                                  " (but the instance has been renamed in"
2869                                  " Ganeti)" % (old_file_storage_dir,
2870                                                new_file_storage_dir))
2871
2872     _StartInstanceDisks(self, inst, None)
2873     try:
2874       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2875                                                  old_name)
2876       if result.failed or not result.data:
2877         msg = ("Could not run OS rename script for instance %s on node %s"
2878                " (but the instance has been renamed in Ganeti)" %
2879                (inst.name, inst.primary_node))
2880         self.proc.LogWarning(msg)
2881     finally:
2882       _ShutdownInstanceDisks(self, inst)
2883
2884
2885 class LURemoveInstance(LogicalUnit):
2886   """Remove an instance.
2887
2888   """
2889   HPATH = "instance-remove"
2890   HTYPE = constants.HTYPE_INSTANCE
2891   _OP_REQP = ["instance_name", "ignore_failures"]
2892   REQ_BGL = False
2893
2894   def ExpandNames(self):
2895     self._ExpandAndLockInstance()
2896     self.needed_locks[locking.LEVEL_NODE] = []
2897     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2898
2899   def DeclareLocks(self, level):
2900     if level == locking.LEVEL_NODE:
2901       self._LockInstancesNodes()
2902
2903   def BuildHooksEnv(self):
2904     """Build hooks env.
2905
2906     This runs on master, primary and secondary nodes of the instance.
2907
2908     """
2909     env = _BuildInstanceHookEnvByObject(self, self.instance)
2910     nl = [self.cfg.GetMasterNode()]
2911     return env, nl, nl
2912
2913   def CheckPrereq(self):
2914     """Check prerequisites.
2915
2916     This checks that the instance is in the cluster.
2917
2918     """
2919     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2920     assert self.instance is not None, \
2921       "Cannot retrieve locked instance %s" % self.op.instance_name
2922
2923   def Exec(self, feedback_fn):
2924     """Remove the instance.
2925
2926     """
2927     instance = self.instance
2928     logging.info("Shutting down instance %s on node %s",
2929                  instance.name, instance.primary_node)
2930
2931     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2932     if result.failed or not result.data:
2933       if self.op.ignore_failures:
2934         feedback_fn("Warning: can't shutdown instance")
2935       else:
2936         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2937                                  (instance.name, instance.primary_node))
2938
2939     logging.info("Removing block devices for instance %s", instance.name)
2940
2941     if not _RemoveDisks(self, instance):
2942       if self.op.ignore_failures:
2943         feedback_fn("Warning: can't remove instance's disks")
2944       else:
2945         raise errors.OpExecError("Can't remove instance's disks")
2946
2947     logging.info("Removing instance %s out of cluster config", instance.name)
2948
2949     self.cfg.RemoveInstance(instance.name)
2950     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2951
2952
2953 class LUQueryInstances(NoHooksLU):
2954   """Logical unit for querying instances.
2955
2956   """
2957   _OP_REQP = ["output_fields", "names"]
2958   REQ_BGL = False
2959   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2960                                     "admin_state", "admin_ram",
2961                                     "disk_template", "ip", "mac", "bridge",
2962                                     "sda_size", "sdb_size", "vcpus", "tags",
2963                                     "network_port", "beparams",
2964                                     "(disk).(size)/([0-9]+)",
2965                                     "(disk).(sizes)",
2966                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2967                                     "(nic).(macs|ips|bridges)",
2968                                     "(disk|nic).(count)",
2969                                     "serial_no", "hypervisor", "hvparams",] +
2970                                   ["hv/%s" % name
2971                                    for name in constants.HVS_PARAMETERS] +
2972                                   ["be/%s" % name
2973                                    for name in constants.BES_PARAMETERS])
2974   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2975
2976
2977   def ExpandNames(self):
2978     _CheckOutputFields(static=self._FIELDS_STATIC,
2979                        dynamic=self._FIELDS_DYNAMIC,
2980                        selected=self.op.output_fields)
2981
2982     self.needed_locks = {}
2983     self.share_locks[locking.LEVEL_INSTANCE] = 1
2984     self.share_locks[locking.LEVEL_NODE] = 1
2985
2986     if self.op.names:
2987       self.wanted = _GetWantedInstances(self, self.op.names)
2988     else:
2989       self.wanted = locking.ALL_SET
2990
2991     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2992     if self.do_locking:
2993       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2994       self.needed_locks[locking.LEVEL_NODE] = []
2995       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2996
2997   def DeclareLocks(self, level):
2998     if level == locking.LEVEL_NODE and self.do_locking:
2999       self._LockInstancesNodes()
3000
3001   def CheckPrereq(self):
3002     """Check prerequisites.
3003
3004     """
3005     pass
3006
3007   def Exec(self, feedback_fn):
3008     """Computes the list of nodes and their attributes.
3009
3010     """
3011     all_info = self.cfg.GetAllInstancesInfo()
3012     if self.do_locking:
3013       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3014     elif self.wanted != locking.ALL_SET:
3015       instance_names = self.wanted
3016       missing = set(instance_names).difference(all_info.keys())
3017       if missing:
3018         raise errors.OpExecError(
3019           "Some instances were removed before retrieving their data: %s"
3020           % missing)
3021     else:
3022       instance_names = all_info.keys()
3023
3024     instance_names = utils.NiceSort(instance_names)
3025     instance_list = [all_info[iname] for iname in instance_names]
3026
3027     # begin data gathering
3028
3029     nodes = frozenset([inst.primary_node for inst in instance_list])
3030     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3031
3032     bad_nodes = []
3033     off_nodes = []
3034     if self.do_locking:
3035       live_data = {}
3036       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3037       for name in nodes:
3038         result = node_data[name]
3039         if result.offline:
3040           # offline nodes will be in both lists
3041           off_nodes.append(name)
3042         if result.failed:
3043           bad_nodes.append(name)
3044         else:
3045           if result.data:
3046             live_data.update(result.data)
3047             # else no instance is alive
3048     else:
3049       live_data = dict([(name, {}) for name in instance_names])
3050
3051     # end data gathering
3052
3053     HVPREFIX = "hv/"
3054     BEPREFIX = "be/"
3055     output = []
3056     for instance in instance_list:
3057       iout = []
3058       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3059       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3060       for field in self.op.output_fields:
3061         st_match = self._FIELDS_STATIC.Matches(field)
3062         if field == "name":
3063           val = instance.name
3064         elif field == "os":
3065           val = instance.os
3066         elif field == "pnode":
3067           val = instance.primary_node
3068         elif field == "snodes":
3069           val = list(instance.secondary_nodes)
3070         elif field == "admin_state":
3071           val = (instance.status != "down")
3072         elif field == "oper_state":
3073           if instance.primary_node in bad_nodes:
3074             val = None
3075           else:
3076             val = bool(live_data.get(instance.name))
3077         elif field == "status":
3078           if instance.primary_node in off_nodes:
3079             val = "ERROR_nodeoffline"
3080           elif instance.primary_node in bad_nodes:
3081             val = "ERROR_nodedown"
3082           else:
3083             running = bool(live_data.get(instance.name))
3084             if running:
3085               if instance.status != "down":
3086                 val = "running"
3087               else:
3088                 val = "ERROR_up"
3089             else:
3090               if instance.status != "down":
3091                 val = "ERROR_down"
3092               else:
3093                 val = "ADMIN_down"
3094         elif field == "oper_ram":
3095           if instance.primary_node in bad_nodes:
3096             val = None
3097           elif instance.name in live_data:
3098             val = live_data[instance.name].get("memory", "?")
3099           else:
3100             val = "-"
3101         elif field == "disk_template":
3102           val = instance.disk_template
3103         elif field == "ip":
3104           val = instance.nics[0].ip
3105         elif field == "bridge":
3106           val = instance.nics[0].bridge
3107         elif field == "mac":
3108           val = instance.nics[0].mac
3109         elif field == "sda_size" or field == "sdb_size":
3110           idx = ord(field[2]) - ord('a')
3111           try:
3112             val = instance.FindDisk(idx).size
3113           except errors.OpPrereqError:
3114             val = None
3115         elif field == "tags":
3116           val = list(instance.GetTags())
3117         elif field == "serial_no":
3118           val = instance.serial_no
3119         elif field == "network_port":
3120           val = instance.network_port
3121         elif field == "hypervisor":
3122           val = instance.hypervisor
3123         elif field == "hvparams":
3124           val = i_hv
3125         elif (field.startswith(HVPREFIX) and
3126               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3127           val = i_hv.get(field[len(HVPREFIX):], None)
3128         elif field == "beparams":
3129           val = i_be
3130         elif (field.startswith(BEPREFIX) and
3131               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3132           val = i_be.get(field[len(BEPREFIX):], None)
3133         elif st_match and st_match.groups():
3134           # matches a variable list
3135           st_groups = st_match.groups()
3136           if st_groups and st_groups[0] == "disk":
3137             if st_groups[1] == "count":
3138               val = len(instance.disks)
3139             elif st_groups[1] == "sizes":
3140               val = [disk.size for disk in instance.disks]
3141             elif st_groups[1] == "size":
3142               try:
3143                 val = instance.FindDisk(st_groups[2]).size
3144               except errors.OpPrereqError:
3145                 val = None
3146             else:
3147               assert False, "Unhandled disk parameter"
3148           elif st_groups[0] == "nic":
3149             if st_groups[1] == "count":
3150               val = len(instance.nics)
3151             elif st_groups[1] == "macs":
3152               val = [nic.mac for nic in instance.nics]
3153             elif st_groups[1] == "ips":
3154               val = [nic.ip for nic in instance.nics]
3155             elif st_groups[1] == "bridges":
3156               val = [nic.bridge for nic in instance.nics]
3157             else:
3158               # index-based item
3159               nic_idx = int(st_groups[2])
3160               if nic_idx >= len(instance.nics):
3161                 val = None
3162               else:
3163                 if st_groups[1] == "mac":
3164                   val = instance.nics[nic_idx].mac
3165                 elif st_groups[1] == "ip":
3166                   val = instance.nics[nic_idx].ip
3167                 elif st_groups[1] == "bridge":
3168                   val = instance.nics[nic_idx].bridge
3169                 else:
3170                   assert False, "Unhandled NIC parameter"
3171           else:
3172             assert False, "Unhandled variable parameter"
3173         else:
3174           raise errors.ParameterError(field)
3175         iout.append(val)
3176       output.append(iout)
3177
3178     return output
3179
3180
3181 class LUFailoverInstance(LogicalUnit):
3182   """Failover an instance.
3183
3184   """
3185   HPATH = "instance-failover"
3186   HTYPE = constants.HTYPE_INSTANCE
3187   _OP_REQP = ["instance_name", "ignore_consistency"]
3188   REQ_BGL = False
3189
3190   def ExpandNames(self):
3191     self._ExpandAndLockInstance()
3192     self.needed_locks[locking.LEVEL_NODE] = []
3193     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3194
3195   def DeclareLocks(self, level):
3196     if level == locking.LEVEL_NODE:
3197       self._LockInstancesNodes()
3198
3199   def BuildHooksEnv(self):
3200     """Build hooks env.
3201
3202     This runs on master, primary and secondary nodes of the instance.
3203
3204     """
3205     env = {
3206       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3207       }
3208     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3209     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3210     return env, nl, nl
3211
3212   def CheckPrereq(self):
3213     """Check prerequisites.
3214
3215     This checks that the instance is in the cluster.
3216
3217     """
3218     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3219     assert self.instance is not None, \
3220       "Cannot retrieve locked instance %s" % self.op.instance_name
3221
3222     bep = self.cfg.GetClusterInfo().FillBE(instance)
3223     if instance.disk_template not in constants.DTS_NET_MIRROR:
3224       raise errors.OpPrereqError("Instance's disk layout is not"
3225                                  " network mirrored, cannot failover.")
3226
3227     secondary_nodes = instance.secondary_nodes
3228     if not secondary_nodes:
3229       raise errors.ProgrammerError("no secondary node but using "
3230                                    "a mirrored disk template")
3231
3232     target_node = secondary_nodes[0]
3233     _CheckNodeOnline(self, target_node)
3234     # check memory requirements on the secondary node
3235     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3236                          instance.name, bep[constants.BE_MEMORY],
3237                          instance.hypervisor)
3238
3239     # check bridge existance
3240     brlist = [nic.bridge for nic in instance.nics]
3241     result = self.rpc.call_bridges_exist(target_node, brlist)
3242     result.Raise()
3243     if not result.data:
3244       raise errors.OpPrereqError("One or more target bridges %s does not"
3245                                  " exist on destination node '%s'" %
3246                                  (brlist, target_node))
3247
3248   def Exec(self, feedback_fn):
3249     """Failover an instance.
3250
3251     The failover is done by shutting it down on its present node and
3252     starting it on the secondary.
3253
3254     """
3255     instance = self.instance
3256
3257     source_node = instance.primary_node
3258     target_node = instance.secondary_nodes[0]
3259
3260     feedback_fn("* checking disk consistency between source and target")
3261     for dev in instance.disks:
3262       # for drbd, these are drbd over lvm
3263       if not _CheckDiskConsistency(self, dev, target_node, False):
3264         if instance.status == "up" and not self.op.ignore_consistency:
3265           raise errors.OpExecError("Disk %s is degraded on target node,"
3266                                    " aborting failover." % dev.iv_name)
3267
3268     feedback_fn("* shutting down instance on source node")
3269     logging.info("Shutting down instance %s on node %s",
3270                  instance.name, source_node)
3271
3272     result = self.rpc.call_instance_shutdown(source_node, instance)
3273     if result.failed or not result.data:
3274       if self.op.ignore_consistency:
3275         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3276                              " Proceeding"
3277                              " anyway. Please make sure node %s is down",
3278                              instance.name, source_node, source_node)
3279       else:
3280         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3281                                  (instance.name, source_node))
3282
3283     feedback_fn("* deactivating the instance's disks on source node")
3284     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3285       raise errors.OpExecError("Can't shut down the instance's disks.")
3286
3287     instance.primary_node = target_node
3288     # distribute new instance config to the other nodes
3289     self.cfg.Update(instance)
3290
3291     # Only start the instance if it's marked as up
3292     if instance.status == "up":
3293       feedback_fn("* activating the instance's disks on target node")
3294       logging.info("Starting instance %s on node %s",
3295                    instance.name, target_node)
3296
3297       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3298                                                ignore_secondaries=True)
3299       if not disks_ok:
3300         _ShutdownInstanceDisks(self, instance)
3301         raise errors.OpExecError("Can't activate the instance's disks")
3302
3303       feedback_fn("* starting the instance on the target node")
3304       result = self.rpc.call_instance_start(target_node, instance, None)
3305       if result.failed or not result.data:
3306         _ShutdownInstanceDisks(self, instance)
3307         raise errors.OpExecError("Could not start instance %s on node %s." %
3308                                  (instance.name, target_node))
3309
3310
3311 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3312   """Create a tree of block devices on the primary node.
3313
3314   This always creates all devices.
3315
3316   """
3317   if device.children:
3318     for child in device.children:
3319       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3320         return False
3321
3322   lu.cfg.SetDiskID(device, node)
3323   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3324                                        instance.name, True, info)
3325   if new_id.failed or not new_id.data:
3326     return False
3327   if device.physical_id is None:
3328     device.physical_id = new_id
3329   return True
3330
3331
3332 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3333   """Create a tree of block devices on a secondary node.
3334
3335   If this device type has to be created on secondaries, create it and
3336   all its children.
3337
3338   If not, just recurse to children keeping the same 'force' value.
3339
3340   """
3341   if device.CreateOnSecondary():
3342     force = True
3343   if device.children:
3344     for child in device.children:
3345       if not _CreateBlockDevOnSecondary(lu, node, instance,
3346                                         child, force, info):
3347         return False
3348
3349   if not force:
3350     return True
3351   lu.cfg.SetDiskID(device, node)
3352   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3353                                        instance.name, False, info)
3354   if new_id.failed or not new_id.data:
3355     return False
3356   if device.physical_id is None:
3357     device.physical_id = new_id
3358   return True
3359
3360
3361 def _GenerateUniqueNames(lu, exts):
3362   """Generate a suitable LV name.
3363
3364   This will generate a logical volume name for the given instance.
3365
3366   """
3367   results = []
3368   for val in exts:
3369     new_id = lu.cfg.GenerateUniqueID()
3370     results.append("%s%s" % (new_id, val))
3371   return results
3372
3373
3374 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3375                          p_minor, s_minor):
3376   """Generate a drbd8 device complete with its children.
3377
3378   """
3379   port = lu.cfg.AllocatePort()
3380   vgname = lu.cfg.GetVGName()
3381   shared_secret = lu.cfg.GenerateDRBDSecret()
3382   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3383                           logical_id=(vgname, names[0]))
3384   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3385                           logical_id=(vgname, names[1]))
3386   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3387                           logical_id=(primary, secondary, port,
3388                                       p_minor, s_minor,
3389                                       shared_secret),
3390                           children=[dev_data, dev_meta],
3391                           iv_name=iv_name)
3392   return drbd_dev
3393
3394
3395 def _GenerateDiskTemplate(lu, template_name,
3396                           instance_name, primary_node,
3397                           secondary_nodes, disk_info,
3398                           file_storage_dir, file_driver,
3399                           base_index):
3400   """Generate the entire disk layout for a given template type.
3401
3402   """
3403   #TODO: compute space requirements
3404
3405   vgname = lu.cfg.GetVGName()
3406   disk_count = len(disk_info)
3407   disks = []
3408   if template_name == constants.DT_DISKLESS:
3409     pass
3410   elif template_name == constants.DT_PLAIN:
3411     if len(secondary_nodes) != 0:
3412       raise errors.ProgrammerError("Wrong template configuration")
3413
3414     names = _GenerateUniqueNames(lu, [".disk%d" % i
3415                                       for i in range(disk_count)])
3416     for idx, disk in enumerate(disk_info):
3417       disk_index = idx + base_index
3418       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3419                               logical_id=(vgname, names[idx]),
3420                               iv_name="disk/%d" % disk_index)
3421       disks.append(disk_dev)
3422   elif template_name == constants.DT_DRBD8:
3423     if len(secondary_nodes) != 1:
3424       raise errors.ProgrammerError("Wrong template configuration")
3425     remote_node = secondary_nodes[0]
3426     minors = lu.cfg.AllocateDRBDMinor(
3427       [primary_node, remote_node] * len(disk_info), instance_name)
3428
3429     names = _GenerateUniqueNames(lu,
3430                                  [".disk%d_%s" % (i, s)
3431                                   for i in range(disk_count)
3432                                   for s in ("data", "meta")
3433                                   ])
3434     for idx, disk in enumerate(disk_info):
3435       disk_index = idx + base_index
3436       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3437                                       disk["size"], names[idx*2:idx*2+2],
3438                                       "disk/%d" % disk_index,
3439                                       minors[idx*2], minors[idx*2+1])
3440       disks.append(disk_dev)
3441   elif template_name == constants.DT_FILE:
3442     if len(secondary_nodes) != 0:
3443       raise errors.ProgrammerError("Wrong template configuration")
3444
3445     for idx, disk in enumerate(disk_info):
3446       disk_index = idx + base_index
3447       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3448                               iv_name="disk/%d" % disk_index,
3449                               logical_id=(file_driver,
3450                                           "%s/disk%d" % (file_storage_dir,
3451                                                          idx)))
3452       disks.append(disk_dev)
3453   else:
3454     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3455   return disks
3456
3457
3458 def _GetInstanceInfoText(instance):
3459   """Compute that text that should be added to the disk's metadata.
3460
3461   """
3462   return "originstname+%s" % instance.name
3463
3464
3465 def _CreateDisks(lu, instance):
3466   """Create all disks for an instance.
3467
3468   This abstracts away some work from AddInstance.
3469
3470   @type lu: L{LogicalUnit}
3471   @param lu: the logical unit on whose behalf we execute
3472   @type instance: L{objects.Instance}
3473   @param instance: the instance whose disks we should create
3474   @rtype: boolean
3475   @return: the success of the creation
3476
3477   """
3478   info = _GetInstanceInfoText(instance)
3479
3480   if instance.disk_template == constants.DT_FILE:
3481     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3482     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3483                                                  file_storage_dir)
3484
3485     if result.failed or not result.data:
3486       logging.error("Could not connect to node '%s'", instance.primary_node)
3487       return False
3488
3489     if not result.data[0]:
3490       logging.error("Failed to create directory '%s'", file_storage_dir)
3491       return False
3492
3493   # Note: this needs to be kept in sync with adding of disks in
3494   # LUSetInstanceParams
3495   for device in instance.disks:
3496     logging.info("Creating volume %s for instance %s",
3497                  device.iv_name, instance.name)
3498     #HARDCODE
3499     for secondary_node in instance.secondary_nodes:
3500       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3501                                         device, False, info):
3502         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3503                       device.iv_name, device, secondary_node)
3504         return False
3505     #HARDCODE
3506     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3507                                     instance, device, info):
3508       logging.error("Failed to create volume %s on primary!", device.iv_name)
3509       return False
3510
3511   return True
3512
3513
3514 def _RemoveDisks(lu, instance):
3515   """Remove all disks for an instance.
3516
3517   This abstracts away some work from `AddInstance()` and
3518   `RemoveInstance()`. Note that in case some of the devices couldn't
3519   be removed, the removal will continue with the other ones (compare
3520   with `_CreateDisks()`).
3521
3522   @type lu: L{LogicalUnit}
3523   @param lu: the logical unit on whose behalf we execute
3524   @type instance: L{objects.Instance}
3525   @param instance: the instance whose disks we should remove
3526   @rtype: boolean
3527   @return: the success of the removal
3528
3529   """
3530   logging.info("Removing block devices for instance %s", instance.name)
3531
3532   result = True
3533   for device in instance.disks:
3534     for node, disk in device.ComputeNodeTree(instance.primary_node):
3535       lu.cfg.SetDiskID(disk, node)
3536       result = lu.rpc.call_blockdev_remove(node, disk)
3537       if result.failed or not result.data:
3538         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3539                            " continuing anyway", device.iv_name, node)
3540         result = False
3541
3542   if instance.disk_template == constants.DT_FILE:
3543     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3544     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3545                                                  file_storage_dir)
3546     if result.failed or not result.data:
3547       logging.error("Could not remove directory '%s'", file_storage_dir)
3548       result = False
3549
3550   return result
3551
3552
3553 def _ComputeDiskSize(disk_template, disks):
3554   """Compute disk size requirements in the volume group
3555
3556   """
3557   # Required free disk space as a function of disk and swap space
3558   req_size_dict = {
3559     constants.DT_DISKLESS: None,
3560     constants.DT_PLAIN: sum(d["size"] for d in disks),
3561     # 128 MB are added for drbd metadata for each disk
3562     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3563     constants.DT_FILE: None,
3564   }
3565
3566   if disk_template not in req_size_dict:
3567     raise errors.ProgrammerError("Disk template '%s' size requirement"
3568                                  " is unknown" %  disk_template)
3569
3570   return req_size_dict[disk_template]
3571
3572
3573 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3574   """Hypervisor parameter validation.
3575
3576   This function abstract the hypervisor parameter validation to be
3577   used in both instance create and instance modify.
3578
3579   @type lu: L{LogicalUnit}
3580   @param lu: the logical unit for which we check
3581   @type nodenames: list
3582   @param nodenames: the list of nodes on which we should check
3583   @type hvname: string
3584   @param hvname: the name of the hypervisor we should use
3585   @type hvparams: dict
3586   @param hvparams: the parameters which we need to check
3587   @raise errors.OpPrereqError: if the parameters are not valid
3588
3589   """
3590   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3591                                                   hvname,
3592                                                   hvparams)
3593   for node in nodenames:
3594     info = hvinfo[node]
3595     info.Raise()
3596     if not info.data or not isinstance(info.data, (tuple, list)):
3597       raise errors.OpPrereqError("Cannot get current information"
3598                                  " from node '%s' (%s)" % (node, info.data))
3599     if not info.data[0]:
3600       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3601                                  " %s" % info.data[1])
3602
3603
3604 class LUCreateInstance(LogicalUnit):
3605   """Create an instance.
3606
3607   """
3608   HPATH = "instance-add"
3609   HTYPE = constants.HTYPE_INSTANCE
3610   _OP_REQP = ["instance_name", "disks", "disk_template",
3611               "mode", "start",
3612               "wait_for_sync", "ip_check", "nics",
3613               "hvparams", "beparams"]
3614   REQ_BGL = False
3615
3616   def _ExpandNode(self, node):
3617     """Expands and checks one node name.
3618
3619     """
3620     node_full = self.cfg.ExpandNodeName(node)
3621     if node_full is None:
3622       raise errors.OpPrereqError("Unknown node %s" % node)
3623     return node_full
3624
3625   def ExpandNames(self):
3626     """ExpandNames for CreateInstance.
3627
3628     Figure out the right locks for instance creation.
3629
3630     """
3631     self.needed_locks = {}
3632
3633     # set optional parameters to none if they don't exist
3634     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3635       if not hasattr(self.op, attr):
3636         setattr(self.op, attr, None)
3637
3638     # cheap checks, mostly valid constants given
3639
3640     # verify creation mode
3641     if self.op.mode not in (constants.INSTANCE_CREATE,
3642                             constants.INSTANCE_IMPORT):
3643       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3644                                  self.op.mode)
3645
3646     # disk template and mirror node verification
3647     if self.op.disk_template not in constants.DISK_TEMPLATES:
3648       raise errors.OpPrereqError("Invalid disk template name")
3649
3650     if self.op.hypervisor is None:
3651       self.op.hypervisor = self.cfg.GetHypervisorType()
3652
3653     cluster = self.cfg.GetClusterInfo()
3654     enabled_hvs = cluster.enabled_hypervisors
3655     if self.op.hypervisor not in enabled_hvs:
3656       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3657                                  " cluster (%s)" % (self.op.hypervisor,
3658                                   ",".join(enabled_hvs)))
3659
3660     # check hypervisor parameter syntax (locally)
3661
3662     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3663                                   self.op.hvparams)
3664     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3665     hv_type.CheckParameterSyntax(filled_hvp)
3666
3667     # fill and remember the beparams dict
3668     utils.CheckBEParams(self.op.beparams)
3669     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3670                                     self.op.beparams)
3671
3672     #### instance parameters check
3673
3674     # instance name verification
3675     hostname1 = utils.HostInfo(self.op.instance_name)
3676     self.op.instance_name = instance_name = hostname1.name
3677
3678     # this is just a preventive check, but someone might still add this
3679     # instance in the meantime, and creation will fail at lock-add time
3680     if instance_name in self.cfg.GetInstanceList():
3681       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3682                                  instance_name)
3683
3684     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3685
3686     # NIC buildup
3687     self.nics = []
3688     for nic in self.op.nics:
3689       # ip validity checks
3690       ip = nic.get("ip", None)
3691       if ip is None or ip.lower() == "none":
3692         nic_ip = None
3693       elif ip.lower() == constants.VALUE_AUTO:
3694         nic_ip = hostname1.ip
3695       else:
3696         if not utils.IsValidIP(ip):
3697           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3698                                      " like a valid IP" % ip)
3699         nic_ip = ip
3700
3701       # MAC address verification
3702       mac = nic.get("mac", constants.VALUE_AUTO)
3703       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3704         if not utils.IsValidMac(mac.lower()):
3705           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3706                                      mac)
3707       # bridge verification
3708       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3709       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3710
3711     # disk checks/pre-build
3712     self.disks = []
3713     for disk in self.op.disks:
3714       mode = disk.get("mode", constants.DISK_RDWR)
3715       if mode not in constants.DISK_ACCESS_SET:
3716         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3717                                    mode)
3718       size = disk.get("size", None)
3719       if size is None:
3720         raise errors.OpPrereqError("Missing disk size")
3721       try:
3722         size = int(size)
3723       except ValueError:
3724         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3725       self.disks.append({"size": size, "mode": mode})
3726
3727     # used in CheckPrereq for ip ping check
3728     self.check_ip = hostname1.ip
3729
3730     # file storage checks
3731     if (self.op.file_driver and
3732         not self.op.file_driver in constants.FILE_DRIVER):
3733       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3734                                  self.op.file_driver)
3735
3736     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3737       raise errors.OpPrereqError("File storage directory path not absolute")
3738
3739     ### Node/iallocator related checks
3740     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3741       raise errors.OpPrereqError("One and only one of iallocator and primary"
3742                                  " node must be given")
3743
3744     if self.op.iallocator:
3745       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3746     else:
3747       self.op.pnode = self._ExpandNode(self.op.pnode)
3748       nodelist = [self.op.pnode]
3749       if self.op.snode is not None:
3750         self.op.snode = self._ExpandNode(self.op.snode)
3751         nodelist.append(self.op.snode)
3752       self.needed_locks[locking.LEVEL_NODE] = nodelist
3753
3754     # in case of import lock the source node too
3755     if self.op.mode == constants.INSTANCE_IMPORT:
3756       src_node = getattr(self.op, "src_node", None)
3757       src_path = getattr(self.op, "src_path", None)
3758
3759       if src_path is None:
3760         self.op.src_path = src_path = self.op.instance_name
3761
3762       if src_node is None:
3763         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3764         self.op.src_node = None
3765         if os.path.isabs(src_path):
3766           raise errors.OpPrereqError("Importing an instance from an absolute"
3767                                      " path requires a source node option.")
3768       else:
3769         self.op.src_node = src_node = self._ExpandNode(src_node)
3770         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3771           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3772         if not os.path.isabs(src_path):
3773           self.op.src_path = src_path = \
3774             os.path.join(constants.EXPORT_DIR, src_path)
3775
3776     else: # INSTANCE_CREATE
3777       if getattr(self.op, "os_type", None) is None:
3778         raise errors.OpPrereqError("No guest OS specified")
3779
3780   def _RunAllocator(self):
3781     """Run the allocator based on input opcode.
3782
3783     """
3784     nics = [n.ToDict() for n in self.nics]
3785     ial = IAllocator(self,
3786                      mode=constants.IALLOCATOR_MODE_ALLOC,
3787                      name=self.op.instance_name,
3788                      disk_template=self.op.disk_template,
3789                      tags=[],
3790                      os=self.op.os_type,
3791                      vcpus=self.be_full[constants.BE_VCPUS],
3792                      mem_size=self.be_full[constants.BE_MEMORY],
3793                      disks=self.disks,
3794                      nics=nics,
3795                      hypervisor=self.op.hypervisor,
3796                      )
3797
3798     ial.Run(self.op.iallocator)
3799
3800     if not ial.success:
3801       raise errors.OpPrereqError("Can't compute nodes using"
3802                                  " iallocator '%s': %s" % (self.op.iallocator,
3803                                                            ial.info))
3804     if len(ial.nodes) != ial.required_nodes:
3805       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3806                                  " of nodes (%s), required %s" %
3807                                  (self.op.iallocator, len(ial.nodes),
3808                                   ial.required_nodes))
3809     self.op.pnode = ial.nodes[0]
3810     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3811                  self.op.instance_name, self.op.iallocator,
3812                  ", ".join(ial.nodes))
3813     if ial.required_nodes == 2:
3814       self.op.snode = ial.nodes[1]
3815
3816   def BuildHooksEnv(self):
3817     """Build hooks env.
3818
3819     This runs on master, primary and secondary nodes of the instance.
3820
3821     """
3822     env = {
3823       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3824       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3825       "INSTANCE_ADD_MODE": self.op.mode,
3826       }
3827     if self.op.mode == constants.INSTANCE_IMPORT:
3828       env["INSTANCE_SRC_NODE"] = self.op.src_node
3829       env["INSTANCE_SRC_PATH"] = self.op.src_path
3830       env["INSTANCE_SRC_IMAGES"] = self.src_images
3831
3832     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3833       primary_node=self.op.pnode,
3834       secondary_nodes=self.secondaries,
3835       status=self.instance_status,
3836       os_type=self.op.os_type,
3837       memory=self.be_full[constants.BE_MEMORY],
3838       vcpus=self.be_full[constants.BE_VCPUS],
3839       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3840     ))
3841
3842     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3843           self.secondaries)
3844     return env, nl, nl
3845
3846
3847   def CheckPrereq(self):
3848     """Check prerequisites.
3849
3850     """
3851     if (not self.cfg.GetVGName() and
3852         self.op.disk_template not in constants.DTS_NOT_LVM):
3853       raise errors.OpPrereqError("Cluster does not support lvm-based"
3854                                  " instances")
3855
3856
3857     if self.op.mode == constants.INSTANCE_IMPORT:
3858       src_node = self.op.src_node
3859       src_path = self.op.src_path
3860
3861       if src_node is None:
3862         exp_list = self.rpc.call_export_list(
3863           self.acquired_locks[locking.LEVEL_NODE])
3864         found = False
3865         for node in exp_list:
3866           if not exp_list[node].failed and src_path in exp_list[node].data:
3867             found = True
3868             self.op.src_node = src_node = node
3869             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3870                                                        src_path)
3871             break
3872         if not found:
3873           raise errors.OpPrereqError("No export found for relative path %s" %
3874                                       src_path)
3875
3876       _CheckNodeOnline(self, src_node)
3877       result = self.rpc.call_export_info(src_node, src_path)
3878       result.Raise()
3879       if not result.data:
3880         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3881
3882       export_info = result.data
3883       if not export_info.has_section(constants.INISECT_EXP):
3884         raise errors.ProgrammerError("Corrupted export config")
3885
3886       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3887       if (int(ei_version) != constants.EXPORT_VERSION):
3888         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3889                                    (ei_version, constants.EXPORT_VERSION))
3890
3891       # Check that the new instance doesn't have less disks than the export
3892       instance_disks = len(self.disks)
3893       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3894       if instance_disks < export_disks:
3895         raise errors.OpPrereqError("Not enough disks to import."
3896                                    " (instance: %d, export: %d)" %
3897                                    (instance_disks, export_disks))
3898
3899       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3900       disk_images = []
3901       for idx in range(export_disks):
3902         option = 'disk%d_dump' % idx
3903         if export_info.has_option(constants.INISECT_INS, option):
3904           # FIXME: are the old os-es, disk sizes, etc. useful?
3905           export_name = export_info.get(constants.INISECT_INS, option)
3906           image = os.path.join(src_path, export_name)
3907           disk_images.append(image)
3908         else:
3909           disk_images.append(False)
3910
3911       self.src_images = disk_images
3912
3913       old_name = export_info.get(constants.INISECT_INS, 'name')
3914       # FIXME: int() here could throw a ValueError on broken exports
3915       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3916       if self.op.instance_name == old_name:
3917         for idx, nic in enumerate(self.nics):
3918           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3919             nic_mac_ini = 'nic%d_mac' % idx
3920             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3921
3922     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3923     if self.op.start and not self.op.ip_check:
3924       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3925                                  " adding an instance in start mode")
3926
3927     if self.op.ip_check:
3928       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3929         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3930                                    (self.check_ip, self.op.instance_name))
3931
3932     #### allocator run
3933
3934     if self.op.iallocator is not None:
3935       self._RunAllocator()
3936
3937     #### node related checks
3938
3939     # check primary node
3940     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3941     assert self.pnode is not None, \
3942       "Cannot retrieve locked node %s" % self.op.pnode
3943     if pnode.offline:
3944       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3945                                  pnode.name)
3946
3947     self.secondaries = []
3948
3949     # mirror node verification
3950     if self.op.disk_template in constants.DTS_NET_MIRROR:
3951       if self.op.snode is None:
3952         raise errors.OpPrereqError("The networked disk templates need"
3953                                    " a mirror node")
3954       if self.op.snode == pnode.name:
3955         raise errors.OpPrereqError("The secondary node cannot be"
3956                                    " the primary node.")
3957       self.secondaries.append(self.op.snode)
3958       _CheckNodeOnline(self, self.op.snode)
3959
3960     nodenames = [pnode.name] + self.secondaries
3961
3962     req_size = _ComputeDiskSize(self.op.disk_template,
3963                                 self.disks)
3964
3965     # Check lv size requirements
3966     if req_size is not None:
3967       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3968                                          self.op.hypervisor)
3969       for node in nodenames:
3970         info = nodeinfo[node]
3971         info.Raise()
3972         info = info.data
3973         if not info:
3974           raise errors.OpPrereqError("Cannot get current information"
3975                                      " from node '%s'" % node)
3976         vg_free = info.get('vg_free', None)
3977         if not isinstance(vg_free, int):
3978           raise errors.OpPrereqError("Can't compute free disk space on"
3979                                      " node %s" % node)
3980         if req_size > info['vg_free']:
3981           raise errors.OpPrereqError("Not enough disk space on target node %s."
3982                                      " %d MB available, %d MB required" %
3983                                      (node, info['vg_free'], req_size))
3984
3985     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3986
3987     # os verification
3988     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3989     result.Raise()
3990     if not isinstance(result.data, objects.OS):
3991       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3992                                  " primary node"  % self.op.os_type)
3993
3994     # bridge check on primary node
3995     bridges = [n.bridge for n in self.nics]
3996     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3997     result.Raise()
3998     if not result.data:
3999       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4000                                  " exist on destination node '%s'" %
4001                                  (",".join(bridges), pnode.name))
4002
4003     # memory check on primary node
4004     if self.op.start:
4005       _CheckNodeFreeMemory(self, self.pnode.name,
4006                            "creating instance %s" % self.op.instance_name,
4007                            self.be_full[constants.BE_MEMORY],
4008                            self.op.hypervisor)
4009
4010     if self.op.start:
4011       self.instance_status = 'up'
4012     else:
4013       self.instance_status = 'down'
4014
4015   def Exec(self, feedback_fn):
4016     """Create and add the instance to the cluster.
4017
4018     """
4019     instance = self.op.instance_name
4020     pnode_name = self.pnode.name
4021
4022     for nic in self.nics:
4023       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4024         nic.mac = self.cfg.GenerateMAC()
4025
4026     ht_kind = self.op.hypervisor
4027     if ht_kind in constants.HTS_REQ_PORT:
4028       network_port = self.cfg.AllocatePort()
4029     else:
4030       network_port = None
4031
4032     ##if self.op.vnc_bind_address is None:
4033     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4034
4035     # this is needed because os.path.join does not accept None arguments
4036     if self.op.file_storage_dir is None:
4037       string_file_storage_dir = ""
4038     else:
4039       string_file_storage_dir = self.op.file_storage_dir
4040
4041     # build the full file storage dir path
4042     file_storage_dir = os.path.normpath(os.path.join(
4043                                         self.cfg.GetFileStorageDir(),
4044                                         string_file_storage_dir, instance))
4045
4046
4047     disks = _GenerateDiskTemplate(self,
4048                                   self.op.disk_template,
4049                                   instance, pnode_name,
4050                                   self.secondaries,
4051                                   self.disks,
4052                                   file_storage_dir,
4053                                   self.op.file_driver,
4054                                   0)
4055
4056     iobj = objects.Instance(name=instance, os=self.op.os_type,
4057                             primary_node=pnode_name,
4058                             nics=self.nics, disks=disks,
4059                             disk_template=self.op.disk_template,
4060                             status=self.instance_status,
4061                             network_port=network_port,
4062                             beparams=self.op.beparams,
4063                             hvparams=self.op.hvparams,
4064                             hypervisor=self.op.hypervisor,
4065                             )
4066
4067     feedback_fn("* creating instance disks...")
4068     if not _CreateDisks(self, iobj):
4069       _RemoveDisks(self, iobj)
4070       self.cfg.ReleaseDRBDMinors(instance)
4071       raise errors.OpExecError("Device creation failed, reverting...")
4072
4073     feedback_fn("adding instance %s to cluster config" % instance)
4074
4075     self.cfg.AddInstance(iobj)
4076     # Declare that we don't want to remove the instance lock anymore, as we've
4077     # added the instance to the config
4078     del self.remove_locks[locking.LEVEL_INSTANCE]
4079     # Remove the temp. assignements for the instance's drbds
4080     self.cfg.ReleaseDRBDMinors(instance)
4081     # Unlock all the nodes
4082     if self.op.mode == constants.INSTANCE_IMPORT:
4083       nodes_keep = [self.op.src_node]
4084       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4085                        if node != self.op.src_node]
4086       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4087       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4088     else:
4089       self.context.glm.release(locking.LEVEL_NODE)
4090       del self.acquired_locks[locking.LEVEL_NODE]
4091
4092     if self.op.wait_for_sync:
4093       disk_abort = not _WaitForSync(self, iobj)
4094     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4095       # make sure the disks are not degraded (still sync-ing is ok)
4096       time.sleep(15)
4097       feedback_fn("* checking mirrors status")
4098       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4099     else:
4100       disk_abort = False
4101
4102     if disk_abort:
4103       _RemoveDisks(self, iobj)
4104       self.cfg.RemoveInstance(iobj.name)
4105       # Make sure the instance lock gets removed
4106       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4107       raise errors.OpExecError("There are some degraded disks for"
4108                                " this instance")
4109
4110     feedback_fn("creating os for instance %s on node %s" %
4111                 (instance, pnode_name))
4112
4113     if iobj.disk_template != constants.DT_DISKLESS:
4114       if self.op.mode == constants.INSTANCE_CREATE:
4115         feedback_fn("* running the instance OS create scripts...")
4116         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4117         result.Raise()
4118         if not result.data:
4119           raise errors.OpExecError("Could not add os for instance %s"
4120                                    " on node %s" %
4121                                    (instance, pnode_name))
4122
4123       elif self.op.mode == constants.INSTANCE_IMPORT:
4124         feedback_fn("* running the instance OS import scripts...")
4125         src_node = self.op.src_node
4126         src_images = self.src_images
4127         cluster_name = self.cfg.GetClusterName()
4128         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4129                                                          src_node, src_images,
4130                                                          cluster_name)
4131         import_result.Raise()
4132         for idx, result in enumerate(import_result.data):
4133           if not result:
4134             self.LogWarning("Could not import the image %s for instance"
4135                             " %s, disk %d, on node %s" %
4136                             (src_images[idx], instance, idx, pnode_name))
4137       else:
4138         # also checked in the prereq part
4139         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4140                                      % self.op.mode)
4141
4142     if self.op.start:
4143       logging.info("Starting instance %s on node %s", instance, pnode_name)
4144       feedback_fn("* starting instance...")
4145       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4146       result.Raise()
4147       if not result.data:
4148         raise errors.OpExecError("Could not start instance")
4149
4150
4151 class LUConnectConsole(NoHooksLU):
4152   """Connect to an instance's console.
4153
4154   This is somewhat special in that it returns the command line that
4155   you need to run on the master node in order to connect to the
4156   console.
4157
4158   """
4159   _OP_REQP = ["instance_name"]
4160   REQ_BGL = False
4161
4162   def ExpandNames(self):
4163     self._ExpandAndLockInstance()
4164
4165   def CheckPrereq(self):
4166     """Check prerequisites.
4167
4168     This checks that the instance is in the cluster.
4169
4170     """
4171     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4172     assert self.instance is not None, \
4173       "Cannot retrieve locked instance %s" % self.op.instance_name
4174     _CheckNodeOnline(self, self.op.primary_node)
4175
4176   def Exec(self, feedback_fn):
4177     """Connect to the console of an instance
4178
4179     """
4180     instance = self.instance
4181     node = instance.primary_node
4182
4183     node_insts = self.rpc.call_instance_list([node],
4184                                              [instance.hypervisor])[node]
4185     node_insts.Raise()
4186
4187     if instance.name not in node_insts.data:
4188       raise errors.OpExecError("Instance %s is not running." % instance.name)
4189
4190     logging.debug("Connecting to console of %s on %s", instance.name, node)
4191
4192     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4193     console_cmd = hyper.GetShellCommandForConsole(instance)
4194
4195     # build ssh cmdline
4196     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4197
4198
4199 class LUReplaceDisks(LogicalUnit):
4200   """Replace the disks of an instance.
4201
4202   """
4203   HPATH = "mirrors-replace"
4204   HTYPE = constants.HTYPE_INSTANCE
4205   _OP_REQP = ["instance_name", "mode", "disks"]
4206   REQ_BGL = False
4207
4208   def ExpandNames(self):
4209     self._ExpandAndLockInstance()
4210
4211     if not hasattr(self.op, "remote_node"):
4212       self.op.remote_node = None
4213
4214     ia_name = getattr(self.op, "iallocator", None)
4215     if ia_name is not None:
4216       if self.op.remote_node is not None:
4217         raise errors.OpPrereqError("Give either the iallocator or the new"
4218                                    " secondary, not both")
4219       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4220     elif self.op.remote_node is not None:
4221       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4222       if remote_node is None:
4223         raise errors.OpPrereqError("Node '%s' not known" %
4224                                    self.op.remote_node)
4225       self.op.remote_node = remote_node
4226       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4227       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4228     else:
4229       self.needed_locks[locking.LEVEL_NODE] = []
4230       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4231
4232   def DeclareLocks(self, level):
4233     # If we're not already locking all nodes in the set we have to declare the
4234     # instance's primary/secondary nodes.
4235     if (level == locking.LEVEL_NODE and
4236         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4237       self._LockInstancesNodes()
4238
4239   def _RunAllocator(self):
4240     """Compute a new secondary node using an IAllocator.
4241
4242     """
4243     ial = IAllocator(self,
4244                      mode=constants.IALLOCATOR_MODE_RELOC,
4245                      name=self.op.instance_name,
4246                      relocate_from=[self.sec_node])
4247
4248     ial.Run(self.op.iallocator)
4249
4250     if not ial.success:
4251       raise errors.OpPrereqError("Can't compute nodes using"
4252                                  " iallocator '%s': %s" % (self.op.iallocator,
4253                                                            ial.info))
4254     if len(ial.nodes) != ial.required_nodes:
4255       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4256                                  " of nodes (%s), required %s" %
4257                                  (len(ial.nodes), ial.required_nodes))
4258     self.op.remote_node = ial.nodes[0]
4259     self.LogInfo("Selected new secondary for the instance: %s",
4260                  self.op.remote_node)
4261
4262   def BuildHooksEnv(self):
4263     """Build hooks env.
4264
4265     This runs on the master, the primary and all the secondaries.
4266
4267     """
4268     env = {
4269       "MODE": self.op.mode,
4270       "NEW_SECONDARY": self.op.remote_node,
4271       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4272       }
4273     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4274     nl = [
4275       self.cfg.GetMasterNode(),
4276       self.instance.primary_node,
4277       ]
4278     if self.op.remote_node is not None:
4279       nl.append(self.op.remote_node)
4280     return env, nl, nl
4281
4282   def CheckPrereq(self):
4283     """Check prerequisites.
4284
4285     This checks that the instance is in the cluster.
4286
4287     """
4288     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4289     assert instance is not None, \
4290       "Cannot retrieve locked instance %s" % self.op.instance_name
4291     self.instance = instance
4292
4293     if instance.disk_template not in constants.DTS_NET_MIRROR:
4294       raise errors.OpPrereqError("Instance's disk layout is not"
4295                                  " network mirrored.")
4296
4297     if len(instance.secondary_nodes) != 1:
4298       raise errors.OpPrereqError("The instance has a strange layout,"
4299                                  " expected one secondary but found %d" %
4300                                  len(instance.secondary_nodes))
4301
4302     self.sec_node = instance.secondary_nodes[0]
4303
4304     ia_name = getattr(self.op, "iallocator", None)
4305     if ia_name is not None:
4306       self._RunAllocator()
4307
4308     remote_node = self.op.remote_node
4309     if remote_node is not None:
4310       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4311       assert self.remote_node_info is not None, \
4312         "Cannot retrieve locked node %s" % remote_node
4313     else:
4314       self.remote_node_info = None
4315     if remote_node == instance.primary_node:
4316       raise errors.OpPrereqError("The specified node is the primary node of"
4317                                  " the instance.")
4318     elif remote_node == self.sec_node:
4319       if self.op.mode == constants.REPLACE_DISK_SEC:
4320         # this is for DRBD8, where we can't execute the same mode of
4321         # replacement as for drbd7 (no different port allocated)
4322         raise errors.OpPrereqError("Same secondary given, cannot execute"
4323                                    " replacement")
4324     if instance.disk_template == constants.DT_DRBD8:
4325       if (self.op.mode == constants.REPLACE_DISK_ALL and
4326           remote_node is not None):
4327         # switch to replace secondary mode
4328         self.op.mode = constants.REPLACE_DISK_SEC
4329
4330       if self.op.mode == constants.REPLACE_DISK_ALL:
4331         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4332                                    " secondary disk replacement, not"
4333                                    " both at once")
4334       elif self.op.mode == constants.REPLACE_DISK_PRI:
4335         if remote_node is not None:
4336           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4337                                      " the secondary while doing a primary"
4338                                      " node disk replacement")
4339         self.tgt_node = instance.primary_node
4340         self.oth_node = instance.secondary_nodes[0]
4341         _CheckNodeOnline(self, self.tgt_node)
4342         _CheckNodeOnline(self, self.oth_node)
4343       elif self.op.mode == constants.REPLACE_DISK_SEC:
4344         self.new_node = remote_node # this can be None, in which case
4345                                     # we don't change the secondary
4346         self.tgt_node = instance.secondary_nodes[0]
4347         self.oth_node = instance.primary_node
4348         _CheckNodeOnline(self, self.oth_node)
4349         if self.new_node is not None:
4350           _CheckNodeOnline(self, self.new_node)
4351         else:
4352           _CheckNodeOnline(self, self.tgt_node)
4353       else:
4354         raise errors.ProgrammerError("Unhandled disk replace mode")
4355
4356     if not self.op.disks:
4357       self.op.disks = range(len(instance.disks))
4358
4359     for disk_idx in self.op.disks:
4360       instance.FindDisk(disk_idx)
4361
4362   def _ExecD8DiskOnly(self, feedback_fn):
4363     """Replace a disk on the primary or secondary for dbrd8.
4364
4365     The algorithm for replace is quite complicated:
4366
4367       1. for each disk to be replaced:
4368
4369         1. create new LVs on the target node with unique names
4370         1. detach old LVs from the drbd device
4371         1. rename old LVs to name_replaced.<time_t>
4372         1. rename new LVs to old LVs
4373         1. attach the new LVs (with the old names now) to the drbd device
4374
4375       1. wait for sync across all devices
4376
4377       1. for each modified disk:
4378
4379         1. remove old LVs (which have the name name_replaces.<time_t>)
4380
4381     Failures are not very well handled.
4382
4383     """
4384     steps_total = 6
4385     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4386     instance = self.instance
4387     iv_names = {}
4388     vgname = self.cfg.GetVGName()
4389     # start of work
4390     cfg = self.cfg
4391     tgt_node = self.tgt_node
4392     oth_node = self.oth_node
4393
4394     # Step: check device activation
4395     self.proc.LogStep(1, steps_total, "check device existence")
4396     info("checking volume groups")
4397     my_vg = cfg.GetVGName()
4398     results = self.rpc.call_vg_list([oth_node, tgt_node])
4399     if not results:
4400       raise errors.OpExecError("Can't list volume groups on the nodes")
4401     for node in oth_node, tgt_node:
4402       res = results[node]
4403       if res.failed or not res.data or my_vg not in res.data:
4404         raise errors.OpExecError("Volume group '%s' not found on %s" %
4405                                  (my_vg, node))
4406     for idx, dev in enumerate(instance.disks):
4407       if idx not in self.op.disks:
4408         continue
4409       for node in tgt_node, oth_node:
4410         info("checking disk/%d on %s" % (idx, node))
4411         cfg.SetDiskID(dev, node)
4412         if not self.rpc.call_blockdev_find(node, dev):
4413           raise errors.OpExecError("Can't find disk/%d on node %s" %
4414                                    (idx, node))
4415
4416     # Step: check other node consistency
4417     self.proc.LogStep(2, steps_total, "check peer consistency")
4418     for idx, dev in enumerate(instance.disks):
4419       if idx not in self.op.disks:
4420         continue
4421       info("checking disk/%d consistency on %s" % (idx, oth_node))
4422       if not _CheckDiskConsistency(self, dev, oth_node,
4423                                    oth_node==instance.primary_node):
4424         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4425                                  " to replace disks on this node (%s)" %
4426                                  (oth_node, tgt_node))
4427
4428     # Step: create new storage
4429     self.proc.LogStep(3, steps_total, "allocate new storage")
4430     for idx, dev in enumerate(instance.disks):
4431       if idx not in self.op.disks:
4432         continue
4433       size = dev.size
4434       cfg.SetDiskID(dev, tgt_node)
4435       lv_names = [".disk%d_%s" % (idx, suf)
4436                   for suf in ["data", "meta"]]
4437       names = _GenerateUniqueNames(self, lv_names)
4438       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4439                              logical_id=(vgname, names[0]))
4440       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4441                              logical_id=(vgname, names[1]))
4442       new_lvs = [lv_data, lv_meta]
4443       old_lvs = dev.children
4444       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4445       info("creating new local storage on %s for %s" %
4446            (tgt_node, dev.iv_name))
4447       # since we *always* want to create this LV, we use the
4448       # _Create...OnPrimary (which forces the creation), even if we
4449       # are talking about the secondary node
4450       for new_lv in new_lvs:
4451         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4452                                         _GetInstanceInfoText(instance)):
4453           raise errors.OpExecError("Failed to create new LV named '%s' on"
4454                                    " node '%s'" %
4455                                    (new_lv.logical_id[1], tgt_node))
4456
4457     # Step: for each lv, detach+rename*2+attach
4458     self.proc.LogStep(4, steps_total, "change drbd configuration")
4459     for dev, old_lvs, new_lvs in iv_names.itervalues():
4460       info("detaching %s drbd from local storage" % dev.iv_name)
4461       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4462       result.Raise()
4463       if not result.data:
4464         raise errors.OpExecError("Can't detach drbd from local storage on node"
4465                                  " %s for device %s" % (tgt_node, dev.iv_name))
4466       #dev.children = []
4467       #cfg.Update(instance)
4468
4469       # ok, we created the new LVs, so now we know we have the needed
4470       # storage; as such, we proceed on the target node to rename
4471       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4472       # using the assumption that logical_id == physical_id (which in
4473       # turn is the unique_id on that node)
4474
4475       # FIXME(iustin): use a better name for the replaced LVs
4476       temp_suffix = int(time.time())
4477       ren_fn = lambda d, suff: (d.physical_id[0],
4478                                 d.physical_id[1] + "_replaced-%s" % suff)
4479       # build the rename list based on what LVs exist on the node
4480       rlist = []
4481       for to_ren in old_lvs:
4482         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4483         if not find_res.failed and find_res.data is not None: # device exists
4484           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4485
4486       info("renaming the old LVs on the target node")
4487       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4488       result.Raise()
4489       if not result.data:
4490         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4491       # now we rename the new LVs to the old LVs
4492       info("renaming the new LVs on the target node")
4493       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4494       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4495       result.Raise()
4496       if not result.data:
4497         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4498
4499       for old, new in zip(old_lvs, new_lvs):
4500         new.logical_id = old.logical_id
4501         cfg.SetDiskID(new, tgt_node)
4502
4503       for disk in old_lvs:
4504         disk.logical_id = ren_fn(disk, temp_suffix)
4505         cfg.SetDiskID(disk, tgt_node)
4506
4507       # now that the new lvs have the old name, we can add them to the device
4508       info("adding new mirror component on %s" % tgt_node)
4509       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4510       if result.failed or not result.data:
4511         for new_lv in new_lvs:
4512           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4513           if result.failed or not result.data:
4514             warning("Can't rollback device %s", hint="manually cleanup unused"
4515                     " logical volumes")
4516         raise errors.OpExecError("Can't add local storage to drbd")
4517
4518       dev.children = new_lvs
4519       cfg.Update(instance)
4520
4521     # Step: wait for sync
4522
4523     # this can fail as the old devices are degraded and _WaitForSync
4524     # does a combined result over all disks, so we don't check its
4525     # return value
4526     self.proc.LogStep(5, steps_total, "sync devices")
4527     _WaitForSync(self, instance, unlock=True)
4528
4529     # so check manually all the devices
4530     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4531       cfg.SetDiskID(dev, instance.primary_node)
4532       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4533       if result.failed or result.data[5]:
4534         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4535
4536     # Step: remove old storage
4537     self.proc.LogStep(6, steps_total, "removing old storage")
4538     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4539       info("remove logical volumes for %s" % name)
4540       for lv in old_lvs:
4541         cfg.SetDiskID(lv, tgt_node)
4542         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4543         if result.failed or not result.data:
4544           warning("Can't remove old LV", hint="manually remove unused LVs")
4545           continue
4546
4547   def _ExecD8Secondary(self, feedback_fn):
4548     """Replace the secondary node for drbd8.
4549
4550     The algorithm for replace is quite complicated:
4551       - for all disks of the instance:
4552         - create new LVs on the new node with same names
4553         - shutdown the drbd device on the old secondary
4554         - disconnect the drbd network on the primary
4555         - create the drbd device on the new secondary
4556         - network attach the drbd on the primary, using an artifice:
4557           the drbd code for Attach() will connect to the network if it
4558           finds a device which is connected to the good local disks but
4559           not network enabled
4560       - wait for sync across all devices
4561       - remove all disks from the old secondary
4562
4563     Failures are not very well handled.
4564
4565     """
4566     steps_total = 6
4567     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4568     instance = self.instance
4569     iv_names = {}
4570     vgname = self.cfg.GetVGName()
4571     # start of work
4572     cfg = self.cfg
4573     old_node = self.tgt_node
4574     new_node = self.new_node
4575     pri_node = instance.primary_node
4576
4577     # Step: check device activation
4578     self.proc.LogStep(1, steps_total, "check device existence")
4579     info("checking volume groups")
4580     my_vg = cfg.GetVGName()
4581     results = self.rpc.call_vg_list([pri_node, new_node])
4582     for node in pri_node, new_node:
4583       res = results[node]
4584       if res.failed or not res.data or my_vg not in res.data:
4585         raise errors.OpExecError("Volume group '%s' not found on %s" %
4586                                  (my_vg, node))
4587     for idx, dev in enumerate(instance.disks):
4588       if idx not in self.op.disks:
4589         continue
4590       info("checking disk/%d on %s" % (idx, pri_node))
4591       cfg.SetDiskID(dev, pri_node)
4592       result = self.rpc.call_blockdev_find(pri_node, dev)
4593       result.Raise()
4594       if not result.data:
4595         raise errors.OpExecError("Can't find disk/%d on node %s" %
4596                                  (idx, pri_node))
4597
4598     # Step: check other node consistency
4599     self.proc.LogStep(2, steps_total, "check peer consistency")
4600     for idx, dev in enumerate(instance.disks):
4601       if idx not in self.op.disks:
4602         continue
4603       info("checking disk/%d consistency on %s" % (idx, pri_node))
4604       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4605         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4606                                  " unsafe to replace the secondary" %
4607                                  pri_node)
4608
4609     # Step: create new storage
4610     self.proc.LogStep(3, steps_total, "allocate new storage")
4611     for idx, dev in enumerate(instance.disks):
4612       size = dev.size
4613       info("adding new local storage on %s for disk/%d" %
4614            (new_node, idx))
4615       # since we *always* want to create this LV, we use the
4616       # _Create...OnPrimary (which forces the creation), even if we
4617       # are talking about the secondary node
4618       for new_lv in dev.children:
4619         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4620                                         _GetInstanceInfoText(instance)):
4621           raise errors.OpExecError("Failed to create new LV named '%s' on"
4622                                    " node '%s'" %
4623                                    (new_lv.logical_id[1], new_node))
4624
4625     # Step 4: dbrd minors and drbd setups changes
4626     # after this, we must manually remove the drbd minors on both the
4627     # error and the success paths
4628     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4629                                    instance.name)
4630     logging.debug("Allocated minors %s" % (minors,))
4631     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4632     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4633       size = dev.size
4634       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4635       # create new devices on new_node
4636       if pri_node == dev.logical_id[0]:
4637         new_logical_id = (pri_node, new_node,
4638                           dev.logical_id[2], dev.logical_id[3], new_minor,
4639                           dev.logical_id[5])
4640       else:
4641         new_logical_id = (new_node, pri_node,
4642                           dev.logical_id[2], new_minor, dev.logical_id[4],
4643                           dev.logical_id[5])
4644       iv_names[idx] = (dev, dev.children, new_logical_id)
4645       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4646                     new_logical_id)
4647       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4648                               logical_id=new_logical_id,
4649                               children=dev.children)
4650       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4651                                         new_drbd, False,
4652                                         _GetInstanceInfoText(instance)):
4653         self.cfg.ReleaseDRBDMinors(instance.name)
4654         raise errors.OpExecError("Failed to create new DRBD on"
4655                                  " node '%s'" % new_node)
4656
4657     for idx, dev in enumerate(instance.disks):
4658       # we have new devices, shutdown the drbd on the old secondary
4659       info("shutting down drbd for disk/%d on old node" % idx)
4660       cfg.SetDiskID(dev, old_node)
4661       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4662       if result.failed or not result.data:
4663         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4664                 hint="Please cleanup this device manually as soon as possible")
4665
4666     info("detaching primary drbds from the network (=> standalone)")
4667     done = 0
4668     for idx, dev in enumerate(instance.disks):
4669       cfg.SetDiskID(dev, pri_node)
4670       # set the network part of the physical (unique in bdev terms) id
4671       # to None, meaning detach from network
4672       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4673       # and 'find' the device, which will 'fix' it to match the
4674       # standalone state
4675       result = self.rpc.call_blockdev_find(pri_node, dev)
4676       if not result.failed and result.data:
4677         done += 1
4678       else:
4679         warning("Failed to detach drbd disk/%d from network, unusual case" %
4680                 idx)
4681
4682     if not done:
4683       # no detaches succeeded (very unlikely)
4684       self.cfg.ReleaseDRBDMinors(instance.name)
4685       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4686
4687     # if we managed to detach at least one, we update all the disks of
4688     # the instance to point to the new secondary
4689     info("updating instance configuration")
4690     for dev, _, new_logical_id in iv_names.itervalues():
4691       dev.logical_id = new_logical_id
4692       cfg.SetDiskID(dev, pri_node)
4693     cfg.Update(instance)
4694     # we can remove now the temp minors as now the new values are
4695     # written to the config file (and therefore stable)
4696     self.cfg.ReleaseDRBDMinors(instance.name)
4697
4698     # and now perform the drbd attach
4699     info("attaching primary drbds to new secondary (standalone => connected)")
4700     failures = []
4701     for idx, dev in enumerate(instance.disks):
4702       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4703       # since the attach is smart, it's enough to 'find' the device,
4704       # it will automatically activate the network, if the physical_id
4705       # is correct
4706       cfg.SetDiskID(dev, pri_node)
4707       logging.debug("Disk to attach: %s", dev)
4708       result = self.rpc.call_blockdev_find(pri_node, dev)
4709       if result.failed or not result.data:
4710         warning("can't attach drbd disk/%d to new secondary!" % idx,
4711                 "please do a gnt-instance info to see the status of disks")
4712
4713     # this can fail as the old devices are degraded and _WaitForSync
4714     # does a combined result over all disks, so we don't check its
4715     # return value
4716     self.proc.LogStep(5, steps_total, "sync devices")
4717     _WaitForSync(self, instance, unlock=True)
4718
4719     # so check manually all the devices
4720     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4721       cfg.SetDiskID(dev, pri_node)
4722       result = self.rpc.call_blockdev_find(pri_node, dev)
4723       result.Raise()
4724       if result.data[5]:
4725         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4726
4727     self.proc.LogStep(6, steps_total, "removing old storage")
4728     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4729       info("remove logical volumes for disk/%d" % idx)
4730       for lv in old_lvs:
4731         cfg.SetDiskID(lv, old_node)
4732         result = self.rpc.call_blockdev_remove(old_node, lv)
4733         if result.failed or not result.data:
4734           warning("Can't remove LV on old secondary",
4735                   hint="Cleanup stale volumes by hand")
4736
4737   def Exec(self, feedback_fn):
4738     """Execute disk replacement.
4739
4740     This dispatches the disk replacement to the appropriate handler.
4741
4742     """
4743     instance = self.instance
4744
4745     # Activate the instance disks if we're replacing them on a down instance
4746     if instance.status == "down":
4747       _StartInstanceDisks(self, instance, True)
4748
4749     if instance.disk_template == constants.DT_DRBD8:
4750       if self.op.remote_node is None:
4751         fn = self._ExecD8DiskOnly
4752       else:
4753         fn = self._ExecD8Secondary
4754     else:
4755       raise errors.ProgrammerError("Unhandled disk replacement case")
4756
4757     ret = fn(feedback_fn)
4758
4759     # Deactivate the instance disks if we're replacing them on a down instance
4760     if instance.status == "down":
4761       _SafeShutdownInstanceDisks(self, instance)
4762
4763     return ret
4764
4765
4766 class LUGrowDisk(LogicalUnit):
4767   """Grow a disk of an instance.
4768
4769   """
4770   HPATH = "disk-grow"
4771   HTYPE = constants.HTYPE_INSTANCE
4772   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4773   REQ_BGL = False
4774
4775   def ExpandNames(self):
4776     self._ExpandAndLockInstance()
4777     self.needed_locks[locking.LEVEL_NODE] = []
4778     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4779
4780   def DeclareLocks(self, level):
4781     if level == locking.LEVEL_NODE:
4782       self._LockInstancesNodes()
4783
4784   def BuildHooksEnv(self):
4785     """Build hooks env.
4786
4787     This runs on the master, the primary and all the secondaries.
4788
4789     """
4790     env = {
4791       "DISK": self.op.disk,
4792       "AMOUNT": self.op.amount,
4793       }
4794     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4795     nl = [
4796       self.cfg.GetMasterNode(),
4797       self.instance.primary_node,
4798       ]
4799     return env, nl, nl
4800
4801   def CheckPrereq(self):
4802     """Check prerequisites.
4803
4804     This checks that the instance is in the cluster.
4805
4806     """
4807     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4808     assert instance is not None, \
4809       "Cannot retrieve locked instance %s" % self.op.instance_name
4810     _CheckNodeOnline(self, instance.primary_node)
4811     for node in instance.secondary_nodes:
4812       _CheckNodeOnline(self, node)
4813
4814
4815     self.instance = instance
4816
4817     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4818       raise errors.OpPrereqError("Instance's disk layout does not support"
4819                                  " growing.")
4820
4821     self.disk = instance.FindDisk(self.op.disk)
4822
4823     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4824     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4825                                        instance.hypervisor)
4826     for node in nodenames:
4827       info = nodeinfo[node]
4828       if info.failed or not info.data:
4829         raise errors.OpPrereqError("Cannot get current information"
4830                                    " from node '%s'" % node)
4831       vg_free = info.data.get('vg_free', None)
4832       if not isinstance(vg_free, int):
4833         raise errors.OpPrereqError("Can't compute free disk space on"
4834                                    " node %s" % node)
4835       if self.op.amount > vg_free:
4836         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4837                                    " %d MiB available, %d MiB required" %
4838                                    (node, vg_free, self.op.amount))
4839
4840   def Exec(self, feedback_fn):
4841     """Execute disk grow.
4842
4843     """
4844     instance = self.instance
4845     disk = self.disk
4846     for node in (instance.secondary_nodes + (instance.primary_node,)):
4847       self.cfg.SetDiskID(disk, node)
4848       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4849       result.Raise()
4850       if (not result.data or not isinstance(result.data, (list, tuple)) or
4851           len(result.data) != 2):
4852         raise errors.OpExecError("Grow request failed to node %s" % node)
4853       elif not result.data[0]:
4854         raise errors.OpExecError("Grow request failed to node %s: %s" %
4855                                  (node, result.data[1]))
4856     disk.RecordGrow(self.op.amount)
4857     self.cfg.Update(instance)
4858     if self.op.wait_for_sync:
4859       disk_abort = not _WaitForSync(self, instance)
4860       if disk_abort:
4861         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4862                              " status.\nPlease check the instance.")
4863
4864
4865 class LUQueryInstanceData(NoHooksLU):
4866   """Query runtime instance data.
4867
4868   """
4869   _OP_REQP = ["instances", "static"]
4870   REQ_BGL = False
4871
4872   def ExpandNames(self):
4873     self.needed_locks = {}
4874     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4875
4876     if not isinstance(self.op.instances, list):
4877       raise errors.OpPrereqError("Invalid argument type 'instances'")
4878
4879     if self.op.instances:
4880       self.wanted_names = []
4881       for name in self.op.instances:
4882         full_name = self.cfg.ExpandInstanceName(name)
4883         if full_name is None:
4884           raise errors.OpPrereqError("Instance '%s' not known" %
4885                                      self.op.instance_name)
4886         self.wanted_names.append(full_name)
4887       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4888     else:
4889       self.wanted_names = None
4890       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4891
4892     self.needed_locks[locking.LEVEL_NODE] = []
4893     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4894
4895   def DeclareLocks(self, level):
4896     if level == locking.LEVEL_NODE:
4897       self._LockInstancesNodes()
4898
4899   def CheckPrereq(self):
4900     """Check prerequisites.
4901
4902     This only checks the optional instance list against the existing names.
4903
4904     """
4905     if self.wanted_names is None:
4906       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4907
4908     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4909                              in self.wanted_names]
4910     return
4911
4912   def _ComputeDiskStatus(self, instance, snode, dev):
4913     """Compute block device status.
4914
4915     """
4916     static = self.op.static
4917     if not static:
4918       self.cfg.SetDiskID(dev, instance.primary_node)
4919       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4920       dev_pstatus.Raise()
4921       dev_pstatus = dev_pstatus.data
4922     else:
4923       dev_pstatus = None
4924
4925     if dev.dev_type in constants.LDS_DRBD:
4926       # we change the snode then (otherwise we use the one passed in)
4927       if dev.logical_id[0] == instance.primary_node:
4928         snode = dev.logical_id[1]
4929       else:
4930         snode = dev.logical_id[0]
4931
4932     if snode and not static:
4933       self.cfg.SetDiskID(dev, snode)
4934       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4935       dev_sstatus.Raise()
4936       dev_sstatus = dev_sstatus.data
4937     else:
4938       dev_sstatus = None
4939
4940     if dev.children:
4941       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4942                       for child in dev.children]
4943     else:
4944       dev_children = []
4945
4946     data = {
4947       "iv_name": dev.iv_name,
4948       "dev_type": dev.dev_type,
4949       "logical_id": dev.logical_id,
4950       "physical_id": dev.physical_id,
4951       "pstatus": dev_pstatus,
4952       "sstatus": dev_sstatus,
4953       "children": dev_children,
4954       "mode": dev.mode,
4955       }
4956
4957     return data
4958
4959   def Exec(self, feedback_fn):
4960     """Gather and return data"""
4961     result = {}
4962
4963     cluster = self.cfg.GetClusterInfo()
4964
4965     for instance in self.wanted_instances:
4966       if not self.op.static:
4967         remote_info = self.rpc.call_instance_info(instance.primary_node,
4968                                                   instance.name,
4969                                                   instance.hypervisor)
4970         remote_info.Raise()
4971         remote_info = remote_info.data
4972         if remote_info and "state" in remote_info:
4973           remote_state = "up"
4974         else:
4975           remote_state = "down"
4976       else:
4977         remote_state = None
4978       if instance.status == "down":
4979         config_state = "down"
4980       else:
4981         config_state = "up"
4982
4983       disks = [self._ComputeDiskStatus(instance, None, device)
4984                for device in instance.disks]
4985
4986       idict = {
4987         "name": instance.name,
4988         "config_state": config_state,
4989         "run_state": remote_state,
4990         "pnode": instance.primary_node,
4991         "snodes": instance.secondary_nodes,
4992         "os": instance.os,
4993         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4994         "disks": disks,
4995         "hypervisor": instance.hypervisor,
4996         "network_port": instance.network_port,
4997         "hv_instance": instance.hvparams,
4998         "hv_actual": cluster.FillHV(instance),
4999         "be_instance": instance.beparams,
5000         "be_actual": cluster.FillBE(instance),
5001         }
5002
5003       result[instance.name] = idict
5004
5005     return result
5006
5007
5008 class LUSetInstanceParams(LogicalUnit):
5009   """Modifies an instances's parameters.
5010
5011   """
5012   HPATH = "instance-modify"
5013   HTYPE = constants.HTYPE_INSTANCE
5014   _OP_REQP = ["instance_name"]
5015   REQ_BGL = False
5016
5017   def CheckArguments(self):
5018     if not hasattr(self.op, 'nics'):
5019       self.op.nics = []
5020     if not hasattr(self.op, 'disks'):
5021       self.op.disks = []
5022     if not hasattr(self.op, 'beparams'):
5023       self.op.beparams = {}
5024     if not hasattr(self.op, 'hvparams'):
5025       self.op.hvparams = {}
5026     self.op.force = getattr(self.op, "force", False)
5027     if not (self.op.nics or self.op.disks or
5028             self.op.hvparams or self.op.beparams):
5029       raise errors.OpPrereqError("No changes submitted")
5030
5031     utils.CheckBEParams(self.op.beparams)
5032
5033     # Disk validation
5034     disk_addremove = 0
5035     for disk_op, disk_dict in self.op.disks:
5036       if disk_op == constants.DDM_REMOVE:
5037         disk_addremove += 1
5038         continue
5039       elif disk_op == constants.DDM_ADD:
5040         disk_addremove += 1
5041       else:
5042         if not isinstance(disk_op, int):
5043           raise errors.OpPrereqError("Invalid disk index")
5044       if disk_op == constants.DDM_ADD:
5045         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5046         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5047           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5048         size = disk_dict.get('size', None)
5049         if size is None:
5050           raise errors.OpPrereqError("Required disk parameter size missing")
5051         try:
5052           size = int(size)
5053         except ValueError, err:
5054           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5055                                      str(err))
5056         disk_dict['size'] = size
5057       else:
5058         # modification of disk
5059         if 'size' in disk_dict:
5060           raise errors.OpPrereqError("Disk size change not possible, use"
5061                                      " grow-disk")
5062
5063     if disk_addremove > 1:
5064       raise errors.OpPrereqError("Only one disk add or remove operation"
5065                                  " supported at a time")
5066
5067     # NIC validation
5068     nic_addremove = 0
5069     for nic_op, nic_dict in self.op.nics:
5070       if nic_op == constants.DDM_REMOVE:
5071         nic_addremove += 1
5072         continue
5073       elif nic_op == constants.DDM_ADD:
5074         nic_addremove += 1
5075       else:
5076         if not isinstance(nic_op, int):
5077           raise errors.OpPrereqError("Invalid nic index")
5078
5079       # nic_dict should be a dict
5080       nic_ip = nic_dict.get('ip', None)
5081       if nic_ip is not None:
5082         if nic_ip.lower() == "none":
5083           nic_dict['ip'] = None
5084         else:
5085           if not utils.IsValidIP(nic_ip):
5086             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5087       # we can only check None bridges and assign the default one
5088       nic_bridge = nic_dict.get('bridge', None)
5089       if nic_bridge is None:
5090         nic_dict['bridge'] = self.cfg.GetDefBridge()
5091       # but we can validate MACs
5092       nic_mac = nic_dict.get('mac', None)
5093       if nic_mac is not None:
5094         if self.cfg.IsMacInUse(nic_mac):
5095           raise errors.OpPrereqError("MAC address %s already in use"
5096                                      " in cluster" % nic_mac)
5097         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5098           if not utils.IsValidMac(nic_mac):
5099             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5100     if nic_addremove > 1:
5101       raise errors.OpPrereqError("Only one NIC add or remove operation"
5102                                  " supported at a time")
5103
5104   def ExpandNames(self):
5105     self._ExpandAndLockInstance()
5106     self.needed_locks[locking.LEVEL_NODE] = []
5107     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5108
5109   def DeclareLocks(self, level):
5110     if level == locking.LEVEL_NODE:
5111       self._LockInstancesNodes()
5112
5113   def BuildHooksEnv(self):
5114     """Build hooks env.
5115
5116     This runs on the master, primary and secondaries.
5117
5118     """
5119     args = dict()
5120     if constants.BE_MEMORY in self.be_new:
5121       args['memory'] = self.be_new[constants.BE_MEMORY]
5122     if constants.BE_VCPUS in self.be_new:
5123       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5124     # FIXME: readd disk/nic changes
5125     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5126     nl = [self.cfg.GetMasterNode(),
5127           self.instance.primary_node] + list(self.instance.secondary_nodes)
5128     return env, nl, nl
5129
5130   def CheckPrereq(self):
5131     """Check prerequisites.
5132
5133     This only checks the instance list against the existing names.
5134
5135     """
5136     force = self.force = self.op.force
5137
5138     # checking the new params on the primary/secondary nodes
5139
5140     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5141     assert self.instance is not None, \
5142       "Cannot retrieve locked instance %s" % self.op.instance_name
5143     pnode = self.instance.primary_node
5144     nodelist = [pnode]
5145     nodelist.extend(instance.secondary_nodes)
5146
5147     # hvparams processing
5148     if self.op.hvparams:
5149       i_hvdict = copy.deepcopy(instance.hvparams)
5150       for key, val in self.op.hvparams.iteritems():
5151         if val == constants.VALUE_DEFAULT:
5152           try:
5153             del i_hvdict[key]
5154           except KeyError:
5155             pass
5156         elif val == constants.VALUE_NONE:
5157           i_hvdict[key] = None
5158         else:
5159           i_hvdict[key] = val
5160       cluster = self.cfg.GetClusterInfo()
5161       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5162                                 i_hvdict)
5163       # local check
5164       hypervisor.GetHypervisor(
5165         instance.hypervisor).CheckParameterSyntax(hv_new)
5166       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5167       self.hv_new = hv_new # the new actual values
5168       self.hv_inst = i_hvdict # the new dict (without defaults)
5169     else:
5170       self.hv_new = self.hv_inst = {}
5171
5172     # beparams processing
5173     if self.op.beparams:
5174       i_bedict = copy.deepcopy(instance.beparams)
5175       for key, val in self.op.beparams.iteritems():
5176         if val == constants.VALUE_DEFAULT:
5177           try:
5178             del i_bedict[key]
5179           except KeyError:
5180             pass
5181         else:
5182           i_bedict[key] = val
5183       cluster = self.cfg.GetClusterInfo()
5184       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5185                                 i_bedict)
5186       self.be_new = be_new # the new actual values
5187       self.be_inst = i_bedict # the new dict (without defaults)
5188     else:
5189       self.be_new = self.be_inst = {}
5190
5191     self.warn = []
5192
5193     if constants.BE_MEMORY in self.op.beparams and not self.force:
5194       mem_check_list = [pnode]
5195       if be_new[constants.BE_AUTO_BALANCE]:
5196         # either we changed auto_balance to yes or it was from before
5197         mem_check_list.extend(instance.secondary_nodes)
5198       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5199                                                   instance.hypervisor)
5200       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5201                                          instance.hypervisor)
5202       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5203         # Assume the primary node is unreachable and go ahead
5204         self.warn.append("Can't get info from primary node %s" % pnode)
5205       else:
5206         if not instance_info.failed and instance_info.data:
5207           current_mem = instance_info.data['memory']
5208         else:
5209           # Assume instance not running
5210           # (there is a slight race condition here, but it's not very probable,
5211           # and we have no other way to check)
5212           current_mem = 0
5213         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5214                     nodeinfo[pnode].data['memory_free'])
5215         if miss_mem > 0:
5216           raise errors.OpPrereqError("This change will prevent the instance"
5217                                      " from starting, due to %d MB of memory"
5218                                      " missing on its primary node" % miss_mem)
5219
5220       if be_new[constants.BE_AUTO_BALANCE]:
5221         for node, nres in instance.secondary_nodes.iteritems():
5222           if nres.failed or not isinstance(nres.data, dict):
5223             self.warn.append("Can't get info from secondary node %s" % node)
5224           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5225             self.warn.append("Not enough memory to failover instance to"
5226                              " secondary node %s" % node)
5227
5228     # NIC processing
5229     for nic_op, nic_dict in self.op.nics:
5230       if nic_op == constants.DDM_REMOVE:
5231         if not instance.nics:
5232           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5233         continue
5234       if nic_op != constants.DDM_ADD:
5235         # an existing nic
5236         if nic_op < 0 or nic_op >= len(instance.nics):
5237           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5238                                      " are 0 to %d" %
5239                                      (nic_op, len(instance.nics)))
5240       nic_bridge = nic_dict.get('bridge', None)
5241       if nic_bridge is not None:
5242         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5243           msg = ("Bridge '%s' doesn't exist on one of"
5244                  " the instance nodes" % nic_bridge)
5245           if self.force:
5246             self.warn.append(msg)
5247           else:
5248             raise errors.OpPrereqError(msg)
5249
5250     # DISK processing
5251     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5252       raise errors.OpPrereqError("Disk operations not supported for"
5253                                  " diskless instances")
5254     for disk_op, disk_dict in self.op.disks:
5255       if disk_op == constants.DDM_REMOVE:
5256         if len(instance.disks) == 1:
5257           raise errors.OpPrereqError("Cannot remove the last disk of"
5258                                      " an instance")
5259         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5260         ins_l = ins_l[pnode]
5261         if not type(ins_l) is list:
5262           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5263         if instance.name in ins_l:
5264           raise errors.OpPrereqError("Instance is running, can't remove"
5265                                      " disks.")
5266
5267       if (disk_op == constants.DDM_ADD and
5268           len(instance.nics) >= constants.MAX_DISKS):
5269         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5270                                    " add more" % constants.MAX_DISKS)
5271       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5272         # an existing disk
5273         if disk_op < 0 or disk_op >= len(instance.disks):
5274           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5275                                      " are 0 to %d" %
5276                                      (disk_op, len(instance.disks)))
5277
5278     return
5279
5280   def Exec(self, feedback_fn):
5281     """Modifies an instance.
5282
5283     All parameters take effect only at the next restart of the instance.
5284
5285     """
5286     # Process here the warnings from CheckPrereq, as we don't have a
5287     # feedback_fn there.
5288     for warn in self.warn:
5289       feedback_fn("WARNING: %s" % warn)
5290
5291     result = []
5292     instance = self.instance
5293     # disk changes
5294     for disk_op, disk_dict in self.op.disks:
5295       if disk_op == constants.DDM_REMOVE:
5296         # remove the last disk
5297         device = instance.disks.pop()
5298         device_idx = len(instance.disks)
5299         for node, disk in device.ComputeNodeTree(instance.primary_node):
5300           self.cfg.SetDiskID(disk, node)
5301           result = self.rpc.call_blockdev_remove(node, disk)
5302           if result.failed or not result.data:
5303             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5304                                  " continuing anyway", device_idx, node)
5305         result.append(("disk/%d" % device_idx, "remove"))
5306       elif disk_op == constants.DDM_ADD:
5307         # add a new disk
5308         if instance.disk_template == constants.DT_FILE:
5309           file_driver, file_path = instance.disks[0].logical_id
5310           file_path = os.path.dirname(file_path)
5311         else:
5312           file_driver = file_path = None
5313         disk_idx_base = len(instance.disks)
5314         new_disk = _GenerateDiskTemplate(self,
5315                                          instance.disk_template,
5316                                          instance, instance.primary_node,
5317                                          instance.secondary_nodes,
5318                                          [disk_dict],
5319                                          file_path,
5320                                          file_driver,
5321                                          disk_idx_base)[0]
5322         new_disk.mode = disk_dict['mode']
5323         instance.disks.append(new_disk)
5324         info = _GetInstanceInfoText(instance)
5325
5326         logging.info("Creating volume %s for instance %s",
5327                      new_disk.iv_name, instance.name)
5328         # Note: this needs to be kept in sync with _CreateDisks
5329         #HARDCODE
5330         for secondary_node in instance.secondary_nodes:
5331           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5332                                             new_disk, False, info):
5333             self.LogWarning("Failed to create volume %s (%s) on"
5334                             " secondary node %s!",
5335                             new_disk.iv_name, new_disk, secondary_node)
5336         #HARDCODE
5337         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5338                                         instance, new_disk, info):
5339           self.LogWarning("Failed to create volume %s on primary!",
5340                           new_disk.iv_name)
5341         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5342                        (new_disk.size, new_disk.mode)))
5343       else:
5344         # change a given disk
5345         instance.disks[disk_op].mode = disk_dict['mode']
5346         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5347     # NIC changes
5348     for nic_op, nic_dict in self.op.nics:
5349       if nic_op == constants.DDM_REMOVE:
5350         # remove the last nic
5351         del instance.nics[-1]
5352         result.append(("nic.%d" % len(instance.nics), "remove"))
5353       elif nic_op == constants.DDM_ADD:
5354         # add a new nic
5355         if 'mac' not in nic_dict:
5356           mac = constants.VALUE_GENERATE
5357         else:
5358           mac = nic_dict['mac']
5359         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5360           mac = self.cfg.GenerateMAC()
5361         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5362                               bridge=nic_dict.get('bridge', None))
5363         instance.nics.append(new_nic)
5364         result.append(("nic.%d" % (len(instance.nics) - 1),
5365                        "add:mac=%s,ip=%s,bridge=%s" %
5366                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5367       else:
5368         # change a given nic
5369         for key in 'mac', 'ip', 'bridge':
5370           if key in nic_dict:
5371             setattr(instance.nics[nic_op], key, nic_dict[key])
5372             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5373
5374     # hvparams changes
5375     if self.op.hvparams:
5376       instance.hvparams = self.hv_new
5377       for key, val in self.op.hvparams.iteritems():
5378         result.append(("hv/%s" % key, val))
5379
5380     # beparams changes
5381     if self.op.beparams:
5382       instance.beparams = self.be_inst
5383       for key, val in self.op.beparams.iteritems():
5384         result.append(("be/%s" % key, val))
5385
5386     self.cfg.Update(instance)
5387
5388     return result
5389
5390
5391 class LUQueryExports(NoHooksLU):
5392   """Query the exports list
5393
5394   """
5395   _OP_REQP = ['nodes']
5396   REQ_BGL = False
5397
5398   def ExpandNames(self):
5399     self.needed_locks = {}
5400     self.share_locks[locking.LEVEL_NODE] = 1
5401     if not self.op.nodes:
5402       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5403     else:
5404       self.needed_locks[locking.LEVEL_NODE] = \
5405         _GetWantedNodes(self, self.op.nodes)
5406
5407   def CheckPrereq(self):
5408     """Check prerequisites.
5409
5410     """
5411     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5412
5413   def Exec(self, feedback_fn):
5414     """Compute the list of all the exported system images.
5415
5416     @rtype: dict
5417     @return: a dictionary with the structure node->(export-list)
5418         where export-list is a list of the instances exported on
5419         that node.
5420
5421     """
5422     rpcresult = self.rpc.call_export_list(self.nodes)
5423     result = {}
5424     for node in rpcresult:
5425       if rpcresult[node].failed:
5426         result[node] = False
5427       else:
5428         result[node] = rpcresult[node].data
5429
5430     return result
5431
5432
5433 class LUExportInstance(LogicalUnit):
5434   """Export an instance to an image in the cluster.
5435
5436   """
5437   HPATH = "instance-export"
5438   HTYPE = constants.HTYPE_INSTANCE
5439   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5440   REQ_BGL = False
5441
5442   def ExpandNames(self):
5443     self._ExpandAndLockInstance()
5444     # FIXME: lock only instance primary and destination node
5445     #
5446     # Sad but true, for now we have do lock all nodes, as we don't know where
5447     # the previous export might be, and and in this LU we search for it and
5448     # remove it from its current node. In the future we could fix this by:
5449     #  - making a tasklet to search (share-lock all), then create the new one,
5450     #    then one to remove, after
5451     #  - removing the removal operation altoghether
5452     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5453
5454   def DeclareLocks(self, level):
5455     """Last minute lock declaration."""
5456     # All nodes are locked anyway, so nothing to do here.
5457
5458   def BuildHooksEnv(self):
5459     """Build hooks env.
5460
5461     This will run on the master, primary node and target node.
5462
5463     """
5464     env = {
5465       "EXPORT_NODE": self.op.target_node,
5466       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5467       }
5468     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5469     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5470           self.op.target_node]
5471     return env, nl, nl
5472
5473   def CheckPrereq(self):
5474     """Check prerequisites.
5475
5476     This checks that the instance and node names are valid.
5477
5478     """
5479     instance_name = self.op.instance_name
5480     self.instance = self.cfg.GetInstanceInfo(instance_name)
5481     assert self.instance is not None, \
5482           "Cannot retrieve locked instance %s" % self.op.instance_name
5483     _CheckNodeOnline(self, instance.primary_node)
5484
5485     self.dst_node = self.cfg.GetNodeInfo(
5486       self.cfg.ExpandNodeName(self.op.target_node))
5487
5488     if self.dst_node is None:
5489       # This is wrong node name, not a non-locked node
5490       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5491     _CheckNodeOnline(self, self.op.target_node)
5492
5493     # instance disk type verification
5494     for disk in self.instance.disks:
5495       if disk.dev_type == constants.LD_FILE:
5496         raise errors.OpPrereqError("Export not supported for instances with"
5497                                    " file-based disks")
5498
5499   def Exec(self, feedback_fn):
5500     """Export an instance to an image in the cluster.
5501
5502     """
5503     instance = self.instance
5504     dst_node = self.dst_node
5505     src_node = instance.primary_node
5506     if self.op.shutdown:
5507       # shutdown the instance, but not the disks
5508       result = self.rpc.call_instance_shutdown(src_node, instance)
5509       result.Raise()
5510       if not result.data:
5511         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5512                                  (instance.name, src_node))
5513
5514     vgname = self.cfg.GetVGName()
5515
5516     snap_disks = []
5517
5518     try:
5519       for disk in instance.disks:
5520         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5521         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5522         if new_dev_name.failed or not new_dev_name.data:
5523           self.LogWarning("Could not snapshot block device %s on node %s",
5524                           disk.logical_id[1], src_node)
5525           snap_disks.append(False)
5526         else:
5527           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5528                                  logical_id=(vgname, new_dev_name.data),
5529                                  physical_id=(vgname, new_dev_name.data),
5530                                  iv_name=disk.iv_name)
5531           snap_disks.append(new_dev)
5532
5533     finally:
5534       if self.op.shutdown and instance.status == "up":
5535         result = self.rpc.call_instance_start(src_node, instance, None)
5536         if result.failed or not result.data:
5537           _ShutdownInstanceDisks(self, instance)
5538           raise errors.OpExecError("Could not start instance")
5539
5540     # TODO: check for size
5541
5542     cluster_name = self.cfg.GetClusterName()
5543     for idx, dev in enumerate(snap_disks):
5544       if dev:
5545         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5546                                                instance, cluster_name, idx)
5547         if result.failed or not result.data:
5548           self.LogWarning("Could not export block device %s from node %s to"
5549                           " node %s", dev.logical_id[1], src_node,
5550                           dst_node.name)
5551         result = self.rpc.call_blockdev_remove(src_node, dev)
5552         if result.failed or not result.data:
5553           self.LogWarning("Could not remove snapshot block device %s from node"
5554                           " %s", dev.logical_id[1], src_node)
5555
5556     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5557     if result.failed or not result.data:
5558       self.LogWarning("Could not finalize export for instance %s on node %s",
5559                       instance.name, dst_node.name)
5560
5561     nodelist = self.cfg.GetNodeList()
5562     nodelist.remove(dst_node.name)
5563
5564     # on one-node clusters nodelist will be empty after the removal
5565     # if we proceed the backup would be removed because OpQueryExports
5566     # substitutes an empty list with the full cluster node list.
5567     if nodelist:
5568       exportlist = self.rpc.call_export_list(nodelist)
5569       for node in exportlist:
5570         if exportlist[node].failed:
5571           continue
5572         if instance.name in exportlist[node].data:
5573           if not self.rpc.call_export_remove(node, instance.name):
5574             self.LogWarning("Could not remove older export for instance %s"
5575                             " on node %s", instance.name, node)
5576
5577
5578 class LURemoveExport(NoHooksLU):
5579   """Remove exports related to the named instance.
5580
5581   """
5582   _OP_REQP = ["instance_name"]
5583   REQ_BGL = False
5584
5585   def ExpandNames(self):
5586     self.needed_locks = {}
5587     # We need all nodes to be locked in order for RemoveExport to work, but we
5588     # don't need to lock the instance itself, as nothing will happen to it (and
5589     # we can remove exports also for a removed instance)
5590     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5591
5592   def CheckPrereq(self):
5593     """Check prerequisites.
5594     """
5595     pass
5596
5597   def Exec(self, feedback_fn):
5598     """Remove any export.
5599
5600     """
5601     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5602     # If the instance was not found we'll try with the name that was passed in.
5603     # This will only work if it was an FQDN, though.
5604     fqdn_warn = False
5605     if not instance_name:
5606       fqdn_warn = True
5607       instance_name = self.op.instance_name
5608
5609     exportlist = self.rpc.call_export_list(self.acquired_locks[
5610       locking.LEVEL_NODE])
5611     found = False
5612     for node in exportlist:
5613       if exportlist[node].failed:
5614         self.LogWarning("Failed to query node %s, continuing" % node)
5615         continue
5616       if instance_name in exportlist[node].data:
5617         found = True
5618         result = self.rpc.call_export_remove(node, instance_name)
5619         if result.failed or not result.data:
5620           logging.error("Could not remove export for instance %s"
5621                         " on node %s", instance_name, node)
5622
5623     if fqdn_warn and not found:
5624       feedback_fn("Export not found. If trying to remove an export belonging"
5625                   " to a deleted instance please use its Fully Qualified"
5626                   " Domain Name.")
5627
5628
5629 class TagsLU(NoHooksLU):
5630   """Generic tags LU.
5631
5632   This is an abstract class which is the parent of all the other tags LUs.
5633
5634   """
5635
5636   def ExpandNames(self):
5637     self.needed_locks = {}
5638     if self.op.kind == constants.TAG_NODE:
5639       name = self.cfg.ExpandNodeName(self.op.name)
5640       if name is None:
5641         raise errors.OpPrereqError("Invalid node name (%s)" %
5642                                    (self.op.name,))
5643       self.op.name = name
5644       self.needed_locks[locking.LEVEL_NODE] = name
5645     elif self.op.kind == constants.TAG_INSTANCE:
5646       name = self.cfg.ExpandInstanceName(self.op.name)
5647       if name is None:
5648         raise errors.OpPrereqError("Invalid instance name (%s)" %
5649                                    (self.op.name,))
5650       self.op.name = name
5651       self.needed_locks[locking.LEVEL_INSTANCE] = name
5652
5653   def CheckPrereq(self):
5654     """Check prerequisites.
5655
5656     """
5657     if self.op.kind == constants.TAG_CLUSTER:
5658       self.target = self.cfg.GetClusterInfo()
5659     elif self.op.kind == constants.TAG_NODE:
5660       self.target = self.cfg.GetNodeInfo(self.op.name)
5661     elif self.op.kind == constants.TAG_INSTANCE:
5662       self.target = self.cfg.GetInstanceInfo(self.op.name)
5663     else:
5664       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5665                                  str(self.op.kind))
5666
5667
5668 class LUGetTags(TagsLU):
5669   """Returns the tags of a given object.
5670
5671   """
5672   _OP_REQP = ["kind", "name"]
5673   REQ_BGL = False
5674
5675   def Exec(self, feedback_fn):
5676     """Returns the tag list.
5677
5678     """
5679     return list(self.target.GetTags())
5680
5681
5682 class LUSearchTags(NoHooksLU):
5683   """Searches the tags for a given pattern.
5684
5685   """
5686   _OP_REQP = ["pattern"]
5687   REQ_BGL = False
5688
5689   def ExpandNames(self):
5690     self.needed_locks = {}
5691
5692   def CheckPrereq(self):
5693     """Check prerequisites.
5694
5695     This checks the pattern passed for validity by compiling it.
5696
5697     """
5698     try:
5699       self.re = re.compile(self.op.pattern)
5700     except re.error, err:
5701       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5702                                  (self.op.pattern, err))
5703
5704   def Exec(self, feedback_fn):
5705     """Returns the tag list.
5706
5707     """
5708     cfg = self.cfg
5709     tgts = [("/cluster", cfg.GetClusterInfo())]
5710     ilist = cfg.GetAllInstancesInfo().values()
5711     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5712     nlist = cfg.GetAllNodesInfo().values()
5713     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5714     results = []
5715     for path, target in tgts:
5716       for tag in target.GetTags():
5717         if self.re.search(tag):
5718           results.append((path, tag))
5719     return results
5720
5721
5722 class LUAddTags(TagsLU):
5723   """Sets a tag on a given object.
5724
5725   """
5726   _OP_REQP = ["kind", "name", "tags"]
5727   REQ_BGL = False
5728
5729   def CheckPrereq(self):
5730     """Check prerequisites.
5731
5732     This checks the type and length of the tag name and value.
5733
5734     """
5735     TagsLU.CheckPrereq(self)
5736     for tag in self.op.tags:
5737       objects.TaggableObject.ValidateTag(tag)
5738
5739   def Exec(self, feedback_fn):
5740     """Sets the tag.
5741
5742     """
5743     try:
5744       for tag in self.op.tags:
5745         self.target.AddTag(tag)
5746     except errors.TagError, err:
5747       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5748     try:
5749       self.cfg.Update(self.target)
5750     except errors.ConfigurationError:
5751       raise errors.OpRetryError("There has been a modification to the"
5752                                 " config file and the operation has been"
5753                                 " aborted. Please retry.")
5754
5755
5756 class LUDelTags(TagsLU):
5757   """Delete a list of tags from a given object.
5758
5759   """
5760   _OP_REQP = ["kind", "name", "tags"]
5761   REQ_BGL = False
5762
5763   def CheckPrereq(self):
5764     """Check prerequisites.
5765
5766     This checks that we have the given tag.
5767
5768     """
5769     TagsLU.CheckPrereq(self)
5770     for tag in self.op.tags:
5771       objects.TaggableObject.ValidateTag(tag)
5772     del_tags = frozenset(self.op.tags)
5773     cur_tags = self.target.GetTags()
5774     if not del_tags <= cur_tags:
5775       diff_tags = del_tags - cur_tags
5776       diff_names = ["'%s'" % tag for tag in diff_tags]
5777       diff_names.sort()
5778       raise errors.OpPrereqError("Tag(s) %s not found" %
5779                                  (",".join(diff_names)))
5780
5781   def Exec(self, feedback_fn):
5782     """Remove the tag from the object.
5783
5784     """
5785     for tag in self.op.tags:
5786       self.target.RemoveTag(tag)
5787     try:
5788       self.cfg.Update(self.target)
5789     except errors.ConfigurationError:
5790       raise errors.OpRetryError("There has been a modification to the"
5791                                 " config file and the operation has been"
5792                                 " aborted. Please retry.")
5793
5794
5795 class LUTestDelay(NoHooksLU):
5796   """Sleep for a specified amount of time.
5797
5798   This LU sleeps on the master and/or nodes for a specified amount of
5799   time.
5800
5801   """
5802   _OP_REQP = ["duration", "on_master", "on_nodes"]
5803   REQ_BGL = False
5804
5805   def ExpandNames(self):
5806     """Expand names and set required locks.
5807
5808     This expands the node list, if any.
5809
5810     """
5811     self.needed_locks = {}
5812     if self.op.on_nodes:
5813       # _GetWantedNodes can be used here, but is not always appropriate to use
5814       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5815       # more information.
5816       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5817       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5818
5819   def CheckPrereq(self):
5820     """Check prerequisites.
5821
5822     """
5823
5824   def Exec(self, feedback_fn):
5825     """Do the actual sleep.
5826
5827     """
5828     if self.op.on_master:
5829       if not utils.TestDelay(self.op.duration):
5830         raise errors.OpExecError("Error during master delay test")
5831     if self.op.on_nodes:
5832       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5833       if not result:
5834         raise errors.OpExecError("Complete failure from rpc call")
5835       for node, node_result in result.items():
5836         node_result.Raise()
5837         if not node_result.data:
5838           raise errors.OpExecError("Failure during rpc call to node %s,"
5839                                    " result: %s" % (node, node_result.data))
5840
5841
5842 class IAllocator(object):
5843   """IAllocator framework.
5844
5845   An IAllocator instance has three sets of attributes:
5846     - cfg that is needed to query the cluster
5847     - input data (all members of the _KEYS class attribute are required)
5848     - four buffer attributes (in|out_data|text), that represent the
5849       input (to the external script) in text and data structure format,
5850       and the output from it, again in two formats
5851     - the result variables from the script (success, info, nodes) for
5852       easy usage
5853
5854   """
5855   _ALLO_KEYS = [
5856     "mem_size", "disks", "disk_template",
5857     "os", "tags", "nics", "vcpus", "hypervisor",
5858     ]
5859   _RELO_KEYS = [
5860     "relocate_from",
5861     ]
5862
5863   def __init__(self, lu, mode, name, **kwargs):
5864     self.lu = lu
5865     # init buffer variables
5866     self.in_text = self.out_text = self.in_data = self.out_data = None
5867     # init all input fields so that pylint is happy
5868     self.mode = mode
5869     self.name = name
5870     self.mem_size = self.disks = self.disk_template = None
5871     self.os = self.tags = self.nics = self.vcpus = None
5872     self.relocate_from = None
5873     # computed fields
5874     self.required_nodes = None
5875     # init result fields
5876     self.success = self.info = self.nodes = None
5877     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5878       keyset = self._ALLO_KEYS
5879     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5880       keyset = self._RELO_KEYS
5881     else:
5882       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5883                                    " IAllocator" % self.mode)
5884     for key in kwargs:
5885       if key not in keyset:
5886         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5887                                      " IAllocator" % key)
5888       setattr(self, key, kwargs[key])
5889     for key in keyset:
5890       if key not in kwargs:
5891         raise errors.ProgrammerError("Missing input parameter '%s' to"
5892                                      " IAllocator" % key)
5893     self._BuildInputData()
5894
5895   def _ComputeClusterData(self):
5896     """Compute the generic allocator input data.
5897
5898     This is the data that is independent of the actual operation.
5899
5900     """
5901     cfg = self.lu.cfg
5902     cluster_info = cfg.GetClusterInfo()
5903     # cluster data
5904     data = {
5905       "version": 1,
5906       "cluster_name": cfg.GetClusterName(),
5907       "cluster_tags": list(cluster_info.GetTags()),
5908       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5909       # we don't have job IDs
5910       }
5911     iinfo = cfg.GetAllInstancesInfo().values()
5912     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5913
5914     # node data
5915     node_results = {}
5916     node_list = cfg.GetNodeList()
5917
5918     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5919       hypervisor = self.hypervisor
5920     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5921       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5922
5923     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5924                                            hypervisor)
5925     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5926                        cluster_info.enabled_hypervisors)
5927     for nname in node_list:
5928       ninfo = cfg.GetNodeInfo(nname)
5929       node_data[nname].Raise()
5930       if not isinstance(node_data[nname].data, dict):
5931         raise errors.OpExecError("Can't get data for node %s" % nname)
5932       remote_info = node_data[nname].data
5933       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5934                    'vg_size', 'vg_free', 'cpu_total']:
5935         if attr not in remote_info:
5936           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5937                                    (nname, attr))
5938         try:
5939           remote_info[attr] = int(remote_info[attr])
5940         except ValueError, err:
5941           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5942                                    " %s" % (nname, attr, str(err)))
5943       # compute memory used by primary instances
5944       i_p_mem = i_p_up_mem = 0
5945       for iinfo, beinfo in i_list:
5946         if iinfo.primary_node == nname:
5947           i_p_mem += beinfo[constants.BE_MEMORY]
5948           if iinfo.name not in node_iinfo[nname]:
5949             i_used_mem = 0
5950           else:
5951             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5952           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5953           remote_info['memory_free'] -= max(0, i_mem_diff)
5954
5955           if iinfo.status == "up":
5956             i_p_up_mem += beinfo[constants.BE_MEMORY]
5957
5958       # compute memory used by instances
5959       pnr = {
5960         "tags": list(ninfo.GetTags()),
5961         "total_memory": remote_info['memory_total'],
5962         "reserved_memory": remote_info['memory_dom0'],
5963         "free_memory": remote_info['memory_free'],
5964         "i_pri_memory": i_p_mem,
5965         "i_pri_up_memory": i_p_up_mem,
5966         "total_disk": remote_info['vg_size'],
5967         "free_disk": remote_info['vg_free'],
5968         "primary_ip": ninfo.primary_ip,
5969         "secondary_ip": ninfo.secondary_ip,
5970         "total_cpus": remote_info['cpu_total'],
5971         "offline": ninfo.offline,
5972         }
5973       node_results[nname] = pnr
5974     data["nodes"] = node_results
5975
5976     # instance data
5977     instance_data = {}
5978     for iinfo, beinfo in i_list:
5979       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5980                   for n in iinfo.nics]
5981       pir = {
5982         "tags": list(iinfo.GetTags()),
5983         "should_run": iinfo.status == "up",
5984         "vcpus": beinfo[constants.BE_VCPUS],
5985         "memory": beinfo[constants.BE_MEMORY],
5986         "os": iinfo.os,
5987         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5988         "nics": nic_data,
5989         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5990         "disk_template": iinfo.disk_template,
5991         "hypervisor": iinfo.hypervisor,
5992         }
5993       instance_data[iinfo.name] = pir
5994
5995     data["instances"] = instance_data
5996
5997     self.in_data = data
5998
5999   def _AddNewInstance(self):
6000     """Add new instance data to allocator structure.
6001
6002     This in combination with _AllocatorGetClusterData will create the
6003     correct structure needed as input for the allocator.
6004
6005     The checks for the completeness of the opcode must have already been
6006     done.
6007
6008     """
6009     data = self.in_data
6010     if len(self.disks) != 2:
6011       raise errors.OpExecError("Only two-disk configurations supported")
6012
6013     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6014
6015     if self.disk_template in constants.DTS_NET_MIRROR:
6016       self.required_nodes = 2
6017     else:
6018       self.required_nodes = 1
6019     request = {
6020       "type": "allocate",
6021       "name": self.name,
6022       "disk_template": self.disk_template,
6023       "tags": self.tags,
6024       "os": self.os,
6025       "vcpus": self.vcpus,
6026       "memory": self.mem_size,
6027       "disks": self.disks,
6028       "disk_space_total": disk_space,
6029       "nics": self.nics,
6030       "required_nodes": self.required_nodes,
6031       }
6032     data["request"] = request
6033
6034   def _AddRelocateInstance(self):
6035     """Add relocate instance data to allocator structure.
6036
6037     This in combination with _IAllocatorGetClusterData will create the
6038     correct structure needed as input for the allocator.
6039
6040     The checks for the completeness of the opcode must have already been
6041     done.
6042
6043     """
6044     instance = self.lu.cfg.GetInstanceInfo(self.name)
6045     if instance is None:
6046       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6047                                    " IAllocator" % self.name)
6048
6049     if instance.disk_template not in constants.DTS_NET_MIRROR:
6050       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6051
6052     if len(instance.secondary_nodes) != 1:
6053       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6054
6055     self.required_nodes = 1
6056     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6057     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6058
6059     request = {
6060       "type": "relocate",
6061       "name": self.name,
6062       "disk_space_total": disk_space,
6063       "required_nodes": self.required_nodes,
6064       "relocate_from": self.relocate_from,
6065       }
6066     self.in_data["request"] = request
6067
6068   def _BuildInputData(self):
6069     """Build input data structures.
6070
6071     """
6072     self._ComputeClusterData()
6073
6074     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6075       self._AddNewInstance()
6076     else:
6077       self._AddRelocateInstance()
6078
6079     self.in_text = serializer.Dump(self.in_data)
6080
6081   def Run(self, name, validate=True, call_fn=None):
6082     """Run an instance allocator and return the results.
6083
6084     """
6085     if call_fn is None:
6086       call_fn = self.lu.rpc.call_iallocator_runner
6087     data = self.in_text
6088
6089     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6090     result.Raise()
6091
6092     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6093       raise errors.OpExecError("Invalid result from master iallocator runner")
6094
6095     rcode, stdout, stderr, fail = result.data
6096
6097     if rcode == constants.IARUN_NOTFOUND:
6098       raise errors.OpExecError("Can't find allocator '%s'" % name)
6099     elif rcode == constants.IARUN_FAILURE:
6100       raise errors.OpExecError("Instance allocator call failed: %s,"
6101                                " output: %s" % (fail, stdout+stderr))
6102     self.out_text = stdout
6103     if validate:
6104       self._ValidateResult()
6105
6106   def _ValidateResult(self):
6107     """Process the allocator results.
6108
6109     This will process and if successful save the result in
6110     self.out_data and the other parameters.
6111
6112     """
6113     try:
6114       rdict = serializer.Load(self.out_text)
6115     except Exception, err:
6116       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6117
6118     if not isinstance(rdict, dict):
6119       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6120
6121     for key in "success", "info", "nodes":
6122       if key not in rdict:
6123         raise errors.OpExecError("Can't parse iallocator results:"
6124                                  " missing key '%s'" % key)
6125       setattr(self, key, rdict[key])
6126
6127     if not isinstance(rdict["nodes"], list):
6128       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6129                                " is not a list")
6130     self.out_data = rdict
6131
6132
6133 class LUTestAllocator(NoHooksLU):
6134   """Run allocator tests.
6135
6136   This LU runs the allocator tests
6137
6138   """
6139   _OP_REQP = ["direction", "mode", "name"]
6140
6141   def CheckPrereq(self):
6142     """Check prerequisites.
6143
6144     This checks the opcode parameters depending on the director and mode test.
6145
6146     """
6147     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6148       for attr in ["name", "mem_size", "disks", "disk_template",
6149                    "os", "tags", "nics", "vcpus"]:
6150         if not hasattr(self.op, attr):
6151           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6152                                      attr)
6153       iname = self.cfg.ExpandInstanceName(self.op.name)
6154       if iname is not None:
6155         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6156                                    iname)
6157       if not isinstance(self.op.nics, list):
6158         raise errors.OpPrereqError("Invalid parameter 'nics'")
6159       for row in self.op.nics:
6160         if (not isinstance(row, dict) or
6161             "mac" not in row or
6162             "ip" not in row or
6163             "bridge" not in row):
6164           raise errors.OpPrereqError("Invalid contents of the"
6165                                      " 'nics' parameter")
6166       if not isinstance(self.op.disks, list):
6167         raise errors.OpPrereqError("Invalid parameter 'disks'")
6168       if len(self.op.disks) != 2:
6169         raise errors.OpPrereqError("Only two-disk configurations supported")
6170       for row in self.op.disks:
6171         if (not isinstance(row, dict) or
6172             "size" not in row or
6173             not isinstance(row["size"], int) or
6174             "mode" not in row or
6175             row["mode"] not in ['r', 'w']):
6176           raise errors.OpPrereqError("Invalid contents of the"
6177                                      " 'disks' parameter")
6178       if self.op.hypervisor is None:
6179         self.op.hypervisor = self.cfg.GetHypervisorType()
6180     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6181       if not hasattr(self.op, "name"):
6182         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6183       fname = self.cfg.ExpandInstanceName(self.op.name)
6184       if fname is None:
6185         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6186                                    self.op.name)
6187       self.op.name = fname
6188       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6189     else:
6190       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6191                                  self.op.mode)
6192
6193     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6194       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6195         raise errors.OpPrereqError("Missing allocator name")
6196     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6197       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6198                                  self.op.direction)
6199
6200   def Exec(self, feedback_fn):
6201     """Run the allocator test.
6202
6203     """
6204     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6205       ial = IAllocator(self,
6206                        mode=self.op.mode,
6207                        name=self.op.name,
6208                        mem_size=self.op.mem_size,
6209                        disks=self.op.disks,
6210                        disk_template=self.op.disk_template,
6211                        os=self.op.os,
6212                        tags=self.op.tags,
6213                        nics=self.op.nics,
6214                        vcpus=self.op.vcpus,
6215                        hypervisor=self.op.hypervisor,
6216                        )
6217     else:
6218       ial = IAllocator(self,
6219                        mode=self.op.mode,
6220                        name=self.op.name,
6221                        relocate_from=list(self.relocate_from),
6222                        )
6223
6224     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6225       result = ial.in_text
6226     else:
6227       ial.Run(self.op.allocator, validate=False)
6228       result = ial.out_text
6229     return result