ganeti.http: Add constant for DELETE
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419                           memory, vcpus, nics):
420   """Builds instance related env variables for hooks
421
422   This builds the hook environment from individual variables.
423
424   @type name: string
425   @param name: the name of the instance
426   @type primary_node: string
427   @param primary_node: the name of the instance's primary node
428   @type secondary_nodes: list
429   @param secondary_nodes: list of secondary nodes as strings
430   @type os_type: string
431   @param os_type: the name of the instance's OS
432   @type status: string
433   @param status: the desired status of the instances
434   @type memory: string
435   @param memory: the memory size of the instance
436   @type vcpus: string
437   @param vcpus: the count of VCPUs the instance has
438   @type nics: list
439   @param nics: list of tuples (ip, bridge, mac) representing
440       the NICs the instance  has
441   @rtype: dict
442   @return: the hook environment for this instance
443
444   """
445   env = {
446     "OP_TARGET": name,
447     "INSTANCE_NAME": name,
448     "INSTANCE_PRIMARY": primary_node,
449     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450     "INSTANCE_OS_TYPE": os_type,
451     "INSTANCE_STATUS": status,
452     "INSTANCE_MEMORY": memory,
453     "INSTANCE_VCPUS": vcpus,
454   }
455
456   if nics:
457     nic_count = len(nics)
458     for idx, (ip, bridge, mac) in enumerate(nics):
459       if ip is None:
460         ip = ""
461       env["INSTANCE_NIC%d_IP" % idx] = ip
462       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464   else:
465     nic_count = 0
466
467   env["INSTANCE_NIC_COUNT"] = nic_count
468
469   return env
470
471
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473   """Builds instance related env variables for hooks from an object.
474
475   @type lu: L{LogicalUnit}
476   @param lu: the logical unit on whose behalf we execute
477   @type instance: L{objects.Instance}
478   @param instance: the instance for which we should build the
479       environment
480   @type override: dict
481   @param override: dictionary with key/values that will override
482       our values
483   @rtype: dict
484   @return: the hook environment dictionary
485
486   """
487   bep = lu.cfg.GetClusterInfo().FillBE(instance)
488   args = {
489     'name': instance.name,
490     'primary_node': instance.primary_node,
491     'secondary_nodes': instance.secondary_nodes,
492     'os_type': instance.os,
493     'status': instance.os,
494     'memory': bep[constants.BE_MEMORY],
495     'vcpus': bep[constants.BE_VCPUS],
496     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497   }
498   if override:
499     args.update(override)
500   return _BuildInstanceHookEnv(**args)
501
502
503 def _CheckInstanceBridgesExist(lu, instance):
504   """Check that the brigdes needed by an instance exist.
505
506   """
507   # check bridges existance
508   brlist = [nic.bridge for nic in instance.nics]
509   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510   result.Raise()
511   if not result.data:
512     raise errors.OpPrereqError("One or more target bridges %s does not"
513                                " exist on destination node '%s'" %
514                                (brlist, instance.primary_node))
515
516
517 class LUDestroyCluster(NoHooksLU):
518   """Logical unit for destroying the cluster.
519
520   """
521   _OP_REQP = []
522
523   def CheckPrereq(self):
524     """Check prerequisites.
525
526     This checks whether the cluster is empty.
527
528     Any errors are signalled by raising errors.OpPrereqError.
529
530     """
531     master = self.cfg.GetMasterNode()
532
533     nodelist = self.cfg.GetNodeList()
534     if len(nodelist) != 1 or nodelist[0] != master:
535       raise errors.OpPrereqError("There are still %d node(s) in"
536                                  " this cluster." % (len(nodelist) - 1))
537     instancelist = self.cfg.GetInstanceList()
538     if instancelist:
539       raise errors.OpPrereqError("There are still %d instance(s) in"
540                                  " this cluster." % len(instancelist))
541
542   def Exec(self, feedback_fn):
543     """Destroys the cluster.
544
545     """
546     master = self.cfg.GetMasterNode()
547     result = self.rpc.call_node_stop_master(master, False)
548     result.Raise()
549     if not result.data:
550       raise errors.OpExecError("Could not disable the master role")
551     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552     utils.CreateBackup(priv_key)
553     utils.CreateBackup(pub_key)
554     return master
555
556
557 class LUVerifyCluster(LogicalUnit):
558   """Verifies the cluster status.
559
560   """
561   HPATH = "cluster-verify"
562   HTYPE = constants.HTYPE_CLUSTER
563   _OP_REQP = ["skip_checks"]
564   REQ_BGL = False
565
566   def ExpandNames(self):
567     self.needed_locks = {
568       locking.LEVEL_NODE: locking.ALL_SET,
569       locking.LEVEL_INSTANCE: locking.ALL_SET,
570     }
571     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572
573   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574                   node_result, feedback_fn, master_files):
575     """Run multiple tests against a node.
576
577     Test list:
578
579       - compares ganeti version
580       - checks vg existance and size > 20G
581       - checks config file checksum
582       - checks ssh to other nodes
583
584     @type nodeinfo: L{objects.Node}
585     @param nodeinfo: the node to check
586     @param file_list: required list of files
587     @param local_cksum: dictionary of local files and their checksums
588     @param node_result: the results from the node
589     @param feedback_fn: function used to accumulate results
590     @param master_files: list of files that only masters should have
591
592     """
593     node = nodeinfo.name
594
595     # main result, node_result should be a non-empty dict
596     if not node_result or not isinstance(node_result, dict):
597       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598       return True
599
600     # compares ganeti version
601     local_version = constants.PROTOCOL_VERSION
602     remote_version = node_result.get('version', None)
603     if not remote_version:
604       feedback_fn("  - ERROR: connection to %s failed" % (node))
605       return True
606
607     if local_version != remote_version:
608       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609                       (local_version, node, remote_version))
610       return True
611
612     # checks vg existance and size > 20G
613
614     bad = False
615     vglist = node_result.get(constants.NV_VGLIST, None)
616     if not vglist:
617       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618                       (node,))
619       bad = True
620     else:
621       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622                                             constants.MIN_VG_SIZE)
623       if vgstatus:
624         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625         bad = True
626
627     # checks config file checksum
628
629     remote_cksum = node_result.get(constants.NV_FILELIST, None)
630     if not isinstance(remote_cksum, dict):
631       bad = True
632       feedback_fn("  - ERROR: node hasn't returned file checksum data")
633     else:
634       for file_name in file_list:
635         node_is_mc = nodeinfo.master_candidate
636         must_have_file = file_name not in master_files
637         if file_name not in remote_cksum:
638           if node_is_mc or must_have_file:
639             bad = True
640             feedback_fn("  - ERROR: file '%s' missing" % file_name)
641         elif remote_cksum[file_name] != local_cksum[file_name]:
642           if node_is_mc or must_have_file:
643             bad = True
644             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645           else:
646             # not candidate and this is not a must-have file
647             bad = True
648             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649                         " '%s'" % file_name)
650         else:
651           # all good, except non-master/non-must have combination
652           if not node_is_mc and not must_have_file:
653             feedback_fn("  - ERROR: file '%s' should not exist on non master"
654                         " candidates" % file_name)
655
656     # checks ssh to any
657
658     if constants.NV_NODELIST not in node_result:
659       bad = True
660       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661     else:
662       if node_result[constants.NV_NODELIST]:
663         bad = True
664         for node in node_result[constants.NV_NODELIST]:
665           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666                           (node, node_result[constants.NV_NODELIST][node]))
667
668     if constants.NV_NODENETTEST not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671     else:
672       if node_result[constants.NV_NODENETTEST]:
673         bad = True
674         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675         for node in nlist:
676           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677                           (node, node_result[constants.NV_NODENETTEST][node]))
678
679     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680     if isinstance(hyp_result, dict):
681       for hv_name, hv_result in hyp_result.iteritems():
682         if hv_result is not None:
683           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684                       (hv_name, hv_result))
685     return bad
686
687   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688                       node_instance, feedback_fn):
689     """Verify an instance.
690
691     This function checks to see if the required block devices are
692     available on the instance's node.
693
694     """
695     bad = False
696
697     node_current = instanceconfig.primary_node
698
699     node_vol_should = {}
700     instanceconfig.MapLVsByNode(node_vol_should)
701
702     for node in node_vol_should:
703       for volume in node_vol_should[node]:
704         if node not in node_vol_is or volume not in node_vol_is[node]:
705           feedback_fn("  - ERROR: volume %s missing on node %s" %
706                           (volume, node))
707           bad = True
708
709     if not instanceconfig.status == 'down':
710       if (node_current not in node_instance or
711           not instance in node_instance[node_current]):
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758     """Verify N+1 Memory Resilience.
759
760     Check that if one single node dies we can still start all the instances it
761     was primary for.
762
763     """
764     bad = False
765
766     for node, nodeinfo in node_info.iteritems():
767       # This code checks that every node which is now listed as secondary has
768       # enough memory to host all instances it is supposed to should a single
769       # other node in the cluster fail.
770       # FIXME: not ready for failover to an arbitrary node
771       # FIXME: does not support file-backed instances
772       # WARNING: we currently take into account down instances as well as up
773       # ones, considering that even if they're down someone might want to start
774       # them even in the event of a node failure.
775       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776         needed_mem = 0
777         for instance in instances:
778           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779           if bep[constants.BE_AUTO_BALANCE]:
780             needed_mem += bep[constants.BE_MEMORY]
781         if nodeinfo['mfree'] < needed_mem:
782           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783                       " failovers should node %s fail" % (node, prinode))
784           bad = True
785     return bad
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     Transform the list of checks we're going to skip into a set and check that
791     all its members are valid.
792
793     """
794     self.skip_set = frozenset(self.op.skip_checks)
795     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796       raise errors.OpPrereqError("Invalid checks to be skipped specified")
797
798   def BuildHooksEnv(self):
799     """Build hooks env.
800
801     Cluster-Verify hooks just rone in the post phase and their failure makes
802     the output be logged in the verify output and the verification to fail.
803
804     """
805     all_nodes = self.cfg.GetNodeList()
806     # TODO: populate the environment with useful information for verify hooks
807     env = {}
808     return env, [], all_nodes
809
810   def Exec(self, feedback_fn):
811     """Verify integrity of cluster, performing various test on nodes.
812
813     """
814     bad = False
815     feedback_fn("* Verifying global settings")
816     for msg in self.cfg.VerifyConfig():
817       feedback_fn("  - ERROR: %s" % msg)
818
819     vg_name = self.cfg.GetVGName()
820     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824     i_non_redundant = [] # Non redundant instances
825     i_non_a_balanced = [] # Non auto-balanced instances
826     node_volume = {}
827     node_instance = {}
828     node_info = {}
829     instance_cfg = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     master_files = [constants.CLUSTER_CONF_FILE]
834
835     file_names = ssconf.SimpleStore().GetFileList()
836     file_names.append(constants.SSL_CERT_FILE)
837     file_names.extend(master_files)
838
839     local_checksums = utils.FingerprintFiles(file_names)
840
841     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842     node_verify_param = {
843       constants.NV_FILELIST: file_names,
844       constants.NV_NODELIST: nodelist,
845       constants.NV_HYPERVISOR: hypervisors,
846       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847                                   node.secondary_ip) for node in nodeinfo],
848       constants.NV_LVLIST: vg_name,
849       constants.NV_INSTANCELIST: hypervisors,
850       constants.NV_VGLIST: None,
851       constants.NV_VERSION: None,
852       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853       }
854     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855                                            self.cfg.GetClusterName())
856
857     cluster = self.cfg.GetClusterInfo()
858     master_node = self.cfg.GetMasterNode()
859     for node_i in nodeinfo:
860       node = node_i.name
861       nresult = all_nvinfo[node].data
862
863       if node == master_node:
864         ntype = "master"
865       elif node_i.master_candidate:
866         ntype = "master candidate"
867       else:
868         ntype = "regular"
869       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870
871       if all_nvinfo[node].failed or not isinstance(nresult, dict):
872         feedback_fn("  - ERROR: connection to %s failed" % (node,))
873         bad = True
874         continue
875
876       result = self._VerifyNode(node_i, file_names, local_checksums,
877                                 nresult, feedback_fn, master_files)
878       bad = bad or result
879
880       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881       if isinstance(lvdata, basestring):
882         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883                     (node, lvdata.encode('string_escape')))
884         bad = True
885         node_volume[node] = {}
886       elif not isinstance(lvdata, dict):
887         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888         bad = True
889         continue
890       else:
891         node_volume[node] = lvdata
892
893       # node_instance
894       idata = nresult.get(constants.NV_INSTANCELIST, None)
895       if not isinstance(idata, list):
896         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897                     (node,))
898         bad = True
899         continue
900
901       node_instance[node] = idata
902
903       # node_info
904       nodeinfo = nresult.get(constants.NV_HVINFO, None)
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962         i_non_a_balanced.append(instance)
963
964       for snode in inst_config.secondary_nodes:
965         if snode in node_info:
966           node_info[snode]['sinst'].append(instance)
967           if pnode not in node_info[snode]['sinst-by-pnode']:
968             node_info[snode]['sinst-by-pnode'][pnode] = []
969           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970         else:
971           feedback_fn("  - ERROR: instance %s, connection to secondary node"
972                       " %s failed" % (instance, snode))
973
974     feedback_fn("* Verifying orphan volumes")
975     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976                                        feedback_fn)
977     bad = bad or result
978
979     feedback_fn("* Verifying remaining instances")
980     result = self._VerifyOrphanInstances(instancelist, node_instance,
981                                          feedback_fn)
982     bad = bad or result
983
984     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985       feedback_fn("* Verifying N+1 Memory redundancy")
986       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987       bad = bad or result
988
989     feedback_fn("* Other Notes")
990     if i_non_redundant:
991       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992                   % len(i_non_redundant))
993
994     if i_non_a_balanced:
995       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996                   % len(i_non_a_balanced))
997
998     return not bad
999
1000   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001     """Analize the post-hooks' result
1002
1003     This method analyses the hook result, handles it, and sends some
1004     nicely-formatted feedback back to the user.
1005
1006     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008     @param hooks_results: the results of the multi-node hooks rpc call
1009     @param feedback_fn: function used send feedback back to the caller
1010     @param lu_result: previous Exec result
1011     @return: the new Exec result, based on the previous result
1012         and hook results
1013
1014     """
1015     # We only really run POST phase hooks, and are only interested in
1016     # their results
1017     if phase == constants.HOOKS_PHASE_POST:
1018       # Used to change hooks' output to proper indentation
1019       indent_re = re.compile('^', re.M)
1020       feedback_fn("* Hooks Results")
1021       if not hooks_results:
1022         feedback_fn("  - ERROR: general communication failure")
1023         lu_result = 1
1024       else:
1025         for node_name in hooks_results:
1026           show_node_header = True
1027           res = hooks_results[node_name]
1028           if res.failed or res.data is False or not isinstance(res.data, list):
1029             feedback_fn("    Communication failure in hooks execution")
1030             lu_result = 1
1031             continue
1032           for script, hkr, output in res.data:
1033             if hkr == constants.HKR_FAIL:
1034               # The node header is only shown once, if there are
1035               # failing hooks on that node
1036               if show_node_header:
1037                 feedback_fn("  Node %s:" % node_name)
1038                 show_node_header = False
1039               feedback_fn("    ERROR: Script %s failed, output:" % script)
1040               output = indent_re.sub('      ', output)
1041               feedback_fn("%s" % output)
1042               lu_result = 1
1043
1044       return lu_result
1045
1046
1047 class LUVerifyDisks(NoHooksLU):
1048   """Verifies the cluster disks status.
1049
1050   """
1051   _OP_REQP = []
1052   REQ_BGL = False
1053
1054   def ExpandNames(self):
1055     self.needed_locks = {
1056       locking.LEVEL_NODE: locking.ALL_SET,
1057       locking.LEVEL_INSTANCE: locking.ALL_SET,
1058     }
1059     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060
1061   def CheckPrereq(self):
1062     """Check prerequisites.
1063
1064     This has no prerequisites.
1065
1066     """
1067     pass
1068
1069   def Exec(self, feedback_fn):
1070     """Verify integrity of cluster disks.
1071
1072     """
1073     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074
1075     vg_name = self.cfg.GetVGName()
1076     nodes = utils.NiceSort(self.cfg.GetNodeList())
1077     instances = [self.cfg.GetInstanceInfo(name)
1078                  for name in self.cfg.GetInstanceList()]
1079
1080     nv_dict = {}
1081     for inst in instances:
1082       inst_lvs = {}
1083       if (inst.status != "up" or
1084           inst.disk_template not in constants.DTS_NET_MIRROR):
1085         continue
1086       inst.MapLVsByNode(inst_lvs)
1087       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088       for node, vol_list in inst_lvs.iteritems():
1089         for vol in vol_list:
1090           nv_dict[(node, vol)] = inst
1091
1092     if not nv_dict:
1093       return result
1094
1095     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096
1097     to_act = set()
1098     for node in nodes:
1099       # node_volume
1100       lvs = node_lvs[node]
1101       if lvs.failed:
1102         self.LogWarning("Connection to node %s failed: %s" %
1103                         (node, lvs.data))
1104         continue
1105       lvs = lvs.data
1106       if isinstance(lvs, basestring):
1107         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108         res_nlvm[node] = lvs
1109       elif not isinstance(lvs, dict):
1110         logging.warning("Connection to node %s failed or invalid data"
1111                         " returned", node)
1112         res_nodes.append(node)
1113         continue
1114
1115       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116         inst = nv_dict.pop((node, lv_name), None)
1117         if (not lv_online and inst is not None
1118             and inst.name not in res_instances):
1119           res_instances.append(inst.name)
1120
1121     # any leftover items in nv_dict are missing LVs, let's arrange the
1122     # data better
1123     for key, inst in nv_dict.iteritems():
1124       if inst.name not in res_missing:
1125         res_missing[inst.name] = []
1126       res_missing[inst.name].append(key)
1127
1128     return result
1129
1130
1131 class LURenameCluster(LogicalUnit):
1132   """Rename the cluster.
1133
1134   """
1135   HPATH = "cluster-rename"
1136   HTYPE = constants.HTYPE_CLUSTER
1137   _OP_REQP = ["name"]
1138
1139   def BuildHooksEnv(self):
1140     """Build hooks env.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.cfg.GetClusterName(),
1145       "NEW_NAME": self.op.name,
1146       }
1147     mn = self.cfg.GetMasterNode()
1148     return env, [mn], [mn]
1149
1150   def CheckPrereq(self):
1151     """Verify that the passed name is a valid one.
1152
1153     """
1154     hostname = utils.HostInfo(self.op.name)
1155
1156     new_name = hostname.name
1157     self.ip = new_ip = hostname.ip
1158     old_name = self.cfg.GetClusterName()
1159     old_ip = self.cfg.GetMasterIP()
1160     if new_name == old_name and new_ip == old_ip:
1161       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162                                  " cluster has changed")
1163     if new_ip != old_ip:
1164       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166                                    " reachable on the network. Aborting." %
1167                                    new_ip)
1168
1169     self.op.name = new_name
1170
1171   def Exec(self, feedback_fn):
1172     """Rename the cluster.
1173
1174     """
1175     clustername = self.op.name
1176     ip = self.ip
1177
1178     # shutdown the master IP
1179     master = self.cfg.GetMasterNode()
1180     result = self.rpc.call_node_stop_master(master, False)
1181     if result.failed or not result.data:
1182       raise errors.OpExecError("Could not disable the master role")
1183
1184     try:
1185       cluster = self.cfg.GetClusterInfo()
1186       cluster.cluster_name = clustername
1187       cluster.master_ip = ip
1188       self.cfg.Update(cluster)
1189
1190       # update the known hosts file
1191       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192       node_list = self.cfg.GetNodeList()
1193       try:
1194         node_list.remove(master)
1195       except ValueError:
1196         pass
1197       result = self.rpc.call_upload_file(node_list,
1198                                          constants.SSH_KNOWN_HOSTS_FILE)
1199       for to_node, to_result in result.iteritems():
1200         if to_result.failed or not to_result.data:
1201           logging.error("Copy of file %s to node %s failed", fname, to_node)
1202
1203     finally:
1204       result = self.rpc.call_node_start_master(master, False)
1205       if result.failed or not result.data:
1206         self.LogWarning("Could not re-enable the master role on"
1207                         " the master, please restart manually.")
1208
1209
1210 def _RecursiveCheckIfLVMBased(disk):
1211   """Check if the given disk or its children are lvm-based.
1212
1213   @type disk: L{objects.Disk}
1214   @param disk: the disk to check
1215   @rtype: booleean
1216   @return: boolean indicating whether a LD_LV dev_type was found or not
1217
1218   """
1219   if disk.children:
1220     for chdisk in disk.children:
1221       if _RecursiveCheckIfLVMBased(chdisk):
1222         return True
1223   return disk.dev_type == constants.LD_LV
1224
1225
1226 class LUSetClusterParams(LogicalUnit):
1227   """Change the parameters of the cluster.
1228
1229   """
1230   HPATH = "cluster-modify"
1231   HTYPE = constants.HTYPE_CLUSTER
1232   _OP_REQP = []
1233   REQ_BGL = False
1234
1235   def CheckParameters(self):
1236     """Check parameters
1237
1238     """
1239     if not hasattr(self.op, "candidate_pool_size"):
1240       self.op.candidate_pool_size = None
1241     if self.op.candidate_pool_size is not None:
1242       try:
1243         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244       except ValueError, err:
1245         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246                                    str(err))
1247       if self.op.candidate_pool_size < 1:
1248         raise errors.OpPrereqError("At least one master candidate needed")
1249
1250   def ExpandNames(self):
1251     # FIXME: in the future maybe other cluster params won't require checking on
1252     # all nodes to be modified.
1253     self.needed_locks = {
1254       locking.LEVEL_NODE: locking.ALL_SET,
1255     }
1256     self.share_locks[locking.LEVEL_NODE] = 1
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     """
1262     env = {
1263       "OP_TARGET": self.cfg.GetClusterName(),
1264       "NEW_VG_NAME": self.op.vg_name,
1265       }
1266     mn = self.cfg.GetMasterNode()
1267     return env, [mn], [mn]
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks whether the given params don't conflict and
1273     if the given volume group is valid.
1274
1275     """
1276     # FIXME: This only works because there is only one parameter that can be
1277     # changed or removed.
1278     if self.op.vg_name is not None and not self.op.vg_name:
1279       instances = self.cfg.GetAllInstancesInfo().values()
1280       for inst in instances:
1281         for disk in inst.disks:
1282           if _RecursiveCheckIfLVMBased(disk):
1283             raise errors.OpPrereqError("Cannot disable lvm storage while"
1284                                        " lvm-based instances exist")
1285
1286     node_list = self.acquired_locks[locking.LEVEL_NODE]
1287
1288     # if vg_name not None, checks given volume group on all nodes
1289     if self.op.vg_name:
1290       vglist = self.rpc.call_vg_list(node_list)
1291       for node in node_list:
1292         if vglist[node].failed:
1293           # ignoring down node
1294           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295           continue
1296         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297                                               self.op.vg_name,
1298                                               constants.MIN_VG_SIZE)
1299         if vgstatus:
1300           raise errors.OpPrereqError("Error on node '%s': %s" %
1301                                      (node, vgstatus))
1302
1303     self.cluster = cluster = self.cfg.GetClusterInfo()
1304     # validate beparams changes
1305     if self.op.beparams:
1306       utils.CheckBEParams(self.op.beparams)
1307       self.new_beparams = cluster.FillDict(
1308         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309
1310     # hypervisor list/parameters
1311     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312     if self.op.hvparams:
1313       if not isinstance(self.op.hvparams, dict):
1314         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315       for hv_name, hv_dict in self.op.hvparams.items():
1316         if hv_name not in self.new_hvparams:
1317           self.new_hvparams[hv_name] = hv_dict
1318         else:
1319           self.new_hvparams[hv_name].update(hv_dict)
1320
1321     if self.op.enabled_hypervisors is not None:
1322       self.hv_list = self.op.enabled_hypervisors
1323     else:
1324       self.hv_list = cluster.enabled_hypervisors
1325
1326     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327       # either the enabled list has changed, or the parameters have, validate
1328       for hv_name, hv_params in self.new_hvparams.items():
1329         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330             (self.op.enabled_hypervisors and
1331              hv_name in self.op.enabled_hypervisors)):
1332           # either this is a new hypervisor, or its parameters have changed
1333           hv_class = hypervisor.GetHypervisor(hv_name)
1334           hv_class.CheckParameterSyntax(hv_params)
1335           _CheckHVParams(self, node_list, hv_name, hv_params)
1336
1337   def Exec(self, feedback_fn):
1338     """Change the parameters of the cluster.
1339
1340     """
1341     if self.op.vg_name is not None:
1342       if self.op.vg_name != self.cfg.GetVGName():
1343         self.cfg.SetVGName(self.op.vg_name)
1344       else:
1345         feedback_fn("Cluster LVM configuration already in desired"
1346                     " state, not changing")
1347     if self.op.hvparams:
1348       self.cluster.hvparams = self.new_hvparams
1349     if self.op.enabled_hypervisors is not None:
1350       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351     if self.op.beparams:
1352       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353     if self.op.candidate_pool_size is not None:
1354       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355
1356     self.cfg.Update(self.cluster)
1357
1358     # we want to update nodes after the cluster so that if any errors
1359     # happen, we have recorded and saved the cluster info
1360     if self.op.candidate_pool_size is not None:
1361       node_info = self.cfg.GetAllNodesInfo().values()
1362       num_candidates = len([node for node in node_info
1363                             if node.master_candidate])
1364       num_nodes = len(node_info)
1365       if num_candidates < self.op.candidate_pool_size:
1366         random.shuffle(node_info)
1367         for node in node_info:
1368           if num_candidates >= self.op.candidate_pool_size:
1369             break
1370           if node.master_candidate:
1371             continue
1372           node.master_candidate = True
1373           self.LogInfo("Promoting node %s to master candidate", node.name)
1374           self.cfg.Update(node)
1375           self.context.ReaddNode(node)
1376           num_candidates += 1
1377       elif num_candidates > self.op.candidate_pool_size:
1378         self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379                      " of candidate_pool_size (%d)" %
1380                      (num_candidates, self.op.candidate_pool_size))
1381
1382
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384   """Sleep and poll for an instance's disk to sync.
1385
1386   """
1387   if not instance.disks:
1388     return True
1389
1390   if not oneshot:
1391     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392
1393   node = instance.primary_node
1394
1395   for dev in instance.disks:
1396     lu.cfg.SetDiskID(dev, node)
1397
1398   retries = 0
1399   while True:
1400     max_time = 0
1401     done = True
1402     cumul_degraded = False
1403     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404     if rstats.failed or not rstats.data:
1405       lu.LogWarning("Can't get any data from node %s", node)
1406       retries += 1
1407       if retries >= 10:
1408         raise errors.RemoteError("Can't contact node %s for mirror data,"
1409                                  " aborting." % node)
1410       time.sleep(6)
1411       continue
1412     rstats = rstats.data
1413     retries = 0
1414     for i in range(len(rstats)):
1415       mstat = rstats[i]
1416       if mstat is None:
1417         lu.LogWarning("Can't compute data for node %s/%s",
1418                            node, instance.disks[i].iv_name)
1419         continue
1420       # we ignore the ldisk parameter
1421       perc_done, est_time, is_degraded, _ = mstat
1422       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423       if perc_done is not None:
1424         done = False
1425         if est_time is not None:
1426           rem_time = "%d estimated seconds remaining" % est_time
1427           max_time = est_time
1428         else:
1429           rem_time = "no time estimate"
1430         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431                         (instance.disks[i].iv_name, perc_done, rem_time))
1432     if done or oneshot:
1433       break
1434
1435     time.sleep(min(60, max_time))
1436
1437   if done:
1438     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439   return not cumul_degraded
1440
1441
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443   """Check that mirrors are not degraded.
1444
1445   The ldisk parameter, if True, will change the test from the
1446   is_degraded attribute (which represents overall non-ok status for
1447   the device(s)) to the ldisk (representing the local storage status).
1448
1449   """
1450   lu.cfg.SetDiskID(dev, node)
1451   if ldisk:
1452     idx = 6
1453   else:
1454     idx = 5
1455
1456   result = True
1457   if on_primary or dev.AssembleOnSecondary():
1458     rstats = lu.rpc.call_blockdev_find(node, dev)
1459     if rstats.failed or not rstats.data:
1460       logging.warning("Node %s: disk degraded, not found or node down", node)
1461       result = False
1462     else:
1463       result = result and (not rstats.data[idx])
1464   if dev.children:
1465     for child in dev.children:
1466       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467
1468   return result
1469
1470
1471 class LUDiagnoseOS(NoHooksLU):
1472   """Logical unit for OS diagnose/query.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477   _FIELDS_STATIC = utils.FieldSet()
1478   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479
1480   def ExpandNames(self):
1481     if self.op.names:
1482       raise errors.OpPrereqError("Selective OS query not supported")
1483
1484     _CheckOutputFields(static=self._FIELDS_STATIC,
1485                        dynamic=self._FIELDS_DYNAMIC,
1486                        selected=self.op.output_fields)
1487
1488     # Lock all nodes, in shared mode
1489     self.needed_locks = {}
1490     self.share_locks[locking.LEVEL_NODE] = 1
1491     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     """
1497
1498   @staticmethod
1499   def _DiagnoseByOS(node_list, rlist):
1500     """Remaps a per-node return list into an a per-os per-node dictionary
1501
1502     @param node_list: a list with the names of all nodes
1503     @param rlist: a map with node names as keys and OS objects as values
1504
1505     @rtype: dict
1506     @returns: a dictionary with osnames as keys and as value another map, with
1507         nodes as keys and list of OS objects as values, eg::
1508
1509           {"debian-etch": {"node1": [<object>,...],
1510                            "node2": [<object>,]}
1511           }
1512
1513     """
1514     all_os = {}
1515     for node_name, nr in rlist.iteritems():
1516       if nr.failed or not nr.data:
1517         continue
1518       for os_obj in nr.data:
1519         if os_obj.name not in all_os:
1520           # build a list of nodes for this os containing empty lists
1521           # for each node in node_list
1522           all_os[os_obj.name] = {}
1523           for nname in node_list:
1524             all_os[os_obj.name][nname] = []
1525         all_os[os_obj.name][node_name].append(os_obj)
1526     return all_os
1527
1528   def Exec(self, feedback_fn):
1529     """Compute the list of OSes.
1530
1531     """
1532     node_list = self.acquired_locks[locking.LEVEL_NODE]
1533     node_data = self.rpc.call_os_diagnose(node_list)
1534     if node_data == False:
1535       raise errors.OpExecError("Can't gather the list of OSes")
1536     pol = self._DiagnoseByOS(node_list, node_data)
1537     output = []
1538     for os_name, os_data in pol.iteritems():
1539       row = []
1540       for field in self.op.output_fields:
1541         if field == "name":
1542           val = os_name
1543         elif field == "valid":
1544           val = utils.all([osl and osl[0] for osl in os_data.values()])
1545         elif field == "node_status":
1546           val = {}
1547           for node_name, nos_list in os_data.iteritems():
1548             val[node_name] = [(v.status, v.path) for v in nos_list]
1549         else:
1550           raise errors.ParameterError(field)
1551         row.append(val)
1552       output.append(row)
1553
1554     return output
1555
1556
1557 class LURemoveNode(LogicalUnit):
1558   """Logical unit for removing a node.
1559
1560   """
1561   HPATH = "node-remove"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This doesn't run on the target node in the pre phase as a failed
1569     node would then be impossible to remove.
1570
1571     """
1572     env = {
1573       "OP_TARGET": self.op.node_name,
1574       "NODE_NAME": self.op.node_name,
1575       }
1576     all_nodes = self.cfg.GetNodeList()
1577     all_nodes.remove(self.op.node_name)
1578     return env, all_nodes, all_nodes
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     This checks:
1584      - the node exists in the configuration
1585      - it does not have primary or secondary instances
1586      - it's not the master
1587
1588     Any errors are signalled by raising errors.OpPrereqError.
1589
1590     """
1591     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592     if node is None:
1593       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594
1595     instance_list = self.cfg.GetInstanceList()
1596
1597     masternode = self.cfg.GetMasterNode()
1598     if node.name == masternode:
1599       raise errors.OpPrereqError("Node is the master node,"
1600                                  " you need to failover first.")
1601
1602     for instance_name in instance_list:
1603       instance = self.cfg.GetInstanceInfo(instance_name)
1604       if node.name == instance.primary_node:
1605         raise errors.OpPrereqError("Instance %s still running on the node,"
1606                                    " please remove first." % instance_name)
1607       if node.name in instance.secondary_nodes:
1608         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609                                    " please remove first." % instance_name)
1610     self.op.node_name = node.name
1611     self.node = node
1612
1613   def Exec(self, feedback_fn):
1614     """Removes the node from the cluster.
1615
1616     """
1617     node = self.node
1618     logging.info("Stopping the node daemon and removing configs from node %s",
1619                  node.name)
1620
1621     self.context.RemoveNode(node.name)
1622
1623     self.rpc.call_node_leave_cluster(node.name)
1624
1625     # Promote nodes to master candidate as needed
1626     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627     node_info = self.cfg.GetAllNodesInfo().values()
1628     num_candidates = len([n for n in node_info
1629                           if n.master_candidate])
1630     num_nodes = len(node_info)
1631     random.shuffle(node_info)
1632     for node in node_info:
1633       if num_candidates >= cp_size or num_candidates >= num_nodes:
1634         break
1635       if node.master_candidate:
1636         continue
1637       node.master_candidate = True
1638       self.LogInfo("Promoting node %s to master candidate", node.name)
1639       self.cfg.Update(node)
1640       self.context.ReaddNode(node)
1641       num_candidates += 1
1642
1643
1644 class LUQueryNodes(NoHooksLU):
1645   """Logical unit for querying nodes.
1646
1647   """
1648   _OP_REQP = ["output_fields", "names"]
1649   REQ_BGL = False
1650   _FIELDS_DYNAMIC = utils.FieldSet(
1651     "dtotal", "dfree",
1652     "mtotal", "mnode", "mfree",
1653     "bootid",
1654     "ctotal",
1655     )
1656
1657   _FIELDS_STATIC = utils.FieldSet(
1658     "name", "pinst_cnt", "sinst_cnt",
1659     "pinst_list", "sinst_list",
1660     "pip", "sip", "tags",
1661     "serial_no",
1662     "master_candidate",
1663     "master",
1664     )
1665
1666   def ExpandNames(self):
1667     _CheckOutputFields(static=self._FIELDS_STATIC,
1668                        dynamic=self._FIELDS_DYNAMIC,
1669                        selected=self.op.output_fields)
1670
1671     self.needed_locks = {}
1672     self.share_locks[locking.LEVEL_NODE] = 1
1673
1674     if self.op.names:
1675       self.wanted = _GetWantedNodes(self, self.op.names)
1676     else:
1677       self.wanted = locking.ALL_SET
1678
1679     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1680     if self.do_locking:
1681       # if we don't request only static fields, we need to lock the nodes
1682       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1683
1684
1685   def CheckPrereq(self):
1686     """Check prerequisites.
1687
1688     """
1689     # The validation of the node list is done in the _GetWantedNodes,
1690     # if non empty, and if empty, there's no validation to do
1691     pass
1692
1693   def Exec(self, feedback_fn):
1694     """Computes the list of nodes and their attributes.
1695
1696     """
1697     all_info = self.cfg.GetAllNodesInfo()
1698     if self.do_locking:
1699       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1700     elif self.wanted != locking.ALL_SET:
1701       nodenames = self.wanted
1702       missing = set(nodenames).difference(all_info.keys())
1703       if missing:
1704         raise errors.OpExecError(
1705           "Some nodes were removed before retrieving their data: %s" % missing)
1706     else:
1707       nodenames = all_info.keys()
1708
1709     nodenames = utils.NiceSort(nodenames)
1710     nodelist = [all_info[name] for name in nodenames]
1711
1712     # begin data gathering
1713
1714     if self.do_locking:
1715       live_data = {}
1716       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1717                                           self.cfg.GetHypervisorType())
1718       for name in nodenames:
1719         nodeinfo = node_data[name]
1720         if not nodeinfo.failed and nodeinfo.data:
1721           nodeinfo = nodeinfo.data
1722           fn = utils.TryConvert
1723           live_data[name] = {
1724             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1725             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1726             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1727             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1728             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1729             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1730             "bootid": nodeinfo.get('bootid', None),
1731             }
1732         else:
1733           live_data[name] = {}
1734     else:
1735       live_data = dict.fromkeys(nodenames, {})
1736
1737     node_to_primary = dict([(name, set()) for name in nodenames])
1738     node_to_secondary = dict([(name, set()) for name in nodenames])
1739
1740     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1741                              "sinst_cnt", "sinst_list"))
1742     if inst_fields & frozenset(self.op.output_fields):
1743       instancelist = self.cfg.GetInstanceList()
1744
1745       for instance_name in instancelist:
1746         inst = self.cfg.GetInstanceInfo(instance_name)
1747         if inst.primary_node in node_to_primary:
1748           node_to_primary[inst.primary_node].add(inst.name)
1749         for secnode in inst.secondary_nodes:
1750           if secnode in node_to_secondary:
1751             node_to_secondary[secnode].add(inst.name)
1752
1753     master_node = self.cfg.GetMasterNode()
1754
1755     # end data gathering
1756
1757     output = []
1758     for node in nodelist:
1759       node_output = []
1760       for field in self.op.output_fields:
1761         if field == "name":
1762           val = node.name
1763         elif field == "pinst_list":
1764           val = list(node_to_primary[node.name])
1765         elif field == "sinst_list":
1766           val = list(node_to_secondary[node.name])
1767         elif field == "pinst_cnt":
1768           val = len(node_to_primary[node.name])
1769         elif field == "sinst_cnt":
1770           val = len(node_to_secondary[node.name])
1771         elif field == "pip":
1772           val = node.primary_ip
1773         elif field == "sip":
1774           val = node.secondary_ip
1775         elif field == "tags":
1776           val = list(node.GetTags())
1777         elif field == "serial_no":
1778           val = node.serial_no
1779         elif field == "master_candidate":
1780           val = node.master_candidate
1781         elif field == "master":
1782           val = node.name == master_node
1783         elif self._FIELDS_DYNAMIC.Matches(field):
1784           val = live_data[node.name].get(field, None)
1785         else:
1786           raise errors.ParameterError(field)
1787         node_output.append(val)
1788       output.append(node_output)
1789
1790     return output
1791
1792
1793 class LUQueryNodeVolumes(NoHooksLU):
1794   """Logical unit for getting volumes on node(s).
1795
1796   """
1797   _OP_REQP = ["nodes", "output_fields"]
1798   REQ_BGL = False
1799   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1800   _FIELDS_STATIC = utils.FieldSet("node")
1801
1802   def ExpandNames(self):
1803     _CheckOutputFields(static=self._FIELDS_STATIC,
1804                        dynamic=self._FIELDS_DYNAMIC,
1805                        selected=self.op.output_fields)
1806
1807     self.needed_locks = {}
1808     self.share_locks[locking.LEVEL_NODE] = 1
1809     if not self.op.nodes:
1810       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811     else:
1812       self.needed_locks[locking.LEVEL_NODE] = \
1813         _GetWantedNodes(self, self.op.nodes)
1814
1815   def CheckPrereq(self):
1816     """Check prerequisites.
1817
1818     This checks that the fields required are valid output fields.
1819
1820     """
1821     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1822
1823   def Exec(self, feedback_fn):
1824     """Computes the list of nodes and their attributes.
1825
1826     """
1827     nodenames = self.nodes
1828     volumes = self.rpc.call_node_volumes(nodenames)
1829
1830     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1831              in self.cfg.GetInstanceList()]
1832
1833     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1834
1835     output = []
1836     for node in nodenames:
1837       if node not in volumes or volumes[node].failed or not volumes[node].data:
1838         continue
1839
1840       node_vols = volumes[node].data[:]
1841       node_vols.sort(key=lambda vol: vol['dev'])
1842
1843       for vol in node_vols:
1844         node_output = []
1845         for field in self.op.output_fields:
1846           if field == "node":
1847             val = node
1848           elif field == "phys":
1849             val = vol['dev']
1850           elif field == "vg":
1851             val = vol['vg']
1852           elif field == "name":
1853             val = vol['name']
1854           elif field == "size":
1855             val = int(float(vol['size']))
1856           elif field == "instance":
1857             for inst in ilist:
1858               if node not in lv_by_node[inst]:
1859                 continue
1860               if vol['name'] in lv_by_node[inst][node]:
1861                 val = inst.name
1862                 break
1863             else:
1864               val = '-'
1865           else:
1866             raise errors.ParameterError(field)
1867           node_output.append(str(val))
1868
1869         output.append(node_output)
1870
1871     return output
1872
1873
1874 class LUAddNode(LogicalUnit):
1875   """Logical unit for adding node to the cluster.
1876
1877   """
1878   HPATH = "node-add"
1879   HTYPE = constants.HTYPE_NODE
1880   _OP_REQP = ["node_name"]
1881
1882   def BuildHooksEnv(self):
1883     """Build hooks env.
1884
1885     This will run on all nodes before, and on all nodes + the new node after.
1886
1887     """
1888     env = {
1889       "OP_TARGET": self.op.node_name,
1890       "NODE_NAME": self.op.node_name,
1891       "NODE_PIP": self.op.primary_ip,
1892       "NODE_SIP": self.op.secondary_ip,
1893       }
1894     nodes_0 = self.cfg.GetNodeList()
1895     nodes_1 = nodes_0 + [self.op.node_name, ]
1896     return env, nodes_0, nodes_1
1897
1898   def CheckPrereq(self):
1899     """Check prerequisites.
1900
1901     This checks:
1902      - the new node is not already in the config
1903      - it is resolvable
1904      - its parameters (single/dual homed) matches the cluster
1905
1906     Any errors are signalled by raising errors.OpPrereqError.
1907
1908     """
1909     node_name = self.op.node_name
1910     cfg = self.cfg
1911
1912     dns_data = utils.HostInfo(node_name)
1913
1914     node = dns_data.name
1915     primary_ip = self.op.primary_ip = dns_data.ip
1916     secondary_ip = getattr(self.op, "secondary_ip", None)
1917     if secondary_ip is None:
1918       secondary_ip = primary_ip
1919     if not utils.IsValidIP(secondary_ip):
1920       raise errors.OpPrereqError("Invalid secondary IP given")
1921     self.op.secondary_ip = secondary_ip
1922
1923     node_list = cfg.GetNodeList()
1924     if not self.op.readd and node in node_list:
1925       raise errors.OpPrereqError("Node %s is already in the configuration" %
1926                                  node)
1927     elif self.op.readd and node not in node_list:
1928       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1929
1930     for existing_node_name in node_list:
1931       existing_node = cfg.GetNodeInfo(existing_node_name)
1932
1933       if self.op.readd and node == existing_node_name:
1934         if (existing_node.primary_ip != primary_ip or
1935             existing_node.secondary_ip != secondary_ip):
1936           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1937                                      " address configuration as before")
1938         continue
1939
1940       if (existing_node.primary_ip == primary_ip or
1941           existing_node.secondary_ip == primary_ip or
1942           existing_node.primary_ip == secondary_ip or
1943           existing_node.secondary_ip == secondary_ip):
1944         raise errors.OpPrereqError("New node ip address(es) conflict with"
1945                                    " existing node %s" % existing_node.name)
1946
1947     # check that the type of the node (single versus dual homed) is the
1948     # same as for the master
1949     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1950     master_singlehomed = myself.secondary_ip == myself.primary_ip
1951     newbie_singlehomed = secondary_ip == primary_ip
1952     if master_singlehomed != newbie_singlehomed:
1953       if master_singlehomed:
1954         raise errors.OpPrereqError("The master has no private ip but the"
1955                                    " new node has one")
1956       else:
1957         raise errors.OpPrereqError("The master has a private ip but the"
1958                                    " new node doesn't have one")
1959
1960     # checks reachablity
1961     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1962       raise errors.OpPrereqError("Node not reachable by ping")
1963
1964     if not newbie_singlehomed:
1965       # check reachability from my secondary ip to newbie's secondary ip
1966       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1967                            source=myself.secondary_ip):
1968         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1969                                    " based ping to noded port")
1970
1971     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1972     node_info = self.cfg.GetAllNodesInfo().values()
1973     num_candidates = len([n for n in node_info
1974                           if n.master_candidate])
1975     master_candidate = num_candidates < cp_size
1976
1977     self.new_node = objects.Node(name=node,
1978                                  primary_ip=primary_ip,
1979                                  secondary_ip=secondary_ip,
1980                                  master_candidate=master_candidate)
1981
1982   def Exec(self, feedback_fn):
1983     """Adds the new node to the cluster.
1984
1985     """
1986     new_node = self.new_node
1987     node = new_node.name
1988
1989     # check connectivity
1990     result = self.rpc.call_version([node])[node]
1991     result.Raise()
1992     if result.data:
1993       if constants.PROTOCOL_VERSION == result.data:
1994         logging.info("Communication to node %s fine, sw version %s match",
1995                      node, result.data)
1996       else:
1997         raise errors.OpExecError("Version mismatch master version %s,"
1998                                  " node version %s" %
1999                                  (constants.PROTOCOL_VERSION, result.data))
2000     else:
2001       raise errors.OpExecError("Cannot get version from the new node")
2002
2003     # setup ssh on node
2004     logging.info("Copy ssh key to node %s", node)
2005     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2006     keyarray = []
2007     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2008                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2009                 priv_key, pub_key]
2010
2011     for i in keyfiles:
2012       f = open(i, 'r')
2013       try:
2014         keyarray.append(f.read())
2015       finally:
2016         f.close()
2017
2018     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2019                                     keyarray[2],
2020                                     keyarray[3], keyarray[4], keyarray[5])
2021
2022     if result.failed or not result.data:
2023       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2024
2025     # Add node to our /etc/hosts, and add key to known_hosts
2026     utils.AddHostToEtcHosts(new_node.name)
2027
2028     if new_node.secondary_ip != new_node.primary_ip:
2029       result = self.rpc.call_node_has_ip_address(new_node.name,
2030                                                  new_node.secondary_ip)
2031       if result.failed or not result.data:
2032         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2033                                  " you gave (%s). Please fix and re-run this"
2034                                  " command." % new_node.secondary_ip)
2035
2036     node_verify_list = [self.cfg.GetMasterNode()]
2037     node_verify_param = {
2038       'nodelist': [node],
2039       # TODO: do a node-net-test as well?
2040     }
2041
2042     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2043                                        self.cfg.GetClusterName())
2044     for verifier in node_verify_list:
2045       if result[verifier].failed or not result[verifier].data:
2046         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2047                                  " for remote verification" % verifier)
2048       if result[verifier].data['nodelist']:
2049         for failed in result[verifier].data['nodelist']:
2050           feedback_fn("ssh/hostname verification failed %s -> %s" %
2051                       (verifier, result[verifier]['nodelist'][failed]))
2052         raise errors.OpExecError("ssh/hostname verification failed.")
2053
2054     # Distribute updated /etc/hosts and known_hosts to all nodes,
2055     # including the node just added
2056     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2057     dist_nodes = self.cfg.GetNodeList()
2058     if not self.op.readd:
2059       dist_nodes.append(node)
2060     if myself.name in dist_nodes:
2061       dist_nodes.remove(myself.name)
2062
2063     logging.debug("Copying hosts and known_hosts to all nodes")
2064     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2065       result = self.rpc.call_upload_file(dist_nodes, fname)
2066       for to_node, to_result in result.iteritems():
2067         if to_result.failed or not to_result.data:
2068           logging.error("Copy of file %s to node %s failed", fname, to_node)
2069
2070     to_copy = []
2071     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2072       to_copy.append(constants.VNC_PASSWORD_FILE)
2073     for fname in to_copy:
2074       result = self.rpc.call_upload_file([node], fname)
2075       if result[node].failed or not result[node]:
2076         logging.error("Could not copy file %s to node %s", fname, node)
2077
2078     if self.op.readd:
2079       self.context.ReaddNode(new_node)
2080     else:
2081       self.context.AddNode(new_node)
2082
2083
2084 class LUSetNodeParams(LogicalUnit):
2085   """Modifies the parameters of a node.
2086
2087   """
2088   HPATH = "node-modify"
2089   HTYPE = constants.HTYPE_NODE
2090   _OP_REQP = ["node_name"]
2091   REQ_BGL = False
2092
2093   def CheckArguments(self):
2094     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2095     if node_name is None:
2096       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2097     self.op.node_name = node_name
2098     if not hasattr(self.op, 'master_candidate'):
2099       raise errors.OpPrereqError("Please pass at least one modification")
2100     self.op.master_candidate = bool(self.op.master_candidate)
2101
2102   def ExpandNames(self):
2103     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2104
2105   def BuildHooksEnv(self):
2106     """Build hooks env.
2107
2108     This runs on the master node.
2109
2110     """
2111     env = {
2112       "OP_TARGET": self.op.node_name,
2113       "MASTER_CANDIDATE": str(self.op.master_candidate),
2114       }
2115     nl = [self.cfg.GetMasterNode(),
2116           self.op.node_name]
2117     return env, nl, nl
2118
2119   def CheckPrereq(self):
2120     """Check prerequisites.
2121
2122     This only checks the instance list against the existing names.
2123
2124     """
2125     force = self.force = self.op.force
2126
2127     if self.op.master_candidate == False:
2128       if self.op.node_name == self.cfg.GetMasterNode():
2129         raise errors.OpPrereqError("The master node has to be a"
2130                                    " master candidate")
2131       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2132       node_info = self.cfg.GetAllNodesInfo().values()
2133       num_candidates = len([node for node in node_info
2134                             if node.master_candidate])
2135       if num_candidates <= cp_size:
2136         msg = ("Not enough master candidates (desired"
2137                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2138         if force:
2139           self.LogWarning(msg)
2140         else:
2141           raise errors.OpPrereqError(msg)
2142
2143     return
2144
2145   def Exec(self, feedback_fn):
2146     """Modifies a node.
2147
2148     """
2149     node = self.cfg.GetNodeInfo(self.op.node_name)
2150
2151     result = []
2152
2153     if self.op.master_candidate is not None:
2154       node.master_candidate = self.op.master_candidate
2155       result.append(("master_candidate", str(self.op.master_candidate)))
2156
2157     # this will trigger configuration file update, if needed
2158     self.cfg.Update(node)
2159     # this will trigger job queue propagation or cleanup
2160     if self.op.node_name != self.cfg.GetMasterNode():
2161       self.context.ReaddNode(node)
2162
2163     return result
2164
2165
2166 class LUQueryClusterInfo(NoHooksLU):
2167   """Query cluster configuration.
2168
2169   """
2170   _OP_REQP = []
2171   REQ_BGL = False
2172
2173   def ExpandNames(self):
2174     self.needed_locks = {}
2175
2176   def CheckPrereq(self):
2177     """No prerequsites needed for this LU.
2178
2179     """
2180     pass
2181
2182   def Exec(self, feedback_fn):
2183     """Return cluster config.
2184
2185     """
2186     cluster = self.cfg.GetClusterInfo()
2187     result = {
2188       "software_version": constants.RELEASE_VERSION,
2189       "protocol_version": constants.PROTOCOL_VERSION,
2190       "config_version": constants.CONFIG_VERSION,
2191       "os_api_version": constants.OS_API_VERSION,
2192       "export_version": constants.EXPORT_VERSION,
2193       "architecture": (platform.architecture()[0], platform.machine()),
2194       "name": cluster.cluster_name,
2195       "master": cluster.master_node,
2196       "default_hypervisor": cluster.default_hypervisor,
2197       "enabled_hypervisors": cluster.enabled_hypervisors,
2198       "hvparams": cluster.hvparams,
2199       "beparams": cluster.beparams,
2200       "candidate_pool_size": cluster.candidate_pool_size,
2201       }
2202
2203     return result
2204
2205
2206 class LUQueryConfigValues(NoHooksLU):
2207   """Return configuration values.
2208
2209   """
2210   _OP_REQP = []
2211   REQ_BGL = False
2212   _FIELDS_DYNAMIC = utils.FieldSet()
2213   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2214
2215   def ExpandNames(self):
2216     self.needed_locks = {}
2217
2218     _CheckOutputFields(static=self._FIELDS_STATIC,
2219                        dynamic=self._FIELDS_DYNAMIC,
2220                        selected=self.op.output_fields)
2221
2222   def CheckPrereq(self):
2223     """No prerequisites.
2224
2225     """
2226     pass
2227
2228   def Exec(self, feedback_fn):
2229     """Dump a representation of the cluster config to the standard output.
2230
2231     """
2232     values = []
2233     for field in self.op.output_fields:
2234       if field == "cluster_name":
2235         entry = self.cfg.GetClusterName()
2236       elif field == "master_node":
2237         entry = self.cfg.GetMasterNode()
2238       elif field == "drain_flag":
2239         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2240       else:
2241         raise errors.ParameterError(field)
2242       values.append(entry)
2243     return values
2244
2245
2246 class LUActivateInstanceDisks(NoHooksLU):
2247   """Bring up an instance's disks.
2248
2249   """
2250   _OP_REQP = ["instance_name"]
2251   REQ_BGL = False
2252
2253   def ExpandNames(self):
2254     self._ExpandAndLockInstance()
2255     self.needed_locks[locking.LEVEL_NODE] = []
2256     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2257
2258   def DeclareLocks(self, level):
2259     if level == locking.LEVEL_NODE:
2260       self._LockInstancesNodes()
2261
2262   def CheckPrereq(self):
2263     """Check prerequisites.
2264
2265     This checks that the instance is in the cluster.
2266
2267     """
2268     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2269     assert self.instance is not None, \
2270       "Cannot retrieve locked instance %s" % self.op.instance_name
2271
2272   def Exec(self, feedback_fn):
2273     """Activate the disks.
2274
2275     """
2276     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2277     if not disks_ok:
2278       raise errors.OpExecError("Cannot activate block devices")
2279
2280     return disks_info
2281
2282
2283 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2284   """Prepare the block devices for an instance.
2285
2286   This sets up the block devices on all nodes.
2287
2288   @type lu: L{LogicalUnit}
2289   @param lu: the logical unit on whose behalf we execute
2290   @type instance: L{objects.Instance}
2291   @param instance: the instance for whose disks we assemble
2292   @type ignore_secondaries: boolean
2293   @param ignore_secondaries: if true, errors on secondary nodes
2294       won't result in an error return from the function
2295   @return: False if the operation failed, otherwise a list of
2296       (host, instance_visible_name, node_visible_name)
2297       with the mapping from node devices to instance devices
2298
2299   """
2300   device_info = []
2301   disks_ok = True
2302   iname = instance.name
2303   # With the two passes mechanism we try to reduce the window of
2304   # opportunity for the race condition of switching DRBD to primary
2305   # before handshaking occured, but we do not eliminate it
2306
2307   # The proper fix would be to wait (with some limits) until the
2308   # connection has been made and drbd transitions from WFConnection
2309   # into any other network-connected state (Connected, SyncTarget,
2310   # SyncSource, etc.)
2311
2312   # 1st pass, assemble on all nodes in secondary mode
2313   for inst_disk in instance.disks:
2314     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2315       lu.cfg.SetDiskID(node_disk, node)
2316       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2317       if result.failed or not result:
2318         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2319                            " (is_primary=False, pass=1)",
2320                            inst_disk.iv_name, node)
2321         if not ignore_secondaries:
2322           disks_ok = False
2323
2324   # FIXME: race condition on drbd migration to primary
2325
2326   # 2nd pass, do only the primary node
2327   for inst_disk in instance.disks:
2328     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2329       if node != instance.primary_node:
2330         continue
2331       lu.cfg.SetDiskID(node_disk, node)
2332       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2333       if result.failed or not result:
2334         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2335                            " (is_primary=True, pass=2)",
2336                            inst_disk.iv_name, node)
2337         disks_ok = False
2338     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2339
2340   # leave the disks configured for the primary node
2341   # this is a workaround that would be fixed better by
2342   # improving the logical/physical id handling
2343   for disk in instance.disks:
2344     lu.cfg.SetDiskID(disk, instance.primary_node)
2345
2346   return disks_ok, device_info
2347
2348
2349 def _StartInstanceDisks(lu, instance, force):
2350   """Start the disks of an instance.
2351
2352   """
2353   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2354                                            ignore_secondaries=force)
2355   if not disks_ok:
2356     _ShutdownInstanceDisks(lu, instance)
2357     if force is not None and not force:
2358       lu.proc.LogWarning("", hint="If the message above refers to a"
2359                          " secondary node,"
2360                          " you can retry the operation using '--force'.")
2361     raise errors.OpExecError("Disk consistency error")
2362
2363
2364 class LUDeactivateInstanceDisks(NoHooksLU):
2365   """Shutdown an instance's disks.
2366
2367   """
2368   _OP_REQP = ["instance_name"]
2369   REQ_BGL = False
2370
2371   def ExpandNames(self):
2372     self._ExpandAndLockInstance()
2373     self.needed_locks[locking.LEVEL_NODE] = []
2374     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2375
2376   def DeclareLocks(self, level):
2377     if level == locking.LEVEL_NODE:
2378       self._LockInstancesNodes()
2379
2380   def CheckPrereq(self):
2381     """Check prerequisites.
2382
2383     This checks that the instance is in the cluster.
2384
2385     """
2386     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2387     assert self.instance is not None, \
2388       "Cannot retrieve locked instance %s" % self.op.instance_name
2389
2390   def Exec(self, feedback_fn):
2391     """Deactivate the disks
2392
2393     """
2394     instance = self.instance
2395     _SafeShutdownInstanceDisks(self, instance)
2396
2397
2398 def _SafeShutdownInstanceDisks(lu, instance):
2399   """Shutdown block devices of an instance.
2400
2401   This function checks if an instance is running, before calling
2402   _ShutdownInstanceDisks.
2403
2404   """
2405   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2406                                       [instance.hypervisor])
2407   ins_l = ins_l[instance.primary_node]
2408   if ins_l.failed or not isinstance(ins_l.data, list):
2409     raise errors.OpExecError("Can't contact node '%s'" %
2410                              instance.primary_node)
2411
2412   if instance.name in ins_l.data:
2413     raise errors.OpExecError("Instance is running, can't shutdown"
2414                              " block devices.")
2415
2416   _ShutdownInstanceDisks(lu, instance)
2417
2418
2419 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2420   """Shutdown block devices of an instance.
2421
2422   This does the shutdown on all nodes of the instance.
2423
2424   If the ignore_primary is false, errors on the primary node are
2425   ignored.
2426
2427   """
2428   result = True
2429   for disk in instance.disks:
2430     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2431       lu.cfg.SetDiskID(top_disk, node)
2432       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2433       if result.failed or not result.data:
2434         logging.error("Could not shutdown block device %s on node %s",
2435                       disk.iv_name, node)
2436         if not ignore_primary or node != instance.primary_node:
2437           result = False
2438   return result
2439
2440
2441 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2442   """Checks if a node has enough free memory.
2443
2444   This function check if a given node has the needed amount of free
2445   memory. In case the node has less memory or we cannot get the
2446   information from the node, this function raise an OpPrereqError
2447   exception.
2448
2449   @type lu: C{LogicalUnit}
2450   @param lu: a logical unit from which we get configuration data
2451   @type node: C{str}
2452   @param node: the node to check
2453   @type reason: C{str}
2454   @param reason: string to use in the error message
2455   @type requested: C{int}
2456   @param requested: the amount of memory in MiB to check for
2457   @type hypervisor: C{str}
2458   @param hypervisor: the hypervisor to ask for memory stats
2459   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2460       we cannot check the node
2461
2462   """
2463   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2464   nodeinfo[node].Raise()
2465   free_mem = nodeinfo[node].data.get('memory_free')
2466   if not isinstance(free_mem, int):
2467     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2468                              " was '%s'" % (node, free_mem))
2469   if requested > free_mem:
2470     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2471                              " needed %s MiB, available %s MiB" %
2472                              (node, reason, requested, free_mem))
2473
2474
2475 class LUStartupInstance(LogicalUnit):
2476   """Starts an instance.
2477
2478   """
2479   HPATH = "instance-start"
2480   HTYPE = constants.HTYPE_INSTANCE
2481   _OP_REQP = ["instance_name", "force"]
2482   REQ_BGL = False
2483
2484   def ExpandNames(self):
2485     self._ExpandAndLockInstance()
2486
2487   def BuildHooksEnv(self):
2488     """Build hooks env.
2489
2490     This runs on master, primary and secondary nodes of the instance.
2491
2492     """
2493     env = {
2494       "FORCE": self.op.force,
2495       }
2496     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2497     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2498           list(self.instance.secondary_nodes))
2499     return env, nl, nl
2500
2501   def CheckPrereq(self):
2502     """Check prerequisites.
2503
2504     This checks that the instance is in the cluster.
2505
2506     """
2507     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2508     assert self.instance is not None, \
2509       "Cannot retrieve locked instance %s" % self.op.instance_name
2510
2511     bep = self.cfg.GetClusterInfo().FillBE(instance)
2512     # check bridges existance
2513     _CheckInstanceBridgesExist(self, instance)
2514
2515     _CheckNodeFreeMemory(self, instance.primary_node,
2516                          "starting instance %s" % instance.name,
2517                          bep[constants.BE_MEMORY], instance.hypervisor)
2518
2519   def Exec(self, feedback_fn):
2520     """Start the instance.
2521
2522     """
2523     instance = self.instance
2524     force = self.op.force
2525     extra_args = getattr(self.op, "extra_args", "")
2526
2527     self.cfg.MarkInstanceUp(instance.name)
2528
2529     node_current = instance.primary_node
2530
2531     _StartInstanceDisks(self, instance, force)
2532
2533     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2534     if result.failed or not result.data:
2535       _ShutdownInstanceDisks(self, instance)
2536       raise errors.OpExecError("Could not start instance")
2537
2538
2539 class LURebootInstance(LogicalUnit):
2540   """Reboot an instance.
2541
2542   """
2543   HPATH = "instance-reboot"
2544   HTYPE = constants.HTYPE_INSTANCE
2545   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2546   REQ_BGL = False
2547
2548   def ExpandNames(self):
2549     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2550                                    constants.INSTANCE_REBOOT_HARD,
2551                                    constants.INSTANCE_REBOOT_FULL]:
2552       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2553                                   (constants.INSTANCE_REBOOT_SOFT,
2554                                    constants.INSTANCE_REBOOT_HARD,
2555                                    constants.INSTANCE_REBOOT_FULL))
2556     self._ExpandAndLockInstance()
2557
2558   def BuildHooksEnv(self):
2559     """Build hooks env.
2560
2561     This runs on master, primary and secondary nodes of the instance.
2562
2563     """
2564     env = {
2565       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2566       }
2567     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2568     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2569           list(self.instance.secondary_nodes))
2570     return env, nl, nl
2571
2572   def CheckPrereq(self):
2573     """Check prerequisites.
2574
2575     This checks that the instance is in the cluster.
2576
2577     """
2578     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2579     assert self.instance is not None, \
2580       "Cannot retrieve locked instance %s" % self.op.instance_name
2581
2582     # check bridges existance
2583     _CheckInstanceBridgesExist(self, instance)
2584
2585   def Exec(self, feedback_fn):
2586     """Reboot the instance.
2587
2588     """
2589     instance = self.instance
2590     ignore_secondaries = self.op.ignore_secondaries
2591     reboot_type = self.op.reboot_type
2592     extra_args = getattr(self.op, "extra_args", "")
2593
2594     node_current = instance.primary_node
2595
2596     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2597                        constants.INSTANCE_REBOOT_HARD]:
2598       result = self.rpc.call_instance_reboot(node_current, instance,
2599                                              reboot_type, extra_args)
2600       if result.failed or not result.data:
2601         raise errors.OpExecError("Could not reboot instance")
2602     else:
2603       if not self.rpc.call_instance_shutdown(node_current, instance):
2604         raise errors.OpExecError("could not shutdown instance for full reboot")
2605       _ShutdownInstanceDisks(self, instance)
2606       _StartInstanceDisks(self, instance, ignore_secondaries)
2607       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2608       if result.failed or not result.data:
2609         _ShutdownInstanceDisks(self, instance)
2610         raise errors.OpExecError("Could not start instance for full reboot")
2611
2612     self.cfg.MarkInstanceUp(instance.name)
2613
2614
2615 class LUShutdownInstance(LogicalUnit):
2616   """Shutdown an instance.
2617
2618   """
2619   HPATH = "instance-stop"
2620   HTYPE = constants.HTYPE_INSTANCE
2621   _OP_REQP = ["instance_name"]
2622   REQ_BGL = False
2623
2624   def ExpandNames(self):
2625     self._ExpandAndLockInstance()
2626
2627   def BuildHooksEnv(self):
2628     """Build hooks env.
2629
2630     This runs on master, primary and secondary nodes of the instance.
2631
2632     """
2633     env = _BuildInstanceHookEnvByObject(self, self.instance)
2634     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2635           list(self.instance.secondary_nodes))
2636     return env, nl, nl
2637
2638   def CheckPrereq(self):
2639     """Check prerequisites.
2640
2641     This checks that the instance is in the cluster.
2642
2643     """
2644     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2645     assert self.instance is not None, \
2646       "Cannot retrieve locked instance %s" % self.op.instance_name
2647
2648   def Exec(self, feedback_fn):
2649     """Shutdown the instance.
2650
2651     """
2652     instance = self.instance
2653     node_current = instance.primary_node
2654     self.cfg.MarkInstanceDown(instance.name)
2655     result = self.rpc.call_instance_shutdown(node_current, instance)
2656     if result.failed or not result.data:
2657       self.proc.LogWarning("Could not shutdown instance")
2658
2659     _ShutdownInstanceDisks(self, instance)
2660
2661
2662 class LUReinstallInstance(LogicalUnit):
2663   """Reinstall an instance.
2664
2665   """
2666   HPATH = "instance-reinstall"
2667   HTYPE = constants.HTYPE_INSTANCE
2668   _OP_REQP = ["instance_name"]
2669   REQ_BGL = False
2670
2671   def ExpandNames(self):
2672     self._ExpandAndLockInstance()
2673
2674   def BuildHooksEnv(self):
2675     """Build hooks env.
2676
2677     This runs on master, primary and secondary nodes of the instance.
2678
2679     """
2680     env = _BuildInstanceHookEnvByObject(self, self.instance)
2681     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2682           list(self.instance.secondary_nodes))
2683     return env, nl, nl
2684
2685   def CheckPrereq(self):
2686     """Check prerequisites.
2687
2688     This checks that the instance is in the cluster and is not running.
2689
2690     """
2691     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2692     assert instance is not None, \
2693       "Cannot retrieve locked instance %s" % self.op.instance_name
2694
2695     if instance.disk_template == constants.DT_DISKLESS:
2696       raise errors.OpPrereqError("Instance '%s' has no disks" %
2697                                  self.op.instance_name)
2698     if instance.status != "down":
2699       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2700                                  self.op.instance_name)
2701     remote_info = self.rpc.call_instance_info(instance.primary_node,
2702                                               instance.name,
2703                                               instance.hypervisor)
2704     if remote_info.failed or remote_info.data:
2705       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2706                                  (self.op.instance_name,
2707                                   instance.primary_node))
2708
2709     self.op.os_type = getattr(self.op, "os_type", None)
2710     if self.op.os_type is not None:
2711       # OS verification
2712       pnode = self.cfg.GetNodeInfo(
2713         self.cfg.ExpandNodeName(instance.primary_node))
2714       if pnode is None:
2715         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2716                                    self.op.pnode)
2717       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2718       result.Raise()
2719       if not isinstance(result.data, objects.OS):
2720         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2721                                    " primary node"  % self.op.os_type)
2722
2723     self.instance = instance
2724
2725   def Exec(self, feedback_fn):
2726     """Reinstall the instance.
2727
2728     """
2729     inst = self.instance
2730
2731     if self.op.os_type is not None:
2732       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2733       inst.os = self.op.os_type
2734       self.cfg.Update(inst)
2735
2736     _StartInstanceDisks(self, inst, None)
2737     try:
2738       feedback_fn("Running the instance OS create scripts...")
2739       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2740       result.Raise()
2741       if not result.data:
2742         raise errors.OpExecError("Could not install OS for instance %s"
2743                                  " on node %s" %
2744                                  (inst.name, inst.primary_node))
2745     finally:
2746       _ShutdownInstanceDisks(self, inst)
2747
2748
2749 class LURenameInstance(LogicalUnit):
2750   """Rename an instance.
2751
2752   """
2753   HPATH = "instance-rename"
2754   HTYPE = constants.HTYPE_INSTANCE
2755   _OP_REQP = ["instance_name", "new_name"]
2756
2757   def BuildHooksEnv(self):
2758     """Build hooks env.
2759
2760     This runs on master, primary and secondary nodes of the instance.
2761
2762     """
2763     env = _BuildInstanceHookEnvByObject(self, self.instance)
2764     env["INSTANCE_NEW_NAME"] = self.op.new_name
2765     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2766           list(self.instance.secondary_nodes))
2767     return env, nl, nl
2768
2769   def CheckPrereq(self):
2770     """Check prerequisites.
2771
2772     This checks that the instance is in the cluster and is not running.
2773
2774     """
2775     instance = self.cfg.GetInstanceInfo(
2776       self.cfg.ExpandInstanceName(self.op.instance_name))
2777     if instance is None:
2778       raise errors.OpPrereqError("Instance '%s' not known" %
2779                                  self.op.instance_name)
2780     if instance.status != "down":
2781       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2782                                  self.op.instance_name)
2783     remote_info = self.rpc.call_instance_info(instance.primary_node,
2784                                               instance.name,
2785                                               instance.hypervisor)
2786     remote_info.Raise()
2787     if remote_info.data:
2788       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2789                                  (self.op.instance_name,
2790                                   instance.primary_node))
2791     self.instance = instance
2792
2793     # new name verification
2794     name_info = utils.HostInfo(self.op.new_name)
2795
2796     self.op.new_name = new_name = name_info.name
2797     instance_list = self.cfg.GetInstanceList()
2798     if new_name in instance_list:
2799       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2800                                  new_name)
2801
2802     if not getattr(self.op, "ignore_ip", False):
2803       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2804         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2805                                    (name_info.ip, new_name))
2806
2807
2808   def Exec(self, feedback_fn):
2809     """Reinstall the instance.
2810
2811     """
2812     inst = self.instance
2813     old_name = inst.name
2814
2815     if inst.disk_template == constants.DT_FILE:
2816       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2817
2818     self.cfg.RenameInstance(inst.name, self.op.new_name)
2819     # Change the instance lock. This is definitely safe while we hold the BGL
2820     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2821     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2822
2823     # re-read the instance from the configuration after rename
2824     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2825
2826     if inst.disk_template == constants.DT_FILE:
2827       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2828       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2829                                                      old_file_storage_dir,
2830                                                      new_file_storage_dir)
2831       result.Raise()
2832       if not result.data:
2833         raise errors.OpExecError("Could not connect to node '%s' to rename"
2834                                  " directory '%s' to '%s' (but the instance"
2835                                  " has been renamed in Ganeti)" % (
2836                                  inst.primary_node, old_file_storage_dir,
2837                                  new_file_storage_dir))
2838
2839       if not result.data[0]:
2840         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2841                                  " (but the instance has been renamed in"
2842                                  " Ganeti)" % (old_file_storage_dir,
2843                                                new_file_storage_dir))
2844
2845     _StartInstanceDisks(self, inst, None)
2846     try:
2847       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2848                                                  old_name)
2849       if result.failed or not result.data:
2850         msg = ("Could not run OS rename script for instance %s on node %s"
2851                " (but the instance has been renamed in Ganeti)" %
2852                (inst.name, inst.primary_node))
2853         self.proc.LogWarning(msg)
2854     finally:
2855       _ShutdownInstanceDisks(self, inst)
2856
2857
2858 class LURemoveInstance(LogicalUnit):
2859   """Remove an instance.
2860
2861   """
2862   HPATH = "instance-remove"
2863   HTYPE = constants.HTYPE_INSTANCE
2864   _OP_REQP = ["instance_name", "ignore_failures"]
2865   REQ_BGL = False
2866
2867   def ExpandNames(self):
2868     self._ExpandAndLockInstance()
2869     self.needed_locks[locking.LEVEL_NODE] = []
2870     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2871
2872   def DeclareLocks(self, level):
2873     if level == locking.LEVEL_NODE:
2874       self._LockInstancesNodes()
2875
2876   def BuildHooksEnv(self):
2877     """Build hooks env.
2878
2879     This runs on master, primary and secondary nodes of the instance.
2880
2881     """
2882     env = _BuildInstanceHookEnvByObject(self, self.instance)
2883     nl = [self.cfg.GetMasterNode()]
2884     return env, nl, nl
2885
2886   def CheckPrereq(self):
2887     """Check prerequisites.
2888
2889     This checks that the instance is in the cluster.
2890
2891     """
2892     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2893     assert self.instance is not None, \
2894       "Cannot retrieve locked instance %s" % self.op.instance_name
2895
2896   def Exec(self, feedback_fn):
2897     """Remove the instance.
2898
2899     """
2900     instance = self.instance
2901     logging.info("Shutting down instance %s on node %s",
2902                  instance.name, instance.primary_node)
2903
2904     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2905     if result.failed or not result.data:
2906       if self.op.ignore_failures:
2907         feedback_fn("Warning: can't shutdown instance")
2908       else:
2909         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2910                                  (instance.name, instance.primary_node))
2911
2912     logging.info("Removing block devices for instance %s", instance.name)
2913
2914     if not _RemoveDisks(self, instance):
2915       if self.op.ignore_failures:
2916         feedback_fn("Warning: can't remove instance's disks")
2917       else:
2918         raise errors.OpExecError("Can't remove instance's disks")
2919
2920     logging.info("Removing instance %s out of cluster config", instance.name)
2921
2922     self.cfg.RemoveInstance(instance.name)
2923     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2924
2925
2926 class LUQueryInstances(NoHooksLU):
2927   """Logical unit for querying instances.
2928
2929   """
2930   _OP_REQP = ["output_fields", "names"]
2931   REQ_BGL = False
2932   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2933                                     "admin_state", "admin_ram",
2934                                     "disk_template", "ip", "mac", "bridge",
2935                                     "sda_size", "sdb_size", "vcpus", "tags",
2936                                     "network_port", "beparams",
2937                                     "(disk).(size)/([0-9]+)",
2938                                     "(disk).(sizes)",
2939                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2940                                     "(nic).(macs|ips|bridges)",
2941                                     "(disk|nic).(count)",
2942                                     "serial_no", "hypervisor", "hvparams",] +
2943                                   ["hv/%s" % name
2944                                    for name in constants.HVS_PARAMETERS] +
2945                                   ["be/%s" % name
2946                                    for name in constants.BES_PARAMETERS])
2947   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2948
2949
2950   def ExpandNames(self):
2951     _CheckOutputFields(static=self._FIELDS_STATIC,
2952                        dynamic=self._FIELDS_DYNAMIC,
2953                        selected=self.op.output_fields)
2954
2955     self.needed_locks = {}
2956     self.share_locks[locking.LEVEL_INSTANCE] = 1
2957     self.share_locks[locking.LEVEL_NODE] = 1
2958
2959     if self.op.names:
2960       self.wanted = _GetWantedInstances(self, self.op.names)
2961     else:
2962       self.wanted = locking.ALL_SET
2963
2964     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2965     if self.do_locking:
2966       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2967       self.needed_locks[locking.LEVEL_NODE] = []
2968       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2969
2970   def DeclareLocks(self, level):
2971     if level == locking.LEVEL_NODE and self.do_locking:
2972       self._LockInstancesNodes()
2973
2974   def CheckPrereq(self):
2975     """Check prerequisites.
2976
2977     """
2978     pass
2979
2980   def Exec(self, feedback_fn):
2981     """Computes the list of nodes and their attributes.
2982
2983     """
2984     all_info = self.cfg.GetAllInstancesInfo()
2985     if self.do_locking:
2986       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2987     elif self.wanted != locking.ALL_SET:
2988       instance_names = self.wanted
2989       missing = set(instance_names).difference(all_info.keys())
2990       if missing:
2991         raise errors.OpExecError(
2992           "Some instances were removed before retrieving their data: %s"
2993           % missing)
2994     else:
2995       instance_names = all_info.keys()
2996
2997     instance_names = utils.NiceSort(instance_names)
2998     instance_list = [all_info[iname] for iname in instance_names]
2999
3000     # begin data gathering
3001
3002     nodes = frozenset([inst.primary_node for inst in instance_list])
3003     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3004
3005     bad_nodes = []
3006     if self.do_locking:
3007       live_data = {}
3008       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3009       for name in nodes:
3010         result = node_data[name]
3011         if result.failed:
3012           bad_nodes.append(name)
3013         else:
3014           if result.data:
3015             live_data.update(result.data)
3016             # else no instance is alive
3017     else:
3018       live_data = dict([(name, {}) for name in instance_names])
3019
3020     # end data gathering
3021
3022     HVPREFIX = "hv/"
3023     BEPREFIX = "be/"
3024     output = []
3025     for instance in instance_list:
3026       iout = []
3027       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3028       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3029       for field in self.op.output_fields:
3030         st_match = self._FIELDS_STATIC.Matches(field)
3031         if field == "name":
3032           val = instance.name
3033         elif field == "os":
3034           val = instance.os
3035         elif field == "pnode":
3036           val = instance.primary_node
3037         elif field == "snodes":
3038           val = list(instance.secondary_nodes)
3039         elif field == "admin_state":
3040           val = (instance.status != "down")
3041         elif field == "oper_state":
3042           if instance.primary_node in bad_nodes:
3043             val = None
3044           else:
3045             val = bool(live_data.get(instance.name))
3046         elif field == "status":
3047           if instance.primary_node in bad_nodes:
3048             val = "ERROR_nodedown"
3049           else:
3050             running = bool(live_data.get(instance.name))
3051             if running:
3052               if instance.status != "down":
3053                 val = "running"
3054               else:
3055                 val = "ERROR_up"
3056             else:
3057               if instance.status != "down":
3058                 val = "ERROR_down"
3059               else:
3060                 val = "ADMIN_down"
3061         elif field == "oper_ram":
3062           if instance.primary_node in bad_nodes:
3063             val = None
3064           elif instance.name in live_data:
3065             val = live_data[instance.name].get("memory", "?")
3066           else:
3067             val = "-"
3068         elif field == "disk_template":
3069           val = instance.disk_template
3070         elif field == "ip":
3071           val = instance.nics[0].ip
3072         elif field == "bridge":
3073           val = instance.nics[0].bridge
3074         elif field == "mac":
3075           val = instance.nics[0].mac
3076         elif field == "sda_size" or field == "sdb_size":
3077           idx = ord(field[2]) - ord('a')
3078           try:
3079             val = instance.FindDisk(idx).size
3080           except errors.OpPrereqError:
3081             val = None
3082         elif field == "tags":
3083           val = list(instance.GetTags())
3084         elif field == "serial_no":
3085           val = instance.serial_no
3086         elif field == "network_port":
3087           val = instance.network_port
3088         elif field == "hypervisor":
3089           val = instance.hypervisor
3090         elif field == "hvparams":
3091           val = i_hv
3092         elif (field.startswith(HVPREFIX) and
3093               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3094           val = i_hv.get(field[len(HVPREFIX):], None)
3095         elif field == "beparams":
3096           val = i_be
3097         elif (field.startswith(BEPREFIX) and
3098               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3099           val = i_be.get(field[len(BEPREFIX):], None)
3100         elif st_match and st_match.groups():
3101           # matches a variable list
3102           st_groups = st_match.groups()
3103           if st_groups and st_groups[0] == "disk":
3104             if st_groups[1] == "count":
3105               val = len(instance.disks)
3106             elif st_groups[1] == "sizes":
3107               val = [disk.size for disk in instance.disks]
3108             elif st_groups[1] == "size":
3109               try:
3110                 val = instance.FindDisk(st_groups[2]).size
3111               except errors.OpPrereqError:
3112                 val = None
3113             else:
3114               assert False, "Unhandled disk parameter"
3115           elif st_groups[0] == "nic":
3116             if st_groups[1] == "count":
3117               val = len(instance.nics)
3118             elif st_groups[1] == "macs":
3119               val = [nic.mac for nic in instance.nics]
3120             elif st_groups[1] == "ips":
3121               val = [nic.ip for nic in instance.nics]
3122             elif st_groups[1] == "bridges":
3123               val = [nic.bridge for nic in instance.nics]
3124             else:
3125               # index-based item
3126               nic_idx = int(st_groups[2])
3127               if nic_idx >= len(instance.nics):
3128                 val = None
3129               else:
3130                 if st_groups[1] == "mac":
3131                   val = instance.nics[nic_idx].mac
3132                 elif st_groups[1] == "ip":
3133                   val = instance.nics[nic_idx].ip
3134                 elif st_groups[1] == "bridge":
3135                   val = instance.nics[nic_idx].bridge
3136                 else:
3137                   assert False, "Unhandled NIC parameter"
3138           else:
3139             assert False, "Unhandled variable parameter"
3140         else:
3141           raise errors.ParameterError(field)
3142         iout.append(val)
3143       output.append(iout)
3144
3145     return output
3146
3147
3148 class LUFailoverInstance(LogicalUnit):
3149   """Failover an instance.
3150
3151   """
3152   HPATH = "instance-failover"
3153   HTYPE = constants.HTYPE_INSTANCE
3154   _OP_REQP = ["instance_name", "ignore_consistency"]
3155   REQ_BGL = False
3156
3157   def ExpandNames(self):
3158     self._ExpandAndLockInstance()
3159     self.needed_locks[locking.LEVEL_NODE] = []
3160     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3161
3162   def DeclareLocks(self, level):
3163     if level == locking.LEVEL_NODE:
3164       self._LockInstancesNodes()
3165
3166   def BuildHooksEnv(self):
3167     """Build hooks env.
3168
3169     This runs on master, primary and secondary nodes of the instance.
3170
3171     """
3172     env = {
3173       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3174       }
3175     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3176     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3177     return env, nl, nl
3178
3179   def CheckPrereq(self):
3180     """Check prerequisites.
3181
3182     This checks that the instance is in the cluster.
3183
3184     """
3185     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3186     assert self.instance is not None, \
3187       "Cannot retrieve locked instance %s" % self.op.instance_name
3188
3189     bep = self.cfg.GetClusterInfo().FillBE(instance)
3190     if instance.disk_template not in constants.DTS_NET_MIRROR:
3191       raise errors.OpPrereqError("Instance's disk layout is not"
3192                                  " network mirrored, cannot failover.")
3193
3194     secondary_nodes = instance.secondary_nodes
3195     if not secondary_nodes:
3196       raise errors.ProgrammerError("no secondary node but using "
3197                                    "a mirrored disk template")
3198
3199     target_node = secondary_nodes[0]
3200     # check memory requirements on the secondary node
3201     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3202                          instance.name, bep[constants.BE_MEMORY],
3203                          instance.hypervisor)
3204
3205     # check bridge existance
3206     brlist = [nic.bridge for nic in instance.nics]
3207     result = self.rpc.call_bridges_exist(target_node, brlist)
3208     result.Raise()
3209     if not result.data:
3210       raise errors.OpPrereqError("One or more target bridges %s does not"
3211                                  " exist on destination node '%s'" %
3212                                  (brlist, target_node))
3213
3214   def Exec(self, feedback_fn):
3215     """Failover an instance.
3216
3217     The failover is done by shutting it down on its present node and
3218     starting it on the secondary.
3219
3220     """
3221     instance = self.instance
3222
3223     source_node = instance.primary_node
3224     target_node = instance.secondary_nodes[0]
3225
3226     feedback_fn("* checking disk consistency between source and target")
3227     for dev in instance.disks:
3228       # for drbd, these are drbd over lvm
3229       if not _CheckDiskConsistency(self, dev, target_node, False):
3230         if instance.status == "up" and not self.op.ignore_consistency:
3231           raise errors.OpExecError("Disk %s is degraded on target node,"
3232                                    " aborting failover." % dev.iv_name)
3233
3234     feedback_fn("* shutting down instance on source node")
3235     logging.info("Shutting down instance %s on node %s",
3236                  instance.name, source_node)
3237
3238     result = self.rpc.call_instance_shutdown(source_node, instance)
3239     if result.failed or not result.data:
3240       if self.op.ignore_consistency:
3241         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3242                              " Proceeding"
3243                              " anyway. Please make sure node %s is down",
3244                              instance.name, source_node, source_node)
3245       else:
3246         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3247                                  (instance.name, source_node))
3248
3249     feedback_fn("* deactivating the instance's disks on source node")
3250     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3251       raise errors.OpExecError("Can't shut down the instance's disks.")
3252
3253     instance.primary_node = target_node
3254     # distribute new instance config to the other nodes
3255     self.cfg.Update(instance)
3256
3257     # Only start the instance if it's marked as up
3258     if instance.status == "up":
3259       feedback_fn("* activating the instance's disks on target node")
3260       logging.info("Starting instance %s on node %s",
3261                    instance.name, target_node)
3262
3263       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3264                                                ignore_secondaries=True)
3265       if not disks_ok:
3266         _ShutdownInstanceDisks(self, instance)
3267         raise errors.OpExecError("Can't activate the instance's disks")
3268
3269       feedback_fn("* starting the instance on the target node")
3270       result = self.rpc.call_instance_start(target_node, instance, None)
3271       if result.failed or not result.data:
3272         _ShutdownInstanceDisks(self, instance)
3273         raise errors.OpExecError("Could not start instance %s on node %s." %
3274                                  (instance.name, target_node))
3275
3276
3277 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3278   """Create a tree of block devices on the primary node.
3279
3280   This always creates all devices.
3281
3282   """
3283   if device.children:
3284     for child in device.children:
3285       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3286         return False
3287
3288   lu.cfg.SetDiskID(device, node)
3289   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3290                                        instance.name, True, info)
3291   if new_id.failed or not new_id.data:
3292     return False
3293   if device.physical_id is None:
3294     device.physical_id = new_id
3295   return True
3296
3297
3298 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3299   """Create a tree of block devices on a secondary node.
3300
3301   If this device type has to be created on secondaries, create it and
3302   all its children.
3303
3304   If not, just recurse to children keeping the same 'force' value.
3305
3306   """
3307   if device.CreateOnSecondary():
3308     force = True
3309   if device.children:
3310     for child in device.children:
3311       if not _CreateBlockDevOnSecondary(lu, node, instance,
3312                                         child, force, info):
3313         return False
3314
3315   if not force:
3316     return True
3317   lu.cfg.SetDiskID(device, node)
3318   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3319                                        instance.name, False, info)
3320   if new_id.failed or not new_id.data:
3321     return False
3322   if device.physical_id is None:
3323     device.physical_id = new_id
3324   return True
3325
3326
3327 def _GenerateUniqueNames(lu, exts):
3328   """Generate a suitable LV name.
3329
3330   This will generate a logical volume name for the given instance.
3331
3332   """
3333   results = []
3334   for val in exts:
3335     new_id = lu.cfg.GenerateUniqueID()
3336     results.append("%s%s" % (new_id, val))
3337   return results
3338
3339
3340 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3341                          p_minor, s_minor):
3342   """Generate a drbd8 device complete with its children.
3343
3344   """
3345   port = lu.cfg.AllocatePort()
3346   vgname = lu.cfg.GetVGName()
3347   shared_secret = lu.cfg.GenerateDRBDSecret()
3348   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3349                           logical_id=(vgname, names[0]))
3350   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3351                           logical_id=(vgname, names[1]))
3352   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3353                           logical_id=(primary, secondary, port,
3354                                       p_minor, s_minor,
3355                                       shared_secret),
3356                           children=[dev_data, dev_meta],
3357                           iv_name=iv_name)
3358   return drbd_dev
3359
3360
3361 def _GenerateDiskTemplate(lu, template_name,
3362                           instance_name, primary_node,
3363                           secondary_nodes, disk_info,
3364                           file_storage_dir, file_driver,
3365                           base_index):
3366   """Generate the entire disk layout for a given template type.
3367
3368   """
3369   #TODO: compute space requirements
3370
3371   vgname = lu.cfg.GetVGName()
3372   disk_count = len(disk_info)
3373   disks = []
3374   if template_name == constants.DT_DISKLESS:
3375     pass
3376   elif template_name == constants.DT_PLAIN:
3377     if len(secondary_nodes) != 0:
3378       raise errors.ProgrammerError("Wrong template configuration")
3379
3380     names = _GenerateUniqueNames(lu, [".disk%d" % i
3381                                       for i in range(disk_count)])
3382     for idx, disk in enumerate(disk_info):
3383       disk_index = idx + base_index
3384       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3385                               logical_id=(vgname, names[idx]),
3386                               iv_name="disk/%d" % disk_index)
3387       disks.append(disk_dev)
3388   elif template_name == constants.DT_DRBD8:
3389     if len(secondary_nodes) != 1:
3390       raise errors.ProgrammerError("Wrong template configuration")
3391     remote_node = secondary_nodes[0]
3392     minors = lu.cfg.AllocateDRBDMinor(
3393       [primary_node, remote_node] * len(disk_info), instance_name)
3394
3395     names = _GenerateUniqueNames(lu,
3396                                  [".disk%d_%s" % (i, s)
3397                                   for i in range(disk_count)
3398                                   for s in ("data", "meta")
3399                                   ])
3400     for idx, disk in enumerate(disk_info):
3401       disk_index = idx + base_index
3402       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3403                                       disk["size"], names[idx*2:idx*2+2],
3404                                       "disk/%d" % disk_index,
3405                                       minors[idx*2], minors[idx*2+1])
3406       disks.append(disk_dev)
3407   elif template_name == constants.DT_FILE:
3408     if len(secondary_nodes) != 0:
3409       raise errors.ProgrammerError("Wrong template configuration")
3410
3411     for idx, disk in enumerate(disk_info):
3412       disk_index = idx + base_index
3413       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3414                               iv_name="disk/%d" % disk_index,
3415                               logical_id=(file_driver,
3416                                           "%s/disk%d" % (file_storage_dir,
3417                                                          idx)))
3418       disks.append(disk_dev)
3419   else:
3420     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3421   return disks
3422
3423
3424 def _GetInstanceInfoText(instance):
3425   """Compute that text that should be added to the disk's metadata.
3426
3427   """
3428   return "originstname+%s" % instance.name
3429
3430
3431 def _CreateDisks(lu, instance):
3432   """Create all disks for an instance.
3433
3434   This abstracts away some work from AddInstance.
3435
3436   @type lu: L{LogicalUnit}
3437   @param lu: the logical unit on whose behalf we execute
3438   @type instance: L{objects.Instance}
3439   @param instance: the instance whose disks we should create
3440   @rtype: boolean
3441   @return: the success of the creation
3442
3443   """
3444   info = _GetInstanceInfoText(instance)
3445
3446   if instance.disk_template == constants.DT_FILE:
3447     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3448     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3449                                                  file_storage_dir)
3450
3451     if result.failed or not result.data:
3452       logging.error("Could not connect to node '%s'", instance.primary_node)
3453       return False
3454
3455     if not result.data[0]:
3456       logging.error("Failed to create directory '%s'", file_storage_dir)
3457       return False
3458
3459   # Note: this needs to be kept in sync with adding of disks in
3460   # LUSetInstanceParams
3461   for device in instance.disks:
3462     logging.info("Creating volume %s for instance %s",
3463                  device.iv_name, instance.name)
3464     #HARDCODE
3465     for secondary_node in instance.secondary_nodes:
3466       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3467                                         device, False, info):
3468         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3469                       device.iv_name, device, secondary_node)
3470         return False
3471     #HARDCODE
3472     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3473                                     instance, device, info):
3474       logging.error("Failed to create volume %s on primary!", device.iv_name)
3475       return False
3476
3477   return True
3478
3479
3480 def _RemoveDisks(lu, instance):
3481   """Remove all disks for an instance.
3482
3483   This abstracts away some work from `AddInstance()` and
3484   `RemoveInstance()`. Note that in case some of the devices couldn't
3485   be removed, the removal will continue with the other ones (compare
3486   with `_CreateDisks()`).
3487
3488   @type lu: L{LogicalUnit}
3489   @param lu: the logical unit on whose behalf we execute
3490   @type instance: L{objects.Instance}
3491   @param instance: the instance whose disks we should remove
3492   @rtype: boolean
3493   @return: the success of the removal
3494
3495   """
3496   logging.info("Removing block devices for instance %s", instance.name)
3497
3498   result = True
3499   for device in instance.disks:
3500     for node, disk in device.ComputeNodeTree(instance.primary_node):
3501       lu.cfg.SetDiskID(disk, node)
3502       result = lu.rpc.call_blockdev_remove(node, disk)
3503       if result.failed or not result.data:
3504         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3505                            " continuing anyway", device.iv_name, node)
3506         result = False
3507
3508   if instance.disk_template == constants.DT_FILE:
3509     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3510     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3511                                                  file_storage_dir)
3512     if result.failed or not result.data:
3513       logging.error("Could not remove directory '%s'", file_storage_dir)
3514       result = False
3515
3516   return result
3517
3518
3519 def _ComputeDiskSize(disk_template, disks):
3520   """Compute disk size requirements in the volume group
3521
3522   """
3523   # Required free disk space as a function of disk and swap space
3524   req_size_dict = {
3525     constants.DT_DISKLESS: None,
3526     constants.DT_PLAIN: sum(d["size"] for d in disks),
3527     # 128 MB are added for drbd metadata for each disk
3528     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3529     constants.DT_FILE: None,
3530   }
3531
3532   if disk_template not in req_size_dict:
3533     raise errors.ProgrammerError("Disk template '%s' size requirement"
3534                                  " is unknown" %  disk_template)
3535
3536   return req_size_dict[disk_template]
3537
3538
3539 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3540   """Hypervisor parameter validation.
3541
3542   This function abstract the hypervisor parameter validation to be
3543   used in both instance create and instance modify.
3544
3545   @type lu: L{LogicalUnit}
3546   @param lu: the logical unit for which we check
3547   @type nodenames: list
3548   @param nodenames: the list of nodes on which we should check
3549   @type hvname: string
3550   @param hvname: the name of the hypervisor we should use
3551   @type hvparams: dict
3552   @param hvparams: the parameters which we need to check
3553   @raise errors.OpPrereqError: if the parameters are not valid
3554
3555   """
3556   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3557                                                   hvname,
3558                                                   hvparams)
3559   for node in nodenames:
3560     info = hvinfo[node]
3561     info.Raise()
3562     if not info.data or not isinstance(info.data, (tuple, list)):
3563       raise errors.OpPrereqError("Cannot get current information"
3564                                  " from node '%s' (%s)" % (node, info.data))
3565     if not info.data[0]:
3566       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3567                                  " %s" % info.data[1])
3568
3569
3570 class LUCreateInstance(LogicalUnit):
3571   """Create an instance.
3572
3573   """
3574   HPATH = "instance-add"
3575   HTYPE = constants.HTYPE_INSTANCE
3576   _OP_REQP = ["instance_name", "disks", "disk_template",
3577               "mode", "start",
3578               "wait_for_sync", "ip_check", "nics",
3579               "hvparams", "beparams"]
3580   REQ_BGL = False
3581
3582   def _ExpandNode(self, node):
3583     """Expands and checks one node name.
3584
3585     """
3586     node_full = self.cfg.ExpandNodeName(node)
3587     if node_full is None:
3588       raise errors.OpPrereqError("Unknown node %s" % node)
3589     return node_full
3590
3591   def ExpandNames(self):
3592     """ExpandNames for CreateInstance.
3593
3594     Figure out the right locks for instance creation.
3595
3596     """
3597     self.needed_locks = {}
3598
3599     # set optional parameters to none if they don't exist
3600     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3601       if not hasattr(self.op, attr):
3602         setattr(self.op, attr, None)
3603
3604     # cheap checks, mostly valid constants given
3605
3606     # verify creation mode
3607     if self.op.mode not in (constants.INSTANCE_CREATE,
3608                             constants.INSTANCE_IMPORT):
3609       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3610                                  self.op.mode)
3611
3612     # disk template and mirror node verification
3613     if self.op.disk_template not in constants.DISK_TEMPLATES:
3614       raise errors.OpPrereqError("Invalid disk template name")
3615
3616     if self.op.hypervisor is None:
3617       self.op.hypervisor = self.cfg.GetHypervisorType()
3618
3619     cluster = self.cfg.GetClusterInfo()
3620     enabled_hvs = cluster.enabled_hypervisors
3621     if self.op.hypervisor not in enabled_hvs:
3622       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3623                                  " cluster (%s)" % (self.op.hypervisor,
3624                                   ",".join(enabled_hvs)))
3625
3626     # check hypervisor parameter syntax (locally)
3627
3628     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3629                                   self.op.hvparams)
3630     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3631     hv_type.CheckParameterSyntax(filled_hvp)
3632
3633     # fill and remember the beparams dict
3634     utils.CheckBEParams(self.op.beparams)
3635     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3636                                     self.op.beparams)
3637
3638     #### instance parameters check
3639
3640     # instance name verification
3641     hostname1 = utils.HostInfo(self.op.instance_name)
3642     self.op.instance_name = instance_name = hostname1.name
3643
3644     # this is just a preventive check, but someone might still add this
3645     # instance in the meantime, and creation will fail at lock-add time
3646     if instance_name in self.cfg.GetInstanceList():
3647       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3648                                  instance_name)
3649
3650     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3651
3652     # NIC buildup
3653     self.nics = []
3654     for nic in self.op.nics:
3655       # ip validity checks
3656       ip = nic.get("ip", None)
3657       if ip is None or ip.lower() == "none":
3658         nic_ip = None
3659       elif ip.lower() == constants.VALUE_AUTO:
3660         nic_ip = hostname1.ip
3661       else:
3662         if not utils.IsValidIP(ip):
3663           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3664                                      " like a valid IP" % ip)
3665         nic_ip = ip
3666
3667       # MAC address verification
3668       mac = nic.get("mac", constants.VALUE_AUTO)
3669       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3670         if not utils.IsValidMac(mac.lower()):
3671           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3672                                      mac)
3673       # bridge verification
3674       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3675       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3676
3677     # disk checks/pre-build
3678     self.disks = []
3679     for disk in self.op.disks:
3680       mode = disk.get("mode", constants.DISK_RDWR)
3681       if mode not in constants.DISK_ACCESS_SET:
3682         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3683                                    mode)
3684       size = disk.get("size", None)
3685       if size is None:
3686         raise errors.OpPrereqError("Missing disk size")
3687       try:
3688         size = int(size)
3689       except ValueError:
3690         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3691       self.disks.append({"size": size, "mode": mode})
3692
3693     # used in CheckPrereq for ip ping check
3694     self.check_ip = hostname1.ip
3695
3696     # file storage checks
3697     if (self.op.file_driver and
3698         not self.op.file_driver in constants.FILE_DRIVER):
3699       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3700                                  self.op.file_driver)
3701
3702     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3703       raise errors.OpPrereqError("File storage directory path not absolute")
3704
3705     ### Node/iallocator related checks
3706     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3707       raise errors.OpPrereqError("One and only one of iallocator and primary"
3708                                  " node must be given")
3709
3710     if self.op.iallocator:
3711       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3712     else:
3713       self.op.pnode = self._ExpandNode(self.op.pnode)
3714       nodelist = [self.op.pnode]
3715       if self.op.snode is not None:
3716         self.op.snode = self._ExpandNode(self.op.snode)
3717         nodelist.append(self.op.snode)
3718       self.needed_locks[locking.LEVEL_NODE] = nodelist
3719
3720     # in case of import lock the source node too
3721     if self.op.mode == constants.INSTANCE_IMPORT:
3722       src_node = getattr(self.op, "src_node", None)
3723       src_path = getattr(self.op, "src_path", None)
3724
3725       if src_path is None:
3726         self.op.src_path = src_path = self.op.instance_name
3727
3728       if src_node is None:
3729         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3730         self.op.src_node = None
3731         if os.path.isabs(src_path):
3732           raise errors.OpPrereqError("Importing an instance from an absolute"
3733                                      " path requires a source node option.")
3734       else:
3735         self.op.src_node = src_node = self._ExpandNode(src_node)
3736         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3737           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3738         if not os.path.isabs(src_path):
3739           self.op.src_path = src_path = \
3740             os.path.join(constants.EXPORT_DIR, src_path)
3741
3742     else: # INSTANCE_CREATE
3743       if getattr(self.op, "os_type", None) is None:
3744         raise errors.OpPrereqError("No guest OS specified")
3745
3746   def _RunAllocator(self):
3747     """Run the allocator based on input opcode.
3748
3749     """
3750     nics = [n.ToDict() for n in self.nics]
3751     ial = IAllocator(self,
3752                      mode=constants.IALLOCATOR_MODE_ALLOC,
3753                      name=self.op.instance_name,
3754                      disk_template=self.op.disk_template,
3755                      tags=[],
3756                      os=self.op.os_type,
3757                      vcpus=self.be_full[constants.BE_VCPUS],
3758                      mem_size=self.be_full[constants.BE_MEMORY],
3759                      disks=self.disks,
3760                      nics=nics,
3761                      hypervisor=self.op.hypervisor,
3762                      )
3763
3764     ial.Run(self.op.iallocator)
3765
3766     if not ial.success:
3767       raise errors.OpPrereqError("Can't compute nodes using"
3768                                  " iallocator '%s': %s" % (self.op.iallocator,
3769                                                            ial.info))
3770     if len(ial.nodes) != ial.required_nodes:
3771       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3772                                  " of nodes (%s), required %s" %
3773                                  (self.op.iallocator, len(ial.nodes),
3774                                   ial.required_nodes))
3775     self.op.pnode = ial.nodes[0]
3776     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3777                  self.op.instance_name, self.op.iallocator,
3778                  ", ".join(ial.nodes))
3779     if ial.required_nodes == 2:
3780       self.op.snode = ial.nodes[1]
3781
3782   def BuildHooksEnv(self):
3783     """Build hooks env.
3784
3785     This runs on master, primary and secondary nodes of the instance.
3786
3787     """
3788     env = {
3789       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3790       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3791       "INSTANCE_ADD_MODE": self.op.mode,
3792       }
3793     if self.op.mode == constants.INSTANCE_IMPORT:
3794       env["INSTANCE_SRC_NODE"] = self.op.src_node
3795       env["INSTANCE_SRC_PATH"] = self.op.src_path
3796       env["INSTANCE_SRC_IMAGES"] = self.src_images
3797
3798     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3799       primary_node=self.op.pnode,
3800       secondary_nodes=self.secondaries,
3801       status=self.instance_status,
3802       os_type=self.op.os_type,
3803       memory=self.be_full[constants.BE_MEMORY],
3804       vcpus=self.be_full[constants.BE_VCPUS],
3805       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3806     ))
3807
3808     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3809           self.secondaries)
3810     return env, nl, nl
3811
3812
3813   def CheckPrereq(self):
3814     """Check prerequisites.
3815
3816     """
3817     if (not self.cfg.GetVGName() and
3818         self.op.disk_template not in constants.DTS_NOT_LVM):
3819       raise errors.OpPrereqError("Cluster does not support lvm-based"
3820                                  " instances")
3821
3822
3823     if self.op.mode == constants.INSTANCE_IMPORT:
3824       src_node = self.op.src_node
3825       src_path = self.op.src_path
3826
3827       if src_node is None:
3828         exp_list = self.rpc.call_export_list(
3829           self.acquired_locks[locking.LEVEL_NODE])
3830         found = False
3831         for node in exp_list:
3832           if not exp_list[node].failed and src_path in exp_list[node].data:
3833             found = True
3834             self.op.src_node = src_node = node
3835             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3836                                                        src_path)
3837             break
3838         if not found:
3839           raise errors.OpPrereqError("No export found for relative path %s" %
3840                                       src_path)
3841
3842       result = self.rpc.call_export_info(src_node, src_path)
3843       result.Raise()
3844       if not result.data:
3845         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3846
3847       export_info = result.data
3848       if not export_info.has_section(constants.INISECT_EXP):
3849         raise errors.ProgrammerError("Corrupted export config")
3850
3851       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3852       if (int(ei_version) != constants.EXPORT_VERSION):
3853         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3854                                    (ei_version, constants.EXPORT_VERSION))
3855
3856       # Check that the new instance doesn't have less disks than the export
3857       instance_disks = len(self.disks)
3858       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3859       if instance_disks < export_disks:
3860         raise errors.OpPrereqError("Not enough disks to import."
3861                                    " (instance: %d, export: %d)" %
3862                                    (instance_disks, export_disks))
3863
3864       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3865       disk_images = []
3866       for idx in range(export_disks):
3867         option = 'disk%d_dump' % idx
3868         if export_info.has_option(constants.INISECT_INS, option):
3869           # FIXME: are the old os-es, disk sizes, etc. useful?
3870           export_name = export_info.get(constants.INISECT_INS, option)
3871           image = os.path.join(src_path, export_name)
3872           disk_images.append(image)
3873         else:
3874           disk_images.append(False)
3875
3876       self.src_images = disk_images
3877
3878       old_name = export_info.get(constants.INISECT_INS, 'name')
3879       # FIXME: int() here could throw a ValueError on broken exports
3880       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3881       if self.op.instance_name == old_name:
3882         for idx, nic in enumerate(self.nics):
3883           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3884             nic_mac_ini = 'nic%d_mac' % idx
3885             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3886
3887     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3888     if self.op.start and not self.op.ip_check:
3889       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3890                                  " adding an instance in start mode")
3891
3892     if self.op.ip_check:
3893       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3894         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3895                                    (self.check_ip, self.op.instance_name))
3896
3897     #### allocator run
3898
3899     if self.op.iallocator is not None:
3900       self._RunAllocator()
3901
3902     #### node related checks
3903
3904     # check primary node
3905     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3906     assert self.pnode is not None, \
3907       "Cannot retrieve locked node %s" % self.op.pnode
3908     self.secondaries = []
3909
3910     # mirror node verification
3911     if self.op.disk_template in constants.DTS_NET_MIRROR:
3912       if self.op.snode is None:
3913         raise errors.OpPrereqError("The networked disk templates need"
3914                                    " a mirror node")
3915       if self.op.snode == pnode.name:
3916         raise errors.OpPrereqError("The secondary node cannot be"
3917                                    " the primary node.")
3918       self.secondaries.append(self.op.snode)
3919
3920     nodenames = [pnode.name] + self.secondaries
3921
3922     req_size = _ComputeDiskSize(self.op.disk_template,
3923                                 self.disks)
3924
3925     # Check lv size requirements
3926     if req_size is not None:
3927       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3928                                          self.op.hypervisor)
3929       for node in nodenames:
3930         info = nodeinfo[node]
3931         info.Raise()
3932         info = info.data
3933         if not info:
3934           raise errors.OpPrereqError("Cannot get current information"
3935                                      " from node '%s'" % node)
3936         vg_free = info.get('vg_free', None)
3937         if not isinstance(vg_free, int):
3938           raise errors.OpPrereqError("Can't compute free disk space on"
3939                                      " node %s" % node)
3940         if req_size > info['vg_free']:
3941           raise errors.OpPrereqError("Not enough disk space on target node %s."
3942                                      " %d MB available, %d MB required" %
3943                                      (node, info['vg_free'], req_size))
3944
3945     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3946
3947     # os verification
3948     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3949     result.Raise()
3950     if not isinstance(result.data, objects.OS):
3951       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3952                                  " primary node"  % self.op.os_type)
3953
3954     # bridge check on primary node
3955     bridges = [n.bridge for n in self.nics]
3956     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3957     result.Raise()
3958     if not result.data:
3959       raise errors.OpPrereqError("One of the target bridges '%s' does not"
3960                                  " exist on destination node '%s'" %
3961                                  (",".join(bridges), pnode.name))
3962
3963     # memory check on primary node
3964     if self.op.start:
3965       _CheckNodeFreeMemory(self, self.pnode.name,
3966                            "creating instance %s" % self.op.instance_name,
3967                            self.be_full[constants.BE_MEMORY],
3968                            self.op.hypervisor)
3969
3970     if self.op.start:
3971       self.instance_status = 'up'
3972     else:
3973       self.instance_status = 'down'
3974
3975   def Exec(self, feedback_fn):
3976     """Create and add the instance to the cluster.
3977
3978     """
3979     instance = self.op.instance_name
3980     pnode_name = self.pnode.name
3981
3982     for nic in self.nics:
3983       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3984         nic.mac = self.cfg.GenerateMAC()
3985
3986     ht_kind = self.op.hypervisor
3987     if ht_kind in constants.HTS_REQ_PORT:
3988       network_port = self.cfg.AllocatePort()
3989     else:
3990       network_port = None
3991
3992     ##if self.op.vnc_bind_address is None:
3993     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3994
3995     # this is needed because os.path.join does not accept None arguments
3996     if self.op.file_storage_dir is None:
3997       string_file_storage_dir = ""
3998     else:
3999       string_file_storage_dir = self.op.file_storage_dir
4000
4001     # build the full file storage dir path
4002     file_storage_dir = os.path.normpath(os.path.join(
4003                                         self.cfg.GetFileStorageDir(),
4004                                         string_file_storage_dir, instance))
4005
4006
4007     disks = _GenerateDiskTemplate(self,
4008                                   self.op.disk_template,
4009                                   instance, pnode_name,
4010                                   self.secondaries,
4011                                   self.disks,
4012                                   file_storage_dir,
4013                                   self.op.file_driver,
4014                                   0)
4015
4016     iobj = objects.Instance(name=instance, os=self.op.os_type,
4017                             primary_node=pnode_name,
4018                             nics=self.nics, disks=disks,
4019                             disk_template=self.op.disk_template,
4020                             status=self.instance_status,
4021                             network_port=network_port,
4022                             beparams=self.op.beparams,
4023                             hvparams=self.op.hvparams,
4024                             hypervisor=self.op.hypervisor,
4025                             )
4026
4027     feedback_fn("* creating instance disks...")
4028     if not _CreateDisks(self, iobj):
4029       _RemoveDisks(self, iobj)
4030       self.cfg.ReleaseDRBDMinors(instance)
4031       raise errors.OpExecError("Device creation failed, reverting...")
4032
4033     feedback_fn("adding instance %s to cluster config" % instance)
4034
4035     self.cfg.AddInstance(iobj)
4036     # Declare that we don't want to remove the instance lock anymore, as we've
4037     # added the instance to the config
4038     del self.remove_locks[locking.LEVEL_INSTANCE]
4039     # Remove the temp. assignements for the instance's drbds
4040     self.cfg.ReleaseDRBDMinors(instance)
4041     # Unlock all the nodes
4042     if self.op.mode == constants.INSTANCE_IMPORT:
4043       nodes_keep = [self.op.src_node]
4044       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4045                        if node != self.op.src_node]
4046       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4047       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4048     else:
4049       self.context.glm.release(locking.LEVEL_NODE)
4050       del self.acquired_locks[locking.LEVEL_NODE]
4051
4052     if self.op.wait_for_sync:
4053       disk_abort = not _WaitForSync(self, iobj)
4054     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4055       # make sure the disks are not degraded (still sync-ing is ok)
4056       time.sleep(15)
4057       feedback_fn("* checking mirrors status")
4058       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4059     else:
4060       disk_abort = False
4061
4062     if disk_abort:
4063       _RemoveDisks(self, iobj)
4064       self.cfg.RemoveInstance(iobj.name)
4065       # Make sure the instance lock gets removed
4066       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4067       raise errors.OpExecError("There are some degraded disks for"
4068                                " this instance")
4069
4070     feedback_fn("creating os for instance %s on node %s" %
4071                 (instance, pnode_name))
4072
4073     if iobj.disk_template != constants.DT_DISKLESS:
4074       if self.op.mode == constants.INSTANCE_CREATE:
4075         feedback_fn("* running the instance OS create scripts...")
4076         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4077         result.Raise()
4078         if not result.data:
4079           raise errors.OpExecError("Could not add os for instance %s"
4080                                    " on node %s" %
4081                                    (instance, pnode_name))
4082
4083       elif self.op.mode == constants.INSTANCE_IMPORT:
4084         feedback_fn("* running the instance OS import scripts...")
4085         src_node = self.op.src_node
4086         src_images = self.src_images
4087         cluster_name = self.cfg.GetClusterName()
4088         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4089                                                          src_node, src_images,
4090                                                          cluster_name)
4091         import_result.Raise()
4092         for idx, result in enumerate(import_result.data):
4093           if not result:
4094             self.LogWarning("Could not import the image %s for instance"
4095                             " %s, disk %d, on node %s" %
4096                             (src_images[idx], instance, idx, pnode_name))
4097       else:
4098         # also checked in the prereq part
4099         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4100                                      % self.op.mode)
4101
4102     if self.op.start:
4103       logging.info("Starting instance %s on node %s", instance, pnode_name)
4104       feedback_fn("* starting instance...")
4105       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4106       result.Raise()
4107       if not result.data:
4108         raise errors.OpExecError("Could not start instance")
4109
4110
4111 class LUConnectConsole(NoHooksLU):
4112   """Connect to an instance's console.
4113
4114   This is somewhat special in that it returns the command line that
4115   you need to run on the master node in order to connect to the
4116   console.
4117
4118   """
4119   _OP_REQP = ["instance_name"]
4120   REQ_BGL = False
4121
4122   def ExpandNames(self):
4123     self._ExpandAndLockInstance()
4124
4125   def CheckPrereq(self):
4126     """Check prerequisites.
4127
4128     This checks that the instance is in the cluster.
4129
4130     """
4131     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4132     assert self.instance is not None, \
4133       "Cannot retrieve locked instance %s" % self.op.instance_name
4134
4135   def Exec(self, feedback_fn):
4136     """Connect to the console of an instance
4137
4138     """
4139     instance = self.instance
4140     node = instance.primary_node
4141
4142     node_insts = self.rpc.call_instance_list([node],
4143                                              [instance.hypervisor])[node]
4144     node_insts.Raise()
4145
4146     if instance.name not in node_insts.data:
4147       raise errors.OpExecError("Instance %s is not running." % instance.name)
4148
4149     logging.debug("Connecting to console of %s on %s", instance.name, node)
4150
4151     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4152     console_cmd = hyper.GetShellCommandForConsole(instance)
4153
4154     # build ssh cmdline
4155     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4156
4157
4158 class LUReplaceDisks(LogicalUnit):
4159   """Replace the disks of an instance.
4160
4161   """
4162   HPATH = "mirrors-replace"
4163   HTYPE = constants.HTYPE_INSTANCE
4164   _OP_REQP = ["instance_name", "mode", "disks"]
4165   REQ_BGL = False
4166
4167   def ExpandNames(self):
4168     self._ExpandAndLockInstance()
4169
4170     if not hasattr(self.op, "remote_node"):
4171       self.op.remote_node = None
4172
4173     ia_name = getattr(self.op, "iallocator", None)
4174     if ia_name is not None:
4175       if self.op.remote_node is not None:
4176         raise errors.OpPrereqError("Give either the iallocator or the new"
4177                                    " secondary, not both")
4178       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4179     elif self.op.remote_node is not None:
4180       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4181       if remote_node is None:
4182         raise errors.OpPrereqError("Node '%s' not known" %
4183                                    self.op.remote_node)
4184       self.op.remote_node = remote_node
4185       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4186       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4187     else:
4188       self.needed_locks[locking.LEVEL_NODE] = []
4189       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4190
4191   def DeclareLocks(self, level):
4192     # If we're not already locking all nodes in the set we have to declare the
4193     # instance's primary/secondary nodes.
4194     if (level == locking.LEVEL_NODE and
4195         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4196       self._LockInstancesNodes()
4197
4198   def _RunAllocator(self):
4199     """Compute a new secondary node using an IAllocator.
4200
4201     """
4202     ial = IAllocator(self,
4203                      mode=constants.IALLOCATOR_MODE_RELOC,
4204                      name=self.op.instance_name,
4205                      relocate_from=[self.sec_node])
4206
4207     ial.Run(self.op.iallocator)
4208
4209     if not ial.success:
4210       raise errors.OpPrereqError("Can't compute nodes using"
4211                                  " iallocator '%s': %s" % (self.op.iallocator,
4212                                                            ial.info))
4213     if len(ial.nodes) != ial.required_nodes:
4214       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4215                                  " of nodes (%s), required %s" %
4216                                  (len(ial.nodes), ial.required_nodes))
4217     self.op.remote_node = ial.nodes[0]
4218     self.LogInfo("Selected new secondary for the instance: %s",
4219                  self.op.remote_node)
4220
4221   def BuildHooksEnv(self):
4222     """Build hooks env.
4223
4224     This runs on the master, the primary and all the secondaries.
4225
4226     """
4227     env = {
4228       "MODE": self.op.mode,
4229       "NEW_SECONDARY": self.op.remote_node,
4230       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4231       }
4232     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4233     nl = [
4234       self.cfg.GetMasterNode(),
4235       self.instance.primary_node,
4236       ]
4237     if self.op.remote_node is not None:
4238       nl.append(self.op.remote_node)
4239     return env, nl, nl
4240
4241   def CheckPrereq(self):
4242     """Check prerequisites.
4243
4244     This checks that the instance is in the cluster.
4245
4246     """
4247     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4248     assert instance is not None, \
4249       "Cannot retrieve locked instance %s" % self.op.instance_name
4250     self.instance = instance
4251
4252     if instance.disk_template not in constants.DTS_NET_MIRROR:
4253       raise errors.OpPrereqError("Instance's disk layout is not"
4254                                  " network mirrored.")
4255
4256     if len(instance.secondary_nodes) != 1:
4257       raise errors.OpPrereqError("The instance has a strange layout,"
4258                                  " expected one secondary but found %d" %
4259                                  len(instance.secondary_nodes))
4260
4261     self.sec_node = instance.secondary_nodes[0]
4262
4263     ia_name = getattr(self.op, "iallocator", None)
4264     if ia_name is not None:
4265       self._RunAllocator()
4266
4267     remote_node = self.op.remote_node
4268     if remote_node is not None:
4269       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4270       assert self.remote_node_info is not None, \
4271         "Cannot retrieve locked node %s" % remote_node
4272     else:
4273       self.remote_node_info = None
4274     if remote_node == instance.primary_node:
4275       raise errors.OpPrereqError("The specified node is the primary node of"
4276                                  " the instance.")
4277     elif remote_node == self.sec_node:
4278       if self.op.mode == constants.REPLACE_DISK_SEC:
4279         # this is for DRBD8, where we can't execute the same mode of
4280         # replacement as for drbd7 (no different port allocated)
4281         raise errors.OpPrereqError("Same secondary given, cannot execute"
4282                                    " replacement")
4283     if instance.disk_template == constants.DT_DRBD8:
4284       if (self.op.mode == constants.REPLACE_DISK_ALL and
4285           remote_node is not None):
4286         # switch to replace secondary mode
4287         self.op.mode = constants.REPLACE_DISK_SEC
4288
4289       if self.op.mode == constants.REPLACE_DISK_ALL:
4290         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4291                                    " secondary disk replacement, not"
4292                                    " both at once")
4293       elif self.op.mode == constants.REPLACE_DISK_PRI:
4294         if remote_node is not None:
4295           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4296                                      " the secondary while doing a primary"
4297                                      " node disk replacement")
4298         self.tgt_node = instance.primary_node
4299         self.oth_node = instance.secondary_nodes[0]
4300       elif self.op.mode == constants.REPLACE_DISK_SEC:
4301         self.new_node = remote_node # this can be None, in which case
4302                                     # we don't change the secondary
4303         self.tgt_node = instance.secondary_nodes[0]
4304         self.oth_node = instance.primary_node
4305       else:
4306         raise errors.ProgrammerError("Unhandled disk replace mode")
4307
4308     if not self.op.disks:
4309       self.op.disks = range(len(instance.disks))
4310
4311     for disk_idx in self.op.disks:
4312       instance.FindDisk(disk_idx)
4313
4314   def _ExecD8DiskOnly(self, feedback_fn):
4315     """Replace a disk on the primary or secondary for dbrd8.
4316
4317     The algorithm for replace is quite complicated:
4318
4319       1. for each disk to be replaced:
4320
4321         1. create new LVs on the target node with unique names
4322         1. detach old LVs from the drbd device
4323         1. rename old LVs to name_replaced.<time_t>
4324         1. rename new LVs to old LVs
4325         1. attach the new LVs (with the old names now) to the drbd device
4326
4327       1. wait for sync across all devices
4328
4329       1. for each modified disk:
4330
4331         1. remove old LVs (which have the name name_replaces.<time_t>)
4332
4333     Failures are not very well handled.
4334
4335     """
4336     steps_total = 6
4337     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4338     instance = self.instance
4339     iv_names = {}
4340     vgname = self.cfg.GetVGName()
4341     # start of work
4342     cfg = self.cfg
4343     tgt_node = self.tgt_node
4344     oth_node = self.oth_node
4345
4346     # Step: check device activation
4347     self.proc.LogStep(1, steps_total, "check device existence")
4348     info("checking volume groups")
4349     my_vg = cfg.GetVGName()
4350     results = self.rpc.call_vg_list([oth_node, tgt_node])
4351     if not results:
4352       raise errors.OpExecError("Can't list volume groups on the nodes")
4353     for node in oth_node, tgt_node:
4354       res = results[node]
4355       if res.failed or not res.data or my_vg not in res.data:
4356         raise errors.OpExecError("Volume group '%s' not found on %s" %
4357                                  (my_vg, node))
4358     for idx, dev in enumerate(instance.disks):
4359       if idx not in self.op.disks:
4360         continue
4361       for node in tgt_node, oth_node:
4362         info("checking disk/%d on %s" % (idx, node))
4363         cfg.SetDiskID(dev, node)
4364         if not self.rpc.call_blockdev_find(node, dev):
4365           raise errors.OpExecError("Can't find disk/%d on node %s" %
4366                                    (idx, node))
4367
4368     # Step: check other node consistency
4369     self.proc.LogStep(2, steps_total, "check peer consistency")
4370     for idx, dev in enumerate(instance.disks):
4371       if idx not in self.op.disks:
4372         continue
4373       info("checking disk/%d consistency on %s" % (idx, oth_node))
4374       if not _CheckDiskConsistency(self, dev, oth_node,
4375                                    oth_node==instance.primary_node):
4376         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4377                                  " to replace disks on this node (%s)" %
4378                                  (oth_node, tgt_node))
4379
4380     # Step: create new storage
4381     self.proc.LogStep(3, steps_total, "allocate new storage")
4382     for idx, dev in enumerate(instance.disks):
4383       if idx not in self.op.disks:
4384         continue
4385       size = dev.size
4386       cfg.SetDiskID(dev, tgt_node)
4387       lv_names = [".disk%d_%s" % (idx, suf)
4388                   for suf in ["data", "meta"]]
4389       names = _GenerateUniqueNames(self, lv_names)
4390       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4391                              logical_id=(vgname, names[0]))
4392       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4393                              logical_id=(vgname, names[1]))
4394       new_lvs = [lv_data, lv_meta]
4395       old_lvs = dev.children
4396       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4397       info("creating new local storage on %s for %s" %
4398            (tgt_node, dev.iv_name))
4399       # since we *always* want to create this LV, we use the
4400       # _Create...OnPrimary (which forces the creation), even if we
4401       # are talking about the secondary node
4402       for new_lv in new_lvs:
4403         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4404                                         _GetInstanceInfoText(instance)):
4405           raise errors.OpExecError("Failed to create new LV named '%s' on"
4406                                    " node '%s'" %
4407                                    (new_lv.logical_id[1], tgt_node))
4408
4409     # Step: for each lv, detach+rename*2+attach
4410     self.proc.LogStep(4, steps_total, "change drbd configuration")
4411     for dev, old_lvs, new_lvs in iv_names.itervalues():
4412       info("detaching %s drbd from local storage" % dev.iv_name)
4413       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4414       result.Raise()
4415       if not result.data:
4416         raise errors.OpExecError("Can't detach drbd from local storage on node"
4417                                  " %s for device %s" % (tgt_node, dev.iv_name))
4418       #dev.children = []
4419       #cfg.Update(instance)
4420
4421       # ok, we created the new LVs, so now we know we have the needed
4422       # storage; as such, we proceed on the target node to rename
4423       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4424       # using the assumption that logical_id == physical_id (which in
4425       # turn is the unique_id on that node)
4426
4427       # FIXME(iustin): use a better name for the replaced LVs
4428       temp_suffix = int(time.time())
4429       ren_fn = lambda d, suff: (d.physical_id[0],
4430                                 d.physical_id[1] + "_replaced-%s" % suff)
4431       # build the rename list based on what LVs exist on the node
4432       rlist = []
4433       for to_ren in old_lvs:
4434         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4435         if not find_res.failed and find_res.data is not None: # device exists
4436           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4437
4438       info("renaming the old LVs on the target node")
4439       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4440       result.Raise()
4441       if not result.data:
4442         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4443       # now we rename the new LVs to the old LVs
4444       info("renaming the new LVs on the target node")
4445       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4446       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4447       result.Raise()
4448       if not result.data:
4449         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4450
4451       for old, new in zip(old_lvs, new_lvs):
4452         new.logical_id = old.logical_id
4453         cfg.SetDiskID(new, tgt_node)
4454
4455       for disk in old_lvs:
4456         disk.logical_id = ren_fn(disk, temp_suffix)
4457         cfg.SetDiskID(disk, tgt_node)
4458
4459       # now that the new lvs have the old name, we can add them to the device
4460       info("adding new mirror component on %s" % tgt_node)
4461       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4462       if result.failed or not result.data:
4463         for new_lv in new_lvs:
4464           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4465           if result.failed or not result.data:
4466             warning("Can't rollback device %s", hint="manually cleanup unused"
4467                     " logical volumes")
4468         raise errors.OpExecError("Can't add local storage to drbd")
4469
4470       dev.children = new_lvs
4471       cfg.Update(instance)
4472
4473     # Step: wait for sync
4474
4475     # this can fail as the old devices are degraded and _WaitForSync
4476     # does a combined result over all disks, so we don't check its
4477     # return value
4478     self.proc.LogStep(5, steps_total, "sync devices")
4479     _WaitForSync(self, instance, unlock=True)
4480
4481     # so check manually all the devices
4482     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4483       cfg.SetDiskID(dev, instance.primary_node)
4484       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4485       if result.failed or result.data[5]:
4486         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4487
4488     # Step: remove old storage
4489     self.proc.LogStep(6, steps_total, "removing old storage")
4490     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4491       info("remove logical volumes for %s" % name)
4492       for lv in old_lvs:
4493         cfg.SetDiskID(lv, tgt_node)
4494         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4495         if result.failed or not result.data:
4496           warning("Can't remove old LV", hint="manually remove unused LVs")
4497           continue
4498
4499   def _ExecD8Secondary(self, feedback_fn):
4500     """Replace the secondary node for drbd8.
4501
4502     The algorithm for replace is quite complicated:
4503       - for all disks of the instance:
4504         - create new LVs on the new node with same names
4505         - shutdown the drbd device on the old secondary
4506         - disconnect the drbd network on the primary
4507         - create the drbd device on the new secondary
4508         - network attach the drbd on the primary, using an artifice:
4509           the drbd code for Attach() will connect to the network if it
4510           finds a device which is connected to the good local disks but
4511           not network enabled
4512       - wait for sync across all devices
4513       - remove all disks from the old secondary
4514
4515     Failures are not very well handled.
4516
4517     """
4518     steps_total = 6
4519     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4520     instance = self.instance
4521     iv_names = {}
4522     vgname = self.cfg.GetVGName()
4523     # start of work
4524     cfg = self.cfg
4525     old_node = self.tgt_node
4526     new_node = self.new_node
4527     pri_node = instance.primary_node
4528
4529     # Step: check device activation
4530     self.proc.LogStep(1, steps_total, "check device existence")
4531     info("checking volume groups")
4532     my_vg = cfg.GetVGName()
4533     results = self.rpc.call_vg_list([pri_node, new_node])
4534     for node in pri_node, new_node:
4535       res = results[node]
4536       if res.failed or not res.data or my_vg not in res.data:
4537         raise errors.OpExecError("Volume group '%s' not found on %s" %
4538                                  (my_vg, node))
4539     for idx, dev in enumerate(instance.disks):
4540       if idx not in self.op.disks:
4541         continue
4542       info("checking disk/%d on %s" % (idx, pri_node))
4543       cfg.SetDiskID(dev, pri_node)
4544       result = self.rpc.call_blockdev_find(pri_node, dev)
4545       result.Raise()
4546       if not result.data:
4547         raise errors.OpExecError("Can't find disk/%d on node %s" %
4548                                  (idx, pri_node))
4549
4550     # Step: check other node consistency
4551     self.proc.LogStep(2, steps_total, "check peer consistency")
4552     for idx, dev in enumerate(instance.disks):
4553       if idx not in self.op.disks:
4554         continue
4555       info("checking disk/%d consistency on %s" % (idx, pri_node))
4556       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4557         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4558                                  " unsafe to replace the secondary" %
4559                                  pri_node)
4560
4561     # Step: create new storage
4562     self.proc.LogStep(3, steps_total, "allocate new storage")
4563     for idx, dev in enumerate(instance.disks):
4564       size = dev.size
4565       info("adding new local storage on %s for disk/%d" %
4566            (new_node, idx))
4567       # since we *always* want to create this LV, we use the
4568       # _Create...OnPrimary (which forces the creation), even if we
4569       # are talking about the secondary node
4570       for new_lv in dev.children:
4571         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4572                                         _GetInstanceInfoText(instance)):
4573           raise errors.OpExecError("Failed to create new LV named '%s' on"
4574                                    " node '%s'" %
4575                                    (new_lv.logical_id[1], new_node))
4576
4577     # Step 4: dbrd minors and drbd setups changes
4578     # after this, we must manually remove the drbd minors on both the
4579     # error and the success paths
4580     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4581                                    instance.name)
4582     logging.debug("Allocated minors %s" % (minors,))
4583     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4584     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4585       size = dev.size
4586       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4587       # create new devices on new_node
4588       if pri_node == dev.logical_id[0]:
4589         new_logical_id = (pri_node, new_node,
4590                           dev.logical_id[2], dev.logical_id[3], new_minor,
4591                           dev.logical_id[5])
4592       else:
4593         new_logical_id = (new_node, pri_node,
4594                           dev.logical_id[2], new_minor, dev.logical_id[4],
4595                           dev.logical_id[5])
4596       iv_names[idx] = (dev, dev.children, new_logical_id)
4597       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4598                     new_logical_id)
4599       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4600                               logical_id=new_logical_id,
4601                               children=dev.children)
4602       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4603                                         new_drbd, False,
4604                                         _GetInstanceInfoText(instance)):
4605         self.cfg.ReleaseDRBDMinors(instance.name)
4606         raise errors.OpExecError("Failed to create new DRBD on"
4607                                  " node '%s'" % new_node)
4608
4609     for idx, dev in enumerate(instance.disks):
4610       # we have new devices, shutdown the drbd on the old secondary
4611       info("shutting down drbd for disk/%d on old node" % idx)
4612       cfg.SetDiskID(dev, old_node)
4613       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4614       if result.failed or not result.data:
4615         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4616                 hint="Please cleanup this device manually as soon as possible")
4617
4618     info("detaching primary drbds from the network (=> standalone)")
4619     done = 0
4620     for idx, dev in enumerate(instance.disks):
4621       cfg.SetDiskID(dev, pri_node)
4622       # set the network part of the physical (unique in bdev terms) id
4623       # to None, meaning detach from network
4624       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4625       # and 'find' the device, which will 'fix' it to match the
4626       # standalone state
4627       result = self.rpc.call_blockdev_find(pri_node, dev)
4628       if not result.failed and result.data:
4629         done += 1
4630       else:
4631         warning("Failed to detach drbd disk/%d from network, unusual case" %
4632                 idx)
4633
4634     if not done:
4635       # no detaches succeeded (very unlikely)
4636       self.cfg.ReleaseDRBDMinors(instance.name)
4637       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4638
4639     # if we managed to detach at least one, we update all the disks of
4640     # the instance to point to the new secondary
4641     info("updating instance configuration")
4642     for dev, _, new_logical_id in iv_names.itervalues():
4643       dev.logical_id = new_logical_id
4644       cfg.SetDiskID(dev, pri_node)
4645     cfg.Update(instance)
4646     # we can remove now the temp minors as now the new values are
4647     # written to the config file (and therefore stable)
4648     self.cfg.ReleaseDRBDMinors(instance.name)
4649
4650     # and now perform the drbd attach
4651     info("attaching primary drbds to new secondary (standalone => connected)")
4652     failures = []
4653     for idx, dev in enumerate(instance.disks):
4654       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4655       # since the attach is smart, it's enough to 'find' the device,
4656       # it will automatically activate the network, if the physical_id
4657       # is correct
4658       cfg.SetDiskID(dev, pri_node)
4659       logging.debug("Disk to attach: %s", dev)
4660       result = self.rpc.call_blockdev_find(pri_node, dev)
4661       if result.failed or not result.data:
4662         warning("can't attach drbd disk/%d to new secondary!" % idx,
4663                 "please do a gnt-instance info to see the status of disks")
4664
4665     # this can fail as the old devices are degraded and _WaitForSync
4666     # does a combined result over all disks, so we don't check its
4667     # return value
4668     self.proc.LogStep(5, steps_total, "sync devices")
4669     _WaitForSync(self, instance, unlock=True)
4670
4671     # so check manually all the devices
4672     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4673       cfg.SetDiskID(dev, pri_node)
4674       result = self.rpc.call_blockdev_find(pri_node, dev)
4675       result.Raise()
4676       if result.data[5]:
4677         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4678
4679     self.proc.LogStep(6, steps_total, "removing old storage")
4680     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4681       info("remove logical volumes for disk/%d" % idx)
4682       for lv in old_lvs:
4683         cfg.SetDiskID(lv, old_node)
4684         result = self.rpc.call_blockdev_remove(old_node, lv)
4685         if result.failed or not result.data:
4686           warning("Can't remove LV on old secondary",
4687                   hint="Cleanup stale volumes by hand")
4688
4689   def Exec(self, feedback_fn):
4690     """Execute disk replacement.
4691
4692     This dispatches the disk replacement to the appropriate handler.
4693
4694     """
4695     instance = self.instance
4696
4697     # Activate the instance disks if we're replacing them on a down instance
4698     if instance.status == "down":
4699       _StartInstanceDisks(self, instance, True)
4700
4701     if instance.disk_template == constants.DT_DRBD8:
4702       if self.op.remote_node is None:
4703         fn = self._ExecD8DiskOnly
4704       else:
4705         fn = self._ExecD8Secondary
4706     else:
4707       raise errors.ProgrammerError("Unhandled disk replacement case")
4708
4709     ret = fn(feedback_fn)
4710
4711     # Deactivate the instance disks if we're replacing them on a down instance
4712     if instance.status == "down":
4713       _SafeShutdownInstanceDisks(self, instance)
4714
4715     return ret
4716
4717
4718 class LUGrowDisk(LogicalUnit):
4719   """Grow a disk of an instance.
4720
4721   """
4722   HPATH = "disk-grow"
4723   HTYPE = constants.HTYPE_INSTANCE
4724   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4725   REQ_BGL = False
4726
4727   def ExpandNames(self):
4728     self._ExpandAndLockInstance()
4729     self.needed_locks[locking.LEVEL_NODE] = []
4730     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4731
4732   def DeclareLocks(self, level):
4733     if level == locking.LEVEL_NODE:
4734       self._LockInstancesNodes()
4735
4736   def BuildHooksEnv(self):
4737     """Build hooks env.
4738
4739     This runs on the master, the primary and all the secondaries.
4740
4741     """
4742     env = {
4743       "DISK": self.op.disk,
4744       "AMOUNT": self.op.amount,
4745       }
4746     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4747     nl = [
4748       self.cfg.GetMasterNode(),
4749       self.instance.primary_node,
4750       ]
4751     return env, nl, nl
4752
4753   def CheckPrereq(self):
4754     """Check prerequisites.
4755
4756     This checks that the instance is in the cluster.
4757
4758     """
4759     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4760     assert instance is not None, \
4761       "Cannot retrieve locked instance %s" % self.op.instance_name
4762
4763     self.instance = instance
4764
4765     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4766       raise errors.OpPrereqError("Instance's disk layout does not support"
4767                                  " growing.")
4768
4769     self.disk = instance.FindDisk(self.op.disk)
4770
4771     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4772     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4773                                        instance.hypervisor)
4774     for node in nodenames:
4775       info = nodeinfo[node]
4776       if info.failed or not info.data:
4777         raise errors.OpPrereqError("Cannot get current information"
4778                                    " from node '%s'" % node)
4779       vg_free = info.data.get('vg_free', None)
4780       if not isinstance(vg_free, int):
4781         raise errors.OpPrereqError("Can't compute free disk space on"
4782                                    " node %s" % node)
4783       if self.op.amount > vg_free:
4784         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4785                                    " %d MiB available, %d MiB required" %
4786                                    (node, vg_free, self.op.amount))
4787
4788   def Exec(self, feedback_fn):
4789     """Execute disk grow.
4790
4791     """
4792     instance = self.instance
4793     disk = self.disk
4794     for node in (instance.secondary_nodes + (instance.primary_node,)):
4795       self.cfg.SetDiskID(disk, node)
4796       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4797       result.Raise()
4798       if (not result.data or not isinstance(result.data, (list, tuple)) or
4799           len(result.data) != 2):
4800         raise errors.OpExecError("Grow request failed to node %s" % node)
4801       elif not result.data[0]:
4802         raise errors.OpExecError("Grow request failed to node %s: %s" %
4803                                  (node, result.data[1]))
4804     disk.RecordGrow(self.op.amount)
4805     self.cfg.Update(instance)
4806     if self.op.wait_for_sync:
4807       disk_abort = not _WaitForSync(self, instance)
4808       if disk_abort:
4809         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4810                              " status.\nPlease check the instance.")
4811
4812
4813 class LUQueryInstanceData(NoHooksLU):
4814   """Query runtime instance data.
4815
4816   """
4817   _OP_REQP = ["instances", "static"]
4818   REQ_BGL = False
4819
4820   def ExpandNames(self):
4821     self.needed_locks = {}
4822     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4823
4824     if not isinstance(self.op.instances, list):
4825       raise errors.OpPrereqError("Invalid argument type 'instances'")
4826
4827     if self.op.instances:
4828       self.wanted_names = []
4829       for name in self.op.instances:
4830         full_name = self.cfg.ExpandInstanceName(name)
4831         if full_name is None:
4832           raise errors.OpPrereqError("Instance '%s' not known" %
4833                                      self.op.instance_name)
4834         self.wanted_names.append(full_name)
4835       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4836     else:
4837       self.wanted_names = None
4838       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4839
4840     self.needed_locks[locking.LEVEL_NODE] = []
4841     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4842
4843   def DeclareLocks(self, level):
4844     if level == locking.LEVEL_NODE:
4845       self._LockInstancesNodes()
4846
4847   def CheckPrereq(self):
4848     """Check prerequisites.
4849
4850     This only checks the optional instance list against the existing names.
4851
4852     """
4853     if self.wanted_names is None:
4854       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4855
4856     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4857                              in self.wanted_names]
4858     return
4859
4860   def _ComputeDiskStatus(self, instance, snode, dev):
4861     """Compute block device status.
4862
4863     """
4864     static = self.op.static
4865     if not static:
4866       self.cfg.SetDiskID(dev, instance.primary_node)
4867       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4868       dev_pstatus.Raise()
4869       dev_pstatus = dev_pstatus.data
4870     else:
4871       dev_pstatus = None
4872
4873     if dev.dev_type in constants.LDS_DRBD:
4874       # we change the snode then (otherwise we use the one passed in)
4875       if dev.logical_id[0] == instance.primary_node:
4876         snode = dev.logical_id[1]
4877       else:
4878         snode = dev.logical_id[0]
4879
4880     if snode and not static:
4881       self.cfg.SetDiskID(dev, snode)
4882       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4883       dev_sstatus.Raise()
4884       dev_sstatus = dev_sstatus.data
4885     else:
4886       dev_sstatus = None
4887
4888     if dev.children:
4889       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4890                       for child in dev.children]
4891     else:
4892       dev_children = []
4893
4894     data = {
4895       "iv_name": dev.iv_name,
4896       "dev_type": dev.dev_type,
4897       "logical_id": dev.logical_id,
4898       "physical_id": dev.physical_id,
4899       "pstatus": dev_pstatus,
4900       "sstatus": dev_sstatus,
4901       "children": dev_children,
4902       "mode": dev.mode,
4903       }
4904
4905     return data
4906
4907   def Exec(self, feedback_fn):
4908     """Gather and return data"""
4909     result = {}
4910
4911     cluster = self.cfg.GetClusterInfo()
4912
4913     for instance in self.wanted_instances:
4914       if not self.op.static:
4915         remote_info = self.rpc.call_instance_info(instance.primary_node,
4916                                                   instance.name,
4917                                                   instance.hypervisor)
4918         remote_info.Raise()
4919         remote_info = remote_info.data
4920         if remote_info and "state" in remote_info:
4921           remote_state = "up"
4922         else:
4923           remote_state = "down"
4924       else:
4925         remote_state = None
4926       if instance.status == "down":
4927         config_state = "down"
4928       else:
4929         config_state = "up"
4930
4931       disks = [self._ComputeDiskStatus(instance, None, device)
4932                for device in instance.disks]
4933
4934       idict = {
4935         "name": instance.name,
4936         "config_state": config_state,
4937         "run_state": remote_state,
4938         "pnode": instance.primary_node,
4939         "snodes": instance.secondary_nodes,
4940         "os": instance.os,
4941         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4942         "disks": disks,
4943         "hypervisor": instance.hypervisor,
4944         "network_port": instance.network_port,
4945         "hv_instance": instance.hvparams,
4946         "hv_actual": cluster.FillHV(instance),
4947         "be_instance": instance.beparams,
4948         "be_actual": cluster.FillBE(instance),
4949         }
4950
4951       result[instance.name] = idict
4952
4953     return result
4954
4955
4956 class LUSetInstanceParams(LogicalUnit):
4957   """Modifies an instances's parameters.
4958
4959   """
4960   HPATH = "instance-modify"
4961   HTYPE = constants.HTYPE_INSTANCE
4962   _OP_REQP = ["instance_name"]
4963   REQ_BGL = False
4964
4965   def CheckArguments(self):
4966     if not hasattr(self.op, 'nics'):
4967       self.op.nics = []
4968     if not hasattr(self.op, 'disks'):
4969       self.op.disks = []
4970     if not hasattr(self.op, 'beparams'):
4971       self.op.beparams = {}
4972     if not hasattr(self.op, 'hvparams'):
4973       self.op.hvparams = {}
4974     self.op.force = getattr(self.op, "force", False)
4975     if not (self.op.nics or self.op.disks or
4976             self.op.hvparams or self.op.beparams):
4977       raise errors.OpPrereqError("No changes submitted")
4978
4979     utils.CheckBEParams(self.op.beparams)
4980
4981     # Disk validation
4982     disk_addremove = 0
4983     for disk_op, disk_dict in self.op.disks:
4984       if disk_op == constants.DDM_REMOVE:
4985         disk_addremove += 1
4986         continue
4987       elif disk_op == constants.DDM_ADD:
4988         disk_addremove += 1
4989       else:
4990         if not isinstance(disk_op, int):
4991           raise errors.OpPrereqError("Invalid disk index")
4992       if disk_op == constants.DDM_ADD:
4993         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4994         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4995           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4996         size = disk_dict.get('size', None)
4997         if size is None:
4998           raise errors.OpPrereqError("Required disk parameter size missing")
4999         try:
5000           size = int(size)
5001         except ValueError, err:
5002           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5003                                      str(err))
5004         disk_dict['size'] = size
5005       else:
5006         # modification of disk
5007         if 'size' in disk_dict:
5008           raise errors.OpPrereqError("Disk size change not possible, use"
5009                                      " grow-disk")
5010
5011     if disk_addremove > 1:
5012       raise errors.OpPrereqError("Only one disk add or remove operation"
5013                                  " supported at a time")
5014
5015     # NIC validation
5016     nic_addremove = 0
5017     for nic_op, nic_dict in self.op.nics:
5018       if nic_op == constants.DDM_REMOVE:
5019         nic_addremove += 1
5020         continue
5021       elif nic_op == constants.DDM_ADD:
5022         nic_addremove += 1
5023       else:
5024         if not isinstance(nic_op, int):
5025           raise errors.OpPrereqError("Invalid nic index")
5026
5027       # nic_dict should be a dict
5028       nic_ip = nic_dict.get('ip', None)
5029       if nic_ip is not None:
5030         if nic_ip.lower() == "none":
5031           nic_dict['ip'] = None
5032         else:
5033           if not utils.IsValidIP(nic_ip):
5034             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5035       # we can only check None bridges and assign the default one
5036       nic_bridge = nic_dict.get('bridge', None)
5037       if nic_bridge is None:
5038         nic_dict['bridge'] = self.cfg.GetDefBridge()
5039       # but we can validate MACs
5040       nic_mac = nic_dict.get('mac', None)
5041       if nic_mac is not None:
5042         if self.cfg.IsMacInUse(nic_mac):
5043           raise errors.OpPrereqError("MAC address %s already in use"
5044                                      " in cluster" % nic_mac)
5045         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5046           if not utils.IsValidMac(nic_mac):
5047             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5048     if nic_addremove > 1:
5049       raise errors.OpPrereqError("Only one NIC add or remove operation"
5050                                  " supported at a time")
5051
5052   def ExpandNames(self):
5053     self._ExpandAndLockInstance()
5054     self.needed_locks[locking.LEVEL_NODE] = []
5055     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5056
5057   def DeclareLocks(self, level):
5058     if level == locking.LEVEL_NODE:
5059       self._LockInstancesNodes()
5060
5061   def BuildHooksEnv(self):
5062     """Build hooks env.
5063
5064     This runs on the master, primary and secondaries.
5065
5066     """
5067     args = dict()
5068     if constants.BE_MEMORY in self.be_new:
5069       args['memory'] = self.be_new[constants.BE_MEMORY]
5070     if constants.BE_VCPUS in self.be_new:
5071       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5072     # FIXME: readd disk/nic changes
5073     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5074     nl = [self.cfg.GetMasterNode(),
5075           self.instance.primary_node] + list(self.instance.secondary_nodes)
5076     return env, nl, nl
5077
5078   def CheckPrereq(self):
5079     """Check prerequisites.
5080
5081     This only checks the instance list against the existing names.
5082
5083     """
5084     force = self.force = self.op.force
5085
5086     # checking the new params on the primary/secondary nodes
5087
5088     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5089     assert self.instance is not None, \
5090       "Cannot retrieve locked instance %s" % self.op.instance_name
5091     pnode = self.instance.primary_node
5092     nodelist = [pnode]
5093     nodelist.extend(instance.secondary_nodes)
5094
5095     # hvparams processing
5096     if self.op.hvparams:
5097       i_hvdict = copy.deepcopy(instance.hvparams)
5098       for key, val in self.op.hvparams.iteritems():
5099         if val == constants.VALUE_DEFAULT:
5100           try:
5101             del i_hvdict[key]
5102           except KeyError:
5103             pass
5104         elif val == constants.VALUE_NONE:
5105           i_hvdict[key] = None
5106         else:
5107           i_hvdict[key] = val
5108       cluster = self.cfg.GetClusterInfo()
5109       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5110                                 i_hvdict)
5111       # local check
5112       hypervisor.GetHypervisor(
5113         instance.hypervisor).CheckParameterSyntax(hv_new)
5114       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5115       self.hv_new = hv_new # the new actual values
5116       self.hv_inst = i_hvdict # the new dict (without defaults)
5117     else:
5118       self.hv_new = self.hv_inst = {}
5119
5120     # beparams processing
5121     if self.op.beparams:
5122       i_bedict = copy.deepcopy(instance.beparams)
5123       for key, val in self.op.beparams.iteritems():
5124         if val == constants.VALUE_DEFAULT:
5125           try:
5126             del i_bedict[key]
5127           except KeyError:
5128             pass
5129         else:
5130           i_bedict[key] = val
5131       cluster = self.cfg.GetClusterInfo()
5132       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5133                                 i_bedict)
5134       self.be_new = be_new # the new actual values
5135       self.be_inst = i_bedict # the new dict (without defaults)
5136     else:
5137       self.be_new = self.be_inst = {}
5138
5139     self.warn = []
5140
5141     if constants.BE_MEMORY in self.op.beparams and not self.force:
5142       mem_check_list = [pnode]
5143       if be_new[constants.BE_AUTO_BALANCE]:
5144         # either we changed auto_balance to yes or it was from before
5145         mem_check_list.extend(instance.secondary_nodes)
5146       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5147                                                   instance.hypervisor)
5148       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5149                                          instance.hypervisor)
5150       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5151         # Assume the primary node is unreachable and go ahead
5152         self.warn.append("Can't get info from primary node %s" % pnode)
5153       else:
5154         if not instance_info.failed and instance_info.data:
5155           current_mem = instance_info.data['memory']
5156         else:
5157           # Assume instance not running
5158           # (there is a slight race condition here, but it's not very probable,
5159           # and we have no other way to check)
5160           current_mem = 0
5161         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5162                     nodeinfo[pnode].data['memory_free'])
5163         if miss_mem > 0:
5164           raise errors.OpPrereqError("This change will prevent the instance"
5165                                      " from starting, due to %d MB of memory"
5166                                      " missing on its primary node" % miss_mem)
5167
5168       if be_new[constants.BE_AUTO_BALANCE]:
5169         for node, nres in instance.secondary_nodes.iteritems():
5170           if nres.failed or not isinstance(nres.data, dict):
5171             self.warn.append("Can't get info from secondary node %s" % node)
5172           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5173             self.warn.append("Not enough memory to failover instance to"
5174                              " secondary node %s" % node)
5175
5176     # NIC processing
5177     for nic_op, nic_dict in self.op.nics:
5178       if nic_op == constants.DDM_REMOVE:
5179         if not instance.nics:
5180           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5181         continue
5182       if nic_op != constants.DDM_ADD:
5183         # an existing nic
5184         if nic_op < 0 or nic_op >= len(instance.nics):
5185           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5186                                      " are 0 to %d" %
5187                                      (nic_op, len(instance.nics)))
5188       nic_bridge = nic_dict.get('bridge', None)
5189       if nic_bridge is not None:
5190         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5191           msg = ("Bridge '%s' doesn't exist on one of"
5192                  " the instance nodes" % nic_bridge)
5193           if self.force:
5194             self.warn.append(msg)
5195           else:
5196             raise errors.OpPrereqError(msg)
5197
5198     # DISK processing
5199     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5200       raise errors.OpPrereqError("Disk operations not supported for"
5201                                  " diskless instances")
5202     for disk_op, disk_dict in self.op.disks:
5203       if disk_op == constants.DDM_REMOVE:
5204         if len(instance.disks) == 1:
5205           raise errors.OpPrereqError("Cannot remove the last disk of"
5206                                      " an instance")
5207         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5208         ins_l = ins_l[pnode]
5209         if not type(ins_l) is list:
5210           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5211         if instance.name in ins_l:
5212           raise errors.OpPrereqError("Instance is running, can't remove"
5213                                      " disks.")
5214
5215       if (disk_op == constants.DDM_ADD and
5216           len(instance.nics) >= constants.MAX_DISKS):
5217         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5218                                    " add more" % constants.MAX_DISKS)
5219       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5220         # an existing disk
5221         if disk_op < 0 or disk_op >= len(instance.disks):
5222           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5223                                      " are 0 to %d" %
5224                                      (disk_op, len(instance.disks)))
5225
5226     return
5227
5228   def Exec(self, feedback_fn):
5229     """Modifies an instance.
5230
5231     All parameters take effect only at the next restart of the instance.
5232
5233     """
5234     # Process here the warnings from CheckPrereq, as we don't have a
5235     # feedback_fn there.
5236     for warn in self.warn:
5237       feedback_fn("WARNING: %s" % warn)
5238
5239     result = []
5240     instance = self.instance
5241     # disk changes
5242     for disk_op, disk_dict in self.op.disks:
5243       if disk_op == constants.DDM_REMOVE:
5244         # remove the last disk
5245         device = instance.disks.pop()
5246         device_idx = len(instance.disks)
5247         for node, disk in device.ComputeNodeTree(instance.primary_node):
5248           self.cfg.SetDiskID(disk, node)
5249           result = self.rpc.call_blockdev_remove(node, disk)
5250           if result.failed or not result.data:
5251             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5252                                  " continuing anyway", device_idx, node)
5253         result.append(("disk/%d" % device_idx, "remove"))
5254       elif disk_op == constants.DDM_ADD:
5255         # add a new disk
5256         if instance.disk_template == constants.DT_FILE:
5257           file_driver, file_path = instance.disks[0].logical_id
5258           file_path = os.path.dirname(file_path)
5259         else:
5260           file_driver = file_path = None
5261         disk_idx_base = len(instance.disks)
5262         new_disk = _GenerateDiskTemplate(self,
5263                                          instance.disk_template,
5264                                          instance, instance.primary_node,
5265                                          instance.secondary_nodes,
5266                                          [disk_dict],
5267                                          file_path,
5268                                          file_driver,
5269                                          disk_idx_base)[0]
5270         new_disk.mode = disk_dict['mode']
5271         instance.disks.append(new_disk)
5272         info = _GetInstanceInfoText(instance)
5273
5274         logging.info("Creating volume %s for instance %s",
5275                      new_disk.iv_name, instance.name)
5276         # Note: this needs to be kept in sync with _CreateDisks
5277         #HARDCODE
5278         for secondary_node in instance.secondary_nodes:
5279           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5280                                             new_disk, False, info):
5281             self.LogWarning("Failed to create volume %s (%s) on"
5282                             " secondary node %s!",
5283                             new_disk.iv_name, new_disk, secondary_node)
5284         #HARDCODE
5285         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5286                                         instance, new_disk, info):
5287           self.LogWarning("Failed to create volume %s on primary!",
5288                           new_disk.iv_name)
5289         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5290                        (new_disk.size, new_disk.mode)))
5291       else:
5292         # change a given disk
5293         instance.disks[disk_op].mode = disk_dict['mode']
5294         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5295     # NIC changes
5296     for nic_op, nic_dict in self.op.nics:
5297       if nic_op == constants.DDM_REMOVE:
5298         # remove the last nic
5299         del instance.nics[-1]
5300         result.append(("nic.%d" % len(instance.nics), "remove"))
5301       elif nic_op == constants.DDM_ADD:
5302         # add a new nic
5303         if 'mac' not in nic_dict:
5304           mac = constants.VALUE_GENERATE
5305         else:
5306           mac = nic_dict['mac']
5307         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5308           mac = self.cfg.GenerateMAC()
5309         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5310                               bridge=nic_dict.get('bridge', None))
5311         instance.nics.append(new_nic)
5312         result.append(("nic.%d" % (len(instance.nics) - 1),
5313                        "add:mac=%s,ip=%s,bridge=%s" %
5314                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5315       else:
5316         # change a given nic
5317         for key in 'mac', 'ip', 'bridge':
5318           if key in nic_dict:
5319             setattr(instance.nics[nic_op], key, nic_dict[key])
5320             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5321
5322     # hvparams changes
5323     if self.op.hvparams:
5324       instance.hvparams = self.hv_new
5325       for key, val in self.op.hvparams.iteritems():
5326         result.append(("hv/%s" % key, val))
5327
5328     # beparams changes
5329     if self.op.beparams:
5330       instance.beparams = self.be_inst
5331       for key, val in self.op.beparams.iteritems():
5332         result.append(("be/%s" % key, val))
5333
5334     self.cfg.Update(instance)
5335
5336     return result
5337
5338
5339 class LUQueryExports(NoHooksLU):
5340   """Query the exports list
5341
5342   """
5343   _OP_REQP = ['nodes']
5344   REQ_BGL = False
5345
5346   def ExpandNames(self):
5347     self.needed_locks = {}
5348     self.share_locks[locking.LEVEL_NODE] = 1
5349     if not self.op.nodes:
5350       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5351     else:
5352       self.needed_locks[locking.LEVEL_NODE] = \
5353         _GetWantedNodes(self, self.op.nodes)
5354
5355   def CheckPrereq(self):
5356     """Check prerequisites.
5357
5358     """
5359     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5360
5361   def Exec(self, feedback_fn):
5362     """Compute the list of all the exported system images.
5363
5364     @rtype: dict
5365     @return: a dictionary with the structure node->(export-list)
5366         where export-list is a list of the instances exported on
5367         that node.
5368
5369     """
5370     rpcresult = self.rpc.call_export_list(self.nodes)
5371     result = {}
5372     for node in rpcresult:
5373       if rpcresult[node].failed:
5374         result[node] = False
5375       else:
5376         result[node] = rpcresult[node].data
5377
5378     return result
5379
5380
5381 class LUExportInstance(LogicalUnit):
5382   """Export an instance to an image in the cluster.
5383
5384   """
5385   HPATH = "instance-export"
5386   HTYPE = constants.HTYPE_INSTANCE
5387   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5388   REQ_BGL = False
5389
5390   def ExpandNames(self):
5391     self._ExpandAndLockInstance()
5392     # FIXME: lock only instance primary and destination node
5393     #
5394     # Sad but true, for now we have do lock all nodes, as we don't know where
5395     # the previous export might be, and and in this LU we search for it and
5396     # remove it from its current node. In the future we could fix this by:
5397     #  - making a tasklet to search (share-lock all), then create the new one,
5398     #    then one to remove, after
5399     #  - removing the removal operation altoghether
5400     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5401
5402   def DeclareLocks(self, level):
5403     """Last minute lock declaration."""
5404     # All nodes are locked anyway, so nothing to do here.
5405
5406   def BuildHooksEnv(self):
5407     """Build hooks env.
5408
5409     This will run on the master, primary node and target node.
5410
5411     """
5412     env = {
5413       "EXPORT_NODE": self.op.target_node,
5414       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5415       }
5416     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5417     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5418           self.op.target_node]
5419     return env, nl, nl
5420
5421   def CheckPrereq(self):
5422     """Check prerequisites.
5423
5424     This checks that the instance and node names are valid.
5425
5426     """
5427     instance_name = self.op.instance_name
5428     self.instance = self.cfg.GetInstanceInfo(instance_name)
5429     assert self.instance is not None, \
5430           "Cannot retrieve locked instance %s" % self.op.instance_name
5431
5432     self.dst_node = self.cfg.GetNodeInfo(
5433       self.cfg.ExpandNodeName(self.op.target_node))
5434
5435     if self.dst_node is None:
5436       # This is wrong node name, not a non-locked node
5437       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5438
5439     # instance disk type verification
5440     for disk in self.instance.disks:
5441       if disk.dev_type == constants.LD_FILE:
5442         raise errors.OpPrereqError("Export not supported for instances with"
5443                                    " file-based disks")
5444
5445   def Exec(self, feedback_fn):
5446     """Export an instance to an image in the cluster.
5447
5448     """
5449     instance = self.instance
5450     dst_node = self.dst_node
5451     src_node = instance.primary_node
5452     if self.op.shutdown:
5453       # shutdown the instance, but not the disks
5454       result = self.rpc.call_instance_shutdown(src_node, instance)
5455       result.Raise()
5456       if not result.data:
5457         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5458                                  (instance.name, src_node))
5459
5460     vgname = self.cfg.GetVGName()
5461
5462     snap_disks = []
5463
5464     try:
5465       for disk in instance.disks:
5466         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5467         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5468         if new_dev_name.failed or not new_dev_name.data:
5469           self.LogWarning("Could not snapshot block device %s on node %s",
5470                           disk.logical_id[1], src_node)
5471           snap_disks.append(False)
5472         else:
5473           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5474                                  logical_id=(vgname, new_dev_name.data),
5475                                  physical_id=(vgname, new_dev_name.data),
5476                                  iv_name=disk.iv_name)
5477           snap_disks.append(new_dev)
5478
5479     finally:
5480       if self.op.shutdown and instance.status == "up":
5481         result = self.rpc.call_instance_start(src_node, instance, None)
5482         if result.failed or not result.data:
5483           _ShutdownInstanceDisks(self, instance)
5484           raise errors.OpExecError("Could not start instance")
5485
5486     # TODO: check for size
5487
5488     cluster_name = self.cfg.GetClusterName()
5489     for idx, dev in enumerate(snap_disks):
5490       if dev:
5491         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5492                                                instance, cluster_name, idx)
5493         if result.failed or not result.data:
5494           self.LogWarning("Could not export block device %s from node %s to"
5495                           " node %s", dev.logical_id[1], src_node,
5496                           dst_node.name)
5497         result = self.rpc.call_blockdev_remove(src_node, dev)
5498         if result.failed or not result.data:
5499           self.LogWarning("Could not remove snapshot block device %s from node"
5500                           " %s", dev.logical_id[1], src_node)
5501
5502     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5503     if result.failed or not result.data:
5504       self.LogWarning("Could not finalize export for instance %s on node %s",
5505                       instance.name, dst_node.name)
5506
5507     nodelist = self.cfg.GetNodeList()
5508     nodelist.remove(dst_node.name)
5509
5510     # on one-node clusters nodelist will be empty after the removal
5511     # if we proceed the backup would be removed because OpQueryExports
5512     # substitutes an empty list with the full cluster node list.
5513     if nodelist:
5514       exportlist = self.rpc.call_export_list(nodelist)
5515       for node in exportlist:
5516         if exportlist[node].failed:
5517           continue
5518         if instance.name in exportlist[node].data:
5519           if not self.rpc.call_export_remove(node, instance.name):
5520             self.LogWarning("Could not remove older export for instance %s"
5521                             " on node %s", instance.name, node)
5522
5523
5524 class LURemoveExport(NoHooksLU):
5525   """Remove exports related to the named instance.
5526
5527   """
5528   _OP_REQP = ["instance_name"]
5529   REQ_BGL = False
5530
5531   def ExpandNames(self):
5532     self.needed_locks = {}
5533     # We need all nodes to be locked in order for RemoveExport to work, but we
5534     # don't need to lock the instance itself, as nothing will happen to it (and
5535     # we can remove exports also for a removed instance)
5536     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5537
5538   def CheckPrereq(self):
5539     """Check prerequisites.
5540     """
5541     pass
5542
5543   def Exec(self, feedback_fn):
5544     """Remove any export.
5545
5546     """
5547     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5548     # If the instance was not found we'll try with the name that was passed in.
5549     # This will only work if it was an FQDN, though.
5550     fqdn_warn = False
5551     if not instance_name:
5552       fqdn_warn = True
5553       instance_name = self.op.instance_name
5554
5555     exportlist = self.rpc.call_export_list(self.acquired_locks[
5556       locking.LEVEL_NODE])
5557     found = False
5558     for node in exportlist:
5559       if exportlist[node].failed:
5560         self.LogWarning("Failed to query node %s, continuing" % node)
5561         continue
5562       if instance_name in exportlist[node].data:
5563         found = True
5564         result = self.rpc.call_export_remove(node, instance_name)
5565         if result.failed or not result.data:
5566           logging.error("Could not remove export for instance %s"
5567                         " on node %s", instance_name, node)
5568
5569     if fqdn_warn and not found:
5570       feedback_fn("Export not found. If trying to remove an export belonging"
5571                   " to a deleted instance please use its Fully Qualified"
5572                   " Domain Name.")
5573
5574
5575 class TagsLU(NoHooksLU):
5576   """Generic tags LU.
5577
5578   This is an abstract class which is the parent of all the other tags LUs.
5579
5580   """
5581
5582   def ExpandNames(self):
5583     self.needed_locks = {}
5584     if self.op.kind == constants.TAG_NODE:
5585       name = self.cfg.ExpandNodeName(self.op.name)
5586       if name is None:
5587         raise errors.OpPrereqError("Invalid node name (%s)" %
5588                                    (self.op.name,))
5589       self.op.name = name
5590       self.needed_locks[locking.LEVEL_NODE] = name
5591     elif self.op.kind == constants.TAG_INSTANCE:
5592       name = self.cfg.ExpandInstanceName(self.op.name)
5593       if name is None:
5594         raise errors.OpPrereqError("Invalid instance name (%s)" %
5595                                    (self.op.name,))
5596       self.op.name = name
5597       self.needed_locks[locking.LEVEL_INSTANCE] = name
5598
5599   def CheckPrereq(self):
5600     """Check prerequisites.
5601
5602     """
5603     if self.op.kind == constants.TAG_CLUSTER:
5604       self.target = self.cfg.GetClusterInfo()
5605     elif self.op.kind == constants.TAG_NODE:
5606       self.target = self.cfg.GetNodeInfo(self.op.name)
5607     elif self.op.kind == constants.TAG_INSTANCE:
5608       self.target = self.cfg.GetInstanceInfo(self.op.name)
5609     else:
5610       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5611                                  str(self.op.kind))
5612
5613
5614 class LUGetTags(TagsLU):
5615   """Returns the tags of a given object.
5616
5617   """
5618   _OP_REQP = ["kind", "name"]
5619   REQ_BGL = False
5620
5621   def Exec(self, feedback_fn):
5622     """Returns the tag list.
5623
5624     """
5625     return list(self.target.GetTags())
5626
5627
5628 class LUSearchTags(NoHooksLU):
5629   """Searches the tags for a given pattern.
5630
5631   """
5632   _OP_REQP = ["pattern"]
5633   REQ_BGL = False
5634
5635   def ExpandNames(self):
5636     self.needed_locks = {}
5637
5638   def CheckPrereq(self):
5639     """Check prerequisites.
5640
5641     This checks the pattern passed for validity by compiling it.
5642
5643     """
5644     try:
5645       self.re = re.compile(self.op.pattern)
5646     except re.error, err:
5647       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5648                                  (self.op.pattern, err))
5649
5650   def Exec(self, feedback_fn):
5651     """Returns the tag list.
5652
5653     """
5654     cfg = self.cfg
5655     tgts = [("/cluster", cfg.GetClusterInfo())]
5656     ilist = cfg.GetAllInstancesInfo().values()
5657     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5658     nlist = cfg.GetAllNodesInfo().values()
5659     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5660     results = []
5661     for path, target in tgts:
5662       for tag in target.GetTags():
5663         if self.re.search(tag):
5664           results.append((path, tag))
5665     return results
5666
5667
5668 class LUAddTags(TagsLU):
5669   """Sets a tag on a given object.
5670
5671   """
5672   _OP_REQP = ["kind", "name", "tags"]
5673   REQ_BGL = False
5674
5675   def CheckPrereq(self):
5676     """Check prerequisites.
5677
5678     This checks the type and length of the tag name and value.
5679
5680     """
5681     TagsLU.CheckPrereq(self)
5682     for tag in self.op.tags:
5683       objects.TaggableObject.ValidateTag(tag)
5684
5685   def Exec(self, feedback_fn):
5686     """Sets the tag.
5687
5688     """
5689     try:
5690       for tag in self.op.tags:
5691         self.target.AddTag(tag)
5692     except errors.TagError, err:
5693       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5694     try:
5695       self.cfg.Update(self.target)
5696     except errors.ConfigurationError:
5697       raise errors.OpRetryError("There has been a modification to the"
5698                                 " config file and the operation has been"
5699                                 " aborted. Please retry.")
5700
5701
5702 class LUDelTags(TagsLU):
5703   """Delete a list of tags from a given object.
5704
5705   """
5706   _OP_REQP = ["kind", "name", "tags"]
5707   REQ_BGL = False
5708
5709   def CheckPrereq(self):
5710     """Check prerequisites.
5711
5712     This checks that we have the given tag.
5713
5714     """
5715     TagsLU.CheckPrereq(self)
5716     for tag in self.op.tags:
5717       objects.TaggableObject.ValidateTag(tag)
5718     del_tags = frozenset(self.op.tags)
5719     cur_tags = self.target.GetTags()
5720     if not del_tags <= cur_tags:
5721       diff_tags = del_tags - cur_tags
5722       diff_names = ["'%s'" % tag for tag in diff_tags]
5723       diff_names.sort()
5724       raise errors.OpPrereqError("Tag(s) %s not found" %
5725                                  (",".join(diff_names)))
5726
5727   def Exec(self, feedback_fn):
5728     """Remove the tag from the object.
5729
5730     """
5731     for tag in self.op.tags:
5732       self.target.RemoveTag(tag)
5733     try:
5734       self.cfg.Update(self.target)
5735     except errors.ConfigurationError:
5736       raise errors.OpRetryError("There has been a modification to the"
5737                                 " config file and the operation has been"
5738                                 " aborted. Please retry.")
5739
5740
5741 class LUTestDelay(NoHooksLU):
5742   """Sleep for a specified amount of time.
5743
5744   This LU sleeps on the master and/or nodes for a specified amount of
5745   time.
5746
5747   """
5748   _OP_REQP = ["duration", "on_master", "on_nodes"]
5749   REQ_BGL = False
5750
5751   def ExpandNames(self):
5752     """Expand names and set required locks.
5753
5754     This expands the node list, if any.
5755
5756     """
5757     self.needed_locks = {}
5758     if self.op.on_nodes:
5759       # _GetWantedNodes can be used here, but is not always appropriate to use
5760       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5761       # more information.
5762       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5763       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5764
5765   def CheckPrereq(self):
5766     """Check prerequisites.
5767
5768     """
5769
5770   def Exec(self, feedback_fn):
5771     """Do the actual sleep.
5772
5773     """
5774     if self.op.on_master:
5775       if not utils.TestDelay(self.op.duration):
5776         raise errors.OpExecError("Error during master delay test")
5777     if self.op.on_nodes:
5778       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5779       if not result:
5780         raise errors.OpExecError("Complete failure from rpc call")
5781       for node, node_result in result.items():
5782         node_result.Raise()
5783         if not node_result.data:
5784           raise errors.OpExecError("Failure during rpc call to node %s,"
5785                                    " result: %s" % (node, node_result.data))
5786
5787
5788 class IAllocator(object):
5789   """IAllocator framework.
5790
5791   An IAllocator instance has three sets of attributes:
5792     - cfg that is needed to query the cluster
5793     - input data (all members of the _KEYS class attribute are required)
5794     - four buffer attributes (in|out_data|text), that represent the
5795       input (to the external script) in text and data structure format,
5796       and the output from it, again in two formats
5797     - the result variables from the script (success, info, nodes) for
5798       easy usage
5799
5800   """
5801   _ALLO_KEYS = [
5802     "mem_size", "disks", "disk_template",
5803     "os", "tags", "nics", "vcpus", "hypervisor",
5804     ]
5805   _RELO_KEYS = [
5806     "relocate_from",
5807     ]
5808
5809   def __init__(self, lu, mode, name, **kwargs):
5810     self.lu = lu
5811     # init buffer variables
5812     self.in_text = self.out_text = self.in_data = self.out_data = None
5813     # init all input fields so that pylint is happy
5814     self.mode = mode
5815     self.name = name
5816     self.mem_size = self.disks = self.disk_template = None
5817     self.os = self.tags = self.nics = self.vcpus = None
5818     self.relocate_from = None
5819     # computed fields
5820     self.required_nodes = None
5821     # init result fields
5822     self.success = self.info = self.nodes = None
5823     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5824       keyset = self._ALLO_KEYS
5825     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5826       keyset = self._RELO_KEYS
5827     else:
5828       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5829                                    " IAllocator" % self.mode)
5830     for key in kwargs:
5831       if key not in keyset:
5832         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5833                                      " IAllocator" % key)
5834       setattr(self, key, kwargs[key])
5835     for key in keyset:
5836       if key not in kwargs:
5837         raise errors.ProgrammerError("Missing input parameter '%s' to"
5838                                      " IAllocator" % key)
5839     self._BuildInputData()
5840
5841   def _ComputeClusterData(self):
5842     """Compute the generic allocator input data.
5843
5844     This is the data that is independent of the actual operation.
5845
5846     """
5847     cfg = self.lu.cfg
5848     cluster_info = cfg.GetClusterInfo()
5849     # cluster data
5850     data = {
5851       "version": 1,
5852       "cluster_name": cfg.GetClusterName(),
5853       "cluster_tags": list(cluster_info.GetTags()),
5854       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5855       # we don't have job IDs
5856       }
5857     iinfo = cfg.GetAllInstancesInfo().values()
5858     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5859
5860     # node data
5861     node_results = {}
5862     node_list = cfg.GetNodeList()
5863
5864     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5865       hypervisor = self.hypervisor
5866     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5867       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5868
5869     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5870                                            hypervisor)
5871     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5872                        cluster_info.enabled_hypervisors)
5873     for nname in node_list:
5874       ninfo = cfg.GetNodeInfo(nname)
5875       node_data[nname].Raise()
5876       if not isinstance(node_data[nname].data, dict):
5877         raise errors.OpExecError("Can't get data for node %s" % nname)
5878       remote_info = node_data[nname].data
5879       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5880                    'vg_size', 'vg_free', 'cpu_total']:
5881         if attr not in remote_info:
5882           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5883                                    (nname, attr))
5884         try:
5885           remote_info[attr] = int(remote_info[attr])
5886         except ValueError, err:
5887           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5888                                    " %s" % (nname, attr, str(err)))
5889       # compute memory used by primary instances
5890       i_p_mem = i_p_up_mem = 0
5891       for iinfo, beinfo in i_list:
5892         if iinfo.primary_node == nname:
5893           i_p_mem += beinfo[constants.BE_MEMORY]
5894           if iinfo.name not in node_iinfo[nname]:
5895             i_used_mem = 0
5896           else:
5897             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5898           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5899           remote_info['memory_free'] -= max(0, i_mem_diff)
5900
5901           if iinfo.status == "up":
5902             i_p_up_mem += beinfo[constants.BE_MEMORY]
5903
5904       # compute memory used by instances
5905       pnr = {
5906         "tags": list(ninfo.GetTags()),
5907         "total_memory": remote_info['memory_total'],
5908         "reserved_memory": remote_info['memory_dom0'],
5909         "free_memory": remote_info['memory_free'],
5910         "i_pri_memory": i_p_mem,
5911         "i_pri_up_memory": i_p_up_mem,
5912         "total_disk": remote_info['vg_size'],
5913         "free_disk": remote_info['vg_free'],
5914         "primary_ip": ninfo.primary_ip,
5915         "secondary_ip": ninfo.secondary_ip,
5916         "total_cpus": remote_info['cpu_total'],
5917         }
5918       node_results[nname] = pnr
5919     data["nodes"] = node_results
5920
5921     # instance data
5922     instance_data = {}
5923     for iinfo, beinfo in i_list:
5924       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5925                   for n in iinfo.nics]
5926       pir = {
5927         "tags": list(iinfo.GetTags()),
5928         "should_run": iinfo.status == "up",
5929         "vcpus": beinfo[constants.BE_VCPUS],
5930         "memory": beinfo[constants.BE_MEMORY],
5931         "os": iinfo.os,
5932         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5933         "nics": nic_data,
5934         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5935         "disk_template": iinfo.disk_template,
5936         "hypervisor": iinfo.hypervisor,
5937         }
5938       instance_data[iinfo.name] = pir
5939
5940     data["instances"] = instance_data
5941
5942     self.in_data = data
5943
5944   def _AddNewInstance(self):
5945     """Add new instance data to allocator structure.
5946
5947     This in combination with _AllocatorGetClusterData will create the
5948     correct structure needed as input for the allocator.
5949
5950     The checks for the completeness of the opcode must have already been
5951     done.
5952
5953     """
5954     data = self.in_data
5955     if len(self.disks) != 2:
5956       raise errors.OpExecError("Only two-disk configurations supported")
5957
5958     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5959
5960     if self.disk_template in constants.DTS_NET_MIRROR:
5961       self.required_nodes = 2
5962     else:
5963       self.required_nodes = 1
5964     request = {
5965       "type": "allocate",
5966       "name": self.name,
5967       "disk_template": self.disk_template,
5968       "tags": self.tags,
5969       "os": self.os,
5970       "vcpus": self.vcpus,
5971       "memory": self.mem_size,
5972       "disks": self.disks,
5973       "disk_space_total": disk_space,
5974       "nics": self.nics,
5975       "required_nodes": self.required_nodes,
5976       }
5977     data["request"] = request
5978
5979   def _AddRelocateInstance(self):
5980     """Add relocate instance data to allocator structure.
5981
5982     This in combination with _IAllocatorGetClusterData will create the
5983     correct structure needed as input for the allocator.
5984
5985     The checks for the completeness of the opcode must have already been
5986     done.
5987
5988     """
5989     instance = self.lu.cfg.GetInstanceInfo(self.name)
5990     if instance is None:
5991       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5992                                    " IAllocator" % self.name)
5993
5994     if instance.disk_template not in constants.DTS_NET_MIRROR:
5995       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5996
5997     if len(instance.secondary_nodes) != 1:
5998       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5999
6000     self.required_nodes = 1
6001     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6002     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6003
6004     request = {
6005       "type": "relocate",
6006       "name": self.name,
6007       "disk_space_total": disk_space,
6008       "required_nodes": self.required_nodes,
6009       "relocate_from": self.relocate_from,
6010       }
6011     self.in_data["request"] = request
6012
6013   def _BuildInputData(self):
6014     """Build input data structures.
6015
6016     """
6017     self._ComputeClusterData()
6018
6019     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6020       self._AddNewInstance()
6021     else:
6022       self._AddRelocateInstance()
6023
6024     self.in_text = serializer.Dump(self.in_data)
6025
6026   def Run(self, name, validate=True, call_fn=None):
6027     """Run an instance allocator and return the results.
6028
6029     """
6030     if call_fn is None:
6031       call_fn = self.lu.rpc.call_iallocator_runner
6032     data = self.in_text
6033
6034     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6035     result.Raise()
6036
6037     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6038       raise errors.OpExecError("Invalid result from master iallocator runner")
6039
6040     rcode, stdout, stderr, fail = result.data
6041
6042     if rcode == constants.IARUN_NOTFOUND:
6043       raise errors.OpExecError("Can't find allocator '%s'" % name)
6044     elif rcode == constants.IARUN_FAILURE:
6045       raise errors.OpExecError("Instance allocator call failed: %s,"
6046                                " output: %s" % (fail, stdout+stderr))
6047     self.out_text = stdout
6048     if validate:
6049       self._ValidateResult()
6050
6051   def _ValidateResult(self):
6052     """Process the allocator results.
6053
6054     This will process and if successful save the result in
6055     self.out_data and the other parameters.
6056
6057     """
6058     try:
6059       rdict = serializer.Load(self.out_text)
6060     except Exception, err:
6061       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6062
6063     if not isinstance(rdict, dict):
6064       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6065
6066     for key in "success", "info", "nodes":
6067       if key not in rdict:
6068         raise errors.OpExecError("Can't parse iallocator results:"
6069                                  " missing key '%s'" % key)
6070       setattr(self, key, rdict[key])
6071
6072     if not isinstance(rdict["nodes"], list):
6073       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6074                                " is not a list")
6075     self.out_data = rdict
6076
6077
6078 class LUTestAllocator(NoHooksLU):
6079   """Run allocator tests.
6080
6081   This LU runs the allocator tests
6082
6083   """
6084   _OP_REQP = ["direction", "mode", "name"]
6085
6086   def CheckPrereq(self):
6087     """Check prerequisites.
6088
6089     This checks the opcode parameters depending on the director and mode test.
6090
6091     """
6092     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6093       for attr in ["name", "mem_size", "disks", "disk_template",
6094                    "os", "tags", "nics", "vcpus"]:
6095         if not hasattr(self.op, attr):
6096           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6097                                      attr)
6098       iname = self.cfg.ExpandInstanceName(self.op.name)
6099       if iname is not None:
6100         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6101                                    iname)
6102       if not isinstance(self.op.nics, list):
6103         raise errors.OpPrereqError("Invalid parameter 'nics'")
6104       for row in self.op.nics:
6105         if (not isinstance(row, dict) or
6106             "mac" not in row or
6107             "ip" not in row or
6108             "bridge" not in row):
6109           raise errors.OpPrereqError("Invalid contents of the"
6110                                      " 'nics' parameter")
6111       if not isinstance(self.op.disks, list):
6112         raise errors.OpPrereqError("Invalid parameter 'disks'")
6113       if len(self.op.disks) != 2:
6114         raise errors.OpPrereqError("Only two-disk configurations supported")
6115       for row in self.op.disks:
6116         if (not isinstance(row, dict) or
6117             "size" not in row or
6118             not isinstance(row["size"], int) or
6119             "mode" not in row or
6120             row["mode"] not in ['r', 'w']):
6121           raise errors.OpPrereqError("Invalid contents of the"
6122                                      " 'disks' parameter")
6123       if self.op.hypervisor is None:
6124         self.op.hypervisor = self.cfg.GetHypervisorType()
6125     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6126       if not hasattr(self.op, "name"):
6127         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6128       fname = self.cfg.ExpandInstanceName(self.op.name)
6129       if fname is None:
6130         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6131                                    self.op.name)
6132       self.op.name = fname
6133       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6134     else:
6135       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6136                                  self.op.mode)
6137
6138     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6139       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6140         raise errors.OpPrereqError("Missing allocator name")
6141     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6142       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6143                                  self.op.direction)
6144
6145   def Exec(self, feedback_fn):
6146     """Run the allocator test.
6147
6148     """
6149     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6150       ial = IAllocator(self,
6151                        mode=self.op.mode,
6152                        name=self.op.name,
6153                        mem_size=self.op.mem_size,
6154                        disks=self.op.disks,
6155                        disk_template=self.op.disk_template,
6156                        os=self.op.os,
6157                        tags=self.op.tags,
6158                        nics=self.op.nics,
6159                        vcpus=self.op.vcpus,
6160                        hypervisor=self.op.hypervisor,
6161                        )
6162     else:
6163       ial = IAllocator(self,
6164                        mode=self.op.mode,
6165                        name=self.op.name,
6166                        relocate_from=list(self.relocate_from),
6167                        )
6168
6169     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6170       result = ial.in_text
6171     else:
6172       ial.Run(self.op.allocator, validate=False)
6173       result = ial.out_text
6174     return result