Fix cluster rename and known_hosts
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419                           memory, vcpus, nics):
420   """Builds instance related env variables for hooks
421
422   This builds the hook environment from individual variables.
423
424   @type name: string
425   @param name: the name of the instance
426   @type primary_node: string
427   @param primary_node: the name of the instance's primary node
428   @type secondary_nodes: list
429   @param secondary_nodes: list of secondary nodes as strings
430   @type os_type: string
431   @param os_type: the name of the instance's OS
432   @type status: string
433   @param status: the desired status of the instances
434   @type memory: string
435   @param memory: the memory size of the instance
436   @type vcpus: string
437   @param vcpus: the count of VCPUs the instance has
438   @type nics: list
439   @param nics: list of tuples (ip, bridge, mac) representing
440       the NICs the instance  has
441   @rtype: dict
442   @return: the hook environment for this instance
443
444   """
445   env = {
446     "OP_TARGET": name,
447     "INSTANCE_NAME": name,
448     "INSTANCE_PRIMARY": primary_node,
449     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450     "INSTANCE_OS_TYPE": os_type,
451     "INSTANCE_STATUS": status,
452     "INSTANCE_MEMORY": memory,
453     "INSTANCE_VCPUS": vcpus,
454   }
455
456   if nics:
457     nic_count = len(nics)
458     for idx, (ip, bridge, mac) in enumerate(nics):
459       if ip is None:
460         ip = ""
461       env["INSTANCE_NIC%d_IP" % idx] = ip
462       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464   else:
465     nic_count = 0
466
467   env["INSTANCE_NIC_COUNT"] = nic_count
468
469   return env
470
471
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473   """Builds instance related env variables for hooks from an object.
474
475   @type lu: L{LogicalUnit}
476   @param lu: the logical unit on whose behalf we execute
477   @type instance: L{objects.Instance}
478   @param instance: the instance for which we should build the
479       environment
480   @type override: dict
481   @param override: dictionary with key/values that will override
482       our values
483   @rtype: dict
484   @return: the hook environment dictionary
485
486   """
487   bep = lu.cfg.GetClusterInfo().FillBE(instance)
488   args = {
489     'name': instance.name,
490     'primary_node': instance.primary_node,
491     'secondary_nodes': instance.secondary_nodes,
492     'os_type': instance.os,
493     'status': instance.os,
494     'memory': bep[constants.BE_MEMORY],
495     'vcpus': bep[constants.BE_VCPUS],
496     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497   }
498   if override:
499     args.update(override)
500   return _BuildInstanceHookEnv(**args)
501
502
503 def _CheckInstanceBridgesExist(lu, instance):
504   """Check that the brigdes needed by an instance exist.
505
506   """
507   # check bridges existance
508   brlist = [nic.bridge for nic in instance.nics]
509   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510   result.Raise()
511   if not result.data:
512     raise errors.OpPrereqError("One or more target bridges %s does not"
513                                " exist on destination node '%s'" %
514                                (brlist, instance.primary_node))
515
516
517 class LUDestroyCluster(NoHooksLU):
518   """Logical unit for destroying the cluster.
519
520   """
521   _OP_REQP = []
522
523   def CheckPrereq(self):
524     """Check prerequisites.
525
526     This checks whether the cluster is empty.
527
528     Any errors are signalled by raising errors.OpPrereqError.
529
530     """
531     master = self.cfg.GetMasterNode()
532
533     nodelist = self.cfg.GetNodeList()
534     if len(nodelist) != 1 or nodelist[0] != master:
535       raise errors.OpPrereqError("There are still %d node(s) in"
536                                  " this cluster." % (len(nodelist) - 1))
537     instancelist = self.cfg.GetInstanceList()
538     if instancelist:
539       raise errors.OpPrereqError("There are still %d instance(s) in"
540                                  " this cluster." % len(instancelist))
541
542   def Exec(self, feedback_fn):
543     """Destroys the cluster.
544
545     """
546     master = self.cfg.GetMasterNode()
547     result = self.rpc.call_node_stop_master(master, False)
548     result.Raise()
549     if not result.data:
550       raise errors.OpExecError("Could not disable the master role")
551     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552     utils.CreateBackup(priv_key)
553     utils.CreateBackup(pub_key)
554     return master
555
556
557 class LUVerifyCluster(LogicalUnit):
558   """Verifies the cluster status.
559
560   """
561   HPATH = "cluster-verify"
562   HTYPE = constants.HTYPE_CLUSTER
563   _OP_REQP = ["skip_checks"]
564   REQ_BGL = False
565
566   def ExpandNames(self):
567     self.needed_locks = {
568       locking.LEVEL_NODE: locking.ALL_SET,
569       locking.LEVEL_INSTANCE: locking.ALL_SET,
570     }
571     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572
573   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574                   node_result, feedback_fn, master_files):
575     """Run multiple tests against a node.
576
577     Test list:
578
579       - compares ganeti version
580       - checks vg existance and size > 20G
581       - checks config file checksum
582       - checks ssh to other nodes
583
584     @type nodeinfo: L{objects.Node}
585     @param nodeinfo: the node to check
586     @param file_list: required list of files
587     @param local_cksum: dictionary of local files and their checksums
588     @param node_result: the results from the node
589     @param feedback_fn: function used to accumulate results
590     @param master_files: list of files that only masters should have
591
592     """
593     node = nodeinfo.name
594
595     # main result, node_result should be a non-empty dict
596     if not node_result or not isinstance(node_result, dict):
597       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598       return True
599
600     # compares ganeti version
601     local_version = constants.PROTOCOL_VERSION
602     remote_version = node_result.get('version', None)
603     if not remote_version:
604       feedback_fn("  - ERROR: connection to %s failed" % (node))
605       return True
606
607     if local_version != remote_version:
608       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609                       (local_version, node, remote_version))
610       return True
611
612     # checks vg existance and size > 20G
613
614     bad = False
615     vglist = node_result.get(constants.NV_VGLIST, None)
616     if not vglist:
617       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618                       (node,))
619       bad = True
620     else:
621       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622                                             constants.MIN_VG_SIZE)
623       if vgstatus:
624         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625         bad = True
626
627     # checks config file checksum
628
629     remote_cksum = node_result.get(constants.NV_FILELIST, None)
630     if not isinstance(remote_cksum, dict):
631       bad = True
632       feedback_fn("  - ERROR: node hasn't returned file checksum data")
633     else:
634       for file_name in file_list:
635         node_is_mc = nodeinfo.master_candidate
636         must_have_file = file_name not in master_files
637         if file_name not in remote_cksum:
638           if node_is_mc or must_have_file:
639             bad = True
640             feedback_fn("  - ERROR: file '%s' missing" % file_name)
641         elif remote_cksum[file_name] != local_cksum[file_name]:
642           if node_is_mc or must_have_file:
643             bad = True
644             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645           else:
646             # not candidate and this is not a must-have file
647             bad = True
648             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649                         " '%s'" % file_name)
650         else:
651           # all good, except non-master/non-must have combination
652           if not node_is_mc and not must_have_file:
653             feedback_fn("  - ERROR: file '%s' should not exist on non master"
654                         " candidates" % file_name)
655
656     # checks ssh to any
657
658     if constants.NV_NODELIST not in node_result:
659       bad = True
660       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661     else:
662       if node_result[constants.NV_NODELIST]:
663         bad = True
664         for node in node_result[constants.NV_NODELIST]:
665           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666                           (node, node_result[constants.NV_NODELIST][node]))
667
668     if constants.NV_NODENETTEST not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671     else:
672       if node_result[constants.NV_NODENETTEST]:
673         bad = True
674         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675         for node in nlist:
676           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677                           (node, node_result[constants.NV_NODENETTEST][node]))
678
679     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680     if isinstance(hyp_result, dict):
681       for hv_name, hv_result in hyp_result.iteritems():
682         if hv_result is not None:
683           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684                       (hv_name, hv_result))
685     return bad
686
687   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688                       node_instance, feedback_fn):
689     """Verify an instance.
690
691     This function checks to see if the required block devices are
692     available on the instance's node.
693
694     """
695     bad = False
696
697     node_current = instanceconfig.primary_node
698
699     node_vol_should = {}
700     instanceconfig.MapLVsByNode(node_vol_should)
701
702     for node in node_vol_should:
703       for volume in node_vol_should[node]:
704         if node not in node_vol_is or volume not in node_vol_is[node]:
705           feedback_fn("  - ERROR: volume %s missing on node %s" %
706                           (volume, node))
707           bad = True
708
709     if not instanceconfig.status == 'down':
710       if (node_current not in node_instance or
711           not instance in node_instance[node_current]):
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758     """Verify N+1 Memory Resilience.
759
760     Check that if one single node dies we can still start all the instances it
761     was primary for.
762
763     """
764     bad = False
765
766     for node, nodeinfo in node_info.iteritems():
767       # This code checks that every node which is now listed as secondary has
768       # enough memory to host all instances it is supposed to should a single
769       # other node in the cluster fail.
770       # FIXME: not ready for failover to an arbitrary node
771       # FIXME: does not support file-backed instances
772       # WARNING: we currently take into account down instances as well as up
773       # ones, considering that even if they're down someone might want to start
774       # them even in the event of a node failure.
775       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776         needed_mem = 0
777         for instance in instances:
778           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779           if bep[constants.BE_AUTO_BALANCE]:
780             needed_mem += bep[constants.BE_MEMORY]
781         if nodeinfo['mfree'] < needed_mem:
782           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783                       " failovers should node %s fail" % (node, prinode))
784           bad = True
785     return bad
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     Transform the list of checks we're going to skip into a set and check that
791     all its members are valid.
792
793     """
794     self.skip_set = frozenset(self.op.skip_checks)
795     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796       raise errors.OpPrereqError("Invalid checks to be skipped specified")
797
798   def BuildHooksEnv(self):
799     """Build hooks env.
800
801     Cluster-Verify hooks just rone in the post phase and their failure makes
802     the output be logged in the verify output and the verification to fail.
803
804     """
805     all_nodes = self.cfg.GetNodeList()
806     # TODO: populate the environment with useful information for verify hooks
807     env = {}
808     return env, [], all_nodes
809
810   def Exec(self, feedback_fn):
811     """Verify integrity of cluster, performing various test on nodes.
812
813     """
814     bad = False
815     feedback_fn("* Verifying global settings")
816     for msg in self.cfg.VerifyConfig():
817       feedback_fn("  - ERROR: %s" % msg)
818
819     vg_name = self.cfg.GetVGName()
820     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824     i_non_redundant = [] # Non redundant instances
825     i_non_a_balanced = [] # Non auto-balanced instances
826     node_volume = {}
827     node_instance = {}
828     node_info = {}
829     instance_cfg = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     master_files = [constants.CLUSTER_CONF_FILE]
834
835     file_names = ssconf.SimpleStore().GetFileList()
836     file_names.append(constants.SSL_CERT_FILE)
837     file_names.extend(master_files)
838
839     local_checksums = utils.FingerprintFiles(file_names)
840
841     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842     node_verify_param = {
843       constants.NV_FILELIST: file_names,
844       constants.NV_NODELIST: nodelist,
845       constants.NV_HYPERVISOR: hypervisors,
846       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847                                   node.secondary_ip) for node in nodeinfo],
848       constants.NV_LVLIST: vg_name,
849       constants.NV_INSTANCELIST: hypervisors,
850       constants.NV_VGLIST: None,
851       constants.NV_VERSION: None,
852       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853       }
854     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855                                            self.cfg.GetClusterName())
856
857     cluster = self.cfg.GetClusterInfo()
858     master_node = self.cfg.GetMasterNode()
859     for node_i in nodeinfo:
860       node = node_i.name
861       nresult = all_nvinfo[node].data
862
863       if node == master_node:
864         ntype = "master"
865       elif node_i.master_candidate:
866         ntype = "master candidate"
867       else:
868         ntype = "regular"
869       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870
871       if all_nvinfo[node].failed or not isinstance(nresult, dict):
872         feedback_fn("  - ERROR: connection to %s failed" % (node,))
873         bad = True
874         continue
875
876       result = self._VerifyNode(node_i, file_names, local_checksums,
877                                 nresult, feedback_fn, master_files)
878       bad = bad or result
879
880       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881       if isinstance(lvdata, basestring):
882         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883                     (node, lvdata.encode('string_escape')))
884         bad = True
885         node_volume[node] = {}
886       elif not isinstance(lvdata, dict):
887         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888         bad = True
889         continue
890       else:
891         node_volume[node] = lvdata
892
893       # node_instance
894       idata = nresult.get(constants.NV_INSTANCELIST, None)
895       if not isinstance(idata, list):
896         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897                     (node,))
898         bad = True
899         continue
900
901       node_instance[node] = idata
902
903       # node_info
904       nodeinfo = nresult.get(constants.NV_HVINFO, None)
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962         i_non_a_balanced.append(instance)
963
964       for snode in inst_config.secondary_nodes:
965         if snode in node_info:
966           node_info[snode]['sinst'].append(instance)
967           if pnode not in node_info[snode]['sinst-by-pnode']:
968             node_info[snode]['sinst-by-pnode'][pnode] = []
969           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970         else:
971           feedback_fn("  - ERROR: instance %s, connection to secondary node"
972                       " %s failed" % (instance, snode))
973
974     feedback_fn("* Verifying orphan volumes")
975     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976                                        feedback_fn)
977     bad = bad or result
978
979     feedback_fn("* Verifying remaining instances")
980     result = self._VerifyOrphanInstances(instancelist, node_instance,
981                                          feedback_fn)
982     bad = bad or result
983
984     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985       feedback_fn("* Verifying N+1 Memory redundancy")
986       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987       bad = bad or result
988
989     feedback_fn("* Other Notes")
990     if i_non_redundant:
991       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992                   % len(i_non_redundant))
993
994     if i_non_a_balanced:
995       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996                   % len(i_non_a_balanced))
997
998     return not bad
999
1000   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001     """Analize the post-hooks' result
1002
1003     This method analyses the hook result, handles it, and sends some
1004     nicely-formatted feedback back to the user.
1005
1006     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008     @param hooks_results: the results of the multi-node hooks rpc call
1009     @param feedback_fn: function used send feedback back to the caller
1010     @param lu_result: previous Exec result
1011     @return: the new Exec result, based on the previous result
1012         and hook results
1013
1014     """
1015     # We only really run POST phase hooks, and are only interested in
1016     # their results
1017     if phase == constants.HOOKS_PHASE_POST:
1018       # Used to change hooks' output to proper indentation
1019       indent_re = re.compile('^', re.M)
1020       feedback_fn("* Hooks Results")
1021       if not hooks_results:
1022         feedback_fn("  - ERROR: general communication failure")
1023         lu_result = 1
1024       else:
1025         for node_name in hooks_results:
1026           show_node_header = True
1027           res = hooks_results[node_name]
1028           if res.failed or res.data is False or not isinstance(res.data, list):
1029             feedback_fn("    Communication failure in hooks execution")
1030             lu_result = 1
1031             continue
1032           for script, hkr, output in res.data:
1033             if hkr == constants.HKR_FAIL:
1034               # The node header is only shown once, if there are
1035               # failing hooks on that node
1036               if show_node_header:
1037                 feedback_fn("  Node %s:" % node_name)
1038                 show_node_header = False
1039               feedback_fn("    ERROR: Script %s failed, output:" % script)
1040               output = indent_re.sub('      ', output)
1041               feedback_fn("%s" % output)
1042               lu_result = 1
1043
1044       return lu_result
1045
1046
1047 class LUVerifyDisks(NoHooksLU):
1048   """Verifies the cluster disks status.
1049
1050   """
1051   _OP_REQP = []
1052   REQ_BGL = False
1053
1054   def ExpandNames(self):
1055     self.needed_locks = {
1056       locking.LEVEL_NODE: locking.ALL_SET,
1057       locking.LEVEL_INSTANCE: locking.ALL_SET,
1058     }
1059     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060
1061   def CheckPrereq(self):
1062     """Check prerequisites.
1063
1064     This has no prerequisites.
1065
1066     """
1067     pass
1068
1069   def Exec(self, feedback_fn):
1070     """Verify integrity of cluster disks.
1071
1072     """
1073     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074
1075     vg_name = self.cfg.GetVGName()
1076     nodes = utils.NiceSort(self.cfg.GetNodeList())
1077     instances = [self.cfg.GetInstanceInfo(name)
1078                  for name in self.cfg.GetInstanceList()]
1079
1080     nv_dict = {}
1081     for inst in instances:
1082       inst_lvs = {}
1083       if (inst.status != "up" or
1084           inst.disk_template not in constants.DTS_NET_MIRROR):
1085         continue
1086       inst.MapLVsByNode(inst_lvs)
1087       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088       for node, vol_list in inst_lvs.iteritems():
1089         for vol in vol_list:
1090           nv_dict[(node, vol)] = inst
1091
1092     if not nv_dict:
1093       return result
1094
1095     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096
1097     to_act = set()
1098     for node in nodes:
1099       # node_volume
1100       lvs = node_lvs[node]
1101       if lvs.failed:
1102         self.LogWarning("Connection to node %s failed: %s" %
1103                         (node, lvs.data))
1104         continue
1105       lvs = lvs.data
1106       if isinstance(lvs, basestring):
1107         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108         res_nlvm[node] = lvs
1109       elif not isinstance(lvs, dict):
1110         logging.warning("Connection to node %s failed or invalid data"
1111                         " returned", node)
1112         res_nodes.append(node)
1113         continue
1114
1115       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116         inst = nv_dict.pop((node, lv_name), None)
1117         if (not lv_online and inst is not None
1118             and inst.name not in res_instances):
1119           res_instances.append(inst.name)
1120
1121     # any leftover items in nv_dict are missing LVs, let's arrange the
1122     # data better
1123     for key, inst in nv_dict.iteritems():
1124       if inst.name not in res_missing:
1125         res_missing[inst.name] = []
1126       res_missing[inst.name].append(key)
1127
1128     return result
1129
1130
1131 class LURenameCluster(LogicalUnit):
1132   """Rename the cluster.
1133
1134   """
1135   HPATH = "cluster-rename"
1136   HTYPE = constants.HTYPE_CLUSTER
1137   _OP_REQP = ["name"]
1138
1139   def BuildHooksEnv(self):
1140     """Build hooks env.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.cfg.GetClusterName(),
1145       "NEW_NAME": self.op.name,
1146       }
1147     mn = self.cfg.GetMasterNode()
1148     return env, [mn], [mn]
1149
1150   def CheckPrereq(self):
1151     """Verify that the passed name is a valid one.
1152
1153     """
1154     hostname = utils.HostInfo(self.op.name)
1155
1156     new_name = hostname.name
1157     self.ip = new_ip = hostname.ip
1158     old_name = self.cfg.GetClusterName()
1159     old_ip = self.cfg.GetMasterIP()
1160     if new_name == old_name and new_ip == old_ip:
1161       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162                                  " cluster has changed")
1163     if new_ip != old_ip:
1164       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166                                    " reachable on the network. Aborting." %
1167                                    new_ip)
1168
1169     self.op.name = new_name
1170
1171   def Exec(self, feedback_fn):
1172     """Rename the cluster.
1173
1174     """
1175     clustername = self.op.name
1176     ip = self.ip
1177
1178     # shutdown the master IP
1179     master = self.cfg.GetMasterNode()
1180     result = self.rpc.call_node_stop_master(master, False)
1181     if result.failed or not result.data:
1182       raise errors.OpExecError("Could not disable the master role")
1183
1184     try:
1185       cluster = self.cfg.GetClusterInfo()
1186       cluster.cluster_name = clustername
1187       cluster.master_ip = ip
1188       self.cfg.Update(cluster)
1189
1190       # update the known hosts file
1191       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192       node_list = self.cfg.GetNodeList()
1193       try:
1194         node_list.remove(master)
1195       except ValueError:
1196         pass
1197       result = self.rpc.call_upload_file(node_list,
1198                                          constants.SSH_KNOWN_HOSTS_FILE)
1199       for to_node, to_result in result.iteritems():
1200         if to_result.failed or not to_result.data:
1201           logging.error("Copy of file %s to node %s failed", fname, to_node)
1202
1203     finally:
1204       result = self.rpc.call_node_start_master(master, False)
1205       if result.failed or not result.data:
1206         self.LogWarning("Could not re-enable the master role on"
1207                         " the master, please restart manually.")
1208
1209
1210 def _RecursiveCheckIfLVMBased(disk):
1211   """Check if the given disk or its children are lvm-based.
1212
1213   @type disk: L{objects.Disk}
1214   @param disk: the disk to check
1215   @rtype: booleean
1216   @return: boolean indicating whether a LD_LV dev_type was found or not
1217
1218   """
1219   if disk.children:
1220     for chdisk in disk.children:
1221       if _RecursiveCheckIfLVMBased(chdisk):
1222         return True
1223   return disk.dev_type == constants.LD_LV
1224
1225
1226 class LUSetClusterParams(LogicalUnit):
1227   """Change the parameters of the cluster.
1228
1229   """
1230   HPATH = "cluster-modify"
1231   HTYPE = constants.HTYPE_CLUSTER
1232   _OP_REQP = []
1233   REQ_BGL = False
1234
1235   def CheckParameters(self):
1236     """Check parameters
1237
1238     """
1239     if not hasattr(self.op, "candidate_pool_size"):
1240       self.op.candidate_pool_size = None
1241     if self.op.candidate_pool_size is not None:
1242       try:
1243         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244       except ValueError, err:
1245         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246                                    str(err))
1247       if self.op.candidate_pool_size < 1:
1248         raise errors.OpPrereqError("At least one master candidate needed")
1249
1250   def ExpandNames(self):
1251     # FIXME: in the future maybe other cluster params won't require checking on
1252     # all nodes to be modified.
1253     self.needed_locks = {
1254       locking.LEVEL_NODE: locking.ALL_SET,
1255     }
1256     self.share_locks[locking.LEVEL_NODE] = 1
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     """
1262     env = {
1263       "OP_TARGET": self.cfg.GetClusterName(),
1264       "NEW_VG_NAME": self.op.vg_name,
1265       }
1266     mn = self.cfg.GetMasterNode()
1267     return env, [mn], [mn]
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks whether the given params don't conflict and
1273     if the given volume group is valid.
1274
1275     """
1276     # FIXME: This only works because there is only one parameter that can be
1277     # changed or removed.
1278     if self.op.vg_name is not None and not self.op.vg_name:
1279       instances = self.cfg.GetAllInstancesInfo().values()
1280       for inst in instances:
1281         for disk in inst.disks:
1282           if _RecursiveCheckIfLVMBased(disk):
1283             raise errors.OpPrereqError("Cannot disable lvm storage while"
1284                                        " lvm-based instances exist")
1285
1286     node_list = self.acquired_locks[locking.LEVEL_NODE]
1287
1288     # if vg_name not None, checks given volume group on all nodes
1289     if self.op.vg_name:
1290       vglist = self.rpc.call_vg_list(node_list)
1291       for node in node_list:
1292         if vglist[node].failed:
1293           # ignoring down node
1294           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295           continue
1296         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297                                               self.op.vg_name,
1298                                               constants.MIN_VG_SIZE)
1299         if vgstatus:
1300           raise errors.OpPrereqError("Error on node '%s': %s" %
1301                                      (node, vgstatus))
1302
1303     self.cluster = cluster = self.cfg.GetClusterInfo()
1304     # validate beparams changes
1305     if self.op.beparams:
1306       utils.CheckBEParams(self.op.beparams)
1307       self.new_beparams = cluster.FillDict(
1308         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309
1310     # hypervisor list/parameters
1311     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312     if self.op.hvparams:
1313       if not isinstance(self.op.hvparams, dict):
1314         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315       for hv_name, hv_dict in self.op.hvparams.items():
1316         if hv_name not in self.new_hvparams:
1317           self.new_hvparams[hv_name] = hv_dict
1318         else:
1319           self.new_hvparams[hv_name].update(hv_dict)
1320
1321     if self.op.enabled_hypervisors is not None:
1322       self.hv_list = self.op.enabled_hypervisors
1323     else:
1324       self.hv_list = cluster.enabled_hypervisors
1325
1326     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327       # either the enabled list has changed, or the parameters have, validate
1328       for hv_name, hv_params in self.new_hvparams.items():
1329         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330             (self.op.enabled_hypervisors and
1331              hv_name in self.op.enabled_hypervisors)):
1332           # either this is a new hypervisor, or its parameters have changed
1333           hv_class = hypervisor.GetHypervisor(hv_name)
1334           hv_class.CheckParameterSyntax(hv_params)
1335           _CheckHVParams(self, node_list, hv_name, hv_params)
1336
1337   def Exec(self, feedback_fn):
1338     """Change the parameters of the cluster.
1339
1340     """
1341     if self.op.vg_name is not None:
1342       if self.op.vg_name != self.cfg.GetVGName():
1343         self.cfg.SetVGName(self.op.vg_name)
1344       else:
1345         feedback_fn("Cluster LVM configuration already in desired"
1346                     " state, not changing")
1347     if self.op.hvparams:
1348       self.cluster.hvparams = self.new_hvparams
1349     if self.op.enabled_hypervisors is not None:
1350       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351     if self.op.beparams:
1352       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353     if self.op.candidate_pool_size is not None:
1354       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355
1356     self.cfg.Update(self.cluster)
1357
1358     # we want to update nodes after the cluster so that if any errors
1359     # happen, we have recorded and saved the cluster info
1360     if self.op.candidate_pool_size is not None:
1361       node_info = self.cfg.GetAllNodesInfo().values()
1362       num_candidates = len([node for node in node_info
1363                             if node.master_candidate])
1364       num_nodes = len(node_info)
1365       if num_candidates < self.op.candidate_pool_size:
1366         random.shuffle(node_info)
1367         for node in node_info:
1368           if num_candidates >= self.op.candidate_pool_size:
1369             break
1370           if node.master_candidate:
1371             continue
1372           node.master_candidate = True
1373           self.LogInfo("Promoting node %s to master candidate", node.name)
1374           self.cfg.Update(node)
1375           self.context.ReaddNode(node)
1376           num_candidates += 1
1377       elif num_candidates > self.op.candidate_pool_size:
1378         self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379                      " of candidate_pool_size (%d)" %
1380                      (num_candidates, self.op.candidate_pool_size))
1381
1382
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384   """Sleep and poll for an instance's disk to sync.
1385
1386   """
1387   if not instance.disks:
1388     return True
1389
1390   if not oneshot:
1391     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392
1393   node = instance.primary_node
1394
1395   for dev in instance.disks:
1396     lu.cfg.SetDiskID(dev, node)
1397
1398   retries = 0
1399   while True:
1400     max_time = 0
1401     done = True
1402     cumul_degraded = False
1403     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404     if rstats.failed or not rstats.data:
1405       lu.LogWarning("Can't get any data from node %s", node)
1406       retries += 1
1407       if retries >= 10:
1408         raise errors.RemoteError("Can't contact node %s for mirror data,"
1409                                  " aborting." % node)
1410       time.sleep(6)
1411       continue
1412     rstats = rstats.data
1413     retries = 0
1414     for i in range(len(rstats)):
1415       mstat = rstats[i]
1416       if mstat is None:
1417         lu.LogWarning("Can't compute data for node %s/%s",
1418                            node, instance.disks[i].iv_name)
1419         continue
1420       # we ignore the ldisk parameter
1421       perc_done, est_time, is_degraded, _ = mstat
1422       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423       if perc_done is not None:
1424         done = False
1425         if est_time is not None:
1426           rem_time = "%d estimated seconds remaining" % est_time
1427           max_time = est_time
1428         else:
1429           rem_time = "no time estimate"
1430         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431                         (instance.disks[i].iv_name, perc_done, rem_time))
1432     if done or oneshot:
1433       break
1434
1435     time.sleep(min(60, max_time))
1436
1437   if done:
1438     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439   return not cumul_degraded
1440
1441
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443   """Check that mirrors are not degraded.
1444
1445   The ldisk parameter, if True, will change the test from the
1446   is_degraded attribute (which represents overall non-ok status for
1447   the device(s)) to the ldisk (representing the local storage status).
1448
1449   """
1450   lu.cfg.SetDiskID(dev, node)
1451   if ldisk:
1452     idx = 6
1453   else:
1454     idx = 5
1455
1456   result = True
1457   if on_primary or dev.AssembleOnSecondary():
1458     rstats = lu.rpc.call_blockdev_find(node, dev)
1459     if rstats.failed or not rstats.data:
1460       logging.warning("Node %s: disk degraded, not found or node down", node)
1461       result = False
1462     else:
1463       result = result and (not rstats.data[idx])
1464   if dev.children:
1465     for child in dev.children:
1466       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467
1468   return result
1469
1470
1471 class LUDiagnoseOS(NoHooksLU):
1472   """Logical unit for OS diagnose/query.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477   _FIELDS_STATIC = utils.FieldSet()
1478   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479
1480   def ExpandNames(self):
1481     if self.op.names:
1482       raise errors.OpPrereqError("Selective OS query not supported")
1483
1484     _CheckOutputFields(static=self._FIELDS_STATIC,
1485                        dynamic=self._FIELDS_DYNAMIC,
1486                        selected=self.op.output_fields)
1487
1488     # Lock all nodes, in shared mode
1489     self.needed_locks = {}
1490     self.share_locks[locking.LEVEL_NODE] = 1
1491     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     """
1497
1498   @staticmethod
1499   def _DiagnoseByOS(node_list, rlist):
1500     """Remaps a per-node return list into an a per-os per-node dictionary
1501
1502     @param node_list: a list with the names of all nodes
1503     @param rlist: a map with node names as keys and OS objects as values
1504
1505     @rtype: dict
1506     @returns: a dictionary with osnames as keys and as value another map, with
1507         nodes as keys and list of OS objects as values, eg::
1508
1509           {"debian-etch": {"node1": [<object>,...],
1510                            "node2": [<object>,]}
1511           }
1512
1513     """
1514     all_os = {}
1515     for node_name, nr in rlist.iteritems():
1516       if nr.failed or not nr.data:
1517         continue
1518       for os_obj in nr.data:
1519         if os_obj.name not in all_os:
1520           # build a list of nodes for this os containing empty lists
1521           # for each node in node_list
1522           all_os[os_obj.name] = {}
1523           for nname in node_list:
1524             all_os[os_obj.name][nname] = []
1525         all_os[os_obj.name][node_name].append(os_obj)
1526     return all_os
1527
1528   def Exec(self, feedback_fn):
1529     """Compute the list of OSes.
1530
1531     """
1532     node_list = self.acquired_locks[locking.LEVEL_NODE]
1533     node_data = self.rpc.call_os_diagnose(node_list)
1534     if node_data == False:
1535       raise errors.OpExecError("Can't gather the list of OSes")
1536     pol = self._DiagnoseByOS(node_list, node_data)
1537     output = []
1538     for os_name, os_data in pol.iteritems():
1539       row = []
1540       for field in self.op.output_fields:
1541         if field == "name":
1542           val = os_name
1543         elif field == "valid":
1544           val = utils.all([osl and osl[0] for osl in os_data.values()])
1545         elif field == "node_status":
1546           val = {}
1547           for node_name, nos_list in os_data.iteritems():
1548             val[node_name] = [(v.status, v.path) for v in nos_list]
1549         else:
1550           raise errors.ParameterError(field)
1551         row.append(val)
1552       output.append(row)
1553
1554     return output
1555
1556
1557 class LURemoveNode(LogicalUnit):
1558   """Logical unit for removing a node.
1559
1560   """
1561   HPATH = "node-remove"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This doesn't run on the target node in the pre phase as a failed
1569     node would then be impossible to remove.
1570
1571     """
1572     env = {
1573       "OP_TARGET": self.op.node_name,
1574       "NODE_NAME": self.op.node_name,
1575       }
1576     all_nodes = self.cfg.GetNodeList()
1577     all_nodes.remove(self.op.node_name)
1578     return env, all_nodes, all_nodes
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     This checks:
1584      - the node exists in the configuration
1585      - it does not have primary or secondary instances
1586      - it's not the master
1587
1588     Any errors are signalled by raising errors.OpPrereqError.
1589
1590     """
1591     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592     if node is None:
1593       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594
1595     instance_list = self.cfg.GetInstanceList()
1596
1597     masternode = self.cfg.GetMasterNode()
1598     if node.name == masternode:
1599       raise errors.OpPrereqError("Node is the master node,"
1600                                  " you need to failover first.")
1601
1602     for instance_name in instance_list:
1603       instance = self.cfg.GetInstanceInfo(instance_name)
1604       if node.name == instance.primary_node:
1605         raise errors.OpPrereqError("Instance %s still running on the node,"
1606                                    " please remove first." % instance_name)
1607       if node.name in instance.secondary_nodes:
1608         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609                                    " please remove first." % instance_name)
1610     self.op.node_name = node.name
1611     self.node = node
1612
1613   def Exec(self, feedback_fn):
1614     """Removes the node from the cluster.
1615
1616     """
1617     node = self.node
1618     logging.info("Stopping the node daemon and removing configs from node %s",
1619                  node.name)
1620
1621     self.context.RemoveNode(node.name)
1622
1623     self.rpc.call_node_leave_cluster(node.name)
1624
1625
1626 class LUQueryNodes(NoHooksLU):
1627   """Logical unit for querying nodes.
1628
1629   """
1630   _OP_REQP = ["output_fields", "names"]
1631   REQ_BGL = False
1632   _FIELDS_DYNAMIC = utils.FieldSet(
1633     "dtotal", "dfree",
1634     "mtotal", "mnode", "mfree",
1635     "bootid",
1636     "ctotal",
1637     )
1638
1639   _FIELDS_STATIC = utils.FieldSet(
1640     "name", "pinst_cnt", "sinst_cnt",
1641     "pinst_list", "sinst_list",
1642     "pip", "sip", "tags",
1643     "serial_no",
1644     "master_candidate",
1645     "master",
1646     )
1647
1648   def ExpandNames(self):
1649     _CheckOutputFields(static=self._FIELDS_STATIC,
1650                        dynamic=self._FIELDS_DYNAMIC,
1651                        selected=self.op.output_fields)
1652
1653     self.needed_locks = {}
1654     self.share_locks[locking.LEVEL_NODE] = 1
1655
1656     if self.op.names:
1657       self.wanted = _GetWantedNodes(self, self.op.names)
1658     else:
1659       self.wanted = locking.ALL_SET
1660
1661     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1662     if self.do_locking:
1663       # if we don't request only static fields, we need to lock the nodes
1664       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1665
1666
1667   def CheckPrereq(self):
1668     """Check prerequisites.
1669
1670     """
1671     # The validation of the node list is done in the _GetWantedNodes,
1672     # if non empty, and if empty, there's no validation to do
1673     pass
1674
1675   def Exec(self, feedback_fn):
1676     """Computes the list of nodes and their attributes.
1677
1678     """
1679     all_info = self.cfg.GetAllNodesInfo()
1680     if self.do_locking:
1681       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1682     elif self.wanted != locking.ALL_SET:
1683       nodenames = self.wanted
1684       missing = set(nodenames).difference(all_info.keys())
1685       if missing:
1686         raise errors.OpExecError(
1687           "Some nodes were removed before retrieving their data: %s" % missing)
1688     else:
1689       nodenames = all_info.keys()
1690
1691     nodenames = utils.NiceSort(nodenames)
1692     nodelist = [all_info[name] for name in nodenames]
1693
1694     # begin data gathering
1695
1696     if self.do_locking:
1697       live_data = {}
1698       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1699                                           self.cfg.GetHypervisorType())
1700       for name in nodenames:
1701         nodeinfo = node_data[name]
1702         if not nodeinfo.failed and nodeinfo.data:
1703           nodeinfo = nodeinfo.data
1704           fn = utils.TryConvert
1705           live_data[name] = {
1706             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1707             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1708             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1709             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1710             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1711             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1712             "bootid": nodeinfo.get('bootid', None),
1713             }
1714         else:
1715           live_data[name] = {}
1716     else:
1717       live_data = dict.fromkeys(nodenames, {})
1718
1719     node_to_primary = dict([(name, set()) for name in nodenames])
1720     node_to_secondary = dict([(name, set()) for name in nodenames])
1721
1722     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1723                              "sinst_cnt", "sinst_list"))
1724     if inst_fields & frozenset(self.op.output_fields):
1725       instancelist = self.cfg.GetInstanceList()
1726
1727       for instance_name in instancelist:
1728         inst = self.cfg.GetInstanceInfo(instance_name)
1729         if inst.primary_node in node_to_primary:
1730           node_to_primary[inst.primary_node].add(inst.name)
1731         for secnode in inst.secondary_nodes:
1732           if secnode in node_to_secondary:
1733             node_to_secondary[secnode].add(inst.name)
1734
1735     master_node = self.cfg.GetMasterNode()
1736
1737     # end data gathering
1738
1739     output = []
1740     for node in nodelist:
1741       node_output = []
1742       for field in self.op.output_fields:
1743         if field == "name":
1744           val = node.name
1745         elif field == "pinst_list":
1746           val = list(node_to_primary[node.name])
1747         elif field == "sinst_list":
1748           val = list(node_to_secondary[node.name])
1749         elif field == "pinst_cnt":
1750           val = len(node_to_primary[node.name])
1751         elif field == "sinst_cnt":
1752           val = len(node_to_secondary[node.name])
1753         elif field == "pip":
1754           val = node.primary_ip
1755         elif field == "sip":
1756           val = node.secondary_ip
1757         elif field == "tags":
1758           val = list(node.GetTags())
1759         elif field == "serial_no":
1760           val = node.serial_no
1761         elif field == "master_candidate":
1762           val = node.master_candidate
1763         elif field == "master":
1764           val = node.name == master_node
1765         elif self._FIELDS_DYNAMIC.Matches(field):
1766           val = live_data[node.name].get(field, None)
1767         else:
1768           raise errors.ParameterError(field)
1769         node_output.append(val)
1770       output.append(node_output)
1771
1772     return output
1773
1774
1775 class LUQueryNodeVolumes(NoHooksLU):
1776   """Logical unit for getting volumes on node(s).
1777
1778   """
1779   _OP_REQP = ["nodes", "output_fields"]
1780   REQ_BGL = False
1781   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1782   _FIELDS_STATIC = utils.FieldSet("node")
1783
1784   def ExpandNames(self):
1785     _CheckOutputFields(static=self._FIELDS_STATIC,
1786                        dynamic=self._FIELDS_DYNAMIC,
1787                        selected=self.op.output_fields)
1788
1789     self.needed_locks = {}
1790     self.share_locks[locking.LEVEL_NODE] = 1
1791     if not self.op.nodes:
1792       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1793     else:
1794       self.needed_locks[locking.LEVEL_NODE] = \
1795         _GetWantedNodes(self, self.op.nodes)
1796
1797   def CheckPrereq(self):
1798     """Check prerequisites.
1799
1800     This checks that the fields required are valid output fields.
1801
1802     """
1803     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1804
1805   def Exec(self, feedback_fn):
1806     """Computes the list of nodes and their attributes.
1807
1808     """
1809     nodenames = self.nodes
1810     volumes = self.rpc.call_node_volumes(nodenames)
1811
1812     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1813              in self.cfg.GetInstanceList()]
1814
1815     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1816
1817     output = []
1818     for node in nodenames:
1819       if node not in volumes or volumes[node].failed or not volumes[node].data:
1820         continue
1821
1822       node_vols = volumes[node].data[:]
1823       node_vols.sort(key=lambda vol: vol['dev'])
1824
1825       for vol in node_vols:
1826         node_output = []
1827         for field in self.op.output_fields:
1828           if field == "node":
1829             val = node
1830           elif field == "phys":
1831             val = vol['dev']
1832           elif field == "vg":
1833             val = vol['vg']
1834           elif field == "name":
1835             val = vol['name']
1836           elif field == "size":
1837             val = int(float(vol['size']))
1838           elif field == "instance":
1839             for inst in ilist:
1840               if node not in lv_by_node[inst]:
1841                 continue
1842               if vol['name'] in lv_by_node[inst][node]:
1843                 val = inst.name
1844                 break
1845             else:
1846               val = '-'
1847           else:
1848             raise errors.ParameterError(field)
1849           node_output.append(str(val))
1850
1851         output.append(node_output)
1852
1853     return output
1854
1855
1856 class LUAddNode(LogicalUnit):
1857   """Logical unit for adding node to the cluster.
1858
1859   """
1860   HPATH = "node-add"
1861   HTYPE = constants.HTYPE_NODE
1862   _OP_REQP = ["node_name"]
1863
1864   def BuildHooksEnv(self):
1865     """Build hooks env.
1866
1867     This will run on all nodes before, and on all nodes + the new node after.
1868
1869     """
1870     env = {
1871       "OP_TARGET": self.op.node_name,
1872       "NODE_NAME": self.op.node_name,
1873       "NODE_PIP": self.op.primary_ip,
1874       "NODE_SIP": self.op.secondary_ip,
1875       }
1876     nodes_0 = self.cfg.GetNodeList()
1877     nodes_1 = nodes_0 + [self.op.node_name, ]
1878     return env, nodes_0, nodes_1
1879
1880   def CheckPrereq(self):
1881     """Check prerequisites.
1882
1883     This checks:
1884      - the new node is not already in the config
1885      - it is resolvable
1886      - its parameters (single/dual homed) matches the cluster
1887
1888     Any errors are signalled by raising errors.OpPrereqError.
1889
1890     """
1891     node_name = self.op.node_name
1892     cfg = self.cfg
1893
1894     dns_data = utils.HostInfo(node_name)
1895
1896     node = dns_data.name
1897     primary_ip = self.op.primary_ip = dns_data.ip
1898     secondary_ip = getattr(self.op, "secondary_ip", None)
1899     if secondary_ip is None:
1900       secondary_ip = primary_ip
1901     if not utils.IsValidIP(secondary_ip):
1902       raise errors.OpPrereqError("Invalid secondary IP given")
1903     self.op.secondary_ip = secondary_ip
1904
1905     node_list = cfg.GetNodeList()
1906     if not self.op.readd and node in node_list:
1907       raise errors.OpPrereqError("Node %s is already in the configuration" %
1908                                  node)
1909     elif self.op.readd and node not in node_list:
1910       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1911
1912     for existing_node_name in node_list:
1913       existing_node = cfg.GetNodeInfo(existing_node_name)
1914
1915       if self.op.readd and node == existing_node_name:
1916         if (existing_node.primary_ip != primary_ip or
1917             existing_node.secondary_ip != secondary_ip):
1918           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1919                                      " address configuration as before")
1920         continue
1921
1922       if (existing_node.primary_ip == primary_ip or
1923           existing_node.secondary_ip == primary_ip or
1924           existing_node.primary_ip == secondary_ip or
1925           existing_node.secondary_ip == secondary_ip):
1926         raise errors.OpPrereqError("New node ip address(es) conflict with"
1927                                    " existing node %s" % existing_node.name)
1928
1929     # check that the type of the node (single versus dual homed) is the
1930     # same as for the master
1931     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1932     master_singlehomed = myself.secondary_ip == myself.primary_ip
1933     newbie_singlehomed = secondary_ip == primary_ip
1934     if master_singlehomed != newbie_singlehomed:
1935       if master_singlehomed:
1936         raise errors.OpPrereqError("The master has no private ip but the"
1937                                    " new node has one")
1938       else:
1939         raise errors.OpPrereqError("The master has a private ip but the"
1940                                    " new node doesn't have one")
1941
1942     # checks reachablity
1943     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1944       raise errors.OpPrereqError("Node not reachable by ping")
1945
1946     if not newbie_singlehomed:
1947       # check reachability from my secondary ip to newbie's secondary ip
1948       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1949                            source=myself.secondary_ip):
1950         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1951                                    " based ping to noded port")
1952
1953     self.new_node = objects.Node(name=node,
1954                                  primary_ip=primary_ip,
1955                                  secondary_ip=secondary_ip)
1956
1957   def Exec(self, feedback_fn):
1958     """Adds the new node to the cluster.
1959
1960     """
1961     new_node = self.new_node
1962     node = new_node.name
1963
1964     # check connectivity
1965     result = self.rpc.call_version([node])[node]
1966     result.Raise()
1967     if result.data:
1968       if constants.PROTOCOL_VERSION == result.data:
1969         logging.info("Communication to node %s fine, sw version %s match",
1970                      node, result.data)
1971       else:
1972         raise errors.OpExecError("Version mismatch master version %s,"
1973                                  " node version %s" %
1974                                  (constants.PROTOCOL_VERSION, result.data))
1975     else:
1976       raise errors.OpExecError("Cannot get version from the new node")
1977
1978     # setup ssh on node
1979     logging.info("Copy ssh key to node %s", node)
1980     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1981     keyarray = []
1982     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1983                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1984                 priv_key, pub_key]
1985
1986     for i in keyfiles:
1987       f = open(i, 'r')
1988       try:
1989         keyarray.append(f.read())
1990       finally:
1991         f.close()
1992
1993     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1994                                     keyarray[2],
1995                                     keyarray[3], keyarray[4], keyarray[5])
1996
1997     if result.failed or not result.data:
1998       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1999
2000     # Add node to our /etc/hosts, and add key to known_hosts
2001     utils.AddHostToEtcHosts(new_node.name)
2002
2003     if new_node.secondary_ip != new_node.primary_ip:
2004       result = self.rpc.call_node_has_ip_address(new_node.name,
2005                                                  new_node.secondary_ip)
2006       if result.failed or not result.data:
2007         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2008                                  " you gave (%s). Please fix and re-run this"
2009                                  " command." % new_node.secondary_ip)
2010
2011     node_verify_list = [self.cfg.GetMasterNode()]
2012     node_verify_param = {
2013       'nodelist': [node],
2014       # TODO: do a node-net-test as well?
2015     }
2016
2017     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2018                                        self.cfg.GetClusterName())
2019     for verifier in node_verify_list:
2020       if result.failed or not result[verifier].data:
2021         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2022                                  " for remote verification" % verifier)
2023       if result[verifier].data['nodelist']:
2024         for failed in result[verifier].data['nodelist']:
2025           feedback_fn("ssh/hostname verification failed %s -> %s" %
2026                       (verifier, result[verifier]['nodelist'][failed]))
2027         raise errors.OpExecError("ssh/hostname verification failed.")
2028
2029     # Distribute updated /etc/hosts and known_hosts to all nodes,
2030     # including the node just added
2031     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2032     dist_nodes = self.cfg.GetNodeList()
2033     if not self.op.readd:
2034       dist_nodes.append(node)
2035     if myself.name in dist_nodes:
2036       dist_nodes.remove(myself.name)
2037
2038     logging.debug("Copying hosts and known_hosts to all nodes")
2039     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2040       result = self.rpc.call_upload_file(dist_nodes, fname)
2041       for to_node, to_result in result.iteritems():
2042         if to_result.failed or not to_result.data:
2043           logging.error("Copy of file %s to node %s failed", fname, to_node)
2044
2045     to_copy = []
2046     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2047       to_copy.append(constants.VNC_PASSWORD_FILE)
2048     for fname in to_copy:
2049       result = self.rpc.call_upload_file([node], fname)
2050       if result[node].failed or not result[node]:
2051         logging.error("Could not copy file %s to node %s", fname, node)
2052
2053     if self.op.readd:
2054       self.context.ReaddNode(new_node)
2055     else:
2056       self.context.AddNode(new_node)
2057
2058
2059 class LUSetNodeParams(LogicalUnit):
2060   """Modifies the parameters of a node.
2061
2062   """
2063   HPATH = "node-modify"
2064   HTYPE = constants.HTYPE_NODE
2065   _OP_REQP = ["node_name"]
2066   REQ_BGL = False
2067
2068   def CheckArguments(self):
2069     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2070     if node_name is None:
2071       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2072     self.op.node_name = node_name
2073     if not hasattr(self.op, 'master_candidate'):
2074       raise errors.OpPrereqError("Please pass at least one modification")
2075     self.op.master_candidate = bool(self.op.master_candidate)
2076
2077   def ExpandNames(self):
2078     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2079
2080   def BuildHooksEnv(self):
2081     """Build hooks env.
2082
2083     This runs on the master node.
2084
2085     """
2086     env = {
2087       "OP_TARGET": self.op.node_name,
2088       "MASTER_CANDIDATE": str(self.op.master_candidate),
2089       }
2090     nl = [self.cfg.GetMasterNode(),
2091           self.op.node_name]
2092     return env, nl, nl
2093
2094   def CheckPrereq(self):
2095     """Check prerequisites.
2096
2097     This only checks the instance list against the existing names.
2098
2099     """
2100     force = self.force = self.op.force
2101
2102     if self.op.master_candidate == False:
2103       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2104       node_info = self.cfg.GetAllNodesInfo().values()
2105       num_candidates = len([node for node in node_info
2106                             if node.master_candidate])
2107       if num_candidates <= cp_size:
2108         msg = ("Not enough master candidates (desired"
2109                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2110         if force:
2111           self.LogWarning(msg)
2112         else:
2113           raise errors.OpPrereqError(msg)
2114
2115     return
2116
2117   def Exec(self, feedback_fn):
2118     """Modifies a node.
2119
2120     """
2121     node = self.cfg.GetNodeInfo(self.op.node_name)
2122
2123     result = []
2124
2125     if self.op.master_candidate is not None:
2126       node.master_candidate = self.op.master_candidate
2127       result.append(("master_candidate", str(self.op.master_candidate)))
2128
2129     # this will trigger configuration file update, if needed
2130     self.cfg.Update(node)
2131     # this will trigger job queue propagation or cleanup
2132     self.context.ReaddNode(node)
2133
2134     return result
2135
2136
2137 class LUQueryClusterInfo(NoHooksLU):
2138   """Query cluster configuration.
2139
2140   """
2141   _OP_REQP = []
2142   REQ_BGL = False
2143
2144   def ExpandNames(self):
2145     self.needed_locks = {}
2146
2147   def CheckPrereq(self):
2148     """No prerequsites needed for this LU.
2149
2150     """
2151     pass
2152
2153   def Exec(self, feedback_fn):
2154     """Return cluster config.
2155
2156     """
2157     cluster = self.cfg.GetClusterInfo()
2158     result = {
2159       "software_version": constants.RELEASE_VERSION,
2160       "protocol_version": constants.PROTOCOL_VERSION,
2161       "config_version": constants.CONFIG_VERSION,
2162       "os_api_version": constants.OS_API_VERSION,
2163       "export_version": constants.EXPORT_VERSION,
2164       "architecture": (platform.architecture()[0], platform.machine()),
2165       "name": cluster.cluster_name,
2166       "master": cluster.master_node,
2167       "default_hypervisor": cluster.default_hypervisor,
2168       "enabled_hypervisors": cluster.enabled_hypervisors,
2169       "hvparams": cluster.hvparams,
2170       "beparams": cluster.beparams,
2171       "candidate_pool_size": cluster.candidate_pool_size,
2172       }
2173
2174     return result
2175
2176
2177 class LUQueryConfigValues(NoHooksLU):
2178   """Return configuration values.
2179
2180   """
2181   _OP_REQP = []
2182   REQ_BGL = False
2183   _FIELDS_DYNAMIC = utils.FieldSet()
2184   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2185
2186   def ExpandNames(self):
2187     self.needed_locks = {}
2188
2189     _CheckOutputFields(static=self._FIELDS_STATIC,
2190                        dynamic=self._FIELDS_DYNAMIC,
2191                        selected=self.op.output_fields)
2192
2193   def CheckPrereq(self):
2194     """No prerequisites.
2195
2196     """
2197     pass
2198
2199   def Exec(self, feedback_fn):
2200     """Dump a representation of the cluster config to the standard output.
2201
2202     """
2203     values = []
2204     for field in self.op.output_fields:
2205       if field == "cluster_name":
2206         entry = self.cfg.GetClusterName()
2207       elif field == "master_node":
2208         entry = self.cfg.GetMasterNode()
2209       elif field == "drain_flag":
2210         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2211       else:
2212         raise errors.ParameterError(field)
2213       values.append(entry)
2214     return values
2215
2216
2217 class LUActivateInstanceDisks(NoHooksLU):
2218   """Bring up an instance's disks.
2219
2220   """
2221   _OP_REQP = ["instance_name"]
2222   REQ_BGL = False
2223
2224   def ExpandNames(self):
2225     self._ExpandAndLockInstance()
2226     self.needed_locks[locking.LEVEL_NODE] = []
2227     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2228
2229   def DeclareLocks(self, level):
2230     if level == locking.LEVEL_NODE:
2231       self._LockInstancesNodes()
2232
2233   def CheckPrereq(self):
2234     """Check prerequisites.
2235
2236     This checks that the instance is in the cluster.
2237
2238     """
2239     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2240     assert self.instance is not None, \
2241       "Cannot retrieve locked instance %s" % self.op.instance_name
2242
2243   def Exec(self, feedback_fn):
2244     """Activate the disks.
2245
2246     """
2247     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2248     if not disks_ok:
2249       raise errors.OpExecError("Cannot activate block devices")
2250
2251     return disks_info
2252
2253
2254 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2255   """Prepare the block devices for an instance.
2256
2257   This sets up the block devices on all nodes.
2258
2259   @type lu: L{LogicalUnit}
2260   @param lu: the logical unit on whose behalf we execute
2261   @type instance: L{objects.Instance}
2262   @param instance: the instance for whose disks we assemble
2263   @type ignore_secondaries: boolean
2264   @param ignore_secondaries: if true, errors on secondary nodes
2265       won't result in an error return from the function
2266   @return: False if the operation failed, otherwise a list of
2267       (host, instance_visible_name, node_visible_name)
2268       with the mapping from node devices to instance devices
2269
2270   """
2271   device_info = []
2272   disks_ok = True
2273   iname = instance.name
2274   # With the two passes mechanism we try to reduce the window of
2275   # opportunity for the race condition of switching DRBD to primary
2276   # before handshaking occured, but we do not eliminate it
2277
2278   # The proper fix would be to wait (with some limits) until the
2279   # connection has been made and drbd transitions from WFConnection
2280   # into any other network-connected state (Connected, SyncTarget,
2281   # SyncSource, etc.)
2282
2283   # 1st pass, assemble on all nodes in secondary mode
2284   for inst_disk in instance.disks:
2285     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2286       lu.cfg.SetDiskID(node_disk, node)
2287       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2288       if result.failed or not result:
2289         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2290                            " (is_primary=False, pass=1)",
2291                            inst_disk.iv_name, node)
2292         if not ignore_secondaries:
2293           disks_ok = False
2294
2295   # FIXME: race condition on drbd migration to primary
2296
2297   # 2nd pass, do only the primary node
2298   for inst_disk in instance.disks:
2299     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2300       if node != instance.primary_node:
2301         continue
2302       lu.cfg.SetDiskID(node_disk, node)
2303       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2304       if result.failed or not result:
2305         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2306                            " (is_primary=True, pass=2)",
2307                            inst_disk.iv_name, node)
2308         disks_ok = False
2309     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2310
2311   # leave the disks configured for the primary node
2312   # this is a workaround that would be fixed better by
2313   # improving the logical/physical id handling
2314   for disk in instance.disks:
2315     lu.cfg.SetDiskID(disk, instance.primary_node)
2316
2317   return disks_ok, device_info
2318
2319
2320 def _StartInstanceDisks(lu, instance, force):
2321   """Start the disks of an instance.
2322
2323   """
2324   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2325                                            ignore_secondaries=force)
2326   if not disks_ok:
2327     _ShutdownInstanceDisks(lu, instance)
2328     if force is not None and not force:
2329       lu.proc.LogWarning("", hint="If the message above refers to a"
2330                          " secondary node,"
2331                          " you can retry the operation using '--force'.")
2332     raise errors.OpExecError("Disk consistency error")
2333
2334
2335 class LUDeactivateInstanceDisks(NoHooksLU):
2336   """Shutdown an instance's disks.
2337
2338   """
2339   _OP_REQP = ["instance_name"]
2340   REQ_BGL = False
2341
2342   def ExpandNames(self):
2343     self._ExpandAndLockInstance()
2344     self.needed_locks[locking.LEVEL_NODE] = []
2345     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2346
2347   def DeclareLocks(self, level):
2348     if level == locking.LEVEL_NODE:
2349       self._LockInstancesNodes()
2350
2351   def CheckPrereq(self):
2352     """Check prerequisites.
2353
2354     This checks that the instance is in the cluster.
2355
2356     """
2357     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2358     assert self.instance is not None, \
2359       "Cannot retrieve locked instance %s" % self.op.instance_name
2360
2361   def Exec(self, feedback_fn):
2362     """Deactivate the disks
2363
2364     """
2365     instance = self.instance
2366     _SafeShutdownInstanceDisks(self, instance)
2367
2368
2369 def _SafeShutdownInstanceDisks(lu, instance):
2370   """Shutdown block devices of an instance.
2371
2372   This function checks if an instance is running, before calling
2373   _ShutdownInstanceDisks.
2374
2375   """
2376   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2377                                       [instance.hypervisor])
2378   ins_l = ins_l[instance.primary_node]
2379   if ins_l.failed or not isinstance(ins_l.data, list):
2380     raise errors.OpExecError("Can't contact node '%s'" %
2381                              instance.primary_node)
2382
2383   if instance.name in ins_l.data:
2384     raise errors.OpExecError("Instance is running, can't shutdown"
2385                              " block devices.")
2386
2387   _ShutdownInstanceDisks(lu, instance)
2388
2389
2390 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2391   """Shutdown block devices of an instance.
2392
2393   This does the shutdown on all nodes of the instance.
2394
2395   If the ignore_primary is false, errors on the primary node are
2396   ignored.
2397
2398   """
2399   result = True
2400   for disk in instance.disks:
2401     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2402       lu.cfg.SetDiskID(top_disk, node)
2403       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2404       if result.failed or not result.data:
2405         logging.error("Could not shutdown block device %s on node %s",
2406                       disk.iv_name, node)
2407         if not ignore_primary or node != instance.primary_node:
2408           result = False
2409   return result
2410
2411
2412 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2413   """Checks if a node has enough free memory.
2414
2415   This function check if a given node has the needed amount of free
2416   memory. In case the node has less memory or we cannot get the
2417   information from the node, this function raise an OpPrereqError
2418   exception.
2419
2420   @type lu: C{LogicalUnit}
2421   @param lu: a logical unit from which we get configuration data
2422   @type node: C{str}
2423   @param node: the node to check
2424   @type reason: C{str}
2425   @param reason: string to use in the error message
2426   @type requested: C{int}
2427   @param requested: the amount of memory in MiB to check for
2428   @type hypervisor: C{str}
2429   @param hypervisor: the hypervisor to ask for memory stats
2430   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2431       we cannot check the node
2432
2433   """
2434   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2435   nodeinfo[node].Raise()
2436   free_mem = nodeinfo[node].data.get('memory_free')
2437   if not isinstance(free_mem, int):
2438     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2439                              " was '%s'" % (node, free_mem))
2440   if requested > free_mem:
2441     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2442                              " needed %s MiB, available %s MiB" %
2443                              (node, reason, requested, free_mem))
2444
2445
2446 class LUStartupInstance(LogicalUnit):
2447   """Starts an instance.
2448
2449   """
2450   HPATH = "instance-start"
2451   HTYPE = constants.HTYPE_INSTANCE
2452   _OP_REQP = ["instance_name", "force"]
2453   REQ_BGL = False
2454
2455   def ExpandNames(self):
2456     self._ExpandAndLockInstance()
2457
2458   def BuildHooksEnv(self):
2459     """Build hooks env.
2460
2461     This runs on master, primary and secondary nodes of the instance.
2462
2463     """
2464     env = {
2465       "FORCE": self.op.force,
2466       }
2467     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2468     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2469           list(self.instance.secondary_nodes))
2470     return env, nl, nl
2471
2472   def CheckPrereq(self):
2473     """Check prerequisites.
2474
2475     This checks that the instance is in the cluster.
2476
2477     """
2478     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2479     assert self.instance is not None, \
2480       "Cannot retrieve locked instance %s" % self.op.instance_name
2481
2482     bep = self.cfg.GetClusterInfo().FillBE(instance)
2483     # check bridges existance
2484     _CheckInstanceBridgesExist(self, instance)
2485
2486     _CheckNodeFreeMemory(self, instance.primary_node,
2487                          "starting instance %s" % instance.name,
2488                          bep[constants.BE_MEMORY], instance.hypervisor)
2489
2490   def Exec(self, feedback_fn):
2491     """Start the instance.
2492
2493     """
2494     instance = self.instance
2495     force = self.op.force
2496     extra_args = getattr(self.op, "extra_args", "")
2497
2498     self.cfg.MarkInstanceUp(instance.name)
2499
2500     node_current = instance.primary_node
2501
2502     _StartInstanceDisks(self, instance, force)
2503
2504     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2505     if result.failed or not result.data:
2506       _ShutdownInstanceDisks(self, instance)
2507       raise errors.OpExecError("Could not start instance")
2508
2509
2510 class LURebootInstance(LogicalUnit):
2511   """Reboot an instance.
2512
2513   """
2514   HPATH = "instance-reboot"
2515   HTYPE = constants.HTYPE_INSTANCE
2516   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2517   REQ_BGL = False
2518
2519   def ExpandNames(self):
2520     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2521                                    constants.INSTANCE_REBOOT_HARD,
2522                                    constants.INSTANCE_REBOOT_FULL]:
2523       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2524                                   (constants.INSTANCE_REBOOT_SOFT,
2525                                    constants.INSTANCE_REBOOT_HARD,
2526                                    constants.INSTANCE_REBOOT_FULL))
2527     self._ExpandAndLockInstance()
2528
2529   def BuildHooksEnv(self):
2530     """Build hooks env.
2531
2532     This runs on master, primary and secondary nodes of the instance.
2533
2534     """
2535     env = {
2536       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2537       }
2538     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2539     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540           list(self.instance.secondary_nodes))
2541     return env, nl, nl
2542
2543   def CheckPrereq(self):
2544     """Check prerequisites.
2545
2546     This checks that the instance is in the cluster.
2547
2548     """
2549     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2550     assert self.instance is not None, \
2551       "Cannot retrieve locked instance %s" % self.op.instance_name
2552
2553     # check bridges existance
2554     _CheckInstanceBridgesExist(self, instance)
2555
2556   def Exec(self, feedback_fn):
2557     """Reboot the instance.
2558
2559     """
2560     instance = self.instance
2561     ignore_secondaries = self.op.ignore_secondaries
2562     reboot_type = self.op.reboot_type
2563     extra_args = getattr(self.op, "extra_args", "")
2564
2565     node_current = instance.primary_node
2566
2567     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2568                        constants.INSTANCE_REBOOT_HARD]:
2569       result = self.rpc.call_instance_reboot(node_current, instance,
2570                                              reboot_type, extra_args)
2571       if result.failed or not result.data:
2572         raise errors.OpExecError("Could not reboot instance")
2573     else:
2574       if not self.rpc.call_instance_shutdown(node_current, instance):
2575         raise errors.OpExecError("could not shutdown instance for full reboot")
2576       _ShutdownInstanceDisks(self, instance)
2577       _StartInstanceDisks(self, instance, ignore_secondaries)
2578       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2579       if result.failed or not result.data:
2580         _ShutdownInstanceDisks(self, instance)
2581         raise errors.OpExecError("Could not start instance for full reboot")
2582
2583     self.cfg.MarkInstanceUp(instance.name)
2584
2585
2586 class LUShutdownInstance(LogicalUnit):
2587   """Shutdown an instance.
2588
2589   """
2590   HPATH = "instance-stop"
2591   HTYPE = constants.HTYPE_INSTANCE
2592   _OP_REQP = ["instance_name"]
2593   REQ_BGL = False
2594
2595   def ExpandNames(self):
2596     self._ExpandAndLockInstance()
2597
2598   def BuildHooksEnv(self):
2599     """Build hooks env.
2600
2601     This runs on master, primary and secondary nodes of the instance.
2602
2603     """
2604     env = _BuildInstanceHookEnvByObject(self, self.instance)
2605     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2606           list(self.instance.secondary_nodes))
2607     return env, nl, nl
2608
2609   def CheckPrereq(self):
2610     """Check prerequisites.
2611
2612     This checks that the instance is in the cluster.
2613
2614     """
2615     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2616     assert self.instance is not None, \
2617       "Cannot retrieve locked instance %s" % self.op.instance_name
2618
2619   def Exec(self, feedback_fn):
2620     """Shutdown the instance.
2621
2622     """
2623     instance = self.instance
2624     node_current = instance.primary_node
2625     self.cfg.MarkInstanceDown(instance.name)
2626     result = self.rpc.call_instance_shutdown(node_current, instance)
2627     if result.failed or not result.data:
2628       self.proc.LogWarning("Could not shutdown instance")
2629
2630     _ShutdownInstanceDisks(self, instance)
2631
2632
2633 class LUReinstallInstance(LogicalUnit):
2634   """Reinstall an instance.
2635
2636   """
2637   HPATH = "instance-reinstall"
2638   HTYPE = constants.HTYPE_INSTANCE
2639   _OP_REQP = ["instance_name"]
2640   REQ_BGL = False
2641
2642   def ExpandNames(self):
2643     self._ExpandAndLockInstance()
2644
2645   def BuildHooksEnv(self):
2646     """Build hooks env.
2647
2648     This runs on master, primary and secondary nodes of the instance.
2649
2650     """
2651     env = _BuildInstanceHookEnvByObject(self, self.instance)
2652     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2653           list(self.instance.secondary_nodes))
2654     return env, nl, nl
2655
2656   def CheckPrereq(self):
2657     """Check prerequisites.
2658
2659     This checks that the instance is in the cluster and is not running.
2660
2661     """
2662     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2663     assert instance is not None, \
2664       "Cannot retrieve locked instance %s" % self.op.instance_name
2665
2666     if instance.disk_template == constants.DT_DISKLESS:
2667       raise errors.OpPrereqError("Instance '%s' has no disks" %
2668                                  self.op.instance_name)
2669     if instance.status != "down":
2670       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2671                                  self.op.instance_name)
2672     remote_info = self.rpc.call_instance_info(instance.primary_node,
2673                                               instance.name,
2674                                               instance.hypervisor)
2675     if remote_info.failed or remote_info.data:
2676       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2677                                  (self.op.instance_name,
2678                                   instance.primary_node))
2679
2680     self.op.os_type = getattr(self.op, "os_type", None)
2681     if self.op.os_type is not None:
2682       # OS verification
2683       pnode = self.cfg.GetNodeInfo(
2684         self.cfg.ExpandNodeName(instance.primary_node))
2685       if pnode is None:
2686         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2687                                    self.op.pnode)
2688       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2689       result.Raise()
2690       if not isinstance(result.data, objects.OS):
2691         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2692                                    " primary node"  % self.op.os_type)
2693
2694     self.instance = instance
2695
2696   def Exec(self, feedback_fn):
2697     """Reinstall the instance.
2698
2699     """
2700     inst = self.instance
2701
2702     if self.op.os_type is not None:
2703       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2704       inst.os = self.op.os_type
2705       self.cfg.Update(inst)
2706
2707     _StartInstanceDisks(self, inst, None)
2708     try:
2709       feedback_fn("Running the instance OS create scripts...")
2710       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2711       result.Raise()
2712       if not result.data:
2713         raise errors.OpExecError("Could not install OS for instance %s"
2714                                  " on node %s" %
2715                                  (inst.name, inst.primary_node))
2716     finally:
2717       _ShutdownInstanceDisks(self, inst)
2718
2719
2720 class LURenameInstance(LogicalUnit):
2721   """Rename an instance.
2722
2723   """
2724   HPATH = "instance-rename"
2725   HTYPE = constants.HTYPE_INSTANCE
2726   _OP_REQP = ["instance_name", "new_name"]
2727
2728   def BuildHooksEnv(self):
2729     """Build hooks env.
2730
2731     This runs on master, primary and secondary nodes of the instance.
2732
2733     """
2734     env = _BuildInstanceHookEnvByObject(self, self.instance)
2735     env["INSTANCE_NEW_NAME"] = self.op.new_name
2736     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2737           list(self.instance.secondary_nodes))
2738     return env, nl, nl
2739
2740   def CheckPrereq(self):
2741     """Check prerequisites.
2742
2743     This checks that the instance is in the cluster and is not running.
2744
2745     """
2746     instance = self.cfg.GetInstanceInfo(
2747       self.cfg.ExpandInstanceName(self.op.instance_name))
2748     if instance is None:
2749       raise errors.OpPrereqError("Instance '%s' not known" %
2750                                  self.op.instance_name)
2751     if instance.status != "down":
2752       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2753                                  self.op.instance_name)
2754     remote_info = self.rpc.call_instance_info(instance.primary_node,
2755                                               instance.name,
2756                                               instance.hypervisor)
2757     remote_info.Raise()
2758     if remote_info.data:
2759       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2760                                  (self.op.instance_name,
2761                                   instance.primary_node))
2762     self.instance = instance
2763
2764     # new name verification
2765     name_info = utils.HostInfo(self.op.new_name)
2766
2767     self.op.new_name = new_name = name_info.name
2768     instance_list = self.cfg.GetInstanceList()
2769     if new_name in instance_list:
2770       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2771                                  new_name)
2772
2773     if not getattr(self.op, "ignore_ip", False):
2774       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2775         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2776                                    (name_info.ip, new_name))
2777
2778
2779   def Exec(self, feedback_fn):
2780     """Reinstall the instance.
2781
2782     """
2783     inst = self.instance
2784     old_name = inst.name
2785
2786     if inst.disk_template == constants.DT_FILE:
2787       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2788
2789     self.cfg.RenameInstance(inst.name, self.op.new_name)
2790     # Change the instance lock. This is definitely safe while we hold the BGL
2791     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2792     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2793
2794     # re-read the instance from the configuration after rename
2795     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2796
2797     if inst.disk_template == constants.DT_FILE:
2798       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2799       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2800                                                      old_file_storage_dir,
2801                                                      new_file_storage_dir)
2802       result.Raise()
2803       if not result.data:
2804         raise errors.OpExecError("Could not connect to node '%s' to rename"
2805                                  " directory '%s' to '%s' (but the instance"
2806                                  " has been renamed in Ganeti)" % (
2807                                  inst.primary_node, old_file_storage_dir,
2808                                  new_file_storage_dir))
2809
2810       if not result.data[0]:
2811         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2812                                  " (but the instance has been renamed in"
2813                                  " Ganeti)" % (old_file_storage_dir,
2814                                                new_file_storage_dir))
2815
2816     _StartInstanceDisks(self, inst, None)
2817     try:
2818       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2819                                                  old_name)
2820       if result.failed or not result.data:
2821         msg = ("Could not run OS rename script for instance %s on node %s"
2822                " (but the instance has been renamed in Ganeti)" %
2823                (inst.name, inst.primary_node))
2824         self.proc.LogWarning(msg)
2825     finally:
2826       _ShutdownInstanceDisks(self, inst)
2827
2828
2829 class LURemoveInstance(LogicalUnit):
2830   """Remove an instance.
2831
2832   """
2833   HPATH = "instance-remove"
2834   HTYPE = constants.HTYPE_INSTANCE
2835   _OP_REQP = ["instance_name", "ignore_failures"]
2836   REQ_BGL = False
2837
2838   def ExpandNames(self):
2839     self._ExpandAndLockInstance()
2840     self.needed_locks[locking.LEVEL_NODE] = []
2841     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2842
2843   def DeclareLocks(self, level):
2844     if level == locking.LEVEL_NODE:
2845       self._LockInstancesNodes()
2846
2847   def BuildHooksEnv(self):
2848     """Build hooks env.
2849
2850     This runs on master, primary and secondary nodes of the instance.
2851
2852     """
2853     env = _BuildInstanceHookEnvByObject(self, self.instance)
2854     nl = [self.cfg.GetMasterNode()]
2855     return env, nl, nl
2856
2857   def CheckPrereq(self):
2858     """Check prerequisites.
2859
2860     This checks that the instance is in the cluster.
2861
2862     """
2863     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2864     assert self.instance is not None, \
2865       "Cannot retrieve locked instance %s" % self.op.instance_name
2866
2867   def Exec(self, feedback_fn):
2868     """Remove the instance.
2869
2870     """
2871     instance = self.instance
2872     logging.info("Shutting down instance %s on node %s",
2873                  instance.name, instance.primary_node)
2874
2875     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2876     if result.failed or not result.data:
2877       if self.op.ignore_failures:
2878         feedback_fn("Warning: can't shutdown instance")
2879       else:
2880         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2881                                  (instance.name, instance.primary_node))
2882
2883     logging.info("Removing block devices for instance %s", instance.name)
2884
2885     if not _RemoveDisks(self, instance):
2886       if self.op.ignore_failures:
2887         feedback_fn("Warning: can't remove instance's disks")
2888       else:
2889         raise errors.OpExecError("Can't remove instance's disks")
2890
2891     logging.info("Removing instance %s out of cluster config", instance.name)
2892
2893     self.cfg.RemoveInstance(instance.name)
2894     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2895
2896
2897 class LUQueryInstances(NoHooksLU):
2898   """Logical unit for querying instances.
2899
2900   """
2901   _OP_REQP = ["output_fields", "names"]
2902   REQ_BGL = False
2903   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2904                                     "admin_state", "admin_ram",
2905                                     "disk_template", "ip", "mac", "bridge",
2906                                     "sda_size", "sdb_size", "vcpus", "tags",
2907                                     "network_port", "beparams",
2908                                     "(disk).(size)/([0-9]+)",
2909                                     "(disk).(sizes)",
2910                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2911                                     "(nic).(macs|ips|bridges)",
2912                                     "(disk|nic).(count)",
2913                                     "serial_no", "hypervisor", "hvparams",] +
2914                                   ["hv/%s" % name
2915                                    for name in constants.HVS_PARAMETERS] +
2916                                   ["be/%s" % name
2917                                    for name in constants.BES_PARAMETERS])
2918   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2919
2920
2921   def ExpandNames(self):
2922     _CheckOutputFields(static=self._FIELDS_STATIC,
2923                        dynamic=self._FIELDS_DYNAMIC,
2924                        selected=self.op.output_fields)
2925
2926     self.needed_locks = {}
2927     self.share_locks[locking.LEVEL_INSTANCE] = 1
2928     self.share_locks[locking.LEVEL_NODE] = 1
2929
2930     if self.op.names:
2931       self.wanted = _GetWantedInstances(self, self.op.names)
2932     else:
2933       self.wanted = locking.ALL_SET
2934
2935     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2936     if self.do_locking:
2937       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2938       self.needed_locks[locking.LEVEL_NODE] = []
2939       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2940
2941   def DeclareLocks(self, level):
2942     if level == locking.LEVEL_NODE and self.do_locking:
2943       self._LockInstancesNodes()
2944
2945   def CheckPrereq(self):
2946     """Check prerequisites.
2947
2948     """
2949     pass
2950
2951   def Exec(self, feedback_fn):
2952     """Computes the list of nodes and their attributes.
2953
2954     """
2955     all_info = self.cfg.GetAllInstancesInfo()
2956     if self.do_locking:
2957       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2958     elif self.wanted != locking.ALL_SET:
2959       instance_names = self.wanted
2960       missing = set(instance_names).difference(all_info.keys())
2961       if missing:
2962         raise errors.OpExecError(
2963           "Some instances were removed before retrieving their data: %s"
2964           % missing)
2965     else:
2966       instance_names = all_info.keys()
2967
2968     instance_names = utils.NiceSort(instance_names)
2969     instance_list = [all_info[iname] for iname in instance_names]
2970
2971     # begin data gathering
2972
2973     nodes = frozenset([inst.primary_node for inst in instance_list])
2974     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2975
2976     bad_nodes = []
2977     if self.do_locking:
2978       live_data = {}
2979       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2980       for name in nodes:
2981         result = node_data[name]
2982         if result.failed:
2983           bad_nodes.append(name)
2984         else:
2985           if result.data:
2986             live_data.update(result.data)
2987             # else no instance is alive
2988     else:
2989       live_data = dict([(name, {}) for name in instance_names])
2990
2991     # end data gathering
2992
2993     HVPREFIX = "hv/"
2994     BEPREFIX = "be/"
2995     output = []
2996     for instance in instance_list:
2997       iout = []
2998       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2999       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3000       for field in self.op.output_fields:
3001         st_match = self._FIELDS_STATIC.Matches(field)
3002         if field == "name":
3003           val = instance.name
3004         elif field == "os":
3005           val = instance.os
3006         elif field == "pnode":
3007           val = instance.primary_node
3008         elif field == "snodes":
3009           val = list(instance.secondary_nodes)
3010         elif field == "admin_state":
3011           val = (instance.status != "down")
3012         elif field == "oper_state":
3013           if instance.primary_node in bad_nodes:
3014             val = None
3015           else:
3016             val = bool(live_data.get(instance.name))
3017         elif field == "status":
3018           if instance.primary_node in bad_nodes:
3019             val = "ERROR_nodedown"
3020           else:
3021             running = bool(live_data.get(instance.name))
3022             if running:
3023               if instance.status != "down":
3024                 val = "running"
3025               else:
3026                 val = "ERROR_up"
3027             else:
3028               if instance.status != "down":
3029                 val = "ERROR_down"
3030               else:
3031                 val = "ADMIN_down"
3032         elif field == "oper_ram":
3033           if instance.primary_node in bad_nodes:
3034             val = None
3035           elif instance.name in live_data:
3036             val = live_data[instance.name].get("memory", "?")
3037           else:
3038             val = "-"
3039         elif field == "disk_template":
3040           val = instance.disk_template
3041         elif field == "ip":
3042           val = instance.nics[0].ip
3043         elif field == "bridge":
3044           val = instance.nics[0].bridge
3045         elif field == "mac":
3046           val = instance.nics[0].mac
3047         elif field == "sda_size" or field == "sdb_size":
3048           idx = ord(field[2]) - ord('a')
3049           try:
3050             val = instance.FindDisk(idx).size
3051           except errors.OpPrereqError:
3052             val = None
3053         elif field == "tags":
3054           val = list(instance.GetTags())
3055         elif field == "serial_no":
3056           val = instance.serial_no
3057         elif field == "network_port":
3058           val = instance.network_port
3059         elif field == "hypervisor":
3060           val = instance.hypervisor
3061         elif field == "hvparams":
3062           val = i_hv
3063         elif (field.startswith(HVPREFIX) and
3064               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3065           val = i_hv.get(field[len(HVPREFIX):], None)
3066         elif field == "beparams":
3067           val = i_be
3068         elif (field.startswith(BEPREFIX) and
3069               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3070           val = i_be.get(field[len(BEPREFIX):], None)
3071         elif st_match and st_match.groups():
3072           # matches a variable list
3073           st_groups = st_match.groups()
3074           if st_groups and st_groups[0] == "disk":
3075             if st_groups[1] == "count":
3076               val = len(instance.disks)
3077             elif st_groups[1] == "sizes":
3078               val = [disk.size for disk in instance.disks]
3079             elif st_groups[1] == "size":
3080               try:
3081                 val = instance.FindDisk(st_groups[2]).size
3082               except errors.OpPrereqError:
3083                 val = None
3084             else:
3085               assert False, "Unhandled disk parameter"
3086           elif st_groups[0] == "nic":
3087             if st_groups[1] == "count":
3088               val = len(instance.nics)
3089             elif st_groups[1] == "macs":
3090               val = [nic.mac for nic in instance.nics]
3091             elif st_groups[1] == "ips":
3092               val = [nic.ip for nic in instance.nics]
3093             elif st_groups[1] == "bridges":
3094               val = [nic.bridge for nic in instance.nics]
3095             else:
3096               # index-based item
3097               nic_idx = int(st_groups[2])
3098               if nic_idx >= len(instance.nics):
3099                 val = None
3100               else:
3101                 if st_groups[1] == "mac":
3102                   val = instance.nics[nic_idx].mac
3103                 elif st_groups[1] == "ip":
3104                   val = instance.nics[nic_idx].ip
3105                 elif st_groups[1] == "bridge":
3106                   val = instance.nics[nic_idx].bridge
3107                 else:
3108                   assert False, "Unhandled NIC parameter"
3109           else:
3110             assert False, "Unhandled variable parameter"
3111         else:
3112           raise errors.ParameterError(field)
3113         iout.append(val)
3114       output.append(iout)
3115
3116     return output
3117
3118
3119 class LUFailoverInstance(LogicalUnit):
3120   """Failover an instance.
3121
3122   """
3123   HPATH = "instance-failover"
3124   HTYPE = constants.HTYPE_INSTANCE
3125   _OP_REQP = ["instance_name", "ignore_consistency"]
3126   REQ_BGL = False
3127
3128   def ExpandNames(self):
3129     self._ExpandAndLockInstance()
3130     self.needed_locks[locking.LEVEL_NODE] = []
3131     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3132
3133   def DeclareLocks(self, level):
3134     if level == locking.LEVEL_NODE:
3135       self._LockInstancesNodes()
3136
3137   def BuildHooksEnv(self):
3138     """Build hooks env.
3139
3140     This runs on master, primary and secondary nodes of the instance.
3141
3142     """
3143     env = {
3144       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3145       }
3146     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3147     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3148     return env, nl, nl
3149
3150   def CheckPrereq(self):
3151     """Check prerequisites.
3152
3153     This checks that the instance is in the cluster.
3154
3155     """
3156     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3157     assert self.instance is not None, \
3158       "Cannot retrieve locked instance %s" % self.op.instance_name
3159
3160     bep = self.cfg.GetClusterInfo().FillBE(instance)
3161     if instance.disk_template not in constants.DTS_NET_MIRROR:
3162       raise errors.OpPrereqError("Instance's disk layout is not"
3163                                  " network mirrored, cannot failover.")
3164
3165     secondary_nodes = instance.secondary_nodes
3166     if not secondary_nodes:
3167       raise errors.ProgrammerError("no secondary node but using "
3168                                    "a mirrored disk template")
3169
3170     target_node = secondary_nodes[0]
3171     # check memory requirements on the secondary node
3172     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3173                          instance.name, bep[constants.BE_MEMORY],
3174                          instance.hypervisor)
3175
3176     # check bridge existance
3177     brlist = [nic.bridge for nic in instance.nics]
3178     result = self.rpc.call_bridges_exist(target_node, brlist)
3179     result.Raise()
3180     if not result.data:
3181       raise errors.OpPrereqError("One or more target bridges %s does not"
3182                                  " exist on destination node '%s'" %
3183                                  (brlist, target_node))
3184
3185   def Exec(self, feedback_fn):
3186     """Failover an instance.
3187
3188     The failover is done by shutting it down on its present node and
3189     starting it on the secondary.
3190
3191     """
3192     instance = self.instance
3193
3194     source_node = instance.primary_node
3195     target_node = instance.secondary_nodes[0]
3196
3197     feedback_fn("* checking disk consistency between source and target")
3198     for dev in instance.disks:
3199       # for drbd, these are drbd over lvm
3200       if not _CheckDiskConsistency(self, dev, target_node, False):
3201         if instance.status == "up" and not self.op.ignore_consistency:
3202           raise errors.OpExecError("Disk %s is degraded on target node,"
3203                                    " aborting failover." % dev.iv_name)
3204
3205     feedback_fn("* shutting down instance on source node")
3206     logging.info("Shutting down instance %s on node %s",
3207                  instance.name, source_node)
3208
3209     result = self.rpc.call_instance_shutdown(source_node, instance)
3210     if result.failed or not result.data:
3211       if self.op.ignore_consistency:
3212         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3213                              " Proceeding"
3214                              " anyway. Please make sure node %s is down",
3215                              instance.name, source_node, source_node)
3216       else:
3217         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3218                                  (instance.name, source_node))
3219
3220     feedback_fn("* deactivating the instance's disks on source node")
3221     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3222       raise errors.OpExecError("Can't shut down the instance's disks.")
3223
3224     instance.primary_node = target_node
3225     # distribute new instance config to the other nodes
3226     self.cfg.Update(instance)
3227
3228     # Only start the instance if it's marked as up
3229     if instance.status == "up":
3230       feedback_fn("* activating the instance's disks on target node")
3231       logging.info("Starting instance %s on node %s",
3232                    instance.name, target_node)
3233
3234       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3235                                                ignore_secondaries=True)
3236       if not disks_ok:
3237         _ShutdownInstanceDisks(self, instance)
3238         raise errors.OpExecError("Can't activate the instance's disks")
3239
3240       feedback_fn("* starting the instance on the target node")
3241       result = self.rpc.call_instance_start(target_node, instance, None)
3242       if result.failed or not result.data:
3243         _ShutdownInstanceDisks(self, instance)
3244         raise errors.OpExecError("Could not start instance %s on node %s." %
3245                                  (instance.name, target_node))
3246
3247
3248 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3249   """Create a tree of block devices on the primary node.
3250
3251   This always creates all devices.
3252
3253   """
3254   if device.children:
3255     for child in device.children:
3256       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3257         return False
3258
3259   lu.cfg.SetDiskID(device, node)
3260   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3261                                        instance.name, True, info)
3262   if new_id.failed or not new_id.data:
3263     return False
3264   if device.physical_id is None:
3265     device.physical_id = new_id
3266   return True
3267
3268
3269 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3270   """Create a tree of block devices on a secondary node.
3271
3272   If this device type has to be created on secondaries, create it and
3273   all its children.
3274
3275   If not, just recurse to children keeping the same 'force' value.
3276
3277   """
3278   if device.CreateOnSecondary():
3279     force = True
3280   if device.children:
3281     for child in device.children:
3282       if not _CreateBlockDevOnSecondary(lu, node, instance,
3283                                         child, force, info):
3284         return False
3285
3286   if not force:
3287     return True
3288   lu.cfg.SetDiskID(device, node)
3289   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3290                                        instance.name, False, info)
3291   if new_id.failed or not new_id.data:
3292     return False
3293   if device.physical_id is None:
3294     device.physical_id = new_id
3295   return True
3296
3297
3298 def _GenerateUniqueNames(lu, exts):
3299   """Generate a suitable LV name.
3300
3301   This will generate a logical volume name for the given instance.
3302
3303   """
3304   results = []
3305   for val in exts:
3306     new_id = lu.cfg.GenerateUniqueID()
3307     results.append("%s%s" % (new_id, val))
3308   return results
3309
3310
3311 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3312                          p_minor, s_minor):
3313   """Generate a drbd8 device complete with its children.
3314
3315   """
3316   port = lu.cfg.AllocatePort()
3317   vgname = lu.cfg.GetVGName()
3318   shared_secret = lu.cfg.GenerateDRBDSecret()
3319   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3320                           logical_id=(vgname, names[0]))
3321   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3322                           logical_id=(vgname, names[1]))
3323   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3324                           logical_id=(primary, secondary, port,
3325                                       p_minor, s_minor,
3326                                       shared_secret),
3327                           children=[dev_data, dev_meta],
3328                           iv_name=iv_name)
3329   return drbd_dev
3330
3331
3332 def _GenerateDiskTemplate(lu, template_name,
3333                           instance_name, primary_node,
3334                           secondary_nodes, disk_info,
3335                           file_storage_dir, file_driver,
3336                           base_index):
3337   """Generate the entire disk layout for a given template type.
3338
3339   """
3340   #TODO: compute space requirements
3341
3342   vgname = lu.cfg.GetVGName()
3343   disk_count = len(disk_info)
3344   disks = []
3345   if template_name == constants.DT_DISKLESS:
3346     pass
3347   elif template_name == constants.DT_PLAIN:
3348     if len(secondary_nodes) != 0:
3349       raise errors.ProgrammerError("Wrong template configuration")
3350
3351     names = _GenerateUniqueNames(lu, [".disk%d" % i
3352                                       for i in range(disk_count)])
3353     for idx, disk in enumerate(disk_info):
3354       disk_index = idx + base_index
3355       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3356                               logical_id=(vgname, names[idx]),
3357                               iv_name="disk/%d" % disk_index)
3358       disks.append(disk_dev)
3359   elif template_name == constants.DT_DRBD8:
3360     if len(secondary_nodes) != 1:
3361       raise errors.ProgrammerError("Wrong template configuration")
3362     remote_node = secondary_nodes[0]
3363     minors = lu.cfg.AllocateDRBDMinor(
3364       [primary_node, remote_node] * len(disk_info), instance_name)
3365
3366     names = _GenerateUniqueNames(lu,
3367                                  [".disk%d_%s" % (i, s)
3368                                   for i in range(disk_count)
3369                                   for s in ("data", "meta")
3370                                   ])
3371     for idx, disk in enumerate(disk_info):
3372       disk_index = idx + base_index
3373       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3374                                       disk["size"], names[idx*2:idx*2+2],
3375                                       "disk/%d" % disk_index,
3376                                       minors[idx*2], minors[idx*2+1])
3377       disks.append(disk_dev)
3378   elif template_name == constants.DT_FILE:
3379     if len(secondary_nodes) != 0:
3380       raise errors.ProgrammerError("Wrong template configuration")
3381
3382     for idx, disk in enumerate(disk_info):
3383       disk_index = idx + base_index
3384       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3385                               iv_name="disk/%d" % disk_index,
3386                               logical_id=(file_driver,
3387                                           "%s/disk%d" % (file_storage_dir,
3388                                                          idx)))
3389       disks.append(disk_dev)
3390   else:
3391     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3392   return disks
3393
3394
3395 def _GetInstanceInfoText(instance):
3396   """Compute that text that should be added to the disk's metadata.
3397
3398   """
3399   return "originstname+%s" % instance.name
3400
3401
3402 def _CreateDisks(lu, instance):
3403   """Create all disks for an instance.
3404
3405   This abstracts away some work from AddInstance.
3406
3407   @type lu: L{LogicalUnit}
3408   @param lu: the logical unit on whose behalf we execute
3409   @type instance: L{objects.Instance}
3410   @param instance: the instance whose disks we should create
3411   @rtype: boolean
3412   @return: the success of the creation
3413
3414   """
3415   info = _GetInstanceInfoText(instance)
3416
3417   if instance.disk_template == constants.DT_FILE:
3418     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3419     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3420                                                  file_storage_dir)
3421
3422     if result.failed or not result.data:
3423       logging.error("Could not connect to node '%s'", instance.primary_node)
3424       return False
3425
3426     if not result.data[0]:
3427       logging.error("Failed to create directory '%s'", file_storage_dir)
3428       return False
3429
3430   # Note: this needs to be kept in sync with adding of disks in
3431   # LUSetInstanceParams
3432   for device in instance.disks:
3433     logging.info("Creating volume %s for instance %s",
3434                  device.iv_name, instance.name)
3435     #HARDCODE
3436     for secondary_node in instance.secondary_nodes:
3437       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3438                                         device, False, info):
3439         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3440                       device.iv_name, device, secondary_node)
3441         return False
3442     #HARDCODE
3443     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3444                                     instance, device, info):
3445       logging.error("Failed to create volume %s on primary!", device.iv_name)
3446       return False
3447
3448   return True
3449
3450
3451 def _RemoveDisks(lu, instance):
3452   """Remove all disks for an instance.
3453
3454   This abstracts away some work from `AddInstance()` and
3455   `RemoveInstance()`. Note that in case some of the devices couldn't
3456   be removed, the removal will continue with the other ones (compare
3457   with `_CreateDisks()`).
3458
3459   @type lu: L{LogicalUnit}
3460   @param lu: the logical unit on whose behalf we execute
3461   @type instance: L{objects.Instance}
3462   @param instance: the instance whose disks we should remove
3463   @rtype: boolean
3464   @return: the success of the removal
3465
3466   """
3467   logging.info("Removing block devices for instance %s", instance.name)
3468
3469   result = True
3470   for device in instance.disks:
3471     for node, disk in device.ComputeNodeTree(instance.primary_node):
3472       lu.cfg.SetDiskID(disk, node)
3473       result = lu.rpc.call_blockdev_remove(node, disk)
3474       if result.failed or not result.data:
3475         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3476                            " continuing anyway", device.iv_name, node)
3477         result = False
3478
3479   if instance.disk_template == constants.DT_FILE:
3480     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3481     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3482                                                  file_storage_dir)
3483     if result.failed or not result.data:
3484       logging.error("Could not remove directory '%s'", file_storage_dir)
3485       result = False
3486
3487   return result
3488
3489
3490 def _ComputeDiskSize(disk_template, disks):
3491   """Compute disk size requirements in the volume group
3492
3493   """
3494   # Required free disk space as a function of disk and swap space
3495   req_size_dict = {
3496     constants.DT_DISKLESS: None,
3497     constants.DT_PLAIN: sum(d["size"] for d in disks),
3498     # 128 MB are added for drbd metadata for each disk
3499     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3500     constants.DT_FILE: None,
3501   }
3502
3503   if disk_template not in req_size_dict:
3504     raise errors.ProgrammerError("Disk template '%s' size requirement"
3505                                  " is unknown" %  disk_template)
3506
3507   return req_size_dict[disk_template]
3508
3509
3510 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3511   """Hypervisor parameter validation.
3512
3513   This function abstract the hypervisor parameter validation to be
3514   used in both instance create and instance modify.
3515
3516   @type lu: L{LogicalUnit}
3517   @param lu: the logical unit for which we check
3518   @type nodenames: list
3519   @param nodenames: the list of nodes on which we should check
3520   @type hvname: string
3521   @param hvname: the name of the hypervisor we should use
3522   @type hvparams: dict
3523   @param hvparams: the parameters which we need to check
3524   @raise errors.OpPrereqError: if the parameters are not valid
3525
3526   """
3527   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3528                                                   hvname,
3529                                                   hvparams)
3530   for node in nodenames:
3531     info = hvinfo[node]
3532     info.Raise()
3533     if not info.data or not isinstance(info.data, (tuple, list)):
3534       raise errors.OpPrereqError("Cannot get current information"
3535                                  " from node '%s' (%s)" % (node, info.data))
3536     if not info.data[0]:
3537       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3538                                  " %s" % info.data[1])
3539
3540
3541 class LUCreateInstance(LogicalUnit):
3542   """Create an instance.
3543
3544   """
3545   HPATH = "instance-add"
3546   HTYPE = constants.HTYPE_INSTANCE
3547   _OP_REQP = ["instance_name", "disks", "disk_template",
3548               "mode", "start",
3549               "wait_for_sync", "ip_check", "nics",
3550               "hvparams", "beparams"]
3551   REQ_BGL = False
3552
3553   def _ExpandNode(self, node):
3554     """Expands and checks one node name.
3555
3556     """
3557     node_full = self.cfg.ExpandNodeName(node)
3558     if node_full is None:
3559       raise errors.OpPrereqError("Unknown node %s" % node)
3560     return node_full
3561
3562   def ExpandNames(self):
3563     """ExpandNames for CreateInstance.
3564
3565     Figure out the right locks for instance creation.
3566
3567     """
3568     self.needed_locks = {}
3569
3570     # set optional parameters to none if they don't exist
3571     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3572       if not hasattr(self.op, attr):
3573         setattr(self.op, attr, None)
3574
3575     # cheap checks, mostly valid constants given
3576
3577     # verify creation mode
3578     if self.op.mode not in (constants.INSTANCE_CREATE,
3579                             constants.INSTANCE_IMPORT):
3580       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3581                                  self.op.mode)
3582
3583     # disk template and mirror node verification
3584     if self.op.disk_template not in constants.DISK_TEMPLATES:
3585       raise errors.OpPrereqError("Invalid disk template name")
3586
3587     if self.op.hypervisor is None:
3588       self.op.hypervisor = self.cfg.GetHypervisorType()
3589
3590     cluster = self.cfg.GetClusterInfo()
3591     enabled_hvs = cluster.enabled_hypervisors
3592     if self.op.hypervisor not in enabled_hvs:
3593       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3594                                  " cluster (%s)" % (self.op.hypervisor,
3595                                   ",".join(enabled_hvs)))
3596
3597     # check hypervisor parameter syntax (locally)
3598
3599     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3600                                   self.op.hvparams)
3601     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3602     hv_type.CheckParameterSyntax(filled_hvp)
3603
3604     # fill and remember the beparams dict
3605     utils.CheckBEParams(self.op.beparams)
3606     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3607                                     self.op.beparams)
3608
3609     #### instance parameters check
3610
3611     # instance name verification
3612     hostname1 = utils.HostInfo(self.op.instance_name)
3613     self.op.instance_name = instance_name = hostname1.name
3614
3615     # this is just a preventive check, but someone might still add this
3616     # instance in the meantime, and creation will fail at lock-add time
3617     if instance_name in self.cfg.GetInstanceList():
3618       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3619                                  instance_name)
3620
3621     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3622
3623     # NIC buildup
3624     self.nics = []
3625     for nic in self.op.nics:
3626       # ip validity checks
3627       ip = nic.get("ip", None)
3628       if ip is None or ip.lower() == "none":
3629         nic_ip = None
3630       elif ip.lower() == constants.VALUE_AUTO:
3631         nic_ip = hostname1.ip
3632       else:
3633         if not utils.IsValidIP(ip):
3634           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3635                                      " like a valid IP" % ip)
3636         nic_ip = ip
3637
3638       # MAC address verification
3639       mac = nic.get("mac", constants.VALUE_AUTO)
3640       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3641         if not utils.IsValidMac(mac.lower()):
3642           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3643                                      mac)
3644       # bridge verification
3645       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3646       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3647
3648     # disk checks/pre-build
3649     self.disks = []
3650     for disk in self.op.disks:
3651       mode = disk.get("mode", constants.DISK_RDWR)
3652       if mode not in constants.DISK_ACCESS_SET:
3653         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3654                                    mode)
3655       size = disk.get("size", None)
3656       if size is None:
3657         raise errors.OpPrereqError("Missing disk size")
3658       try:
3659         size = int(size)
3660       except ValueError:
3661         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3662       self.disks.append({"size": size, "mode": mode})
3663
3664     # used in CheckPrereq for ip ping check
3665     self.check_ip = hostname1.ip
3666
3667     # file storage checks
3668     if (self.op.file_driver and
3669         not self.op.file_driver in constants.FILE_DRIVER):
3670       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3671                                  self.op.file_driver)
3672
3673     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3674       raise errors.OpPrereqError("File storage directory path not absolute")
3675
3676     ### Node/iallocator related checks
3677     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3678       raise errors.OpPrereqError("One and only one of iallocator and primary"
3679                                  " node must be given")
3680
3681     if self.op.iallocator:
3682       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3683     else:
3684       self.op.pnode = self._ExpandNode(self.op.pnode)
3685       nodelist = [self.op.pnode]
3686       if self.op.snode is not None:
3687         self.op.snode = self._ExpandNode(self.op.snode)
3688         nodelist.append(self.op.snode)
3689       self.needed_locks[locking.LEVEL_NODE] = nodelist
3690
3691     # in case of import lock the source node too
3692     if self.op.mode == constants.INSTANCE_IMPORT:
3693       src_node = getattr(self.op, "src_node", None)
3694       src_path = getattr(self.op, "src_path", None)
3695
3696       if src_path is None:
3697         self.op.src_path = src_path = self.op.instance_name
3698
3699       if src_node is None:
3700         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3701         self.op.src_node = None
3702         if os.path.isabs(src_path):
3703           raise errors.OpPrereqError("Importing an instance from an absolute"
3704                                      " path requires a source node option.")
3705       else:
3706         self.op.src_node = src_node = self._ExpandNode(src_node)
3707         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3708           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3709         if not os.path.isabs(src_path):
3710           self.op.src_path = src_path = \
3711             os.path.join(constants.EXPORT_DIR, src_path)
3712
3713     else: # INSTANCE_CREATE
3714       if getattr(self.op, "os_type", None) is None:
3715         raise errors.OpPrereqError("No guest OS specified")
3716
3717   def _RunAllocator(self):
3718     """Run the allocator based on input opcode.
3719
3720     """
3721     nics = [n.ToDict() for n in self.nics]
3722     ial = IAllocator(self,
3723                      mode=constants.IALLOCATOR_MODE_ALLOC,
3724                      name=self.op.instance_name,
3725                      disk_template=self.op.disk_template,
3726                      tags=[],
3727                      os=self.op.os_type,
3728                      vcpus=self.be_full[constants.BE_VCPUS],
3729                      mem_size=self.be_full[constants.BE_MEMORY],
3730                      disks=self.disks,
3731                      nics=nics,
3732                      hypervisor=self.op.hypervisor,
3733                      )
3734
3735     ial.Run(self.op.iallocator)
3736
3737     if not ial.success:
3738       raise errors.OpPrereqError("Can't compute nodes using"
3739                                  " iallocator '%s': %s" % (self.op.iallocator,
3740                                                            ial.info))
3741     if len(ial.nodes) != ial.required_nodes:
3742       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3743                                  " of nodes (%s), required %s" %
3744                                  (self.op.iallocator, len(ial.nodes),
3745                                   ial.required_nodes))
3746     self.op.pnode = ial.nodes[0]
3747     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3748                  self.op.instance_name, self.op.iallocator,
3749                  ", ".join(ial.nodes))
3750     if ial.required_nodes == 2:
3751       self.op.snode = ial.nodes[1]
3752
3753   def BuildHooksEnv(self):
3754     """Build hooks env.
3755
3756     This runs on master, primary and secondary nodes of the instance.
3757
3758     """
3759     env = {
3760       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3761       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3762       "INSTANCE_ADD_MODE": self.op.mode,
3763       }
3764     if self.op.mode == constants.INSTANCE_IMPORT:
3765       env["INSTANCE_SRC_NODE"] = self.op.src_node
3766       env["INSTANCE_SRC_PATH"] = self.op.src_path
3767       env["INSTANCE_SRC_IMAGES"] = self.src_images
3768
3769     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3770       primary_node=self.op.pnode,
3771       secondary_nodes=self.secondaries,
3772       status=self.instance_status,
3773       os_type=self.op.os_type,
3774       memory=self.be_full[constants.BE_MEMORY],
3775       vcpus=self.be_full[constants.BE_VCPUS],
3776       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3777     ))
3778
3779     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3780           self.secondaries)
3781     return env, nl, nl
3782
3783
3784   def CheckPrereq(self):
3785     """Check prerequisites.
3786
3787     """
3788     if (not self.cfg.GetVGName() and
3789         self.op.disk_template not in constants.DTS_NOT_LVM):
3790       raise errors.OpPrereqError("Cluster does not support lvm-based"
3791                                  " instances")
3792
3793
3794     if self.op.mode == constants.INSTANCE_IMPORT:
3795       src_node = self.op.src_node
3796       src_path = self.op.src_path
3797
3798       if src_node is None:
3799         exp_list = self.rpc.call_export_list(
3800           self.acquired_locks[locking.LEVEL_NODE])
3801         found = False
3802         for node in exp_list:
3803           if not exp_list[node].failed and src_path in exp_list[node].data:
3804             found = True
3805             self.op.src_node = src_node = node
3806             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3807                                                        src_path)
3808             break
3809         if not found:
3810           raise errors.OpPrereqError("No export found for relative path %s" %
3811                                       src_path)
3812
3813       result = self.rpc.call_export_info(src_node, src_path)
3814       result.Raise()
3815       if not result.data:
3816         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3817
3818       export_info = result.data
3819       if not export_info.has_section(constants.INISECT_EXP):
3820         raise errors.ProgrammerError("Corrupted export config")
3821
3822       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3823       if (int(ei_version) != constants.EXPORT_VERSION):
3824         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3825                                    (ei_version, constants.EXPORT_VERSION))
3826
3827       # Check that the new instance doesn't have less disks than the export
3828       instance_disks = len(self.disks)
3829       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3830       if instance_disks < export_disks:
3831         raise errors.OpPrereqError("Not enough disks to import."
3832                                    " (instance: %d, export: %d)" %
3833                                    (instance_disks, export_disks))
3834
3835       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3836       disk_images = []
3837       for idx in range(export_disks):
3838         option = 'disk%d_dump' % idx
3839         if export_info.has_option(constants.INISECT_INS, option):
3840           # FIXME: are the old os-es, disk sizes, etc. useful?
3841           export_name = export_info.get(constants.INISECT_INS, option)
3842           image = os.path.join(src_path, export_name)
3843           disk_images.append(image)
3844         else:
3845           disk_images.append(False)
3846
3847       self.src_images = disk_images
3848
3849       old_name = export_info.get(constants.INISECT_INS, 'name')
3850       # FIXME: int() here could throw a ValueError on broken exports
3851       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3852       if self.op.instance_name == old_name:
3853         for idx, nic in enumerate(self.nics):
3854           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3855             nic_mac_ini = 'nic%d_mac' % idx
3856             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3857
3858     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3859     if self.op.start and not self.op.ip_check:
3860       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3861                                  " adding an instance in start mode")
3862
3863     if self.op.ip_check:
3864       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3865         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3866                                    (self.check_ip, self.op.instance_name))
3867
3868     #### allocator run
3869
3870     if self.op.iallocator is not None:
3871       self._RunAllocator()
3872
3873     #### node related checks
3874
3875     # check primary node
3876     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3877     assert self.pnode is not None, \
3878       "Cannot retrieve locked node %s" % self.op.pnode
3879     self.secondaries = []
3880
3881     # mirror node verification
3882     if self.op.disk_template in constants.DTS_NET_MIRROR:
3883       if self.op.snode is None:
3884         raise errors.OpPrereqError("The networked disk templates need"
3885                                    " a mirror node")
3886       if self.op.snode == pnode.name:
3887         raise errors.OpPrereqError("The secondary node cannot be"
3888                                    " the primary node.")
3889       self.secondaries.append(self.op.snode)
3890
3891     nodenames = [pnode.name] + self.secondaries
3892
3893     req_size = _ComputeDiskSize(self.op.disk_template,
3894                                 self.disks)
3895
3896     # Check lv size requirements
3897     if req_size is not None:
3898       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3899                                          self.op.hypervisor)
3900       for node in nodenames:
3901         info = nodeinfo[node]
3902         info.Raise()
3903         info = info.data
3904         if not info:
3905           raise errors.OpPrereqError("Cannot get current information"
3906                                      " from node '%s'" % node)
3907         vg_free = info.get('vg_free', None)
3908         if not isinstance(vg_free, int):
3909           raise errors.OpPrereqError("Can't compute free disk space on"
3910                                      " node %s" % node)
3911         if req_size > info['vg_free']:
3912           raise errors.OpPrereqError("Not enough disk space on target node %s."
3913                                      " %d MB available, %d MB required" %
3914                                      (node, info['vg_free'], req_size))
3915
3916     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3917
3918     # os verification
3919     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3920     result.Raise()
3921     if not isinstance(result.data, objects.OS):
3922       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3923                                  " primary node"  % self.op.os_type)
3924
3925     # bridge check on primary node
3926     bridges = [n.bridge for n in self.nics]
3927     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3928     result.Raise()
3929     if not result.data:
3930       raise errors.OpPrereqError("One of the target bridges '%s' does not"
3931                                  " exist on destination node '%s'" %
3932                                  (",".join(bridges), pnode.name))
3933
3934     # memory check on primary node
3935     if self.op.start:
3936       _CheckNodeFreeMemory(self, self.pnode.name,
3937                            "creating instance %s" % self.op.instance_name,
3938                            self.be_full[constants.BE_MEMORY],
3939                            self.op.hypervisor)
3940
3941     if self.op.start:
3942       self.instance_status = 'up'
3943     else:
3944       self.instance_status = 'down'
3945
3946   def Exec(self, feedback_fn):
3947     """Create and add the instance to the cluster.
3948
3949     """
3950     instance = self.op.instance_name
3951     pnode_name = self.pnode.name
3952
3953     for nic in self.nics:
3954       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3955         nic.mac = self.cfg.GenerateMAC()
3956
3957     ht_kind = self.op.hypervisor
3958     if ht_kind in constants.HTS_REQ_PORT:
3959       network_port = self.cfg.AllocatePort()
3960     else:
3961       network_port = None
3962
3963     ##if self.op.vnc_bind_address is None:
3964     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3965
3966     # this is needed because os.path.join does not accept None arguments
3967     if self.op.file_storage_dir is None:
3968       string_file_storage_dir = ""
3969     else:
3970       string_file_storage_dir = self.op.file_storage_dir
3971
3972     # build the full file storage dir path
3973     file_storage_dir = os.path.normpath(os.path.join(
3974                                         self.cfg.GetFileStorageDir(),
3975                                         string_file_storage_dir, instance))
3976
3977
3978     disks = _GenerateDiskTemplate(self,
3979                                   self.op.disk_template,
3980                                   instance, pnode_name,
3981                                   self.secondaries,
3982                                   self.disks,
3983                                   file_storage_dir,
3984                                   self.op.file_driver,
3985                                   0)
3986
3987     iobj = objects.Instance(name=instance, os=self.op.os_type,
3988                             primary_node=pnode_name,
3989                             nics=self.nics, disks=disks,
3990                             disk_template=self.op.disk_template,
3991                             status=self.instance_status,
3992                             network_port=network_port,
3993                             beparams=self.op.beparams,
3994                             hvparams=self.op.hvparams,
3995                             hypervisor=self.op.hypervisor,
3996                             )
3997
3998     feedback_fn("* creating instance disks...")
3999     if not _CreateDisks(self, iobj):
4000       _RemoveDisks(self, iobj)
4001       self.cfg.ReleaseDRBDMinors(instance)
4002       raise errors.OpExecError("Device creation failed, reverting...")
4003
4004     feedback_fn("adding instance %s to cluster config" % instance)
4005
4006     self.cfg.AddInstance(iobj)
4007     # Declare that we don't want to remove the instance lock anymore, as we've
4008     # added the instance to the config
4009     del self.remove_locks[locking.LEVEL_INSTANCE]
4010     # Remove the temp. assignements for the instance's drbds
4011     self.cfg.ReleaseDRBDMinors(instance)
4012     # Unlock all the nodes
4013     if self.op.mode == constants.INSTANCE_IMPORT:
4014       nodes_keep = [self.op.src_node]
4015       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4016                        if node != self.op.src_node]
4017       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4018       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4019     else:
4020       self.context.glm.release(locking.LEVEL_NODE)
4021       del self.acquired_locks[locking.LEVEL_NODE]
4022
4023     if self.op.wait_for_sync:
4024       disk_abort = not _WaitForSync(self, iobj)
4025     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4026       # make sure the disks are not degraded (still sync-ing is ok)
4027       time.sleep(15)
4028       feedback_fn("* checking mirrors status")
4029       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4030     else:
4031       disk_abort = False
4032
4033     if disk_abort:
4034       _RemoveDisks(self, iobj)
4035       self.cfg.RemoveInstance(iobj.name)
4036       # Make sure the instance lock gets removed
4037       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4038       raise errors.OpExecError("There are some degraded disks for"
4039                                " this instance")
4040
4041     feedback_fn("creating os for instance %s on node %s" %
4042                 (instance, pnode_name))
4043
4044     if iobj.disk_template != constants.DT_DISKLESS:
4045       if self.op.mode == constants.INSTANCE_CREATE:
4046         feedback_fn("* running the instance OS create scripts...")
4047         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4048         result.Raise()
4049         if not result.data:
4050           raise errors.OpExecError("Could not add os for instance %s"
4051                                    " on node %s" %
4052                                    (instance, pnode_name))
4053
4054       elif self.op.mode == constants.INSTANCE_IMPORT:
4055         feedback_fn("* running the instance OS import scripts...")
4056         src_node = self.op.src_node
4057         src_images = self.src_images
4058         cluster_name = self.cfg.GetClusterName()
4059         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4060                                                          src_node, src_images,
4061                                                          cluster_name)
4062         import_result.Raise()
4063         for idx, result in enumerate(import_result.data):
4064           if not result:
4065             self.LogWarning("Could not import the image %s for instance"
4066                             " %s, disk %d, on node %s" %
4067                             (src_images[idx], instance, idx, pnode_name))
4068       else:
4069         # also checked in the prereq part
4070         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4071                                      % self.op.mode)
4072
4073     if self.op.start:
4074       logging.info("Starting instance %s on node %s", instance, pnode_name)
4075       feedback_fn("* starting instance...")
4076       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4077       result.Raise()
4078       if not result.data:
4079         raise errors.OpExecError("Could not start instance")
4080
4081
4082 class LUConnectConsole(NoHooksLU):
4083   """Connect to an instance's console.
4084
4085   This is somewhat special in that it returns the command line that
4086   you need to run on the master node in order to connect to the
4087   console.
4088
4089   """
4090   _OP_REQP = ["instance_name"]
4091   REQ_BGL = False
4092
4093   def ExpandNames(self):
4094     self._ExpandAndLockInstance()
4095
4096   def CheckPrereq(self):
4097     """Check prerequisites.
4098
4099     This checks that the instance is in the cluster.
4100
4101     """
4102     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4103     assert self.instance is not None, \
4104       "Cannot retrieve locked instance %s" % self.op.instance_name
4105
4106   def Exec(self, feedback_fn):
4107     """Connect to the console of an instance
4108
4109     """
4110     instance = self.instance
4111     node = instance.primary_node
4112
4113     node_insts = self.rpc.call_instance_list([node],
4114                                              [instance.hypervisor])[node]
4115     node_insts.Raise()
4116
4117     if instance.name not in node_insts.data:
4118       raise errors.OpExecError("Instance %s is not running." % instance.name)
4119
4120     logging.debug("Connecting to console of %s on %s", instance.name, node)
4121
4122     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4123     console_cmd = hyper.GetShellCommandForConsole(instance)
4124
4125     # build ssh cmdline
4126     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4127
4128
4129 class LUReplaceDisks(LogicalUnit):
4130   """Replace the disks of an instance.
4131
4132   """
4133   HPATH = "mirrors-replace"
4134   HTYPE = constants.HTYPE_INSTANCE
4135   _OP_REQP = ["instance_name", "mode", "disks"]
4136   REQ_BGL = False
4137
4138   def ExpandNames(self):
4139     self._ExpandAndLockInstance()
4140
4141     if not hasattr(self.op, "remote_node"):
4142       self.op.remote_node = None
4143
4144     ia_name = getattr(self.op, "iallocator", None)
4145     if ia_name is not None:
4146       if self.op.remote_node is not None:
4147         raise errors.OpPrereqError("Give either the iallocator or the new"
4148                                    " secondary, not both")
4149       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4150     elif self.op.remote_node is not None:
4151       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4152       if remote_node is None:
4153         raise errors.OpPrereqError("Node '%s' not known" %
4154                                    self.op.remote_node)
4155       self.op.remote_node = remote_node
4156       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4157       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4158     else:
4159       self.needed_locks[locking.LEVEL_NODE] = []
4160       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4161
4162   def DeclareLocks(self, level):
4163     # If we're not already locking all nodes in the set we have to declare the
4164     # instance's primary/secondary nodes.
4165     if (level == locking.LEVEL_NODE and
4166         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4167       self._LockInstancesNodes()
4168
4169   def _RunAllocator(self):
4170     """Compute a new secondary node using an IAllocator.
4171
4172     """
4173     ial = IAllocator(self,
4174                      mode=constants.IALLOCATOR_MODE_RELOC,
4175                      name=self.op.instance_name,
4176                      relocate_from=[self.sec_node])
4177
4178     ial.Run(self.op.iallocator)
4179
4180     if not ial.success:
4181       raise errors.OpPrereqError("Can't compute nodes using"
4182                                  " iallocator '%s': %s" % (self.op.iallocator,
4183                                                            ial.info))
4184     if len(ial.nodes) != ial.required_nodes:
4185       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4186                                  " of nodes (%s), required %s" %
4187                                  (len(ial.nodes), ial.required_nodes))
4188     self.op.remote_node = ial.nodes[0]
4189     self.LogInfo("Selected new secondary for the instance: %s",
4190                  self.op.remote_node)
4191
4192   def BuildHooksEnv(self):
4193     """Build hooks env.
4194
4195     This runs on the master, the primary and all the secondaries.
4196
4197     """
4198     env = {
4199       "MODE": self.op.mode,
4200       "NEW_SECONDARY": self.op.remote_node,
4201       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4202       }
4203     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4204     nl = [
4205       self.cfg.GetMasterNode(),
4206       self.instance.primary_node,
4207       ]
4208     if self.op.remote_node is not None:
4209       nl.append(self.op.remote_node)
4210     return env, nl, nl
4211
4212   def CheckPrereq(self):
4213     """Check prerequisites.
4214
4215     This checks that the instance is in the cluster.
4216
4217     """
4218     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4219     assert instance is not None, \
4220       "Cannot retrieve locked instance %s" % self.op.instance_name
4221     self.instance = instance
4222
4223     if instance.disk_template not in constants.DTS_NET_MIRROR:
4224       raise errors.OpPrereqError("Instance's disk layout is not"
4225                                  " network mirrored.")
4226
4227     if len(instance.secondary_nodes) != 1:
4228       raise errors.OpPrereqError("The instance has a strange layout,"
4229                                  " expected one secondary but found %d" %
4230                                  len(instance.secondary_nodes))
4231
4232     self.sec_node = instance.secondary_nodes[0]
4233
4234     ia_name = getattr(self.op, "iallocator", None)
4235     if ia_name is not None:
4236       self._RunAllocator()
4237
4238     remote_node = self.op.remote_node
4239     if remote_node is not None:
4240       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4241       assert self.remote_node_info is not None, \
4242         "Cannot retrieve locked node %s" % remote_node
4243     else:
4244       self.remote_node_info = None
4245     if remote_node == instance.primary_node:
4246       raise errors.OpPrereqError("The specified node is the primary node of"
4247                                  " the instance.")
4248     elif remote_node == self.sec_node:
4249       if self.op.mode == constants.REPLACE_DISK_SEC:
4250         # this is for DRBD8, where we can't execute the same mode of
4251         # replacement as for drbd7 (no different port allocated)
4252         raise errors.OpPrereqError("Same secondary given, cannot execute"
4253                                    " replacement")
4254     if instance.disk_template == constants.DT_DRBD8:
4255       if (self.op.mode == constants.REPLACE_DISK_ALL and
4256           remote_node is not None):
4257         # switch to replace secondary mode
4258         self.op.mode = constants.REPLACE_DISK_SEC
4259
4260       if self.op.mode == constants.REPLACE_DISK_ALL:
4261         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4262                                    " secondary disk replacement, not"
4263                                    " both at once")
4264       elif self.op.mode == constants.REPLACE_DISK_PRI:
4265         if remote_node is not None:
4266           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4267                                      " the secondary while doing a primary"
4268                                      " node disk replacement")
4269         self.tgt_node = instance.primary_node
4270         self.oth_node = instance.secondary_nodes[0]
4271       elif self.op.mode == constants.REPLACE_DISK_SEC:
4272         self.new_node = remote_node # this can be None, in which case
4273                                     # we don't change the secondary
4274         self.tgt_node = instance.secondary_nodes[0]
4275         self.oth_node = instance.primary_node
4276       else:
4277         raise errors.ProgrammerError("Unhandled disk replace mode")
4278
4279     if not self.op.disks:
4280       self.op.disks = range(len(instance.disks))
4281
4282     for disk_idx in self.op.disks:
4283       instance.FindDisk(disk_idx)
4284
4285   def _ExecD8DiskOnly(self, feedback_fn):
4286     """Replace a disk on the primary or secondary for dbrd8.
4287
4288     The algorithm for replace is quite complicated:
4289
4290       1. for each disk to be replaced:
4291
4292         1. create new LVs on the target node with unique names
4293         1. detach old LVs from the drbd device
4294         1. rename old LVs to name_replaced.<time_t>
4295         1. rename new LVs to old LVs
4296         1. attach the new LVs (with the old names now) to the drbd device
4297
4298       1. wait for sync across all devices
4299
4300       1. for each modified disk:
4301
4302         1. remove old LVs (which have the name name_replaces.<time_t>)
4303
4304     Failures are not very well handled.
4305
4306     """
4307     steps_total = 6
4308     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4309     instance = self.instance
4310     iv_names = {}
4311     vgname = self.cfg.GetVGName()
4312     # start of work
4313     cfg = self.cfg
4314     tgt_node = self.tgt_node
4315     oth_node = self.oth_node
4316
4317     # Step: check device activation
4318     self.proc.LogStep(1, steps_total, "check device existence")
4319     info("checking volume groups")
4320     my_vg = cfg.GetVGName()
4321     results = self.rpc.call_vg_list([oth_node, tgt_node])
4322     if not results:
4323       raise errors.OpExecError("Can't list volume groups on the nodes")
4324     for node in oth_node, tgt_node:
4325       res = results[node]
4326       if res.failed or not res.data or my_vg not in res.data:
4327         raise errors.OpExecError("Volume group '%s' not found on %s" %
4328                                  (my_vg, node))
4329     for idx, dev in enumerate(instance.disks):
4330       if idx not in self.op.disks:
4331         continue
4332       for node in tgt_node, oth_node:
4333         info("checking disk/%d on %s" % (idx, node))
4334         cfg.SetDiskID(dev, node)
4335         if not self.rpc.call_blockdev_find(node, dev):
4336           raise errors.OpExecError("Can't find disk/%d on node %s" %
4337                                    (idx, node))
4338
4339     # Step: check other node consistency
4340     self.proc.LogStep(2, steps_total, "check peer consistency")
4341     for idx, dev in enumerate(instance.disks):
4342       if idx not in self.op.disks:
4343         continue
4344       info("checking disk/%d consistency on %s" % (idx, oth_node))
4345       if not _CheckDiskConsistency(self, dev, oth_node,
4346                                    oth_node==instance.primary_node):
4347         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4348                                  " to replace disks on this node (%s)" %
4349                                  (oth_node, tgt_node))
4350
4351     # Step: create new storage
4352     self.proc.LogStep(3, steps_total, "allocate new storage")
4353     for idx, dev in enumerate(instance.disks):
4354       if idx not in self.op.disks:
4355         continue
4356       size = dev.size
4357       cfg.SetDiskID(dev, tgt_node)
4358       lv_names = [".disk%d_%s" % (idx, suf)
4359                   for suf in ["data", "meta"]]
4360       names = _GenerateUniqueNames(self, lv_names)
4361       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4362                              logical_id=(vgname, names[0]))
4363       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4364                              logical_id=(vgname, names[1]))
4365       new_lvs = [lv_data, lv_meta]
4366       old_lvs = dev.children
4367       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4368       info("creating new local storage on %s for %s" %
4369            (tgt_node, dev.iv_name))
4370       # since we *always* want to create this LV, we use the
4371       # _Create...OnPrimary (which forces the creation), even if we
4372       # are talking about the secondary node
4373       for new_lv in new_lvs:
4374         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4375                                         _GetInstanceInfoText(instance)):
4376           raise errors.OpExecError("Failed to create new LV named '%s' on"
4377                                    " node '%s'" %
4378                                    (new_lv.logical_id[1], tgt_node))
4379
4380     # Step: for each lv, detach+rename*2+attach
4381     self.proc.LogStep(4, steps_total, "change drbd configuration")
4382     for dev, old_lvs, new_lvs in iv_names.itervalues():
4383       info("detaching %s drbd from local storage" % dev.iv_name)
4384       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4385       result.Raise()
4386       if not result.data:
4387         raise errors.OpExecError("Can't detach drbd from local storage on node"
4388                                  " %s for device %s" % (tgt_node, dev.iv_name))
4389       #dev.children = []
4390       #cfg.Update(instance)
4391
4392       # ok, we created the new LVs, so now we know we have the needed
4393       # storage; as such, we proceed on the target node to rename
4394       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4395       # using the assumption that logical_id == physical_id (which in
4396       # turn is the unique_id on that node)
4397
4398       # FIXME(iustin): use a better name for the replaced LVs
4399       temp_suffix = int(time.time())
4400       ren_fn = lambda d, suff: (d.physical_id[0],
4401                                 d.physical_id[1] + "_replaced-%s" % suff)
4402       # build the rename list based on what LVs exist on the node
4403       rlist = []
4404       for to_ren in old_lvs:
4405         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4406         if not find_res.failed and find_res.data is not None: # device exists
4407           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4408
4409       info("renaming the old LVs on the target node")
4410       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4411       result.Raise()
4412       if not result.data:
4413         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4414       # now we rename the new LVs to the old LVs
4415       info("renaming the new LVs on the target node")
4416       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4417       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4418       result.Raise()
4419       if not result.data:
4420         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4421
4422       for old, new in zip(old_lvs, new_lvs):
4423         new.logical_id = old.logical_id
4424         cfg.SetDiskID(new, tgt_node)
4425
4426       for disk in old_lvs:
4427         disk.logical_id = ren_fn(disk, temp_suffix)
4428         cfg.SetDiskID(disk, tgt_node)
4429
4430       # now that the new lvs have the old name, we can add them to the device
4431       info("adding new mirror component on %s" % tgt_node)
4432       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4433       if result.failed or not result.data:
4434         for new_lv in new_lvs:
4435           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4436           if result.failed or not result.data:
4437             warning("Can't rollback device %s", hint="manually cleanup unused"
4438                     " logical volumes")
4439         raise errors.OpExecError("Can't add local storage to drbd")
4440
4441       dev.children = new_lvs
4442       cfg.Update(instance)
4443
4444     # Step: wait for sync
4445
4446     # this can fail as the old devices are degraded and _WaitForSync
4447     # does a combined result over all disks, so we don't check its
4448     # return value
4449     self.proc.LogStep(5, steps_total, "sync devices")
4450     _WaitForSync(self, instance, unlock=True)
4451
4452     # so check manually all the devices
4453     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4454       cfg.SetDiskID(dev, instance.primary_node)
4455       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4456       if result.failed or result.data[5]:
4457         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4458
4459     # Step: remove old storage
4460     self.proc.LogStep(6, steps_total, "removing old storage")
4461     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4462       info("remove logical volumes for %s" % name)
4463       for lv in old_lvs:
4464         cfg.SetDiskID(lv, tgt_node)
4465         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4466         if result.failed or not result.data:
4467           warning("Can't remove old LV", hint="manually remove unused LVs")
4468           continue
4469
4470   def _ExecD8Secondary(self, feedback_fn):
4471     """Replace the secondary node for drbd8.
4472
4473     The algorithm for replace is quite complicated:
4474       - for all disks of the instance:
4475         - create new LVs on the new node with same names
4476         - shutdown the drbd device on the old secondary
4477         - disconnect the drbd network on the primary
4478         - create the drbd device on the new secondary
4479         - network attach the drbd on the primary, using an artifice:
4480           the drbd code for Attach() will connect to the network if it
4481           finds a device which is connected to the good local disks but
4482           not network enabled
4483       - wait for sync across all devices
4484       - remove all disks from the old secondary
4485
4486     Failures are not very well handled.
4487
4488     """
4489     steps_total = 6
4490     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4491     instance = self.instance
4492     iv_names = {}
4493     vgname = self.cfg.GetVGName()
4494     # start of work
4495     cfg = self.cfg
4496     old_node = self.tgt_node
4497     new_node = self.new_node
4498     pri_node = instance.primary_node
4499
4500     # Step: check device activation
4501     self.proc.LogStep(1, steps_total, "check device existence")
4502     info("checking volume groups")
4503     my_vg = cfg.GetVGName()
4504     results = self.rpc.call_vg_list([pri_node, new_node])
4505     for node in pri_node, new_node:
4506       res = results[node]
4507       if res.failed or not res.data or my_vg not in res.data:
4508         raise errors.OpExecError("Volume group '%s' not found on %s" %
4509                                  (my_vg, node))
4510     for idx, dev in enumerate(instance.disks):
4511       if idx not in self.op.disks:
4512         continue
4513       info("checking disk/%d on %s" % (idx, pri_node))
4514       cfg.SetDiskID(dev, pri_node)
4515       result = self.rpc.call_blockdev_find(pri_node, dev)
4516       result.Raise()
4517       if not result.data:
4518         raise errors.OpExecError("Can't find disk/%d on node %s" %
4519                                  (idx, pri_node))
4520
4521     # Step: check other node consistency
4522     self.proc.LogStep(2, steps_total, "check peer consistency")
4523     for idx, dev in enumerate(instance.disks):
4524       if idx not in self.op.disks:
4525         continue
4526       info("checking disk/%d consistency on %s" % (idx, pri_node))
4527       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4528         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4529                                  " unsafe to replace the secondary" %
4530                                  pri_node)
4531
4532     # Step: create new storage
4533     self.proc.LogStep(3, steps_total, "allocate new storage")
4534     for idx, dev in enumerate(instance.disks):
4535       size = dev.size
4536       info("adding new local storage on %s for disk/%d" %
4537            (new_node, idx))
4538       # since we *always* want to create this LV, we use the
4539       # _Create...OnPrimary (which forces the creation), even if we
4540       # are talking about the secondary node
4541       for new_lv in dev.children:
4542         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4543                                         _GetInstanceInfoText(instance)):
4544           raise errors.OpExecError("Failed to create new LV named '%s' on"
4545                                    " node '%s'" %
4546                                    (new_lv.logical_id[1], new_node))
4547
4548     # Step 4: dbrd minors and drbd setups changes
4549     # after this, we must manually remove the drbd minors on both the
4550     # error and the success paths
4551     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4552                                    instance.name)
4553     logging.debug("Allocated minors %s" % (minors,))
4554     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4555     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4556       size = dev.size
4557       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4558       # create new devices on new_node
4559       if pri_node == dev.logical_id[0]:
4560         new_logical_id = (pri_node, new_node,
4561                           dev.logical_id[2], dev.logical_id[3], new_minor,
4562                           dev.logical_id[5])
4563       else:
4564         new_logical_id = (new_node, pri_node,
4565                           dev.logical_id[2], new_minor, dev.logical_id[4],
4566                           dev.logical_id[5])
4567       iv_names[idx] = (dev, dev.children, new_logical_id)
4568       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4569                     new_logical_id)
4570       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4571                               logical_id=new_logical_id,
4572                               children=dev.children)
4573       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4574                                         new_drbd, False,
4575                                         _GetInstanceInfoText(instance)):
4576         self.cfg.ReleaseDRBDMinors(instance.name)
4577         raise errors.OpExecError("Failed to create new DRBD on"
4578                                  " node '%s'" % new_node)
4579
4580     for idx, dev in enumerate(instance.disks):
4581       # we have new devices, shutdown the drbd on the old secondary
4582       info("shutting down drbd for disk/%d on old node" % idx)
4583       cfg.SetDiskID(dev, old_node)
4584       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4585       if result.failed or not result.data:
4586         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4587                 hint="Please cleanup this device manually as soon as possible")
4588
4589     info("detaching primary drbds from the network (=> standalone)")
4590     done = 0
4591     for idx, dev in enumerate(instance.disks):
4592       cfg.SetDiskID(dev, pri_node)
4593       # set the network part of the physical (unique in bdev terms) id
4594       # to None, meaning detach from network
4595       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4596       # and 'find' the device, which will 'fix' it to match the
4597       # standalone state
4598       result = self.rpc.call_blockdev_find(pri_node, dev)
4599       if not result.failed and result.data:
4600         done += 1
4601       else:
4602         warning("Failed to detach drbd disk/%d from network, unusual case" %
4603                 idx)
4604
4605     if not done:
4606       # no detaches succeeded (very unlikely)
4607       self.cfg.ReleaseDRBDMinors(instance.name)
4608       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4609
4610     # if we managed to detach at least one, we update all the disks of
4611     # the instance to point to the new secondary
4612     info("updating instance configuration")
4613     for dev, _, new_logical_id in iv_names.itervalues():
4614       dev.logical_id = new_logical_id
4615       cfg.SetDiskID(dev, pri_node)
4616     cfg.Update(instance)
4617     # we can remove now the temp minors as now the new values are
4618     # written to the config file (and therefore stable)
4619     self.cfg.ReleaseDRBDMinors(instance.name)
4620
4621     # and now perform the drbd attach
4622     info("attaching primary drbds to new secondary (standalone => connected)")
4623     failures = []
4624     for idx, dev in enumerate(instance.disks):
4625       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4626       # since the attach is smart, it's enough to 'find' the device,
4627       # it will automatically activate the network, if the physical_id
4628       # is correct
4629       cfg.SetDiskID(dev, pri_node)
4630       logging.debug("Disk to attach: %s", dev)
4631       result = self.rpc.call_blockdev_find(pri_node, dev)
4632       if result.failed or not result.data:
4633         warning("can't attach drbd disk/%d to new secondary!" % idx,
4634                 "please do a gnt-instance info to see the status of disks")
4635
4636     # this can fail as the old devices are degraded and _WaitForSync
4637     # does a combined result over all disks, so we don't check its
4638     # return value
4639     self.proc.LogStep(5, steps_total, "sync devices")
4640     _WaitForSync(self, instance, unlock=True)
4641
4642     # so check manually all the devices
4643     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4644       cfg.SetDiskID(dev, pri_node)
4645       result = self.rpc.call_blockdev_find(pri_node, dev)
4646       result.Raise()
4647       if result.data[5]:
4648         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4649
4650     self.proc.LogStep(6, steps_total, "removing old storage")
4651     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4652       info("remove logical volumes for disk/%d" % idx)
4653       for lv in old_lvs:
4654         cfg.SetDiskID(lv, old_node)
4655         result = self.rpc.call_blockdev_remove(old_node, lv)
4656         if result.failed or not result.data:
4657           warning("Can't remove LV on old secondary",
4658                   hint="Cleanup stale volumes by hand")
4659
4660   def Exec(self, feedback_fn):
4661     """Execute disk replacement.
4662
4663     This dispatches the disk replacement to the appropriate handler.
4664
4665     """
4666     instance = self.instance
4667
4668     # Activate the instance disks if we're replacing them on a down instance
4669     if instance.status == "down":
4670       _StartInstanceDisks(self, instance, True)
4671
4672     if instance.disk_template == constants.DT_DRBD8:
4673       if self.op.remote_node is None:
4674         fn = self._ExecD8DiskOnly
4675       else:
4676         fn = self._ExecD8Secondary
4677     else:
4678       raise errors.ProgrammerError("Unhandled disk replacement case")
4679
4680     ret = fn(feedback_fn)
4681
4682     # Deactivate the instance disks if we're replacing them on a down instance
4683     if instance.status == "down":
4684       _SafeShutdownInstanceDisks(self, instance)
4685
4686     return ret
4687
4688
4689 class LUGrowDisk(LogicalUnit):
4690   """Grow a disk of an instance.
4691
4692   """
4693   HPATH = "disk-grow"
4694   HTYPE = constants.HTYPE_INSTANCE
4695   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4696   REQ_BGL = False
4697
4698   def ExpandNames(self):
4699     self._ExpandAndLockInstance()
4700     self.needed_locks[locking.LEVEL_NODE] = []
4701     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4702
4703   def DeclareLocks(self, level):
4704     if level == locking.LEVEL_NODE:
4705       self._LockInstancesNodes()
4706
4707   def BuildHooksEnv(self):
4708     """Build hooks env.
4709
4710     This runs on the master, the primary and all the secondaries.
4711
4712     """
4713     env = {
4714       "DISK": self.op.disk,
4715       "AMOUNT": self.op.amount,
4716       }
4717     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4718     nl = [
4719       self.cfg.GetMasterNode(),
4720       self.instance.primary_node,
4721       ]
4722     return env, nl, nl
4723
4724   def CheckPrereq(self):
4725     """Check prerequisites.
4726
4727     This checks that the instance is in the cluster.
4728
4729     """
4730     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4731     assert instance is not None, \
4732       "Cannot retrieve locked instance %s" % self.op.instance_name
4733
4734     self.instance = instance
4735
4736     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4737       raise errors.OpPrereqError("Instance's disk layout does not support"
4738                                  " growing.")
4739
4740     self.disk = instance.FindDisk(self.op.disk)
4741
4742     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4743     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4744                                        instance.hypervisor)
4745     for node in nodenames:
4746       info = nodeinfo[node]
4747       if info.failed or not info.data:
4748         raise errors.OpPrereqError("Cannot get current information"
4749                                    " from node '%s'" % node)
4750       vg_free = info.data.get('vg_free', None)
4751       if not isinstance(vg_free, int):
4752         raise errors.OpPrereqError("Can't compute free disk space on"
4753                                    " node %s" % node)
4754       if self.op.amount > vg_free:
4755         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4756                                    " %d MiB available, %d MiB required" %
4757                                    (node, vg_free, self.op.amount))
4758
4759   def Exec(self, feedback_fn):
4760     """Execute disk grow.
4761
4762     """
4763     instance = self.instance
4764     disk = self.disk
4765     for node in (instance.secondary_nodes + (instance.primary_node,)):
4766       self.cfg.SetDiskID(disk, node)
4767       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4768       result.Raise()
4769       if (not result.data or not isinstance(result.data, (list, tuple)) or
4770           len(result.data) != 2):
4771         raise errors.OpExecError("Grow request failed to node %s" % node)
4772       elif not result.data[0]:
4773         raise errors.OpExecError("Grow request failed to node %s: %s" %
4774                                  (node, result.data[1]))
4775     disk.RecordGrow(self.op.amount)
4776     self.cfg.Update(instance)
4777     if self.op.wait_for_sync:
4778       disk_abort = not _WaitForSync(self, instance)
4779       if disk_abort:
4780         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4781                              " status.\nPlease check the instance.")
4782
4783
4784 class LUQueryInstanceData(NoHooksLU):
4785   """Query runtime instance data.
4786
4787   """
4788   _OP_REQP = ["instances", "static"]
4789   REQ_BGL = False
4790
4791   def ExpandNames(self):
4792     self.needed_locks = {}
4793     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4794
4795     if not isinstance(self.op.instances, list):
4796       raise errors.OpPrereqError("Invalid argument type 'instances'")
4797
4798     if self.op.instances:
4799       self.wanted_names = []
4800       for name in self.op.instances:
4801         full_name = self.cfg.ExpandInstanceName(name)
4802         if full_name is None:
4803           raise errors.OpPrereqError("Instance '%s' not known" %
4804                                      self.op.instance_name)
4805         self.wanted_names.append(full_name)
4806       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4807     else:
4808       self.wanted_names = None
4809       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4810
4811     self.needed_locks[locking.LEVEL_NODE] = []
4812     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4813
4814   def DeclareLocks(self, level):
4815     if level == locking.LEVEL_NODE:
4816       self._LockInstancesNodes()
4817
4818   def CheckPrereq(self):
4819     """Check prerequisites.
4820
4821     This only checks the optional instance list against the existing names.
4822
4823     """
4824     if self.wanted_names is None:
4825       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4826
4827     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4828                              in self.wanted_names]
4829     return
4830
4831   def _ComputeDiskStatus(self, instance, snode, dev):
4832     """Compute block device status.
4833
4834     """
4835     static = self.op.static
4836     if not static:
4837       self.cfg.SetDiskID(dev, instance.primary_node)
4838       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4839       dev_pstatus.Raise()
4840       dev_pstatus = dev_pstatus.data
4841     else:
4842       dev_pstatus = None
4843
4844     if dev.dev_type in constants.LDS_DRBD:
4845       # we change the snode then (otherwise we use the one passed in)
4846       if dev.logical_id[0] == instance.primary_node:
4847         snode = dev.logical_id[1]
4848       else:
4849         snode = dev.logical_id[0]
4850
4851     if snode and not static:
4852       self.cfg.SetDiskID(dev, snode)
4853       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4854       dev_sstatus.Raise()
4855       dev_sstatus = dev_sstatus.data
4856     else:
4857       dev_sstatus = None
4858
4859     if dev.children:
4860       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4861                       for child in dev.children]
4862     else:
4863       dev_children = []
4864
4865     data = {
4866       "iv_name": dev.iv_name,
4867       "dev_type": dev.dev_type,
4868       "logical_id": dev.logical_id,
4869       "physical_id": dev.physical_id,
4870       "pstatus": dev_pstatus,
4871       "sstatus": dev_sstatus,
4872       "children": dev_children,
4873       "mode": dev.mode,
4874       }
4875
4876     return data
4877
4878   def Exec(self, feedback_fn):
4879     """Gather and return data"""
4880     result = {}
4881
4882     cluster = self.cfg.GetClusterInfo()
4883
4884     for instance in self.wanted_instances:
4885       if not self.op.static:
4886         remote_info = self.rpc.call_instance_info(instance.primary_node,
4887                                                   instance.name,
4888                                                   instance.hypervisor)
4889         remote_info.Raise()
4890         remote_info = remote_info.data
4891         if remote_info and "state" in remote_info:
4892           remote_state = "up"
4893         else:
4894           remote_state = "down"
4895       else:
4896         remote_state = None
4897       if instance.status == "down":
4898         config_state = "down"
4899       else:
4900         config_state = "up"
4901
4902       disks = [self._ComputeDiskStatus(instance, None, device)
4903                for device in instance.disks]
4904
4905       idict = {
4906         "name": instance.name,
4907         "config_state": config_state,
4908         "run_state": remote_state,
4909         "pnode": instance.primary_node,
4910         "snodes": instance.secondary_nodes,
4911         "os": instance.os,
4912         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4913         "disks": disks,
4914         "hypervisor": instance.hypervisor,
4915         "network_port": instance.network_port,
4916         "hv_instance": instance.hvparams,
4917         "hv_actual": cluster.FillHV(instance),
4918         "be_instance": instance.beparams,
4919         "be_actual": cluster.FillBE(instance),
4920         }
4921
4922       result[instance.name] = idict
4923
4924     return result
4925
4926
4927 class LUSetInstanceParams(LogicalUnit):
4928   """Modifies an instances's parameters.
4929
4930   """
4931   HPATH = "instance-modify"
4932   HTYPE = constants.HTYPE_INSTANCE
4933   _OP_REQP = ["instance_name"]
4934   REQ_BGL = False
4935
4936   def CheckArguments(self):
4937     if not hasattr(self.op, 'nics'):
4938       self.op.nics = []
4939     if not hasattr(self.op, 'disks'):
4940       self.op.disks = []
4941     if not hasattr(self.op, 'beparams'):
4942       self.op.beparams = {}
4943     if not hasattr(self.op, 'hvparams'):
4944       self.op.hvparams = {}
4945     self.op.force = getattr(self.op, "force", False)
4946     if not (self.op.nics or self.op.disks or
4947             self.op.hvparams or self.op.beparams):
4948       raise errors.OpPrereqError("No changes submitted")
4949
4950     utils.CheckBEParams(self.op.beparams)
4951
4952     # Disk validation
4953     disk_addremove = 0
4954     for disk_op, disk_dict in self.op.disks:
4955       if disk_op == constants.DDM_REMOVE:
4956         disk_addremove += 1
4957         continue
4958       elif disk_op == constants.DDM_ADD:
4959         disk_addremove += 1
4960       else:
4961         if not isinstance(disk_op, int):
4962           raise errors.OpPrereqError("Invalid disk index")
4963       if disk_op == constants.DDM_ADD:
4964         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4965         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4966           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4967         size = disk_dict.get('size', None)
4968         if size is None:
4969           raise errors.OpPrereqError("Required disk parameter size missing")
4970         try:
4971           size = int(size)
4972         except ValueError, err:
4973           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4974                                      str(err))
4975         disk_dict['size'] = size
4976       else:
4977         # modification of disk
4978         if 'size' in disk_dict:
4979           raise errors.OpPrereqError("Disk size change not possible, use"
4980                                      " grow-disk")
4981
4982     if disk_addremove > 1:
4983       raise errors.OpPrereqError("Only one disk add or remove operation"
4984                                  " supported at a time")
4985
4986     # NIC validation
4987     nic_addremove = 0
4988     for nic_op, nic_dict in self.op.nics:
4989       if nic_op == constants.DDM_REMOVE:
4990         nic_addremove += 1
4991         continue
4992       elif nic_op == constants.DDM_ADD:
4993         nic_addremove += 1
4994       else:
4995         if not isinstance(nic_op, int):
4996           raise errors.OpPrereqError("Invalid nic index")
4997
4998       # nic_dict should be a dict
4999       nic_ip = nic_dict.get('ip', None)
5000       if nic_ip is not None:
5001         if nic_ip.lower() == "none":
5002           nic_dict['ip'] = None
5003         else:
5004           if not utils.IsValidIP(nic_ip):
5005             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5006       # we can only check None bridges and assign the default one
5007       nic_bridge = nic_dict.get('bridge', None)
5008       if nic_bridge is None:
5009         nic_dict['bridge'] = self.cfg.GetDefBridge()
5010       # but we can validate MACs
5011       nic_mac = nic_dict.get('mac', None)
5012       if nic_mac is not None:
5013         if self.cfg.IsMacInUse(nic_mac):
5014           raise errors.OpPrereqError("MAC address %s already in use"
5015                                      " in cluster" % nic_mac)
5016         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5017           if not utils.IsValidMac(nic_mac):
5018             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5019     if nic_addremove > 1:
5020       raise errors.OpPrereqError("Only one NIC add or remove operation"
5021                                  " supported at a time")
5022
5023   def ExpandNames(self):
5024     self._ExpandAndLockInstance()
5025     self.needed_locks[locking.LEVEL_NODE] = []
5026     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5027
5028   def DeclareLocks(self, level):
5029     if level == locking.LEVEL_NODE:
5030       self._LockInstancesNodes()
5031
5032   def BuildHooksEnv(self):
5033     """Build hooks env.
5034
5035     This runs on the master, primary and secondaries.
5036
5037     """
5038     args = dict()
5039     if constants.BE_MEMORY in self.be_new:
5040       args['memory'] = self.be_new[constants.BE_MEMORY]
5041     if constants.BE_VCPUS in self.be_new:
5042       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5043     # FIXME: readd disk/nic changes
5044     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5045     nl = [self.cfg.GetMasterNode(),
5046           self.instance.primary_node] + list(self.instance.secondary_nodes)
5047     return env, nl, nl
5048
5049   def CheckPrereq(self):
5050     """Check prerequisites.
5051
5052     This only checks the instance list against the existing names.
5053
5054     """
5055     force = self.force = self.op.force
5056
5057     # checking the new params on the primary/secondary nodes
5058
5059     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5060     assert self.instance is not None, \
5061       "Cannot retrieve locked instance %s" % self.op.instance_name
5062     pnode = self.instance.primary_node
5063     nodelist = [pnode]
5064     nodelist.extend(instance.secondary_nodes)
5065
5066     # hvparams processing
5067     if self.op.hvparams:
5068       i_hvdict = copy.deepcopy(instance.hvparams)
5069       for key, val in self.op.hvparams.iteritems():
5070         if val == constants.VALUE_DEFAULT:
5071           try:
5072             del i_hvdict[key]
5073           except KeyError:
5074             pass
5075         elif val == constants.VALUE_NONE:
5076           i_hvdict[key] = None
5077         else:
5078           i_hvdict[key] = val
5079       cluster = self.cfg.GetClusterInfo()
5080       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5081                                 i_hvdict)
5082       # local check
5083       hypervisor.GetHypervisor(
5084         instance.hypervisor).CheckParameterSyntax(hv_new)
5085       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5086       self.hv_new = hv_new # the new actual values
5087       self.hv_inst = i_hvdict # the new dict (without defaults)
5088     else:
5089       self.hv_new = self.hv_inst = {}
5090
5091     # beparams processing
5092     if self.op.beparams:
5093       i_bedict = copy.deepcopy(instance.beparams)
5094       for key, val in self.op.beparams.iteritems():
5095         if val == constants.VALUE_DEFAULT:
5096           try:
5097             del i_bedict[key]
5098           except KeyError:
5099             pass
5100         else:
5101           i_bedict[key] = val
5102       cluster = self.cfg.GetClusterInfo()
5103       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5104                                 i_bedict)
5105       self.be_new = be_new # the new actual values
5106       self.be_inst = i_bedict # the new dict (without defaults)
5107     else:
5108       self.be_new = self.be_inst = {}
5109
5110     self.warn = []
5111
5112     if constants.BE_MEMORY in self.op.beparams and not self.force:
5113       mem_check_list = [pnode]
5114       if be_new[constants.BE_AUTO_BALANCE]:
5115         # either we changed auto_balance to yes or it was from before
5116         mem_check_list.extend(instance.secondary_nodes)
5117       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5118                                                   instance.hypervisor)
5119       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5120                                          instance.hypervisor)
5121       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5122         # Assume the primary node is unreachable and go ahead
5123         self.warn.append("Can't get info from primary node %s" % pnode)
5124       else:
5125         if not instance_info.failed and instance_info.data:
5126           current_mem = instance_info.data['memory']
5127         else:
5128           # Assume instance not running
5129           # (there is a slight race condition here, but it's not very probable,
5130           # and we have no other way to check)
5131           current_mem = 0
5132         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5133                     nodeinfo[pnode].data['memory_free'])
5134         if miss_mem > 0:
5135           raise errors.OpPrereqError("This change will prevent the instance"
5136                                      " from starting, due to %d MB of memory"
5137                                      " missing on its primary node" % miss_mem)
5138
5139       if be_new[constants.BE_AUTO_BALANCE]:
5140         for node, nres in instance.secondary_nodes.iteritems():
5141           if nres.failed or not isinstance(nres.data, dict):
5142             self.warn.append("Can't get info from secondary node %s" % node)
5143           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5144             self.warn.append("Not enough memory to failover instance to"
5145                              " secondary node %s" % node)
5146
5147     # NIC processing
5148     for nic_op, nic_dict in self.op.nics:
5149       if nic_op == constants.DDM_REMOVE:
5150         if not instance.nics:
5151           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5152         continue
5153       if nic_op != constants.DDM_ADD:
5154         # an existing nic
5155         if nic_op < 0 or nic_op >= len(instance.nics):
5156           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5157                                      " are 0 to %d" %
5158                                      (nic_op, len(instance.nics)))
5159       nic_bridge = nic_dict.get('bridge', None)
5160       if nic_bridge is not None:
5161         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5162           msg = ("Bridge '%s' doesn't exist on one of"
5163                  " the instance nodes" % nic_bridge)
5164           if self.force:
5165             self.warn.append(msg)
5166           else:
5167             raise errors.OpPrereqError(msg)
5168
5169     # DISK processing
5170     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5171       raise errors.OpPrereqError("Disk operations not supported for"
5172                                  " diskless instances")
5173     for disk_op, disk_dict in self.op.disks:
5174       if disk_op == constants.DDM_REMOVE:
5175         if len(instance.disks) == 1:
5176           raise errors.OpPrereqError("Cannot remove the last disk of"
5177                                      " an instance")
5178         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5179         ins_l = ins_l[pnode]
5180         if not type(ins_l) is list:
5181           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5182         if instance.name in ins_l:
5183           raise errors.OpPrereqError("Instance is running, can't remove"
5184                                      " disks.")
5185
5186       if (disk_op == constants.DDM_ADD and
5187           len(instance.nics) >= constants.MAX_DISKS):
5188         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5189                                    " add more" % constants.MAX_DISKS)
5190       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5191         # an existing disk
5192         if disk_op < 0 or disk_op >= len(instance.disks):
5193           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5194                                      " are 0 to %d" %
5195                                      (disk_op, len(instance.disks)))
5196
5197     return
5198
5199   def Exec(self, feedback_fn):
5200     """Modifies an instance.
5201
5202     All parameters take effect only at the next restart of the instance.
5203
5204     """
5205     # Process here the warnings from CheckPrereq, as we don't have a
5206     # feedback_fn there.
5207     for warn in self.warn:
5208       feedback_fn("WARNING: %s" % warn)
5209
5210     result = []
5211     instance = self.instance
5212     # disk changes
5213     for disk_op, disk_dict in self.op.disks:
5214       if disk_op == constants.DDM_REMOVE:
5215         # remove the last disk
5216         device = instance.disks.pop()
5217         device_idx = len(instance.disks)
5218         for node, disk in device.ComputeNodeTree(instance.primary_node):
5219           self.cfg.SetDiskID(disk, node)
5220           result = self.rpc.call_blockdev_remove(node, disk)
5221           if result.failed or not result.data:
5222             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5223                                  " continuing anyway", device_idx, node)
5224         result.append(("disk/%d" % device_idx, "remove"))
5225       elif disk_op == constants.DDM_ADD:
5226         # add a new disk
5227         if instance.disk_template == constants.DT_FILE:
5228           file_driver, file_path = instance.disks[0].logical_id
5229           file_path = os.path.dirname(file_path)
5230         else:
5231           file_driver = file_path = None
5232         disk_idx_base = len(instance.disks)
5233         new_disk = _GenerateDiskTemplate(self,
5234                                          instance.disk_template,
5235                                          instance, instance.primary_node,
5236                                          instance.secondary_nodes,
5237                                          [disk_dict],
5238                                          file_path,
5239                                          file_driver,
5240                                          disk_idx_base)[0]
5241         new_disk.mode = disk_dict['mode']
5242         instance.disks.append(new_disk)
5243         info = _GetInstanceInfoText(instance)
5244
5245         logging.info("Creating volume %s for instance %s",
5246                      new_disk.iv_name, instance.name)
5247         # Note: this needs to be kept in sync with _CreateDisks
5248         #HARDCODE
5249         for secondary_node in instance.secondary_nodes:
5250           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5251                                             new_disk, False, info):
5252             self.LogWarning("Failed to create volume %s (%s) on"
5253                             " secondary node %s!",
5254                             new_disk.iv_name, new_disk, secondary_node)
5255         #HARDCODE
5256         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5257                                         instance, new_disk, info):
5258           self.LogWarning("Failed to create volume %s on primary!",
5259                           new_disk.iv_name)
5260         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5261                        (new_disk.size, new_disk.mode)))
5262       else:
5263         # change a given disk
5264         instance.disks[disk_op].mode = disk_dict['mode']
5265         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5266     # NIC changes
5267     for nic_op, nic_dict in self.op.nics:
5268       if nic_op == constants.DDM_REMOVE:
5269         # remove the last nic
5270         del instance.nics[-1]
5271         result.append(("nic.%d" % len(instance.nics), "remove"))
5272       elif nic_op == constants.DDM_ADD:
5273         # add a new nic
5274         if 'mac' not in nic_dict:
5275           mac = constants.VALUE_GENERATE
5276         else:
5277           mac = nic_dict['mac']
5278         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5279           mac = self.cfg.GenerateMAC()
5280         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5281                               bridge=nic_dict.get('bridge', None))
5282         instance.nics.append(new_nic)
5283         result.append(("nic.%d" % (len(instance.nics) - 1),
5284                        "add:mac=%s,ip=%s,bridge=%s" %
5285                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5286       else:
5287         # change a given nic
5288         for key in 'mac', 'ip', 'bridge':
5289           if key in nic_dict:
5290             setattr(instance.nics[nic_op], key, nic_dict[key])
5291             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5292
5293     # hvparams changes
5294     if self.op.hvparams:
5295       instance.hvparams = self.hv_new
5296       for key, val in self.op.hvparams.iteritems():
5297         result.append(("hv/%s" % key, val))
5298
5299     # beparams changes
5300     if self.op.beparams:
5301       instance.beparams = self.be_inst
5302       for key, val in self.op.beparams.iteritems():
5303         result.append(("be/%s" % key, val))
5304
5305     self.cfg.Update(instance)
5306
5307     return result
5308
5309
5310 class LUQueryExports(NoHooksLU):
5311   """Query the exports list
5312
5313   """
5314   _OP_REQP = ['nodes']
5315   REQ_BGL = False
5316
5317   def ExpandNames(self):
5318     self.needed_locks = {}
5319     self.share_locks[locking.LEVEL_NODE] = 1
5320     if not self.op.nodes:
5321       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5322     else:
5323       self.needed_locks[locking.LEVEL_NODE] = \
5324         _GetWantedNodes(self, self.op.nodes)
5325
5326   def CheckPrereq(self):
5327     """Check prerequisites.
5328
5329     """
5330     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5331
5332   def Exec(self, feedback_fn):
5333     """Compute the list of all the exported system images.
5334
5335     @rtype: dict
5336     @return: a dictionary with the structure node->(export-list)
5337         where export-list is a list of the instances exported on
5338         that node.
5339
5340     """
5341     result = self.rpc.call_export_list(self.nodes)
5342     result.Raise()
5343     return result.data
5344
5345
5346 class LUExportInstance(LogicalUnit):
5347   """Export an instance to an image in the cluster.
5348
5349   """
5350   HPATH = "instance-export"
5351   HTYPE = constants.HTYPE_INSTANCE
5352   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5353   REQ_BGL = False
5354
5355   def ExpandNames(self):
5356     self._ExpandAndLockInstance()
5357     # FIXME: lock only instance primary and destination node
5358     #
5359     # Sad but true, for now we have do lock all nodes, as we don't know where
5360     # the previous export might be, and and in this LU we search for it and
5361     # remove it from its current node. In the future we could fix this by:
5362     #  - making a tasklet to search (share-lock all), then create the new one,
5363     #    then one to remove, after
5364     #  - removing the removal operation altoghether
5365     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5366
5367   def DeclareLocks(self, level):
5368     """Last minute lock declaration."""
5369     # All nodes are locked anyway, so nothing to do here.
5370
5371   def BuildHooksEnv(self):
5372     """Build hooks env.
5373
5374     This will run on the master, primary node and target node.
5375
5376     """
5377     env = {
5378       "EXPORT_NODE": self.op.target_node,
5379       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5380       }
5381     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5382     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5383           self.op.target_node]
5384     return env, nl, nl
5385
5386   def CheckPrereq(self):
5387     """Check prerequisites.
5388
5389     This checks that the instance and node names are valid.
5390
5391     """
5392     instance_name = self.op.instance_name
5393     self.instance = self.cfg.GetInstanceInfo(instance_name)
5394     assert self.instance is not None, \
5395           "Cannot retrieve locked instance %s" % self.op.instance_name
5396
5397     self.dst_node = self.cfg.GetNodeInfo(
5398       self.cfg.ExpandNodeName(self.op.target_node))
5399
5400     if self.dst_node is None:
5401       # This is wrong node name, not a non-locked node
5402       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5403
5404     # instance disk type verification
5405     for disk in self.instance.disks:
5406       if disk.dev_type == constants.LD_FILE:
5407         raise errors.OpPrereqError("Export not supported for instances with"
5408                                    " file-based disks")
5409
5410   def Exec(self, feedback_fn):
5411     """Export an instance to an image in the cluster.
5412
5413     """
5414     instance = self.instance
5415     dst_node = self.dst_node
5416     src_node = instance.primary_node
5417     if self.op.shutdown:
5418       # shutdown the instance, but not the disks
5419       result = self.rpc.call_instance_shutdown(src_node, instance)
5420       result.Raise()
5421       if not result.data:
5422         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5423                                  (instance.name, src_node))
5424
5425     vgname = self.cfg.GetVGName()
5426
5427     snap_disks = []
5428
5429     try:
5430       for disk in instance.disks:
5431         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5432         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5433         if new_dev_name.failed or not new_dev_name.data:
5434           self.LogWarning("Could not snapshot block device %s on node %s",
5435                           disk.logical_id[1], src_node)
5436           snap_disks.append(False)
5437         else:
5438           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5439                                  logical_id=(vgname, new_dev_name.data),
5440                                  physical_id=(vgname, new_dev_name.data),
5441                                  iv_name=disk.iv_name)
5442           snap_disks.append(new_dev)
5443
5444     finally:
5445       if self.op.shutdown and instance.status == "up":
5446         result = self.rpc.call_instance_start(src_node, instance, None)
5447         if result.failed or not result.data:
5448           _ShutdownInstanceDisks(self, instance)
5449           raise errors.OpExecError("Could not start instance")
5450
5451     # TODO: check for size
5452
5453     cluster_name = self.cfg.GetClusterName()
5454     for idx, dev in enumerate(snap_disks):
5455       if dev:
5456         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5457                                                instance, cluster_name, idx)
5458         if result.failed or not result.data:
5459           self.LogWarning("Could not export block device %s from node %s to"
5460                           " node %s", dev.logical_id[1], src_node,
5461                           dst_node.name)
5462         result = self.rpc.call_blockdev_remove(src_node, dev)
5463         if result.failed or not result.data:
5464           self.LogWarning("Could not remove snapshot block device %s from node"
5465                           " %s", dev.logical_id[1], src_node)
5466
5467     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5468     if result.failed or not result.data:
5469       self.LogWarning("Could not finalize export for instance %s on node %s",
5470                       instance.name, dst_node.name)
5471
5472     nodelist = self.cfg.GetNodeList()
5473     nodelist.remove(dst_node.name)
5474
5475     # on one-node clusters nodelist will be empty after the removal
5476     # if we proceed the backup would be removed because OpQueryExports
5477     # substitutes an empty list with the full cluster node list.
5478     if nodelist:
5479       exportlist = self.rpc.call_export_list(nodelist)
5480       for node in exportlist:
5481         if exportlist[node].failed:
5482           continue
5483         if instance.name in exportlist[node].data:
5484           if not self.rpc.call_export_remove(node, instance.name):
5485             self.LogWarning("Could not remove older export for instance %s"
5486                             " on node %s", instance.name, node)
5487
5488
5489 class LURemoveExport(NoHooksLU):
5490   """Remove exports related to the named instance.
5491
5492   """
5493   _OP_REQP = ["instance_name"]
5494   REQ_BGL = False
5495
5496   def ExpandNames(self):
5497     self.needed_locks = {}
5498     # We need all nodes to be locked in order for RemoveExport to work, but we
5499     # don't need to lock the instance itself, as nothing will happen to it (and
5500     # we can remove exports also for a removed instance)
5501     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5502
5503   def CheckPrereq(self):
5504     """Check prerequisites.
5505     """
5506     pass
5507
5508   def Exec(self, feedback_fn):
5509     """Remove any export.
5510
5511     """
5512     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5513     # If the instance was not found we'll try with the name that was passed in.
5514     # This will only work if it was an FQDN, though.
5515     fqdn_warn = False
5516     if not instance_name:
5517       fqdn_warn = True
5518       instance_name = self.op.instance_name
5519
5520     exportlist = self.rpc.call_export_list(self.acquired_locks[
5521       locking.LEVEL_NODE])
5522     found = False
5523     for node in exportlist:
5524       if exportlist[node].failed:
5525         self.LogWarning("Failed to query node %s, continuing" % node)
5526         continue
5527       if instance_name in exportlist[node].data:
5528         found = True
5529         result = self.rpc.call_export_remove(node, instance_name)
5530         if result.failed or not result.data:
5531           logging.error("Could not remove export for instance %s"
5532                         " on node %s", instance_name, node)
5533
5534     if fqdn_warn and not found:
5535       feedback_fn("Export not found. If trying to remove an export belonging"
5536                   " to a deleted instance please use its Fully Qualified"
5537                   " Domain Name.")
5538
5539
5540 class TagsLU(NoHooksLU):
5541   """Generic tags LU.
5542
5543   This is an abstract class which is the parent of all the other tags LUs.
5544
5545   """
5546
5547   def ExpandNames(self):
5548     self.needed_locks = {}
5549     if self.op.kind == constants.TAG_NODE:
5550       name = self.cfg.ExpandNodeName(self.op.name)
5551       if name is None:
5552         raise errors.OpPrereqError("Invalid node name (%s)" %
5553                                    (self.op.name,))
5554       self.op.name = name
5555       self.needed_locks[locking.LEVEL_NODE] = name
5556     elif self.op.kind == constants.TAG_INSTANCE:
5557       name = self.cfg.ExpandInstanceName(self.op.name)
5558       if name is None:
5559         raise errors.OpPrereqError("Invalid instance name (%s)" %
5560                                    (self.op.name,))
5561       self.op.name = name
5562       self.needed_locks[locking.LEVEL_INSTANCE] = name
5563
5564   def CheckPrereq(self):
5565     """Check prerequisites.
5566
5567     """
5568     if self.op.kind == constants.TAG_CLUSTER:
5569       self.target = self.cfg.GetClusterInfo()
5570     elif self.op.kind == constants.TAG_NODE:
5571       self.target = self.cfg.GetNodeInfo(self.op.name)
5572     elif self.op.kind == constants.TAG_INSTANCE:
5573       self.target = self.cfg.GetInstanceInfo(self.op.name)
5574     else:
5575       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5576                                  str(self.op.kind))
5577
5578
5579 class LUGetTags(TagsLU):
5580   """Returns the tags of a given object.
5581
5582   """
5583   _OP_REQP = ["kind", "name"]
5584   REQ_BGL = False
5585
5586   def Exec(self, feedback_fn):
5587     """Returns the tag list.
5588
5589     """
5590     return list(self.target.GetTags())
5591
5592
5593 class LUSearchTags(NoHooksLU):
5594   """Searches the tags for a given pattern.
5595
5596   """
5597   _OP_REQP = ["pattern"]
5598   REQ_BGL = False
5599
5600   def ExpandNames(self):
5601     self.needed_locks = {}
5602
5603   def CheckPrereq(self):
5604     """Check prerequisites.
5605
5606     This checks the pattern passed for validity by compiling it.
5607
5608     """
5609     try:
5610       self.re = re.compile(self.op.pattern)
5611     except re.error, err:
5612       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5613                                  (self.op.pattern, err))
5614
5615   def Exec(self, feedback_fn):
5616     """Returns the tag list.
5617
5618     """
5619     cfg = self.cfg
5620     tgts = [("/cluster", cfg.GetClusterInfo())]
5621     ilist = cfg.GetAllInstancesInfo().values()
5622     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5623     nlist = cfg.GetAllNodesInfo().values()
5624     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5625     results = []
5626     for path, target in tgts:
5627       for tag in target.GetTags():
5628         if self.re.search(tag):
5629           results.append((path, tag))
5630     return results
5631
5632
5633 class LUAddTags(TagsLU):
5634   """Sets a tag on a given object.
5635
5636   """
5637   _OP_REQP = ["kind", "name", "tags"]
5638   REQ_BGL = False
5639
5640   def CheckPrereq(self):
5641     """Check prerequisites.
5642
5643     This checks the type and length of the tag name and value.
5644
5645     """
5646     TagsLU.CheckPrereq(self)
5647     for tag in self.op.tags:
5648       objects.TaggableObject.ValidateTag(tag)
5649
5650   def Exec(self, feedback_fn):
5651     """Sets the tag.
5652
5653     """
5654     try:
5655       for tag in self.op.tags:
5656         self.target.AddTag(tag)
5657     except errors.TagError, err:
5658       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5659     try:
5660       self.cfg.Update(self.target)
5661     except errors.ConfigurationError:
5662       raise errors.OpRetryError("There has been a modification to the"
5663                                 " config file and the operation has been"
5664                                 " aborted. Please retry.")
5665
5666
5667 class LUDelTags(TagsLU):
5668   """Delete a list of tags from a given object.
5669
5670   """
5671   _OP_REQP = ["kind", "name", "tags"]
5672   REQ_BGL = False
5673
5674   def CheckPrereq(self):
5675     """Check prerequisites.
5676
5677     This checks that we have the given tag.
5678
5679     """
5680     TagsLU.CheckPrereq(self)
5681     for tag in self.op.tags:
5682       objects.TaggableObject.ValidateTag(tag)
5683     del_tags = frozenset(self.op.tags)
5684     cur_tags = self.target.GetTags()
5685     if not del_tags <= cur_tags:
5686       diff_tags = del_tags - cur_tags
5687       diff_names = ["'%s'" % tag for tag in diff_tags]
5688       diff_names.sort()
5689       raise errors.OpPrereqError("Tag(s) %s not found" %
5690                                  (",".join(diff_names)))
5691
5692   def Exec(self, feedback_fn):
5693     """Remove the tag from the object.
5694
5695     """
5696     for tag in self.op.tags:
5697       self.target.RemoveTag(tag)
5698     try:
5699       self.cfg.Update(self.target)
5700     except errors.ConfigurationError:
5701       raise errors.OpRetryError("There has been a modification to the"
5702                                 " config file and the operation has been"
5703                                 " aborted. Please retry.")
5704
5705
5706 class LUTestDelay(NoHooksLU):
5707   """Sleep for a specified amount of time.
5708
5709   This LU sleeps on the master and/or nodes for a specified amount of
5710   time.
5711
5712   """
5713   _OP_REQP = ["duration", "on_master", "on_nodes"]
5714   REQ_BGL = False
5715
5716   def ExpandNames(self):
5717     """Expand names and set required locks.
5718
5719     This expands the node list, if any.
5720
5721     """
5722     self.needed_locks = {}
5723     if self.op.on_nodes:
5724       # _GetWantedNodes can be used here, but is not always appropriate to use
5725       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5726       # more information.
5727       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5728       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5729
5730   def CheckPrereq(self):
5731     """Check prerequisites.
5732
5733     """
5734
5735   def Exec(self, feedback_fn):
5736     """Do the actual sleep.
5737
5738     """
5739     if self.op.on_master:
5740       if not utils.TestDelay(self.op.duration):
5741         raise errors.OpExecError("Error during master delay test")
5742     if self.op.on_nodes:
5743       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5744       if not result:
5745         raise errors.OpExecError("Complete failure from rpc call")
5746       for node, node_result in result.items():
5747         node_result.Raise()
5748         if not node_result.data:
5749           raise errors.OpExecError("Failure during rpc call to node %s,"
5750                                    " result: %s" % (node, node_result.data))
5751
5752
5753 class IAllocator(object):
5754   """IAllocator framework.
5755
5756   An IAllocator instance has three sets of attributes:
5757     - cfg that is needed to query the cluster
5758     - input data (all members of the _KEYS class attribute are required)
5759     - four buffer attributes (in|out_data|text), that represent the
5760       input (to the external script) in text and data structure format,
5761       and the output from it, again in two formats
5762     - the result variables from the script (success, info, nodes) for
5763       easy usage
5764
5765   """
5766   _ALLO_KEYS = [
5767     "mem_size", "disks", "disk_template",
5768     "os", "tags", "nics", "vcpus", "hypervisor",
5769     ]
5770   _RELO_KEYS = [
5771     "relocate_from",
5772     ]
5773
5774   def __init__(self, lu, mode, name, **kwargs):
5775     self.lu = lu
5776     # init buffer variables
5777     self.in_text = self.out_text = self.in_data = self.out_data = None
5778     # init all input fields so that pylint is happy
5779     self.mode = mode
5780     self.name = name
5781     self.mem_size = self.disks = self.disk_template = None
5782     self.os = self.tags = self.nics = self.vcpus = None
5783     self.relocate_from = None
5784     # computed fields
5785     self.required_nodes = None
5786     # init result fields
5787     self.success = self.info = self.nodes = None
5788     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5789       keyset = self._ALLO_KEYS
5790     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5791       keyset = self._RELO_KEYS
5792     else:
5793       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5794                                    " IAllocator" % self.mode)
5795     for key in kwargs:
5796       if key not in keyset:
5797         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5798                                      " IAllocator" % key)
5799       setattr(self, key, kwargs[key])
5800     for key in keyset:
5801       if key not in kwargs:
5802         raise errors.ProgrammerError("Missing input parameter '%s' to"
5803                                      " IAllocator" % key)
5804     self._BuildInputData()
5805
5806   def _ComputeClusterData(self):
5807     """Compute the generic allocator input data.
5808
5809     This is the data that is independent of the actual operation.
5810
5811     """
5812     cfg = self.lu.cfg
5813     cluster_info = cfg.GetClusterInfo()
5814     # cluster data
5815     data = {
5816       "version": 1,
5817       "cluster_name": cfg.GetClusterName(),
5818       "cluster_tags": list(cluster_info.GetTags()),
5819       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5820       # we don't have job IDs
5821       }
5822     iinfo = cfg.GetAllInstancesInfo().values()
5823     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5824
5825     # node data
5826     node_results = {}
5827     node_list = cfg.GetNodeList()
5828
5829     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5830       hypervisor = self.hypervisor
5831     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5832       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5833
5834     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5835                                            hypervisor)
5836     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5837                        cluster_info.enabled_hypervisors)
5838     for nname in node_list:
5839       ninfo = cfg.GetNodeInfo(nname)
5840       node_data[nname].Raise()
5841       if not isinstance(node_data[nname].data, dict):
5842         raise errors.OpExecError("Can't get data for node %s" % nname)
5843       remote_info = node_data[nname].data
5844       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5845                    'vg_size', 'vg_free', 'cpu_total']:
5846         if attr not in remote_info:
5847           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5848                                    (nname, attr))
5849         try:
5850           remote_info[attr] = int(remote_info[attr])
5851         except ValueError, err:
5852           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5853                                    " %s" % (nname, attr, str(err)))
5854       # compute memory used by primary instances
5855       i_p_mem = i_p_up_mem = 0
5856       for iinfo, beinfo in i_list:
5857         if iinfo.primary_node == nname:
5858           i_p_mem += beinfo[constants.BE_MEMORY]
5859           if iinfo.name not in node_iinfo[nname]:
5860             i_used_mem = 0
5861           else:
5862             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5863           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5864           remote_info['memory_free'] -= max(0, i_mem_diff)
5865
5866           if iinfo.status == "up":
5867             i_p_up_mem += beinfo[constants.BE_MEMORY]
5868
5869       # compute memory used by instances
5870       pnr = {
5871         "tags": list(ninfo.GetTags()),
5872         "total_memory": remote_info['memory_total'],
5873         "reserved_memory": remote_info['memory_dom0'],
5874         "free_memory": remote_info['memory_free'],
5875         "i_pri_memory": i_p_mem,
5876         "i_pri_up_memory": i_p_up_mem,
5877         "total_disk": remote_info['vg_size'],
5878         "free_disk": remote_info['vg_free'],
5879         "primary_ip": ninfo.primary_ip,
5880         "secondary_ip": ninfo.secondary_ip,
5881         "total_cpus": remote_info['cpu_total'],
5882         }
5883       node_results[nname] = pnr
5884     data["nodes"] = node_results
5885
5886     # instance data
5887     instance_data = {}
5888     for iinfo, beinfo in i_list:
5889       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5890                   for n in iinfo.nics]
5891       pir = {
5892         "tags": list(iinfo.GetTags()),
5893         "should_run": iinfo.status == "up",
5894         "vcpus": beinfo[constants.BE_VCPUS],
5895         "memory": beinfo[constants.BE_MEMORY],
5896         "os": iinfo.os,
5897         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5898         "nics": nic_data,
5899         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5900         "disk_template": iinfo.disk_template,
5901         "hypervisor": iinfo.hypervisor,
5902         }
5903       instance_data[iinfo.name] = pir
5904
5905     data["instances"] = instance_data
5906
5907     self.in_data = data
5908
5909   def _AddNewInstance(self):
5910     """Add new instance data to allocator structure.
5911
5912     This in combination with _AllocatorGetClusterData will create the
5913     correct structure needed as input for the allocator.
5914
5915     The checks for the completeness of the opcode must have already been
5916     done.
5917
5918     """
5919     data = self.in_data
5920     if len(self.disks) != 2:
5921       raise errors.OpExecError("Only two-disk configurations supported")
5922
5923     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5924
5925     if self.disk_template in constants.DTS_NET_MIRROR:
5926       self.required_nodes = 2
5927     else:
5928       self.required_nodes = 1
5929     request = {
5930       "type": "allocate",
5931       "name": self.name,
5932       "disk_template": self.disk_template,
5933       "tags": self.tags,
5934       "os": self.os,
5935       "vcpus": self.vcpus,
5936       "memory": self.mem_size,
5937       "disks": self.disks,
5938       "disk_space_total": disk_space,
5939       "nics": self.nics,
5940       "required_nodes": self.required_nodes,
5941       }
5942     data["request"] = request
5943
5944   def _AddRelocateInstance(self):
5945     """Add relocate instance data to allocator structure.
5946
5947     This in combination with _IAllocatorGetClusterData will create the
5948     correct structure needed as input for the allocator.
5949
5950     The checks for the completeness of the opcode must have already been
5951     done.
5952
5953     """
5954     instance = self.lu.cfg.GetInstanceInfo(self.name)
5955     if instance is None:
5956       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5957                                    " IAllocator" % self.name)
5958
5959     if instance.disk_template not in constants.DTS_NET_MIRROR:
5960       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5961
5962     if len(instance.secondary_nodes) != 1:
5963       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5964
5965     self.required_nodes = 1
5966     disk_sizes = [{'size': disk.size} for disk in instance.disks]
5967     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5968
5969     request = {
5970       "type": "relocate",
5971       "name": self.name,
5972       "disk_space_total": disk_space,
5973       "required_nodes": self.required_nodes,
5974       "relocate_from": self.relocate_from,
5975       }
5976     self.in_data["request"] = request
5977
5978   def _BuildInputData(self):
5979     """Build input data structures.
5980
5981     """
5982     self._ComputeClusterData()
5983
5984     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5985       self._AddNewInstance()
5986     else:
5987       self._AddRelocateInstance()
5988
5989     self.in_text = serializer.Dump(self.in_data)
5990
5991   def Run(self, name, validate=True, call_fn=None):
5992     """Run an instance allocator and return the results.
5993
5994     """
5995     if call_fn is None:
5996       call_fn = self.lu.rpc.call_iallocator_runner
5997     data = self.in_text
5998
5999     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6000     result.Raise()
6001
6002     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6003       raise errors.OpExecError("Invalid result from master iallocator runner")
6004
6005     rcode, stdout, stderr, fail = result.data
6006
6007     if rcode == constants.IARUN_NOTFOUND:
6008       raise errors.OpExecError("Can't find allocator '%s'" % name)
6009     elif rcode == constants.IARUN_FAILURE:
6010       raise errors.OpExecError("Instance allocator call failed: %s,"
6011                                " output: %s" % (fail, stdout+stderr))
6012     self.out_text = stdout
6013     if validate:
6014       self._ValidateResult()
6015
6016   def _ValidateResult(self):
6017     """Process the allocator results.
6018
6019     This will process and if successful save the result in
6020     self.out_data and the other parameters.
6021
6022     """
6023     try:
6024       rdict = serializer.Load(self.out_text)
6025     except Exception, err:
6026       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6027
6028     if not isinstance(rdict, dict):
6029       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6030
6031     for key in "success", "info", "nodes":
6032       if key not in rdict:
6033         raise errors.OpExecError("Can't parse iallocator results:"
6034                                  " missing key '%s'" % key)
6035       setattr(self, key, rdict[key])
6036
6037     if not isinstance(rdict["nodes"], list):
6038       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6039                                " is not a list")
6040     self.out_data = rdict
6041
6042
6043 class LUTestAllocator(NoHooksLU):
6044   """Run allocator tests.
6045
6046   This LU runs the allocator tests
6047
6048   """
6049   _OP_REQP = ["direction", "mode", "name"]
6050
6051   def CheckPrereq(self):
6052     """Check prerequisites.
6053
6054     This checks the opcode parameters depending on the director and mode test.
6055
6056     """
6057     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6058       for attr in ["name", "mem_size", "disks", "disk_template",
6059                    "os", "tags", "nics", "vcpus"]:
6060         if not hasattr(self.op, attr):
6061           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6062                                      attr)
6063       iname = self.cfg.ExpandInstanceName(self.op.name)
6064       if iname is not None:
6065         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6066                                    iname)
6067       if not isinstance(self.op.nics, list):
6068         raise errors.OpPrereqError("Invalid parameter 'nics'")
6069       for row in self.op.nics:
6070         if (not isinstance(row, dict) or
6071             "mac" not in row or
6072             "ip" not in row or
6073             "bridge" not in row):
6074           raise errors.OpPrereqError("Invalid contents of the"
6075                                      " 'nics' parameter")
6076       if not isinstance(self.op.disks, list):
6077         raise errors.OpPrereqError("Invalid parameter 'disks'")
6078       if len(self.op.disks) != 2:
6079         raise errors.OpPrereqError("Only two-disk configurations supported")
6080       for row in self.op.disks:
6081         if (not isinstance(row, dict) or
6082             "size" not in row or
6083             not isinstance(row["size"], int) or
6084             "mode" not in row or
6085             row["mode"] not in ['r', 'w']):
6086           raise errors.OpPrereqError("Invalid contents of the"
6087                                      " 'disks' parameter")
6088       if self.op.hypervisor is None:
6089         self.op.hypervisor = self.cfg.GetHypervisorType()
6090     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6091       if not hasattr(self.op, "name"):
6092         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6093       fname = self.cfg.ExpandInstanceName(self.op.name)
6094       if fname is None:
6095         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6096                                    self.op.name)
6097       self.op.name = fname
6098       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6099     else:
6100       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6101                                  self.op.mode)
6102
6103     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6104       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6105         raise errors.OpPrereqError("Missing allocator name")
6106     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6107       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6108                                  self.op.direction)
6109
6110   def Exec(self, feedback_fn):
6111     """Run the allocator test.
6112
6113     """
6114     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6115       ial = IAllocator(self,
6116                        mode=self.op.mode,
6117                        name=self.op.name,
6118                        mem_size=self.op.mem_size,
6119                        disks=self.op.disks,
6120                        disk_template=self.op.disk_template,
6121                        os=self.op.os,
6122                        tags=self.op.tags,
6123                        nics=self.op.nics,
6124                        vcpus=self.op.vcpus,
6125                        hypervisor=self.op.hypervisor,
6126                        )
6127     else:
6128       ial = IAllocator(self,
6129                        mode=self.op.mode,
6130                        name=self.op.name,
6131                        relocate_from=list(self.relocate_from),
6132                        )
6133
6134     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6135       result = ial.in_text
6136     else:
6137       ial.Run(self.op.allocator, validate=False)
6138       result = ial.out_text
6139     return result