LUAddNode: Check the correct result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419                           memory, vcpus, nics):
420   """Builds instance related env variables for hooks
421
422   This builds the hook environment from individual variables.
423
424   @type name: string
425   @param name: the name of the instance
426   @type primary_node: string
427   @param primary_node: the name of the instance's primary node
428   @type secondary_nodes: list
429   @param secondary_nodes: list of secondary nodes as strings
430   @type os_type: string
431   @param os_type: the name of the instance's OS
432   @type status: string
433   @param status: the desired status of the instances
434   @type memory: string
435   @param memory: the memory size of the instance
436   @type vcpus: string
437   @param vcpus: the count of VCPUs the instance has
438   @type nics: list
439   @param nics: list of tuples (ip, bridge, mac) representing
440       the NICs the instance  has
441   @rtype: dict
442   @return: the hook environment for this instance
443
444   """
445   env = {
446     "OP_TARGET": name,
447     "INSTANCE_NAME": name,
448     "INSTANCE_PRIMARY": primary_node,
449     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450     "INSTANCE_OS_TYPE": os_type,
451     "INSTANCE_STATUS": status,
452     "INSTANCE_MEMORY": memory,
453     "INSTANCE_VCPUS": vcpus,
454   }
455
456   if nics:
457     nic_count = len(nics)
458     for idx, (ip, bridge, mac) in enumerate(nics):
459       if ip is None:
460         ip = ""
461       env["INSTANCE_NIC%d_IP" % idx] = ip
462       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464   else:
465     nic_count = 0
466
467   env["INSTANCE_NIC_COUNT"] = nic_count
468
469   return env
470
471
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473   """Builds instance related env variables for hooks from an object.
474
475   @type lu: L{LogicalUnit}
476   @param lu: the logical unit on whose behalf we execute
477   @type instance: L{objects.Instance}
478   @param instance: the instance for which we should build the
479       environment
480   @type override: dict
481   @param override: dictionary with key/values that will override
482       our values
483   @rtype: dict
484   @return: the hook environment dictionary
485
486   """
487   bep = lu.cfg.GetClusterInfo().FillBE(instance)
488   args = {
489     'name': instance.name,
490     'primary_node': instance.primary_node,
491     'secondary_nodes': instance.secondary_nodes,
492     'os_type': instance.os,
493     'status': instance.os,
494     'memory': bep[constants.BE_MEMORY],
495     'vcpus': bep[constants.BE_VCPUS],
496     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497   }
498   if override:
499     args.update(override)
500   return _BuildInstanceHookEnv(**args)
501
502
503 def _CheckInstanceBridgesExist(lu, instance):
504   """Check that the brigdes needed by an instance exist.
505
506   """
507   # check bridges existance
508   brlist = [nic.bridge for nic in instance.nics]
509   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510   result.Raise()
511   if not result.data:
512     raise errors.OpPrereqError("One or more target bridges %s does not"
513                                " exist on destination node '%s'" %
514                                (brlist, instance.primary_node))
515
516
517 class LUDestroyCluster(NoHooksLU):
518   """Logical unit for destroying the cluster.
519
520   """
521   _OP_REQP = []
522
523   def CheckPrereq(self):
524     """Check prerequisites.
525
526     This checks whether the cluster is empty.
527
528     Any errors are signalled by raising errors.OpPrereqError.
529
530     """
531     master = self.cfg.GetMasterNode()
532
533     nodelist = self.cfg.GetNodeList()
534     if len(nodelist) != 1 or nodelist[0] != master:
535       raise errors.OpPrereqError("There are still %d node(s) in"
536                                  " this cluster." % (len(nodelist) - 1))
537     instancelist = self.cfg.GetInstanceList()
538     if instancelist:
539       raise errors.OpPrereqError("There are still %d instance(s) in"
540                                  " this cluster." % len(instancelist))
541
542   def Exec(self, feedback_fn):
543     """Destroys the cluster.
544
545     """
546     master = self.cfg.GetMasterNode()
547     result = self.rpc.call_node_stop_master(master, False)
548     result.Raise()
549     if not result.data:
550       raise errors.OpExecError("Could not disable the master role")
551     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552     utils.CreateBackup(priv_key)
553     utils.CreateBackup(pub_key)
554     return master
555
556
557 class LUVerifyCluster(LogicalUnit):
558   """Verifies the cluster status.
559
560   """
561   HPATH = "cluster-verify"
562   HTYPE = constants.HTYPE_CLUSTER
563   _OP_REQP = ["skip_checks"]
564   REQ_BGL = False
565
566   def ExpandNames(self):
567     self.needed_locks = {
568       locking.LEVEL_NODE: locking.ALL_SET,
569       locking.LEVEL_INSTANCE: locking.ALL_SET,
570     }
571     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572
573   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574                   node_result, feedback_fn, master_files):
575     """Run multiple tests against a node.
576
577     Test list:
578
579       - compares ganeti version
580       - checks vg existance and size > 20G
581       - checks config file checksum
582       - checks ssh to other nodes
583
584     @type nodeinfo: L{objects.Node}
585     @param nodeinfo: the node to check
586     @param file_list: required list of files
587     @param local_cksum: dictionary of local files and their checksums
588     @param node_result: the results from the node
589     @param feedback_fn: function used to accumulate results
590     @param master_files: list of files that only masters should have
591
592     """
593     node = nodeinfo.name
594
595     # main result, node_result should be a non-empty dict
596     if not node_result or not isinstance(node_result, dict):
597       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598       return True
599
600     # compares ganeti version
601     local_version = constants.PROTOCOL_VERSION
602     remote_version = node_result.get('version', None)
603     if not remote_version:
604       feedback_fn("  - ERROR: connection to %s failed" % (node))
605       return True
606
607     if local_version != remote_version:
608       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609                       (local_version, node, remote_version))
610       return True
611
612     # checks vg existance and size > 20G
613
614     bad = False
615     vglist = node_result.get(constants.NV_VGLIST, None)
616     if not vglist:
617       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618                       (node,))
619       bad = True
620     else:
621       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622                                             constants.MIN_VG_SIZE)
623       if vgstatus:
624         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625         bad = True
626
627     # checks config file checksum
628
629     remote_cksum = node_result.get(constants.NV_FILELIST, None)
630     if not isinstance(remote_cksum, dict):
631       bad = True
632       feedback_fn("  - ERROR: node hasn't returned file checksum data")
633     else:
634       for file_name in file_list:
635         node_is_mc = nodeinfo.master_candidate
636         must_have_file = file_name not in master_files
637         if file_name not in remote_cksum:
638           if node_is_mc or must_have_file:
639             bad = True
640             feedback_fn("  - ERROR: file '%s' missing" % file_name)
641         elif remote_cksum[file_name] != local_cksum[file_name]:
642           if node_is_mc or must_have_file:
643             bad = True
644             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645           else:
646             # not candidate and this is not a must-have file
647             bad = True
648             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649                         " '%s'" % file_name)
650         else:
651           # all good, except non-master/non-must have combination
652           if not node_is_mc and not must_have_file:
653             feedback_fn("  - ERROR: file '%s' should not exist on non master"
654                         " candidates" % file_name)
655
656     # checks ssh to any
657
658     if constants.NV_NODELIST not in node_result:
659       bad = True
660       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661     else:
662       if node_result[constants.NV_NODELIST]:
663         bad = True
664         for node in node_result[constants.NV_NODELIST]:
665           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666                           (node, node_result[constants.NV_NODELIST][node]))
667
668     if constants.NV_NODENETTEST not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671     else:
672       if node_result[constants.NV_NODENETTEST]:
673         bad = True
674         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675         for node in nlist:
676           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677                           (node, node_result[constants.NV_NODENETTEST][node]))
678
679     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680     if isinstance(hyp_result, dict):
681       for hv_name, hv_result in hyp_result.iteritems():
682         if hv_result is not None:
683           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684                       (hv_name, hv_result))
685     return bad
686
687   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688                       node_instance, feedback_fn):
689     """Verify an instance.
690
691     This function checks to see if the required block devices are
692     available on the instance's node.
693
694     """
695     bad = False
696
697     node_current = instanceconfig.primary_node
698
699     node_vol_should = {}
700     instanceconfig.MapLVsByNode(node_vol_should)
701
702     for node in node_vol_should:
703       for volume in node_vol_should[node]:
704         if node not in node_vol_is or volume not in node_vol_is[node]:
705           feedback_fn("  - ERROR: volume %s missing on node %s" %
706                           (volume, node))
707           bad = True
708
709     if not instanceconfig.status == 'down':
710       if (node_current not in node_instance or
711           not instance in node_instance[node_current]):
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758     """Verify N+1 Memory Resilience.
759
760     Check that if one single node dies we can still start all the instances it
761     was primary for.
762
763     """
764     bad = False
765
766     for node, nodeinfo in node_info.iteritems():
767       # This code checks that every node which is now listed as secondary has
768       # enough memory to host all instances it is supposed to should a single
769       # other node in the cluster fail.
770       # FIXME: not ready for failover to an arbitrary node
771       # FIXME: does not support file-backed instances
772       # WARNING: we currently take into account down instances as well as up
773       # ones, considering that even if they're down someone might want to start
774       # them even in the event of a node failure.
775       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776         needed_mem = 0
777         for instance in instances:
778           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779           if bep[constants.BE_AUTO_BALANCE]:
780             needed_mem += bep[constants.BE_MEMORY]
781         if nodeinfo['mfree'] < needed_mem:
782           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783                       " failovers should node %s fail" % (node, prinode))
784           bad = True
785     return bad
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     Transform the list of checks we're going to skip into a set and check that
791     all its members are valid.
792
793     """
794     self.skip_set = frozenset(self.op.skip_checks)
795     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796       raise errors.OpPrereqError("Invalid checks to be skipped specified")
797
798   def BuildHooksEnv(self):
799     """Build hooks env.
800
801     Cluster-Verify hooks just rone in the post phase and their failure makes
802     the output be logged in the verify output and the verification to fail.
803
804     """
805     all_nodes = self.cfg.GetNodeList()
806     # TODO: populate the environment with useful information for verify hooks
807     env = {}
808     return env, [], all_nodes
809
810   def Exec(self, feedback_fn):
811     """Verify integrity of cluster, performing various test on nodes.
812
813     """
814     bad = False
815     feedback_fn("* Verifying global settings")
816     for msg in self.cfg.VerifyConfig():
817       feedback_fn("  - ERROR: %s" % msg)
818
819     vg_name = self.cfg.GetVGName()
820     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824     i_non_redundant = [] # Non redundant instances
825     i_non_a_balanced = [] # Non auto-balanced instances
826     node_volume = {}
827     node_instance = {}
828     node_info = {}
829     instance_cfg = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     master_files = [constants.CLUSTER_CONF_FILE]
834
835     file_names = ssconf.SimpleStore().GetFileList()
836     file_names.append(constants.SSL_CERT_FILE)
837     file_names.extend(master_files)
838
839     local_checksums = utils.FingerprintFiles(file_names)
840
841     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842     node_verify_param = {
843       constants.NV_FILELIST: file_names,
844       constants.NV_NODELIST: nodelist,
845       constants.NV_HYPERVISOR: hypervisors,
846       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847                                   node.secondary_ip) for node in nodeinfo],
848       constants.NV_LVLIST: vg_name,
849       constants.NV_INSTANCELIST: hypervisors,
850       constants.NV_VGLIST: None,
851       constants.NV_VERSION: None,
852       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853       }
854     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855                                            self.cfg.GetClusterName())
856
857     cluster = self.cfg.GetClusterInfo()
858     master_node = self.cfg.GetMasterNode()
859     for node_i in nodeinfo:
860       node = node_i.name
861       nresult = all_nvinfo[node].data
862
863       if node == master_node:
864         ntype = "master"
865       elif node_i.master_candidate:
866         ntype = "master candidate"
867       else:
868         ntype = "regular"
869       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870
871       if all_nvinfo[node].failed or not isinstance(nresult, dict):
872         feedback_fn("  - ERROR: connection to %s failed" % (node,))
873         bad = True
874         continue
875
876       result = self._VerifyNode(node_i, file_names, local_checksums,
877                                 nresult, feedback_fn, master_files)
878       bad = bad or result
879
880       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881       if isinstance(lvdata, basestring):
882         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883                     (node, lvdata.encode('string_escape')))
884         bad = True
885         node_volume[node] = {}
886       elif not isinstance(lvdata, dict):
887         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888         bad = True
889         continue
890       else:
891         node_volume[node] = lvdata
892
893       # node_instance
894       idata = nresult.get(constants.NV_INSTANCELIST, None)
895       if not isinstance(idata, list):
896         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897                     (node,))
898         bad = True
899         continue
900
901       node_instance[node] = idata
902
903       # node_info
904       nodeinfo = nresult.get(constants.NV_HVINFO, None)
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962         i_non_a_balanced.append(instance)
963
964       for snode in inst_config.secondary_nodes:
965         if snode in node_info:
966           node_info[snode]['sinst'].append(instance)
967           if pnode not in node_info[snode]['sinst-by-pnode']:
968             node_info[snode]['sinst-by-pnode'][pnode] = []
969           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970         else:
971           feedback_fn("  - ERROR: instance %s, connection to secondary node"
972                       " %s failed" % (instance, snode))
973
974     feedback_fn("* Verifying orphan volumes")
975     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976                                        feedback_fn)
977     bad = bad or result
978
979     feedback_fn("* Verifying remaining instances")
980     result = self._VerifyOrphanInstances(instancelist, node_instance,
981                                          feedback_fn)
982     bad = bad or result
983
984     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985       feedback_fn("* Verifying N+1 Memory redundancy")
986       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987       bad = bad or result
988
989     feedback_fn("* Other Notes")
990     if i_non_redundant:
991       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992                   % len(i_non_redundant))
993
994     if i_non_a_balanced:
995       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996                   % len(i_non_a_balanced))
997
998     return not bad
999
1000   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001     """Analize the post-hooks' result
1002
1003     This method analyses the hook result, handles it, and sends some
1004     nicely-formatted feedback back to the user.
1005
1006     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008     @param hooks_results: the results of the multi-node hooks rpc call
1009     @param feedback_fn: function used send feedback back to the caller
1010     @param lu_result: previous Exec result
1011     @return: the new Exec result, based on the previous result
1012         and hook results
1013
1014     """
1015     # We only really run POST phase hooks, and are only interested in
1016     # their results
1017     if phase == constants.HOOKS_PHASE_POST:
1018       # Used to change hooks' output to proper indentation
1019       indent_re = re.compile('^', re.M)
1020       feedback_fn("* Hooks Results")
1021       if not hooks_results:
1022         feedback_fn("  - ERROR: general communication failure")
1023         lu_result = 1
1024       else:
1025         for node_name in hooks_results:
1026           show_node_header = True
1027           res = hooks_results[node_name]
1028           if res.failed or res.data is False or not isinstance(res.data, list):
1029             feedback_fn("    Communication failure in hooks execution")
1030             lu_result = 1
1031             continue
1032           for script, hkr, output in res.data:
1033             if hkr == constants.HKR_FAIL:
1034               # The node header is only shown once, if there are
1035               # failing hooks on that node
1036               if show_node_header:
1037                 feedback_fn("  Node %s:" % node_name)
1038                 show_node_header = False
1039               feedback_fn("    ERROR: Script %s failed, output:" % script)
1040               output = indent_re.sub('      ', output)
1041               feedback_fn("%s" % output)
1042               lu_result = 1
1043
1044       return lu_result
1045
1046
1047 class LUVerifyDisks(NoHooksLU):
1048   """Verifies the cluster disks status.
1049
1050   """
1051   _OP_REQP = []
1052   REQ_BGL = False
1053
1054   def ExpandNames(self):
1055     self.needed_locks = {
1056       locking.LEVEL_NODE: locking.ALL_SET,
1057       locking.LEVEL_INSTANCE: locking.ALL_SET,
1058     }
1059     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060
1061   def CheckPrereq(self):
1062     """Check prerequisites.
1063
1064     This has no prerequisites.
1065
1066     """
1067     pass
1068
1069   def Exec(self, feedback_fn):
1070     """Verify integrity of cluster disks.
1071
1072     """
1073     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074
1075     vg_name = self.cfg.GetVGName()
1076     nodes = utils.NiceSort(self.cfg.GetNodeList())
1077     instances = [self.cfg.GetInstanceInfo(name)
1078                  for name in self.cfg.GetInstanceList()]
1079
1080     nv_dict = {}
1081     for inst in instances:
1082       inst_lvs = {}
1083       if (inst.status != "up" or
1084           inst.disk_template not in constants.DTS_NET_MIRROR):
1085         continue
1086       inst.MapLVsByNode(inst_lvs)
1087       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088       for node, vol_list in inst_lvs.iteritems():
1089         for vol in vol_list:
1090           nv_dict[(node, vol)] = inst
1091
1092     if not nv_dict:
1093       return result
1094
1095     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096
1097     to_act = set()
1098     for node in nodes:
1099       # node_volume
1100       lvs = node_lvs[node]
1101       if lvs.failed:
1102         self.LogWarning("Connection to node %s failed: %s" %
1103                         (node, lvs.data))
1104         continue
1105       lvs = lvs.data
1106       if isinstance(lvs, basestring):
1107         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108         res_nlvm[node] = lvs
1109       elif not isinstance(lvs, dict):
1110         logging.warning("Connection to node %s failed or invalid data"
1111                         " returned", node)
1112         res_nodes.append(node)
1113         continue
1114
1115       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116         inst = nv_dict.pop((node, lv_name), None)
1117         if (not lv_online and inst is not None
1118             and inst.name not in res_instances):
1119           res_instances.append(inst.name)
1120
1121     # any leftover items in nv_dict are missing LVs, let's arrange the
1122     # data better
1123     for key, inst in nv_dict.iteritems():
1124       if inst.name not in res_missing:
1125         res_missing[inst.name] = []
1126       res_missing[inst.name].append(key)
1127
1128     return result
1129
1130
1131 class LURenameCluster(LogicalUnit):
1132   """Rename the cluster.
1133
1134   """
1135   HPATH = "cluster-rename"
1136   HTYPE = constants.HTYPE_CLUSTER
1137   _OP_REQP = ["name"]
1138
1139   def BuildHooksEnv(self):
1140     """Build hooks env.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.cfg.GetClusterName(),
1145       "NEW_NAME": self.op.name,
1146       }
1147     mn = self.cfg.GetMasterNode()
1148     return env, [mn], [mn]
1149
1150   def CheckPrereq(self):
1151     """Verify that the passed name is a valid one.
1152
1153     """
1154     hostname = utils.HostInfo(self.op.name)
1155
1156     new_name = hostname.name
1157     self.ip = new_ip = hostname.ip
1158     old_name = self.cfg.GetClusterName()
1159     old_ip = self.cfg.GetMasterIP()
1160     if new_name == old_name and new_ip == old_ip:
1161       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162                                  " cluster has changed")
1163     if new_ip != old_ip:
1164       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166                                    " reachable on the network. Aborting." %
1167                                    new_ip)
1168
1169     self.op.name = new_name
1170
1171   def Exec(self, feedback_fn):
1172     """Rename the cluster.
1173
1174     """
1175     clustername = self.op.name
1176     ip = self.ip
1177
1178     # shutdown the master IP
1179     master = self.cfg.GetMasterNode()
1180     result = self.rpc.call_node_stop_master(master, False)
1181     if result.failed or not result.data:
1182       raise errors.OpExecError("Could not disable the master role")
1183
1184     try:
1185       cluster = self.cfg.GetClusterInfo()
1186       cluster.cluster_name = clustername
1187       cluster.master_ip = ip
1188       self.cfg.Update(cluster)
1189
1190       # update the known hosts file
1191       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192       node_list = self.cfg.GetNodeList()
1193       try:
1194         node_list.remove(master)
1195       except ValueError:
1196         pass
1197       result = self.rpc.call_upload_file(node_list,
1198                                          constants.SSH_KNOWN_HOSTS_FILE)
1199       for to_node, to_result in result.iteritems():
1200         if to_result.failed or not to_result.data:
1201           logging.error("Copy of file %s to node %s failed", fname, to_node)
1202
1203     finally:
1204       result = self.rpc.call_node_start_master(master, False)
1205       if result.failed or not result.data:
1206         self.LogWarning("Could not re-enable the master role on"
1207                         " the master, please restart manually.")
1208
1209
1210 def _RecursiveCheckIfLVMBased(disk):
1211   """Check if the given disk or its children are lvm-based.
1212
1213   @type disk: L{objects.Disk}
1214   @param disk: the disk to check
1215   @rtype: booleean
1216   @return: boolean indicating whether a LD_LV dev_type was found or not
1217
1218   """
1219   if disk.children:
1220     for chdisk in disk.children:
1221       if _RecursiveCheckIfLVMBased(chdisk):
1222         return True
1223   return disk.dev_type == constants.LD_LV
1224
1225
1226 class LUSetClusterParams(LogicalUnit):
1227   """Change the parameters of the cluster.
1228
1229   """
1230   HPATH = "cluster-modify"
1231   HTYPE = constants.HTYPE_CLUSTER
1232   _OP_REQP = []
1233   REQ_BGL = False
1234
1235   def CheckParameters(self):
1236     """Check parameters
1237
1238     """
1239     if not hasattr(self.op, "candidate_pool_size"):
1240       self.op.candidate_pool_size = None
1241     if self.op.candidate_pool_size is not None:
1242       try:
1243         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244       except ValueError, err:
1245         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246                                    str(err))
1247       if self.op.candidate_pool_size < 1:
1248         raise errors.OpPrereqError("At least one master candidate needed")
1249
1250   def ExpandNames(self):
1251     # FIXME: in the future maybe other cluster params won't require checking on
1252     # all nodes to be modified.
1253     self.needed_locks = {
1254       locking.LEVEL_NODE: locking.ALL_SET,
1255     }
1256     self.share_locks[locking.LEVEL_NODE] = 1
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     """
1262     env = {
1263       "OP_TARGET": self.cfg.GetClusterName(),
1264       "NEW_VG_NAME": self.op.vg_name,
1265       }
1266     mn = self.cfg.GetMasterNode()
1267     return env, [mn], [mn]
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks whether the given params don't conflict and
1273     if the given volume group is valid.
1274
1275     """
1276     # FIXME: This only works because there is only one parameter that can be
1277     # changed or removed.
1278     if self.op.vg_name is not None and not self.op.vg_name:
1279       instances = self.cfg.GetAllInstancesInfo().values()
1280       for inst in instances:
1281         for disk in inst.disks:
1282           if _RecursiveCheckIfLVMBased(disk):
1283             raise errors.OpPrereqError("Cannot disable lvm storage while"
1284                                        " lvm-based instances exist")
1285
1286     node_list = self.acquired_locks[locking.LEVEL_NODE]
1287
1288     # if vg_name not None, checks given volume group on all nodes
1289     if self.op.vg_name:
1290       vglist = self.rpc.call_vg_list(node_list)
1291       for node in node_list:
1292         if vglist[node].failed:
1293           # ignoring down node
1294           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295           continue
1296         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297                                               self.op.vg_name,
1298                                               constants.MIN_VG_SIZE)
1299         if vgstatus:
1300           raise errors.OpPrereqError("Error on node '%s': %s" %
1301                                      (node, vgstatus))
1302
1303     self.cluster = cluster = self.cfg.GetClusterInfo()
1304     # validate beparams changes
1305     if self.op.beparams:
1306       utils.CheckBEParams(self.op.beparams)
1307       self.new_beparams = cluster.FillDict(
1308         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309
1310     # hypervisor list/parameters
1311     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312     if self.op.hvparams:
1313       if not isinstance(self.op.hvparams, dict):
1314         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315       for hv_name, hv_dict in self.op.hvparams.items():
1316         if hv_name not in self.new_hvparams:
1317           self.new_hvparams[hv_name] = hv_dict
1318         else:
1319           self.new_hvparams[hv_name].update(hv_dict)
1320
1321     if self.op.enabled_hypervisors is not None:
1322       self.hv_list = self.op.enabled_hypervisors
1323     else:
1324       self.hv_list = cluster.enabled_hypervisors
1325
1326     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327       # either the enabled list has changed, or the parameters have, validate
1328       for hv_name, hv_params in self.new_hvparams.items():
1329         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330             (self.op.enabled_hypervisors and
1331              hv_name in self.op.enabled_hypervisors)):
1332           # either this is a new hypervisor, or its parameters have changed
1333           hv_class = hypervisor.GetHypervisor(hv_name)
1334           hv_class.CheckParameterSyntax(hv_params)
1335           _CheckHVParams(self, node_list, hv_name, hv_params)
1336
1337   def Exec(self, feedback_fn):
1338     """Change the parameters of the cluster.
1339
1340     """
1341     if self.op.vg_name is not None:
1342       if self.op.vg_name != self.cfg.GetVGName():
1343         self.cfg.SetVGName(self.op.vg_name)
1344       else:
1345         feedback_fn("Cluster LVM configuration already in desired"
1346                     " state, not changing")
1347     if self.op.hvparams:
1348       self.cluster.hvparams = self.new_hvparams
1349     if self.op.enabled_hypervisors is not None:
1350       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351     if self.op.beparams:
1352       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353     if self.op.candidate_pool_size is not None:
1354       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355
1356     self.cfg.Update(self.cluster)
1357
1358     # we want to update nodes after the cluster so that if any errors
1359     # happen, we have recorded and saved the cluster info
1360     if self.op.candidate_pool_size is not None:
1361       node_info = self.cfg.GetAllNodesInfo().values()
1362       num_candidates = len([node for node in node_info
1363                             if node.master_candidate])
1364       num_nodes = len(node_info)
1365       if num_candidates < self.op.candidate_pool_size:
1366         random.shuffle(node_info)
1367         for node in node_info:
1368           if num_candidates >= self.op.candidate_pool_size:
1369             break
1370           if node.master_candidate:
1371             continue
1372           node.master_candidate = True
1373           self.LogInfo("Promoting node %s to master candidate", node.name)
1374           self.cfg.Update(node)
1375           self.context.ReaddNode(node)
1376           num_candidates += 1
1377       elif num_candidates > self.op.candidate_pool_size:
1378         self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379                      " of candidate_pool_size (%d)" %
1380                      (num_candidates, self.op.candidate_pool_size))
1381
1382
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384   """Sleep and poll for an instance's disk to sync.
1385
1386   """
1387   if not instance.disks:
1388     return True
1389
1390   if not oneshot:
1391     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392
1393   node = instance.primary_node
1394
1395   for dev in instance.disks:
1396     lu.cfg.SetDiskID(dev, node)
1397
1398   retries = 0
1399   while True:
1400     max_time = 0
1401     done = True
1402     cumul_degraded = False
1403     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404     if rstats.failed or not rstats.data:
1405       lu.LogWarning("Can't get any data from node %s", node)
1406       retries += 1
1407       if retries >= 10:
1408         raise errors.RemoteError("Can't contact node %s for mirror data,"
1409                                  " aborting." % node)
1410       time.sleep(6)
1411       continue
1412     rstats = rstats.data
1413     retries = 0
1414     for i in range(len(rstats)):
1415       mstat = rstats[i]
1416       if mstat is None:
1417         lu.LogWarning("Can't compute data for node %s/%s",
1418                            node, instance.disks[i].iv_name)
1419         continue
1420       # we ignore the ldisk parameter
1421       perc_done, est_time, is_degraded, _ = mstat
1422       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423       if perc_done is not None:
1424         done = False
1425         if est_time is not None:
1426           rem_time = "%d estimated seconds remaining" % est_time
1427           max_time = est_time
1428         else:
1429           rem_time = "no time estimate"
1430         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431                         (instance.disks[i].iv_name, perc_done, rem_time))
1432     if done or oneshot:
1433       break
1434
1435     time.sleep(min(60, max_time))
1436
1437   if done:
1438     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439   return not cumul_degraded
1440
1441
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443   """Check that mirrors are not degraded.
1444
1445   The ldisk parameter, if True, will change the test from the
1446   is_degraded attribute (which represents overall non-ok status for
1447   the device(s)) to the ldisk (representing the local storage status).
1448
1449   """
1450   lu.cfg.SetDiskID(dev, node)
1451   if ldisk:
1452     idx = 6
1453   else:
1454     idx = 5
1455
1456   result = True
1457   if on_primary or dev.AssembleOnSecondary():
1458     rstats = lu.rpc.call_blockdev_find(node, dev)
1459     if rstats.failed or not rstats.data:
1460       logging.warning("Node %s: disk degraded, not found or node down", node)
1461       result = False
1462     else:
1463       result = result and (not rstats.data[idx])
1464   if dev.children:
1465     for child in dev.children:
1466       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467
1468   return result
1469
1470
1471 class LUDiagnoseOS(NoHooksLU):
1472   """Logical unit for OS diagnose/query.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477   _FIELDS_STATIC = utils.FieldSet()
1478   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479
1480   def ExpandNames(self):
1481     if self.op.names:
1482       raise errors.OpPrereqError("Selective OS query not supported")
1483
1484     _CheckOutputFields(static=self._FIELDS_STATIC,
1485                        dynamic=self._FIELDS_DYNAMIC,
1486                        selected=self.op.output_fields)
1487
1488     # Lock all nodes, in shared mode
1489     self.needed_locks = {}
1490     self.share_locks[locking.LEVEL_NODE] = 1
1491     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     """
1497
1498   @staticmethod
1499   def _DiagnoseByOS(node_list, rlist):
1500     """Remaps a per-node return list into an a per-os per-node dictionary
1501
1502     @param node_list: a list with the names of all nodes
1503     @param rlist: a map with node names as keys and OS objects as values
1504
1505     @rtype: dict
1506     @returns: a dictionary with osnames as keys and as value another map, with
1507         nodes as keys and list of OS objects as values, eg::
1508
1509           {"debian-etch": {"node1": [<object>,...],
1510                            "node2": [<object>,]}
1511           }
1512
1513     """
1514     all_os = {}
1515     for node_name, nr in rlist.iteritems():
1516       if nr.failed or not nr.data:
1517         continue
1518       for os_obj in nr.data:
1519         if os_obj.name not in all_os:
1520           # build a list of nodes for this os containing empty lists
1521           # for each node in node_list
1522           all_os[os_obj.name] = {}
1523           for nname in node_list:
1524             all_os[os_obj.name][nname] = []
1525         all_os[os_obj.name][node_name].append(os_obj)
1526     return all_os
1527
1528   def Exec(self, feedback_fn):
1529     """Compute the list of OSes.
1530
1531     """
1532     node_list = self.acquired_locks[locking.LEVEL_NODE]
1533     node_data = self.rpc.call_os_diagnose(node_list)
1534     if node_data == False:
1535       raise errors.OpExecError("Can't gather the list of OSes")
1536     pol = self._DiagnoseByOS(node_list, node_data)
1537     output = []
1538     for os_name, os_data in pol.iteritems():
1539       row = []
1540       for field in self.op.output_fields:
1541         if field == "name":
1542           val = os_name
1543         elif field == "valid":
1544           val = utils.all([osl and osl[0] for osl in os_data.values()])
1545         elif field == "node_status":
1546           val = {}
1547           for node_name, nos_list in os_data.iteritems():
1548             val[node_name] = [(v.status, v.path) for v in nos_list]
1549         else:
1550           raise errors.ParameterError(field)
1551         row.append(val)
1552       output.append(row)
1553
1554     return output
1555
1556
1557 class LURemoveNode(LogicalUnit):
1558   """Logical unit for removing a node.
1559
1560   """
1561   HPATH = "node-remove"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This doesn't run on the target node in the pre phase as a failed
1569     node would then be impossible to remove.
1570
1571     """
1572     env = {
1573       "OP_TARGET": self.op.node_name,
1574       "NODE_NAME": self.op.node_name,
1575       }
1576     all_nodes = self.cfg.GetNodeList()
1577     all_nodes.remove(self.op.node_name)
1578     return env, all_nodes, all_nodes
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     This checks:
1584      - the node exists in the configuration
1585      - it does not have primary or secondary instances
1586      - it's not the master
1587
1588     Any errors are signalled by raising errors.OpPrereqError.
1589
1590     """
1591     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592     if node is None:
1593       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594
1595     instance_list = self.cfg.GetInstanceList()
1596
1597     masternode = self.cfg.GetMasterNode()
1598     if node.name == masternode:
1599       raise errors.OpPrereqError("Node is the master node,"
1600                                  " you need to failover first.")
1601
1602     for instance_name in instance_list:
1603       instance = self.cfg.GetInstanceInfo(instance_name)
1604       if node.name == instance.primary_node:
1605         raise errors.OpPrereqError("Instance %s still running on the node,"
1606                                    " please remove first." % instance_name)
1607       if node.name in instance.secondary_nodes:
1608         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609                                    " please remove first." % instance_name)
1610     self.op.node_name = node.name
1611     self.node = node
1612
1613   def Exec(self, feedback_fn):
1614     """Removes the node from the cluster.
1615
1616     """
1617     node = self.node
1618     logging.info("Stopping the node daemon and removing configs from node %s",
1619                  node.name)
1620
1621     self.context.RemoveNode(node.name)
1622
1623     self.rpc.call_node_leave_cluster(node.name)
1624
1625
1626 class LUQueryNodes(NoHooksLU):
1627   """Logical unit for querying nodes.
1628
1629   """
1630   _OP_REQP = ["output_fields", "names"]
1631   REQ_BGL = False
1632   _FIELDS_DYNAMIC = utils.FieldSet(
1633     "dtotal", "dfree",
1634     "mtotal", "mnode", "mfree",
1635     "bootid",
1636     "ctotal",
1637     )
1638
1639   _FIELDS_STATIC = utils.FieldSet(
1640     "name", "pinst_cnt", "sinst_cnt",
1641     "pinst_list", "sinst_list",
1642     "pip", "sip", "tags",
1643     "serial_no",
1644     "master_candidate",
1645     "master",
1646     )
1647
1648   def ExpandNames(self):
1649     _CheckOutputFields(static=self._FIELDS_STATIC,
1650                        dynamic=self._FIELDS_DYNAMIC,
1651                        selected=self.op.output_fields)
1652
1653     self.needed_locks = {}
1654     self.share_locks[locking.LEVEL_NODE] = 1
1655
1656     if self.op.names:
1657       self.wanted = _GetWantedNodes(self, self.op.names)
1658     else:
1659       self.wanted = locking.ALL_SET
1660
1661     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1662     if self.do_locking:
1663       # if we don't request only static fields, we need to lock the nodes
1664       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1665
1666
1667   def CheckPrereq(self):
1668     """Check prerequisites.
1669
1670     """
1671     # The validation of the node list is done in the _GetWantedNodes,
1672     # if non empty, and if empty, there's no validation to do
1673     pass
1674
1675   def Exec(self, feedback_fn):
1676     """Computes the list of nodes and their attributes.
1677
1678     """
1679     all_info = self.cfg.GetAllNodesInfo()
1680     if self.do_locking:
1681       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1682     elif self.wanted != locking.ALL_SET:
1683       nodenames = self.wanted
1684       missing = set(nodenames).difference(all_info.keys())
1685       if missing:
1686         raise errors.OpExecError(
1687           "Some nodes were removed before retrieving their data: %s" % missing)
1688     else:
1689       nodenames = all_info.keys()
1690
1691     nodenames = utils.NiceSort(nodenames)
1692     nodelist = [all_info[name] for name in nodenames]
1693
1694     # begin data gathering
1695
1696     if self.do_locking:
1697       live_data = {}
1698       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1699                                           self.cfg.GetHypervisorType())
1700       for name in nodenames:
1701         nodeinfo = node_data[name]
1702         if not nodeinfo.failed and nodeinfo.data:
1703           nodeinfo = nodeinfo.data
1704           fn = utils.TryConvert
1705           live_data[name] = {
1706             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1707             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1708             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1709             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1710             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1711             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1712             "bootid": nodeinfo.get('bootid', None),
1713             }
1714         else:
1715           live_data[name] = {}
1716     else:
1717       live_data = dict.fromkeys(nodenames, {})
1718
1719     node_to_primary = dict([(name, set()) for name in nodenames])
1720     node_to_secondary = dict([(name, set()) for name in nodenames])
1721
1722     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1723                              "sinst_cnt", "sinst_list"))
1724     if inst_fields & frozenset(self.op.output_fields):
1725       instancelist = self.cfg.GetInstanceList()
1726
1727       for instance_name in instancelist:
1728         inst = self.cfg.GetInstanceInfo(instance_name)
1729         if inst.primary_node in node_to_primary:
1730           node_to_primary[inst.primary_node].add(inst.name)
1731         for secnode in inst.secondary_nodes:
1732           if secnode in node_to_secondary:
1733             node_to_secondary[secnode].add(inst.name)
1734
1735     master_node = self.cfg.GetMasterNode()
1736
1737     # end data gathering
1738
1739     output = []
1740     for node in nodelist:
1741       node_output = []
1742       for field in self.op.output_fields:
1743         if field == "name":
1744           val = node.name
1745         elif field == "pinst_list":
1746           val = list(node_to_primary[node.name])
1747         elif field == "sinst_list":
1748           val = list(node_to_secondary[node.name])
1749         elif field == "pinst_cnt":
1750           val = len(node_to_primary[node.name])
1751         elif field == "sinst_cnt":
1752           val = len(node_to_secondary[node.name])
1753         elif field == "pip":
1754           val = node.primary_ip
1755         elif field == "sip":
1756           val = node.secondary_ip
1757         elif field == "tags":
1758           val = list(node.GetTags())
1759         elif field == "serial_no":
1760           val = node.serial_no
1761         elif field == "master_candidate":
1762           val = node.master_candidate
1763         elif field == "master":
1764           val = node.name == master_node
1765         elif self._FIELDS_DYNAMIC.Matches(field):
1766           val = live_data[node.name].get(field, None)
1767         else:
1768           raise errors.ParameterError(field)
1769         node_output.append(val)
1770       output.append(node_output)
1771
1772     return output
1773
1774
1775 class LUQueryNodeVolumes(NoHooksLU):
1776   """Logical unit for getting volumes on node(s).
1777
1778   """
1779   _OP_REQP = ["nodes", "output_fields"]
1780   REQ_BGL = False
1781   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1782   _FIELDS_STATIC = utils.FieldSet("node")
1783
1784   def ExpandNames(self):
1785     _CheckOutputFields(static=self._FIELDS_STATIC,
1786                        dynamic=self._FIELDS_DYNAMIC,
1787                        selected=self.op.output_fields)
1788
1789     self.needed_locks = {}
1790     self.share_locks[locking.LEVEL_NODE] = 1
1791     if not self.op.nodes:
1792       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1793     else:
1794       self.needed_locks[locking.LEVEL_NODE] = \
1795         _GetWantedNodes(self, self.op.nodes)
1796
1797   def CheckPrereq(self):
1798     """Check prerequisites.
1799
1800     This checks that the fields required are valid output fields.
1801
1802     """
1803     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1804
1805   def Exec(self, feedback_fn):
1806     """Computes the list of nodes and their attributes.
1807
1808     """
1809     nodenames = self.nodes
1810     volumes = self.rpc.call_node_volumes(nodenames)
1811
1812     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1813              in self.cfg.GetInstanceList()]
1814
1815     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1816
1817     output = []
1818     for node in nodenames:
1819       if node not in volumes or volumes[node].failed or not volumes[node].data:
1820         continue
1821
1822       node_vols = volumes[node].data[:]
1823       node_vols.sort(key=lambda vol: vol['dev'])
1824
1825       for vol in node_vols:
1826         node_output = []
1827         for field in self.op.output_fields:
1828           if field == "node":
1829             val = node
1830           elif field == "phys":
1831             val = vol['dev']
1832           elif field == "vg":
1833             val = vol['vg']
1834           elif field == "name":
1835             val = vol['name']
1836           elif field == "size":
1837             val = int(float(vol['size']))
1838           elif field == "instance":
1839             for inst in ilist:
1840               if node not in lv_by_node[inst]:
1841                 continue
1842               if vol['name'] in lv_by_node[inst][node]:
1843                 val = inst.name
1844                 break
1845             else:
1846               val = '-'
1847           else:
1848             raise errors.ParameterError(field)
1849           node_output.append(str(val))
1850
1851         output.append(node_output)
1852
1853     return output
1854
1855
1856 class LUAddNode(LogicalUnit):
1857   """Logical unit for adding node to the cluster.
1858
1859   """
1860   HPATH = "node-add"
1861   HTYPE = constants.HTYPE_NODE
1862   _OP_REQP = ["node_name"]
1863
1864   def BuildHooksEnv(self):
1865     """Build hooks env.
1866
1867     This will run on all nodes before, and on all nodes + the new node after.
1868
1869     """
1870     env = {
1871       "OP_TARGET": self.op.node_name,
1872       "NODE_NAME": self.op.node_name,
1873       "NODE_PIP": self.op.primary_ip,
1874       "NODE_SIP": self.op.secondary_ip,
1875       }
1876     nodes_0 = self.cfg.GetNodeList()
1877     nodes_1 = nodes_0 + [self.op.node_name, ]
1878     return env, nodes_0, nodes_1
1879
1880   def CheckPrereq(self):
1881     """Check prerequisites.
1882
1883     This checks:
1884      - the new node is not already in the config
1885      - it is resolvable
1886      - its parameters (single/dual homed) matches the cluster
1887
1888     Any errors are signalled by raising errors.OpPrereqError.
1889
1890     """
1891     node_name = self.op.node_name
1892     cfg = self.cfg
1893
1894     dns_data = utils.HostInfo(node_name)
1895
1896     node = dns_data.name
1897     primary_ip = self.op.primary_ip = dns_data.ip
1898     secondary_ip = getattr(self.op, "secondary_ip", None)
1899     if secondary_ip is None:
1900       secondary_ip = primary_ip
1901     if not utils.IsValidIP(secondary_ip):
1902       raise errors.OpPrereqError("Invalid secondary IP given")
1903     self.op.secondary_ip = secondary_ip
1904
1905     node_list = cfg.GetNodeList()
1906     if not self.op.readd and node in node_list:
1907       raise errors.OpPrereqError("Node %s is already in the configuration" %
1908                                  node)
1909     elif self.op.readd and node not in node_list:
1910       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1911
1912     for existing_node_name in node_list:
1913       existing_node = cfg.GetNodeInfo(existing_node_name)
1914
1915       if self.op.readd and node == existing_node_name:
1916         if (existing_node.primary_ip != primary_ip or
1917             existing_node.secondary_ip != secondary_ip):
1918           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1919                                      " address configuration as before")
1920         continue
1921
1922       if (existing_node.primary_ip == primary_ip or
1923           existing_node.secondary_ip == primary_ip or
1924           existing_node.primary_ip == secondary_ip or
1925           existing_node.secondary_ip == secondary_ip):
1926         raise errors.OpPrereqError("New node ip address(es) conflict with"
1927                                    " existing node %s" % existing_node.name)
1928
1929     # check that the type of the node (single versus dual homed) is the
1930     # same as for the master
1931     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1932     master_singlehomed = myself.secondary_ip == myself.primary_ip
1933     newbie_singlehomed = secondary_ip == primary_ip
1934     if master_singlehomed != newbie_singlehomed:
1935       if master_singlehomed:
1936         raise errors.OpPrereqError("The master has no private ip but the"
1937                                    " new node has one")
1938       else:
1939         raise errors.OpPrereqError("The master has a private ip but the"
1940                                    " new node doesn't have one")
1941
1942     # checks reachablity
1943     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1944       raise errors.OpPrereqError("Node not reachable by ping")
1945
1946     if not newbie_singlehomed:
1947       # check reachability from my secondary ip to newbie's secondary ip
1948       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1949                            source=myself.secondary_ip):
1950         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1951                                    " based ping to noded port")
1952
1953     self.new_node = objects.Node(name=node,
1954                                  primary_ip=primary_ip,
1955                                  secondary_ip=secondary_ip)
1956
1957   def Exec(self, feedback_fn):
1958     """Adds the new node to the cluster.
1959
1960     """
1961     new_node = self.new_node
1962     node = new_node.name
1963
1964     # check connectivity
1965     result = self.rpc.call_version([node])[node]
1966     result.Raise()
1967     if result.data:
1968       if constants.PROTOCOL_VERSION == result.data:
1969         logging.info("Communication to node %s fine, sw version %s match",
1970                      node, result.data)
1971       else:
1972         raise errors.OpExecError("Version mismatch master version %s,"
1973                                  " node version %s" %
1974                                  (constants.PROTOCOL_VERSION, result.data))
1975     else:
1976       raise errors.OpExecError("Cannot get version from the new node")
1977
1978     # setup ssh on node
1979     logging.info("Copy ssh key to node %s", node)
1980     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1981     keyarray = []
1982     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1983                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1984                 priv_key, pub_key]
1985
1986     for i in keyfiles:
1987       f = open(i, 'r')
1988       try:
1989         keyarray.append(f.read())
1990       finally:
1991         f.close()
1992
1993     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1994                                     keyarray[2],
1995                                     keyarray[3], keyarray[4], keyarray[5])
1996
1997     if result.failed or not result.data:
1998       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1999
2000     # Add node to our /etc/hosts, and add key to known_hosts
2001     utils.AddHostToEtcHosts(new_node.name)
2002
2003     if new_node.secondary_ip != new_node.primary_ip:
2004       result = self.rpc.call_node_has_ip_address(new_node.name,
2005                                                  new_node.secondary_ip)
2006       if result.failed or not result.data:
2007         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2008                                  " you gave (%s). Please fix and re-run this"
2009                                  " command." % new_node.secondary_ip)
2010
2011     node_verify_list = [self.cfg.GetMasterNode()]
2012     node_verify_param = {
2013       'nodelist': [node],
2014       # TODO: do a node-net-test as well?
2015     }
2016
2017     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2018                                        self.cfg.GetClusterName())
2019     for verifier in node_verify_list:
2020       if result[verifier].failed or not result[verifier].data:
2021         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2022                                  " for remote verification" % verifier)
2023       if result[verifier].data['nodelist']:
2024         for failed in result[verifier].data['nodelist']:
2025           feedback_fn("ssh/hostname verification failed %s -> %s" %
2026                       (verifier, result[verifier]['nodelist'][failed]))
2027         raise errors.OpExecError("ssh/hostname verification failed.")
2028
2029     # Distribute updated /etc/hosts and known_hosts to all nodes,
2030     # including the node just added
2031     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2032     dist_nodes = self.cfg.GetNodeList()
2033     if not self.op.readd:
2034       dist_nodes.append(node)
2035     if myself.name in dist_nodes:
2036       dist_nodes.remove(myself.name)
2037
2038     logging.debug("Copying hosts and known_hosts to all nodes")
2039     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2040       result = self.rpc.call_upload_file(dist_nodes, fname)
2041       for to_node, to_result in result.iteritems():
2042         if to_result.failed or not to_result.data:
2043           logging.error("Copy of file %s to node %s failed", fname, to_node)
2044
2045     to_copy = []
2046     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2047       to_copy.append(constants.VNC_PASSWORD_FILE)
2048     for fname in to_copy:
2049       result = self.rpc.call_upload_file([node], fname)
2050       if result[node].failed or not result[node]:
2051         logging.error("Could not copy file %s to node %s", fname, node)
2052
2053     if self.op.readd:
2054       self.context.ReaddNode(new_node)
2055     else:
2056       self.context.AddNode(new_node)
2057
2058
2059 class LUSetNodeParams(LogicalUnit):
2060   """Modifies the parameters of a node.
2061
2062   """
2063   HPATH = "node-modify"
2064   HTYPE = constants.HTYPE_NODE
2065   _OP_REQP = ["node_name"]
2066   REQ_BGL = False
2067
2068   def CheckArguments(self):
2069     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2070     if node_name is None:
2071       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2072     self.op.node_name = node_name
2073     if not hasattr(self.op, 'master_candidate'):
2074       raise errors.OpPrereqError("Please pass at least one modification")
2075     self.op.master_candidate = bool(self.op.master_candidate)
2076
2077   def ExpandNames(self):
2078     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2079
2080   def BuildHooksEnv(self):
2081     """Build hooks env.
2082
2083     This runs on the master node.
2084
2085     """
2086     env = {
2087       "OP_TARGET": self.op.node_name,
2088       "MASTER_CANDIDATE": str(self.op.master_candidate),
2089       }
2090     nl = [self.cfg.GetMasterNode(),
2091           self.op.node_name]
2092     return env, nl, nl
2093
2094   def CheckPrereq(self):
2095     """Check prerequisites.
2096
2097     This only checks the instance list against the existing names.
2098
2099     """
2100     force = self.force = self.op.force
2101
2102     if self.op.master_candidate == False:
2103       if self.op.node_name == self.cfg.GetMasterNode():
2104         raise errors.OpPrereqError("The master node has to be a"
2105                                    " master candidate")
2106       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2107       node_info = self.cfg.GetAllNodesInfo().values()
2108       num_candidates = len([node for node in node_info
2109                             if node.master_candidate])
2110       if num_candidates <= cp_size:
2111         msg = ("Not enough master candidates (desired"
2112                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2113         if force:
2114           self.LogWarning(msg)
2115         else:
2116           raise errors.OpPrereqError(msg)
2117
2118     return
2119
2120   def Exec(self, feedback_fn):
2121     """Modifies a node.
2122
2123     """
2124     node = self.cfg.GetNodeInfo(self.op.node_name)
2125
2126     result = []
2127
2128     if self.op.master_candidate is not None:
2129       node.master_candidate = self.op.master_candidate
2130       result.append(("master_candidate", str(self.op.master_candidate)))
2131
2132     # this will trigger configuration file update, if needed
2133     self.cfg.Update(node)
2134     # this will trigger job queue propagation or cleanup
2135     if self.op.node_name != self.cfg.GetMasterNode():
2136       self.context.ReaddNode(node)
2137
2138     return result
2139
2140
2141 class LUQueryClusterInfo(NoHooksLU):
2142   """Query cluster configuration.
2143
2144   """
2145   _OP_REQP = []
2146   REQ_BGL = False
2147
2148   def ExpandNames(self):
2149     self.needed_locks = {}
2150
2151   def CheckPrereq(self):
2152     """No prerequsites needed for this LU.
2153
2154     """
2155     pass
2156
2157   def Exec(self, feedback_fn):
2158     """Return cluster config.
2159
2160     """
2161     cluster = self.cfg.GetClusterInfo()
2162     result = {
2163       "software_version": constants.RELEASE_VERSION,
2164       "protocol_version": constants.PROTOCOL_VERSION,
2165       "config_version": constants.CONFIG_VERSION,
2166       "os_api_version": constants.OS_API_VERSION,
2167       "export_version": constants.EXPORT_VERSION,
2168       "architecture": (platform.architecture()[0], platform.machine()),
2169       "name": cluster.cluster_name,
2170       "master": cluster.master_node,
2171       "default_hypervisor": cluster.default_hypervisor,
2172       "enabled_hypervisors": cluster.enabled_hypervisors,
2173       "hvparams": cluster.hvparams,
2174       "beparams": cluster.beparams,
2175       "candidate_pool_size": cluster.candidate_pool_size,
2176       }
2177
2178     return result
2179
2180
2181 class LUQueryConfigValues(NoHooksLU):
2182   """Return configuration values.
2183
2184   """
2185   _OP_REQP = []
2186   REQ_BGL = False
2187   _FIELDS_DYNAMIC = utils.FieldSet()
2188   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2189
2190   def ExpandNames(self):
2191     self.needed_locks = {}
2192
2193     _CheckOutputFields(static=self._FIELDS_STATIC,
2194                        dynamic=self._FIELDS_DYNAMIC,
2195                        selected=self.op.output_fields)
2196
2197   def CheckPrereq(self):
2198     """No prerequisites.
2199
2200     """
2201     pass
2202
2203   def Exec(self, feedback_fn):
2204     """Dump a representation of the cluster config to the standard output.
2205
2206     """
2207     values = []
2208     for field in self.op.output_fields:
2209       if field == "cluster_name":
2210         entry = self.cfg.GetClusterName()
2211       elif field == "master_node":
2212         entry = self.cfg.GetMasterNode()
2213       elif field == "drain_flag":
2214         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2215       else:
2216         raise errors.ParameterError(field)
2217       values.append(entry)
2218     return values
2219
2220
2221 class LUActivateInstanceDisks(NoHooksLU):
2222   """Bring up an instance's disks.
2223
2224   """
2225   _OP_REQP = ["instance_name"]
2226   REQ_BGL = False
2227
2228   def ExpandNames(self):
2229     self._ExpandAndLockInstance()
2230     self.needed_locks[locking.LEVEL_NODE] = []
2231     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2232
2233   def DeclareLocks(self, level):
2234     if level == locking.LEVEL_NODE:
2235       self._LockInstancesNodes()
2236
2237   def CheckPrereq(self):
2238     """Check prerequisites.
2239
2240     This checks that the instance is in the cluster.
2241
2242     """
2243     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244     assert self.instance is not None, \
2245       "Cannot retrieve locked instance %s" % self.op.instance_name
2246
2247   def Exec(self, feedback_fn):
2248     """Activate the disks.
2249
2250     """
2251     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2252     if not disks_ok:
2253       raise errors.OpExecError("Cannot activate block devices")
2254
2255     return disks_info
2256
2257
2258 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2259   """Prepare the block devices for an instance.
2260
2261   This sets up the block devices on all nodes.
2262
2263   @type lu: L{LogicalUnit}
2264   @param lu: the logical unit on whose behalf we execute
2265   @type instance: L{objects.Instance}
2266   @param instance: the instance for whose disks we assemble
2267   @type ignore_secondaries: boolean
2268   @param ignore_secondaries: if true, errors on secondary nodes
2269       won't result in an error return from the function
2270   @return: False if the operation failed, otherwise a list of
2271       (host, instance_visible_name, node_visible_name)
2272       with the mapping from node devices to instance devices
2273
2274   """
2275   device_info = []
2276   disks_ok = True
2277   iname = instance.name
2278   # With the two passes mechanism we try to reduce the window of
2279   # opportunity for the race condition of switching DRBD to primary
2280   # before handshaking occured, but we do not eliminate it
2281
2282   # The proper fix would be to wait (with some limits) until the
2283   # connection has been made and drbd transitions from WFConnection
2284   # into any other network-connected state (Connected, SyncTarget,
2285   # SyncSource, etc.)
2286
2287   # 1st pass, assemble on all nodes in secondary mode
2288   for inst_disk in instance.disks:
2289     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2290       lu.cfg.SetDiskID(node_disk, node)
2291       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2292       if result.failed or not result:
2293         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2294                            " (is_primary=False, pass=1)",
2295                            inst_disk.iv_name, node)
2296         if not ignore_secondaries:
2297           disks_ok = False
2298
2299   # FIXME: race condition on drbd migration to primary
2300
2301   # 2nd pass, do only the primary node
2302   for inst_disk in instance.disks:
2303     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2304       if node != instance.primary_node:
2305         continue
2306       lu.cfg.SetDiskID(node_disk, node)
2307       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2308       if result.failed or not result:
2309         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2310                            " (is_primary=True, pass=2)",
2311                            inst_disk.iv_name, node)
2312         disks_ok = False
2313     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2314
2315   # leave the disks configured for the primary node
2316   # this is a workaround that would be fixed better by
2317   # improving the logical/physical id handling
2318   for disk in instance.disks:
2319     lu.cfg.SetDiskID(disk, instance.primary_node)
2320
2321   return disks_ok, device_info
2322
2323
2324 def _StartInstanceDisks(lu, instance, force):
2325   """Start the disks of an instance.
2326
2327   """
2328   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2329                                            ignore_secondaries=force)
2330   if not disks_ok:
2331     _ShutdownInstanceDisks(lu, instance)
2332     if force is not None and not force:
2333       lu.proc.LogWarning("", hint="If the message above refers to a"
2334                          " secondary node,"
2335                          " you can retry the operation using '--force'.")
2336     raise errors.OpExecError("Disk consistency error")
2337
2338
2339 class LUDeactivateInstanceDisks(NoHooksLU):
2340   """Shutdown an instance's disks.
2341
2342   """
2343   _OP_REQP = ["instance_name"]
2344   REQ_BGL = False
2345
2346   def ExpandNames(self):
2347     self._ExpandAndLockInstance()
2348     self.needed_locks[locking.LEVEL_NODE] = []
2349     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2350
2351   def DeclareLocks(self, level):
2352     if level == locking.LEVEL_NODE:
2353       self._LockInstancesNodes()
2354
2355   def CheckPrereq(self):
2356     """Check prerequisites.
2357
2358     This checks that the instance is in the cluster.
2359
2360     """
2361     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2362     assert self.instance is not None, \
2363       "Cannot retrieve locked instance %s" % self.op.instance_name
2364
2365   def Exec(self, feedback_fn):
2366     """Deactivate the disks
2367
2368     """
2369     instance = self.instance
2370     _SafeShutdownInstanceDisks(self, instance)
2371
2372
2373 def _SafeShutdownInstanceDisks(lu, instance):
2374   """Shutdown block devices of an instance.
2375
2376   This function checks if an instance is running, before calling
2377   _ShutdownInstanceDisks.
2378
2379   """
2380   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2381                                       [instance.hypervisor])
2382   ins_l = ins_l[instance.primary_node]
2383   if ins_l.failed or not isinstance(ins_l.data, list):
2384     raise errors.OpExecError("Can't contact node '%s'" %
2385                              instance.primary_node)
2386
2387   if instance.name in ins_l.data:
2388     raise errors.OpExecError("Instance is running, can't shutdown"
2389                              " block devices.")
2390
2391   _ShutdownInstanceDisks(lu, instance)
2392
2393
2394 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2395   """Shutdown block devices of an instance.
2396
2397   This does the shutdown on all nodes of the instance.
2398
2399   If the ignore_primary is false, errors on the primary node are
2400   ignored.
2401
2402   """
2403   result = True
2404   for disk in instance.disks:
2405     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2406       lu.cfg.SetDiskID(top_disk, node)
2407       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2408       if result.failed or not result.data:
2409         logging.error("Could not shutdown block device %s on node %s",
2410                       disk.iv_name, node)
2411         if not ignore_primary or node != instance.primary_node:
2412           result = False
2413   return result
2414
2415
2416 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2417   """Checks if a node has enough free memory.
2418
2419   This function check if a given node has the needed amount of free
2420   memory. In case the node has less memory or we cannot get the
2421   information from the node, this function raise an OpPrereqError
2422   exception.
2423
2424   @type lu: C{LogicalUnit}
2425   @param lu: a logical unit from which we get configuration data
2426   @type node: C{str}
2427   @param node: the node to check
2428   @type reason: C{str}
2429   @param reason: string to use in the error message
2430   @type requested: C{int}
2431   @param requested: the amount of memory in MiB to check for
2432   @type hypervisor: C{str}
2433   @param hypervisor: the hypervisor to ask for memory stats
2434   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2435       we cannot check the node
2436
2437   """
2438   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2439   nodeinfo[node].Raise()
2440   free_mem = nodeinfo[node].data.get('memory_free')
2441   if not isinstance(free_mem, int):
2442     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2443                              " was '%s'" % (node, free_mem))
2444   if requested > free_mem:
2445     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2446                              " needed %s MiB, available %s MiB" %
2447                              (node, reason, requested, free_mem))
2448
2449
2450 class LUStartupInstance(LogicalUnit):
2451   """Starts an instance.
2452
2453   """
2454   HPATH = "instance-start"
2455   HTYPE = constants.HTYPE_INSTANCE
2456   _OP_REQP = ["instance_name", "force"]
2457   REQ_BGL = False
2458
2459   def ExpandNames(self):
2460     self._ExpandAndLockInstance()
2461
2462   def BuildHooksEnv(self):
2463     """Build hooks env.
2464
2465     This runs on master, primary and secondary nodes of the instance.
2466
2467     """
2468     env = {
2469       "FORCE": self.op.force,
2470       }
2471     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2472     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2473           list(self.instance.secondary_nodes))
2474     return env, nl, nl
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This checks that the instance is in the cluster.
2480
2481     """
2482     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2483     assert self.instance is not None, \
2484       "Cannot retrieve locked instance %s" % self.op.instance_name
2485
2486     bep = self.cfg.GetClusterInfo().FillBE(instance)
2487     # check bridges existance
2488     _CheckInstanceBridgesExist(self, instance)
2489
2490     _CheckNodeFreeMemory(self, instance.primary_node,
2491                          "starting instance %s" % instance.name,
2492                          bep[constants.BE_MEMORY], instance.hypervisor)
2493
2494   def Exec(self, feedback_fn):
2495     """Start the instance.
2496
2497     """
2498     instance = self.instance
2499     force = self.op.force
2500     extra_args = getattr(self.op, "extra_args", "")
2501
2502     self.cfg.MarkInstanceUp(instance.name)
2503
2504     node_current = instance.primary_node
2505
2506     _StartInstanceDisks(self, instance, force)
2507
2508     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2509     if result.failed or not result.data:
2510       _ShutdownInstanceDisks(self, instance)
2511       raise errors.OpExecError("Could not start instance")
2512
2513
2514 class LURebootInstance(LogicalUnit):
2515   """Reboot an instance.
2516
2517   """
2518   HPATH = "instance-reboot"
2519   HTYPE = constants.HTYPE_INSTANCE
2520   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2521   REQ_BGL = False
2522
2523   def ExpandNames(self):
2524     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2525                                    constants.INSTANCE_REBOOT_HARD,
2526                                    constants.INSTANCE_REBOOT_FULL]:
2527       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2528                                   (constants.INSTANCE_REBOOT_SOFT,
2529                                    constants.INSTANCE_REBOOT_HARD,
2530                                    constants.INSTANCE_REBOOT_FULL))
2531     self._ExpandAndLockInstance()
2532
2533   def BuildHooksEnv(self):
2534     """Build hooks env.
2535
2536     This runs on master, primary and secondary nodes of the instance.
2537
2538     """
2539     env = {
2540       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2541       }
2542     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2543     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2544           list(self.instance.secondary_nodes))
2545     return env, nl, nl
2546
2547   def CheckPrereq(self):
2548     """Check prerequisites.
2549
2550     This checks that the instance is in the cluster.
2551
2552     """
2553     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2554     assert self.instance is not None, \
2555       "Cannot retrieve locked instance %s" % self.op.instance_name
2556
2557     # check bridges existance
2558     _CheckInstanceBridgesExist(self, instance)
2559
2560   def Exec(self, feedback_fn):
2561     """Reboot the instance.
2562
2563     """
2564     instance = self.instance
2565     ignore_secondaries = self.op.ignore_secondaries
2566     reboot_type = self.op.reboot_type
2567     extra_args = getattr(self.op, "extra_args", "")
2568
2569     node_current = instance.primary_node
2570
2571     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2572                        constants.INSTANCE_REBOOT_HARD]:
2573       result = self.rpc.call_instance_reboot(node_current, instance,
2574                                              reboot_type, extra_args)
2575       if result.failed or not result.data:
2576         raise errors.OpExecError("Could not reboot instance")
2577     else:
2578       if not self.rpc.call_instance_shutdown(node_current, instance):
2579         raise errors.OpExecError("could not shutdown instance for full reboot")
2580       _ShutdownInstanceDisks(self, instance)
2581       _StartInstanceDisks(self, instance, ignore_secondaries)
2582       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2583       if result.failed or not result.data:
2584         _ShutdownInstanceDisks(self, instance)
2585         raise errors.OpExecError("Could not start instance for full reboot")
2586
2587     self.cfg.MarkInstanceUp(instance.name)
2588
2589
2590 class LUShutdownInstance(LogicalUnit):
2591   """Shutdown an instance.
2592
2593   """
2594   HPATH = "instance-stop"
2595   HTYPE = constants.HTYPE_INSTANCE
2596   _OP_REQP = ["instance_name"]
2597   REQ_BGL = False
2598
2599   def ExpandNames(self):
2600     self._ExpandAndLockInstance()
2601
2602   def BuildHooksEnv(self):
2603     """Build hooks env.
2604
2605     This runs on master, primary and secondary nodes of the instance.
2606
2607     """
2608     env = _BuildInstanceHookEnvByObject(self, self.instance)
2609     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2610           list(self.instance.secondary_nodes))
2611     return env, nl, nl
2612
2613   def CheckPrereq(self):
2614     """Check prerequisites.
2615
2616     This checks that the instance is in the cluster.
2617
2618     """
2619     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2620     assert self.instance is not None, \
2621       "Cannot retrieve locked instance %s" % self.op.instance_name
2622
2623   def Exec(self, feedback_fn):
2624     """Shutdown the instance.
2625
2626     """
2627     instance = self.instance
2628     node_current = instance.primary_node
2629     self.cfg.MarkInstanceDown(instance.name)
2630     result = self.rpc.call_instance_shutdown(node_current, instance)
2631     if result.failed or not result.data:
2632       self.proc.LogWarning("Could not shutdown instance")
2633
2634     _ShutdownInstanceDisks(self, instance)
2635
2636
2637 class LUReinstallInstance(LogicalUnit):
2638   """Reinstall an instance.
2639
2640   """
2641   HPATH = "instance-reinstall"
2642   HTYPE = constants.HTYPE_INSTANCE
2643   _OP_REQP = ["instance_name"]
2644   REQ_BGL = False
2645
2646   def ExpandNames(self):
2647     self._ExpandAndLockInstance()
2648
2649   def BuildHooksEnv(self):
2650     """Build hooks env.
2651
2652     This runs on master, primary and secondary nodes of the instance.
2653
2654     """
2655     env = _BuildInstanceHookEnvByObject(self, self.instance)
2656     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2657           list(self.instance.secondary_nodes))
2658     return env, nl, nl
2659
2660   def CheckPrereq(self):
2661     """Check prerequisites.
2662
2663     This checks that the instance is in the cluster and is not running.
2664
2665     """
2666     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2667     assert instance is not None, \
2668       "Cannot retrieve locked instance %s" % self.op.instance_name
2669
2670     if instance.disk_template == constants.DT_DISKLESS:
2671       raise errors.OpPrereqError("Instance '%s' has no disks" %
2672                                  self.op.instance_name)
2673     if instance.status != "down":
2674       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2675                                  self.op.instance_name)
2676     remote_info = self.rpc.call_instance_info(instance.primary_node,
2677                                               instance.name,
2678                                               instance.hypervisor)
2679     if remote_info.failed or remote_info.data:
2680       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2681                                  (self.op.instance_name,
2682                                   instance.primary_node))
2683
2684     self.op.os_type = getattr(self.op, "os_type", None)
2685     if self.op.os_type is not None:
2686       # OS verification
2687       pnode = self.cfg.GetNodeInfo(
2688         self.cfg.ExpandNodeName(instance.primary_node))
2689       if pnode is None:
2690         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2691                                    self.op.pnode)
2692       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2693       result.Raise()
2694       if not isinstance(result.data, objects.OS):
2695         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2696                                    " primary node"  % self.op.os_type)
2697
2698     self.instance = instance
2699
2700   def Exec(self, feedback_fn):
2701     """Reinstall the instance.
2702
2703     """
2704     inst = self.instance
2705
2706     if self.op.os_type is not None:
2707       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2708       inst.os = self.op.os_type
2709       self.cfg.Update(inst)
2710
2711     _StartInstanceDisks(self, inst, None)
2712     try:
2713       feedback_fn("Running the instance OS create scripts...")
2714       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2715       result.Raise()
2716       if not result.data:
2717         raise errors.OpExecError("Could not install OS for instance %s"
2718                                  " on node %s" %
2719                                  (inst.name, inst.primary_node))
2720     finally:
2721       _ShutdownInstanceDisks(self, inst)
2722
2723
2724 class LURenameInstance(LogicalUnit):
2725   """Rename an instance.
2726
2727   """
2728   HPATH = "instance-rename"
2729   HTYPE = constants.HTYPE_INSTANCE
2730   _OP_REQP = ["instance_name", "new_name"]
2731
2732   def BuildHooksEnv(self):
2733     """Build hooks env.
2734
2735     This runs on master, primary and secondary nodes of the instance.
2736
2737     """
2738     env = _BuildInstanceHookEnvByObject(self, self.instance)
2739     env["INSTANCE_NEW_NAME"] = self.op.new_name
2740     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2741           list(self.instance.secondary_nodes))
2742     return env, nl, nl
2743
2744   def CheckPrereq(self):
2745     """Check prerequisites.
2746
2747     This checks that the instance is in the cluster and is not running.
2748
2749     """
2750     instance = self.cfg.GetInstanceInfo(
2751       self.cfg.ExpandInstanceName(self.op.instance_name))
2752     if instance is None:
2753       raise errors.OpPrereqError("Instance '%s' not known" %
2754                                  self.op.instance_name)
2755     if instance.status != "down":
2756       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2757                                  self.op.instance_name)
2758     remote_info = self.rpc.call_instance_info(instance.primary_node,
2759                                               instance.name,
2760                                               instance.hypervisor)
2761     remote_info.Raise()
2762     if remote_info.data:
2763       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2764                                  (self.op.instance_name,
2765                                   instance.primary_node))
2766     self.instance = instance
2767
2768     # new name verification
2769     name_info = utils.HostInfo(self.op.new_name)
2770
2771     self.op.new_name = new_name = name_info.name
2772     instance_list = self.cfg.GetInstanceList()
2773     if new_name in instance_list:
2774       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2775                                  new_name)
2776
2777     if not getattr(self.op, "ignore_ip", False):
2778       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2779         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2780                                    (name_info.ip, new_name))
2781
2782
2783   def Exec(self, feedback_fn):
2784     """Reinstall the instance.
2785
2786     """
2787     inst = self.instance
2788     old_name = inst.name
2789
2790     if inst.disk_template == constants.DT_FILE:
2791       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2792
2793     self.cfg.RenameInstance(inst.name, self.op.new_name)
2794     # Change the instance lock. This is definitely safe while we hold the BGL
2795     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2796     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2797
2798     # re-read the instance from the configuration after rename
2799     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2800
2801     if inst.disk_template == constants.DT_FILE:
2802       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2803       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2804                                                      old_file_storage_dir,
2805                                                      new_file_storage_dir)
2806       result.Raise()
2807       if not result.data:
2808         raise errors.OpExecError("Could not connect to node '%s' to rename"
2809                                  " directory '%s' to '%s' (but the instance"
2810                                  " has been renamed in Ganeti)" % (
2811                                  inst.primary_node, old_file_storage_dir,
2812                                  new_file_storage_dir))
2813
2814       if not result.data[0]:
2815         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2816                                  " (but the instance has been renamed in"
2817                                  " Ganeti)" % (old_file_storage_dir,
2818                                                new_file_storage_dir))
2819
2820     _StartInstanceDisks(self, inst, None)
2821     try:
2822       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2823                                                  old_name)
2824       if result.failed or not result.data:
2825         msg = ("Could not run OS rename script for instance %s on node %s"
2826                " (but the instance has been renamed in Ganeti)" %
2827                (inst.name, inst.primary_node))
2828         self.proc.LogWarning(msg)
2829     finally:
2830       _ShutdownInstanceDisks(self, inst)
2831
2832
2833 class LURemoveInstance(LogicalUnit):
2834   """Remove an instance.
2835
2836   """
2837   HPATH = "instance-remove"
2838   HTYPE = constants.HTYPE_INSTANCE
2839   _OP_REQP = ["instance_name", "ignore_failures"]
2840   REQ_BGL = False
2841
2842   def ExpandNames(self):
2843     self._ExpandAndLockInstance()
2844     self.needed_locks[locking.LEVEL_NODE] = []
2845     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2846
2847   def DeclareLocks(self, level):
2848     if level == locking.LEVEL_NODE:
2849       self._LockInstancesNodes()
2850
2851   def BuildHooksEnv(self):
2852     """Build hooks env.
2853
2854     This runs on master, primary and secondary nodes of the instance.
2855
2856     """
2857     env = _BuildInstanceHookEnvByObject(self, self.instance)
2858     nl = [self.cfg.GetMasterNode()]
2859     return env, nl, nl
2860
2861   def CheckPrereq(self):
2862     """Check prerequisites.
2863
2864     This checks that the instance is in the cluster.
2865
2866     """
2867     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868     assert self.instance is not None, \
2869       "Cannot retrieve locked instance %s" % self.op.instance_name
2870
2871   def Exec(self, feedback_fn):
2872     """Remove the instance.
2873
2874     """
2875     instance = self.instance
2876     logging.info("Shutting down instance %s on node %s",
2877                  instance.name, instance.primary_node)
2878
2879     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2880     if result.failed or not result.data:
2881       if self.op.ignore_failures:
2882         feedback_fn("Warning: can't shutdown instance")
2883       else:
2884         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2885                                  (instance.name, instance.primary_node))
2886
2887     logging.info("Removing block devices for instance %s", instance.name)
2888
2889     if not _RemoveDisks(self, instance):
2890       if self.op.ignore_failures:
2891         feedback_fn("Warning: can't remove instance's disks")
2892       else:
2893         raise errors.OpExecError("Can't remove instance's disks")
2894
2895     logging.info("Removing instance %s out of cluster config", instance.name)
2896
2897     self.cfg.RemoveInstance(instance.name)
2898     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2899
2900
2901 class LUQueryInstances(NoHooksLU):
2902   """Logical unit for querying instances.
2903
2904   """
2905   _OP_REQP = ["output_fields", "names"]
2906   REQ_BGL = False
2907   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2908                                     "admin_state", "admin_ram",
2909                                     "disk_template", "ip", "mac", "bridge",
2910                                     "sda_size", "sdb_size", "vcpus", "tags",
2911                                     "network_port", "beparams",
2912                                     "(disk).(size)/([0-9]+)",
2913                                     "(disk).(sizes)",
2914                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2915                                     "(nic).(macs|ips|bridges)",
2916                                     "(disk|nic).(count)",
2917                                     "serial_no", "hypervisor", "hvparams",] +
2918                                   ["hv/%s" % name
2919                                    for name in constants.HVS_PARAMETERS] +
2920                                   ["be/%s" % name
2921                                    for name in constants.BES_PARAMETERS])
2922   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2923
2924
2925   def ExpandNames(self):
2926     _CheckOutputFields(static=self._FIELDS_STATIC,
2927                        dynamic=self._FIELDS_DYNAMIC,
2928                        selected=self.op.output_fields)
2929
2930     self.needed_locks = {}
2931     self.share_locks[locking.LEVEL_INSTANCE] = 1
2932     self.share_locks[locking.LEVEL_NODE] = 1
2933
2934     if self.op.names:
2935       self.wanted = _GetWantedInstances(self, self.op.names)
2936     else:
2937       self.wanted = locking.ALL_SET
2938
2939     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2940     if self.do_locking:
2941       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2942       self.needed_locks[locking.LEVEL_NODE] = []
2943       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2944
2945   def DeclareLocks(self, level):
2946     if level == locking.LEVEL_NODE and self.do_locking:
2947       self._LockInstancesNodes()
2948
2949   def CheckPrereq(self):
2950     """Check prerequisites.
2951
2952     """
2953     pass
2954
2955   def Exec(self, feedback_fn):
2956     """Computes the list of nodes and their attributes.
2957
2958     """
2959     all_info = self.cfg.GetAllInstancesInfo()
2960     if self.do_locking:
2961       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2962     elif self.wanted != locking.ALL_SET:
2963       instance_names = self.wanted
2964       missing = set(instance_names).difference(all_info.keys())
2965       if missing:
2966         raise errors.OpExecError(
2967           "Some instances were removed before retrieving their data: %s"
2968           % missing)
2969     else:
2970       instance_names = all_info.keys()
2971
2972     instance_names = utils.NiceSort(instance_names)
2973     instance_list = [all_info[iname] for iname in instance_names]
2974
2975     # begin data gathering
2976
2977     nodes = frozenset([inst.primary_node for inst in instance_list])
2978     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2979
2980     bad_nodes = []
2981     if self.do_locking:
2982       live_data = {}
2983       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2984       for name in nodes:
2985         result = node_data[name]
2986         if result.failed:
2987           bad_nodes.append(name)
2988         else:
2989           if result.data:
2990             live_data.update(result.data)
2991             # else no instance is alive
2992     else:
2993       live_data = dict([(name, {}) for name in instance_names])
2994
2995     # end data gathering
2996
2997     HVPREFIX = "hv/"
2998     BEPREFIX = "be/"
2999     output = []
3000     for instance in instance_list:
3001       iout = []
3002       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3003       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3004       for field in self.op.output_fields:
3005         st_match = self._FIELDS_STATIC.Matches(field)
3006         if field == "name":
3007           val = instance.name
3008         elif field == "os":
3009           val = instance.os
3010         elif field == "pnode":
3011           val = instance.primary_node
3012         elif field == "snodes":
3013           val = list(instance.secondary_nodes)
3014         elif field == "admin_state":
3015           val = (instance.status != "down")
3016         elif field == "oper_state":
3017           if instance.primary_node in bad_nodes:
3018             val = None
3019           else:
3020             val = bool(live_data.get(instance.name))
3021         elif field == "status":
3022           if instance.primary_node in bad_nodes:
3023             val = "ERROR_nodedown"
3024           else:
3025             running = bool(live_data.get(instance.name))
3026             if running:
3027               if instance.status != "down":
3028                 val = "running"
3029               else:
3030                 val = "ERROR_up"
3031             else:
3032               if instance.status != "down":
3033                 val = "ERROR_down"
3034               else:
3035                 val = "ADMIN_down"
3036         elif field == "oper_ram":
3037           if instance.primary_node in bad_nodes:
3038             val = None
3039           elif instance.name in live_data:
3040             val = live_data[instance.name].get("memory", "?")
3041           else:
3042             val = "-"
3043         elif field == "disk_template":
3044           val = instance.disk_template
3045         elif field == "ip":
3046           val = instance.nics[0].ip
3047         elif field == "bridge":
3048           val = instance.nics[0].bridge
3049         elif field == "mac":
3050           val = instance.nics[0].mac
3051         elif field == "sda_size" or field == "sdb_size":
3052           idx = ord(field[2]) - ord('a')
3053           try:
3054             val = instance.FindDisk(idx).size
3055           except errors.OpPrereqError:
3056             val = None
3057         elif field == "tags":
3058           val = list(instance.GetTags())
3059         elif field == "serial_no":
3060           val = instance.serial_no
3061         elif field == "network_port":
3062           val = instance.network_port
3063         elif field == "hypervisor":
3064           val = instance.hypervisor
3065         elif field == "hvparams":
3066           val = i_hv
3067         elif (field.startswith(HVPREFIX) and
3068               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3069           val = i_hv.get(field[len(HVPREFIX):], None)
3070         elif field == "beparams":
3071           val = i_be
3072         elif (field.startswith(BEPREFIX) and
3073               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3074           val = i_be.get(field[len(BEPREFIX):], None)
3075         elif st_match and st_match.groups():
3076           # matches a variable list
3077           st_groups = st_match.groups()
3078           if st_groups and st_groups[0] == "disk":
3079             if st_groups[1] == "count":
3080               val = len(instance.disks)
3081             elif st_groups[1] == "sizes":
3082               val = [disk.size for disk in instance.disks]
3083             elif st_groups[1] == "size":
3084               try:
3085                 val = instance.FindDisk(st_groups[2]).size
3086               except errors.OpPrereqError:
3087                 val = None
3088             else:
3089               assert False, "Unhandled disk parameter"
3090           elif st_groups[0] == "nic":
3091             if st_groups[1] == "count":
3092               val = len(instance.nics)
3093             elif st_groups[1] == "macs":
3094               val = [nic.mac for nic in instance.nics]
3095             elif st_groups[1] == "ips":
3096               val = [nic.ip for nic in instance.nics]
3097             elif st_groups[1] == "bridges":
3098               val = [nic.bridge for nic in instance.nics]
3099             else:
3100               # index-based item
3101               nic_idx = int(st_groups[2])
3102               if nic_idx >= len(instance.nics):
3103                 val = None
3104               else:
3105                 if st_groups[1] == "mac":
3106                   val = instance.nics[nic_idx].mac
3107                 elif st_groups[1] == "ip":
3108                   val = instance.nics[nic_idx].ip
3109                 elif st_groups[1] == "bridge":
3110                   val = instance.nics[nic_idx].bridge
3111                 else:
3112                   assert False, "Unhandled NIC parameter"
3113           else:
3114             assert False, "Unhandled variable parameter"
3115         else:
3116           raise errors.ParameterError(field)
3117         iout.append(val)
3118       output.append(iout)
3119
3120     return output
3121
3122
3123 class LUFailoverInstance(LogicalUnit):
3124   """Failover an instance.
3125
3126   """
3127   HPATH = "instance-failover"
3128   HTYPE = constants.HTYPE_INSTANCE
3129   _OP_REQP = ["instance_name", "ignore_consistency"]
3130   REQ_BGL = False
3131
3132   def ExpandNames(self):
3133     self._ExpandAndLockInstance()
3134     self.needed_locks[locking.LEVEL_NODE] = []
3135     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3136
3137   def DeclareLocks(self, level):
3138     if level == locking.LEVEL_NODE:
3139       self._LockInstancesNodes()
3140
3141   def BuildHooksEnv(self):
3142     """Build hooks env.
3143
3144     This runs on master, primary and secondary nodes of the instance.
3145
3146     """
3147     env = {
3148       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3149       }
3150     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3151     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3152     return env, nl, nl
3153
3154   def CheckPrereq(self):
3155     """Check prerequisites.
3156
3157     This checks that the instance is in the cluster.
3158
3159     """
3160     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3161     assert self.instance is not None, \
3162       "Cannot retrieve locked instance %s" % self.op.instance_name
3163
3164     bep = self.cfg.GetClusterInfo().FillBE(instance)
3165     if instance.disk_template not in constants.DTS_NET_MIRROR:
3166       raise errors.OpPrereqError("Instance's disk layout is not"
3167                                  " network mirrored, cannot failover.")
3168
3169     secondary_nodes = instance.secondary_nodes
3170     if not secondary_nodes:
3171       raise errors.ProgrammerError("no secondary node but using "
3172                                    "a mirrored disk template")
3173
3174     target_node = secondary_nodes[0]
3175     # check memory requirements on the secondary node
3176     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3177                          instance.name, bep[constants.BE_MEMORY],
3178                          instance.hypervisor)
3179
3180     # check bridge existance
3181     brlist = [nic.bridge for nic in instance.nics]
3182     result = self.rpc.call_bridges_exist(target_node, brlist)
3183     result.Raise()
3184     if not result.data:
3185       raise errors.OpPrereqError("One or more target bridges %s does not"
3186                                  " exist on destination node '%s'" %
3187                                  (brlist, target_node))
3188
3189   def Exec(self, feedback_fn):
3190     """Failover an instance.
3191
3192     The failover is done by shutting it down on its present node and
3193     starting it on the secondary.
3194
3195     """
3196     instance = self.instance
3197
3198     source_node = instance.primary_node
3199     target_node = instance.secondary_nodes[0]
3200
3201     feedback_fn("* checking disk consistency between source and target")
3202     for dev in instance.disks:
3203       # for drbd, these are drbd over lvm
3204       if not _CheckDiskConsistency(self, dev, target_node, False):
3205         if instance.status == "up" and not self.op.ignore_consistency:
3206           raise errors.OpExecError("Disk %s is degraded on target node,"
3207                                    " aborting failover." % dev.iv_name)
3208
3209     feedback_fn("* shutting down instance on source node")
3210     logging.info("Shutting down instance %s on node %s",
3211                  instance.name, source_node)
3212
3213     result = self.rpc.call_instance_shutdown(source_node, instance)
3214     if result.failed or not result.data:
3215       if self.op.ignore_consistency:
3216         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3217                              " Proceeding"
3218                              " anyway. Please make sure node %s is down",
3219                              instance.name, source_node, source_node)
3220       else:
3221         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3222                                  (instance.name, source_node))
3223
3224     feedback_fn("* deactivating the instance's disks on source node")
3225     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3226       raise errors.OpExecError("Can't shut down the instance's disks.")
3227
3228     instance.primary_node = target_node
3229     # distribute new instance config to the other nodes
3230     self.cfg.Update(instance)
3231
3232     # Only start the instance if it's marked as up
3233     if instance.status == "up":
3234       feedback_fn("* activating the instance's disks on target node")
3235       logging.info("Starting instance %s on node %s",
3236                    instance.name, target_node)
3237
3238       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3239                                                ignore_secondaries=True)
3240       if not disks_ok:
3241         _ShutdownInstanceDisks(self, instance)
3242         raise errors.OpExecError("Can't activate the instance's disks")
3243
3244       feedback_fn("* starting the instance on the target node")
3245       result = self.rpc.call_instance_start(target_node, instance, None)
3246       if result.failed or not result.data:
3247         _ShutdownInstanceDisks(self, instance)
3248         raise errors.OpExecError("Could not start instance %s on node %s." %
3249                                  (instance.name, target_node))
3250
3251
3252 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3253   """Create a tree of block devices on the primary node.
3254
3255   This always creates all devices.
3256
3257   """
3258   if device.children:
3259     for child in device.children:
3260       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3261         return False
3262
3263   lu.cfg.SetDiskID(device, node)
3264   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3265                                        instance.name, True, info)
3266   if new_id.failed or not new_id.data:
3267     return False
3268   if device.physical_id is None:
3269     device.physical_id = new_id
3270   return True
3271
3272
3273 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3274   """Create a tree of block devices on a secondary node.
3275
3276   If this device type has to be created on secondaries, create it and
3277   all its children.
3278
3279   If not, just recurse to children keeping the same 'force' value.
3280
3281   """
3282   if device.CreateOnSecondary():
3283     force = True
3284   if device.children:
3285     for child in device.children:
3286       if not _CreateBlockDevOnSecondary(lu, node, instance,
3287                                         child, force, info):
3288         return False
3289
3290   if not force:
3291     return True
3292   lu.cfg.SetDiskID(device, node)
3293   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3294                                        instance.name, False, info)
3295   if new_id.failed or not new_id.data:
3296     return False
3297   if device.physical_id is None:
3298     device.physical_id = new_id
3299   return True
3300
3301
3302 def _GenerateUniqueNames(lu, exts):
3303   """Generate a suitable LV name.
3304
3305   This will generate a logical volume name for the given instance.
3306
3307   """
3308   results = []
3309   for val in exts:
3310     new_id = lu.cfg.GenerateUniqueID()
3311     results.append("%s%s" % (new_id, val))
3312   return results
3313
3314
3315 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3316                          p_minor, s_minor):
3317   """Generate a drbd8 device complete with its children.
3318
3319   """
3320   port = lu.cfg.AllocatePort()
3321   vgname = lu.cfg.GetVGName()
3322   shared_secret = lu.cfg.GenerateDRBDSecret()
3323   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3324                           logical_id=(vgname, names[0]))
3325   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3326                           logical_id=(vgname, names[1]))
3327   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3328                           logical_id=(primary, secondary, port,
3329                                       p_minor, s_minor,
3330                                       shared_secret),
3331                           children=[dev_data, dev_meta],
3332                           iv_name=iv_name)
3333   return drbd_dev
3334
3335
3336 def _GenerateDiskTemplate(lu, template_name,
3337                           instance_name, primary_node,
3338                           secondary_nodes, disk_info,
3339                           file_storage_dir, file_driver,
3340                           base_index):
3341   """Generate the entire disk layout for a given template type.
3342
3343   """
3344   #TODO: compute space requirements
3345
3346   vgname = lu.cfg.GetVGName()
3347   disk_count = len(disk_info)
3348   disks = []
3349   if template_name == constants.DT_DISKLESS:
3350     pass
3351   elif template_name == constants.DT_PLAIN:
3352     if len(secondary_nodes) != 0:
3353       raise errors.ProgrammerError("Wrong template configuration")
3354
3355     names = _GenerateUniqueNames(lu, [".disk%d" % i
3356                                       for i in range(disk_count)])
3357     for idx, disk in enumerate(disk_info):
3358       disk_index = idx + base_index
3359       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3360                               logical_id=(vgname, names[idx]),
3361                               iv_name="disk/%d" % disk_index)
3362       disks.append(disk_dev)
3363   elif template_name == constants.DT_DRBD8:
3364     if len(secondary_nodes) != 1:
3365       raise errors.ProgrammerError("Wrong template configuration")
3366     remote_node = secondary_nodes[0]
3367     minors = lu.cfg.AllocateDRBDMinor(
3368       [primary_node, remote_node] * len(disk_info), instance_name)
3369
3370     names = _GenerateUniqueNames(lu,
3371                                  [".disk%d_%s" % (i, s)
3372                                   for i in range(disk_count)
3373                                   for s in ("data", "meta")
3374                                   ])
3375     for idx, disk in enumerate(disk_info):
3376       disk_index = idx + base_index
3377       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3378                                       disk["size"], names[idx*2:idx*2+2],
3379                                       "disk/%d" % disk_index,
3380                                       minors[idx*2], minors[idx*2+1])
3381       disks.append(disk_dev)
3382   elif template_name == constants.DT_FILE:
3383     if len(secondary_nodes) != 0:
3384       raise errors.ProgrammerError("Wrong template configuration")
3385
3386     for idx, disk in enumerate(disk_info):
3387       disk_index = idx + base_index
3388       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3389                               iv_name="disk/%d" % disk_index,
3390                               logical_id=(file_driver,
3391                                           "%s/disk%d" % (file_storage_dir,
3392                                                          idx)))
3393       disks.append(disk_dev)
3394   else:
3395     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3396   return disks
3397
3398
3399 def _GetInstanceInfoText(instance):
3400   """Compute that text that should be added to the disk's metadata.
3401
3402   """
3403   return "originstname+%s" % instance.name
3404
3405
3406 def _CreateDisks(lu, instance):
3407   """Create all disks for an instance.
3408
3409   This abstracts away some work from AddInstance.
3410
3411   @type lu: L{LogicalUnit}
3412   @param lu: the logical unit on whose behalf we execute
3413   @type instance: L{objects.Instance}
3414   @param instance: the instance whose disks we should create
3415   @rtype: boolean
3416   @return: the success of the creation
3417
3418   """
3419   info = _GetInstanceInfoText(instance)
3420
3421   if instance.disk_template == constants.DT_FILE:
3422     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3423     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3424                                                  file_storage_dir)
3425
3426     if result.failed or not result.data:
3427       logging.error("Could not connect to node '%s'", instance.primary_node)
3428       return False
3429
3430     if not result.data[0]:
3431       logging.error("Failed to create directory '%s'", file_storage_dir)
3432       return False
3433
3434   # Note: this needs to be kept in sync with adding of disks in
3435   # LUSetInstanceParams
3436   for device in instance.disks:
3437     logging.info("Creating volume %s for instance %s",
3438                  device.iv_name, instance.name)
3439     #HARDCODE
3440     for secondary_node in instance.secondary_nodes:
3441       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3442                                         device, False, info):
3443         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3444                       device.iv_name, device, secondary_node)
3445         return False
3446     #HARDCODE
3447     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3448                                     instance, device, info):
3449       logging.error("Failed to create volume %s on primary!", device.iv_name)
3450       return False
3451
3452   return True
3453
3454
3455 def _RemoveDisks(lu, instance):
3456   """Remove all disks for an instance.
3457
3458   This abstracts away some work from `AddInstance()` and
3459   `RemoveInstance()`. Note that in case some of the devices couldn't
3460   be removed, the removal will continue with the other ones (compare
3461   with `_CreateDisks()`).
3462
3463   @type lu: L{LogicalUnit}
3464   @param lu: the logical unit on whose behalf we execute
3465   @type instance: L{objects.Instance}
3466   @param instance: the instance whose disks we should remove
3467   @rtype: boolean
3468   @return: the success of the removal
3469
3470   """
3471   logging.info("Removing block devices for instance %s", instance.name)
3472
3473   result = True
3474   for device in instance.disks:
3475     for node, disk in device.ComputeNodeTree(instance.primary_node):
3476       lu.cfg.SetDiskID(disk, node)
3477       result = lu.rpc.call_blockdev_remove(node, disk)
3478       if result.failed or not result.data:
3479         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3480                            " continuing anyway", device.iv_name, node)
3481         result = False
3482
3483   if instance.disk_template == constants.DT_FILE:
3484     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3485     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3486                                                  file_storage_dir)
3487     if result.failed or not result.data:
3488       logging.error("Could not remove directory '%s'", file_storage_dir)
3489       result = False
3490
3491   return result
3492
3493
3494 def _ComputeDiskSize(disk_template, disks):
3495   """Compute disk size requirements in the volume group
3496
3497   """
3498   # Required free disk space as a function of disk and swap space
3499   req_size_dict = {
3500     constants.DT_DISKLESS: None,
3501     constants.DT_PLAIN: sum(d["size"] for d in disks),
3502     # 128 MB are added for drbd metadata for each disk
3503     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3504     constants.DT_FILE: None,
3505   }
3506
3507   if disk_template not in req_size_dict:
3508     raise errors.ProgrammerError("Disk template '%s' size requirement"
3509                                  " is unknown" %  disk_template)
3510
3511   return req_size_dict[disk_template]
3512
3513
3514 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3515   """Hypervisor parameter validation.
3516
3517   This function abstract the hypervisor parameter validation to be
3518   used in both instance create and instance modify.
3519
3520   @type lu: L{LogicalUnit}
3521   @param lu: the logical unit for which we check
3522   @type nodenames: list
3523   @param nodenames: the list of nodes on which we should check
3524   @type hvname: string
3525   @param hvname: the name of the hypervisor we should use
3526   @type hvparams: dict
3527   @param hvparams: the parameters which we need to check
3528   @raise errors.OpPrereqError: if the parameters are not valid
3529
3530   """
3531   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3532                                                   hvname,
3533                                                   hvparams)
3534   for node in nodenames:
3535     info = hvinfo[node]
3536     info.Raise()
3537     if not info.data or not isinstance(info.data, (tuple, list)):
3538       raise errors.OpPrereqError("Cannot get current information"
3539                                  " from node '%s' (%s)" % (node, info.data))
3540     if not info.data[0]:
3541       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3542                                  " %s" % info.data[1])
3543
3544
3545 class LUCreateInstance(LogicalUnit):
3546   """Create an instance.
3547
3548   """
3549   HPATH = "instance-add"
3550   HTYPE = constants.HTYPE_INSTANCE
3551   _OP_REQP = ["instance_name", "disks", "disk_template",
3552               "mode", "start",
3553               "wait_for_sync", "ip_check", "nics",
3554               "hvparams", "beparams"]
3555   REQ_BGL = False
3556
3557   def _ExpandNode(self, node):
3558     """Expands and checks one node name.
3559
3560     """
3561     node_full = self.cfg.ExpandNodeName(node)
3562     if node_full is None:
3563       raise errors.OpPrereqError("Unknown node %s" % node)
3564     return node_full
3565
3566   def ExpandNames(self):
3567     """ExpandNames for CreateInstance.
3568
3569     Figure out the right locks for instance creation.
3570
3571     """
3572     self.needed_locks = {}
3573
3574     # set optional parameters to none if they don't exist
3575     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3576       if not hasattr(self.op, attr):
3577         setattr(self.op, attr, None)
3578
3579     # cheap checks, mostly valid constants given
3580
3581     # verify creation mode
3582     if self.op.mode not in (constants.INSTANCE_CREATE,
3583                             constants.INSTANCE_IMPORT):
3584       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3585                                  self.op.mode)
3586
3587     # disk template and mirror node verification
3588     if self.op.disk_template not in constants.DISK_TEMPLATES:
3589       raise errors.OpPrereqError("Invalid disk template name")
3590
3591     if self.op.hypervisor is None:
3592       self.op.hypervisor = self.cfg.GetHypervisorType()
3593
3594     cluster = self.cfg.GetClusterInfo()
3595     enabled_hvs = cluster.enabled_hypervisors
3596     if self.op.hypervisor not in enabled_hvs:
3597       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3598                                  " cluster (%s)" % (self.op.hypervisor,
3599                                   ",".join(enabled_hvs)))
3600
3601     # check hypervisor parameter syntax (locally)
3602
3603     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3604                                   self.op.hvparams)
3605     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3606     hv_type.CheckParameterSyntax(filled_hvp)
3607
3608     # fill and remember the beparams dict
3609     utils.CheckBEParams(self.op.beparams)
3610     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3611                                     self.op.beparams)
3612
3613     #### instance parameters check
3614
3615     # instance name verification
3616     hostname1 = utils.HostInfo(self.op.instance_name)
3617     self.op.instance_name = instance_name = hostname1.name
3618
3619     # this is just a preventive check, but someone might still add this
3620     # instance in the meantime, and creation will fail at lock-add time
3621     if instance_name in self.cfg.GetInstanceList():
3622       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3623                                  instance_name)
3624
3625     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3626
3627     # NIC buildup
3628     self.nics = []
3629     for nic in self.op.nics:
3630       # ip validity checks
3631       ip = nic.get("ip", None)
3632       if ip is None or ip.lower() == "none":
3633         nic_ip = None
3634       elif ip.lower() == constants.VALUE_AUTO:
3635         nic_ip = hostname1.ip
3636       else:
3637         if not utils.IsValidIP(ip):
3638           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3639                                      " like a valid IP" % ip)
3640         nic_ip = ip
3641
3642       # MAC address verification
3643       mac = nic.get("mac", constants.VALUE_AUTO)
3644       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3645         if not utils.IsValidMac(mac.lower()):
3646           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3647                                      mac)
3648       # bridge verification
3649       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3650       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3651
3652     # disk checks/pre-build
3653     self.disks = []
3654     for disk in self.op.disks:
3655       mode = disk.get("mode", constants.DISK_RDWR)
3656       if mode not in constants.DISK_ACCESS_SET:
3657         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3658                                    mode)
3659       size = disk.get("size", None)
3660       if size is None:
3661         raise errors.OpPrereqError("Missing disk size")
3662       try:
3663         size = int(size)
3664       except ValueError:
3665         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3666       self.disks.append({"size": size, "mode": mode})
3667
3668     # used in CheckPrereq for ip ping check
3669     self.check_ip = hostname1.ip
3670
3671     # file storage checks
3672     if (self.op.file_driver and
3673         not self.op.file_driver in constants.FILE_DRIVER):
3674       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3675                                  self.op.file_driver)
3676
3677     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3678       raise errors.OpPrereqError("File storage directory path not absolute")
3679
3680     ### Node/iallocator related checks
3681     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3682       raise errors.OpPrereqError("One and only one of iallocator and primary"
3683                                  " node must be given")
3684
3685     if self.op.iallocator:
3686       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3687     else:
3688       self.op.pnode = self._ExpandNode(self.op.pnode)
3689       nodelist = [self.op.pnode]
3690       if self.op.snode is not None:
3691         self.op.snode = self._ExpandNode(self.op.snode)
3692         nodelist.append(self.op.snode)
3693       self.needed_locks[locking.LEVEL_NODE] = nodelist
3694
3695     # in case of import lock the source node too
3696     if self.op.mode == constants.INSTANCE_IMPORT:
3697       src_node = getattr(self.op, "src_node", None)
3698       src_path = getattr(self.op, "src_path", None)
3699
3700       if src_path is None:
3701         self.op.src_path = src_path = self.op.instance_name
3702
3703       if src_node is None:
3704         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3705         self.op.src_node = None
3706         if os.path.isabs(src_path):
3707           raise errors.OpPrereqError("Importing an instance from an absolute"
3708                                      " path requires a source node option.")
3709       else:
3710         self.op.src_node = src_node = self._ExpandNode(src_node)
3711         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3712           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3713         if not os.path.isabs(src_path):
3714           self.op.src_path = src_path = \
3715             os.path.join(constants.EXPORT_DIR, src_path)
3716
3717     else: # INSTANCE_CREATE
3718       if getattr(self.op, "os_type", None) is None:
3719         raise errors.OpPrereqError("No guest OS specified")
3720
3721   def _RunAllocator(self):
3722     """Run the allocator based on input opcode.
3723
3724     """
3725     nics = [n.ToDict() for n in self.nics]
3726     ial = IAllocator(self,
3727                      mode=constants.IALLOCATOR_MODE_ALLOC,
3728                      name=self.op.instance_name,
3729                      disk_template=self.op.disk_template,
3730                      tags=[],
3731                      os=self.op.os_type,
3732                      vcpus=self.be_full[constants.BE_VCPUS],
3733                      mem_size=self.be_full[constants.BE_MEMORY],
3734                      disks=self.disks,
3735                      nics=nics,
3736                      hypervisor=self.op.hypervisor,
3737                      )
3738
3739     ial.Run(self.op.iallocator)
3740
3741     if not ial.success:
3742       raise errors.OpPrereqError("Can't compute nodes using"
3743                                  " iallocator '%s': %s" % (self.op.iallocator,
3744                                                            ial.info))
3745     if len(ial.nodes) != ial.required_nodes:
3746       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3747                                  " of nodes (%s), required %s" %
3748                                  (self.op.iallocator, len(ial.nodes),
3749                                   ial.required_nodes))
3750     self.op.pnode = ial.nodes[0]
3751     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3752                  self.op.instance_name, self.op.iallocator,
3753                  ", ".join(ial.nodes))
3754     if ial.required_nodes == 2:
3755       self.op.snode = ial.nodes[1]
3756
3757   def BuildHooksEnv(self):
3758     """Build hooks env.
3759
3760     This runs on master, primary and secondary nodes of the instance.
3761
3762     """
3763     env = {
3764       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3765       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3766       "INSTANCE_ADD_MODE": self.op.mode,
3767       }
3768     if self.op.mode == constants.INSTANCE_IMPORT:
3769       env["INSTANCE_SRC_NODE"] = self.op.src_node
3770       env["INSTANCE_SRC_PATH"] = self.op.src_path
3771       env["INSTANCE_SRC_IMAGES"] = self.src_images
3772
3773     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3774       primary_node=self.op.pnode,
3775       secondary_nodes=self.secondaries,
3776       status=self.instance_status,
3777       os_type=self.op.os_type,
3778       memory=self.be_full[constants.BE_MEMORY],
3779       vcpus=self.be_full[constants.BE_VCPUS],
3780       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3781     ))
3782
3783     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3784           self.secondaries)
3785     return env, nl, nl
3786
3787
3788   def CheckPrereq(self):
3789     """Check prerequisites.
3790
3791     """
3792     if (not self.cfg.GetVGName() and
3793         self.op.disk_template not in constants.DTS_NOT_LVM):
3794       raise errors.OpPrereqError("Cluster does not support lvm-based"
3795                                  " instances")
3796
3797
3798     if self.op.mode == constants.INSTANCE_IMPORT:
3799       src_node = self.op.src_node
3800       src_path = self.op.src_path
3801
3802       if src_node is None:
3803         exp_list = self.rpc.call_export_list(
3804           self.acquired_locks[locking.LEVEL_NODE])
3805         found = False
3806         for node in exp_list:
3807           if not exp_list[node].failed and src_path in exp_list[node].data:
3808             found = True
3809             self.op.src_node = src_node = node
3810             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3811                                                        src_path)
3812             break
3813         if not found:
3814           raise errors.OpPrereqError("No export found for relative path %s" %
3815                                       src_path)
3816
3817       result = self.rpc.call_export_info(src_node, src_path)
3818       result.Raise()
3819       if not result.data:
3820         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3821
3822       export_info = result.data
3823       if not export_info.has_section(constants.INISECT_EXP):
3824         raise errors.ProgrammerError("Corrupted export config")
3825
3826       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3827       if (int(ei_version) != constants.EXPORT_VERSION):
3828         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3829                                    (ei_version, constants.EXPORT_VERSION))
3830
3831       # Check that the new instance doesn't have less disks than the export
3832       instance_disks = len(self.disks)
3833       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3834       if instance_disks < export_disks:
3835         raise errors.OpPrereqError("Not enough disks to import."
3836                                    " (instance: %d, export: %d)" %
3837                                    (instance_disks, export_disks))
3838
3839       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3840       disk_images = []
3841       for idx in range(export_disks):
3842         option = 'disk%d_dump' % idx
3843         if export_info.has_option(constants.INISECT_INS, option):
3844           # FIXME: are the old os-es, disk sizes, etc. useful?
3845           export_name = export_info.get(constants.INISECT_INS, option)
3846           image = os.path.join(src_path, export_name)
3847           disk_images.append(image)
3848         else:
3849           disk_images.append(False)
3850
3851       self.src_images = disk_images
3852
3853       old_name = export_info.get(constants.INISECT_INS, 'name')
3854       # FIXME: int() here could throw a ValueError on broken exports
3855       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3856       if self.op.instance_name == old_name:
3857         for idx, nic in enumerate(self.nics):
3858           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3859             nic_mac_ini = 'nic%d_mac' % idx
3860             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3861
3862     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3863     if self.op.start and not self.op.ip_check:
3864       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3865                                  " adding an instance in start mode")
3866
3867     if self.op.ip_check:
3868       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3869         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3870                                    (self.check_ip, self.op.instance_name))
3871
3872     #### allocator run
3873
3874     if self.op.iallocator is not None:
3875       self._RunAllocator()
3876
3877     #### node related checks
3878
3879     # check primary node
3880     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3881     assert self.pnode is not None, \
3882       "Cannot retrieve locked node %s" % self.op.pnode
3883     self.secondaries = []
3884
3885     # mirror node verification
3886     if self.op.disk_template in constants.DTS_NET_MIRROR:
3887       if self.op.snode is None:
3888         raise errors.OpPrereqError("The networked disk templates need"
3889                                    " a mirror node")
3890       if self.op.snode == pnode.name:
3891         raise errors.OpPrereqError("The secondary node cannot be"
3892                                    " the primary node.")
3893       self.secondaries.append(self.op.snode)
3894
3895     nodenames = [pnode.name] + self.secondaries
3896
3897     req_size = _ComputeDiskSize(self.op.disk_template,
3898                                 self.disks)
3899
3900     # Check lv size requirements
3901     if req_size is not None:
3902       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3903                                          self.op.hypervisor)
3904       for node in nodenames:
3905         info = nodeinfo[node]
3906         info.Raise()
3907         info = info.data
3908         if not info:
3909           raise errors.OpPrereqError("Cannot get current information"
3910                                      " from node '%s'" % node)
3911         vg_free = info.get('vg_free', None)
3912         if not isinstance(vg_free, int):
3913           raise errors.OpPrereqError("Can't compute free disk space on"
3914                                      " node %s" % node)
3915         if req_size > info['vg_free']:
3916           raise errors.OpPrereqError("Not enough disk space on target node %s."
3917                                      " %d MB available, %d MB required" %
3918                                      (node, info['vg_free'], req_size))
3919
3920     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3921
3922     # os verification
3923     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3924     result.Raise()
3925     if not isinstance(result.data, objects.OS):
3926       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3927                                  " primary node"  % self.op.os_type)
3928
3929     # bridge check on primary node
3930     bridges = [n.bridge for n in self.nics]
3931     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3932     result.Raise()
3933     if not result.data:
3934       raise errors.OpPrereqError("One of the target bridges '%s' does not"
3935                                  " exist on destination node '%s'" %
3936                                  (",".join(bridges), pnode.name))
3937
3938     # memory check on primary node
3939     if self.op.start:
3940       _CheckNodeFreeMemory(self, self.pnode.name,
3941                            "creating instance %s" % self.op.instance_name,
3942                            self.be_full[constants.BE_MEMORY],
3943                            self.op.hypervisor)
3944
3945     if self.op.start:
3946       self.instance_status = 'up'
3947     else:
3948       self.instance_status = 'down'
3949
3950   def Exec(self, feedback_fn):
3951     """Create and add the instance to the cluster.
3952
3953     """
3954     instance = self.op.instance_name
3955     pnode_name = self.pnode.name
3956
3957     for nic in self.nics:
3958       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3959         nic.mac = self.cfg.GenerateMAC()
3960
3961     ht_kind = self.op.hypervisor
3962     if ht_kind in constants.HTS_REQ_PORT:
3963       network_port = self.cfg.AllocatePort()
3964     else:
3965       network_port = None
3966
3967     ##if self.op.vnc_bind_address is None:
3968     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3969
3970     # this is needed because os.path.join does not accept None arguments
3971     if self.op.file_storage_dir is None:
3972       string_file_storage_dir = ""
3973     else:
3974       string_file_storage_dir = self.op.file_storage_dir
3975
3976     # build the full file storage dir path
3977     file_storage_dir = os.path.normpath(os.path.join(
3978                                         self.cfg.GetFileStorageDir(),
3979                                         string_file_storage_dir, instance))
3980
3981
3982     disks = _GenerateDiskTemplate(self,
3983                                   self.op.disk_template,
3984                                   instance, pnode_name,
3985                                   self.secondaries,
3986                                   self.disks,
3987                                   file_storage_dir,
3988                                   self.op.file_driver,
3989                                   0)
3990
3991     iobj = objects.Instance(name=instance, os=self.op.os_type,
3992                             primary_node=pnode_name,
3993                             nics=self.nics, disks=disks,
3994                             disk_template=self.op.disk_template,
3995                             status=self.instance_status,
3996                             network_port=network_port,
3997                             beparams=self.op.beparams,
3998                             hvparams=self.op.hvparams,
3999                             hypervisor=self.op.hypervisor,
4000                             )
4001
4002     feedback_fn("* creating instance disks...")
4003     if not _CreateDisks(self, iobj):
4004       _RemoveDisks(self, iobj)
4005       self.cfg.ReleaseDRBDMinors(instance)
4006       raise errors.OpExecError("Device creation failed, reverting...")
4007
4008     feedback_fn("adding instance %s to cluster config" % instance)
4009
4010     self.cfg.AddInstance(iobj)
4011     # Declare that we don't want to remove the instance lock anymore, as we've
4012     # added the instance to the config
4013     del self.remove_locks[locking.LEVEL_INSTANCE]
4014     # Remove the temp. assignements for the instance's drbds
4015     self.cfg.ReleaseDRBDMinors(instance)
4016     # Unlock all the nodes
4017     if self.op.mode == constants.INSTANCE_IMPORT:
4018       nodes_keep = [self.op.src_node]
4019       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4020                        if node != self.op.src_node]
4021       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4022       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4023     else:
4024       self.context.glm.release(locking.LEVEL_NODE)
4025       del self.acquired_locks[locking.LEVEL_NODE]
4026
4027     if self.op.wait_for_sync:
4028       disk_abort = not _WaitForSync(self, iobj)
4029     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4030       # make sure the disks are not degraded (still sync-ing is ok)
4031       time.sleep(15)
4032       feedback_fn("* checking mirrors status")
4033       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4034     else:
4035       disk_abort = False
4036
4037     if disk_abort:
4038       _RemoveDisks(self, iobj)
4039       self.cfg.RemoveInstance(iobj.name)
4040       # Make sure the instance lock gets removed
4041       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4042       raise errors.OpExecError("There are some degraded disks for"
4043                                " this instance")
4044
4045     feedback_fn("creating os for instance %s on node %s" %
4046                 (instance, pnode_name))
4047
4048     if iobj.disk_template != constants.DT_DISKLESS:
4049       if self.op.mode == constants.INSTANCE_CREATE:
4050         feedback_fn("* running the instance OS create scripts...")
4051         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4052         result.Raise()
4053         if not result.data:
4054           raise errors.OpExecError("Could not add os for instance %s"
4055                                    " on node %s" %
4056                                    (instance, pnode_name))
4057
4058       elif self.op.mode == constants.INSTANCE_IMPORT:
4059         feedback_fn("* running the instance OS import scripts...")
4060         src_node = self.op.src_node
4061         src_images = self.src_images
4062         cluster_name = self.cfg.GetClusterName()
4063         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4064                                                          src_node, src_images,
4065                                                          cluster_name)
4066         import_result.Raise()
4067         for idx, result in enumerate(import_result.data):
4068           if not result:
4069             self.LogWarning("Could not import the image %s for instance"
4070                             " %s, disk %d, on node %s" %
4071                             (src_images[idx], instance, idx, pnode_name))
4072       else:
4073         # also checked in the prereq part
4074         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4075                                      % self.op.mode)
4076
4077     if self.op.start:
4078       logging.info("Starting instance %s on node %s", instance, pnode_name)
4079       feedback_fn("* starting instance...")
4080       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4081       result.Raise()
4082       if not result.data:
4083         raise errors.OpExecError("Could not start instance")
4084
4085
4086 class LUConnectConsole(NoHooksLU):
4087   """Connect to an instance's console.
4088
4089   This is somewhat special in that it returns the command line that
4090   you need to run on the master node in order to connect to the
4091   console.
4092
4093   """
4094   _OP_REQP = ["instance_name"]
4095   REQ_BGL = False
4096
4097   def ExpandNames(self):
4098     self._ExpandAndLockInstance()
4099
4100   def CheckPrereq(self):
4101     """Check prerequisites.
4102
4103     This checks that the instance is in the cluster.
4104
4105     """
4106     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4107     assert self.instance is not None, \
4108       "Cannot retrieve locked instance %s" % self.op.instance_name
4109
4110   def Exec(self, feedback_fn):
4111     """Connect to the console of an instance
4112
4113     """
4114     instance = self.instance
4115     node = instance.primary_node
4116
4117     node_insts = self.rpc.call_instance_list([node],
4118                                              [instance.hypervisor])[node]
4119     node_insts.Raise()
4120
4121     if instance.name not in node_insts.data:
4122       raise errors.OpExecError("Instance %s is not running." % instance.name)
4123
4124     logging.debug("Connecting to console of %s on %s", instance.name, node)
4125
4126     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4127     console_cmd = hyper.GetShellCommandForConsole(instance)
4128
4129     # build ssh cmdline
4130     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4131
4132
4133 class LUReplaceDisks(LogicalUnit):
4134   """Replace the disks of an instance.
4135
4136   """
4137   HPATH = "mirrors-replace"
4138   HTYPE = constants.HTYPE_INSTANCE
4139   _OP_REQP = ["instance_name", "mode", "disks"]
4140   REQ_BGL = False
4141
4142   def ExpandNames(self):
4143     self._ExpandAndLockInstance()
4144
4145     if not hasattr(self.op, "remote_node"):
4146       self.op.remote_node = None
4147
4148     ia_name = getattr(self.op, "iallocator", None)
4149     if ia_name is not None:
4150       if self.op.remote_node is not None:
4151         raise errors.OpPrereqError("Give either the iallocator or the new"
4152                                    " secondary, not both")
4153       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4154     elif self.op.remote_node is not None:
4155       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4156       if remote_node is None:
4157         raise errors.OpPrereqError("Node '%s' not known" %
4158                                    self.op.remote_node)
4159       self.op.remote_node = remote_node
4160       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4161       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4162     else:
4163       self.needed_locks[locking.LEVEL_NODE] = []
4164       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4165
4166   def DeclareLocks(self, level):
4167     # If we're not already locking all nodes in the set we have to declare the
4168     # instance's primary/secondary nodes.
4169     if (level == locking.LEVEL_NODE and
4170         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4171       self._LockInstancesNodes()
4172
4173   def _RunAllocator(self):
4174     """Compute a new secondary node using an IAllocator.
4175
4176     """
4177     ial = IAllocator(self,
4178                      mode=constants.IALLOCATOR_MODE_RELOC,
4179                      name=self.op.instance_name,
4180                      relocate_from=[self.sec_node])
4181
4182     ial.Run(self.op.iallocator)
4183
4184     if not ial.success:
4185       raise errors.OpPrereqError("Can't compute nodes using"
4186                                  " iallocator '%s': %s" % (self.op.iallocator,
4187                                                            ial.info))
4188     if len(ial.nodes) != ial.required_nodes:
4189       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4190                                  " of nodes (%s), required %s" %
4191                                  (len(ial.nodes), ial.required_nodes))
4192     self.op.remote_node = ial.nodes[0]
4193     self.LogInfo("Selected new secondary for the instance: %s",
4194                  self.op.remote_node)
4195
4196   def BuildHooksEnv(self):
4197     """Build hooks env.
4198
4199     This runs on the master, the primary and all the secondaries.
4200
4201     """
4202     env = {
4203       "MODE": self.op.mode,
4204       "NEW_SECONDARY": self.op.remote_node,
4205       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4206       }
4207     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4208     nl = [
4209       self.cfg.GetMasterNode(),
4210       self.instance.primary_node,
4211       ]
4212     if self.op.remote_node is not None:
4213       nl.append(self.op.remote_node)
4214     return env, nl, nl
4215
4216   def CheckPrereq(self):
4217     """Check prerequisites.
4218
4219     This checks that the instance is in the cluster.
4220
4221     """
4222     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4223     assert instance is not None, \
4224       "Cannot retrieve locked instance %s" % self.op.instance_name
4225     self.instance = instance
4226
4227     if instance.disk_template not in constants.DTS_NET_MIRROR:
4228       raise errors.OpPrereqError("Instance's disk layout is not"
4229                                  " network mirrored.")
4230
4231     if len(instance.secondary_nodes) != 1:
4232       raise errors.OpPrereqError("The instance has a strange layout,"
4233                                  " expected one secondary but found %d" %
4234                                  len(instance.secondary_nodes))
4235
4236     self.sec_node = instance.secondary_nodes[0]
4237
4238     ia_name = getattr(self.op, "iallocator", None)
4239     if ia_name is not None:
4240       self._RunAllocator()
4241
4242     remote_node = self.op.remote_node
4243     if remote_node is not None:
4244       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4245       assert self.remote_node_info is not None, \
4246         "Cannot retrieve locked node %s" % remote_node
4247     else:
4248       self.remote_node_info = None
4249     if remote_node == instance.primary_node:
4250       raise errors.OpPrereqError("The specified node is the primary node of"
4251                                  " the instance.")
4252     elif remote_node == self.sec_node:
4253       if self.op.mode == constants.REPLACE_DISK_SEC:
4254         # this is for DRBD8, where we can't execute the same mode of
4255         # replacement as for drbd7 (no different port allocated)
4256         raise errors.OpPrereqError("Same secondary given, cannot execute"
4257                                    " replacement")
4258     if instance.disk_template == constants.DT_DRBD8:
4259       if (self.op.mode == constants.REPLACE_DISK_ALL and
4260           remote_node is not None):
4261         # switch to replace secondary mode
4262         self.op.mode = constants.REPLACE_DISK_SEC
4263
4264       if self.op.mode == constants.REPLACE_DISK_ALL:
4265         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4266                                    " secondary disk replacement, not"
4267                                    " both at once")
4268       elif self.op.mode == constants.REPLACE_DISK_PRI:
4269         if remote_node is not None:
4270           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4271                                      " the secondary while doing a primary"
4272                                      " node disk replacement")
4273         self.tgt_node = instance.primary_node
4274         self.oth_node = instance.secondary_nodes[0]
4275       elif self.op.mode == constants.REPLACE_DISK_SEC:
4276         self.new_node = remote_node # this can be None, in which case
4277                                     # we don't change the secondary
4278         self.tgt_node = instance.secondary_nodes[0]
4279         self.oth_node = instance.primary_node
4280       else:
4281         raise errors.ProgrammerError("Unhandled disk replace mode")
4282
4283     if not self.op.disks:
4284       self.op.disks = range(len(instance.disks))
4285
4286     for disk_idx in self.op.disks:
4287       instance.FindDisk(disk_idx)
4288
4289   def _ExecD8DiskOnly(self, feedback_fn):
4290     """Replace a disk on the primary or secondary for dbrd8.
4291
4292     The algorithm for replace is quite complicated:
4293
4294       1. for each disk to be replaced:
4295
4296         1. create new LVs on the target node with unique names
4297         1. detach old LVs from the drbd device
4298         1. rename old LVs to name_replaced.<time_t>
4299         1. rename new LVs to old LVs
4300         1. attach the new LVs (with the old names now) to the drbd device
4301
4302       1. wait for sync across all devices
4303
4304       1. for each modified disk:
4305
4306         1. remove old LVs (which have the name name_replaces.<time_t>)
4307
4308     Failures are not very well handled.
4309
4310     """
4311     steps_total = 6
4312     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4313     instance = self.instance
4314     iv_names = {}
4315     vgname = self.cfg.GetVGName()
4316     # start of work
4317     cfg = self.cfg
4318     tgt_node = self.tgt_node
4319     oth_node = self.oth_node
4320
4321     # Step: check device activation
4322     self.proc.LogStep(1, steps_total, "check device existence")
4323     info("checking volume groups")
4324     my_vg = cfg.GetVGName()
4325     results = self.rpc.call_vg_list([oth_node, tgt_node])
4326     if not results:
4327       raise errors.OpExecError("Can't list volume groups on the nodes")
4328     for node in oth_node, tgt_node:
4329       res = results[node]
4330       if res.failed or not res.data or my_vg not in res.data:
4331         raise errors.OpExecError("Volume group '%s' not found on %s" %
4332                                  (my_vg, node))
4333     for idx, dev in enumerate(instance.disks):
4334       if idx not in self.op.disks:
4335         continue
4336       for node in tgt_node, oth_node:
4337         info("checking disk/%d on %s" % (idx, node))
4338         cfg.SetDiskID(dev, node)
4339         if not self.rpc.call_blockdev_find(node, dev):
4340           raise errors.OpExecError("Can't find disk/%d on node %s" %
4341                                    (idx, node))
4342
4343     # Step: check other node consistency
4344     self.proc.LogStep(2, steps_total, "check peer consistency")
4345     for idx, dev in enumerate(instance.disks):
4346       if idx not in self.op.disks:
4347         continue
4348       info("checking disk/%d consistency on %s" % (idx, oth_node))
4349       if not _CheckDiskConsistency(self, dev, oth_node,
4350                                    oth_node==instance.primary_node):
4351         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4352                                  " to replace disks on this node (%s)" %
4353                                  (oth_node, tgt_node))
4354
4355     # Step: create new storage
4356     self.proc.LogStep(3, steps_total, "allocate new storage")
4357     for idx, dev in enumerate(instance.disks):
4358       if idx not in self.op.disks:
4359         continue
4360       size = dev.size
4361       cfg.SetDiskID(dev, tgt_node)
4362       lv_names = [".disk%d_%s" % (idx, suf)
4363                   for suf in ["data", "meta"]]
4364       names = _GenerateUniqueNames(self, lv_names)
4365       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4366                              logical_id=(vgname, names[0]))
4367       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4368                              logical_id=(vgname, names[1]))
4369       new_lvs = [lv_data, lv_meta]
4370       old_lvs = dev.children
4371       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4372       info("creating new local storage on %s for %s" %
4373            (tgt_node, dev.iv_name))
4374       # since we *always* want to create this LV, we use the
4375       # _Create...OnPrimary (which forces the creation), even if we
4376       # are talking about the secondary node
4377       for new_lv in new_lvs:
4378         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4379                                         _GetInstanceInfoText(instance)):
4380           raise errors.OpExecError("Failed to create new LV named '%s' on"
4381                                    " node '%s'" %
4382                                    (new_lv.logical_id[1], tgt_node))
4383
4384     # Step: for each lv, detach+rename*2+attach
4385     self.proc.LogStep(4, steps_total, "change drbd configuration")
4386     for dev, old_lvs, new_lvs in iv_names.itervalues():
4387       info("detaching %s drbd from local storage" % dev.iv_name)
4388       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4389       result.Raise()
4390       if not result.data:
4391         raise errors.OpExecError("Can't detach drbd from local storage on node"
4392                                  " %s for device %s" % (tgt_node, dev.iv_name))
4393       #dev.children = []
4394       #cfg.Update(instance)
4395
4396       # ok, we created the new LVs, so now we know we have the needed
4397       # storage; as such, we proceed on the target node to rename
4398       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4399       # using the assumption that logical_id == physical_id (which in
4400       # turn is the unique_id on that node)
4401
4402       # FIXME(iustin): use a better name for the replaced LVs
4403       temp_suffix = int(time.time())
4404       ren_fn = lambda d, suff: (d.physical_id[0],
4405                                 d.physical_id[1] + "_replaced-%s" % suff)
4406       # build the rename list based on what LVs exist on the node
4407       rlist = []
4408       for to_ren in old_lvs:
4409         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4410         if not find_res.failed and find_res.data is not None: # device exists
4411           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4412
4413       info("renaming the old LVs on the target node")
4414       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4415       result.Raise()
4416       if not result.data:
4417         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4418       # now we rename the new LVs to the old LVs
4419       info("renaming the new LVs on the target node")
4420       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4421       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4422       result.Raise()
4423       if not result.data:
4424         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4425
4426       for old, new in zip(old_lvs, new_lvs):
4427         new.logical_id = old.logical_id
4428         cfg.SetDiskID(new, tgt_node)
4429
4430       for disk in old_lvs:
4431         disk.logical_id = ren_fn(disk, temp_suffix)
4432         cfg.SetDiskID(disk, tgt_node)
4433
4434       # now that the new lvs have the old name, we can add them to the device
4435       info("adding new mirror component on %s" % tgt_node)
4436       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4437       if result.failed or not result.data:
4438         for new_lv in new_lvs:
4439           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4440           if result.failed or not result.data:
4441             warning("Can't rollback device %s", hint="manually cleanup unused"
4442                     " logical volumes")
4443         raise errors.OpExecError("Can't add local storage to drbd")
4444
4445       dev.children = new_lvs
4446       cfg.Update(instance)
4447
4448     # Step: wait for sync
4449
4450     # this can fail as the old devices are degraded and _WaitForSync
4451     # does a combined result over all disks, so we don't check its
4452     # return value
4453     self.proc.LogStep(5, steps_total, "sync devices")
4454     _WaitForSync(self, instance, unlock=True)
4455
4456     # so check manually all the devices
4457     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4458       cfg.SetDiskID(dev, instance.primary_node)
4459       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4460       if result.failed or result.data[5]:
4461         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4462
4463     # Step: remove old storage
4464     self.proc.LogStep(6, steps_total, "removing old storage")
4465     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4466       info("remove logical volumes for %s" % name)
4467       for lv in old_lvs:
4468         cfg.SetDiskID(lv, tgt_node)
4469         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4470         if result.failed or not result.data:
4471           warning("Can't remove old LV", hint="manually remove unused LVs")
4472           continue
4473
4474   def _ExecD8Secondary(self, feedback_fn):
4475     """Replace the secondary node for drbd8.
4476
4477     The algorithm for replace is quite complicated:
4478       - for all disks of the instance:
4479         - create new LVs on the new node with same names
4480         - shutdown the drbd device on the old secondary
4481         - disconnect the drbd network on the primary
4482         - create the drbd device on the new secondary
4483         - network attach the drbd on the primary, using an artifice:
4484           the drbd code for Attach() will connect to the network if it
4485           finds a device which is connected to the good local disks but
4486           not network enabled
4487       - wait for sync across all devices
4488       - remove all disks from the old secondary
4489
4490     Failures are not very well handled.
4491
4492     """
4493     steps_total = 6
4494     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4495     instance = self.instance
4496     iv_names = {}
4497     vgname = self.cfg.GetVGName()
4498     # start of work
4499     cfg = self.cfg
4500     old_node = self.tgt_node
4501     new_node = self.new_node
4502     pri_node = instance.primary_node
4503
4504     # Step: check device activation
4505     self.proc.LogStep(1, steps_total, "check device existence")
4506     info("checking volume groups")
4507     my_vg = cfg.GetVGName()
4508     results = self.rpc.call_vg_list([pri_node, new_node])
4509     for node in pri_node, new_node:
4510       res = results[node]
4511       if res.failed or not res.data or my_vg not in res.data:
4512         raise errors.OpExecError("Volume group '%s' not found on %s" %
4513                                  (my_vg, node))
4514     for idx, dev in enumerate(instance.disks):
4515       if idx not in self.op.disks:
4516         continue
4517       info("checking disk/%d on %s" % (idx, pri_node))
4518       cfg.SetDiskID(dev, pri_node)
4519       result = self.rpc.call_blockdev_find(pri_node, dev)
4520       result.Raise()
4521       if not result.data:
4522         raise errors.OpExecError("Can't find disk/%d on node %s" %
4523                                  (idx, pri_node))
4524
4525     # Step: check other node consistency
4526     self.proc.LogStep(2, steps_total, "check peer consistency")
4527     for idx, dev in enumerate(instance.disks):
4528       if idx not in self.op.disks:
4529         continue
4530       info("checking disk/%d consistency on %s" % (idx, pri_node))
4531       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4532         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4533                                  " unsafe to replace the secondary" %
4534                                  pri_node)
4535
4536     # Step: create new storage
4537     self.proc.LogStep(3, steps_total, "allocate new storage")
4538     for idx, dev in enumerate(instance.disks):
4539       size = dev.size
4540       info("adding new local storage on %s for disk/%d" %
4541            (new_node, idx))
4542       # since we *always* want to create this LV, we use the
4543       # _Create...OnPrimary (which forces the creation), even if we
4544       # are talking about the secondary node
4545       for new_lv in dev.children:
4546         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4547                                         _GetInstanceInfoText(instance)):
4548           raise errors.OpExecError("Failed to create new LV named '%s' on"
4549                                    " node '%s'" %
4550                                    (new_lv.logical_id[1], new_node))
4551
4552     # Step 4: dbrd minors and drbd setups changes
4553     # after this, we must manually remove the drbd minors on both the
4554     # error and the success paths
4555     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4556                                    instance.name)
4557     logging.debug("Allocated minors %s" % (minors,))
4558     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4559     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4560       size = dev.size
4561       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4562       # create new devices on new_node
4563       if pri_node == dev.logical_id[0]:
4564         new_logical_id = (pri_node, new_node,
4565                           dev.logical_id[2], dev.logical_id[3], new_minor,
4566                           dev.logical_id[5])
4567       else:
4568         new_logical_id = (new_node, pri_node,
4569                           dev.logical_id[2], new_minor, dev.logical_id[4],
4570                           dev.logical_id[5])
4571       iv_names[idx] = (dev, dev.children, new_logical_id)
4572       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4573                     new_logical_id)
4574       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4575                               logical_id=new_logical_id,
4576                               children=dev.children)
4577       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4578                                         new_drbd, False,
4579                                         _GetInstanceInfoText(instance)):
4580         self.cfg.ReleaseDRBDMinors(instance.name)
4581         raise errors.OpExecError("Failed to create new DRBD on"
4582                                  " node '%s'" % new_node)
4583
4584     for idx, dev in enumerate(instance.disks):
4585       # we have new devices, shutdown the drbd on the old secondary
4586       info("shutting down drbd for disk/%d on old node" % idx)
4587       cfg.SetDiskID(dev, old_node)
4588       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4589       if result.failed or not result.data:
4590         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4591                 hint="Please cleanup this device manually as soon as possible")
4592
4593     info("detaching primary drbds from the network (=> standalone)")
4594     done = 0
4595     for idx, dev in enumerate(instance.disks):
4596       cfg.SetDiskID(dev, pri_node)
4597       # set the network part of the physical (unique in bdev terms) id
4598       # to None, meaning detach from network
4599       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4600       # and 'find' the device, which will 'fix' it to match the
4601       # standalone state
4602       result = self.rpc.call_blockdev_find(pri_node, dev)
4603       if not result.failed and result.data:
4604         done += 1
4605       else:
4606         warning("Failed to detach drbd disk/%d from network, unusual case" %
4607                 idx)
4608
4609     if not done:
4610       # no detaches succeeded (very unlikely)
4611       self.cfg.ReleaseDRBDMinors(instance.name)
4612       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4613
4614     # if we managed to detach at least one, we update all the disks of
4615     # the instance to point to the new secondary
4616     info("updating instance configuration")
4617     for dev, _, new_logical_id in iv_names.itervalues():
4618       dev.logical_id = new_logical_id
4619       cfg.SetDiskID(dev, pri_node)
4620     cfg.Update(instance)
4621     # we can remove now the temp minors as now the new values are
4622     # written to the config file (and therefore stable)
4623     self.cfg.ReleaseDRBDMinors(instance.name)
4624
4625     # and now perform the drbd attach
4626     info("attaching primary drbds to new secondary (standalone => connected)")
4627     failures = []
4628     for idx, dev in enumerate(instance.disks):
4629       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4630       # since the attach is smart, it's enough to 'find' the device,
4631       # it will automatically activate the network, if the physical_id
4632       # is correct
4633       cfg.SetDiskID(dev, pri_node)
4634       logging.debug("Disk to attach: %s", dev)
4635       result = self.rpc.call_blockdev_find(pri_node, dev)
4636       if result.failed or not result.data:
4637         warning("can't attach drbd disk/%d to new secondary!" % idx,
4638                 "please do a gnt-instance info to see the status of disks")
4639
4640     # this can fail as the old devices are degraded and _WaitForSync
4641     # does a combined result over all disks, so we don't check its
4642     # return value
4643     self.proc.LogStep(5, steps_total, "sync devices")
4644     _WaitForSync(self, instance, unlock=True)
4645
4646     # so check manually all the devices
4647     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4648       cfg.SetDiskID(dev, pri_node)
4649       result = self.rpc.call_blockdev_find(pri_node, dev)
4650       result.Raise()
4651       if result.data[5]:
4652         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4653
4654     self.proc.LogStep(6, steps_total, "removing old storage")
4655     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4656       info("remove logical volumes for disk/%d" % idx)
4657       for lv in old_lvs:
4658         cfg.SetDiskID(lv, old_node)
4659         result = self.rpc.call_blockdev_remove(old_node, lv)
4660         if result.failed or not result.data:
4661           warning("Can't remove LV on old secondary",
4662                   hint="Cleanup stale volumes by hand")
4663
4664   def Exec(self, feedback_fn):
4665     """Execute disk replacement.
4666
4667     This dispatches the disk replacement to the appropriate handler.
4668
4669     """
4670     instance = self.instance
4671
4672     # Activate the instance disks if we're replacing them on a down instance
4673     if instance.status == "down":
4674       _StartInstanceDisks(self, instance, True)
4675
4676     if instance.disk_template == constants.DT_DRBD8:
4677       if self.op.remote_node is None:
4678         fn = self._ExecD8DiskOnly
4679       else:
4680         fn = self._ExecD8Secondary
4681     else:
4682       raise errors.ProgrammerError("Unhandled disk replacement case")
4683
4684     ret = fn(feedback_fn)
4685
4686     # Deactivate the instance disks if we're replacing them on a down instance
4687     if instance.status == "down":
4688       _SafeShutdownInstanceDisks(self, instance)
4689
4690     return ret
4691
4692
4693 class LUGrowDisk(LogicalUnit):
4694   """Grow a disk of an instance.
4695
4696   """
4697   HPATH = "disk-grow"
4698   HTYPE = constants.HTYPE_INSTANCE
4699   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4700   REQ_BGL = False
4701
4702   def ExpandNames(self):
4703     self._ExpandAndLockInstance()
4704     self.needed_locks[locking.LEVEL_NODE] = []
4705     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4706
4707   def DeclareLocks(self, level):
4708     if level == locking.LEVEL_NODE:
4709       self._LockInstancesNodes()
4710
4711   def BuildHooksEnv(self):
4712     """Build hooks env.
4713
4714     This runs on the master, the primary and all the secondaries.
4715
4716     """
4717     env = {
4718       "DISK": self.op.disk,
4719       "AMOUNT": self.op.amount,
4720       }
4721     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4722     nl = [
4723       self.cfg.GetMasterNode(),
4724       self.instance.primary_node,
4725       ]
4726     return env, nl, nl
4727
4728   def CheckPrereq(self):
4729     """Check prerequisites.
4730
4731     This checks that the instance is in the cluster.
4732
4733     """
4734     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4735     assert instance is not None, \
4736       "Cannot retrieve locked instance %s" % self.op.instance_name
4737
4738     self.instance = instance
4739
4740     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4741       raise errors.OpPrereqError("Instance's disk layout does not support"
4742                                  " growing.")
4743
4744     self.disk = instance.FindDisk(self.op.disk)
4745
4746     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4747     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4748                                        instance.hypervisor)
4749     for node in nodenames:
4750       info = nodeinfo[node]
4751       if info.failed or not info.data:
4752         raise errors.OpPrereqError("Cannot get current information"
4753                                    " from node '%s'" % node)
4754       vg_free = info.data.get('vg_free', None)
4755       if not isinstance(vg_free, int):
4756         raise errors.OpPrereqError("Can't compute free disk space on"
4757                                    " node %s" % node)
4758       if self.op.amount > vg_free:
4759         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4760                                    " %d MiB available, %d MiB required" %
4761                                    (node, vg_free, self.op.amount))
4762
4763   def Exec(self, feedback_fn):
4764     """Execute disk grow.
4765
4766     """
4767     instance = self.instance
4768     disk = self.disk
4769     for node in (instance.secondary_nodes + (instance.primary_node,)):
4770       self.cfg.SetDiskID(disk, node)
4771       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4772       result.Raise()
4773       if (not result.data or not isinstance(result.data, (list, tuple)) or
4774           len(result.data) != 2):
4775         raise errors.OpExecError("Grow request failed to node %s" % node)
4776       elif not result.data[0]:
4777         raise errors.OpExecError("Grow request failed to node %s: %s" %
4778                                  (node, result.data[1]))
4779     disk.RecordGrow(self.op.amount)
4780     self.cfg.Update(instance)
4781     if self.op.wait_for_sync:
4782       disk_abort = not _WaitForSync(self, instance)
4783       if disk_abort:
4784         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4785                              " status.\nPlease check the instance.")
4786
4787
4788 class LUQueryInstanceData(NoHooksLU):
4789   """Query runtime instance data.
4790
4791   """
4792   _OP_REQP = ["instances", "static"]
4793   REQ_BGL = False
4794
4795   def ExpandNames(self):
4796     self.needed_locks = {}
4797     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4798
4799     if not isinstance(self.op.instances, list):
4800       raise errors.OpPrereqError("Invalid argument type 'instances'")
4801
4802     if self.op.instances:
4803       self.wanted_names = []
4804       for name in self.op.instances:
4805         full_name = self.cfg.ExpandInstanceName(name)
4806         if full_name is None:
4807           raise errors.OpPrereqError("Instance '%s' not known" %
4808                                      self.op.instance_name)
4809         self.wanted_names.append(full_name)
4810       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4811     else:
4812       self.wanted_names = None
4813       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4814
4815     self.needed_locks[locking.LEVEL_NODE] = []
4816     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4817
4818   def DeclareLocks(self, level):
4819     if level == locking.LEVEL_NODE:
4820       self._LockInstancesNodes()
4821
4822   def CheckPrereq(self):
4823     """Check prerequisites.
4824
4825     This only checks the optional instance list against the existing names.
4826
4827     """
4828     if self.wanted_names is None:
4829       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4830
4831     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4832                              in self.wanted_names]
4833     return
4834
4835   def _ComputeDiskStatus(self, instance, snode, dev):
4836     """Compute block device status.
4837
4838     """
4839     static = self.op.static
4840     if not static:
4841       self.cfg.SetDiskID(dev, instance.primary_node)
4842       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4843       dev_pstatus.Raise()
4844       dev_pstatus = dev_pstatus.data
4845     else:
4846       dev_pstatus = None
4847
4848     if dev.dev_type in constants.LDS_DRBD:
4849       # we change the snode then (otherwise we use the one passed in)
4850       if dev.logical_id[0] == instance.primary_node:
4851         snode = dev.logical_id[1]
4852       else:
4853         snode = dev.logical_id[0]
4854
4855     if snode and not static:
4856       self.cfg.SetDiskID(dev, snode)
4857       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4858       dev_sstatus.Raise()
4859       dev_sstatus = dev_sstatus.data
4860     else:
4861       dev_sstatus = None
4862
4863     if dev.children:
4864       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4865                       for child in dev.children]
4866     else:
4867       dev_children = []
4868
4869     data = {
4870       "iv_name": dev.iv_name,
4871       "dev_type": dev.dev_type,
4872       "logical_id": dev.logical_id,
4873       "physical_id": dev.physical_id,
4874       "pstatus": dev_pstatus,
4875       "sstatus": dev_sstatus,
4876       "children": dev_children,
4877       "mode": dev.mode,
4878       }
4879
4880     return data
4881
4882   def Exec(self, feedback_fn):
4883     """Gather and return data"""
4884     result = {}
4885
4886     cluster = self.cfg.GetClusterInfo()
4887
4888     for instance in self.wanted_instances:
4889       if not self.op.static:
4890         remote_info = self.rpc.call_instance_info(instance.primary_node,
4891                                                   instance.name,
4892                                                   instance.hypervisor)
4893         remote_info.Raise()
4894         remote_info = remote_info.data
4895         if remote_info and "state" in remote_info:
4896           remote_state = "up"
4897         else:
4898           remote_state = "down"
4899       else:
4900         remote_state = None
4901       if instance.status == "down":
4902         config_state = "down"
4903       else:
4904         config_state = "up"
4905
4906       disks = [self._ComputeDiskStatus(instance, None, device)
4907                for device in instance.disks]
4908
4909       idict = {
4910         "name": instance.name,
4911         "config_state": config_state,
4912         "run_state": remote_state,
4913         "pnode": instance.primary_node,
4914         "snodes": instance.secondary_nodes,
4915         "os": instance.os,
4916         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4917         "disks": disks,
4918         "hypervisor": instance.hypervisor,
4919         "network_port": instance.network_port,
4920         "hv_instance": instance.hvparams,
4921         "hv_actual": cluster.FillHV(instance),
4922         "be_instance": instance.beparams,
4923         "be_actual": cluster.FillBE(instance),
4924         }
4925
4926       result[instance.name] = idict
4927
4928     return result
4929
4930
4931 class LUSetInstanceParams(LogicalUnit):
4932   """Modifies an instances's parameters.
4933
4934   """
4935   HPATH = "instance-modify"
4936   HTYPE = constants.HTYPE_INSTANCE
4937   _OP_REQP = ["instance_name"]
4938   REQ_BGL = False
4939
4940   def CheckArguments(self):
4941     if not hasattr(self.op, 'nics'):
4942       self.op.nics = []
4943     if not hasattr(self.op, 'disks'):
4944       self.op.disks = []
4945     if not hasattr(self.op, 'beparams'):
4946       self.op.beparams = {}
4947     if not hasattr(self.op, 'hvparams'):
4948       self.op.hvparams = {}
4949     self.op.force = getattr(self.op, "force", False)
4950     if not (self.op.nics or self.op.disks or
4951             self.op.hvparams or self.op.beparams):
4952       raise errors.OpPrereqError("No changes submitted")
4953
4954     utils.CheckBEParams(self.op.beparams)
4955
4956     # Disk validation
4957     disk_addremove = 0
4958     for disk_op, disk_dict in self.op.disks:
4959       if disk_op == constants.DDM_REMOVE:
4960         disk_addremove += 1
4961         continue
4962       elif disk_op == constants.DDM_ADD:
4963         disk_addremove += 1
4964       else:
4965         if not isinstance(disk_op, int):
4966           raise errors.OpPrereqError("Invalid disk index")
4967       if disk_op == constants.DDM_ADD:
4968         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4969         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4970           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4971         size = disk_dict.get('size', None)
4972         if size is None:
4973           raise errors.OpPrereqError("Required disk parameter size missing")
4974         try:
4975           size = int(size)
4976         except ValueError, err:
4977           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4978                                      str(err))
4979         disk_dict['size'] = size
4980       else:
4981         # modification of disk
4982         if 'size' in disk_dict:
4983           raise errors.OpPrereqError("Disk size change not possible, use"
4984                                      " grow-disk")
4985
4986     if disk_addremove > 1:
4987       raise errors.OpPrereqError("Only one disk add or remove operation"
4988                                  " supported at a time")
4989
4990     # NIC validation
4991     nic_addremove = 0
4992     for nic_op, nic_dict in self.op.nics:
4993       if nic_op == constants.DDM_REMOVE:
4994         nic_addremove += 1
4995         continue
4996       elif nic_op == constants.DDM_ADD:
4997         nic_addremove += 1
4998       else:
4999         if not isinstance(nic_op, int):
5000           raise errors.OpPrereqError("Invalid nic index")
5001
5002       # nic_dict should be a dict
5003       nic_ip = nic_dict.get('ip', None)
5004       if nic_ip is not None:
5005         if nic_ip.lower() == "none":
5006           nic_dict['ip'] = None
5007         else:
5008           if not utils.IsValidIP(nic_ip):
5009             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5010       # we can only check None bridges and assign the default one
5011       nic_bridge = nic_dict.get('bridge', None)
5012       if nic_bridge is None:
5013         nic_dict['bridge'] = self.cfg.GetDefBridge()
5014       # but we can validate MACs
5015       nic_mac = nic_dict.get('mac', None)
5016       if nic_mac is not None:
5017         if self.cfg.IsMacInUse(nic_mac):
5018           raise errors.OpPrereqError("MAC address %s already in use"
5019                                      " in cluster" % nic_mac)
5020         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5021           if not utils.IsValidMac(nic_mac):
5022             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5023     if nic_addremove > 1:
5024       raise errors.OpPrereqError("Only one NIC add or remove operation"
5025                                  " supported at a time")
5026
5027   def ExpandNames(self):
5028     self._ExpandAndLockInstance()
5029     self.needed_locks[locking.LEVEL_NODE] = []
5030     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5031
5032   def DeclareLocks(self, level):
5033     if level == locking.LEVEL_NODE:
5034       self._LockInstancesNodes()
5035
5036   def BuildHooksEnv(self):
5037     """Build hooks env.
5038
5039     This runs on the master, primary and secondaries.
5040
5041     """
5042     args = dict()
5043     if constants.BE_MEMORY in self.be_new:
5044       args['memory'] = self.be_new[constants.BE_MEMORY]
5045     if constants.BE_VCPUS in self.be_new:
5046       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5047     # FIXME: readd disk/nic changes
5048     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5049     nl = [self.cfg.GetMasterNode(),
5050           self.instance.primary_node] + list(self.instance.secondary_nodes)
5051     return env, nl, nl
5052
5053   def CheckPrereq(self):
5054     """Check prerequisites.
5055
5056     This only checks the instance list against the existing names.
5057
5058     """
5059     force = self.force = self.op.force
5060
5061     # checking the new params on the primary/secondary nodes
5062
5063     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5064     assert self.instance is not None, \
5065       "Cannot retrieve locked instance %s" % self.op.instance_name
5066     pnode = self.instance.primary_node
5067     nodelist = [pnode]
5068     nodelist.extend(instance.secondary_nodes)
5069
5070     # hvparams processing
5071     if self.op.hvparams:
5072       i_hvdict = copy.deepcopy(instance.hvparams)
5073       for key, val in self.op.hvparams.iteritems():
5074         if val == constants.VALUE_DEFAULT:
5075           try:
5076             del i_hvdict[key]
5077           except KeyError:
5078             pass
5079         elif val == constants.VALUE_NONE:
5080           i_hvdict[key] = None
5081         else:
5082           i_hvdict[key] = val
5083       cluster = self.cfg.GetClusterInfo()
5084       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5085                                 i_hvdict)
5086       # local check
5087       hypervisor.GetHypervisor(
5088         instance.hypervisor).CheckParameterSyntax(hv_new)
5089       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5090       self.hv_new = hv_new # the new actual values
5091       self.hv_inst = i_hvdict # the new dict (without defaults)
5092     else:
5093       self.hv_new = self.hv_inst = {}
5094
5095     # beparams processing
5096     if self.op.beparams:
5097       i_bedict = copy.deepcopy(instance.beparams)
5098       for key, val in self.op.beparams.iteritems():
5099         if val == constants.VALUE_DEFAULT:
5100           try:
5101             del i_bedict[key]
5102           except KeyError:
5103             pass
5104         else:
5105           i_bedict[key] = val
5106       cluster = self.cfg.GetClusterInfo()
5107       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5108                                 i_bedict)
5109       self.be_new = be_new # the new actual values
5110       self.be_inst = i_bedict # the new dict (without defaults)
5111     else:
5112       self.be_new = self.be_inst = {}
5113
5114     self.warn = []
5115
5116     if constants.BE_MEMORY in self.op.beparams and not self.force:
5117       mem_check_list = [pnode]
5118       if be_new[constants.BE_AUTO_BALANCE]:
5119         # either we changed auto_balance to yes or it was from before
5120         mem_check_list.extend(instance.secondary_nodes)
5121       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5122                                                   instance.hypervisor)
5123       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5124                                          instance.hypervisor)
5125       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5126         # Assume the primary node is unreachable and go ahead
5127         self.warn.append("Can't get info from primary node %s" % pnode)
5128       else:
5129         if not instance_info.failed and instance_info.data:
5130           current_mem = instance_info.data['memory']
5131         else:
5132           # Assume instance not running
5133           # (there is a slight race condition here, but it's not very probable,
5134           # and we have no other way to check)
5135           current_mem = 0
5136         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5137                     nodeinfo[pnode].data['memory_free'])
5138         if miss_mem > 0:
5139           raise errors.OpPrereqError("This change will prevent the instance"
5140                                      " from starting, due to %d MB of memory"
5141                                      " missing on its primary node" % miss_mem)
5142
5143       if be_new[constants.BE_AUTO_BALANCE]:
5144         for node, nres in instance.secondary_nodes.iteritems():
5145           if nres.failed or not isinstance(nres.data, dict):
5146             self.warn.append("Can't get info from secondary node %s" % node)
5147           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5148             self.warn.append("Not enough memory to failover instance to"
5149                              " secondary node %s" % node)
5150
5151     # NIC processing
5152     for nic_op, nic_dict in self.op.nics:
5153       if nic_op == constants.DDM_REMOVE:
5154         if not instance.nics:
5155           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5156         continue
5157       if nic_op != constants.DDM_ADD:
5158         # an existing nic
5159         if nic_op < 0 or nic_op >= len(instance.nics):
5160           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5161                                      " are 0 to %d" %
5162                                      (nic_op, len(instance.nics)))
5163       nic_bridge = nic_dict.get('bridge', None)
5164       if nic_bridge is not None:
5165         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5166           msg = ("Bridge '%s' doesn't exist on one of"
5167                  " the instance nodes" % nic_bridge)
5168           if self.force:
5169             self.warn.append(msg)
5170           else:
5171             raise errors.OpPrereqError(msg)
5172
5173     # DISK processing
5174     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5175       raise errors.OpPrereqError("Disk operations not supported for"
5176                                  " diskless instances")
5177     for disk_op, disk_dict in self.op.disks:
5178       if disk_op == constants.DDM_REMOVE:
5179         if len(instance.disks) == 1:
5180           raise errors.OpPrereqError("Cannot remove the last disk of"
5181                                      " an instance")
5182         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5183         ins_l = ins_l[pnode]
5184         if not type(ins_l) is list:
5185           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5186         if instance.name in ins_l:
5187           raise errors.OpPrereqError("Instance is running, can't remove"
5188                                      " disks.")
5189
5190       if (disk_op == constants.DDM_ADD and
5191           len(instance.nics) >= constants.MAX_DISKS):
5192         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5193                                    " add more" % constants.MAX_DISKS)
5194       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5195         # an existing disk
5196         if disk_op < 0 or disk_op >= len(instance.disks):
5197           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5198                                      " are 0 to %d" %
5199                                      (disk_op, len(instance.disks)))
5200
5201     return
5202
5203   def Exec(self, feedback_fn):
5204     """Modifies an instance.
5205
5206     All parameters take effect only at the next restart of the instance.
5207
5208     """
5209     # Process here the warnings from CheckPrereq, as we don't have a
5210     # feedback_fn there.
5211     for warn in self.warn:
5212       feedback_fn("WARNING: %s" % warn)
5213
5214     result = []
5215     instance = self.instance
5216     # disk changes
5217     for disk_op, disk_dict in self.op.disks:
5218       if disk_op == constants.DDM_REMOVE:
5219         # remove the last disk
5220         device = instance.disks.pop()
5221         device_idx = len(instance.disks)
5222         for node, disk in device.ComputeNodeTree(instance.primary_node):
5223           self.cfg.SetDiskID(disk, node)
5224           result = self.rpc.call_blockdev_remove(node, disk)
5225           if result.failed or not result.data:
5226             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5227                                  " continuing anyway", device_idx, node)
5228         result.append(("disk/%d" % device_idx, "remove"))
5229       elif disk_op == constants.DDM_ADD:
5230         # add a new disk
5231         if instance.disk_template == constants.DT_FILE:
5232           file_driver, file_path = instance.disks[0].logical_id
5233           file_path = os.path.dirname(file_path)
5234         else:
5235           file_driver = file_path = None
5236         disk_idx_base = len(instance.disks)
5237         new_disk = _GenerateDiskTemplate(self,
5238                                          instance.disk_template,
5239                                          instance, instance.primary_node,
5240                                          instance.secondary_nodes,
5241                                          [disk_dict],
5242                                          file_path,
5243                                          file_driver,
5244                                          disk_idx_base)[0]
5245         new_disk.mode = disk_dict['mode']
5246         instance.disks.append(new_disk)
5247         info = _GetInstanceInfoText(instance)
5248
5249         logging.info("Creating volume %s for instance %s",
5250                      new_disk.iv_name, instance.name)
5251         # Note: this needs to be kept in sync with _CreateDisks
5252         #HARDCODE
5253         for secondary_node in instance.secondary_nodes:
5254           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5255                                             new_disk, False, info):
5256             self.LogWarning("Failed to create volume %s (%s) on"
5257                             " secondary node %s!",
5258                             new_disk.iv_name, new_disk, secondary_node)
5259         #HARDCODE
5260         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5261                                         instance, new_disk, info):
5262           self.LogWarning("Failed to create volume %s on primary!",
5263                           new_disk.iv_name)
5264         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5265                        (new_disk.size, new_disk.mode)))
5266       else:
5267         # change a given disk
5268         instance.disks[disk_op].mode = disk_dict['mode']
5269         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5270     # NIC changes
5271     for nic_op, nic_dict in self.op.nics:
5272       if nic_op == constants.DDM_REMOVE:
5273         # remove the last nic
5274         del instance.nics[-1]
5275         result.append(("nic.%d" % len(instance.nics), "remove"))
5276       elif nic_op == constants.DDM_ADD:
5277         # add a new nic
5278         if 'mac' not in nic_dict:
5279           mac = constants.VALUE_GENERATE
5280         else:
5281           mac = nic_dict['mac']
5282         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5283           mac = self.cfg.GenerateMAC()
5284         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5285                               bridge=nic_dict.get('bridge', None))
5286         instance.nics.append(new_nic)
5287         result.append(("nic.%d" % (len(instance.nics) - 1),
5288                        "add:mac=%s,ip=%s,bridge=%s" %
5289                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5290       else:
5291         # change a given nic
5292         for key in 'mac', 'ip', 'bridge':
5293           if key in nic_dict:
5294             setattr(instance.nics[nic_op], key, nic_dict[key])
5295             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5296
5297     # hvparams changes
5298     if self.op.hvparams:
5299       instance.hvparams = self.hv_new
5300       for key, val in self.op.hvparams.iteritems():
5301         result.append(("hv/%s" % key, val))
5302
5303     # beparams changes
5304     if self.op.beparams:
5305       instance.beparams = self.be_inst
5306       for key, val in self.op.beparams.iteritems():
5307         result.append(("be/%s" % key, val))
5308
5309     self.cfg.Update(instance)
5310
5311     return result
5312
5313
5314 class LUQueryExports(NoHooksLU):
5315   """Query the exports list
5316
5317   """
5318   _OP_REQP = ['nodes']
5319   REQ_BGL = False
5320
5321   def ExpandNames(self):
5322     self.needed_locks = {}
5323     self.share_locks[locking.LEVEL_NODE] = 1
5324     if not self.op.nodes:
5325       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5326     else:
5327       self.needed_locks[locking.LEVEL_NODE] = \
5328         _GetWantedNodes(self, self.op.nodes)
5329
5330   def CheckPrereq(self):
5331     """Check prerequisites.
5332
5333     """
5334     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5335
5336   def Exec(self, feedback_fn):
5337     """Compute the list of all the exported system images.
5338
5339     @rtype: dict
5340     @return: a dictionary with the structure node->(export-list)
5341         where export-list is a list of the instances exported on
5342         that node.
5343
5344     """
5345     result = self.rpc.call_export_list(self.nodes)
5346     result.Raise()
5347     return result.data
5348
5349
5350 class LUExportInstance(LogicalUnit):
5351   """Export an instance to an image in the cluster.
5352
5353   """
5354   HPATH = "instance-export"
5355   HTYPE = constants.HTYPE_INSTANCE
5356   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5357   REQ_BGL = False
5358
5359   def ExpandNames(self):
5360     self._ExpandAndLockInstance()
5361     # FIXME: lock only instance primary and destination node
5362     #
5363     # Sad but true, for now we have do lock all nodes, as we don't know where
5364     # the previous export might be, and and in this LU we search for it and
5365     # remove it from its current node. In the future we could fix this by:
5366     #  - making a tasklet to search (share-lock all), then create the new one,
5367     #    then one to remove, after
5368     #  - removing the removal operation altoghether
5369     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5370
5371   def DeclareLocks(self, level):
5372     """Last minute lock declaration."""
5373     # All nodes are locked anyway, so nothing to do here.
5374
5375   def BuildHooksEnv(self):
5376     """Build hooks env.
5377
5378     This will run on the master, primary node and target node.
5379
5380     """
5381     env = {
5382       "EXPORT_NODE": self.op.target_node,
5383       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5384       }
5385     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5386     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5387           self.op.target_node]
5388     return env, nl, nl
5389
5390   def CheckPrereq(self):
5391     """Check prerequisites.
5392
5393     This checks that the instance and node names are valid.
5394
5395     """
5396     instance_name = self.op.instance_name
5397     self.instance = self.cfg.GetInstanceInfo(instance_name)
5398     assert self.instance is not None, \
5399           "Cannot retrieve locked instance %s" % self.op.instance_name
5400
5401     self.dst_node = self.cfg.GetNodeInfo(
5402       self.cfg.ExpandNodeName(self.op.target_node))
5403
5404     if self.dst_node is None:
5405       # This is wrong node name, not a non-locked node
5406       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5407
5408     # instance disk type verification
5409     for disk in self.instance.disks:
5410       if disk.dev_type == constants.LD_FILE:
5411         raise errors.OpPrereqError("Export not supported for instances with"
5412                                    " file-based disks")
5413
5414   def Exec(self, feedback_fn):
5415     """Export an instance to an image in the cluster.
5416
5417     """
5418     instance = self.instance
5419     dst_node = self.dst_node
5420     src_node = instance.primary_node
5421     if self.op.shutdown:
5422       # shutdown the instance, but not the disks
5423       result = self.rpc.call_instance_shutdown(src_node, instance)
5424       result.Raise()
5425       if not result.data:
5426         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5427                                  (instance.name, src_node))
5428
5429     vgname = self.cfg.GetVGName()
5430
5431     snap_disks = []
5432
5433     try:
5434       for disk in instance.disks:
5435         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5436         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5437         if new_dev_name.failed or not new_dev_name.data:
5438           self.LogWarning("Could not snapshot block device %s on node %s",
5439                           disk.logical_id[1], src_node)
5440           snap_disks.append(False)
5441         else:
5442           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5443                                  logical_id=(vgname, new_dev_name.data),
5444                                  physical_id=(vgname, new_dev_name.data),
5445                                  iv_name=disk.iv_name)
5446           snap_disks.append(new_dev)
5447
5448     finally:
5449       if self.op.shutdown and instance.status == "up":
5450         result = self.rpc.call_instance_start(src_node, instance, None)
5451         if result.failed or not result.data:
5452           _ShutdownInstanceDisks(self, instance)
5453           raise errors.OpExecError("Could not start instance")
5454
5455     # TODO: check for size
5456
5457     cluster_name = self.cfg.GetClusterName()
5458     for idx, dev in enumerate(snap_disks):
5459       if dev:
5460         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5461                                                instance, cluster_name, idx)
5462         if result.failed or not result.data:
5463           self.LogWarning("Could not export block device %s from node %s to"
5464                           " node %s", dev.logical_id[1], src_node,
5465                           dst_node.name)
5466         result = self.rpc.call_blockdev_remove(src_node, dev)
5467         if result.failed or not result.data:
5468           self.LogWarning("Could not remove snapshot block device %s from node"
5469                           " %s", dev.logical_id[1], src_node)
5470
5471     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5472     if result.failed or not result.data:
5473       self.LogWarning("Could not finalize export for instance %s on node %s",
5474                       instance.name, dst_node.name)
5475
5476     nodelist = self.cfg.GetNodeList()
5477     nodelist.remove(dst_node.name)
5478
5479     # on one-node clusters nodelist will be empty after the removal
5480     # if we proceed the backup would be removed because OpQueryExports
5481     # substitutes an empty list with the full cluster node list.
5482     if nodelist:
5483       exportlist = self.rpc.call_export_list(nodelist)
5484       for node in exportlist:
5485         if exportlist[node].failed:
5486           continue
5487         if instance.name in exportlist[node].data:
5488           if not self.rpc.call_export_remove(node, instance.name):
5489             self.LogWarning("Could not remove older export for instance %s"
5490                             " on node %s", instance.name, node)
5491
5492
5493 class LURemoveExport(NoHooksLU):
5494   """Remove exports related to the named instance.
5495
5496   """
5497   _OP_REQP = ["instance_name"]
5498   REQ_BGL = False
5499
5500   def ExpandNames(self):
5501     self.needed_locks = {}
5502     # We need all nodes to be locked in order for RemoveExport to work, but we
5503     # don't need to lock the instance itself, as nothing will happen to it (and
5504     # we can remove exports also for a removed instance)
5505     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5506
5507   def CheckPrereq(self):
5508     """Check prerequisites.
5509     """
5510     pass
5511
5512   def Exec(self, feedback_fn):
5513     """Remove any export.
5514
5515     """
5516     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5517     # If the instance was not found we'll try with the name that was passed in.
5518     # This will only work if it was an FQDN, though.
5519     fqdn_warn = False
5520     if not instance_name:
5521       fqdn_warn = True
5522       instance_name = self.op.instance_name
5523
5524     exportlist = self.rpc.call_export_list(self.acquired_locks[
5525       locking.LEVEL_NODE])
5526     found = False
5527     for node in exportlist:
5528       if exportlist[node].failed:
5529         self.LogWarning("Failed to query node %s, continuing" % node)
5530         continue
5531       if instance_name in exportlist[node].data:
5532         found = True
5533         result = self.rpc.call_export_remove(node, instance_name)
5534         if result.failed or not result.data:
5535           logging.error("Could not remove export for instance %s"
5536                         " on node %s", instance_name, node)
5537
5538     if fqdn_warn and not found:
5539       feedback_fn("Export not found. If trying to remove an export belonging"
5540                   " to a deleted instance please use its Fully Qualified"
5541                   " Domain Name.")
5542
5543
5544 class TagsLU(NoHooksLU):
5545   """Generic tags LU.
5546
5547   This is an abstract class which is the parent of all the other tags LUs.
5548
5549   """
5550
5551   def ExpandNames(self):
5552     self.needed_locks = {}
5553     if self.op.kind == constants.TAG_NODE:
5554       name = self.cfg.ExpandNodeName(self.op.name)
5555       if name is None:
5556         raise errors.OpPrereqError("Invalid node name (%s)" %
5557                                    (self.op.name,))
5558       self.op.name = name
5559       self.needed_locks[locking.LEVEL_NODE] = name
5560     elif self.op.kind == constants.TAG_INSTANCE:
5561       name = self.cfg.ExpandInstanceName(self.op.name)
5562       if name is None:
5563         raise errors.OpPrereqError("Invalid instance name (%s)" %
5564                                    (self.op.name,))
5565       self.op.name = name
5566       self.needed_locks[locking.LEVEL_INSTANCE] = name
5567
5568   def CheckPrereq(self):
5569     """Check prerequisites.
5570
5571     """
5572     if self.op.kind == constants.TAG_CLUSTER:
5573       self.target = self.cfg.GetClusterInfo()
5574     elif self.op.kind == constants.TAG_NODE:
5575       self.target = self.cfg.GetNodeInfo(self.op.name)
5576     elif self.op.kind == constants.TAG_INSTANCE:
5577       self.target = self.cfg.GetInstanceInfo(self.op.name)
5578     else:
5579       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5580                                  str(self.op.kind))
5581
5582
5583 class LUGetTags(TagsLU):
5584   """Returns the tags of a given object.
5585
5586   """
5587   _OP_REQP = ["kind", "name"]
5588   REQ_BGL = False
5589
5590   def Exec(self, feedback_fn):
5591     """Returns the tag list.
5592
5593     """
5594     return list(self.target.GetTags())
5595
5596
5597 class LUSearchTags(NoHooksLU):
5598   """Searches the tags for a given pattern.
5599
5600   """
5601   _OP_REQP = ["pattern"]
5602   REQ_BGL = False
5603
5604   def ExpandNames(self):
5605     self.needed_locks = {}
5606
5607   def CheckPrereq(self):
5608     """Check prerequisites.
5609
5610     This checks the pattern passed for validity by compiling it.
5611
5612     """
5613     try:
5614       self.re = re.compile(self.op.pattern)
5615     except re.error, err:
5616       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5617                                  (self.op.pattern, err))
5618
5619   def Exec(self, feedback_fn):
5620     """Returns the tag list.
5621
5622     """
5623     cfg = self.cfg
5624     tgts = [("/cluster", cfg.GetClusterInfo())]
5625     ilist = cfg.GetAllInstancesInfo().values()
5626     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5627     nlist = cfg.GetAllNodesInfo().values()
5628     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5629     results = []
5630     for path, target in tgts:
5631       for tag in target.GetTags():
5632         if self.re.search(tag):
5633           results.append((path, tag))
5634     return results
5635
5636
5637 class LUAddTags(TagsLU):
5638   """Sets a tag on a given object.
5639
5640   """
5641   _OP_REQP = ["kind", "name", "tags"]
5642   REQ_BGL = False
5643
5644   def CheckPrereq(self):
5645     """Check prerequisites.
5646
5647     This checks the type and length of the tag name and value.
5648
5649     """
5650     TagsLU.CheckPrereq(self)
5651     for tag in self.op.tags:
5652       objects.TaggableObject.ValidateTag(tag)
5653
5654   def Exec(self, feedback_fn):
5655     """Sets the tag.
5656
5657     """
5658     try:
5659       for tag in self.op.tags:
5660         self.target.AddTag(tag)
5661     except errors.TagError, err:
5662       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5663     try:
5664       self.cfg.Update(self.target)
5665     except errors.ConfigurationError:
5666       raise errors.OpRetryError("There has been a modification to the"
5667                                 " config file and the operation has been"
5668                                 " aborted. Please retry.")
5669
5670
5671 class LUDelTags(TagsLU):
5672   """Delete a list of tags from a given object.
5673
5674   """
5675   _OP_REQP = ["kind", "name", "tags"]
5676   REQ_BGL = False
5677
5678   def CheckPrereq(self):
5679     """Check prerequisites.
5680
5681     This checks that we have the given tag.
5682
5683     """
5684     TagsLU.CheckPrereq(self)
5685     for tag in self.op.tags:
5686       objects.TaggableObject.ValidateTag(tag)
5687     del_tags = frozenset(self.op.tags)
5688     cur_tags = self.target.GetTags()
5689     if not del_tags <= cur_tags:
5690       diff_tags = del_tags - cur_tags
5691       diff_names = ["'%s'" % tag for tag in diff_tags]
5692       diff_names.sort()
5693       raise errors.OpPrereqError("Tag(s) %s not found" %
5694                                  (",".join(diff_names)))
5695
5696   def Exec(self, feedback_fn):
5697     """Remove the tag from the object.
5698
5699     """
5700     for tag in self.op.tags:
5701       self.target.RemoveTag(tag)
5702     try:
5703       self.cfg.Update(self.target)
5704     except errors.ConfigurationError:
5705       raise errors.OpRetryError("There has been a modification to the"
5706                                 " config file and the operation has been"
5707                                 " aborted. Please retry.")
5708
5709
5710 class LUTestDelay(NoHooksLU):
5711   """Sleep for a specified amount of time.
5712
5713   This LU sleeps on the master and/or nodes for a specified amount of
5714   time.
5715
5716   """
5717   _OP_REQP = ["duration", "on_master", "on_nodes"]
5718   REQ_BGL = False
5719
5720   def ExpandNames(self):
5721     """Expand names and set required locks.
5722
5723     This expands the node list, if any.
5724
5725     """
5726     self.needed_locks = {}
5727     if self.op.on_nodes:
5728       # _GetWantedNodes can be used here, but is not always appropriate to use
5729       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5730       # more information.
5731       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5732       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5733
5734   def CheckPrereq(self):
5735     """Check prerequisites.
5736
5737     """
5738
5739   def Exec(self, feedback_fn):
5740     """Do the actual sleep.
5741
5742     """
5743     if self.op.on_master:
5744       if not utils.TestDelay(self.op.duration):
5745         raise errors.OpExecError("Error during master delay test")
5746     if self.op.on_nodes:
5747       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5748       if not result:
5749         raise errors.OpExecError("Complete failure from rpc call")
5750       for node, node_result in result.items():
5751         node_result.Raise()
5752         if not node_result.data:
5753           raise errors.OpExecError("Failure during rpc call to node %s,"
5754                                    " result: %s" % (node, node_result.data))
5755
5756
5757 class IAllocator(object):
5758   """IAllocator framework.
5759
5760   An IAllocator instance has three sets of attributes:
5761     - cfg that is needed to query the cluster
5762     - input data (all members of the _KEYS class attribute are required)
5763     - four buffer attributes (in|out_data|text), that represent the
5764       input (to the external script) in text and data structure format,
5765       and the output from it, again in two formats
5766     - the result variables from the script (success, info, nodes) for
5767       easy usage
5768
5769   """
5770   _ALLO_KEYS = [
5771     "mem_size", "disks", "disk_template",
5772     "os", "tags", "nics", "vcpus", "hypervisor",
5773     ]
5774   _RELO_KEYS = [
5775     "relocate_from",
5776     ]
5777
5778   def __init__(self, lu, mode, name, **kwargs):
5779     self.lu = lu
5780     # init buffer variables
5781     self.in_text = self.out_text = self.in_data = self.out_data = None
5782     # init all input fields so that pylint is happy
5783     self.mode = mode
5784     self.name = name
5785     self.mem_size = self.disks = self.disk_template = None
5786     self.os = self.tags = self.nics = self.vcpus = None
5787     self.relocate_from = None
5788     # computed fields
5789     self.required_nodes = None
5790     # init result fields
5791     self.success = self.info = self.nodes = None
5792     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5793       keyset = self._ALLO_KEYS
5794     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5795       keyset = self._RELO_KEYS
5796     else:
5797       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5798                                    " IAllocator" % self.mode)
5799     for key in kwargs:
5800       if key not in keyset:
5801         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5802                                      " IAllocator" % key)
5803       setattr(self, key, kwargs[key])
5804     for key in keyset:
5805       if key not in kwargs:
5806         raise errors.ProgrammerError("Missing input parameter '%s' to"
5807                                      " IAllocator" % key)
5808     self._BuildInputData()
5809
5810   def _ComputeClusterData(self):
5811     """Compute the generic allocator input data.
5812
5813     This is the data that is independent of the actual operation.
5814
5815     """
5816     cfg = self.lu.cfg
5817     cluster_info = cfg.GetClusterInfo()
5818     # cluster data
5819     data = {
5820       "version": 1,
5821       "cluster_name": cfg.GetClusterName(),
5822       "cluster_tags": list(cluster_info.GetTags()),
5823       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5824       # we don't have job IDs
5825       }
5826     iinfo = cfg.GetAllInstancesInfo().values()
5827     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5828
5829     # node data
5830     node_results = {}
5831     node_list = cfg.GetNodeList()
5832
5833     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5834       hypervisor = self.hypervisor
5835     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5836       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5837
5838     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5839                                            hypervisor)
5840     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5841                        cluster_info.enabled_hypervisors)
5842     for nname in node_list:
5843       ninfo = cfg.GetNodeInfo(nname)
5844       node_data[nname].Raise()
5845       if not isinstance(node_data[nname].data, dict):
5846         raise errors.OpExecError("Can't get data for node %s" % nname)
5847       remote_info = node_data[nname].data
5848       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5849                    'vg_size', 'vg_free', 'cpu_total']:
5850         if attr not in remote_info:
5851           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5852                                    (nname, attr))
5853         try:
5854           remote_info[attr] = int(remote_info[attr])
5855         except ValueError, err:
5856           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5857                                    " %s" % (nname, attr, str(err)))
5858       # compute memory used by primary instances
5859       i_p_mem = i_p_up_mem = 0
5860       for iinfo, beinfo in i_list:
5861         if iinfo.primary_node == nname:
5862           i_p_mem += beinfo[constants.BE_MEMORY]
5863           if iinfo.name not in node_iinfo[nname]:
5864             i_used_mem = 0
5865           else:
5866             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5867           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5868           remote_info['memory_free'] -= max(0, i_mem_diff)
5869
5870           if iinfo.status == "up":
5871             i_p_up_mem += beinfo[constants.BE_MEMORY]
5872
5873       # compute memory used by instances
5874       pnr = {
5875         "tags": list(ninfo.GetTags()),
5876         "total_memory": remote_info['memory_total'],
5877         "reserved_memory": remote_info['memory_dom0'],
5878         "free_memory": remote_info['memory_free'],
5879         "i_pri_memory": i_p_mem,
5880         "i_pri_up_memory": i_p_up_mem,
5881         "total_disk": remote_info['vg_size'],
5882         "free_disk": remote_info['vg_free'],
5883         "primary_ip": ninfo.primary_ip,
5884         "secondary_ip": ninfo.secondary_ip,
5885         "total_cpus": remote_info['cpu_total'],
5886         }
5887       node_results[nname] = pnr
5888     data["nodes"] = node_results
5889
5890     # instance data
5891     instance_data = {}
5892     for iinfo, beinfo in i_list:
5893       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5894                   for n in iinfo.nics]
5895       pir = {
5896         "tags": list(iinfo.GetTags()),
5897         "should_run": iinfo.status == "up",
5898         "vcpus": beinfo[constants.BE_VCPUS],
5899         "memory": beinfo[constants.BE_MEMORY],
5900         "os": iinfo.os,
5901         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5902         "nics": nic_data,
5903         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5904         "disk_template": iinfo.disk_template,
5905         "hypervisor": iinfo.hypervisor,
5906         }
5907       instance_data[iinfo.name] = pir
5908
5909     data["instances"] = instance_data
5910
5911     self.in_data = data
5912
5913   def _AddNewInstance(self):
5914     """Add new instance data to allocator structure.
5915
5916     This in combination with _AllocatorGetClusterData will create the
5917     correct structure needed as input for the allocator.
5918
5919     The checks for the completeness of the opcode must have already been
5920     done.
5921
5922     """
5923     data = self.in_data
5924     if len(self.disks) != 2:
5925       raise errors.OpExecError("Only two-disk configurations supported")
5926
5927     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5928
5929     if self.disk_template in constants.DTS_NET_MIRROR:
5930       self.required_nodes = 2
5931     else:
5932       self.required_nodes = 1
5933     request = {
5934       "type": "allocate",
5935       "name": self.name,
5936       "disk_template": self.disk_template,
5937       "tags": self.tags,
5938       "os": self.os,
5939       "vcpus": self.vcpus,
5940       "memory": self.mem_size,
5941       "disks": self.disks,
5942       "disk_space_total": disk_space,
5943       "nics": self.nics,
5944       "required_nodes": self.required_nodes,
5945       }
5946     data["request"] = request
5947
5948   def _AddRelocateInstance(self):
5949     """Add relocate instance data to allocator structure.
5950
5951     This in combination with _IAllocatorGetClusterData will create the
5952     correct structure needed as input for the allocator.
5953
5954     The checks for the completeness of the opcode must have already been
5955     done.
5956
5957     """
5958     instance = self.lu.cfg.GetInstanceInfo(self.name)
5959     if instance is None:
5960       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5961                                    " IAllocator" % self.name)
5962
5963     if instance.disk_template not in constants.DTS_NET_MIRROR:
5964       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5965
5966     if len(instance.secondary_nodes) != 1:
5967       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5968
5969     self.required_nodes = 1
5970     disk_sizes = [{'size': disk.size} for disk in instance.disks]
5971     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5972
5973     request = {
5974       "type": "relocate",
5975       "name": self.name,
5976       "disk_space_total": disk_space,
5977       "required_nodes": self.required_nodes,
5978       "relocate_from": self.relocate_from,
5979       }
5980     self.in_data["request"] = request
5981
5982   def _BuildInputData(self):
5983     """Build input data structures.
5984
5985     """
5986     self._ComputeClusterData()
5987
5988     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5989       self._AddNewInstance()
5990     else:
5991       self._AddRelocateInstance()
5992
5993     self.in_text = serializer.Dump(self.in_data)
5994
5995   def Run(self, name, validate=True, call_fn=None):
5996     """Run an instance allocator and return the results.
5997
5998     """
5999     if call_fn is None:
6000       call_fn = self.lu.rpc.call_iallocator_runner
6001     data = self.in_text
6002
6003     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6004     result.Raise()
6005
6006     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6007       raise errors.OpExecError("Invalid result from master iallocator runner")
6008
6009     rcode, stdout, stderr, fail = result.data
6010
6011     if rcode == constants.IARUN_NOTFOUND:
6012       raise errors.OpExecError("Can't find allocator '%s'" % name)
6013     elif rcode == constants.IARUN_FAILURE:
6014       raise errors.OpExecError("Instance allocator call failed: %s,"
6015                                " output: %s" % (fail, stdout+stderr))
6016     self.out_text = stdout
6017     if validate:
6018       self._ValidateResult()
6019
6020   def _ValidateResult(self):
6021     """Process the allocator results.
6022
6023     This will process and if successful save the result in
6024     self.out_data and the other parameters.
6025
6026     """
6027     try:
6028       rdict = serializer.Load(self.out_text)
6029     except Exception, err:
6030       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6031
6032     if not isinstance(rdict, dict):
6033       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6034
6035     for key in "success", "info", "nodes":
6036       if key not in rdict:
6037         raise errors.OpExecError("Can't parse iallocator results:"
6038                                  " missing key '%s'" % key)
6039       setattr(self, key, rdict[key])
6040
6041     if not isinstance(rdict["nodes"], list):
6042       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6043                                " is not a list")
6044     self.out_data = rdict
6045
6046
6047 class LUTestAllocator(NoHooksLU):
6048   """Run allocator tests.
6049
6050   This LU runs the allocator tests
6051
6052   """
6053   _OP_REQP = ["direction", "mode", "name"]
6054
6055   def CheckPrereq(self):
6056     """Check prerequisites.
6057
6058     This checks the opcode parameters depending on the director and mode test.
6059
6060     """
6061     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6062       for attr in ["name", "mem_size", "disks", "disk_template",
6063                    "os", "tags", "nics", "vcpus"]:
6064         if not hasattr(self.op, attr):
6065           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6066                                      attr)
6067       iname = self.cfg.ExpandInstanceName(self.op.name)
6068       if iname is not None:
6069         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6070                                    iname)
6071       if not isinstance(self.op.nics, list):
6072         raise errors.OpPrereqError("Invalid parameter 'nics'")
6073       for row in self.op.nics:
6074         if (not isinstance(row, dict) or
6075             "mac" not in row or
6076             "ip" not in row or
6077             "bridge" not in row):
6078           raise errors.OpPrereqError("Invalid contents of the"
6079                                      " 'nics' parameter")
6080       if not isinstance(self.op.disks, list):
6081         raise errors.OpPrereqError("Invalid parameter 'disks'")
6082       if len(self.op.disks) != 2:
6083         raise errors.OpPrereqError("Only two-disk configurations supported")
6084       for row in self.op.disks:
6085         if (not isinstance(row, dict) or
6086             "size" not in row or
6087             not isinstance(row["size"], int) or
6088             "mode" not in row or
6089             row["mode"] not in ['r', 'w']):
6090           raise errors.OpPrereqError("Invalid contents of the"
6091                                      " 'disks' parameter")
6092       if self.op.hypervisor is None:
6093         self.op.hypervisor = self.cfg.GetHypervisorType()
6094     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6095       if not hasattr(self.op, "name"):
6096         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6097       fname = self.cfg.ExpandInstanceName(self.op.name)
6098       if fname is None:
6099         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6100                                    self.op.name)
6101       self.op.name = fname
6102       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6103     else:
6104       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6105                                  self.op.mode)
6106
6107     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6108       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6109         raise errors.OpPrereqError("Missing allocator name")
6110     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6111       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6112                                  self.op.direction)
6113
6114   def Exec(self, feedback_fn):
6115     """Run the allocator test.
6116
6117     """
6118     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6119       ial = IAllocator(self,
6120                        mode=self.op.mode,
6121                        name=self.op.name,
6122                        mem_size=self.op.mem_size,
6123                        disks=self.op.disks,
6124                        disk_template=self.op.disk_template,
6125                        os=self.op.os,
6126                        tags=self.op.tags,
6127                        nics=self.op.nics,
6128                        vcpus=self.op.vcpus,
6129                        hypervisor=self.op.hypervisor,
6130                        )
6131     else:
6132       ial = IAllocator(self,
6133                        mode=self.op.mode,
6134                        name=self.op.name,
6135                        relocate_from=list(self.relocate_from),
6136                        )
6137
6138     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6139       result = ial.in_text
6140     else:
6141       ial.Run(self.op.allocator, validate=False)
6142       result = ial.out_text
6143     return result