watcher: handle offline nodes better
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419                           memory, vcpus, nics):
420   """Builds instance related env variables for hooks
421
422   This builds the hook environment from individual variables.
423
424   @type name: string
425   @param name: the name of the instance
426   @type primary_node: string
427   @param primary_node: the name of the instance's primary node
428   @type secondary_nodes: list
429   @param secondary_nodes: list of secondary nodes as strings
430   @type os_type: string
431   @param os_type: the name of the instance's OS
432   @type status: string
433   @param status: the desired status of the instances
434   @type memory: string
435   @param memory: the memory size of the instance
436   @type vcpus: string
437   @param vcpus: the count of VCPUs the instance has
438   @type nics: list
439   @param nics: list of tuples (ip, bridge, mac) representing
440       the NICs the instance  has
441   @rtype: dict
442   @return: the hook environment for this instance
443
444   """
445   env = {
446     "OP_TARGET": name,
447     "INSTANCE_NAME": name,
448     "INSTANCE_PRIMARY": primary_node,
449     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450     "INSTANCE_OS_TYPE": os_type,
451     "INSTANCE_STATUS": status,
452     "INSTANCE_MEMORY": memory,
453     "INSTANCE_VCPUS": vcpus,
454   }
455
456   if nics:
457     nic_count = len(nics)
458     for idx, (ip, bridge, mac) in enumerate(nics):
459       if ip is None:
460         ip = ""
461       env["INSTANCE_NIC%d_IP" % idx] = ip
462       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464   else:
465     nic_count = 0
466
467   env["INSTANCE_NIC_COUNT"] = nic_count
468
469   return env
470
471
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473   """Builds instance related env variables for hooks from an object.
474
475   @type lu: L{LogicalUnit}
476   @param lu: the logical unit on whose behalf we execute
477   @type instance: L{objects.Instance}
478   @param instance: the instance for which we should build the
479       environment
480   @type override: dict
481   @param override: dictionary with key/values that will override
482       our values
483   @rtype: dict
484   @return: the hook environment dictionary
485
486   """
487   bep = lu.cfg.GetClusterInfo().FillBE(instance)
488   args = {
489     'name': instance.name,
490     'primary_node': instance.primary_node,
491     'secondary_nodes': instance.secondary_nodes,
492     'os_type': instance.os,
493     'status': instance.os,
494     'memory': bep[constants.BE_MEMORY],
495     'vcpus': bep[constants.BE_VCPUS],
496     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497   }
498   if override:
499     args.update(override)
500   return _BuildInstanceHookEnv(**args)
501
502
503 def _CheckInstanceBridgesExist(lu, instance):
504   """Check that the brigdes needed by an instance exist.
505
506   """
507   # check bridges existance
508   brlist = [nic.bridge for nic in instance.nics]
509   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510   result.Raise()
511   if not result.data:
512     raise errors.OpPrereqError("One or more target bridges %s does not"
513                                " exist on destination node '%s'" %
514                                (brlist, instance.primary_node))
515
516
517 class LUDestroyCluster(NoHooksLU):
518   """Logical unit for destroying the cluster.
519
520   """
521   _OP_REQP = []
522
523   def CheckPrereq(self):
524     """Check prerequisites.
525
526     This checks whether the cluster is empty.
527
528     Any errors are signalled by raising errors.OpPrereqError.
529
530     """
531     master = self.cfg.GetMasterNode()
532
533     nodelist = self.cfg.GetNodeList()
534     if len(nodelist) != 1 or nodelist[0] != master:
535       raise errors.OpPrereqError("There are still %d node(s) in"
536                                  " this cluster." % (len(nodelist) - 1))
537     instancelist = self.cfg.GetInstanceList()
538     if instancelist:
539       raise errors.OpPrereqError("There are still %d instance(s) in"
540                                  " this cluster." % len(instancelist))
541
542   def Exec(self, feedback_fn):
543     """Destroys the cluster.
544
545     """
546     master = self.cfg.GetMasterNode()
547     result = self.rpc.call_node_stop_master(master, False)
548     result.Raise()
549     if not result.data:
550       raise errors.OpExecError("Could not disable the master role")
551     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552     utils.CreateBackup(priv_key)
553     utils.CreateBackup(pub_key)
554     return master
555
556
557 class LUVerifyCluster(LogicalUnit):
558   """Verifies the cluster status.
559
560   """
561   HPATH = "cluster-verify"
562   HTYPE = constants.HTYPE_CLUSTER
563   _OP_REQP = ["skip_checks"]
564   REQ_BGL = False
565
566   def ExpandNames(self):
567     self.needed_locks = {
568       locking.LEVEL_NODE: locking.ALL_SET,
569       locking.LEVEL_INSTANCE: locking.ALL_SET,
570     }
571     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572
573   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574                   node_result, feedback_fn, master_files):
575     """Run multiple tests against a node.
576
577     Test list:
578
579       - compares ganeti version
580       - checks vg existance and size > 20G
581       - checks config file checksum
582       - checks ssh to other nodes
583
584     @type nodeinfo: L{objects.Node}
585     @param nodeinfo: the node to check
586     @param file_list: required list of files
587     @param local_cksum: dictionary of local files and their checksums
588     @param node_result: the results from the node
589     @param feedback_fn: function used to accumulate results
590     @param master_files: list of files that only masters should have
591
592     """
593     node = nodeinfo.name
594
595     # main result, node_result should be a non-empty dict
596     if not node_result or not isinstance(node_result, dict):
597       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598       return True
599
600     # compares ganeti version
601     local_version = constants.PROTOCOL_VERSION
602     remote_version = node_result.get('version', None)
603     if not remote_version:
604       feedback_fn("  - ERROR: connection to %s failed" % (node))
605       return True
606
607     if local_version != remote_version:
608       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609                       (local_version, node, remote_version))
610       return True
611
612     # checks vg existance and size > 20G
613
614     bad = False
615     vglist = node_result.get(constants.NV_VGLIST, None)
616     if not vglist:
617       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618                       (node,))
619       bad = True
620     else:
621       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622                                             constants.MIN_VG_SIZE)
623       if vgstatus:
624         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625         bad = True
626
627     # checks config file checksum
628
629     remote_cksum = node_result.get(constants.NV_FILELIST, None)
630     if not isinstance(remote_cksum, dict):
631       bad = True
632       feedback_fn("  - ERROR: node hasn't returned file checksum data")
633     else:
634       for file_name in file_list:
635         node_is_mc = nodeinfo.master_candidate
636         must_have_file = file_name not in master_files
637         if file_name not in remote_cksum:
638           if node_is_mc or must_have_file:
639             bad = True
640             feedback_fn("  - ERROR: file '%s' missing" % file_name)
641         elif remote_cksum[file_name] != local_cksum[file_name]:
642           if node_is_mc or must_have_file:
643             bad = True
644             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645           else:
646             # not candidate and this is not a must-have file
647             bad = True
648             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649                         " '%s'" % file_name)
650         else:
651           # all good, except non-master/non-must have combination
652           if not node_is_mc and not must_have_file:
653             feedback_fn("  - ERROR: file '%s' should not exist on non master"
654                         " candidates" % file_name)
655
656     # checks ssh to any
657
658     if constants.NV_NODELIST not in node_result:
659       bad = True
660       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661     else:
662       if node_result[constants.NV_NODELIST]:
663         bad = True
664         for node in node_result[constants.NV_NODELIST]:
665           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666                           (node, node_result[constants.NV_NODELIST][node]))
667
668     if constants.NV_NODENETTEST not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671     else:
672       if node_result[constants.NV_NODENETTEST]:
673         bad = True
674         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675         for node in nlist:
676           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677                           (node, node_result[constants.NV_NODENETTEST][node]))
678
679     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680     if isinstance(hyp_result, dict):
681       for hv_name, hv_result in hyp_result.iteritems():
682         if hv_result is not None:
683           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684                       (hv_name, hv_result))
685     return bad
686
687   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688                       node_instance, feedback_fn):
689     """Verify an instance.
690
691     This function checks to see if the required block devices are
692     available on the instance's node.
693
694     """
695     bad = False
696
697     node_current = instanceconfig.primary_node
698
699     node_vol_should = {}
700     instanceconfig.MapLVsByNode(node_vol_should)
701
702     for node in node_vol_should:
703       for volume in node_vol_should[node]:
704         if node not in node_vol_is or volume not in node_vol_is[node]:
705           feedback_fn("  - ERROR: volume %s missing on node %s" %
706                           (volume, node))
707           bad = True
708
709     if not instanceconfig.status == 'down':
710       if (node_current not in node_instance or
711           not instance in node_instance[node_current]):
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758     """Verify N+1 Memory Resilience.
759
760     Check that if one single node dies we can still start all the instances it
761     was primary for.
762
763     """
764     bad = False
765
766     for node, nodeinfo in node_info.iteritems():
767       # This code checks that every node which is now listed as secondary has
768       # enough memory to host all instances it is supposed to should a single
769       # other node in the cluster fail.
770       # FIXME: not ready for failover to an arbitrary node
771       # FIXME: does not support file-backed instances
772       # WARNING: we currently take into account down instances as well as up
773       # ones, considering that even if they're down someone might want to start
774       # them even in the event of a node failure.
775       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776         needed_mem = 0
777         for instance in instances:
778           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779           if bep[constants.BE_AUTO_BALANCE]:
780             needed_mem += bep[constants.BE_MEMORY]
781         if nodeinfo['mfree'] < needed_mem:
782           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783                       " failovers should node %s fail" % (node, prinode))
784           bad = True
785     return bad
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     Transform the list of checks we're going to skip into a set and check that
791     all its members are valid.
792
793     """
794     self.skip_set = frozenset(self.op.skip_checks)
795     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796       raise errors.OpPrereqError("Invalid checks to be skipped specified")
797
798   def BuildHooksEnv(self):
799     """Build hooks env.
800
801     Cluster-Verify hooks just rone in the post phase and their failure makes
802     the output be logged in the verify output and the verification to fail.
803
804     """
805     all_nodes = self.cfg.GetNodeList()
806     # TODO: populate the environment with useful information for verify hooks
807     env = {}
808     return env, [], all_nodes
809
810   def Exec(self, feedback_fn):
811     """Verify integrity of cluster, performing various test on nodes.
812
813     """
814     bad = False
815     feedback_fn("* Verifying global settings")
816     for msg in self.cfg.VerifyConfig():
817       feedback_fn("  - ERROR: %s" % msg)
818
819     vg_name = self.cfg.GetVGName()
820     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824     i_non_redundant = [] # Non redundant instances
825     i_non_a_balanced = [] # Non auto-balanced instances
826     node_volume = {}
827     node_instance = {}
828     node_info = {}
829     instance_cfg = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     master_files = [constants.CLUSTER_CONF_FILE]
834
835     file_names = ssconf.SimpleStore().GetFileList()
836     file_names.append(constants.SSL_CERT_FILE)
837     file_names.extend(master_files)
838
839     local_checksums = utils.FingerprintFiles(file_names)
840
841     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842     node_verify_param = {
843       constants.NV_FILELIST: file_names,
844       constants.NV_NODELIST: nodelist,
845       constants.NV_HYPERVISOR: hypervisors,
846       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847                                   node.secondary_ip) for node in nodeinfo],
848       constants.NV_LVLIST: vg_name,
849       constants.NV_INSTANCELIST: hypervisors,
850       constants.NV_VGLIST: None,
851       constants.NV_VERSION: None,
852       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853       }
854     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855                                            self.cfg.GetClusterName())
856
857     cluster = self.cfg.GetClusterInfo()
858     master_node = self.cfg.GetMasterNode()
859     for node_i in nodeinfo:
860       node = node_i.name
861       nresult = all_nvinfo[node].data
862
863       if node == master_node:
864         ntype = "master"
865       elif node_i.master_candidate:
866         ntype = "master candidate"
867       else:
868         ntype = "regular"
869       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870
871       if all_nvinfo[node].failed or not isinstance(nresult, dict):
872         feedback_fn("  - ERROR: connection to %s failed" % (node,))
873         bad = True
874         continue
875
876       result = self._VerifyNode(node_i, file_names, local_checksums,
877                                 nresult, feedback_fn, master_files)
878       bad = bad or result
879
880       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881       if isinstance(lvdata, basestring):
882         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883                     (node, lvdata.encode('string_escape')))
884         bad = True
885         node_volume[node] = {}
886       elif not isinstance(lvdata, dict):
887         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888         bad = True
889         continue
890       else:
891         node_volume[node] = lvdata
892
893       # node_instance
894       idata = nresult.get(constants.NV_INSTANCELIST, None)
895       if not isinstance(idata, list):
896         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897                     (node,))
898         bad = True
899         continue
900
901       node_instance[node] = idata
902
903       # node_info
904       nodeinfo = nresult.get(constants.NV_HVINFO, None)
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962         i_non_a_balanced.append(instance)
963
964       for snode in inst_config.secondary_nodes:
965         if snode in node_info:
966           node_info[snode]['sinst'].append(instance)
967           if pnode not in node_info[snode]['sinst-by-pnode']:
968             node_info[snode]['sinst-by-pnode'][pnode] = []
969           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970         else:
971           feedback_fn("  - ERROR: instance %s, connection to secondary node"
972                       " %s failed" % (instance, snode))
973
974     feedback_fn("* Verifying orphan volumes")
975     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976                                        feedback_fn)
977     bad = bad or result
978
979     feedback_fn("* Verifying remaining instances")
980     result = self._VerifyOrphanInstances(instancelist, node_instance,
981                                          feedback_fn)
982     bad = bad or result
983
984     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985       feedback_fn("* Verifying N+1 Memory redundancy")
986       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987       bad = bad or result
988
989     feedback_fn("* Other Notes")
990     if i_non_redundant:
991       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992                   % len(i_non_redundant))
993
994     if i_non_a_balanced:
995       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996                   % len(i_non_a_balanced))
997
998     return not bad
999
1000   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001     """Analize the post-hooks' result
1002
1003     This method analyses the hook result, handles it, and sends some
1004     nicely-formatted feedback back to the user.
1005
1006     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008     @param hooks_results: the results of the multi-node hooks rpc call
1009     @param feedback_fn: function used send feedback back to the caller
1010     @param lu_result: previous Exec result
1011     @return: the new Exec result, based on the previous result
1012         and hook results
1013
1014     """
1015     # We only really run POST phase hooks, and are only interested in
1016     # their results
1017     if phase == constants.HOOKS_PHASE_POST:
1018       # Used to change hooks' output to proper indentation
1019       indent_re = re.compile('^', re.M)
1020       feedback_fn("* Hooks Results")
1021       if not hooks_results:
1022         feedback_fn("  - ERROR: general communication failure")
1023         lu_result = 1
1024       else:
1025         for node_name in hooks_results:
1026           show_node_header = True
1027           res = hooks_results[node_name]
1028           if res.failed or res.data is False or not isinstance(res.data, list):
1029             feedback_fn("    Communication failure in hooks execution")
1030             lu_result = 1
1031             continue
1032           for script, hkr, output in res.data:
1033             if hkr == constants.HKR_FAIL:
1034               # The node header is only shown once, if there are
1035               # failing hooks on that node
1036               if show_node_header:
1037                 feedback_fn("  Node %s:" % node_name)
1038                 show_node_header = False
1039               feedback_fn("    ERROR: Script %s failed, output:" % script)
1040               output = indent_re.sub('      ', output)
1041               feedback_fn("%s" % output)
1042               lu_result = 1
1043
1044       return lu_result
1045
1046
1047 class LUVerifyDisks(NoHooksLU):
1048   """Verifies the cluster disks status.
1049
1050   """
1051   _OP_REQP = []
1052   REQ_BGL = False
1053
1054   def ExpandNames(self):
1055     self.needed_locks = {
1056       locking.LEVEL_NODE: locking.ALL_SET,
1057       locking.LEVEL_INSTANCE: locking.ALL_SET,
1058     }
1059     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060
1061   def CheckPrereq(self):
1062     """Check prerequisites.
1063
1064     This has no prerequisites.
1065
1066     """
1067     pass
1068
1069   def Exec(self, feedback_fn):
1070     """Verify integrity of cluster disks.
1071
1072     """
1073     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074
1075     vg_name = self.cfg.GetVGName()
1076     nodes = utils.NiceSort(self.cfg.GetNodeList())
1077     instances = [self.cfg.GetInstanceInfo(name)
1078                  for name in self.cfg.GetInstanceList()]
1079
1080     nv_dict = {}
1081     for inst in instances:
1082       inst_lvs = {}
1083       if (inst.status != "up" or
1084           inst.disk_template not in constants.DTS_NET_MIRROR):
1085         continue
1086       inst.MapLVsByNode(inst_lvs)
1087       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088       for node, vol_list in inst_lvs.iteritems():
1089         for vol in vol_list:
1090           nv_dict[(node, vol)] = inst
1091
1092     if not nv_dict:
1093       return result
1094
1095     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096
1097     to_act = set()
1098     for node in nodes:
1099       # node_volume
1100       lvs = node_lvs[node]
1101       if lvs.failed:
1102         self.LogWarning("Connection to node %s failed: %s" %
1103                         (node, lvs.data))
1104         continue
1105       lvs = lvs.data
1106       if isinstance(lvs, basestring):
1107         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108         res_nlvm[node] = lvs
1109       elif not isinstance(lvs, dict):
1110         logging.warning("Connection to node %s failed or invalid data"
1111                         " returned", node)
1112         res_nodes.append(node)
1113         continue
1114
1115       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116         inst = nv_dict.pop((node, lv_name), None)
1117         if (not lv_online and inst is not None
1118             and inst.name not in res_instances):
1119           res_instances.append(inst.name)
1120
1121     # any leftover items in nv_dict are missing LVs, let's arrange the
1122     # data better
1123     for key, inst in nv_dict.iteritems():
1124       if inst.name not in res_missing:
1125         res_missing[inst.name] = []
1126       res_missing[inst.name].append(key)
1127
1128     return result
1129
1130
1131 class LURenameCluster(LogicalUnit):
1132   """Rename the cluster.
1133
1134   """
1135   HPATH = "cluster-rename"
1136   HTYPE = constants.HTYPE_CLUSTER
1137   _OP_REQP = ["name"]
1138
1139   def BuildHooksEnv(self):
1140     """Build hooks env.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.cfg.GetClusterName(),
1145       "NEW_NAME": self.op.name,
1146       }
1147     mn = self.cfg.GetMasterNode()
1148     return env, [mn], [mn]
1149
1150   def CheckPrereq(self):
1151     """Verify that the passed name is a valid one.
1152
1153     """
1154     hostname = utils.HostInfo(self.op.name)
1155
1156     new_name = hostname.name
1157     self.ip = new_ip = hostname.ip
1158     old_name = self.cfg.GetClusterName()
1159     old_ip = self.cfg.GetMasterIP()
1160     if new_name == old_name and new_ip == old_ip:
1161       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162                                  " cluster has changed")
1163     if new_ip != old_ip:
1164       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166                                    " reachable on the network. Aborting." %
1167                                    new_ip)
1168
1169     self.op.name = new_name
1170
1171   def Exec(self, feedback_fn):
1172     """Rename the cluster.
1173
1174     """
1175     clustername = self.op.name
1176     ip = self.ip
1177
1178     # shutdown the master IP
1179     master = self.cfg.GetMasterNode()
1180     result = self.rpc.call_node_stop_master(master, False)
1181     if result.failed or not result.data:
1182       raise errors.OpExecError("Could not disable the master role")
1183
1184     try:
1185       cluster = self.cfg.GetClusterInfo()
1186       cluster.cluster_name = clustername
1187       cluster.master_ip = ip
1188       self.cfg.Update(cluster)
1189
1190       # update the known hosts file
1191       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192       node_list = self.cfg.GetNodeList()
1193       try:
1194         node_list.remove(master)
1195       except ValueError:
1196         pass
1197       result = self.rpc.call_upload_file(node_list,
1198                                          constants.SSH_KNOWN_HOSTS_FILE)
1199       for to_node, to_result in result.iteritems():
1200         if to_result.failed or not to_result.data:
1201           logging.error("Copy of file %s to node %s failed", fname, to_node)
1202
1203     finally:
1204       result = self.rpc.call_node_start_master(master, False)
1205       if result.failed or not result.data:
1206         self.LogWarning("Could not re-enable the master role on"
1207                         " the master, please restart manually.")
1208
1209
1210 def _RecursiveCheckIfLVMBased(disk):
1211   """Check if the given disk or its children are lvm-based.
1212
1213   @type disk: L{objects.Disk}
1214   @param disk: the disk to check
1215   @rtype: booleean
1216   @return: boolean indicating whether a LD_LV dev_type was found or not
1217
1218   """
1219   if disk.children:
1220     for chdisk in disk.children:
1221       if _RecursiveCheckIfLVMBased(chdisk):
1222         return True
1223   return disk.dev_type == constants.LD_LV
1224
1225
1226 class LUSetClusterParams(LogicalUnit):
1227   """Change the parameters of the cluster.
1228
1229   """
1230   HPATH = "cluster-modify"
1231   HTYPE = constants.HTYPE_CLUSTER
1232   _OP_REQP = []
1233   REQ_BGL = False
1234
1235   def CheckParameters(self):
1236     """Check parameters
1237
1238     """
1239     if not hasattr(self.op, "candidate_pool_size"):
1240       self.op.candidate_pool_size = None
1241     if self.op.candidate_pool_size is not None:
1242       try:
1243         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244       except ValueError, err:
1245         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246                                    str(err))
1247       if self.op.candidate_pool_size < 1:
1248         raise errors.OpPrereqError("At least one master candidate needed")
1249
1250   def ExpandNames(self):
1251     # FIXME: in the future maybe other cluster params won't require checking on
1252     # all nodes to be modified.
1253     self.needed_locks = {
1254       locking.LEVEL_NODE: locking.ALL_SET,
1255     }
1256     self.share_locks[locking.LEVEL_NODE] = 1
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     """
1262     env = {
1263       "OP_TARGET": self.cfg.GetClusterName(),
1264       "NEW_VG_NAME": self.op.vg_name,
1265       }
1266     mn = self.cfg.GetMasterNode()
1267     return env, [mn], [mn]
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks whether the given params don't conflict and
1273     if the given volume group is valid.
1274
1275     """
1276     # FIXME: This only works because there is only one parameter that can be
1277     # changed or removed.
1278     if self.op.vg_name is not None and not self.op.vg_name:
1279       instances = self.cfg.GetAllInstancesInfo().values()
1280       for inst in instances:
1281         for disk in inst.disks:
1282           if _RecursiveCheckIfLVMBased(disk):
1283             raise errors.OpPrereqError("Cannot disable lvm storage while"
1284                                        " lvm-based instances exist")
1285
1286     node_list = self.acquired_locks[locking.LEVEL_NODE]
1287
1288     # if vg_name not None, checks given volume group on all nodes
1289     if self.op.vg_name:
1290       vglist = self.rpc.call_vg_list(node_list)
1291       for node in node_list:
1292         if vglist[node].failed:
1293           # ignoring down node
1294           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295           continue
1296         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297                                               self.op.vg_name,
1298                                               constants.MIN_VG_SIZE)
1299         if vgstatus:
1300           raise errors.OpPrereqError("Error on node '%s': %s" %
1301                                      (node, vgstatus))
1302
1303     self.cluster = cluster = self.cfg.GetClusterInfo()
1304     # validate beparams changes
1305     if self.op.beparams:
1306       utils.CheckBEParams(self.op.beparams)
1307       self.new_beparams = cluster.FillDict(
1308         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309
1310     # hypervisor list/parameters
1311     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312     if self.op.hvparams:
1313       if not isinstance(self.op.hvparams, dict):
1314         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315       for hv_name, hv_dict in self.op.hvparams.items():
1316         if hv_name not in self.new_hvparams:
1317           self.new_hvparams[hv_name] = hv_dict
1318         else:
1319           self.new_hvparams[hv_name].update(hv_dict)
1320
1321     if self.op.enabled_hypervisors is not None:
1322       self.hv_list = self.op.enabled_hypervisors
1323     else:
1324       self.hv_list = cluster.enabled_hypervisors
1325
1326     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327       # either the enabled list has changed, or the parameters have, validate
1328       for hv_name, hv_params in self.new_hvparams.items():
1329         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330             (self.op.enabled_hypervisors and
1331              hv_name in self.op.enabled_hypervisors)):
1332           # either this is a new hypervisor, or its parameters have changed
1333           hv_class = hypervisor.GetHypervisor(hv_name)
1334           hv_class.CheckParameterSyntax(hv_params)
1335           _CheckHVParams(self, node_list, hv_name, hv_params)
1336
1337   def Exec(self, feedback_fn):
1338     """Change the parameters of the cluster.
1339
1340     """
1341     if self.op.vg_name is not None:
1342       if self.op.vg_name != self.cfg.GetVGName():
1343         self.cfg.SetVGName(self.op.vg_name)
1344       else:
1345         feedback_fn("Cluster LVM configuration already in desired"
1346                     " state, not changing")
1347     if self.op.hvparams:
1348       self.cluster.hvparams = self.new_hvparams
1349     if self.op.enabled_hypervisors is not None:
1350       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351     if self.op.beparams:
1352       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353     if self.op.candidate_pool_size is not None:
1354       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355
1356     self.cfg.Update(self.cluster)
1357
1358     # we want to update nodes after the cluster so that if any errors
1359     # happen, we have recorded and saved the cluster info
1360     if self.op.candidate_pool_size is not None:
1361       node_info = self.cfg.GetAllNodesInfo().values()
1362       num_candidates = len([node for node in node_info
1363                             if node.master_candidate])
1364       num_nodes = len(node_info)
1365       if num_candidates < self.op.candidate_pool_size:
1366         random.shuffle(node_info)
1367         for node in node_info:
1368           if num_candidates >= self.op.candidate_pool_size:
1369             break
1370           if node.master_candidate:
1371             continue
1372           node.master_candidate = True
1373           self.LogInfo("Promoting node %s to master candidate", node.name)
1374           self.cfg.Update(node)
1375           self.context.ReaddNode(node)
1376           num_candidates += 1
1377       elif num_candidates > self.op.candidate_pool_size:
1378         self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379                      " of candidate_pool_size (%d)" %
1380                      (num_candidates, self.op.candidate_pool_size))
1381
1382
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384   """Sleep and poll for an instance's disk to sync.
1385
1386   """
1387   if not instance.disks:
1388     return True
1389
1390   if not oneshot:
1391     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392
1393   node = instance.primary_node
1394
1395   for dev in instance.disks:
1396     lu.cfg.SetDiskID(dev, node)
1397
1398   retries = 0
1399   while True:
1400     max_time = 0
1401     done = True
1402     cumul_degraded = False
1403     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404     if rstats.failed or not rstats.data:
1405       lu.LogWarning("Can't get any data from node %s", node)
1406       retries += 1
1407       if retries >= 10:
1408         raise errors.RemoteError("Can't contact node %s for mirror data,"
1409                                  " aborting." % node)
1410       time.sleep(6)
1411       continue
1412     rstats = rstats.data
1413     retries = 0
1414     for i in range(len(rstats)):
1415       mstat = rstats[i]
1416       if mstat is None:
1417         lu.LogWarning("Can't compute data for node %s/%s",
1418                            node, instance.disks[i].iv_name)
1419         continue
1420       # we ignore the ldisk parameter
1421       perc_done, est_time, is_degraded, _ = mstat
1422       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423       if perc_done is not None:
1424         done = False
1425         if est_time is not None:
1426           rem_time = "%d estimated seconds remaining" % est_time
1427           max_time = est_time
1428         else:
1429           rem_time = "no time estimate"
1430         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431                         (instance.disks[i].iv_name, perc_done, rem_time))
1432     if done or oneshot:
1433       break
1434
1435     time.sleep(min(60, max_time))
1436
1437   if done:
1438     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439   return not cumul_degraded
1440
1441
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443   """Check that mirrors are not degraded.
1444
1445   The ldisk parameter, if True, will change the test from the
1446   is_degraded attribute (which represents overall non-ok status for
1447   the device(s)) to the ldisk (representing the local storage status).
1448
1449   """
1450   lu.cfg.SetDiskID(dev, node)
1451   if ldisk:
1452     idx = 6
1453   else:
1454     idx = 5
1455
1456   result = True
1457   if on_primary or dev.AssembleOnSecondary():
1458     rstats = lu.rpc.call_blockdev_find(node, dev)
1459     if rstats.failed or not rstats.data:
1460       logging.warning("Node %s: disk degraded, not found or node down", node)
1461       result = False
1462     else:
1463       result = result and (not rstats.data[idx])
1464   if dev.children:
1465     for child in dev.children:
1466       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467
1468   return result
1469
1470
1471 class LUDiagnoseOS(NoHooksLU):
1472   """Logical unit for OS diagnose/query.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477   _FIELDS_STATIC = utils.FieldSet()
1478   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479
1480   def ExpandNames(self):
1481     if self.op.names:
1482       raise errors.OpPrereqError("Selective OS query not supported")
1483
1484     _CheckOutputFields(static=self._FIELDS_STATIC,
1485                        dynamic=self._FIELDS_DYNAMIC,
1486                        selected=self.op.output_fields)
1487
1488     # Lock all nodes, in shared mode
1489     self.needed_locks = {}
1490     self.share_locks[locking.LEVEL_NODE] = 1
1491     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     """
1497
1498   @staticmethod
1499   def _DiagnoseByOS(node_list, rlist):
1500     """Remaps a per-node return list into an a per-os per-node dictionary
1501
1502     @param node_list: a list with the names of all nodes
1503     @param rlist: a map with node names as keys and OS objects as values
1504
1505     @rtype: dict
1506     @returns: a dictionary with osnames as keys and as value another map, with
1507         nodes as keys and list of OS objects as values, eg::
1508
1509           {"debian-etch": {"node1": [<object>,...],
1510                            "node2": [<object>,]}
1511           }
1512
1513     """
1514     all_os = {}
1515     for node_name, nr in rlist.iteritems():
1516       if nr.failed or not nr.data:
1517         continue
1518       for os_obj in nr.data:
1519         if os_obj.name not in all_os:
1520           # build a list of nodes for this os containing empty lists
1521           # for each node in node_list
1522           all_os[os_obj.name] = {}
1523           for nname in node_list:
1524             all_os[os_obj.name][nname] = []
1525         all_os[os_obj.name][node_name].append(os_obj)
1526     return all_os
1527
1528   def Exec(self, feedback_fn):
1529     """Compute the list of OSes.
1530
1531     """
1532     node_list = self.acquired_locks[locking.LEVEL_NODE]
1533     node_data = self.rpc.call_os_diagnose(node_list)
1534     if node_data == False:
1535       raise errors.OpExecError("Can't gather the list of OSes")
1536     pol = self._DiagnoseByOS(node_list, node_data)
1537     output = []
1538     for os_name, os_data in pol.iteritems():
1539       row = []
1540       for field in self.op.output_fields:
1541         if field == "name":
1542           val = os_name
1543         elif field == "valid":
1544           val = utils.all([osl and osl[0] for osl in os_data.values()])
1545         elif field == "node_status":
1546           val = {}
1547           for node_name, nos_list in os_data.iteritems():
1548             val[node_name] = [(v.status, v.path) for v in nos_list]
1549         else:
1550           raise errors.ParameterError(field)
1551         row.append(val)
1552       output.append(row)
1553
1554     return output
1555
1556
1557 class LURemoveNode(LogicalUnit):
1558   """Logical unit for removing a node.
1559
1560   """
1561   HPATH = "node-remove"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This doesn't run on the target node in the pre phase as a failed
1569     node would then be impossible to remove.
1570
1571     """
1572     env = {
1573       "OP_TARGET": self.op.node_name,
1574       "NODE_NAME": self.op.node_name,
1575       }
1576     all_nodes = self.cfg.GetNodeList()
1577     all_nodes.remove(self.op.node_name)
1578     return env, all_nodes, all_nodes
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     This checks:
1584      - the node exists in the configuration
1585      - it does not have primary or secondary instances
1586      - it's not the master
1587
1588     Any errors are signalled by raising errors.OpPrereqError.
1589
1590     """
1591     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592     if node is None:
1593       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594
1595     instance_list = self.cfg.GetInstanceList()
1596
1597     masternode = self.cfg.GetMasterNode()
1598     if node.name == masternode:
1599       raise errors.OpPrereqError("Node is the master node,"
1600                                  " you need to failover first.")
1601
1602     for instance_name in instance_list:
1603       instance = self.cfg.GetInstanceInfo(instance_name)
1604       if node.name == instance.primary_node:
1605         raise errors.OpPrereqError("Instance %s still running on the node,"
1606                                    " please remove first." % instance_name)
1607       if node.name in instance.secondary_nodes:
1608         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609                                    " please remove first." % instance_name)
1610     self.op.node_name = node.name
1611     self.node = node
1612
1613   def Exec(self, feedback_fn):
1614     """Removes the node from the cluster.
1615
1616     """
1617     node = self.node
1618     logging.info("Stopping the node daemon and removing configs from node %s",
1619                  node.name)
1620
1621     self.context.RemoveNode(node.name)
1622
1623     self.rpc.call_node_leave_cluster(node.name)
1624
1625     # Promote nodes to master candidate as needed
1626     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627     node_info = self.cfg.GetAllNodesInfo().values()
1628     num_candidates = len([n for n in node_info
1629                           if n.master_candidate])
1630     num_nodes = len(node_info)
1631     random.shuffle(node_info)
1632     for node in node_info:
1633       if num_candidates >= cp_size or num_candidates >= num_nodes:
1634         break
1635       if node.master_candidate:
1636         continue
1637       node.master_candidate = True
1638       self.LogInfo("Promoting node %s to master candidate", node.name)
1639       self.cfg.Update(node)
1640       self.context.ReaddNode(node)
1641       num_candidates += 1
1642
1643
1644 class LUQueryNodes(NoHooksLU):
1645   """Logical unit for querying nodes.
1646
1647   """
1648   _OP_REQP = ["output_fields", "names"]
1649   REQ_BGL = False
1650   _FIELDS_DYNAMIC = utils.FieldSet(
1651     "dtotal", "dfree",
1652     "mtotal", "mnode", "mfree",
1653     "bootid",
1654     "ctotal",
1655     )
1656
1657   _FIELDS_STATIC = utils.FieldSet(
1658     "name", "pinst_cnt", "sinst_cnt",
1659     "pinst_list", "sinst_list",
1660     "pip", "sip", "tags",
1661     "serial_no",
1662     "master_candidate",
1663     "master",
1664     "offline",
1665     )
1666
1667   def ExpandNames(self):
1668     _CheckOutputFields(static=self._FIELDS_STATIC,
1669                        dynamic=self._FIELDS_DYNAMIC,
1670                        selected=self.op.output_fields)
1671
1672     self.needed_locks = {}
1673     self.share_locks[locking.LEVEL_NODE] = 1
1674
1675     if self.op.names:
1676       self.wanted = _GetWantedNodes(self, self.op.names)
1677     else:
1678       self.wanted = locking.ALL_SET
1679
1680     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1681     if self.do_locking:
1682       # if we don't request only static fields, we need to lock the nodes
1683       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1684
1685
1686   def CheckPrereq(self):
1687     """Check prerequisites.
1688
1689     """
1690     # The validation of the node list is done in the _GetWantedNodes,
1691     # if non empty, and if empty, there's no validation to do
1692     pass
1693
1694   def Exec(self, feedback_fn):
1695     """Computes the list of nodes and their attributes.
1696
1697     """
1698     all_info = self.cfg.GetAllNodesInfo()
1699     if self.do_locking:
1700       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1701     elif self.wanted != locking.ALL_SET:
1702       nodenames = self.wanted
1703       missing = set(nodenames).difference(all_info.keys())
1704       if missing:
1705         raise errors.OpExecError(
1706           "Some nodes were removed before retrieving their data: %s" % missing)
1707     else:
1708       nodenames = all_info.keys()
1709
1710     nodenames = utils.NiceSort(nodenames)
1711     nodelist = [all_info[name] for name in nodenames]
1712
1713     # begin data gathering
1714
1715     if self.do_locking:
1716       live_data = {}
1717       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1718                                           self.cfg.GetHypervisorType())
1719       for name in nodenames:
1720         nodeinfo = node_data[name]
1721         if not nodeinfo.failed and nodeinfo.data:
1722           nodeinfo = nodeinfo.data
1723           fn = utils.TryConvert
1724           live_data[name] = {
1725             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1726             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1727             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1728             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1729             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1730             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1731             "bootid": nodeinfo.get('bootid', None),
1732             }
1733         else:
1734           live_data[name] = {}
1735     else:
1736       live_data = dict.fromkeys(nodenames, {})
1737
1738     node_to_primary = dict([(name, set()) for name in nodenames])
1739     node_to_secondary = dict([(name, set()) for name in nodenames])
1740
1741     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1742                              "sinst_cnt", "sinst_list"))
1743     if inst_fields & frozenset(self.op.output_fields):
1744       instancelist = self.cfg.GetInstanceList()
1745
1746       for instance_name in instancelist:
1747         inst = self.cfg.GetInstanceInfo(instance_name)
1748         if inst.primary_node in node_to_primary:
1749           node_to_primary[inst.primary_node].add(inst.name)
1750         for secnode in inst.secondary_nodes:
1751           if secnode in node_to_secondary:
1752             node_to_secondary[secnode].add(inst.name)
1753
1754     master_node = self.cfg.GetMasterNode()
1755
1756     # end data gathering
1757
1758     output = []
1759     for node in nodelist:
1760       node_output = []
1761       for field in self.op.output_fields:
1762         if field == "name":
1763           val = node.name
1764         elif field == "pinst_list":
1765           val = list(node_to_primary[node.name])
1766         elif field == "sinst_list":
1767           val = list(node_to_secondary[node.name])
1768         elif field == "pinst_cnt":
1769           val = len(node_to_primary[node.name])
1770         elif field == "sinst_cnt":
1771           val = len(node_to_secondary[node.name])
1772         elif field == "pip":
1773           val = node.primary_ip
1774         elif field == "sip":
1775           val = node.secondary_ip
1776         elif field == "tags":
1777           val = list(node.GetTags())
1778         elif field == "serial_no":
1779           val = node.serial_no
1780         elif field == "master_candidate":
1781           val = node.master_candidate
1782         elif field == "master":
1783           val = node.name == master_node
1784         elif field == "offline":
1785           val = node.offline
1786         elif self._FIELDS_DYNAMIC.Matches(field):
1787           val = live_data[node.name].get(field, None)
1788         else:
1789           raise errors.ParameterError(field)
1790         node_output.append(val)
1791       output.append(node_output)
1792
1793     return output
1794
1795
1796 class LUQueryNodeVolumes(NoHooksLU):
1797   """Logical unit for getting volumes on node(s).
1798
1799   """
1800   _OP_REQP = ["nodes", "output_fields"]
1801   REQ_BGL = False
1802   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1803   _FIELDS_STATIC = utils.FieldSet("node")
1804
1805   def ExpandNames(self):
1806     _CheckOutputFields(static=self._FIELDS_STATIC,
1807                        dynamic=self._FIELDS_DYNAMIC,
1808                        selected=self.op.output_fields)
1809
1810     self.needed_locks = {}
1811     self.share_locks[locking.LEVEL_NODE] = 1
1812     if not self.op.nodes:
1813       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814     else:
1815       self.needed_locks[locking.LEVEL_NODE] = \
1816         _GetWantedNodes(self, self.op.nodes)
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     This checks that the fields required are valid output fields.
1822
1823     """
1824     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     nodenames = self.nodes
1831     volumes = self.rpc.call_node_volumes(nodenames)
1832
1833     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1834              in self.cfg.GetInstanceList()]
1835
1836     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1837
1838     output = []
1839     for node in nodenames:
1840       if node not in volumes or volumes[node].failed or not volumes[node].data:
1841         continue
1842
1843       node_vols = volumes[node].data[:]
1844       node_vols.sort(key=lambda vol: vol['dev'])
1845
1846       for vol in node_vols:
1847         node_output = []
1848         for field in self.op.output_fields:
1849           if field == "node":
1850             val = node
1851           elif field == "phys":
1852             val = vol['dev']
1853           elif field == "vg":
1854             val = vol['vg']
1855           elif field == "name":
1856             val = vol['name']
1857           elif field == "size":
1858             val = int(float(vol['size']))
1859           elif field == "instance":
1860             for inst in ilist:
1861               if node not in lv_by_node[inst]:
1862                 continue
1863               if vol['name'] in lv_by_node[inst][node]:
1864                 val = inst.name
1865                 break
1866             else:
1867               val = '-'
1868           else:
1869             raise errors.ParameterError(field)
1870           node_output.append(str(val))
1871
1872         output.append(node_output)
1873
1874     return output
1875
1876
1877 class LUAddNode(LogicalUnit):
1878   """Logical unit for adding node to the cluster.
1879
1880   """
1881   HPATH = "node-add"
1882   HTYPE = constants.HTYPE_NODE
1883   _OP_REQP = ["node_name"]
1884
1885   def BuildHooksEnv(self):
1886     """Build hooks env.
1887
1888     This will run on all nodes before, and on all nodes + the new node after.
1889
1890     """
1891     env = {
1892       "OP_TARGET": self.op.node_name,
1893       "NODE_NAME": self.op.node_name,
1894       "NODE_PIP": self.op.primary_ip,
1895       "NODE_SIP": self.op.secondary_ip,
1896       }
1897     nodes_0 = self.cfg.GetNodeList()
1898     nodes_1 = nodes_0 + [self.op.node_name, ]
1899     return env, nodes_0, nodes_1
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     This checks:
1905      - the new node is not already in the config
1906      - it is resolvable
1907      - its parameters (single/dual homed) matches the cluster
1908
1909     Any errors are signalled by raising errors.OpPrereqError.
1910
1911     """
1912     node_name = self.op.node_name
1913     cfg = self.cfg
1914
1915     dns_data = utils.HostInfo(node_name)
1916
1917     node = dns_data.name
1918     primary_ip = self.op.primary_ip = dns_data.ip
1919     secondary_ip = getattr(self.op, "secondary_ip", None)
1920     if secondary_ip is None:
1921       secondary_ip = primary_ip
1922     if not utils.IsValidIP(secondary_ip):
1923       raise errors.OpPrereqError("Invalid secondary IP given")
1924     self.op.secondary_ip = secondary_ip
1925
1926     node_list = cfg.GetNodeList()
1927     if not self.op.readd and node in node_list:
1928       raise errors.OpPrereqError("Node %s is already in the configuration" %
1929                                  node)
1930     elif self.op.readd and node not in node_list:
1931       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1932
1933     for existing_node_name in node_list:
1934       existing_node = cfg.GetNodeInfo(existing_node_name)
1935
1936       if self.op.readd and node == existing_node_name:
1937         if (existing_node.primary_ip != primary_ip or
1938             existing_node.secondary_ip != secondary_ip):
1939           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1940                                      " address configuration as before")
1941         continue
1942
1943       if (existing_node.primary_ip == primary_ip or
1944           existing_node.secondary_ip == primary_ip or
1945           existing_node.primary_ip == secondary_ip or
1946           existing_node.secondary_ip == secondary_ip):
1947         raise errors.OpPrereqError("New node ip address(es) conflict with"
1948                                    " existing node %s" % existing_node.name)
1949
1950     # check that the type of the node (single versus dual homed) is the
1951     # same as for the master
1952     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1953     master_singlehomed = myself.secondary_ip == myself.primary_ip
1954     newbie_singlehomed = secondary_ip == primary_ip
1955     if master_singlehomed != newbie_singlehomed:
1956       if master_singlehomed:
1957         raise errors.OpPrereqError("The master has no private ip but the"
1958                                    " new node has one")
1959       else:
1960         raise errors.OpPrereqError("The master has a private ip but the"
1961                                    " new node doesn't have one")
1962
1963     # checks reachablity
1964     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1965       raise errors.OpPrereqError("Node not reachable by ping")
1966
1967     if not newbie_singlehomed:
1968       # check reachability from my secondary ip to newbie's secondary ip
1969       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1970                            source=myself.secondary_ip):
1971         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1972                                    " based ping to noded port")
1973
1974     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1975     node_info = self.cfg.GetAllNodesInfo().values()
1976     num_candidates = len([n for n in node_info
1977                           if n.master_candidate])
1978     master_candidate = num_candidates < cp_size
1979
1980     self.new_node = objects.Node(name=node,
1981                                  primary_ip=primary_ip,
1982                                  secondary_ip=secondary_ip,
1983                                  master_candidate=master_candidate,
1984                                  offline=False)
1985
1986   def Exec(self, feedback_fn):
1987     """Adds the new node to the cluster.
1988
1989     """
1990     new_node = self.new_node
1991     node = new_node.name
1992
1993     # check connectivity
1994     result = self.rpc.call_version([node])[node]
1995     result.Raise()
1996     if result.data:
1997       if constants.PROTOCOL_VERSION == result.data:
1998         logging.info("Communication to node %s fine, sw version %s match",
1999                      node, result.data)
2000       else:
2001         raise errors.OpExecError("Version mismatch master version %s,"
2002                                  " node version %s" %
2003                                  (constants.PROTOCOL_VERSION, result.data))
2004     else:
2005       raise errors.OpExecError("Cannot get version from the new node")
2006
2007     # setup ssh on node
2008     logging.info("Copy ssh key to node %s", node)
2009     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2010     keyarray = []
2011     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2012                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2013                 priv_key, pub_key]
2014
2015     for i in keyfiles:
2016       f = open(i, 'r')
2017       try:
2018         keyarray.append(f.read())
2019       finally:
2020         f.close()
2021
2022     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2023                                     keyarray[2],
2024                                     keyarray[3], keyarray[4], keyarray[5])
2025
2026     if result.failed or not result.data:
2027       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2028
2029     # Add node to our /etc/hosts, and add key to known_hosts
2030     utils.AddHostToEtcHosts(new_node.name)
2031
2032     if new_node.secondary_ip != new_node.primary_ip:
2033       result = self.rpc.call_node_has_ip_address(new_node.name,
2034                                                  new_node.secondary_ip)
2035       if result.failed or not result.data:
2036         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2037                                  " you gave (%s). Please fix and re-run this"
2038                                  " command." % new_node.secondary_ip)
2039
2040     node_verify_list = [self.cfg.GetMasterNode()]
2041     node_verify_param = {
2042       'nodelist': [node],
2043       # TODO: do a node-net-test as well?
2044     }
2045
2046     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2047                                        self.cfg.GetClusterName())
2048     for verifier in node_verify_list:
2049       if result[verifier].failed or not result[verifier].data:
2050         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2051                                  " for remote verification" % verifier)
2052       if result[verifier].data['nodelist']:
2053         for failed in result[verifier].data['nodelist']:
2054           feedback_fn("ssh/hostname verification failed %s -> %s" %
2055                       (verifier, result[verifier]['nodelist'][failed]))
2056         raise errors.OpExecError("ssh/hostname verification failed.")
2057
2058     # Distribute updated /etc/hosts and known_hosts to all nodes,
2059     # including the node just added
2060     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061     dist_nodes = self.cfg.GetNodeList()
2062     if not self.op.readd:
2063       dist_nodes.append(node)
2064     if myself.name in dist_nodes:
2065       dist_nodes.remove(myself.name)
2066
2067     logging.debug("Copying hosts and known_hosts to all nodes")
2068     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2069       result = self.rpc.call_upload_file(dist_nodes, fname)
2070       for to_node, to_result in result.iteritems():
2071         if to_result.failed or not to_result.data:
2072           logging.error("Copy of file %s to node %s failed", fname, to_node)
2073
2074     to_copy = []
2075     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2076       to_copy.append(constants.VNC_PASSWORD_FILE)
2077     for fname in to_copy:
2078       result = self.rpc.call_upload_file([node], fname)
2079       if result[node].failed or not result[node]:
2080         logging.error("Could not copy file %s to node %s", fname, node)
2081
2082     if self.op.readd:
2083       self.context.ReaddNode(new_node)
2084     else:
2085       self.context.AddNode(new_node)
2086
2087
2088 class LUSetNodeParams(LogicalUnit):
2089   """Modifies the parameters of a node.
2090
2091   """
2092   HPATH = "node-modify"
2093   HTYPE = constants.HTYPE_NODE
2094   _OP_REQP = ["node_name"]
2095   REQ_BGL = False
2096
2097   def CheckArguments(self):
2098     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2099     if node_name is None:
2100       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2101     self.op.node_name = node_name
2102     if not hasattr(self.op, 'master_candidate'):
2103       raise errors.OpPrereqError("Please pass at least one modification")
2104     self.op.master_candidate = bool(self.op.master_candidate)
2105
2106   def ExpandNames(self):
2107     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2108
2109   def BuildHooksEnv(self):
2110     """Build hooks env.
2111
2112     This runs on the master node.
2113
2114     """
2115     env = {
2116       "OP_TARGET": self.op.node_name,
2117       "MASTER_CANDIDATE": str(self.op.master_candidate),
2118       }
2119     nl = [self.cfg.GetMasterNode(),
2120           self.op.node_name]
2121     return env, nl, nl
2122
2123   def CheckPrereq(self):
2124     """Check prerequisites.
2125
2126     This only checks the instance list against the existing names.
2127
2128     """
2129     force = self.force = self.op.force
2130
2131     if self.op.master_candidate == False:
2132       if self.op.node_name == self.cfg.GetMasterNode():
2133         raise errors.OpPrereqError("The master node has to be a"
2134                                    " master candidate")
2135       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2136       node_info = self.cfg.GetAllNodesInfo().values()
2137       num_candidates = len([node for node in node_info
2138                             if node.master_candidate])
2139       if num_candidates <= cp_size:
2140         msg = ("Not enough master candidates (desired"
2141                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2142         if force:
2143           self.LogWarning(msg)
2144         else:
2145           raise errors.OpPrereqError(msg)
2146
2147     return
2148
2149   def Exec(self, feedback_fn):
2150     """Modifies a node.
2151
2152     """
2153     node = self.cfg.GetNodeInfo(self.op.node_name)
2154
2155     result = []
2156
2157     if self.op.master_candidate is not None:
2158       node.master_candidate = self.op.master_candidate
2159       result.append(("master_candidate", str(self.op.master_candidate)))
2160
2161     # this will trigger configuration file update, if needed
2162     self.cfg.Update(node)
2163     # this will trigger job queue propagation or cleanup
2164     if self.op.node_name != self.cfg.GetMasterNode():
2165       self.context.ReaddNode(node)
2166
2167     return result
2168
2169
2170 class LUQueryClusterInfo(NoHooksLU):
2171   """Query cluster configuration.
2172
2173   """
2174   _OP_REQP = []
2175   REQ_BGL = False
2176
2177   def ExpandNames(self):
2178     self.needed_locks = {}
2179
2180   def CheckPrereq(self):
2181     """No prerequsites needed for this LU.
2182
2183     """
2184     pass
2185
2186   def Exec(self, feedback_fn):
2187     """Return cluster config.
2188
2189     """
2190     cluster = self.cfg.GetClusterInfo()
2191     result = {
2192       "software_version": constants.RELEASE_VERSION,
2193       "protocol_version": constants.PROTOCOL_VERSION,
2194       "config_version": constants.CONFIG_VERSION,
2195       "os_api_version": constants.OS_API_VERSION,
2196       "export_version": constants.EXPORT_VERSION,
2197       "architecture": (platform.architecture()[0], platform.machine()),
2198       "name": cluster.cluster_name,
2199       "master": cluster.master_node,
2200       "default_hypervisor": cluster.default_hypervisor,
2201       "enabled_hypervisors": cluster.enabled_hypervisors,
2202       "hvparams": cluster.hvparams,
2203       "beparams": cluster.beparams,
2204       "candidate_pool_size": cluster.candidate_pool_size,
2205       }
2206
2207     return result
2208
2209
2210 class LUQueryConfigValues(NoHooksLU):
2211   """Return configuration values.
2212
2213   """
2214   _OP_REQP = []
2215   REQ_BGL = False
2216   _FIELDS_DYNAMIC = utils.FieldSet()
2217   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2218
2219   def ExpandNames(self):
2220     self.needed_locks = {}
2221
2222     _CheckOutputFields(static=self._FIELDS_STATIC,
2223                        dynamic=self._FIELDS_DYNAMIC,
2224                        selected=self.op.output_fields)
2225
2226   def CheckPrereq(self):
2227     """No prerequisites.
2228
2229     """
2230     pass
2231
2232   def Exec(self, feedback_fn):
2233     """Dump a representation of the cluster config to the standard output.
2234
2235     """
2236     values = []
2237     for field in self.op.output_fields:
2238       if field == "cluster_name":
2239         entry = self.cfg.GetClusterName()
2240       elif field == "master_node":
2241         entry = self.cfg.GetMasterNode()
2242       elif field == "drain_flag":
2243         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2244       else:
2245         raise errors.ParameterError(field)
2246       values.append(entry)
2247     return values
2248
2249
2250 class LUActivateInstanceDisks(NoHooksLU):
2251   """Bring up an instance's disks.
2252
2253   """
2254   _OP_REQP = ["instance_name"]
2255   REQ_BGL = False
2256
2257   def ExpandNames(self):
2258     self._ExpandAndLockInstance()
2259     self.needed_locks[locking.LEVEL_NODE] = []
2260     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2261
2262   def DeclareLocks(self, level):
2263     if level == locking.LEVEL_NODE:
2264       self._LockInstancesNodes()
2265
2266   def CheckPrereq(self):
2267     """Check prerequisites.
2268
2269     This checks that the instance is in the cluster.
2270
2271     """
2272     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2273     assert self.instance is not None, \
2274       "Cannot retrieve locked instance %s" % self.op.instance_name
2275
2276   def Exec(self, feedback_fn):
2277     """Activate the disks.
2278
2279     """
2280     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2281     if not disks_ok:
2282       raise errors.OpExecError("Cannot activate block devices")
2283
2284     return disks_info
2285
2286
2287 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2288   """Prepare the block devices for an instance.
2289
2290   This sets up the block devices on all nodes.
2291
2292   @type lu: L{LogicalUnit}
2293   @param lu: the logical unit on whose behalf we execute
2294   @type instance: L{objects.Instance}
2295   @param instance: the instance for whose disks we assemble
2296   @type ignore_secondaries: boolean
2297   @param ignore_secondaries: if true, errors on secondary nodes
2298       won't result in an error return from the function
2299   @return: False if the operation failed, otherwise a list of
2300       (host, instance_visible_name, node_visible_name)
2301       with the mapping from node devices to instance devices
2302
2303   """
2304   device_info = []
2305   disks_ok = True
2306   iname = instance.name
2307   # With the two passes mechanism we try to reduce the window of
2308   # opportunity for the race condition of switching DRBD to primary
2309   # before handshaking occured, but we do not eliminate it
2310
2311   # The proper fix would be to wait (with some limits) until the
2312   # connection has been made and drbd transitions from WFConnection
2313   # into any other network-connected state (Connected, SyncTarget,
2314   # SyncSource, etc.)
2315
2316   # 1st pass, assemble on all nodes in secondary mode
2317   for inst_disk in instance.disks:
2318     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2319       lu.cfg.SetDiskID(node_disk, node)
2320       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2321       if result.failed or not result:
2322         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2323                            " (is_primary=False, pass=1)",
2324                            inst_disk.iv_name, node)
2325         if not ignore_secondaries:
2326           disks_ok = False
2327
2328   # FIXME: race condition on drbd migration to primary
2329
2330   # 2nd pass, do only the primary node
2331   for inst_disk in instance.disks:
2332     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2333       if node != instance.primary_node:
2334         continue
2335       lu.cfg.SetDiskID(node_disk, node)
2336       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2337       if result.failed or not result:
2338         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2339                            " (is_primary=True, pass=2)",
2340                            inst_disk.iv_name, node)
2341         disks_ok = False
2342     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2343
2344   # leave the disks configured for the primary node
2345   # this is a workaround that would be fixed better by
2346   # improving the logical/physical id handling
2347   for disk in instance.disks:
2348     lu.cfg.SetDiskID(disk, instance.primary_node)
2349
2350   return disks_ok, device_info
2351
2352
2353 def _StartInstanceDisks(lu, instance, force):
2354   """Start the disks of an instance.
2355
2356   """
2357   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2358                                            ignore_secondaries=force)
2359   if not disks_ok:
2360     _ShutdownInstanceDisks(lu, instance)
2361     if force is not None and not force:
2362       lu.proc.LogWarning("", hint="If the message above refers to a"
2363                          " secondary node,"
2364                          " you can retry the operation using '--force'.")
2365     raise errors.OpExecError("Disk consistency error")
2366
2367
2368 class LUDeactivateInstanceDisks(NoHooksLU):
2369   """Shutdown an instance's disks.
2370
2371   """
2372   _OP_REQP = ["instance_name"]
2373   REQ_BGL = False
2374
2375   def ExpandNames(self):
2376     self._ExpandAndLockInstance()
2377     self.needed_locks[locking.LEVEL_NODE] = []
2378     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2379
2380   def DeclareLocks(self, level):
2381     if level == locking.LEVEL_NODE:
2382       self._LockInstancesNodes()
2383
2384   def CheckPrereq(self):
2385     """Check prerequisites.
2386
2387     This checks that the instance is in the cluster.
2388
2389     """
2390     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2391     assert self.instance is not None, \
2392       "Cannot retrieve locked instance %s" % self.op.instance_name
2393
2394   def Exec(self, feedback_fn):
2395     """Deactivate the disks
2396
2397     """
2398     instance = self.instance
2399     _SafeShutdownInstanceDisks(self, instance)
2400
2401
2402 def _SafeShutdownInstanceDisks(lu, instance):
2403   """Shutdown block devices of an instance.
2404
2405   This function checks if an instance is running, before calling
2406   _ShutdownInstanceDisks.
2407
2408   """
2409   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2410                                       [instance.hypervisor])
2411   ins_l = ins_l[instance.primary_node]
2412   if ins_l.failed or not isinstance(ins_l.data, list):
2413     raise errors.OpExecError("Can't contact node '%s'" %
2414                              instance.primary_node)
2415
2416   if instance.name in ins_l.data:
2417     raise errors.OpExecError("Instance is running, can't shutdown"
2418                              " block devices.")
2419
2420   _ShutdownInstanceDisks(lu, instance)
2421
2422
2423 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2424   """Shutdown block devices of an instance.
2425
2426   This does the shutdown on all nodes of the instance.
2427
2428   If the ignore_primary is false, errors on the primary node are
2429   ignored.
2430
2431   """
2432   result = True
2433   for disk in instance.disks:
2434     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2435       lu.cfg.SetDiskID(top_disk, node)
2436       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2437       if result.failed or not result.data:
2438         logging.error("Could not shutdown block device %s on node %s",
2439                       disk.iv_name, node)
2440         if not ignore_primary or node != instance.primary_node:
2441           result = False
2442   return result
2443
2444
2445 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2446   """Checks if a node has enough free memory.
2447
2448   This function check if a given node has the needed amount of free
2449   memory. In case the node has less memory or we cannot get the
2450   information from the node, this function raise an OpPrereqError
2451   exception.
2452
2453   @type lu: C{LogicalUnit}
2454   @param lu: a logical unit from which we get configuration data
2455   @type node: C{str}
2456   @param node: the node to check
2457   @type reason: C{str}
2458   @param reason: string to use in the error message
2459   @type requested: C{int}
2460   @param requested: the amount of memory in MiB to check for
2461   @type hypervisor: C{str}
2462   @param hypervisor: the hypervisor to ask for memory stats
2463   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2464       we cannot check the node
2465
2466   """
2467   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2468   nodeinfo[node].Raise()
2469   free_mem = nodeinfo[node].data.get('memory_free')
2470   if not isinstance(free_mem, int):
2471     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2472                              " was '%s'" % (node, free_mem))
2473   if requested > free_mem:
2474     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2475                              " needed %s MiB, available %s MiB" %
2476                              (node, reason, requested, free_mem))
2477
2478
2479 class LUStartupInstance(LogicalUnit):
2480   """Starts an instance.
2481
2482   """
2483   HPATH = "instance-start"
2484   HTYPE = constants.HTYPE_INSTANCE
2485   _OP_REQP = ["instance_name", "force"]
2486   REQ_BGL = False
2487
2488   def ExpandNames(self):
2489     self._ExpandAndLockInstance()
2490
2491   def BuildHooksEnv(self):
2492     """Build hooks env.
2493
2494     This runs on master, primary and secondary nodes of the instance.
2495
2496     """
2497     env = {
2498       "FORCE": self.op.force,
2499       }
2500     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2501     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502           list(self.instance.secondary_nodes))
2503     return env, nl, nl
2504
2505   def CheckPrereq(self):
2506     """Check prerequisites.
2507
2508     This checks that the instance is in the cluster.
2509
2510     """
2511     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512     assert self.instance is not None, \
2513       "Cannot retrieve locked instance %s" % self.op.instance_name
2514
2515     bep = self.cfg.GetClusterInfo().FillBE(instance)
2516     # check bridges existance
2517     _CheckInstanceBridgesExist(self, instance)
2518
2519     _CheckNodeFreeMemory(self, instance.primary_node,
2520                          "starting instance %s" % instance.name,
2521                          bep[constants.BE_MEMORY], instance.hypervisor)
2522
2523   def Exec(self, feedback_fn):
2524     """Start the instance.
2525
2526     """
2527     instance = self.instance
2528     force = self.op.force
2529     extra_args = getattr(self.op, "extra_args", "")
2530
2531     self.cfg.MarkInstanceUp(instance.name)
2532
2533     node_current = instance.primary_node
2534
2535     _StartInstanceDisks(self, instance, force)
2536
2537     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2538     if result.failed or not result.data:
2539       _ShutdownInstanceDisks(self, instance)
2540       raise errors.OpExecError("Could not start instance")
2541
2542
2543 class LURebootInstance(LogicalUnit):
2544   """Reboot an instance.
2545
2546   """
2547   HPATH = "instance-reboot"
2548   HTYPE = constants.HTYPE_INSTANCE
2549   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2550   REQ_BGL = False
2551
2552   def ExpandNames(self):
2553     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2554                                    constants.INSTANCE_REBOOT_HARD,
2555                                    constants.INSTANCE_REBOOT_FULL]:
2556       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2557                                   (constants.INSTANCE_REBOOT_SOFT,
2558                                    constants.INSTANCE_REBOOT_HARD,
2559                                    constants.INSTANCE_REBOOT_FULL))
2560     self._ExpandAndLockInstance()
2561
2562   def BuildHooksEnv(self):
2563     """Build hooks env.
2564
2565     This runs on master, primary and secondary nodes of the instance.
2566
2567     """
2568     env = {
2569       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2570       }
2571     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2572     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2573           list(self.instance.secondary_nodes))
2574     return env, nl, nl
2575
2576   def CheckPrereq(self):
2577     """Check prerequisites.
2578
2579     This checks that the instance is in the cluster.
2580
2581     """
2582     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2583     assert self.instance is not None, \
2584       "Cannot retrieve locked instance %s" % self.op.instance_name
2585
2586     # check bridges existance
2587     _CheckInstanceBridgesExist(self, instance)
2588
2589   def Exec(self, feedback_fn):
2590     """Reboot the instance.
2591
2592     """
2593     instance = self.instance
2594     ignore_secondaries = self.op.ignore_secondaries
2595     reboot_type = self.op.reboot_type
2596     extra_args = getattr(self.op, "extra_args", "")
2597
2598     node_current = instance.primary_node
2599
2600     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2601                        constants.INSTANCE_REBOOT_HARD]:
2602       result = self.rpc.call_instance_reboot(node_current, instance,
2603                                              reboot_type, extra_args)
2604       if result.failed or not result.data:
2605         raise errors.OpExecError("Could not reboot instance")
2606     else:
2607       if not self.rpc.call_instance_shutdown(node_current, instance):
2608         raise errors.OpExecError("could not shutdown instance for full reboot")
2609       _ShutdownInstanceDisks(self, instance)
2610       _StartInstanceDisks(self, instance, ignore_secondaries)
2611       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2612       if result.failed or not result.data:
2613         _ShutdownInstanceDisks(self, instance)
2614         raise errors.OpExecError("Could not start instance for full reboot")
2615
2616     self.cfg.MarkInstanceUp(instance.name)
2617
2618
2619 class LUShutdownInstance(LogicalUnit):
2620   """Shutdown an instance.
2621
2622   """
2623   HPATH = "instance-stop"
2624   HTYPE = constants.HTYPE_INSTANCE
2625   _OP_REQP = ["instance_name"]
2626   REQ_BGL = False
2627
2628   def ExpandNames(self):
2629     self._ExpandAndLockInstance()
2630
2631   def BuildHooksEnv(self):
2632     """Build hooks env.
2633
2634     This runs on master, primary and secondary nodes of the instance.
2635
2636     """
2637     env = _BuildInstanceHookEnvByObject(self, self.instance)
2638     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2639           list(self.instance.secondary_nodes))
2640     return env, nl, nl
2641
2642   def CheckPrereq(self):
2643     """Check prerequisites.
2644
2645     This checks that the instance is in the cluster.
2646
2647     """
2648     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2649     assert self.instance is not None, \
2650       "Cannot retrieve locked instance %s" % self.op.instance_name
2651
2652   def Exec(self, feedback_fn):
2653     """Shutdown the instance.
2654
2655     """
2656     instance = self.instance
2657     node_current = instance.primary_node
2658     self.cfg.MarkInstanceDown(instance.name)
2659     result = self.rpc.call_instance_shutdown(node_current, instance)
2660     if result.failed or not result.data:
2661       self.proc.LogWarning("Could not shutdown instance")
2662
2663     _ShutdownInstanceDisks(self, instance)
2664
2665
2666 class LUReinstallInstance(LogicalUnit):
2667   """Reinstall an instance.
2668
2669   """
2670   HPATH = "instance-reinstall"
2671   HTYPE = constants.HTYPE_INSTANCE
2672   _OP_REQP = ["instance_name"]
2673   REQ_BGL = False
2674
2675   def ExpandNames(self):
2676     self._ExpandAndLockInstance()
2677
2678   def BuildHooksEnv(self):
2679     """Build hooks env.
2680
2681     This runs on master, primary and secondary nodes of the instance.
2682
2683     """
2684     env = _BuildInstanceHookEnvByObject(self, self.instance)
2685     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2686           list(self.instance.secondary_nodes))
2687     return env, nl, nl
2688
2689   def CheckPrereq(self):
2690     """Check prerequisites.
2691
2692     This checks that the instance is in the cluster and is not running.
2693
2694     """
2695     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2696     assert instance is not None, \
2697       "Cannot retrieve locked instance %s" % self.op.instance_name
2698
2699     if instance.disk_template == constants.DT_DISKLESS:
2700       raise errors.OpPrereqError("Instance '%s' has no disks" %
2701                                  self.op.instance_name)
2702     if instance.status != "down":
2703       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2704                                  self.op.instance_name)
2705     remote_info = self.rpc.call_instance_info(instance.primary_node,
2706                                               instance.name,
2707                                               instance.hypervisor)
2708     if remote_info.failed or remote_info.data:
2709       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2710                                  (self.op.instance_name,
2711                                   instance.primary_node))
2712
2713     self.op.os_type = getattr(self.op, "os_type", None)
2714     if self.op.os_type is not None:
2715       # OS verification
2716       pnode = self.cfg.GetNodeInfo(
2717         self.cfg.ExpandNodeName(instance.primary_node))
2718       if pnode is None:
2719         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2720                                    self.op.pnode)
2721       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2722       result.Raise()
2723       if not isinstance(result.data, objects.OS):
2724         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2725                                    " primary node"  % self.op.os_type)
2726
2727     self.instance = instance
2728
2729   def Exec(self, feedback_fn):
2730     """Reinstall the instance.
2731
2732     """
2733     inst = self.instance
2734
2735     if self.op.os_type is not None:
2736       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2737       inst.os = self.op.os_type
2738       self.cfg.Update(inst)
2739
2740     _StartInstanceDisks(self, inst, None)
2741     try:
2742       feedback_fn("Running the instance OS create scripts...")
2743       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2744       result.Raise()
2745       if not result.data:
2746         raise errors.OpExecError("Could not install OS for instance %s"
2747                                  " on node %s" %
2748                                  (inst.name, inst.primary_node))
2749     finally:
2750       _ShutdownInstanceDisks(self, inst)
2751
2752
2753 class LURenameInstance(LogicalUnit):
2754   """Rename an instance.
2755
2756   """
2757   HPATH = "instance-rename"
2758   HTYPE = constants.HTYPE_INSTANCE
2759   _OP_REQP = ["instance_name", "new_name"]
2760
2761   def BuildHooksEnv(self):
2762     """Build hooks env.
2763
2764     This runs on master, primary and secondary nodes of the instance.
2765
2766     """
2767     env = _BuildInstanceHookEnvByObject(self, self.instance)
2768     env["INSTANCE_NEW_NAME"] = self.op.new_name
2769     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2770           list(self.instance.secondary_nodes))
2771     return env, nl, nl
2772
2773   def CheckPrereq(self):
2774     """Check prerequisites.
2775
2776     This checks that the instance is in the cluster and is not running.
2777
2778     """
2779     instance = self.cfg.GetInstanceInfo(
2780       self.cfg.ExpandInstanceName(self.op.instance_name))
2781     if instance is None:
2782       raise errors.OpPrereqError("Instance '%s' not known" %
2783                                  self.op.instance_name)
2784     if instance.status != "down":
2785       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2786                                  self.op.instance_name)
2787     remote_info = self.rpc.call_instance_info(instance.primary_node,
2788                                               instance.name,
2789                                               instance.hypervisor)
2790     remote_info.Raise()
2791     if remote_info.data:
2792       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2793                                  (self.op.instance_name,
2794                                   instance.primary_node))
2795     self.instance = instance
2796
2797     # new name verification
2798     name_info = utils.HostInfo(self.op.new_name)
2799
2800     self.op.new_name = new_name = name_info.name
2801     instance_list = self.cfg.GetInstanceList()
2802     if new_name in instance_list:
2803       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2804                                  new_name)
2805
2806     if not getattr(self.op, "ignore_ip", False):
2807       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2808         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2809                                    (name_info.ip, new_name))
2810
2811
2812   def Exec(self, feedback_fn):
2813     """Reinstall the instance.
2814
2815     """
2816     inst = self.instance
2817     old_name = inst.name
2818
2819     if inst.disk_template == constants.DT_FILE:
2820       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2821
2822     self.cfg.RenameInstance(inst.name, self.op.new_name)
2823     # Change the instance lock. This is definitely safe while we hold the BGL
2824     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2825     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2826
2827     # re-read the instance from the configuration after rename
2828     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2829
2830     if inst.disk_template == constants.DT_FILE:
2831       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2832       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2833                                                      old_file_storage_dir,
2834                                                      new_file_storage_dir)
2835       result.Raise()
2836       if not result.data:
2837         raise errors.OpExecError("Could not connect to node '%s' to rename"
2838                                  " directory '%s' to '%s' (but the instance"
2839                                  " has been renamed in Ganeti)" % (
2840                                  inst.primary_node, old_file_storage_dir,
2841                                  new_file_storage_dir))
2842
2843       if not result.data[0]:
2844         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2845                                  " (but the instance has been renamed in"
2846                                  " Ganeti)" % (old_file_storage_dir,
2847                                                new_file_storage_dir))
2848
2849     _StartInstanceDisks(self, inst, None)
2850     try:
2851       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2852                                                  old_name)
2853       if result.failed or not result.data:
2854         msg = ("Could not run OS rename script for instance %s on node %s"
2855                " (but the instance has been renamed in Ganeti)" %
2856                (inst.name, inst.primary_node))
2857         self.proc.LogWarning(msg)
2858     finally:
2859       _ShutdownInstanceDisks(self, inst)
2860
2861
2862 class LURemoveInstance(LogicalUnit):
2863   """Remove an instance.
2864
2865   """
2866   HPATH = "instance-remove"
2867   HTYPE = constants.HTYPE_INSTANCE
2868   _OP_REQP = ["instance_name", "ignore_failures"]
2869   REQ_BGL = False
2870
2871   def ExpandNames(self):
2872     self._ExpandAndLockInstance()
2873     self.needed_locks[locking.LEVEL_NODE] = []
2874     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2875
2876   def DeclareLocks(self, level):
2877     if level == locking.LEVEL_NODE:
2878       self._LockInstancesNodes()
2879
2880   def BuildHooksEnv(self):
2881     """Build hooks env.
2882
2883     This runs on master, primary and secondary nodes of the instance.
2884
2885     """
2886     env = _BuildInstanceHookEnvByObject(self, self.instance)
2887     nl = [self.cfg.GetMasterNode()]
2888     return env, nl, nl
2889
2890   def CheckPrereq(self):
2891     """Check prerequisites.
2892
2893     This checks that the instance is in the cluster.
2894
2895     """
2896     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2897     assert self.instance is not None, \
2898       "Cannot retrieve locked instance %s" % self.op.instance_name
2899
2900   def Exec(self, feedback_fn):
2901     """Remove the instance.
2902
2903     """
2904     instance = self.instance
2905     logging.info("Shutting down instance %s on node %s",
2906                  instance.name, instance.primary_node)
2907
2908     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2909     if result.failed or not result.data:
2910       if self.op.ignore_failures:
2911         feedback_fn("Warning: can't shutdown instance")
2912       else:
2913         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2914                                  (instance.name, instance.primary_node))
2915
2916     logging.info("Removing block devices for instance %s", instance.name)
2917
2918     if not _RemoveDisks(self, instance):
2919       if self.op.ignore_failures:
2920         feedback_fn("Warning: can't remove instance's disks")
2921       else:
2922         raise errors.OpExecError("Can't remove instance's disks")
2923
2924     logging.info("Removing instance %s out of cluster config", instance.name)
2925
2926     self.cfg.RemoveInstance(instance.name)
2927     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2928
2929
2930 class LUQueryInstances(NoHooksLU):
2931   """Logical unit for querying instances.
2932
2933   """
2934   _OP_REQP = ["output_fields", "names"]
2935   REQ_BGL = False
2936   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2937                                     "admin_state", "admin_ram",
2938                                     "disk_template", "ip", "mac", "bridge",
2939                                     "sda_size", "sdb_size", "vcpus", "tags",
2940                                     "network_port", "beparams",
2941                                     "(disk).(size)/([0-9]+)",
2942                                     "(disk).(sizes)",
2943                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2944                                     "(nic).(macs|ips|bridges)",
2945                                     "(disk|nic).(count)",
2946                                     "serial_no", "hypervisor", "hvparams",] +
2947                                   ["hv/%s" % name
2948                                    for name in constants.HVS_PARAMETERS] +
2949                                   ["be/%s" % name
2950                                    for name in constants.BES_PARAMETERS])
2951   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2952
2953
2954   def ExpandNames(self):
2955     _CheckOutputFields(static=self._FIELDS_STATIC,
2956                        dynamic=self._FIELDS_DYNAMIC,
2957                        selected=self.op.output_fields)
2958
2959     self.needed_locks = {}
2960     self.share_locks[locking.LEVEL_INSTANCE] = 1
2961     self.share_locks[locking.LEVEL_NODE] = 1
2962
2963     if self.op.names:
2964       self.wanted = _GetWantedInstances(self, self.op.names)
2965     else:
2966       self.wanted = locking.ALL_SET
2967
2968     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2969     if self.do_locking:
2970       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2971       self.needed_locks[locking.LEVEL_NODE] = []
2972       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2973
2974   def DeclareLocks(self, level):
2975     if level == locking.LEVEL_NODE and self.do_locking:
2976       self._LockInstancesNodes()
2977
2978   def CheckPrereq(self):
2979     """Check prerequisites.
2980
2981     """
2982     pass
2983
2984   def Exec(self, feedback_fn):
2985     """Computes the list of nodes and their attributes.
2986
2987     """
2988     all_info = self.cfg.GetAllInstancesInfo()
2989     if self.do_locking:
2990       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2991     elif self.wanted != locking.ALL_SET:
2992       instance_names = self.wanted
2993       missing = set(instance_names).difference(all_info.keys())
2994       if missing:
2995         raise errors.OpExecError(
2996           "Some instances were removed before retrieving their data: %s"
2997           % missing)
2998     else:
2999       instance_names = all_info.keys()
3000
3001     instance_names = utils.NiceSort(instance_names)
3002     instance_list = [all_info[iname] for iname in instance_names]
3003
3004     # begin data gathering
3005
3006     nodes = frozenset([inst.primary_node for inst in instance_list])
3007     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3008
3009     bad_nodes = []
3010     off_nodes = []
3011     if self.do_locking:
3012       live_data = {}
3013       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3014       for name in nodes:
3015         result = node_data[name]
3016         if result.offline:
3017           # offline nodes will be in both lists
3018           off_nodes.append(name)
3019         if result.failed:
3020           bad_nodes.append(name)
3021         else:
3022           if result.data:
3023             live_data.update(result.data)
3024             # else no instance is alive
3025     else:
3026       live_data = dict([(name, {}) for name in instance_names])
3027
3028     # end data gathering
3029
3030     HVPREFIX = "hv/"
3031     BEPREFIX = "be/"
3032     output = []
3033     for instance in instance_list:
3034       iout = []
3035       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3036       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3037       for field in self.op.output_fields:
3038         st_match = self._FIELDS_STATIC.Matches(field)
3039         if field == "name":
3040           val = instance.name
3041         elif field == "os":
3042           val = instance.os
3043         elif field == "pnode":
3044           val = instance.primary_node
3045         elif field == "snodes":
3046           val = list(instance.secondary_nodes)
3047         elif field == "admin_state":
3048           val = (instance.status != "down")
3049         elif field == "oper_state":
3050           if instance.primary_node in bad_nodes:
3051             val = None
3052           else:
3053             val = bool(live_data.get(instance.name))
3054         elif field == "status":
3055           if instance.primary_node in off_nodes:
3056             val = "ERROR_nodeoffline"
3057           elif instance.primary_node in bad_nodes:
3058             val = "ERROR_nodedown"
3059           else:
3060             running = bool(live_data.get(instance.name))
3061             if running:
3062               if instance.status != "down":
3063                 val = "running"
3064               else:
3065                 val = "ERROR_up"
3066             else:
3067               if instance.status != "down":
3068                 val = "ERROR_down"
3069               else:
3070                 val = "ADMIN_down"
3071         elif field == "oper_ram":
3072           if instance.primary_node in bad_nodes:
3073             val = None
3074           elif instance.name in live_data:
3075             val = live_data[instance.name].get("memory", "?")
3076           else:
3077             val = "-"
3078         elif field == "disk_template":
3079           val = instance.disk_template
3080         elif field == "ip":
3081           val = instance.nics[0].ip
3082         elif field == "bridge":
3083           val = instance.nics[0].bridge
3084         elif field == "mac":
3085           val = instance.nics[0].mac
3086         elif field == "sda_size" or field == "sdb_size":
3087           idx = ord(field[2]) - ord('a')
3088           try:
3089             val = instance.FindDisk(idx).size
3090           except errors.OpPrereqError:
3091             val = None
3092         elif field == "tags":
3093           val = list(instance.GetTags())
3094         elif field == "serial_no":
3095           val = instance.serial_no
3096         elif field == "network_port":
3097           val = instance.network_port
3098         elif field == "hypervisor":
3099           val = instance.hypervisor
3100         elif field == "hvparams":
3101           val = i_hv
3102         elif (field.startswith(HVPREFIX) and
3103               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3104           val = i_hv.get(field[len(HVPREFIX):], None)
3105         elif field == "beparams":
3106           val = i_be
3107         elif (field.startswith(BEPREFIX) and
3108               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3109           val = i_be.get(field[len(BEPREFIX):], None)
3110         elif st_match and st_match.groups():
3111           # matches a variable list
3112           st_groups = st_match.groups()
3113           if st_groups and st_groups[0] == "disk":
3114             if st_groups[1] == "count":
3115               val = len(instance.disks)
3116             elif st_groups[1] == "sizes":
3117               val = [disk.size for disk in instance.disks]
3118             elif st_groups[1] == "size":
3119               try:
3120                 val = instance.FindDisk(st_groups[2]).size
3121               except errors.OpPrereqError:
3122                 val = None
3123             else:
3124               assert False, "Unhandled disk parameter"
3125           elif st_groups[0] == "nic":
3126             if st_groups[1] == "count":
3127               val = len(instance.nics)
3128             elif st_groups[1] == "macs":
3129               val = [nic.mac for nic in instance.nics]
3130             elif st_groups[1] == "ips":
3131               val = [nic.ip for nic in instance.nics]
3132             elif st_groups[1] == "bridges":
3133               val = [nic.bridge for nic in instance.nics]
3134             else:
3135               # index-based item
3136               nic_idx = int(st_groups[2])
3137               if nic_idx >= len(instance.nics):
3138                 val = None
3139               else:
3140                 if st_groups[1] == "mac":
3141                   val = instance.nics[nic_idx].mac
3142                 elif st_groups[1] == "ip":
3143                   val = instance.nics[nic_idx].ip
3144                 elif st_groups[1] == "bridge":
3145                   val = instance.nics[nic_idx].bridge
3146                 else:
3147                   assert False, "Unhandled NIC parameter"
3148           else:
3149             assert False, "Unhandled variable parameter"
3150         else:
3151           raise errors.ParameterError(field)
3152         iout.append(val)
3153       output.append(iout)
3154
3155     return output
3156
3157
3158 class LUFailoverInstance(LogicalUnit):
3159   """Failover an instance.
3160
3161   """
3162   HPATH = "instance-failover"
3163   HTYPE = constants.HTYPE_INSTANCE
3164   _OP_REQP = ["instance_name", "ignore_consistency"]
3165   REQ_BGL = False
3166
3167   def ExpandNames(self):
3168     self._ExpandAndLockInstance()
3169     self.needed_locks[locking.LEVEL_NODE] = []
3170     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3171
3172   def DeclareLocks(self, level):
3173     if level == locking.LEVEL_NODE:
3174       self._LockInstancesNodes()
3175
3176   def BuildHooksEnv(self):
3177     """Build hooks env.
3178
3179     This runs on master, primary and secondary nodes of the instance.
3180
3181     """
3182     env = {
3183       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3184       }
3185     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3186     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3187     return env, nl, nl
3188
3189   def CheckPrereq(self):
3190     """Check prerequisites.
3191
3192     This checks that the instance is in the cluster.
3193
3194     """
3195     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3196     assert self.instance is not None, \
3197       "Cannot retrieve locked instance %s" % self.op.instance_name
3198
3199     bep = self.cfg.GetClusterInfo().FillBE(instance)
3200     if instance.disk_template not in constants.DTS_NET_MIRROR:
3201       raise errors.OpPrereqError("Instance's disk layout is not"
3202                                  " network mirrored, cannot failover.")
3203
3204     secondary_nodes = instance.secondary_nodes
3205     if not secondary_nodes:
3206       raise errors.ProgrammerError("no secondary node but using "
3207                                    "a mirrored disk template")
3208
3209     target_node = secondary_nodes[0]
3210     # check memory requirements on the secondary node
3211     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3212                          instance.name, bep[constants.BE_MEMORY],
3213                          instance.hypervisor)
3214
3215     # check bridge existance
3216     brlist = [nic.bridge for nic in instance.nics]
3217     result = self.rpc.call_bridges_exist(target_node, brlist)
3218     result.Raise()
3219     if not result.data:
3220       raise errors.OpPrereqError("One or more target bridges %s does not"
3221                                  " exist on destination node '%s'" %
3222                                  (brlist, target_node))
3223
3224   def Exec(self, feedback_fn):
3225     """Failover an instance.
3226
3227     The failover is done by shutting it down on its present node and
3228     starting it on the secondary.
3229
3230     """
3231     instance = self.instance
3232
3233     source_node = instance.primary_node
3234     target_node = instance.secondary_nodes[0]
3235
3236     feedback_fn("* checking disk consistency between source and target")
3237     for dev in instance.disks:
3238       # for drbd, these are drbd over lvm
3239       if not _CheckDiskConsistency(self, dev, target_node, False):
3240         if instance.status == "up" and not self.op.ignore_consistency:
3241           raise errors.OpExecError("Disk %s is degraded on target node,"
3242                                    " aborting failover." % dev.iv_name)
3243
3244     feedback_fn("* shutting down instance on source node")
3245     logging.info("Shutting down instance %s on node %s",
3246                  instance.name, source_node)
3247
3248     result = self.rpc.call_instance_shutdown(source_node, instance)
3249     if result.failed or not result.data:
3250       if self.op.ignore_consistency:
3251         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3252                              " Proceeding"
3253                              " anyway. Please make sure node %s is down",
3254                              instance.name, source_node, source_node)
3255       else:
3256         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3257                                  (instance.name, source_node))
3258
3259     feedback_fn("* deactivating the instance's disks on source node")
3260     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3261       raise errors.OpExecError("Can't shut down the instance's disks.")
3262
3263     instance.primary_node = target_node
3264     # distribute new instance config to the other nodes
3265     self.cfg.Update(instance)
3266
3267     # Only start the instance if it's marked as up
3268     if instance.status == "up":
3269       feedback_fn("* activating the instance's disks on target node")
3270       logging.info("Starting instance %s on node %s",
3271                    instance.name, target_node)
3272
3273       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3274                                                ignore_secondaries=True)
3275       if not disks_ok:
3276         _ShutdownInstanceDisks(self, instance)
3277         raise errors.OpExecError("Can't activate the instance's disks")
3278
3279       feedback_fn("* starting the instance on the target node")
3280       result = self.rpc.call_instance_start(target_node, instance, None)
3281       if result.failed or not result.data:
3282         _ShutdownInstanceDisks(self, instance)
3283         raise errors.OpExecError("Could not start instance %s on node %s." %
3284                                  (instance.name, target_node))
3285
3286
3287 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3288   """Create a tree of block devices on the primary node.
3289
3290   This always creates all devices.
3291
3292   """
3293   if device.children:
3294     for child in device.children:
3295       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3296         return False
3297
3298   lu.cfg.SetDiskID(device, node)
3299   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3300                                        instance.name, True, info)
3301   if new_id.failed or not new_id.data:
3302     return False
3303   if device.physical_id is None:
3304     device.physical_id = new_id
3305   return True
3306
3307
3308 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3309   """Create a tree of block devices on a secondary node.
3310
3311   If this device type has to be created on secondaries, create it and
3312   all its children.
3313
3314   If not, just recurse to children keeping the same 'force' value.
3315
3316   """
3317   if device.CreateOnSecondary():
3318     force = True
3319   if device.children:
3320     for child in device.children:
3321       if not _CreateBlockDevOnSecondary(lu, node, instance,
3322                                         child, force, info):
3323         return False
3324
3325   if not force:
3326     return True
3327   lu.cfg.SetDiskID(device, node)
3328   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3329                                        instance.name, False, info)
3330   if new_id.failed or not new_id.data:
3331     return False
3332   if device.physical_id is None:
3333     device.physical_id = new_id
3334   return True
3335
3336
3337 def _GenerateUniqueNames(lu, exts):
3338   """Generate a suitable LV name.
3339
3340   This will generate a logical volume name for the given instance.
3341
3342   """
3343   results = []
3344   for val in exts:
3345     new_id = lu.cfg.GenerateUniqueID()
3346     results.append("%s%s" % (new_id, val))
3347   return results
3348
3349
3350 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3351                          p_minor, s_minor):
3352   """Generate a drbd8 device complete with its children.
3353
3354   """
3355   port = lu.cfg.AllocatePort()
3356   vgname = lu.cfg.GetVGName()
3357   shared_secret = lu.cfg.GenerateDRBDSecret()
3358   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3359                           logical_id=(vgname, names[0]))
3360   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3361                           logical_id=(vgname, names[1]))
3362   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3363                           logical_id=(primary, secondary, port,
3364                                       p_minor, s_minor,
3365                                       shared_secret),
3366                           children=[dev_data, dev_meta],
3367                           iv_name=iv_name)
3368   return drbd_dev
3369
3370
3371 def _GenerateDiskTemplate(lu, template_name,
3372                           instance_name, primary_node,
3373                           secondary_nodes, disk_info,
3374                           file_storage_dir, file_driver,
3375                           base_index):
3376   """Generate the entire disk layout for a given template type.
3377
3378   """
3379   #TODO: compute space requirements
3380
3381   vgname = lu.cfg.GetVGName()
3382   disk_count = len(disk_info)
3383   disks = []
3384   if template_name == constants.DT_DISKLESS:
3385     pass
3386   elif template_name == constants.DT_PLAIN:
3387     if len(secondary_nodes) != 0:
3388       raise errors.ProgrammerError("Wrong template configuration")
3389
3390     names = _GenerateUniqueNames(lu, [".disk%d" % i
3391                                       for i in range(disk_count)])
3392     for idx, disk in enumerate(disk_info):
3393       disk_index = idx + base_index
3394       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3395                               logical_id=(vgname, names[idx]),
3396                               iv_name="disk/%d" % disk_index)
3397       disks.append(disk_dev)
3398   elif template_name == constants.DT_DRBD8:
3399     if len(secondary_nodes) != 1:
3400       raise errors.ProgrammerError("Wrong template configuration")
3401     remote_node = secondary_nodes[0]
3402     minors = lu.cfg.AllocateDRBDMinor(
3403       [primary_node, remote_node] * len(disk_info), instance_name)
3404
3405     names = _GenerateUniqueNames(lu,
3406                                  [".disk%d_%s" % (i, s)
3407                                   for i in range(disk_count)
3408                                   for s in ("data", "meta")
3409                                   ])
3410     for idx, disk in enumerate(disk_info):
3411       disk_index = idx + base_index
3412       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3413                                       disk["size"], names[idx*2:idx*2+2],
3414                                       "disk/%d" % disk_index,
3415                                       minors[idx*2], minors[idx*2+1])
3416       disks.append(disk_dev)
3417   elif template_name == constants.DT_FILE:
3418     if len(secondary_nodes) != 0:
3419       raise errors.ProgrammerError("Wrong template configuration")
3420
3421     for idx, disk in enumerate(disk_info):
3422       disk_index = idx + base_index
3423       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3424                               iv_name="disk/%d" % disk_index,
3425                               logical_id=(file_driver,
3426                                           "%s/disk%d" % (file_storage_dir,
3427                                                          idx)))
3428       disks.append(disk_dev)
3429   else:
3430     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3431   return disks
3432
3433
3434 def _GetInstanceInfoText(instance):
3435   """Compute that text that should be added to the disk's metadata.
3436
3437   """
3438   return "originstname+%s" % instance.name
3439
3440
3441 def _CreateDisks(lu, instance):
3442   """Create all disks for an instance.
3443
3444   This abstracts away some work from AddInstance.
3445
3446   @type lu: L{LogicalUnit}
3447   @param lu: the logical unit on whose behalf we execute
3448   @type instance: L{objects.Instance}
3449   @param instance: the instance whose disks we should create
3450   @rtype: boolean
3451   @return: the success of the creation
3452
3453   """
3454   info = _GetInstanceInfoText(instance)
3455
3456   if instance.disk_template == constants.DT_FILE:
3457     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3458     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3459                                                  file_storage_dir)
3460
3461     if result.failed or not result.data:
3462       logging.error("Could not connect to node '%s'", instance.primary_node)
3463       return False
3464
3465     if not result.data[0]:
3466       logging.error("Failed to create directory '%s'", file_storage_dir)
3467       return False
3468
3469   # Note: this needs to be kept in sync with adding of disks in
3470   # LUSetInstanceParams
3471   for device in instance.disks:
3472     logging.info("Creating volume %s for instance %s",
3473                  device.iv_name, instance.name)
3474     #HARDCODE
3475     for secondary_node in instance.secondary_nodes:
3476       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3477                                         device, False, info):
3478         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3479                       device.iv_name, device, secondary_node)
3480         return False
3481     #HARDCODE
3482     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3483                                     instance, device, info):
3484       logging.error("Failed to create volume %s on primary!", device.iv_name)
3485       return False
3486
3487   return True
3488
3489
3490 def _RemoveDisks(lu, instance):
3491   """Remove all disks for an instance.
3492
3493   This abstracts away some work from `AddInstance()` and
3494   `RemoveInstance()`. Note that in case some of the devices couldn't
3495   be removed, the removal will continue with the other ones (compare
3496   with `_CreateDisks()`).
3497
3498   @type lu: L{LogicalUnit}
3499   @param lu: the logical unit on whose behalf we execute
3500   @type instance: L{objects.Instance}
3501   @param instance: the instance whose disks we should remove
3502   @rtype: boolean
3503   @return: the success of the removal
3504
3505   """
3506   logging.info("Removing block devices for instance %s", instance.name)
3507
3508   result = True
3509   for device in instance.disks:
3510     for node, disk in device.ComputeNodeTree(instance.primary_node):
3511       lu.cfg.SetDiskID(disk, node)
3512       result = lu.rpc.call_blockdev_remove(node, disk)
3513       if result.failed or not result.data:
3514         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3515                            " continuing anyway", device.iv_name, node)
3516         result = False
3517
3518   if instance.disk_template == constants.DT_FILE:
3519     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3520     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3521                                                  file_storage_dir)
3522     if result.failed or not result.data:
3523       logging.error("Could not remove directory '%s'", file_storage_dir)
3524       result = False
3525
3526   return result
3527
3528
3529 def _ComputeDiskSize(disk_template, disks):
3530   """Compute disk size requirements in the volume group
3531
3532   """
3533   # Required free disk space as a function of disk and swap space
3534   req_size_dict = {
3535     constants.DT_DISKLESS: None,
3536     constants.DT_PLAIN: sum(d["size"] for d in disks),
3537     # 128 MB are added for drbd metadata for each disk
3538     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3539     constants.DT_FILE: None,
3540   }
3541
3542   if disk_template not in req_size_dict:
3543     raise errors.ProgrammerError("Disk template '%s' size requirement"
3544                                  " is unknown" %  disk_template)
3545
3546   return req_size_dict[disk_template]
3547
3548
3549 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3550   """Hypervisor parameter validation.
3551
3552   This function abstract the hypervisor parameter validation to be
3553   used in both instance create and instance modify.
3554
3555   @type lu: L{LogicalUnit}
3556   @param lu: the logical unit for which we check
3557   @type nodenames: list
3558   @param nodenames: the list of nodes on which we should check
3559   @type hvname: string
3560   @param hvname: the name of the hypervisor we should use
3561   @type hvparams: dict
3562   @param hvparams: the parameters which we need to check
3563   @raise errors.OpPrereqError: if the parameters are not valid
3564
3565   """
3566   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3567                                                   hvname,
3568                                                   hvparams)
3569   for node in nodenames:
3570     info = hvinfo[node]
3571     info.Raise()
3572     if not info.data or not isinstance(info.data, (tuple, list)):
3573       raise errors.OpPrereqError("Cannot get current information"
3574                                  " from node '%s' (%s)" % (node, info.data))
3575     if not info.data[0]:
3576       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3577                                  " %s" % info.data[1])
3578
3579
3580 class LUCreateInstance(LogicalUnit):
3581   """Create an instance.
3582
3583   """
3584   HPATH = "instance-add"
3585   HTYPE = constants.HTYPE_INSTANCE
3586   _OP_REQP = ["instance_name", "disks", "disk_template",
3587               "mode", "start",
3588               "wait_for_sync", "ip_check", "nics",
3589               "hvparams", "beparams"]
3590   REQ_BGL = False
3591
3592   def _ExpandNode(self, node):
3593     """Expands and checks one node name.
3594
3595     """
3596     node_full = self.cfg.ExpandNodeName(node)
3597     if node_full is None:
3598       raise errors.OpPrereqError("Unknown node %s" % node)
3599     return node_full
3600
3601   def ExpandNames(self):
3602     """ExpandNames for CreateInstance.
3603
3604     Figure out the right locks for instance creation.
3605
3606     """
3607     self.needed_locks = {}
3608
3609     # set optional parameters to none if they don't exist
3610     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3611       if not hasattr(self.op, attr):
3612         setattr(self.op, attr, None)
3613
3614     # cheap checks, mostly valid constants given
3615
3616     # verify creation mode
3617     if self.op.mode not in (constants.INSTANCE_CREATE,
3618                             constants.INSTANCE_IMPORT):
3619       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3620                                  self.op.mode)
3621
3622     # disk template and mirror node verification
3623     if self.op.disk_template not in constants.DISK_TEMPLATES:
3624       raise errors.OpPrereqError("Invalid disk template name")
3625
3626     if self.op.hypervisor is None:
3627       self.op.hypervisor = self.cfg.GetHypervisorType()
3628
3629     cluster = self.cfg.GetClusterInfo()
3630     enabled_hvs = cluster.enabled_hypervisors
3631     if self.op.hypervisor not in enabled_hvs:
3632       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3633                                  " cluster (%s)" % (self.op.hypervisor,
3634                                   ",".join(enabled_hvs)))
3635
3636     # check hypervisor parameter syntax (locally)
3637
3638     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3639                                   self.op.hvparams)
3640     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3641     hv_type.CheckParameterSyntax(filled_hvp)
3642
3643     # fill and remember the beparams dict
3644     utils.CheckBEParams(self.op.beparams)
3645     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3646                                     self.op.beparams)
3647
3648     #### instance parameters check
3649
3650     # instance name verification
3651     hostname1 = utils.HostInfo(self.op.instance_name)
3652     self.op.instance_name = instance_name = hostname1.name
3653
3654     # this is just a preventive check, but someone might still add this
3655     # instance in the meantime, and creation will fail at lock-add time
3656     if instance_name in self.cfg.GetInstanceList():
3657       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3658                                  instance_name)
3659
3660     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3661
3662     # NIC buildup
3663     self.nics = []
3664     for nic in self.op.nics:
3665       # ip validity checks
3666       ip = nic.get("ip", None)
3667       if ip is None or ip.lower() == "none":
3668         nic_ip = None
3669       elif ip.lower() == constants.VALUE_AUTO:
3670         nic_ip = hostname1.ip
3671       else:
3672         if not utils.IsValidIP(ip):
3673           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3674                                      " like a valid IP" % ip)
3675         nic_ip = ip
3676
3677       # MAC address verification
3678       mac = nic.get("mac", constants.VALUE_AUTO)
3679       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3680         if not utils.IsValidMac(mac.lower()):
3681           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3682                                      mac)
3683       # bridge verification
3684       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3685       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3686
3687     # disk checks/pre-build
3688     self.disks = []
3689     for disk in self.op.disks:
3690       mode = disk.get("mode", constants.DISK_RDWR)
3691       if mode not in constants.DISK_ACCESS_SET:
3692         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3693                                    mode)
3694       size = disk.get("size", None)
3695       if size is None:
3696         raise errors.OpPrereqError("Missing disk size")
3697       try:
3698         size = int(size)
3699       except ValueError:
3700         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3701       self.disks.append({"size": size, "mode": mode})
3702
3703     # used in CheckPrereq for ip ping check
3704     self.check_ip = hostname1.ip
3705
3706     # file storage checks
3707     if (self.op.file_driver and
3708         not self.op.file_driver in constants.FILE_DRIVER):
3709       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3710                                  self.op.file_driver)
3711
3712     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3713       raise errors.OpPrereqError("File storage directory path not absolute")
3714
3715     ### Node/iallocator related checks
3716     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3717       raise errors.OpPrereqError("One and only one of iallocator and primary"
3718                                  " node must be given")
3719
3720     if self.op.iallocator:
3721       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3722     else:
3723       self.op.pnode = self._ExpandNode(self.op.pnode)
3724       nodelist = [self.op.pnode]
3725       if self.op.snode is not None:
3726         self.op.snode = self._ExpandNode(self.op.snode)
3727         nodelist.append(self.op.snode)
3728       self.needed_locks[locking.LEVEL_NODE] = nodelist
3729
3730     # in case of import lock the source node too
3731     if self.op.mode == constants.INSTANCE_IMPORT:
3732       src_node = getattr(self.op, "src_node", None)
3733       src_path = getattr(self.op, "src_path", None)
3734
3735       if src_path is None:
3736         self.op.src_path = src_path = self.op.instance_name
3737
3738       if src_node is None:
3739         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3740         self.op.src_node = None
3741         if os.path.isabs(src_path):
3742           raise errors.OpPrereqError("Importing an instance from an absolute"
3743                                      " path requires a source node option.")
3744       else:
3745         self.op.src_node = src_node = self._ExpandNode(src_node)
3746         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3747           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3748         if not os.path.isabs(src_path):
3749           self.op.src_path = src_path = \
3750             os.path.join(constants.EXPORT_DIR, src_path)
3751
3752     else: # INSTANCE_CREATE
3753       if getattr(self.op, "os_type", None) is None:
3754         raise errors.OpPrereqError("No guest OS specified")
3755
3756   def _RunAllocator(self):
3757     """Run the allocator based on input opcode.
3758
3759     """
3760     nics = [n.ToDict() for n in self.nics]
3761     ial = IAllocator(self,
3762                      mode=constants.IALLOCATOR_MODE_ALLOC,
3763                      name=self.op.instance_name,
3764                      disk_template=self.op.disk_template,
3765                      tags=[],
3766                      os=self.op.os_type,
3767                      vcpus=self.be_full[constants.BE_VCPUS],
3768                      mem_size=self.be_full[constants.BE_MEMORY],
3769                      disks=self.disks,
3770                      nics=nics,
3771                      hypervisor=self.op.hypervisor,
3772                      )
3773
3774     ial.Run(self.op.iallocator)
3775
3776     if not ial.success:
3777       raise errors.OpPrereqError("Can't compute nodes using"
3778                                  " iallocator '%s': %s" % (self.op.iallocator,
3779                                                            ial.info))
3780     if len(ial.nodes) != ial.required_nodes:
3781       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3782                                  " of nodes (%s), required %s" %
3783                                  (self.op.iallocator, len(ial.nodes),
3784                                   ial.required_nodes))
3785     self.op.pnode = ial.nodes[0]
3786     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3787                  self.op.instance_name, self.op.iallocator,
3788                  ", ".join(ial.nodes))
3789     if ial.required_nodes == 2:
3790       self.op.snode = ial.nodes[1]
3791
3792   def BuildHooksEnv(self):
3793     """Build hooks env.
3794
3795     This runs on master, primary and secondary nodes of the instance.
3796
3797     """
3798     env = {
3799       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3800       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3801       "INSTANCE_ADD_MODE": self.op.mode,
3802       }
3803     if self.op.mode == constants.INSTANCE_IMPORT:
3804       env["INSTANCE_SRC_NODE"] = self.op.src_node
3805       env["INSTANCE_SRC_PATH"] = self.op.src_path
3806       env["INSTANCE_SRC_IMAGES"] = self.src_images
3807
3808     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3809       primary_node=self.op.pnode,
3810       secondary_nodes=self.secondaries,
3811       status=self.instance_status,
3812       os_type=self.op.os_type,
3813       memory=self.be_full[constants.BE_MEMORY],
3814       vcpus=self.be_full[constants.BE_VCPUS],
3815       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3816     ))
3817
3818     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3819           self.secondaries)
3820     return env, nl, nl
3821
3822
3823   def CheckPrereq(self):
3824     """Check prerequisites.
3825
3826     """
3827     if (not self.cfg.GetVGName() and
3828         self.op.disk_template not in constants.DTS_NOT_LVM):
3829       raise errors.OpPrereqError("Cluster does not support lvm-based"
3830                                  " instances")
3831
3832
3833     if self.op.mode == constants.INSTANCE_IMPORT:
3834       src_node = self.op.src_node
3835       src_path = self.op.src_path
3836
3837       if src_node is None:
3838         exp_list = self.rpc.call_export_list(
3839           self.acquired_locks[locking.LEVEL_NODE])
3840         found = False
3841         for node in exp_list:
3842           if not exp_list[node].failed and src_path in exp_list[node].data:
3843             found = True
3844             self.op.src_node = src_node = node
3845             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3846                                                        src_path)
3847             break
3848         if not found:
3849           raise errors.OpPrereqError("No export found for relative path %s" %
3850                                       src_path)
3851
3852       result = self.rpc.call_export_info(src_node, src_path)
3853       result.Raise()
3854       if not result.data:
3855         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3856
3857       export_info = result.data
3858       if not export_info.has_section(constants.INISECT_EXP):
3859         raise errors.ProgrammerError("Corrupted export config")
3860
3861       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3862       if (int(ei_version) != constants.EXPORT_VERSION):
3863         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3864                                    (ei_version, constants.EXPORT_VERSION))
3865
3866       # Check that the new instance doesn't have less disks than the export
3867       instance_disks = len(self.disks)
3868       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3869       if instance_disks < export_disks:
3870         raise errors.OpPrereqError("Not enough disks to import."
3871                                    " (instance: %d, export: %d)" %
3872                                    (instance_disks, export_disks))
3873
3874       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3875       disk_images = []
3876       for idx in range(export_disks):
3877         option = 'disk%d_dump' % idx
3878         if export_info.has_option(constants.INISECT_INS, option):
3879           # FIXME: are the old os-es, disk sizes, etc. useful?
3880           export_name = export_info.get(constants.INISECT_INS, option)
3881           image = os.path.join(src_path, export_name)
3882           disk_images.append(image)
3883         else:
3884           disk_images.append(False)
3885
3886       self.src_images = disk_images
3887
3888       old_name = export_info.get(constants.INISECT_INS, 'name')
3889       # FIXME: int() here could throw a ValueError on broken exports
3890       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3891       if self.op.instance_name == old_name:
3892         for idx, nic in enumerate(self.nics):
3893           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3894             nic_mac_ini = 'nic%d_mac' % idx
3895             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3896
3897     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3898     if self.op.start and not self.op.ip_check:
3899       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3900                                  " adding an instance in start mode")
3901
3902     if self.op.ip_check:
3903       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3904         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3905                                    (self.check_ip, self.op.instance_name))
3906
3907     #### allocator run
3908
3909     if self.op.iallocator is not None:
3910       self._RunAllocator()
3911
3912     #### node related checks
3913
3914     # check primary node
3915     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3916     assert self.pnode is not None, \
3917       "Cannot retrieve locked node %s" % self.op.pnode
3918     self.secondaries = []
3919
3920     # mirror node verification
3921     if self.op.disk_template in constants.DTS_NET_MIRROR:
3922       if self.op.snode is None:
3923         raise errors.OpPrereqError("The networked disk templates need"
3924                                    " a mirror node")
3925       if self.op.snode == pnode.name:
3926         raise errors.OpPrereqError("The secondary node cannot be"
3927                                    " the primary node.")
3928       self.secondaries.append(self.op.snode)
3929
3930     nodenames = [pnode.name] + self.secondaries
3931
3932     req_size = _ComputeDiskSize(self.op.disk_template,
3933                                 self.disks)
3934
3935     # Check lv size requirements
3936     if req_size is not None:
3937       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3938                                          self.op.hypervisor)
3939       for node in nodenames:
3940         info = nodeinfo[node]
3941         info.Raise()
3942         info = info.data
3943         if not info:
3944           raise errors.OpPrereqError("Cannot get current information"
3945                                      " from node '%s'" % node)
3946         vg_free = info.get('vg_free', None)
3947         if not isinstance(vg_free, int):
3948           raise errors.OpPrereqError("Can't compute free disk space on"
3949                                      " node %s" % node)
3950         if req_size > info['vg_free']:
3951           raise errors.OpPrereqError("Not enough disk space on target node %s."
3952                                      " %d MB available, %d MB required" %
3953                                      (node, info['vg_free'], req_size))
3954
3955     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3956
3957     # os verification
3958     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3959     result.Raise()
3960     if not isinstance(result.data, objects.OS):
3961       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3962                                  " primary node"  % self.op.os_type)
3963
3964     # bridge check on primary node
3965     bridges = [n.bridge for n in self.nics]
3966     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3967     result.Raise()
3968     if not result.data:
3969       raise errors.OpPrereqError("One of the target bridges '%s' does not"
3970                                  " exist on destination node '%s'" %
3971                                  (",".join(bridges), pnode.name))
3972
3973     # memory check on primary node
3974     if self.op.start:
3975       _CheckNodeFreeMemory(self, self.pnode.name,
3976                            "creating instance %s" % self.op.instance_name,
3977                            self.be_full[constants.BE_MEMORY],
3978                            self.op.hypervisor)
3979
3980     if self.op.start:
3981       self.instance_status = 'up'
3982     else:
3983       self.instance_status = 'down'
3984
3985   def Exec(self, feedback_fn):
3986     """Create and add the instance to the cluster.
3987
3988     """
3989     instance = self.op.instance_name
3990     pnode_name = self.pnode.name
3991
3992     for nic in self.nics:
3993       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3994         nic.mac = self.cfg.GenerateMAC()
3995
3996     ht_kind = self.op.hypervisor
3997     if ht_kind in constants.HTS_REQ_PORT:
3998       network_port = self.cfg.AllocatePort()
3999     else:
4000       network_port = None
4001
4002     ##if self.op.vnc_bind_address is None:
4003     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4004
4005     # this is needed because os.path.join does not accept None arguments
4006     if self.op.file_storage_dir is None:
4007       string_file_storage_dir = ""
4008     else:
4009       string_file_storage_dir = self.op.file_storage_dir
4010
4011     # build the full file storage dir path
4012     file_storage_dir = os.path.normpath(os.path.join(
4013                                         self.cfg.GetFileStorageDir(),
4014                                         string_file_storage_dir, instance))
4015
4016
4017     disks = _GenerateDiskTemplate(self,
4018                                   self.op.disk_template,
4019                                   instance, pnode_name,
4020                                   self.secondaries,
4021                                   self.disks,
4022                                   file_storage_dir,
4023                                   self.op.file_driver,
4024                                   0)
4025
4026     iobj = objects.Instance(name=instance, os=self.op.os_type,
4027                             primary_node=pnode_name,
4028                             nics=self.nics, disks=disks,
4029                             disk_template=self.op.disk_template,
4030                             status=self.instance_status,
4031                             network_port=network_port,
4032                             beparams=self.op.beparams,
4033                             hvparams=self.op.hvparams,
4034                             hypervisor=self.op.hypervisor,
4035                             )
4036
4037     feedback_fn("* creating instance disks...")
4038     if not _CreateDisks(self, iobj):
4039       _RemoveDisks(self, iobj)
4040       self.cfg.ReleaseDRBDMinors(instance)
4041       raise errors.OpExecError("Device creation failed, reverting...")
4042
4043     feedback_fn("adding instance %s to cluster config" % instance)
4044
4045     self.cfg.AddInstance(iobj)
4046     # Declare that we don't want to remove the instance lock anymore, as we've
4047     # added the instance to the config
4048     del self.remove_locks[locking.LEVEL_INSTANCE]
4049     # Remove the temp. assignements for the instance's drbds
4050     self.cfg.ReleaseDRBDMinors(instance)
4051     # Unlock all the nodes
4052     if self.op.mode == constants.INSTANCE_IMPORT:
4053       nodes_keep = [self.op.src_node]
4054       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4055                        if node != self.op.src_node]
4056       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4057       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4058     else:
4059       self.context.glm.release(locking.LEVEL_NODE)
4060       del self.acquired_locks[locking.LEVEL_NODE]
4061
4062     if self.op.wait_for_sync:
4063       disk_abort = not _WaitForSync(self, iobj)
4064     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4065       # make sure the disks are not degraded (still sync-ing is ok)
4066       time.sleep(15)
4067       feedback_fn("* checking mirrors status")
4068       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4069     else:
4070       disk_abort = False
4071
4072     if disk_abort:
4073       _RemoveDisks(self, iobj)
4074       self.cfg.RemoveInstance(iobj.name)
4075       # Make sure the instance lock gets removed
4076       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4077       raise errors.OpExecError("There are some degraded disks for"
4078                                " this instance")
4079
4080     feedback_fn("creating os for instance %s on node %s" %
4081                 (instance, pnode_name))
4082
4083     if iobj.disk_template != constants.DT_DISKLESS:
4084       if self.op.mode == constants.INSTANCE_CREATE:
4085         feedback_fn("* running the instance OS create scripts...")
4086         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4087         result.Raise()
4088         if not result.data:
4089           raise errors.OpExecError("Could not add os for instance %s"
4090                                    " on node %s" %
4091                                    (instance, pnode_name))
4092
4093       elif self.op.mode == constants.INSTANCE_IMPORT:
4094         feedback_fn("* running the instance OS import scripts...")
4095         src_node = self.op.src_node
4096         src_images = self.src_images
4097         cluster_name = self.cfg.GetClusterName()
4098         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4099                                                          src_node, src_images,
4100                                                          cluster_name)
4101         import_result.Raise()
4102         for idx, result in enumerate(import_result.data):
4103           if not result:
4104             self.LogWarning("Could not import the image %s for instance"
4105                             " %s, disk %d, on node %s" %
4106                             (src_images[idx], instance, idx, pnode_name))
4107       else:
4108         # also checked in the prereq part
4109         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4110                                      % self.op.mode)
4111
4112     if self.op.start:
4113       logging.info("Starting instance %s on node %s", instance, pnode_name)
4114       feedback_fn("* starting instance...")
4115       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4116       result.Raise()
4117       if not result.data:
4118         raise errors.OpExecError("Could not start instance")
4119
4120
4121 class LUConnectConsole(NoHooksLU):
4122   """Connect to an instance's console.
4123
4124   This is somewhat special in that it returns the command line that
4125   you need to run on the master node in order to connect to the
4126   console.
4127
4128   """
4129   _OP_REQP = ["instance_name"]
4130   REQ_BGL = False
4131
4132   def ExpandNames(self):
4133     self._ExpandAndLockInstance()
4134
4135   def CheckPrereq(self):
4136     """Check prerequisites.
4137
4138     This checks that the instance is in the cluster.
4139
4140     """
4141     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4142     assert self.instance is not None, \
4143       "Cannot retrieve locked instance %s" % self.op.instance_name
4144
4145   def Exec(self, feedback_fn):
4146     """Connect to the console of an instance
4147
4148     """
4149     instance = self.instance
4150     node = instance.primary_node
4151
4152     node_insts = self.rpc.call_instance_list([node],
4153                                              [instance.hypervisor])[node]
4154     node_insts.Raise()
4155
4156     if instance.name not in node_insts.data:
4157       raise errors.OpExecError("Instance %s is not running." % instance.name)
4158
4159     logging.debug("Connecting to console of %s on %s", instance.name, node)
4160
4161     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4162     console_cmd = hyper.GetShellCommandForConsole(instance)
4163
4164     # build ssh cmdline
4165     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4166
4167
4168 class LUReplaceDisks(LogicalUnit):
4169   """Replace the disks of an instance.
4170
4171   """
4172   HPATH = "mirrors-replace"
4173   HTYPE = constants.HTYPE_INSTANCE
4174   _OP_REQP = ["instance_name", "mode", "disks"]
4175   REQ_BGL = False
4176
4177   def ExpandNames(self):
4178     self._ExpandAndLockInstance()
4179
4180     if not hasattr(self.op, "remote_node"):
4181       self.op.remote_node = None
4182
4183     ia_name = getattr(self.op, "iallocator", None)
4184     if ia_name is not None:
4185       if self.op.remote_node is not None:
4186         raise errors.OpPrereqError("Give either the iallocator or the new"
4187                                    " secondary, not both")
4188       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4189     elif self.op.remote_node is not None:
4190       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4191       if remote_node is None:
4192         raise errors.OpPrereqError("Node '%s' not known" %
4193                                    self.op.remote_node)
4194       self.op.remote_node = remote_node
4195       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4196       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4197     else:
4198       self.needed_locks[locking.LEVEL_NODE] = []
4199       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4200
4201   def DeclareLocks(self, level):
4202     # If we're not already locking all nodes in the set we have to declare the
4203     # instance's primary/secondary nodes.
4204     if (level == locking.LEVEL_NODE and
4205         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4206       self._LockInstancesNodes()
4207
4208   def _RunAllocator(self):
4209     """Compute a new secondary node using an IAllocator.
4210
4211     """
4212     ial = IAllocator(self,
4213                      mode=constants.IALLOCATOR_MODE_RELOC,
4214                      name=self.op.instance_name,
4215                      relocate_from=[self.sec_node])
4216
4217     ial.Run(self.op.iallocator)
4218
4219     if not ial.success:
4220       raise errors.OpPrereqError("Can't compute nodes using"
4221                                  " iallocator '%s': %s" % (self.op.iallocator,
4222                                                            ial.info))
4223     if len(ial.nodes) != ial.required_nodes:
4224       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4225                                  " of nodes (%s), required %s" %
4226                                  (len(ial.nodes), ial.required_nodes))
4227     self.op.remote_node = ial.nodes[0]
4228     self.LogInfo("Selected new secondary for the instance: %s",
4229                  self.op.remote_node)
4230
4231   def BuildHooksEnv(self):
4232     """Build hooks env.
4233
4234     This runs on the master, the primary and all the secondaries.
4235
4236     """
4237     env = {
4238       "MODE": self.op.mode,
4239       "NEW_SECONDARY": self.op.remote_node,
4240       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4241       }
4242     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4243     nl = [
4244       self.cfg.GetMasterNode(),
4245       self.instance.primary_node,
4246       ]
4247     if self.op.remote_node is not None:
4248       nl.append(self.op.remote_node)
4249     return env, nl, nl
4250
4251   def CheckPrereq(self):
4252     """Check prerequisites.
4253
4254     This checks that the instance is in the cluster.
4255
4256     """
4257     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4258     assert instance is not None, \
4259       "Cannot retrieve locked instance %s" % self.op.instance_name
4260     self.instance = instance
4261
4262     if instance.disk_template not in constants.DTS_NET_MIRROR:
4263       raise errors.OpPrereqError("Instance's disk layout is not"
4264                                  " network mirrored.")
4265
4266     if len(instance.secondary_nodes) != 1:
4267       raise errors.OpPrereqError("The instance has a strange layout,"
4268                                  " expected one secondary but found %d" %
4269                                  len(instance.secondary_nodes))
4270
4271     self.sec_node = instance.secondary_nodes[0]
4272
4273     ia_name = getattr(self.op, "iallocator", None)
4274     if ia_name is not None:
4275       self._RunAllocator()
4276
4277     remote_node = self.op.remote_node
4278     if remote_node is not None:
4279       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4280       assert self.remote_node_info is not None, \
4281         "Cannot retrieve locked node %s" % remote_node
4282     else:
4283       self.remote_node_info = None
4284     if remote_node == instance.primary_node:
4285       raise errors.OpPrereqError("The specified node is the primary node of"
4286                                  " the instance.")
4287     elif remote_node == self.sec_node:
4288       if self.op.mode == constants.REPLACE_DISK_SEC:
4289         # this is for DRBD8, where we can't execute the same mode of
4290         # replacement as for drbd7 (no different port allocated)
4291         raise errors.OpPrereqError("Same secondary given, cannot execute"
4292                                    " replacement")
4293     if instance.disk_template == constants.DT_DRBD8:
4294       if (self.op.mode == constants.REPLACE_DISK_ALL and
4295           remote_node is not None):
4296         # switch to replace secondary mode
4297         self.op.mode = constants.REPLACE_DISK_SEC
4298
4299       if self.op.mode == constants.REPLACE_DISK_ALL:
4300         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4301                                    " secondary disk replacement, not"
4302                                    " both at once")
4303       elif self.op.mode == constants.REPLACE_DISK_PRI:
4304         if remote_node is not None:
4305           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4306                                      " the secondary while doing a primary"
4307                                      " node disk replacement")
4308         self.tgt_node = instance.primary_node
4309         self.oth_node = instance.secondary_nodes[0]
4310       elif self.op.mode == constants.REPLACE_DISK_SEC:
4311         self.new_node = remote_node # this can be None, in which case
4312                                     # we don't change the secondary
4313         self.tgt_node = instance.secondary_nodes[0]
4314         self.oth_node = instance.primary_node
4315       else:
4316         raise errors.ProgrammerError("Unhandled disk replace mode")
4317
4318     if not self.op.disks:
4319       self.op.disks = range(len(instance.disks))
4320
4321     for disk_idx in self.op.disks:
4322       instance.FindDisk(disk_idx)
4323
4324   def _ExecD8DiskOnly(self, feedback_fn):
4325     """Replace a disk on the primary or secondary for dbrd8.
4326
4327     The algorithm for replace is quite complicated:
4328
4329       1. for each disk to be replaced:
4330
4331         1. create new LVs on the target node with unique names
4332         1. detach old LVs from the drbd device
4333         1. rename old LVs to name_replaced.<time_t>
4334         1. rename new LVs to old LVs
4335         1. attach the new LVs (with the old names now) to the drbd device
4336
4337       1. wait for sync across all devices
4338
4339       1. for each modified disk:
4340
4341         1. remove old LVs (which have the name name_replaces.<time_t>)
4342
4343     Failures are not very well handled.
4344
4345     """
4346     steps_total = 6
4347     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4348     instance = self.instance
4349     iv_names = {}
4350     vgname = self.cfg.GetVGName()
4351     # start of work
4352     cfg = self.cfg
4353     tgt_node = self.tgt_node
4354     oth_node = self.oth_node
4355
4356     # Step: check device activation
4357     self.proc.LogStep(1, steps_total, "check device existence")
4358     info("checking volume groups")
4359     my_vg = cfg.GetVGName()
4360     results = self.rpc.call_vg_list([oth_node, tgt_node])
4361     if not results:
4362       raise errors.OpExecError("Can't list volume groups on the nodes")
4363     for node in oth_node, tgt_node:
4364       res = results[node]
4365       if res.failed or not res.data or my_vg not in res.data:
4366         raise errors.OpExecError("Volume group '%s' not found on %s" %
4367                                  (my_vg, node))
4368     for idx, dev in enumerate(instance.disks):
4369       if idx not in self.op.disks:
4370         continue
4371       for node in tgt_node, oth_node:
4372         info("checking disk/%d on %s" % (idx, node))
4373         cfg.SetDiskID(dev, node)
4374         if not self.rpc.call_blockdev_find(node, dev):
4375           raise errors.OpExecError("Can't find disk/%d on node %s" %
4376                                    (idx, node))
4377
4378     # Step: check other node consistency
4379     self.proc.LogStep(2, steps_total, "check peer consistency")
4380     for idx, dev in enumerate(instance.disks):
4381       if idx not in self.op.disks:
4382         continue
4383       info("checking disk/%d consistency on %s" % (idx, oth_node))
4384       if not _CheckDiskConsistency(self, dev, oth_node,
4385                                    oth_node==instance.primary_node):
4386         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4387                                  " to replace disks on this node (%s)" %
4388                                  (oth_node, tgt_node))
4389
4390     # Step: create new storage
4391     self.proc.LogStep(3, steps_total, "allocate new storage")
4392     for idx, dev in enumerate(instance.disks):
4393       if idx not in self.op.disks:
4394         continue
4395       size = dev.size
4396       cfg.SetDiskID(dev, tgt_node)
4397       lv_names = [".disk%d_%s" % (idx, suf)
4398                   for suf in ["data", "meta"]]
4399       names = _GenerateUniqueNames(self, lv_names)
4400       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4401                              logical_id=(vgname, names[0]))
4402       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4403                              logical_id=(vgname, names[1]))
4404       new_lvs = [lv_data, lv_meta]
4405       old_lvs = dev.children
4406       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4407       info("creating new local storage on %s for %s" %
4408            (tgt_node, dev.iv_name))
4409       # since we *always* want to create this LV, we use the
4410       # _Create...OnPrimary (which forces the creation), even if we
4411       # are talking about the secondary node
4412       for new_lv in new_lvs:
4413         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4414                                         _GetInstanceInfoText(instance)):
4415           raise errors.OpExecError("Failed to create new LV named '%s' on"
4416                                    " node '%s'" %
4417                                    (new_lv.logical_id[1], tgt_node))
4418
4419     # Step: for each lv, detach+rename*2+attach
4420     self.proc.LogStep(4, steps_total, "change drbd configuration")
4421     for dev, old_lvs, new_lvs in iv_names.itervalues():
4422       info("detaching %s drbd from local storage" % dev.iv_name)
4423       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4424       result.Raise()
4425       if not result.data:
4426         raise errors.OpExecError("Can't detach drbd from local storage on node"
4427                                  " %s for device %s" % (tgt_node, dev.iv_name))
4428       #dev.children = []
4429       #cfg.Update(instance)
4430
4431       # ok, we created the new LVs, so now we know we have the needed
4432       # storage; as such, we proceed on the target node to rename
4433       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4434       # using the assumption that logical_id == physical_id (which in
4435       # turn is the unique_id on that node)
4436
4437       # FIXME(iustin): use a better name for the replaced LVs
4438       temp_suffix = int(time.time())
4439       ren_fn = lambda d, suff: (d.physical_id[0],
4440                                 d.physical_id[1] + "_replaced-%s" % suff)
4441       # build the rename list based on what LVs exist on the node
4442       rlist = []
4443       for to_ren in old_lvs:
4444         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4445         if not find_res.failed and find_res.data is not None: # device exists
4446           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4447
4448       info("renaming the old LVs on the target node")
4449       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4450       result.Raise()
4451       if not result.data:
4452         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4453       # now we rename the new LVs to the old LVs
4454       info("renaming the new LVs on the target node")
4455       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4456       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4457       result.Raise()
4458       if not result.data:
4459         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4460
4461       for old, new in zip(old_lvs, new_lvs):
4462         new.logical_id = old.logical_id
4463         cfg.SetDiskID(new, tgt_node)
4464
4465       for disk in old_lvs:
4466         disk.logical_id = ren_fn(disk, temp_suffix)
4467         cfg.SetDiskID(disk, tgt_node)
4468
4469       # now that the new lvs have the old name, we can add them to the device
4470       info("adding new mirror component on %s" % tgt_node)
4471       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4472       if result.failed or not result.data:
4473         for new_lv in new_lvs:
4474           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4475           if result.failed or not result.data:
4476             warning("Can't rollback device %s", hint="manually cleanup unused"
4477                     " logical volumes")
4478         raise errors.OpExecError("Can't add local storage to drbd")
4479
4480       dev.children = new_lvs
4481       cfg.Update(instance)
4482
4483     # Step: wait for sync
4484
4485     # this can fail as the old devices are degraded and _WaitForSync
4486     # does a combined result over all disks, so we don't check its
4487     # return value
4488     self.proc.LogStep(5, steps_total, "sync devices")
4489     _WaitForSync(self, instance, unlock=True)
4490
4491     # so check manually all the devices
4492     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4493       cfg.SetDiskID(dev, instance.primary_node)
4494       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4495       if result.failed or result.data[5]:
4496         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4497
4498     # Step: remove old storage
4499     self.proc.LogStep(6, steps_total, "removing old storage")
4500     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4501       info("remove logical volumes for %s" % name)
4502       for lv in old_lvs:
4503         cfg.SetDiskID(lv, tgt_node)
4504         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4505         if result.failed or not result.data:
4506           warning("Can't remove old LV", hint="manually remove unused LVs")
4507           continue
4508
4509   def _ExecD8Secondary(self, feedback_fn):
4510     """Replace the secondary node for drbd8.
4511
4512     The algorithm for replace is quite complicated:
4513       - for all disks of the instance:
4514         - create new LVs on the new node with same names
4515         - shutdown the drbd device on the old secondary
4516         - disconnect the drbd network on the primary
4517         - create the drbd device on the new secondary
4518         - network attach the drbd on the primary, using an artifice:
4519           the drbd code for Attach() will connect to the network if it
4520           finds a device which is connected to the good local disks but
4521           not network enabled
4522       - wait for sync across all devices
4523       - remove all disks from the old secondary
4524
4525     Failures are not very well handled.
4526
4527     """
4528     steps_total = 6
4529     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4530     instance = self.instance
4531     iv_names = {}
4532     vgname = self.cfg.GetVGName()
4533     # start of work
4534     cfg = self.cfg
4535     old_node = self.tgt_node
4536     new_node = self.new_node
4537     pri_node = instance.primary_node
4538
4539     # Step: check device activation
4540     self.proc.LogStep(1, steps_total, "check device existence")
4541     info("checking volume groups")
4542     my_vg = cfg.GetVGName()
4543     results = self.rpc.call_vg_list([pri_node, new_node])
4544     for node in pri_node, new_node:
4545       res = results[node]
4546       if res.failed or not res.data or my_vg not in res.data:
4547         raise errors.OpExecError("Volume group '%s' not found on %s" %
4548                                  (my_vg, node))
4549     for idx, dev in enumerate(instance.disks):
4550       if idx not in self.op.disks:
4551         continue
4552       info("checking disk/%d on %s" % (idx, pri_node))
4553       cfg.SetDiskID(dev, pri_node)
4554       result = self.rpc.call_blockdev_find(pri_node, dev)
4555       result.Raise()
4556       if not result.data:
4557         raise errors.OpExecError("Can't find disk/%d on node %s" %
4558                                  (idx, pri_node))
4559
4560     # Step: check other node consistency
4561     self.proc.LogStep(2, steps_total, "check peer consistency")
4562     for idx, dev in enumerate(instance.disks):
4563       if idx not in self.op.disks:
4564         continue
4565       info("checking disk/%d consistency on %s" % (idx, pri_node))
4566       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4567         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4568                                  " unsafe to replace the secondary" %
4569                                  pri_node)
4570
4571     # Step: create new storage
4572     self.proc.LogStep(3, steps_total, "allocate new storage")
4573     for idx, dev in enumerate(instance.disks):
4574       size = dev.size
4575       info("adding new local storage on %s for disk/%d" %
4576            (new_node, idx))
4577       # since we *always* want to create this LV, we use the
4578       # _Create...OnPrimary (which forces the creation), even if we
4579       # are talking about the secondary node
4580       for new_lv in dev.children:
4581         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4582                                         _GetInstanceInfoText(instance)):
4583           raise errors.OpExecError("Failed to create new LV named '%s' on"
4584                                    " node '%s'" %
4585                                    (new_lv.logical_id[1], new_node))
4586
4587     # Step 4: dbrd minors and drbd setups changes
4588     # after this, we must manually remove the drbd minors on both the
4589     # error and the success paths
4590     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4591                                    instance.name)
4592     logging.debug("Allocated minors %s" % (minors,))
4593     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4594     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4595       size = dev.size
4596       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4597       # create new devices on new_node
4598       if pri_node == dev.logical_id[0]:
4599         new_logical_id = (pri_node, new_node,
4600                           dev.logical_id[2], dev.logical_id[3], new_minor,
4601                           dev.logical_id[5])
4602       else:
4603         new_logical_id = (new_node, pri_node,
4604                           dev.logical_id[2], new_minor, dev.logical_id[4],
4605                           dev.logical_id[5])
4606       iv_names[idx] = (dev, dev.children, new_logical_id)
4607       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4608                     new_logical_id)
4609       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4610                               logical_id=new_logical_id,
4611                               children=dev.children)
4612       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4613                                         new_drbd, False,
4614                                         _GetInstanceInfoText(instance)):
4615         self.cfg.ReleaseDRBDMinors(instance.name)
4616         raise errors.OpExecError("Failed to create new DRBD on"
4617                                  " node '%s'" % new_node)
4618
4619     for idx, dev in enumerate(instance.disks):
4620       # we have new devices, shutdown the drbd on the old secondary
4621       info("shutting down drbd for disk/%d on old node" % idx)
4622       cfg.SetDiskID(dev, old_node)
4623       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4624       if result.failed or not result.data:
4625         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4626                 hint="Please cleanup this device manually as soon as possible")
4627
4628     info("detaching primary drbds from the network (=> standalone)")
4629     done = 0
4630     for idx, dev in enumerate(instance.disks):
4631       cfg.SetDiskID(dev, pri_node)
4632       # set the network part of the physical (unique in bdev terms) id
4633       # to None, meaning detach from network
4634       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4635       # and 'find' the device, which will 'fix' it to match the
4636       # standalone state
4637       result = self.rpc.call_blockdev_find(pri_node, dev)
4638       if not result.failed and result.data:
4639         done += 1
4640       else:
4641         warning("Failed to detach drbd disk/%d from network, unusual case" %
4642                 idx)
4643
4644     if not done:
4645       # no detaches succeeded (very unlikely)
4646       self.cfg.ReleaseDRBDMinors(instance.name)
4647       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4648
4649     # if we managed to detach at least one, we update all the disks of
4650     # the instance to point to the new secondary
4651     info("updating instance configuration")
4652     for dev, _, new_logical_id in iv_names.itervalues():
4653       dev.logical_id = new_logical_id
4654       cfg.SetDiskID(dev, pri_node)
4655     cfg.Update(instance)
4656     # we can remove now the temp minors as now the new values are
4657     # written to the config file (and therefore stable)
4658     self.cfg.ReleaseDRBDMinors(instance.name)
4659
4660     # and now perform the drbd attach
4661     info("attaching primary drbds to new secondary (standalone => connected)")
4662     failures = []
4663     for idx, dev in enumerate(instance.disks):
4664       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4665       # since the attach is smart, it's enough to 'find' the device,
4666       # it will automatically activate the network, if the physical_id
4667       # is correct
4668       cfg.SetDiskID(dev, pri_node)
4669       logging.debug("Disk to attach: %s", dev)
4670       result = self.rpc.call_blockdev_find(pri_node, dev)
4671       if result.failed or not result.data:
4672         warning("can't attach drbd disk/%d to new secondary!" % idx,
4673                 "please do a gnt-instance info to see the status of disks")
4674
4675     # this can fail as the old devices are degraded and _WaitForSync
4676     # does a combined result over all disks, so we don't check its
4677     # return value
4678     self.proc.LogStep(5, steps_total, "sync devices")
4679     _WaitForSync(self, instance, unlock=True)
4680
4681     # so check manually all the devices
4682     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4683       cfg.SetDiskID(dev, pri_node)
4684       result = self.rpc.call_blockdev_find(pri_node, dev)
4685       result.Raise()
4686       if result.data[5]:
4687         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4688
4689     self.proc.LogStep(6, steps_total, "removing old storage")
4690     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4691       info("remove logical volumes for disk/%d" % idx)
4692       for lv in old_lvs:
4693         cfg.SetDiskID(lv, old_node)
4694         result = self.rpc.call_blockdev_remove(old_node, lv)
4695         if result.failed or not result.data:
4696           warning("Can't remove LV on old secondary",
4697                   hint="Cleanup stale volumes by hand")
4698
4699   def Exec(self, feedback_fn):
4700     """Execute disk replacement.
4701
4702     This dispatches the disk replacement to the appropriate handler.
4703
4704     """
4705     instance = self.instance
4706
4707     # Activate the instance disks if we're replacing them on a down instance
4708     if instance.status == "down":
4709       _StartInstanceDisks(self, instance, True)
4710
4711     if instance.disk_template == constants.DT_DRBD8:
4712       if self.op.remote_node is None:
4713         fn = self._ExecD8DiskOnly
4714       else:
4715         fn = self._ExecD8Secondary
4716     else:
4717       raise errors.ProgrammerError("Unhandled disk replacement case")
4718
4719     ret = fn(feedback_fn)
4720
4721     # Deactivate the instance disks if we're replacing them on a down instance
4722     if instance.status == "down":
4723       _SafeShutdownInstanceDisks(self, instance)
4724
4725     return ret
4726
4727
4728 class LUGrowDisk(LogicalUnit):
4729   """Grow a disk of an instance.
4730
4731   """
4732   HPATH = "disk-grow"
4733   HTYPE = constants.HTYPE_INSTANCE
4734   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4735   REQ_BGL = False
4736
4737   def ExpandNames(self):
4738     self._ExpandAndLockInstance()
4739     self.needed_locks[locking.LEVEL_NODE] = []
4740     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4741
4742   def DeclareLocks(self, level):
4743     if level == locking.LEVEL_NODE:
4744       self._LockInstancesNodes()
4745
4746   def BuildHooksEnv(self):
4747     """Build hooks env.
4748
4749     This runs on the master, the primary and all the secondaries.
4750
4751     """
4752     env = {
4753       "DISK": self.op.disk,
4754       "AMOUNT": self.op.amount,
4755       }
4756     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4757     nl = [
4758       self.cfg.GetMasterNode(),
4759       self.instance.primary_node,
4760       ]
4761     return env, nl, nl
4762
4763   def CheckPrereq(self):
4764     """Check prerequisites.
4765
4766     This checks that the instance is in the cluster.
4767
4768     """
4769     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4770     assert instance is not None, \
4771       "Cannot retrieve locked instance %s" % self.op.instance_name
4772
4773     self.instance = instance
4774
4775     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4776       raise errors.OpPrereqError("Instance's disk layout does not support"
4777                                  " growing.")
4778
4779     self.disk = instance.FindDisk(self.op.disk)
4780
4781     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4782     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4783                                        instance.hypervisor)
4784     for node in nodenames:
4785       info = nodeinfo[node]
4786       if info.failed or not info.data:
4787         raise errors.OpPrereqError("Cannot get current information"
4788                                    " from node '%s'" % node)
4789       vg_free = info.data.get('vg_free', None)
4790       if not isinstance(vg_free, int):
4791         raise errors.OpPrereqError("Can't compute free disk space on"
4792                                    " node %s" % node)
4793       if self.op.amount > vg_free:
4794         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4795                                    " %d MiB available, %d MiB required" %
4796                                    (node, vg_free, self.op.amount))
4797
4798   def Exec(self, feedback_fn):
4799     """Execute disk grow.
4800
4801     """
4802     instance = self.instance
4803     disk = self.disk
4804     for node in (instance.secondary_nodes + (instance.primary_node,)):
4805       self.cfg.SetDiskID(disk, node)
4806       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4807       result.Raise()
4808       if (not result.data or not isinstance(result.data, (list, tuple)) or
4809           len(result.data) != 2):
4810         raise errors.OpExecError("Grow request failed to node %s" % node)
4811       elif not result.data[0]:
4812         raise errors.OpExecError("Grow request failed to node %s: %s" %
4813                                  (node, result.data[1]))
4814     disk.RecordGrow(self.op.amount)
4815     self.cfg.Update(instance)
4816     if self.op.wait_for_sync:
4817       disk_abort = not _WaitForSync(self, instance)
4818       if disk_abort:
4819         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4820                              " status.\nPlease check the instance.")
4821
4822
4823 class LUQueryInstanceData(NoHooksLU):
4824   """Query runtime instance data.
4825
4826   """
4827   _OP_REQP = ["instances", "static"]
4828   REQ_BGL = False
4829
4830   def ExpandNames(self):
4831     self.needed_locks = {}
4832     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4833
4834     if not isinstance(self.op.instances, list):
4835       raise errors.OpPrereqError("Invalid argument type 'instances'")
4836
4837     if self.op.instances:
4838       self.wanted_names = []
4839       for name in self.op.instances:
4840         full_name = self.cfg.ExpandInstanceName(name)
4841         if full_name is None:
4842           raise errors.OpPrereqError("Instance '%s' not known" %
4843                                      self.op.instance_name)
4844         self.wanted_names.append(full_name)
4845       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4846     else:
4847       self.wanted_names = None
4848       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4849
4850     self.needed_locks[locking.LEVEL_NODE] = []
4851     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4852
4853   def DeclareLocks(self, level):
4854     if level == locking.LEVEL_NODE:
4855       self._LockInstancesNodes()
4856
4857   def CheckPrereq(self):
4858     """Check prerequisites.
4859
4860     This only checks the optional instance list against the existing names.
4861
4862     """
4863     if self.wanted_names is None:
4864       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4865
4866     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4867                              in self.wanted_names]
4868     return
4869
4870   def _ComputeDiskStatus(self, instance, snode, dev):
4871     """Compute block device status.
4872
4873     """
4874     static = self.op.static
4875     if not static:
4876       self.cfg.SetDiskID(dev, instance.primary_node)
4877       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4878       dev_pstatus.Raise()
4879       dev_pstatus = dev_pstatus.data
4880     else:
4881       dev_pstatus = None
4882
4883     if dev.dev_type in constants.LDS_DRBD:
4884       # we change the snode then (otherwise we use the one passed in)
4885       if dev.logical_id[0] == instance.primary_node:
4886         snode = dev.logical_id[1]
4887       else:
4888         snode = dev.logical_id[0]
4889
4890     if snode and not static:
4891       self.cfg.SetDiskID(dev, snode)
4892       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4893       dev_sstatus.Raise()
4894       dev_sstatus = dev_sstatus.data
4895     else:
4896       dev_sstatus = None
4897
4898     if dev.children:
4899       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4900                       for child in dev.children]
4901     else:
4902       dev_children = []
4903
4904     data = {
4905       "iv_name": dev.iv_name,
4906       "dev_type": dev.dev_type,
4907       "logical_id": dev.logical_id,
4908       "physical_id": dev.physical_id,
4909       "pstatus": dev_pstatus,
4910       "sstatus": dev_sstatus,
4911       "children": dev_children,
4912       "mode": dev.mode,
4913       }
4914
4915     return data
4916
4917   def Exec(self, feedback_fn):
4918     """Gather and return data"""
4919     result = {}
4920
4921     cluster = self.cfg.GetClusterInfo()
4922
4923     for instance in self.wanted_instances:
4924       if not self.op.static:
4925         remote_info = self.rpc.call_instance_info(instance.primary_node,
4926                                                   instance.name,
4927                                                   instance.hypervisor)
4928         remote_info.Raise()
4929         remote_info = remote_info.data
4930         if remote_info and "state" in remote_info:
4931           remote_state = "up"
4932         else:
4933           remote_state = "down"
4934       else:
4935         remote_state = None
4936       if instance.status == "down":
4937         config_state = "down"
4938       else:
4939         config_state = "up"
4940
4941       disks = [self._ComputeDiskStatus(instance, None, device)
4942                for device in instance.disks]
4943
4944       idict = {
4945         "name": instance.name,
4946         "config_state": config_state,
4947         "run_state": remote_state,
4948         "pnode": instance.primary_node,
4949         "snodes": instance.secondary_nodes,
4950         "os": instance.os,
4951         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4952         "disks": disks,
4953         "hypervisor": instance.hypervisor,
4954         "network_port": instance.network_port,
4955         "hv_instance": instance.hvparams,
4956         "hv_actual": cluster.FillHV(instance),
4957         "be_instance": instance.beparams,
4958         "be_actual": cluster.FillBE(instance),
4959         }
4960
4961       result[instance.name] = idict
4962
4963     return result
4964
4965
4966 class LUSetInstanceParams(LogicalUnit):
4967   """Modifies an instances's parameters.
4968
4969   """
4970   HPATH = "instance-modify"
4971   HTYPE = constants.HTYPE_INSTANCE
4972   _OP_REQP = ["instance_name"]
4973   REQ_BGL = False
4974
4975   def CheckArguments(self):
4976     if not hasattr(self.op, 'nics'):
4977       self.op.nics = []
4978     if not hasattr(self.op, 'disks'):
4979       self.op.disks = []
4980     if not hasattr(self.op, 'beparams'):
4981       self.op.beparams = {}
4982     if not hasattr(self.op, 'hvparams'):
4983       self.op.hvparams = {}
4984     self.op.force = getattr(self.op, "force", False)
4985     if not (self.op.nics or self.op.disks or
4986             self.op.hvparams or self.op.beparams):
4987       raise errors.OpPrereqError("No changes submitted")
4988
4989     utils.CheckBEParams(self.op.beparams)
4990
4991     # Disk validation
4992     disk_addremove = 0
4993     for disk_op, disk_dict in self.op.disks:
4994       if disk_op == constants.DDM_REMOVE:
4995         disk_addremove += 1
4996         continue
4997       elif disk_op == constants.DDM_ADD:
4998         disk_addremove += 1
4999       else:
5000         if not isinstance(disk_op, int):
5001           raise errors.OpPrereqError("Invalid disk index")
5002       if disk_op == constants.DDM_ADD:
5003         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5004         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5005           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5006         size = disk_dict.get('size', None)
5007         if size is None:
5008           raise errors.OpPrereqError("Required disk parameter size missing")
5009         try:
5010           size = int(size)
5011         except ValueError, err:
5012           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5013                                      str(err))
5014         disk_dict['size'] = size
5015       else:
5016         # modification of disk
5017         if 'size' in disk_dict:
5018           raise errors.OpPrereqError("Disk size change not possible, use"
5019                                      " grow-disk")
5020
5021     if disk_addremove > 1:
5022       raise errors.OpPrereqError("Only one disk add or remove operation"
5023                                  " supported at a time")
5024
5025     # NIC validation
5026     nic_addremove = 0
5027     for nic_op, nic_dict in self.op.nics:
5028       if nic_op == constants.DDM_REMOVE:
5029         nic_addremove += 1
5030         continue
5031       elif nic_op == constants.DDM_ADD:
5032         nic_addremove += 1
5033       else:
5034         if not isinstance(nic_op, int):
5035           raise errors.OpPrereqError("Invalid nic index")
5036
5037       # nic_dict should be a dict
5038       nic_ip = nic_dict.get('ip', None)
5039       if nic_ip is not None:
5040         if nic_ip.lower() == "none":
5041           nic_dict['ip'] = None
5042         else:
5043           if not utils.IsValidIP(nic_ip):
5044             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5045       # we can only check None bridges and assign the default one
5046       nic_bridge = nic_dict.get('bridge', None)
5047       if nic_bridge is None:
5048         nic_dict['bridge'] = self.cfg.GetDefBridge()
5049       # but we can validate MACs
5050       nic_mac = nic_dict.get('mac', None)
5051       if nic_mac is not None:
5052         if self.cfg.IsMacInUse(nic_mac):
5053           raise errors.OpPrereqError("MAC address %s already in use"
5054                                      " in cluster" % nic_mac)
5055         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5056           if not utils.IsValidMac(nic_mac):
5057             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5058     if nic_addremove > 1:
5059       raise errors.OpPrereqError("Only one NIC add or remove operation"
5060                                  " supported at a time")
5061
5062   def ExpandNames(self):
5063     self._ExpandAndLockInstance()
5064     self.needed_locks[locking.LEVEL_NODE] = []
5065     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5066
5067   def DeclareLocks(self, level):
5068     if level == locking.LEVEL_NODE:
5069       self._LockInstancesNodes()
5070
5071   def BuildHooksEnv(self):
5072     """Build hooks env.
5073
5074     This runs on the master, primary and secondaries.
5075
5076     """
5077     args = dict()
5078     if constants.BE_MEMORY in self.be_new:
5079       args['memory'] = self.be_new[constants.BE_MEMORY]
5080     if constants.BE_VCPUS in self.be_new:
5081       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5082     # FIXME: readd disk/nic changes
5083     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5084     nl = [self.cfg.GetMasterNode(),
5085           self.instance.primary_node] + list(self.instance.secondary_nodes)
5086     return env, nl, nl
5087
5088   def CheckPrereq(self):
5089     """Check prerequisites.
5090
5091     This only checks the instance list against the existing names.
5092
5093     """
5094     force = self.force = self.op.force
5095
5096     # checking the new params on the primary/secondary nodes
5097
5098     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5099     assert self.instance is not None, \
5100       "Cannot retrieve locked instance %s" % self.op.instance_name
5101     pnode = self.instance.primary_node
5102     nodelist = [pnode]
5103     nodelist.extend(instance.secondary_nodes)
5104
5105     # hvparams processing
5106     if self.op.hvparams:
5107       i_hvdict = copy.deepcopy(instance.hvparams)
5108       for key, val in self.op.hvparams.iteritems():
5109         if val == constants.VALUE_DEFAULT:
5110           try:
5111             del i_hvdict[key]
5112           except KeyError:
5113             pass
5114         elif val == constants.VALUE_NONE:
5115           i_hvdict[key] = None
5116         else:
5117           i_hvdict[key] = val
5118       cluster = self.cfg.GetClusterInfo()
5119       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5120                                 i_hvdict)
5121       # local check
5122       hypervisor.GetHypervisor(
5123         instance.hypervisor).CheckParameterSyntax(hv_new)
5124       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5125       self.hv_new = hv_new # the new actual values
5126       self.hv_inst = i_hvdict # the new dict (without defaults)
5127     else:
5128       self.hv_new = self.hv_inst = {}
5129
5130     # beparams processing
5131     if self.op.beparams:
5132       i_bedict = copy.deepcopy(instance.beparams)
5133       for key, val in self.op.beparams.iteritems():
5134         if val == constants.VALUE_DEFAULT:
5135           try:
5136             del i_bedict[key]
5137           except KeyError:
5138             pass
5139         else:
5140           i_bedict[key] = val
5141       cluster = self.cfg.GetClusterInfo()
5142       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5143                                 i_bedict)
5144       self.be_new = be_new # the new actual values
5145       self.be_inst = i_bedict # the new dict (without defaults)
5146     else:
5147       self.be_new = self.be_inst = {}
5148
5149     self.warn = []
5150
5151     if constants.BE_MEMORY in self.op.beparams and not self.force:
5152       mem_check_list = [pnode]
5153       if be_new[constants.BE_AUTO_BALANCE]:
5154         # either we changed auto_balance to yes or it was from before
5155         mem_check_list.extend(instance.secondary_nodes)
5156       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5157                                                   instance.hypervisor)
5158       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5159                                          instance.hypervisor)
5160       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5161         # Assume the primary node is unreachable and go ahead
5162         self.warn.append("Can't get info from primary node %s" % pnode)
5163       else:
5164         if not instance_info.failed and instance_info.data:
5165           current_mem = instance_info.data['memory']
5166         else:
5167           # Assume instance not running
5168           # (there is a slight race condition here, but it's not very probable,
5169           # and we have no other way to check)
5170           current_mem = 0
5171         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5172                     nodeinfo[pnode].data['memory_free'])
5173         if miss_mem > 0:
5174           raise errors.OpPrereqError("This change will prevent the instance"
5175                                      " from starting, due to %d MB of memory"
5176                                      " missing on its primary node" % miss_mem)
5177
5178       if be_new[constants.BE_AUTO_BALANCE]:
5179         for node, nres in instance.secondary_nodes.iteritems():
5180           if nres.failed or not isinstance(nres.data, dict):
5181             self.warn.append("Can't get info from secondary node %s" % node)
5182           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5183             self.warn.append("Not enough memory to failover instance to"
5184                              " secondary node %s" % node)
5185
5186     # NIC processing
5187     for nic_op, nic_dict in self.op.nics:
5188       if nic_op == constants.DDM_REMOVE:
5189         if not instance.nics:
5190           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5191         continue
5192       if nic_op != constants.DDM_ADD:
5193         # an existing nic
5194         if nic_op < 0 or nic_op >= len(instance.nics):
5195           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5196                                      " are 0 to %d" %
5197                                      (nic_op, len(instance.nics)))
5198       nic_bridge = nic_dict.get('bridge', None)
5199       if nic_bridge is not None:
5200         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5201           msg = ("Bridge '%s' doesn't exist on one of"
5202                  " the instance nodes" % nic_bridge)
5203           if self.force:
5204             self.warn.append(msg)
5205           else:
5206             raise errors.OpPrereqError(msg)
5207
5208     # DISK processing
5209     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5210       raise errors.OpPrereqError("Disk operations not supported for"
5211                                  " diskless instances")
5212     for disk_op, disk_dict in self.op.disks:
5213       if disk_op == constants.DDM_REMOVE:
5214         if len(instance.disks) == 1:
5215           raise errors.OpPrereqError("Cannot remove the last disk of"
5216                                      " an instance")
5217         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5218         ins_l = ins_l[pnode]
5219         if not type(ins_l) is list:
5220           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5221         if instance.name in ins_l:
5222           raise errors.OpPrereqError("Instance is running, can't remove"
5223                                      " disks.")
5224
5225       if (disk_op == constants.DDM_ADD and
5226           len(instance.nics) >= constants.MAX_DISKS):
5227         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5228                                    " add more" % constants.MAX_DISKS)
5229       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5230         # an existing disk
5231         if disk_op < 0 or disk_op >= len(instance.disks):
5232           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5233                                      " are 0 to %d" %
5234                                      (disk_op, len(instance.disks)))
5235
5236     return
5237
5238   def Exec(self, feedback_fn):
5239     """Modifies an instance.
5240
5241     All parameters take effect only at the next restart of the instance.
5242
5243     """
5244     # Process here the warnings from CheckPrereq, as we don't have a
5245     # feedback_fn there.
5246     for warn in self.warn:
5247       feedback_fn("WARNING: %s" % warn)
5248
5249     result = []
5250     instance = self.instance
5251     # disk changes
5252     for disk_op, disk_dict in self.op.disks:
5253       if disk_op == constants.DDM_REMOVE:
5254         # remove the last disk
5255         device = instance.disks.pop()
5256         device_idx = len(instance.disks)
5257         for node, disk in device.ComputeNodeTree(instance.primary_node):
5258           self.cfg.SetDiskID(disk, node)
5259           result = self.rpc.call_blockdev_remove(node, disk)
5260           if result.failed or not result.data:
5261             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5262                                  " continuing anyway", device_idx, node)
5263         result.append(("disk/%d" % device_idx, "remove"))
5264       elif disk_op == constants.DDM_ADD:
5265         # add a new disk
5266         if instance.disk_template == constants.DT_FILE:
5267           file_driver, file_path = instance.disks[0].logical_id
5268           file_path = os.path.dirname(file_path)
5269         else:
5270           file_driver = file_path = None
5271         disk_idx_base = len(instance.disks)
5272         new_disk = _GenerateDiskTemplate(self,
5273                                          instance.disk_template,
5274                                          instance, instance.primary_node,
5275                                          instance.secondary_nodes,
5276                                          [disk_dict],
5277                                          file_path,
5278                                          file_driver,
5279                                          disk_idx_base)[0]
5280         new_disk.mode = disk_dict['mode']
5281         instance.disks.append(new_disk)
5282         info = _GetInstanceInfoText(instance)
5283
5284         logging.info("Creating volume %s for instance %s",
5285                      new_disk.iv_name, instance.name)
5286         # Note: this needs to be kept in sync with _CreateDisks
5287         #HARDCODE
5288         for secondary_node in instance.secondary_nodes:
5289           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5290                                             new_disk, False, info):
5291             self.LogWarning("Failed to create volume %s (%s) on"
5292                             " secondary node %s!",
5293                             new_disk.iv_name, new_disk, secondary_node)
5294         #HARDCODE
5295         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5296                                         instance, new_disk, info):
5297           self.LogWarning("Failed to create volume %s on primary!",
5298                           new_disk.iv_name)
5299         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5300                        (new_disk.size, new_disk.mode)))
5301       else:
5302         # change a given disk
5303         instance.disks[disk_op].mode = disk_dict['mode']
5304         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5305     # NIC changes
5306     for nic_op, nic_dict in self.op.nics:
5307       if nic_op == constants.DDM_REMOVE:
5308         # remove the last nic
5309         del instance.nics[-1]
5310         result.append(("nic.%d" % len(instance.nics), "remove"))
5311       elif nic_op == constants.DDM_ADD:
5312         # add a new nic
5313         if 'mac' not in nic_dict:
5314           mac = constants.VALUE_GENERATE
5315         else:
5316           mac = nic_dict['mac']
5317         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5318           mac = self.cfg.GenerateMAC()
5319         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5320                               bridge=nic_dict.get('bridge', None))
5321         instance.nics.append(new_nic)
5322         result.append(("nic.%d" % (len(instance.nics) - 1),
5323                        "add:mac=%s,ip=%s,bridge=%s" %
5324                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5325       else:
5326         # change a given nic
5327         for key in 'mac', 'ip', 'bridge':
5328           if key in nic_dict:
5329             setattr(instance.nics[nic_op], key, nic_dict[key])
5330             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5331
5332     # hvparams changes
5333     if self.op.hvparams:
5334       instance.hvparams = self.hv_new
5335       for key, val in self.op.hvparams.iteritems():
5336         result.append(("hv/%s" % key, val))
5337
5338     # beparams changes
5339     if self.op.beparams:
5340       instance.beparams = self.be_inst
5341       for key, val in self.op.beparams.iteritems():
5342         result.append(("be/%s" % key, val))
5343
5344     self.cfg.Update(instance)
5345
5346     return result
5347
5348
5349 class LUQueryExports(NoHooksLU):
5350   """Query the exports list
5351
5352   """
5353   _OP_REQP = ['nodes']
5354   REQ_BGL = False
5355
5356   def ExpandNames(self):
5357     self.needed_locks = {}
5358     self.share_locks[locking.LEVEL_NODE] = 1
5359     if not self.op.nodes:
5360       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5361     else:
5362       self.needed_locks[locking.LEVEL_NODE] = \
5363         _GetWantedNodes(self, self.op.nodes)
5364
5365   def CheckPrereq(self):
5366     """Check prerequisites.
5367
5368     """
5369     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5370
5371   def Exec(self, feedback_fn):
5372     """Compute the list of all the exported system images.
5373
5374     @rtype: dict
5375     @return: a dictionary with the structure node->(export-list)
5376         where export-list is a list of the instances exported on
5377         that node.
5378
5379     """
5380     rpcresult = self.rpc.call_export_list(self.nodes)
5381     result = {}
5382     for node in rpcresult:
5383       if rpcresult[node].failed:
5384         result[node] = False
5385       else:
5386         result[node] = rpcresult[node].data
5387
5388     return result
5389
5390
5391 class LUExportInstance(LogicalUnit):
5392   """Export an instance to an image in the cluster.
5393
5394   """
5395   HPATH = "instance-export"
5396   HTYPE = constants.HTYPE_INSTANCE
5397   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5398   REQ_BGL = False
5399
5400   def ExpandNames(self):
5401     self._ExpandAndLockInstance()
5402     # FIXME: lock only instance primary and destination node
5403     #
5404     # Sad but true, for now we have do lock all nodes, as we don't know where
5405     # the previous export might be, and and in this LU we search for it and
5406     # remove it from its current node. In the future we could fix this by:
5407     #  - making a tasklet to search (share-lock all), then create the new one,
5408     #    then one to remove, after
5409     #  - removing the removal operation altoghether
5410     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5411
5412   def DeclareLocks(self, level):
5413     """Last minute lock declaration."""
5414     # All nodes are locked anyway, so nothing to do here.
5415
5416   def BuildHooksEnv(self):
5417     """Build hooks env.
5418
5419     This will run on the master, primary node and target node.
5420
5421     """
5422     env = {
5423       "EXPORT_NODE": self.op.target_node,
5424       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5425       }
5426     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5427     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5428           self.op.target_node]
5429     return env, nl, nl
5430
5431   def CheckPrereq(self):
5432     """Check prerequisites.
5433
5434     This checks that the instance and node names are valid.
5435
5436     """
5437     instance_name = self.op.instance_name
5438     self.instance = self.cfg.GetInstanceInfo(instance_name)
5439     assert self.instance is not None, \
5440           "Cannot retrieve locked instance %s" % self.op.instance_name
5441
5442     self.dst_node = self.cfg.GetNodeInfo(
5443       self.cfg.ExpandNodeName(self.op.target_node))
5444
5445     if self.dst_node is None:
5446       # This is wrong node name, not a non-locked node
5447       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5448
5449     # instance disk type verification
5450     for disk in self.instance.disks:
5451       if disk.dev_type == constants.LD_FILE:
5452         raise errors.OpPrereqError("Export not supported for instances with"
5453                                    " file-based disks")
5454
5455   def Exec(self, feedback_fn):
5456     """Export an instance to an image in the cluster.
5457
5458     """
5459     instance = self.instance
5460     dst_node = self.dst_node
5461     src_node = instance.primary_node
5462     if self.op.shutdown:
5463       # shutdown the instance, but not the disks
5464       result = self.rpc.call_instance_shutdown(src_node, instance)
5465       result.Raise()
5466       if not result.data:
5467         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5468                                  (instance.name, src_node))
5469
5470     vgname = self.cfg.GetVGName()
5471
5472     snap_disks = []
5473
5474     try:
5475       for disk in instance.disks:
5476         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5477         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5478         if new_dev_name.failed or not new_dev_name.data:
5479           self.LogWarning("Could not snapshot block device %s on node %s",
5480                           disk.logical_id[1], src_node)
5481           snap_disks.append(False)
5482         else:
5483           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5484                                  logical_id=(vgname, new_dev_name.data),
5485                                  physical_id=(vgname, new_dev_name.data),
5486                                  iv_name=disk.iv_name)
5487           snap_disks.append(new_dev)
5488
5489     finally:
5490       if self.op.shutdown and instance.status == "up":
5491         result = self.rpc.call_instance_start(src_node, instance, None)
5492         if result.failed or not result.data:
5493           _ShutdownInstanceDisks(self, instance)
5494           raise errors.OpExecError("Could not start instance")
5495
5496     # TODO: check for size
5497
5498     cluster_name = self.cfg.GetClusterName()
5499     for idx, dev in enumerate(snap_disks):
5500       if dev:
5501         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5502                                                instance, cluster_name, idx)
5503         if result.failed or not result.data:
5504           self.LogWarning("Could not export block device %s from node %s to"
5505                           " node %s", dev.logical_id[1], src_node,
5506                           dst_node.name)
5507         result = self.rpc.call_blockdev_remove(src_node, dev)
5508         if result.failed or not result.data:
5509           self.LogWarning("Could not remove snapshot block device %s from node"
5510                           " %s", dev.logical_id[1], src_node)
5511
5512     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5513     if result.failed or not result.data:
5514       self.LogWarning("Could not finalize export for instance %s on node %s",
5515                       instance.name, dst_node.name)
5516
5517     nodelist = self.cfg.GetNodeList()
5518     nodelist.remove(dst_node.name)
5519
5520     # on one-node clusters nodelist will be empty after the removal
5521     # if we proceed the backup would be removed because OpQueryExports
5522     # substitutes an empty list with the full cluster node list.
5523     if nodelist:
5524       exportlist = self.rpc.call_export_list(nodelist)
5525       for node in exportlist:
5526         if exportlist[node].failed:
5527           continue
5528         if instance.name in exportlist[node].data:
5529           if not self.rpc.call_export_remove(node, instance.name):
5530             self.LogWarning("Could not remove older export for instance %s"
5531                             " on node %s", instance.name, node)
5532
5533
5534 class LURemoveExport(NoHooksLU):
5535   """Remove exports related to the named instance.
5536
5537   """
5538   _OP_REQP = ["instance_name"]
5539   REQ_BGL = False
5540
5541   def ExpandNames(self):
5542     self.needed_locks = {}
5543     # We need all nodes to be locked in order for RemoveExport to work, but we
5544     # don't need to lock the instance itself, as nothing will happen to it (and
5545     # we can remove exports also for a removed instance)
5546     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5547
5548   def CheckPrereq(self):
5549     """Check prerequisites.
5550     """
5551     pass
5552
5553   def Exec(self, feedback_fn):
5554     """Remove any export.
5555
5556     """
5557     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5558     # If the instance was not found we'll try with the name that was passed in.
5559     # This will only work if it was an FQDN, though.
5560     fqdn_warn = False
5561     if not instance_name:
5562       fqdn_warn = True
5563       instance_name = self.op.instance_name
5564
5565     exportlist = self.rpc.call_export_list(self.acquired_locks[
5566       locking.LEVEL_NODE])
5567     found = False
5568     for node in exportlist:
5569       if exportlist[node].failed:
5570         self.LogWarning("Failed to query node %s, continuing" % node)
5571         continue
5572       if instance_name in exportlist[node].data:
5573         found = True
5574         result = self.rpc.call_export_remove(node, instance_name)
5575         if result.failed or not result.data:
5576           logging.error("Could not remove export for instance %s"
5577                         " on node %s", instance_name, node)
5578
5579     if fqdn_warn and not found:
5580       feedback_fn("Export not found. If trying to remove an export belonging"
5581                   " to a deleted instance please use its Fully Qualified"
5582                   " Domain Name.")
5583
5584
5585 class TagsLU(NoHooksLU):
5586   """Generic tags LU.
5587
5588   This is an abstract class which is the parent of all the other tags LUs.
5589
5590   """
5591
5592   def ExpandNames(self):
5593     self.needed_locks = {}
5594     if self.op.kind == constants.TAG_NODE:
5595       name = self.cfg.ExpandNodeName(self.op.name)
5596       if name is None:
5597         raise errors.OpPrereqError("Invalid node name (%s)" %
5598                                    (self.op.name,))
5599       self.op.name = name
5600       self.needed_locks[locking.LEVEL_NODE] = name
5601     elif self.op.kind == constants.TAG_INSTANCE:
5602       name = self.cfg.ExpandInstanceName(self.op.name)
5603       if name is None:
5604         raise errors.OpPrereqError("Invalid instance name (%s)" %
5605                                    (self.op.name,))
5606       self.op.name = name
5607       self.needed_locks[locking.LEVEL_INSTANCE] = name
5608
5609   def CheckPrereq(self):
5610     """Check prerequisites.
5611
5612     """
5613     if self.op.kind == constants.TAG_CLUSTER:
5614       self.target = self.cfg.GetClusterInfo()
5615     elif self.op.kind == constants.TAG_NODE:
5616       self.target = self.cfg.GetNodeInfo(self.op.name)
5617     elif self.op.kind == constants.TAG_INSTANCE:
5618       self.target = self.cfg.GetInstanceInfo(self.op.name)
5619     else:
5620       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5621                                  str(self.op.kind))
5622
5623
5624 class LUGetTags(TagsLU):
5625   """Returns the tags of a given object.
5626
5627   """
5628   _OP_REQP = ["kind", "name"]
5629   REQ_BGL = False
5630
5631   def Exec(self, feedback_fn):
5632     """Returns the tag list.
5633
5634     """
5635     return list(self.target.GetTags())
5636
5637
5638 class LUSearchTags(NoHooksLU):
5639   """Searches the tags for a given pattern.
5640
5641   """
5642   _OP_REQP = ["pattern"]
5643   REQ_BGL = False
5644
5645   def ExpandNames(self):
5646     self.needed_locks = {}
5647
5648   def CheckPrereq(self):
5649     """Check prerequisites.
5650
5651     This checks the pattern passed for validity by compiling it.
5652
5653     """
5654     try:
5655       self.re = re.compile(self.op.pattern)
5656     except re.error, err:
5657       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5658                                  (self.op.pattern, err))
5659
5660   def Exec(self, feedback_fn):
5661     """Returns the tag list.
5662
5663     """
5664     cfg = self.cfg
5665     tgts = [("/cluster", cfg.GetClusterInfo())]
5666     ilist = cfg.GetAllInstancesInfo().values()
5667     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5668     nlist = cfg.GetAllNodesInfo().values()
5669     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5670     results = []
5671     for path, target in tgts:
5672       for tag in target.GetTags():
5673         if self.re.search(tag):
5674           results.append((path, tag))
5675     return results
5676
5677
5678 class LUAddTags(TagsLU):
5679   """Sets a tag on a given object.
5680
5681   """
5682   _OP_REQP = ["kind", "name", "tags"]
5683   REQ_BGL = False
5684
5685   def CheckPrereq(self):
5686     """Check prerequisites.
5687
5688     This checks the type and length of the tag name and value.
5689
5690     """
5691     TagsLU.CheckPrereq(self)
5692     for tag in self.op.tags:
5693       objects.TaggableObject.ValidateTag(tag)
5694
5695   def Exec(self, feedback_fn):
5696     """Sets the tag.
5697
5698     """
5699     try:
5700       for tag in self.op.tags:
5701         self.target.AddTag(tag)
5702     except errors.TagError, err:
5703       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5704     try:
5705       self.cfg.Update(self.target)
5706     except errors.ConfigurationError:
5707       raise errors.OpRetryError("There has been a modification to the"
5708                                 " config file and the operation has been"
5709                                 " aborted. Please retry.")
5710
5711
5712 class LUDelTags(TagsLU):
5713   """Delete a list of tags from a given object.
5714
5715   """
5716   _OP_REQP = ["kind", "name", "tags"]
5717   REQ_BGL = False
5718
5719   def CheckPrereq(self):
5720     """Check prerequisites.
5721
5722     This checks that we have the given tag.
5723
5724     """
5725     TagsLU.CheckPrereq(self)
5726     for tag in self.op.tags:
5727       objects.TaggableObject.ValidateTag(tag)
5728     del_tags = frozenset(self.op.tags)
5729     cur_tags = self.target.GetTags()
5730     if not del_tags <= cur_tags:
5731       diff_tags = del_tags - cur_tags
5732       diff_names = ["'%s'" % tag for tag in diff_tags]
5733       diff_names.sort()
5734       raise errors.OpPrereqError("Tag(s) %s not found" %
5735                                  (",".join(diff_names)))
5736
5737   def Exec(self, feedback_fn):
5738     """Remove the tag from the object.
5739
5740     """
5741     for tag in self.op.tags:
5742       self.target.RemoveTag(tag)
5743     try:
5744       self.cfg.Update(self.target)
5745     except errors.ConfigurationError:
5746       raise errors.OpRetryError("There has been a modification to the"
5747                                 " config file and the operation has been"
5748                                 " aborted. Please retry.")
5749
5750
5751 class LUTestDelay(NoHooksLU):
5752   """Sleep for a specified amount of time.
5753
5754   This LU sleeps on the master and/or nodes for a specified amount of
5755   time.
5756
5757   """
5758   _OP_REQP = ["duration", "on_master", "on_nodes"]
5759   REQ_BGL = False
5760
5761   def ExpandNames(self):
5762     """Expand names and set required locks.
5763
5764     This expands the node list, if any.
5765
5766     """
5767     self.needed_locks = {}
5768     if self.op.on_nodes:
5769       # _GetWantedNodes can be used here, but is not always appropriate to use
5770       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5771       # more information.
5772       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5773       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5774
5775   def CheckPrereq(self):
5776     """Check prerequisites.
5777
5778     """
5779
5780   def Exec(self, feedback_fn):
5781     """Do the actual sleep.
5782
5783     """
5784     if self.op.on_master:
5785       if not utils.TestDelay(self.op.duration):
5786         raise errors.OpExecError("Error during master delay test")
5787     if self.op.on_nodes:
5788       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5789       if not result:
5790         raise errors.OpExecError("Complete failure from rpc call")
5791       for node, node_result in result.items():
5792         node_result.Raise()
5793         if not node_result.data:
5794           raise errors.OpExecError("Failure during rpc call to node %s,"
5795                                    " result: %s" % (node, node_result.data))
5796
5797
5798 class IAllocator(object):
5799   """IAllocator framework.
5800
5801   An IAllocator instance has three sets of attributes:
5802     - cfg that is needed to query the cluster
5803     - input data (all members of the _KEYS class attribute are required)
5804     - four buffer attributes (in|out_data|text), that represent the
5805       input (to the external script) in text and data structure format,
5806       and the output from it, again in two formats
5807     - the result variables from the script (success, info, nodes) for
5808       easy usage
5809
5810   """
5811   _ALLO_KEYS = [
5812     "mem_size", "disks", "disk_template",
5813     "os", "tags", "nics", "vcpus", "hypervisor",
5814     ]
5815   _RELO_KEYS = [
5816     "relocate_from",
5817     ]
5818
5819   def __init__(self, lu, mode, name, **kwargs):
5820     self.lu = lu
5821     # init buffer variables
5822     self.in_text = self.out_text = self.in_data = self.out_data = None
5823     # init all input fields so that pylint is happy
5824     self.mode = mode
5825     self.name = name
5826     self.mem_size = self.disks = self.disk_template = None
5827     self.os = self.tags = self.nics = self.vcpus = None
5828     self.relocate_from = None
5829     # computed fields
5830     self.required_nodes = None
5831     # init result fields
5832     self.success = self.info = self.nodes = None
5833     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5834       keyset = self._ALLO_KEYS
5835     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5836       keyset = self._RELO_KEYS
5837     else:
5838       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5839                                    " IAllocator" % self.mode)
5840     for key in kwargs:
5841       if key not in keyset:
5842         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5843                                      " IAllocator" % key)
5844       setattr(self, key, kwargs[key])
5845     for key in keyset:
5846       if key not in kwargs:
5847         raise errors.ProgrammerError("Missing input parameter '%s' to"
5848                                      " IAllocator" % key)
5849     self._BuildInputData()
5850
5851   def _ComputeClusterData(self):
5852     """Compute the generic allocator input data.
5853
5854     This is the data that is independent of the actual operation.
5855
5856     """
5857     cfg = self.lu.cfg
5858     cluster_info = cfg.GetClusterInfo()
5859     # cluster data
5860     data = {
5861       "version": 1,
5862       "cluster_name": cfg.GetClusterName(),
5863       "cluster_tags": list(cluster_info.GetTags()),
5864       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5865       # we don't have job IDs
5866       }
5867     iinfo = cfg.GetAllInstancesInfo().values()
5868     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5869
5870     # node data
5871     node_results = {}
5872     node_list = cfg.GetNodeList()
5873
5874     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5875       hypervisor = self.hypervisor
5876     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5877       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5878
5879     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5880                                            hypervisor)
5881     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5882                        cluster_info.enabled_hypervisors)
5883     for nname in node_list:
5884       ninfo = cfg.GetNodeInfo(nname)
5885       node_data[nname].Raise()
5886       if not isinstance(node_data[nname].data, dict):
5887         raise errors.OpExecError("Can't get data for node %s" % nname)
5888       remote_info = node_data[nname].data
5889       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5890                    'vg_size', 'vg_free', 'cpu_total']:
5891         if attr not in remote_info:
5892           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5893                                    (nname, attr))
5894         try:
5895           remote_info[attr] = int(remote_info[attr])
5896         except ValueError, err:
5897           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5898                                    " %s" % (nname, attr, str(err)))
5899       # compute memory used by primary instances
5900       i_p_mem = i_p_up_mem = 0
5901       for iinfo, beinfo in i_list:
5902         if iinfo.primary_node == nname:
5903           i_p_mem += beinfo[constants.BE_MEMORY]
5904           if iinfo.name not in node_iinfo[nname]:
5905             i_used_mem = 0
5906           else:
5907             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5908           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5909           remote_info['memory_free'] -= max(0, i_mem_diff)
5910
5911           if iinfo.status == "up":
5912             i_p_up_mem += beinfo[constants.BE_MEMORY]
5913
5914       # compute memory used by instances
5915       pnr = {
5916         "tags": list(ninfo.GetTags()),
5917         "total_memory": remote_info['memory_total'],
5918         "reserved_memory": remote_info['memory_dom0'],
5919         "free_memory": remote_info['memory_free'],
5920         "i_pri_memory": i_p_mem,
5921         "i_pri_up_memory": i_p_up_mem,
5922         "total_disk": remote_info['vg_size'],
5923         "free_disk": remote_info['vg_free'],
5924         "primary_ip": ninfo.primary_ip,
5925         "secondary_ip": ninfo.secondary_ip,
5926         "total_cpus": remote_info['cpu_total'],
5927         "offline": ninfo.offline,
5928         }
5929       node_results[nname] = pnr
5930     data["nodes"] = node_results
5931
5932     # instance data
5933     instance_data = {}
5934     for iinfo, beinfo in i_list:
5935       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5936                   for n in iinfo.nics]
5937       pir = {
5938         "tags": list(iinfo.GetTags()),
5939         "should_run": iinfo.status == "up",
5940         "vcpus": beinfo[constants.BE_VCPUS],
5941         "memory": beinfo[constants.BE_MEMORY],
5942         "os": iinfo.os,
5943         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5944         "nics": nic_data,
5945         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5946         "disk_template": iinfo.disk_template,
5947         "hypervisor": iinfo.hypervisor,
5948         }
5949       instance_data[iinfo.name] = pir
5950
5951     data["instances"] = instance_data
5952
5953     self.in_data = data
5954
5955   def _AddNewInstance(self):
5956     """Add new instance data to allocator structure.
5957
5958     This in combination with _AllocatorGetClusterData will create the
5959     correct structure needed as input for the allocator.
5960
5961     The checks for the completeness of the opcode must have already been
5962     done.
5963
5964     """
5965     data = self.in_data
5966     if len(self.disks) != 2:
5967       raise errors.OpExecError("Only two-disk configurations supported")
5968
5969     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5970
5971     if self.disk_template in constants.DTS_NET_MIRROR:
5972       self.required_nodes = 2
5973     else:
5974       self.required_nodes = 1
5975     request = {
5976       "type": "allocate",
5977       "name": self.name,
5978       "disk_template": self.disk_template,
5979       "tags": self.tags,
5980       "os": self.os,
5981       "vcpus": self.vcpus,
5982       "memory": self.mem_size,
5983       "disks": self.disks,
5984       "disk_space_total": disk_space,
5985       "nics": self.nics,
5986       "required_nodes": self.required_nodes,
5987       }
5988     data["request"] = request
5989
5990   def _AddRelocateInstance(self):
5991     """Add relocate instance data to allocator structure.
5992
5993     This in combination with _IAllocatorGetClusterData will create the
5994     correct structure needed as input for the allocator.
5995
5996     The checks for the completeness of the opcode must have already been
5997     done.
5998
5999     """
6000     instance = self.lu.cfg.GetInstanceInfo(self.name)
6001     if instance is None:
6002       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6003                                    " IAllocator" % self.name)
6004
6005     if instance.disk_template not in constants.DTS_NET_MIRROR:
6006       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6007
6008     if len(instance.secondary_nodes) != 1:
6009       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6010
6011     self.required_nodes = 1
6012     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6013     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6014
6015     request = {
6016       "type": "relocate",
6017       "name": self.name,
6018       "disk_space_total": disk_space,
6019       "required_nodes": self.required_nodes,
6020       "relocate_from": self.relocate_from,
6021       }
6022     self.in_data["request"] = request
6023
6024   def _BuildInputData(self):
6025     """Build input data structures.
6026
6027     """
6028     self._ComputeClusterData()
6029
6030     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6031       self._AddNewInstance()
6032     else:
6033       self._AddRelocateInstance()
6034
6035     self.in_text = serializer.Dump(self.in_data)
6036
6037   def Run(self, name, validate=True, call_fn=None):
6038     """Run an instance allocator and return the results.
6039
6040     """
6041     if call_fn is None:
6042       call_fn = self.lu.rpc.call_iallocator_runner
6043     data = self.in_text
6044
6045     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6046     result.Raise()
6047
6048     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6049       raise errors.OpExecError("Invalid result from master iallocator runner")
6050
6051     rcode, stdout, stderr, fail = result.data
6052
6053     if rcode == constants.IARUN_NOTFOUND:
6054       raise errors.OpExecError("Can't find allocator '%s'" % name)
6055     elif rcode == constants.IARUN_FAILURE:
6056       raise errors.OpExecError("Instance allocator call failed: %s,"
6057                                " output: %s" % (fail, stdout+stderr))
6058     self.out_text = stdout
6059     if validate:
6060       self._ValidateResult()
6061
6062   def _ValidateResult(self):
6063     """Process the allocator results.
6064
6065     This will process and if successful save the result in
6066     self.out_data and the other parameters.
6067
6068     """
6069     try:
6070       rdict = serializer.Load(self.out_text)
6071     except Exception, err:
6072       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6073
6074     if not isinstance(rdict, dict):
6075       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6076
6077     for key in "success", "info", "nodes":
6078       if key not in rdict:
6079         raise errors.OpExecError("Can't parse iallocator results:"
6080                                  " missing key '%s'" % key)
6081       setattr(self, key, rdict[key])
6082
6083     if not isinstance(rdict["nodes"], list):
6084       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6085                                " is not a list")
6086     self.out_data = rdict
6087
6088
6089 class LUTestAllocator(NoHooksLU):
6090   """Run allocator tests.
6091
6092   This LU runs the allocator tests
6093
6094   """
6095   _OP_REQP = ["direction", "mode", "name"]
6096
6097   def CheckPrereq(self):
6098     """Check prerequisites.
6099
6100     This checks the opcode parameters depending on the director and mode test.
6101
6102     """
6103     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6104       for attr in ["name", "mem_size", "disks", "disk_template",
6105                    "os", "tags", "nics", "vcpus"]:
6106         if not hasattr(self.op, attr):
6107           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6108                                      attr)
6109       iname = self.cfg.ExpandInstanceName(self.op.name)
6110       if iname is not None:
6111         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6112                                    iname)
6113       if not isinstance(self.op.nics, list):
6114         raise errors.OpPrereqError("Invalid parameter 'nics'")
6115       for row in self.op.nics:
6116         if (not isinstance(row, dict) or
6117             "mac" not in row or
6118             "ip" not in row or
6119             "bridge" not in row):
6120           raise errors.OpPrereqError("Invalid contents of the"
6121                                      " 'nics' parameter")
6122       if not isinstance(self.op.disks, list):
6123         raise errors.OpPrereqError("Invalid parameter 'disks'")
6124       if len(self.op.disks) != 2:
6125         raise errors.OpPrereqError("Only two-disk configurations supported")
6126       for row in self.op.disks:
6127         if (not isinstance(row, dict) or
6128             "size" not in row or
6129             not isinstance(row["size"], int) or
6130             "mode" not in row or
6131             row["mode"] not in ['r', 'w']):
6132           raise errors.OpPrereqError("Invalid contents of the"
6133                                      " 'disks' parameter")
6134       if self.op.hypervisor is None:
6135         self.op.hypervisor = self.cfg.GetHypervisorType()
6136     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6137       if not hasattr(self.op, "name"):
6138         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6139       fname = self.cfg.ExpandInstanceName(self.op.name)
6140       if fname is None:
6141         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6142                                    self.op.name)
6143       self.op.name = fname
6144       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6145     else:
6146       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6147                                  self.op.mode)
6148
6149     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6150       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6151         raise errors.OpPrereqError("Missing allocator name")
6152     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6153       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6154                                  self.op.direction)
6155
6156   def Exec(self, feedback_fn):
6157     """Run the allocator test.
6158
6159     """
6160     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6161       ial = IAllocator(self,
6162                        mode=self.op.mode,
6163                        name=self.op.name,
6164                        mem_size=self.op.mem_size,
6165                        disks=self.op.disks,
6166                        disk_template=self.op.disk_template,
6167                        os=self.op.os,
6168                        tags=self.op.tags,
6169                        nics=self.op.nics,
6170                        vcpus=self.op.vcpus,
6171                        hypervisor=self.op.hypervisor,
6172                        )
6173     else:
6174       ial = IAllocator(self,
6175                        mode=self.op.mode,
6176                        name=self.op.name,
6177                        relocate_from=list(self.relocate_from),
6178                        )
6179
6180     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6181       result = ial.in_text
6182     else:
6183       ial.Run(self.op.allocator, validate=False)
6184       result = ial.out_text
6185     return result