Cleanup the config file on demotion from candidate
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419                           memory, vcpus, nics):
420   """Builds instance related env variables for hooks
421
422   This builds the hook environment from individual variables.
423
424   @type name: string
425   @param name: the name of the instance
426   @type primary_node: string
427   @param primary_node: the name of the instance's primary node
428   @type secondary_nodes: list
429   @param secondary_nodes: list of secondary nodes as strings
430   @type os_type: string
431   @param os_type: the name of the instance's OS
432   @type status: string
433   @param status: the desired status of the instances
434   @type memory: string
435   @param memory: the memory size of the instance
436   @type vcpus: string
437   @param vcpus: the count of VCPUs the instance has
438   @type nics: list
439   @param nics: list of tuples (ip, bridge, mac) representing
440       the NICs the instance  has
441   @rtype: dict
442   @return: the hook environment for this instance
443
444   """
445   env = {
446     "OP_TARGET": name,
447     "INSTANCE_NAME": name,
448     "INSTANCE_PRIMARY": primary_node,
449     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450     "INSTANCE_OS_TYPE": os_type,
451     "INSTANCE_STATUS": status,
452     "INSTANCE_MEMORY": memory,
453     "INSTANCE_VCPUS": vcpus,
454   }
455
456   if nics:
457     nic_count = len(nics)
458     for idx, (ip, bridge, mac) in enumerate(nics):
459       if ip is None:
460         ip = ""
461       env["INSTANCE_NIC%d_IP" % idx] = ip
462       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464   else:
465     nic_count = 0
466
467   env["INSTANCE_NIC_COUNT"] = nic_count
468
469   return env
470
471
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473   """Builds instance related env variables for hooks from an object.
474
475   @type lu: L{LogicalUnit}
476   @param lu: the logical unit on whose behalf we execute
477   @type instance: L{objects.Instance}
478   @param instance: the instance for which we should build the
479       environment
480   @type override: dict
481   @param override: dictionary with key/values that will override
482       our values
483   @rtype: dict
484   @return: the hook environment dictionary
485
486   """
487   bep = lu.cfg.GetClusterInfo().FillBE(instance)
488   args = {
489     'name': instance.name,
490     'primary_node': instance.primary_node,
491     'secondary_nodes': instance.secondary_nodes,
492     'os_type': instance.os,
493     'status': instance.os,
494     'memory': bep[constants.BE_MEMORY],
495     'vcpus': bep[constants.BE_VCPUS],
496     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497   }
498   if override:
499     args.update(override)
500   return _BuildInstanceHookEnv(**args)
501
502
503 def _CheckInstanceBridgesExist(lu, instance):
504   """Check that the brigdes needed by an instance exist.
505
506   """
507   # check bridges existance
508   brlist = [nic.bridge for nic in instance.nics]
509   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510   result.Raise()
511   if not result.data:
512     raise errors.OpPrereqError("One or more target bridges %s does not"
513                                " exist on destination node '%s'" %
514                                (brlist, instance.primary_node))
515
516
517 class LUDestroyCluster(NoHooksLU):
518   """Logical unit for destroying the cluster.
519
520   """
521   _OP_REQP = []
522
523   def CheckPrereq(self):
524     """Check prerequisites.
525
526     This checks whether the cluster is empty.
527
528     Any errors are signalled by raising errors.OpPrereqError.
529
530     """
531     master = self.cfg.GetMasterNode()
532
533     nodelist = self.cfg.GetNodeList()
534     if len(nodelist) != 1 or nodelist[0] != master:
535       raise errors.OpPrereqError("There are still %d node(s) in"
536                                  " this cluster." % (len(nodelist) - 1))
537     instancelist = self.cfg.GetInstanceList()
538     if instancelist:
539       raise errors.OpPrereqError("There are still %d instance(s) in"
540                                  " this cluster." % len(instancelist))
541
542   def Exec(self, feedback_fn):
543     """Destroys the cluster.
544
545     """
546     master = self.cfg.GetMasterNode()
547     result = self.rpc.call_node_stop_master(master, False)
548     result.Raise()
549     if not result.data:
550       raise errors.OpExecError("Could not disable the master role")
551     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552     utils.CreateBackup(priv_key)
553     utils.CreateBackup(pub_key)
554     return master
555
556
557 class LUVerifyCluster(LogicalUnit):
558   """Verifies the cluster status.
559
560   """
561   HPATH = "cluster-verify"
562   HTYPE = constants.HTYPE_CLUSTER
563   _OP_REQP = ["skip_checks"]
564   REQ_BGL = False
565
566   def ExpandNames(self):
567     self.needed_locks = {
568       locking.LEVEL_NODE: locking.ALL_SET,
569       locking.LEVEL_INSTANCE: locking.ALL_SET,
570     }
571     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572
573   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574                   node_result, feedback_fn, master_files):
575     """Run multiple tests against a node.
576
577     Test list:
578
579       - compares ganeti version
580       - checks vg existance and size > 20G
581       - checks config file checksum
582       - checks ssh to other nodes
583
584     @type nodeinfo: L{objects.Node}
585     @param nodeinfo: the node to check
586     @param file_list: required list of files
587     @param local_cksum: dictionary of local files and their checksums
588     @param node_result: the results from the node
589     @param feedback_fn: function used to accumulate results
590     @param master_files: list of files that only masters should have
591
592     """
593     node = nodeinfo.name
594
595     # main result, node_result should be a non-empty dict
596     if not node_result or not isinstance(node_result, dict):
597       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598       return True
599
600     # compares ganeti version
601     local_version = constants.PROTOCOL_VERSION
602     remote_version = node_result.get('version', None)
603     if not remote_version:
604       feedback_fn("  - ERROR: connection to %s failed" % (node))
605       return True
606
607     if local_version != remote_version:
608       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609                       (local_version, node, remote_version))
610       return True
611
612     # checks vg existance and size > 20G
613
614     bad = False
615     vglist = node_result.get(constants.NV_VGLIST, None)
616     if not vglist:
617       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618                       (node,))
619       bad = True
620     else:
621       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622                                             constants.MIN_VG_SIZE)
623       if vgstatus:
624         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625         bad = True
626
627     # checks config file checksum
628
629     remote_cksum = node_result.get(constants.NV_FILELIST, None)
630     if not isinstance(remote_cksum, dict):
631       bad = True
632       feedback_fn("  - ERROR: node hasn't returned file checksum data")
633     else:
634       for file_name in file_list:
635         node_is_mc = nodeinfo.master_candidate
636         must_have_file = file_name not in master_files
637         if file_name not in remote_cksum:
638           if node_is_mc or must_have_file:
639             bad = True
640             feedback_fn("  - ERROR: file '%s' missing" % file_name)
641         elif remote_cksum[file_name] != local_cksum[file_name]:
642           if node_is_mc or must_have_file:
643             bad = True
644             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645           else:
646             # not candidate and this is not a must-have file
647             bad = True
648             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649                         " '%s'" % file_name)
650         else:
651           # all good, except non-master/non-must have combination
652           if not node_is_mc and not must_have_file:
653             feedback_fn("  - ERROR: file '%s' should not exist on non master"
654                         " candidates" % file_name)
655
656     # checks ssh to any
657
658     if constants.NV_NODELIST not in node_result:
659       bad = True
660       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661     else:
662       if node_result[constants.NV_NODELIST]:
663         bad = True
664         for node in node_result[constants.NV_NODELIST]:
665           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666                           (node, node_result[constants.NV_NODELIST][node]))
667
668     if constants.NV_NODENETTEST not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671     else:
672       if node_result[constants.NV_NODENETTEST]:
673         bad = True
674         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675         for node in nlist:
676           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677                           (node, node_result[constants.NV_NODENETTEST][node]))
678
679     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680     if isinstance(hyp_result, dict):
681       for hv_name, hv_result in hyp_result.iteritems():
682         if hv_result is not None:
683           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684                       (hv_name, hv_result))
685     return bad
686
687   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688                       node_instance, feedback_fn):
689     """Verify an instance.
690
691     This function checks to see if the required block devices are
692     available on the instance's node.
693
694     """
695     bad = False
696
697     node_current = instanceconfig.primary_node
698
699     node_vol_should = {}
700     instanceconfig.MapLVsByNode(node_vol_should)
701
702     for node in node_vol_should:
703       for volume in node_vol_should[node]:
704         if node not in node_vol_is or volume not in node_vol_is[node]:
705           feedback_fn("  - ERROR: volume %s missing on node %s" %
706                           (volume, node))
707           bad = True
708
709     if not instanceconfig.status == 'down':
710       if (node_current not in node_instance or
711           not instance in node_instance[node_current]):
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758     """Verify N+1 Memory Resilience.
759
760     Check that if one single node dies we can still start all the instances it
761     was primary for.
762
763     """
764     bad = False
765
766     for node, nodeinfo in node_info.iteritems():
767       # This code checks that every node which is now listed as secondary has
768       # enough memory to host all instances it is supposed to should a single
769       # other node in the cluster fail.
770       # FIXME: not ready for failover to an arbitrary node
771       # FIXME: does not support file-backed instances
772       # WARNING: we currently take into account down instances as well as up
773       # ones, considering that even if they're down someone might want to start
774       # them even in the event of a node failure.
775       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776         needed_mem = 0
777         for instance in instances:
778           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779           if bep[constants.BE_AUTO_BALANCE]:
780             needed_mem += bep[constants.BE_MEMORY]
781         if nodeinfo['mfree'] < needed_mem:
782           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783                       " failovers should node %s fail" % (node, prinode))
784           bad = True
785     return bad
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     Transform the list of checks we're going to skip into a set and check that
791     all its members are valid.
792
793     """
794     self.skip_set = frozenset(self.op.skip_checks)
795     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796       raise errors.OpPrereqError("Invalid checks to be skipped specified")
797
798   def BuildHooksEnv(self):
799     """Build hooks env.
800
801     Cluster-Verify hooks just rone in the post phase and their failure makes
802     the output be logged in the verify output and the verification to fail.
803
804     """
805     all_nodes = self.cfg.GetNodeList()
806     # TODO: populate the environment with useful information for verify hooks
807     env = {}
808     return env, [], all_nodes
809
810   def Exec(self, feedback_fn):
811     """Verify integrity of cluster, performing various test on nodes.
812
813     """
814     bad = False
815     feedback_fn("* Verifying global settings")
816     for msg in self.cfg.VerifyConfig():
817       feedback_fn("  - ERROR: %s" % msg)
818
819     vg_name = self.cfg.GetVGName()
820     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824     i_non_redundant = [] # Non redundant instances
825     i_non_a_balanced = [] # Non auto-balanced instances
826     node_volume = {}
827     node_instance = {}
828     node_info = {}
829     instance_cfg = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     master_files = [constants.CLUSTER_CONF_FILE]
834
835     file_names = ssconf.SimpleStore().GetFileList()
836     file_names.append(constants.SSL_CERT_FILE)
837     file_names.extend(master_files)
838
839     local_checksums = utils.FingerprintFiles(file_names)
840
841     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842     node_verify_param = {
843       constants.NV_FILELIST: file_names,
844       constants.NV_NODELIST: nodelist,
845       constants.NV_HYPERVISOR: hypervisors,
846       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847                                   node.secondary_ip) for node in nodeinfo],
848       constants.NV_LVLIST: vg_name,
849       constants.NV_INSTANCELIST: hypervisors,
850       constants.NV_VGLIST: None,
851       constants.NV_VERSION: None,
852       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853       }
854     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855                                            self.cfg.GetClusterName())
856
857     cluster = self.cfg.GetClusterInfo()
858     master_node = self.cfg.GetMasterNode()
859     for node_i in nodeinfo:
860       node = node_i.name
861       nresult = all_nvinfo[node].data
862
863       if node == master_node:
864         ntype = "master"
865       elif node_i.master_candidate:
866         ntype = "master candidate"
867       else:
868         ntype = "regular"
869       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870
871       if all_nvinfo[node].failed or not isinstance(nresult, dict):
872         feedback_fn("  - ERROR: connection to %s failed" % (node,))
873         bad = True
874         continue
875
876       result = self._VerifyNode(node_i, file_names, local_checksums,
877                                 nresult, feedback_fn, master_files)
878       bad = bad or result
879
880       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881       if isinstance(lvdata, basestring):
882         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883                     (node, lvdata.encode('string_escape')))
884         bad = True
885         node_volume[node] = {}
886       elif not isinstance(lvdata, dict):
887         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888         bad = True
889         continue
890       else:
891         node_volume[node] = lvdata
892
893       # node_instance
894       idata = nresult.get(constants.NV_INSTANCELIST, None)
895       if not isinstance(idata, list):
896         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897                     (node,))
898         bad = True
899         continue
900
901       node_instance[node] = idata
902
903       # node_info
904       nodeinfo = nresult.get(constants.NV_HVINFO, None)
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962         i_non_a_balanced.append(instance)
963
964       for snode in inst_config.secondary_nodes:
965         if snode in node_info:
966           node_info[snode]['sinst'].append(instance)
967           if pnode not in node_info[snode]['sinst-by-pnode']:
968             node_info[snode]['sinst-by-pnode'][pnode] = []
969           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970         else:
971           feedback_fn("  - ERROR: instance %s, connection to secondary node"
972                       " %s failed" % (instance, snode))
973
974     feedback_fn("* Verifying orphan volumes")
975     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976                                        feedback_fn)
977     bad = bad or result
978
979     feedback_fn("* Verifying remaining instances")
980     result = self._VerifyOrphanInstances(instancelist, node_instance,
981                                          feedback_fn)
982     bad = bad or result
983
984     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985       feedback_fn("* Verifying N+1 Memory redundancy")
986       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987       bad = bad or result
988
989     feedback_fn("* Other Notes")
990     if i_non_redundant:
991       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992                   % len(i_non_redundant))
993
994     if i_non_a_balanced:
995       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996                   % len(i_non_a_balanced))
997
998     return not bad
999
1000   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001     """Analize the post-hooks' result
1002
1003     This method analyses the hook result, handles it, and sends some
1004     nicely-formatted feedback back to the user.
1005
1006     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008     @param hooks_results: the results of the multi-node hooks rpc call
1009     @param feedback_fn: function used send feedback back to the caller
1010     @param lu_result: previous Exec result
1011     @return: the new Exec result, based on the previous result
1012         and hook results
1013
1014     """
1015     # We only really run POST phase hooks, and are only interested in
1016     # their results
1017     if phase == constants.HOOKS_PHASE_POST:
1018       # Used to change hooks' output to proper indentation
1019       indent_re = re.compile('^', re.M)
1020       feedback_fn("* Hooks Results")
1021       if not hooks_results:
1022         feedback_fn("  - ERROR: general communication failure")
1023         lu_result = 1
1024       else:
1025         for node_name in hooks_results:
1026           show_node_header = True
1027           res = hooks_results[node_name]
1028           if res.failed or res.data is False or not isinstance(res.data, list):
1029             feedback_fn("    Communication failure in hooks execution")
1030             lu_result = 1
1031             continue
1032           for script, hkr, output in res.data:
1033             if hkr == constants.HKR_FAIL:
1034               # The node header is only shown once, if there are
1035               # failing hooks on that node
1036               if show_node_header:
1037                 feedback_fn("  Node %s:" % node_name)
1038                 show_node_header = False
1039               feedback_fn("    ERROR: Script %s failed, output:" % script)
1040               output = indent_re.sub('      ', output)
1041               feedback_fn("%s" % output)
1042               lu_result = 1
1043
1044       return lu_result
1045
1046
1047 class LUVerifyDisks(NoHooksLU):
1048   """Verifies the cluster disks status.
1049
1050   """
1051   _OP_REQP = []
1052   REQ_BGL = False
1053
1054   def ExpandNames(self):
1055     self.needed_locks = {
1056       locking.LEVEL_NODE: locking.ALL_SET,
1057       locking.LEVEL_INSTANCE: locking.ALL_SET,
1058     }
1059     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060
1061   def CheckPrereq(self):
1062     """Check prerequisites.
1063
1064     This has no prerequisites.
1065
1066     """
1067     pass
1068
1069   def Exec(self, feedback_fn):
1070     """Verify integrity of cluster disks.
1071
1072     """
1073     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074
1075     vg_name = self.cfg.GetVGName()
1076     nodes = utils.NiceSort(self.cfg.GetNodeList())
1077     instances = [self.cfg.GetInstanceInfo(name)
1078                  for name in self.cfg.GetInstanceList()]
1079
1080     nv_dict = {}
1081     for inst in instances:
1082       inst_lvs = {}
1083       if (inst.status != "up" or
1084           inst.disk_template not in constants.DTS_NET_MIRROR):
1085         continue
1086       inst.MapLVsByNode(inst_lvs)
1087       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088       for node, vol_list in inst_lvs.iteritems():
1089         for vol in vol_list:
1090           nv_dict[(node, vol)] = inst
1091
1092     if not nv_dict:
1093       return result
1094
1095     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096
1097     to_act = set()
1098     for node in nodes:
1099       # node_volume
1100       lvs = node_lvs[node]
1101       if lvs.failed:
1102         self.LogWarning("Connection to node %s failed: %s" %
1103                         (node, lvs.data))
1104         continue
1105       lvs = lvs.data
1106       if isinstance(lvs, basestring):
1107         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108         res_nlvm[node] = lvs
1109       elif not isinstance(lvs, dict):
1110         logging.warning("Connection to node %s failed or invalid data"
1111                         " returned", node)
1112         res_nodes.append(node)
1113         continue
1114
1115       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116         inst = nv_dict.pop((node, lv_name), None)
1117         if (not lv_online and inst is not None
1118             and inst.name not in res_instances):
1119           res_instances.append(inst.name)
1120
1121     # any leftover items in nv_dict are missing LVs, let's arrange the
1122     # data better
1123     for key, inst in nv_dict.iteritems():
1124       if inst.name not in res_missing:
1125         res_missing[inst.name] = []
1126       res_missing[inst.name].append(key)
1127
1128     return result
1129
1130
1131 class LURenameCluster(LogicalUnit):
1132   """Rename the cluster.
1133
1134   """
1135   HPATH = "cluster-rename"
1136   HTYPE = constants.HTYPE_CLUSTER
1137   _OP_REQP = ["name"]
1138
1139   def BuildHooksEnv(self):
1140     """Build hooks env.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.cfg.GetClusterName(),
1145       "NEW_NAME": self.op.name,
1146       }
1147     mn = self.cfg.GetMasterNode()
1148     return env, [mn], [mn]
1149
1150   def CheckPrereq(self):
1151     """Verify that the passed name is a valid one.
1152
1153     """
1154     hostname = utils.HostInfo(self.op.name)
1155
1156     new_name = hostname.name
1157     self.ip = new_ip = hostname.ip
1158     old_name = self.cfg.GetClusterName()
1159     old_ip = self.cfg.GetMasterIP()
1160     if new_name == old_name and new_ip == old_ip:
1161       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162                                  " cluster has changed")
1163     if new_ip != old_ip:
1164       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166                                    " reachable on the network. Aborting." %
1167                                    new_ip)
1168
1169     self.op.name = new_name
1170
1171   def Exec(self, feedback_fn):
1172     """Rename the cluster.
1173
1174     """
1175     clustername = self.op.name
1176     ip = self.ip
1177
1178     # shutdown the master IP
1179     master = self.cfg.GetMasterNode()
1180     result = self.rpc.call_node_stop_master(master, False)
1181     if result.failed or not result.data:
1182       raise errors.OpExecError("Could not disable the master role")
1183
1184     try:
1185       cluster = self.cfg.GetClusterInfo()
1186       cluster.cluster_name = clustername
1187       cluster.master_ip = ip
1188       self.cfg.Update(cluster)
1189
1190       # update the known hosts file
1191       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192       node_list = self.cfg.GetNodeList()
1193       try:
1194         node_list.remove(master)
1195       except ValueError:
1196         pass
1197       result = self.rpc.call_upload_file(node_list,
1198                                          constants.SSH_KNOWN_HOSTS_FILE)
1199       for to_node, to_result in result.iteritems():
1200         if to_result.failed or not to_result.data:
1201           logging.error("Copy of file %s to node %s failed", fname, to_node)
1202
1203     finally:
1204       result = self.rpc.call_node_start_master(master, False)
1205       if result.failed or not result.data:
1206         self.LogWarning("Could not re-enable the master role on"
1207                         " the master, please restart manually.")
1208
1209
1210 def _RecursiveCheckIfLVMBased(disk):
1211   """Check if the given disk or its children are lvm-based.
1212
1213   @type disk: L{objects.Disk}
1214   @param disk: the disk to check
1215   @rtype: booleean
1216   @return: boolean indicating whether a LD_LV dev_type was found or not
1217
1218   """
1219   if disk.children:
1220     for chdisk in disk.children:
1221       if _RecursiveCheckIfLVMBased(chdisk):
1222         return True
1223   return disk.dev_type == constants.LD_LV
1224
1225
1226 class LUSetClusterParams(LogicalUnit):
1227   """Change the parameters of the cluster.
1228
1229   """
1230   HPATH = "cluster-modify"
1231   HTYPE = constants.HTYPE_CLUSTER
1232   _OP_REQP = []
1233   REQ_BGL = False
1234
1235   def CheckParameters(self):
1236     """Check parameters
1237
1238     """
1239     if not hasattr(self.op, "candidate_pool_size"):
1240       self.op.candidate_pool_size = None
1241     if self.op.candidate_pool_size is not None:
1242       try:
1243         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244       except ValueError, err:
1245         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246                                    str(err))
1247       if self.op.candidate_pool_size < 1:
1248         raise errors.OpPrereqError("At least one master candidate needed")
1249
1250   def ExpandNames(self):
1251     # FIXME: in the future maybe other cluster params won't require checking on
1252     # all nodes to be modified.
1253     self.needed_locks = {
1254       locking.LEVEL_NODE: locking.ALL_SET,
1255     }
1256     self.share_locks[locking.LEVEL_NODE] = 1
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     """
1262     env = {
1263       "OP_TARGET": self.cfg.GetClusterName(),
1264       "NEW_VG_NAME": self.op.vg_name,
1265       }
1266     mn = self.cfg.GetMasterNode()
1267     return env, [mn], [mn]
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks whether the given params don't conflict and
1273     if the given volume group is valid.
1274
1275     """
1276     # FIXME: This only works because there is only one parameter that can be
1277     # changed or removed.
1278     if self.op.vg_name is not None and not self.op.vg_name:
1279       instances = self.cfg.GetAllInstancesInfo().values()
1280       for inst in instances:
1281         for disk in inst.disks:
1282           if _RecursiveCheckIfLVMBased(disk):
1283             raise errors.OpPrereqError("Cannot disable lvm storage while"
1284                                        " lvm-based instances exist")
1285
1286     node_list = self.acquired_locks[locking.LEVEL_NODE]
1287
1288     # if vg_name not None, checks given volume group on all nodes
1289     if self.op.vg_name:
1290       vglist = self.rpc.call_vg_list(node_list)
1291       for node in node_list:
1292         if vglist[node].failed:
1293           # ignoring down node
1294           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295           continue
1296         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297                                               self.op.vg_name,
1298                                               constants.MIN_VG_SIZE)
1299         if vgstatus:
1300           raise errors.OpPrereqError("Error on node '%s': %s" %
1301                                      (node, vgstatus))
1302
1303     self.cluster = cluster = self.cfg.GetClusterInfo()
1304     # validate beparams changes
1305     if self.op.beparams:
1306       utils.CheckBEParams(self.op.beparams)
1307       self.new_beparams = cluster.FillDict(
1308         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309
1310     # hypervisor list/parameters
1311     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312     if self.op.hvparams:
1313       if not isinstance(self.op.hvparams, dict):
1314         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315       for hv_name, hv_dict in self.op.hvparams.items():
1316         if hv_name not in self.new_hvparams:
1317           self.new_hvparams[hv_name] = hv_dict
1318         else:
1319           self.new_hvparams[hv_name].update(hv_dict)
1320
1321     if self.op.enabled_hypervisors is not None:
1322       self.hv_list = self.op.enabled_hypervisors
1323     else:
1324       self.hv_list = cluster.enabled_hypervisors
1325
1326     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327       # either the enabled list has changed, or the parameters have, validate
1328       for hv_name, hv_params in self.new_hvparams.items():
1329         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330             (self.op.enabled_hypervisors and
1331              hv_name in self.op.enabled_hypervisors)):
1332           # either this is a new hypervisor, or its parameters have changed
1333           hv_class = hypervisor.GetHypervisor(hv_name)
1334           hv_class.CheckParameterSyntax(hv_params)
1335           _CheckHVParams(self, node_list, hv_name, hv_params)
1336
1337   def Exec(self, feedback_fn):
1338     """Change the parameters of the cluster.
1339
1340     """
1341     if self.op.vg_name is not None:
1342       if self.op.vg_name != self.cfg.GetVGName():
1343         self.cfg.SetVGName(self.op.vg_name)
1344       else:
1345         feedback_fn("Cluster LVM configuration already in desired"
1346                     " state, not changing")
1347     if self.op.hvparams:
1348       self.cluster.hvparams = self.new_hvparams
1349     if self.op.enabled_hypervisors is not None:
1350       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351     if self.op.beparams:
1352       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353     if self.op.candidate_pool_size is not None:
1354       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355
1356     self.cfg.Update(self.cluster)
1357
1358     # we want to update nodes after the cluster so that if any errors
1359     # happen, we have recorded and saved the cluster info
1360     if self.op.candidate_pool_size is not None:
1361       node_info = self.cfg.GetAllNodesInfo().values()
1362       num_candidates = len([node for node in node_info
1363                             if node.master_candidate])
1364       num_nodes = len(node_info)
1365       if num_candidates < self.op.candidate_pool_size:
1366         random.shuffle(node_info)
1367         for node in node_info:
1368           if num_candidates >= self.op.candidate_pool_size:
1369             break
1370           if node.master_candidate:
1371             continue
1372           node.master_candidate = True
1373           self.LogInfo("Promoting node %s to master candidate", node.name)
1374           self.cfg.Update(node)
1375           self.context.ReaddNode(node)
1376           num_candidates += 1
1377       elif num_candidates > self.op.candidate_pool_size:
1378         self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379                      " of candidate_pool_size (%d)" %
1380                      (num_candidates, self.op.candidate_pool_size))
1381
1382
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384   """Sleep and poll for an instance's disk to sync.
1385
1386   """
1387   if not instance.disks:
1388     return True
1389
1390   if not oneshot:
1391     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392
1393   node = instance.primary_node
1394
1395   for dev in instance.disks:
1396     lu.cfg.SetDiskID(dev, node)
1397
1398   retries = 0
1399   while True:
1400     max_time = 0
1401     done = True
1402     cumul_degraded = False
1403     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404     if rstats.failed or not rstats.data:
1405       lu.LogWarning("Can't get any data from node %s", node)
1406       retries += 1
1407       if retries >= 10:
1408         raise errors.RemoteError("Can't contact node %s for mirror data,"
1409                                  " aborting." % node)
1410       time.sleep(6)
1411       continue
1412     rstats = rstats.data
1413     retries = 0
1414     for i in range(len(rstats)):
1415       mstat = rstats[i]
1416       if mstat is None:
1417         lu.LogWarning("Can't compute data for node %s/%s",
1418                            node, instance.disks[i].iv_name)
1419         continue
1420       # we ignore the ldisk parameter
1421       perc_done, est_time, is_degraded, _ = mstat
1422       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423       if perc_done is not None:
1424         done = False
1425         if est_time is not None:
1426           rem_time = "%d estimated seconds remaining" % est_time
1427           max_time = est_time
1428         else:
1429           rem_time = "no time estimate"
1430         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431                         (instance.disks[i].iv_name, perc_done, rem_time))
1432     if done or oneshot:
1433       break
1434
1435     time.sleep(min(60, max_time))
1436
1437   if done:
1438     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439   return not cumul_degraded
1440
1441
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443   """Check that mirrors are not degraded.
1444
1445   The ldisk parameter, if True, will change the test from the
1446   is_degraded attribute (which represents overall non-ok status for
1447   the device(s)) to the ldisk (representing the local storage status).
1448
1449   """
1450   lu.cfg.SetDiskID(dev, node)
1451   if ldisk:
1452     idx = 6
1453   else:
1454     idx = 5
1455
1456   result = True
1457   if on_primary or dev.AssembleOnSecondary():
1458     rstats = lu.rpc.call_blockdev_find(node, dev)
1459     if rstats.failed or not rstats.data:
1460       logging.warning("Node %s: disk degraded, not found or node down", node)
1461       result = False
1462     else:
1463       result = result and (not rstats.data[idx])
1464   if dev.children:
1465     for child in dev.children:
1466       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467
1468   return result
1469
1470
1471 class LUDiagnoseOS(NoHooksLU):
1472   """Logical unit for OS diagnose/query.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477   _FIELDS_STATIC = utils.FieldSet()
1478   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479
1480   def ExpandNames(self):
1481     if self.op.names:
1482       raise errors.OpPrereqError("Selective OS query not supported")
1483
1484     _CheckOutputFields(static=self._FIELDS_STATIC,
1485                        dynamic=self._FIELDS_DYNAMIC,
1486                        selected=self.op.output_fields)
1487
1488     # Lock all nodes, in shared mode
1489     self.needed_locks = {}
1490     self.share_locks[locking.LEVEL_NODE] = 1
1491     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     """
1497
1498   @staticmethod
1499   def _DiagnoseByOS(node_list, rlist):
1500     """Remaps a per-node return list into an a per-os per-node dictionary
1501
1502     @param node_list: a list with the names of all nodes
1503     @param rlist: a map with node names as keys and OS objects as values
1504
1505     @rtype: dict
1506     @returns: a dictionary with osnames as keys and as value another map, with
1507         nodes as keys and list of OS objects as values, eg::
1508
1509           {"debian-etch": {"node1": [<object>,...],
1510                            "node2": [<object>,]}
1511           }
1512
1513     """
1514     all_os = {}
1515     for node_name, nr in rlist.iteritems():
1516       if nr.failed or not nr.data:
1517         continue
1518       for os_obj in nr.data:
1519         if os_obj.name not in all_os:
1520           # build a list of nodes for this os containing empty lists
1521           # for each node in node_list
1522           all_os[os_obj.name] = {}
1523           for nname in node_list:
1524             all_os[os_obj.name][nname] = []
1525         all_os[os_obj.name][node_name].append(os_obj)
1526     return all_os
1527
1528   def Exec(self, feedback_fn):
1529     """Compute the list of OSes.
1530
1531     """
1532     node_list = self.acquired_locks[locking.LEVEL_NODE]
1533     node_data = self.rpc.call_os_diagnose(node_list)
1534     if node_data == False:
1535       raise errors.OpExecError("Can't gather the list of OSes")
1536     pol = self._DiagnoseByOS(node_list, node_data)
1537     output = []
1538     for os_name, os_data in pol.iteritems():
1539       row = []
1540       for field in self.op.output_fields:
1541         if field == "name":
1542           val = os_name
1543         elif field == "valid":
1544           val = utils.all([osl and osl[0] for osl in os_data.values()])
1545         elif field == "node_status":
1546           val = {}
1547           for node_name, nos_list in os_data.iteritems():
1548             val[node_name] = [(v.status, v.path) for v in nos_list]
1549         else:
1550           raise errors.ParameterError(field)
1551         row.append(val)
1552       output.append(row)
1553
1554     return output
1555
1556
1557 class LURemoveNode(LogicalUnit):
1558   """Logical unit for removing a node.
1559
1560   """
1561   HPATH = "node-remove"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This doesn't run on the target node in the pre phase as a failed
1569     node would then be impossible to remove.
1570
1571     """
1572     env = {
1573       "OP_TARGET": self.op.node_name,
1574       "NODE_NAME": self.op.node_name,
1575       }
1576     all_nodes = self.cfg.GetNodeList()
1577     all_nodes.remove(self.op.node_name)
1578     return env, all_nodes, all_nodes
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     This checks:
1584      - the node exists in the configuration
1585      - it does not have primary or secondary instances
1586      - it's not the master
1587
1588     Any errors are signalled by raising errors.OpPrereqError.
1589
1590     """
1591     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592     if node is None:
1593       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594
1595     instance_list = self.cfg.GetInstanceList()
1596
1597     masternode = self.cfg.GetMasterNode()
1598     if node.name == masternode:
1599       raise errors.OpPrereqError("Node is the master node,"
1600                                  " you need to failover first.")
1601
1602     for instance_name in instance_list:
1603       instance = self.cfg.GetInstanceInfo(instance_name)
1604       if node.name == instance.primary_node:
1605         raise errors.OpPrereqError("Instance %s still running on the node,"
1606                                    " please remove first." % instance_name)
1607       if node.name in instance.secondary_nodes:
1608         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609                                    " please remove first." % instance_name)
1610     self.op.node_name = node.name
1611     self.node = node
1612
1613   def Exec(self, feedback_fn):
1614     """Removes the node from the cluster.
1615
1616     """
1617     node = self.node
1618     logging.info("Stopping the node daemon and removing configs from node %s",
1619                  node.name)
1620
1621     self.context.RemoveNode(node.name)
1622
1623     self.rpc.call_node_leave_cluster(node.name)
1624
1625     # Promote nodes to master candidate as needed
1626     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627     node_info = self.cfg.GetAllNodesInfo().values()
1628     num_candidates = len([n for n in node_info
1629                           if n.master_candidate])
1630     num_nodes = len(node_info)
1631     random.shuffle(node_info)
1632     for node in node_info:
1633       if num_candidates >= cp_size or num_candidates >= num_nodes:
1634         break
1635       if node.master_candidate:
1636         continue
1637       node.master_candidate = True
1638       self.LogInfo("Promoting node %s to master candidate", node.name)
1639       self.cfg.Update(node)
1640       self.context.ReaddNode(node)
1641       num_candidates += 1
1642
1643
1644 class LUQueryNodes(NoHooksLU):
1645   """Logical unit for querying nodes.
1646
1647   """
1648   _OP_REQP = ["output_fields", "names"]
1649   REQ_BGL = False
1650   _FIELDS_DYNAMIC = utils.FieldSet(
1651     "dtotal", "dfree",
1652     "mtotal", "mnode", "mfree",
1653     "bootid",
1654     "ctotal",
1655     )
1656
1657   _FIELDS_STATIC = utils.FieldSet(
1658     "name", "pinst_cnt", "sinst_cnt",
1659     "pinst_list", "sinst_list",
1660     "pip", "sip", "tags",
1661     "serial_no",
1662     "master_candidate",
1663     "master",
1664     "offline",
1665     )
1666
1667   def ExpandNames(self):
1668     _CheckOutputFields(static=self._FIELDS_STATIC,
1669                        dynamic=self._FIELDS_DYNAMIC,
1670                        selected=self.op.output_fields)
1671
1672     self.needed_locks = {}
1673     self.share_locks[locking.LEVEL_NODE] = 1
1674
1675     if self.op.names:
1676       self.wanted = _GetWantedNodes(self, self.op.names)
1677     else:
1678       self.wanted = locking.ALL_SET
1679
1680     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1681     if self.do_locking:
1682       # if we don't request only static fields, we need to lock the nodes
1683       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1684
1685
1686   def CheckPrereq(self):
1687     """Check prerequisites.
1688
1689     """
1690     # The validation of the node list is done in the _GetWantedNodes,
1691     # if non empty, and if empty, there's no validation to do
1692     pass
1693
1694   def Exec(self, feedback_fn):
1695     """Computes the list of nodes and their attributes.
1696
1697     """
1698     all_info = self.cfg.GetAllNodesInfo()
1699     if self.do_locking:
1700       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1701     elif self.wanted != locking.ALL_SET:
1702       nodenames = self.wanted
1703       missing = set(nodenames).difference(all_info.keys())
1704       if missing:
1705         raise errors.OpExecError(
1706           "Some nodes were removed before retrieving their data: %s" % missing)
1707     else:
1708       nodenames = all_info.keys()
1709
1710     nodenames = utils.NiceSort(nodenames)
1711     nodelist = [all_info[name] for name in nodenames]
1712
1713     # begin data gathering
1714
1715     if self.do_locking:
1716       live_data = {}
1717       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1718                                           self.cfg.GetHypervisorType())
1719       for name in nodenames:
1720         nodeinfo = node_data[name]
1721         if not nodeinfo.failed and nodeinfo.data:
1722           nodeinfo = nodeinfo.data
1723           fn = utils.TryConvert
1724           live_data[name] = {
1725             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1726             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1727             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1728             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1729             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1730             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1731             "bootid": nodeinfo.get('bootid', None),
1732             }
1733         else:
1734           live_data[name] = {}
1735     else:
1736       live_data = dict.fromkeys(nodenames, {})
1737
1738     node_to_primary = dict([(name, set()) for name in nodenames])
1739     node_to_secondary = dict([(name, set()) for name in nodenames])
1740
1741     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1742                              "sinst_cnt", "sinst_list"))
1743     if inst_fields & frozenset(self.op.output_fields):
1744       instancelist = self.cfg.GetInstanceList()
1745
1746       for instance_name in instancelist:
1747         inst = self.cfg.GetInstanceInfo(instance_name)
1748         if inst.primary_node in node_to_primary:
1749           node_to_primary[inst.primary_node].add(inst.name)
1750         for secnode in inst.secondary_nodes:
1751           if secnode in node_to_secondary:
1752             node_to_secondary[secnode].add(inst.name)
1753
1754     master_node = self.cfg.GetMasterNode()
1755
1756     # end data gathering
1757
1758     output = []
1759     for node in nodelist:
1760       node_output = []
1761       for field in self.op.output_fields:
1762         if field == "name":
1763           val = node.name
1764         elif field == "pinst_list":
1765           val = list(node_to_primary[node.name])
1766         elif field == "sinst_list":
1767           val = list(node_to_secondary[node.name])
1768         elif field == "pinst_cnt":
1769           val = len(node_to_primary[node.name])
1770         elif field == "sinst_cnt":
1771           val = len(node_to_secondary[node.name])
1772         elif field == "pip":
1773           val = node.primary_ip
1774         elif field == "sip":
1775           val = node.secondary_ip
1776         elif field == "tags":
1777           val = list(node.GetTags())
1778         elif field == "serial_no":
1779           val = node.serial_no
1780         elif field == "master_candidate":
1781           val = node.master_candidate
1782         elif field == "master":
1783           val = node.name == master_node
1784         elif field == "offline":
1785           val = node.offline
1786         elif self._FIELDS_DYNAMIC.Matches(field):
1787           val = live_data[node.name].get(field, None)
1788         else:
1789           raise errors.ParameterError(field)
1790         node_output.append(val)
1791       output.append(node_output)
1792
1793     return output
1794
1795
1796 class LUQueryNodeVolumes(NoHooksLU):
1797   """Logical unit for getting volumes on node(s).
1798
1799   """
1800   _OP_REQP = ["nodes", "output_fields"]
1801   REQ_BGL = False
1802   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1803   _FIELDS_STATIC = utils.FieldSet("node")
1804
1805   def ExpandNames(self):
1806     _CheckOutputFields(static=self._FIELDS_STATIC,
1807                        dynamic=self._FIELDS_DYNAMIC,
1808                        selected=self.op.output_fields)
1809
1810     self.needed_locks = {}
1811     self.share_locks[locking.LEVEL_NODE] = 1
1812     if not self.op.nodes:
1813       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814     else:
1815       self.needed_locks[locking.LEVEL_NODE] = \
1816         _GetWantedNodes(self, self.op.nodes)
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     This checks that the fields required are valid output fields.
1822
1823     """
1824     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1825
1826   def Exec(self, feedback_fn):
1827     """Computes the list of nodes and their attributes.
1828
1829     """
1830     nodenames = self.nodes
1831     volumes = self.rpc.call_node_volumes(nodenames)
1832
1833     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1834              in self.cfg.GetInstanceList()]
1835
1836     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1837
1838     output = []
1839     for node in nodenames:
1840       if node not in volumes or volumes[node].failed or not volumes[node].data:
1841         continue
1842
1843       node_vols = volumes[node].data[:]
1844       node_vols.sort(key=lambda vol: vol['dev'])
1845
1846       for vol in node_vols:
1847         node_output = []
1848         for field in self.op.output_fields:
1849           if field == "node":
1850             val = node
1851           elif field == "phys":
1852             val = vol['dev']
1853           elif field == "vg":
1854             val = vol['vg']
1855           elif field == "name":
1856             val = vol['name']
1857           elif field == "size":
1858             val = int(float(vol['size']))
1859           elif field == "instance":
1860             for inst in ilist:
1861               if node not in lv_by_node[inst]:
1862                 continue
1863               if vol['name'] in lv_by_node[inst][node]:
1864                 val = inst.name
1865                 break
1866             else:
1867               val = '-'
1868           else:
1869             raise errors.ParameterError(field)
1870           node_output.append(str(val))
1871
1872         output.append(node_output)
1873
1874     return output
1875
1876
1877 class LUAddNode(LogicalUnit):
1878   """Logical unit for adding node to the cluster.
1879
1880   """
1881   HPATH = "node-add"
1882   HTYPE = constants.HTYPE_NODE
1883   _OP_REQP = ["node_name"]
1884
1885   def BuildHooksEnv(self):
1886     """Build hooks env.
1887
1888     This will run on all nodes before, and on all nodes + the new node after.
1889
1890     """
1891     env = {
1892       "OP_TARGET": self.op.node_name,
1893       "NODE_NAME": self.op.node_name,
1894       "NODE_PIP": self.op.primary_ip,
1895       "NODE_SIP": self.op.secondary_ip,
1896       }
1897     nodes_0 = self.cfg.GetNodeList()
1898     nodes_1 = nodes_0 + [self.op.node_name, ]
1899     return env, nodes_0, nodes_1
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     This checks:
1905      - the new node is not already in the config
1906      - it is resolvable
1907      - its parameters (single/dual homed) matches the cluster
1908
1909     Any errors are signalled by raising errors.OpPrereqError.
1910
1911     """
1912     node_name = self.op.node_name
1913     cfg = self.cfg
1914
1915     dns_data = utils.HostInfo(node_name)
1916
1917     node = dns_data.name
1918     primary_ip = self.op.primary_ip = dns_data.ip
1919     secondary_ip = getattr(self.op, "secondary_ip", None)
1920     if secondary_ip is None:
1921       secondary_ip = primary_ip
1922     if not utils.IsValidIP(secondary_ip):
1923       raise errors.OpPrereqError("Invalid secondary IP given")
1924     self.op.secondary_ip = secondary_ip
1925
1926     node_list = cfg.GetNodeList()
1927     if not self.op.readd and node in node_list:
1928       raise errors.OpPrereqError("Node %s is already in the configuration" %
1929                                  node)
1930     elif self.op.readd and node not in node_list:
1931       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1932
1933     for existing_node_name in node_list:
1934       existing_node = cfg.GetNodeInfo(existing_node_name)
1935
1936       if self.op.readd and node == existing_node_name:
1937         if (existing_node.primary_ip != primary_ip or
1938             existing_node.secondary_ip != secondary_ip):
1939           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1940                                      " address configuration as before")
1941         continue
1942
1943       if (existing_node.primary_ip == primary_ip or
1944           existing_node.secondary_ip == primary_ip or
1945           existing_node.primary_ip == secondary_ip or
1946           existing_node.secondary_ip == secondary_ip):
1947         raise errors.OpPrereqError("New node ip address(es) conflict with"
1948                                    " existing node %s" % existing_node.name)
1949
1950     # check that the type of the node (single versus dual homed) is the
1951     # same as for the master
1952     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1953     master_singlehomed = myself.secondary_ip == myself.primary_ip
1954     newbie_singlehomed = secondary_ip == primary_ip
1955     if master_singlehomed != newbie_singlehomed:
1956       if master_singlehomed:
1957         raise errors.OpPrereqError("The master has no private ip but the"
1958                                    " new node has one")
1959       else:
1960         raise errors.OpPrereqError("The master has a private ip but the"
1961                                    " new node doesn't have one")
1962
1963     # checks reachablity
1964     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1965       raise errors.OpPrereqError("Node not reachable by ping")
1966
1967     if not newbie_singlehomed:
1968       # check reachability from my secondary ip to newbie's secondary ip
1969       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1970                            source=myself.secondary_ip):
1971         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1972                                    " based ping to noded port")
1973
1974     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1975     node_info = self.cfg.GetAllNodesInfo().values()
1976     num_candidates = len([n for n in node_info
1977                           if n.master_candidate])
1978     master_candidate = num_candidates < cp_size
1979
1980     self.new_node = objects.Node(name=node,
1981                                  primary_ip=primary_ip,
1982                                  secondary_ip=secondary_ip,
1983                                  master_candidate=master_candidate,
1984                                  offline=False)
1985
1986   def Exec(self, feedback_fn):
1987     """Adds the new node to the cluster.
1988
1989     """
1990     new_node = self.new_node
1991     node = new_node.name
1992
1993     # check connectivity
1994     result = self.rpc.call_version([node])[node]
1995     result.Raise()
1996     if result.data:
1997       if constants.PROTOCOL_VERSION == result.data:
1998         logging.info("Communication to node %s fine, sw version %s match",
1999                      node, result.data)
2000       else:
2001         raise errors.OpExecError("Version mismatch master version %s,"
2002                                  " node version %s" %
2003                                  (constants.PROTOCOL_VERSION, result.data))
2004     else:
2005       raise errors.OpExecError("Cannot get version from the new node")
2006
2007     # setup ssh on node
2008     logging.info("Copy ssh key to node %s", node)
2009     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2010     keyarray = []
2011     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2012                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2013                 priv_key, pub_key]
2014
2015     for i in keyfiles:
2016       f = open(i, 'r')
2017       try:
2018         keyarray.append(f.read())
2019       finally:
2020         f.close()
2021
2022     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2023                                     keyarray[2],
2024                                     keyarray[3], keyarray[4], keyarray[5])
2025
2026     if result.failed or not result.data:
2027       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2028
2029     # Add node to our /etc/hosts, and add key to known_hosts
2030     utils.AddHostToEtcHosts(new_node.name)
2031
2032     if new_node.secondary_ip != new_node.primary_ip:
2033       result = self.rpc.call_node_has_ip_address(new_node.name,
2034                                                  new_node.secondary_ip)
2035       if result.failed or not result.data:
2036         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2037                                  " you gave (%s). Please fix and re-run this"
2038                                  " command." % new_node.secondary_ip)
2039
2040     node_verify_list = [self.cfg.GetMasterNode()]
2041     node_verify_param = {
2042       'nodelist': [node],
2043       # TODO: do a node-net-test as well?
2044     }
2045
2046     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2047                                        self.cfg.GetClusterName())
2048     for verifier in node_verify_list:
2049       if result[verifier].failed or not result[verifier].data:
2050         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2051                                  " for remote verification" % verifier)
2052       if result[verifier].data['nodelist']:
2053         for failed in result[verifier].data['nodelist']:
2054           feedback_fn("ssh/hostname verification failed %s -> %s" %
2055                       (verifier, result[verifier]['nodelist'][failed]))
2056         raise errors.OpExecError("ssh/hostname verification failed.")
2057
2058     # Distribute updated /etc/hosts and known_hosts to all nodes,
2059     # including the node just added
2060     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061     dist_nodes = self.cfg.GetNodeList()
2062     if not self.op.readd:
2063       dist_nodes.append(node)
2064     if myself.name in dist_nodes:
2065       dist_nodes.remove(myself.name)
2066
2067     logging.debug("Copying hosts and known_hosts to all nodes")
2068     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2069       result = self.rpc.call_upload_file(dist_nodes, fname)
2070       for to_node, to_result in result.iteritems():
2071         if to_result.failed or not to_result.data:
2072           logging.error("Copy of file %s to node %s failed", fname, to_node)
2073
2074     to_copy = []
2075     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2076       to_copy.append(constants.VNC_PASSWORD_FILE)
2077     for fname in to_copy:
2078       result = self.rpc.call_upload_file([node], fname)
2079       if result[node].failed or not result[node]:
2080         logging.error("Could not copy file %s to node %s", fname, node)
2081
2082     if self.op.readd:
2083       self.context.ReaddNode(new_node)
2084     else:
2085       self.context.AddNode(new_node)
2086
2087
2088 class LUSetNodeParams(LogicalUnit):
2089   """Modifies the parameters of a node.
2090
2091   """
2092   HPATH = "node-modify"
2093   HTYPE = constants.HTYPE_NODE
2094   _OP_REQP = ["node_name"]
2095   REQ_BGL = False
2096
2097   def CheckArguments(self):
2098     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2099     if node_name is None:
2100       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2101     self.op.node_name = node_name
2102     if not hasattr(self.op, 'master_candidate'):
2103       raise errors.OpPrereqError("Please pass at least one modification")
2104     self.op.master_candidate = bool(self.op.master_candidate)
2105
2106   def ExpandNames(self):
2107     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2108
2109   def BuildHooksEnv(self):
2110     """Build hooks env.
2111
2112     This runs on the master node.
2113
2114     """
2115     env = {
2116       "OP_TARGET": self.op.node_name,
2117       "MASTER_CANDIDATE": str(self.op.master_candidate),
2118       }
2119     nl = [self.cfg.GetMasterNode(),
2120           self.op.node_name]
2121     return env, nl, nl
2122
2123   def CheckPrereq(self):
2124     """Check prerequisites.
2125
2126     This only checks the instance list against the existing names.
2127
2128     """
2129     force = self.force = self.op.force
2130
2131     if self.op.master_candidate == False:
2132       if self.op.node_name == self.cfg.GetMasterNode():
2133         raise errors.OpPrereqError("The master node has to be a"
2134                                    " master candidate")
2135       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2136       node_info = self.cfg.GetAllNodesInfo().values()
2137       num_candidates = len([node for node in node_info
2138                             if node.master_candidate])
2139       if num_candidates <= cp_size:
2140         msg = ("Not enough master candidates (desired"
2141                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2142         if force:
2143           self.LogWarning(msg)
2144         else:
2145           raise errors.OpPrereqError(msg)
2146
2147     return
2148
2149   def Exec(self, feedback_fn):
2150     """Modifies a node.
2151
2152     """
2153     node = self.cfg.GetNodeInfo(self.op.node_name)
2154
2155     result = []
2156
2157     if self.op.master_candidate is not None:
2158       node.master_candidate = self.op.master_candidate
2159       result.append(("master_candidate", str(self.op.master_candidate)))
2160       if self.op.master_candidate == False:
2161         rrc = self.rpc.call_node_demote_from_mc(node.name)
2162         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2163             or len(rrc.data) != 2):
2164           self.LogWarning("Node rpc error: %s" % rrc.error)
2165         elif not rrc.data[0]:
2166           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2167
2168     # this will trigger configuration file update, if needed
2169     self.cfg.Update(node)
2170     # this will trigger job queue propagation or cleanup
2171     if self.op.node_name != self.cfg.GetMasterNode():
2172       self.context.ReaddNode(node)
2173
2174     return result
2175
2176
2177 class LUQueryClusterInfo(NoHooksLU):
2178   """Query cluster configuration.
2179
2180   """
2181   _OP_REQP = []
2182   REQ_BGL = False
2183
2184   def ExpandNames(self):
2185     self.needed_locks = {}
2186
2187   def CheckPrereq(self):
2188     """No prerequsites needed for this LU.
2189
2190     """
2191     pass
2192
2193   def Exec(self, feedback_fn):
2194     """Return cluster config.
2195
2196     """
2197     cluster = self.cfg.GetClusterInfo()
2198     result = {
2199       "software_version": constants.RELEASE_VERSION,
2200       "protocol_version": constants.PROTOCOL_VERSION,
2201       "config_version": constants.CONFIG_VERSION,
2202       "os_api_version": constants.OS_API_VERSION,
2203       "export_version": constants.EXPORT_VERSION,
2204       "architecture": (platform.architecture()[0], platform.machine()),
2205       "name": cluster.cluster_name,
2206       "master": cluster.master_node,
2207       "default_hypervisor": cluster.default_hypervisor,
2208       "enabled_hypervisors": cluster.enabled_hypervisors,
2209       "hvparams": cluster.hvparams,
2210       "beparams": cluster.beparams,
2211       "candidate_pool_size": cluster.candidate_pool_size,
2212       }
2213
2214     return result
2215
2216
2217 class LUQueryConfigValues(NoHooksLU):
2218   """Return configuration values.
2219
2220   """
2221   _OP_REQP = []
2222   REQ_BGL = False
2223   _FIELDS_DYNAMIC = utils.FieldSet()
2224   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2225
2226   def ExpandNames(self):
2227     self.needed_locks = {}
2228
2229     _CheckOutputFields(static=self._FIELDS_STATIC,
2230                        dynamic=self._FIELDS_DYNAMIC,
2231                        selected=self.op.output_fields)
2232
2233   def CheckPrereq(self):
2234     """No prerequisites.
2235
2236     """
2237     pass
2238
2239   def Exec(self, feedback_fn):
2240     """Dump a representation of the cluster config to the standard output.
2241
2242     """
2243     values = []
2244     for field in self.op.output_fields:
2245       if field == "cluster_name":
2246         entry = self.cfg.GetClusterName()
2247       elif field == "master_node":
2248         entry = self.cfg.GetMasterNode()
2249       elif field == "drain_flag":
2250         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2251       else:
2252         raise errors.ParameterError(field)
2253       values.append(entry)
2254     return values
2255
2256
2257 class LUActivateInstanceDisks(NoHooksLU):
2258   """Bring up an instance's disks.
2259
2260   """
2261   _OP_REQP = ["instance_name"]
2262   REQ_BGL = False
2263
2264   def ExpandNames(self):
2265     self._ExpandAndLockInstance()
2266     self.needed_locks[locking.LEVEL_NODE] = []
2267     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2268
2269   def DeclareLocks(self, level):
2270     if level == locking.LEVEL_NODE:
2271       self._LockInstancesNodes()
2272
2273   def CheckPrereq(self):
2274     """Check prerequisites.
2275
2276     This checks that the instance is in the cluster.
2277
2278     """
2279     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2280     assert self.instance is not None, \
2281       "Cannot retrieve locked instance %s" % self.op.instance_name
2282
2283   def Exec(self, feedback_fn):
2284     """Activate the disks.
2285
2286     """
2287     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2288     if not disks_ok:
2289       raise errors.OpExecError("Cannot activate block devices")
2290
2291     return disks_info
2292
2293
2294 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2295   """Prepare the block devices for an instance.
2296
2297   This sets up the block devices on all nodes.
2298
2299   @type lu: L{LogicalUnit}
2300   @param lu: the logical unit on whose behalf we execute
2301   @type instance: L{objects.Instance}
2302   @param instance: the instance for whose disks we assemble
2303   @type ignore_secondaries: boolean
2304   @param ignore_secondaries: if true, errors on secondary nodes
2305       won't result in an error return from the function
2306   @return: False if the operation failed, otherwise a list of
2307       (host, instance_visible_name, node_visible_name)
2308       with the mapping from node devices to instance devices
2309
2310   """
2311   device_info = []
2312   disks_ok = True
2313   iname = instance.name
2314   # With the two passes mechanism we try to reduce the window of
2315   # opportunity for the race condition of switching DRBD to primary
2316   # before handshaking occured, but we do not eliminate it
2317
2318   # The proper fix would be to wait (with some limits) until the
2319   # connection has been made and drbd transitions from WFConnection
2320   # into any other network-connected state (Connected, SyncTarget,
2321   # SyncSource, etc.)
2322
2323   # 1st pass, assemble on all nodes in secondary mode
2324   for inst_disk in instance.disks:
2325     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2326       lu.cfg.SetDiskID(node_disk, node)
2327       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2328       if result.failed or not result:
2329         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2330                            " (is_primary=False, pass=1)",
2331                            inst_disk.iv_name, node)
2332         if not ignore_secondaries:
2333           disks_ok = False
2334
2335   # FIXME: race condition on drbd migration to primary
2336
2337   # 2nd pass, do only the primary node
2338   for inst_disk in instance.disks:
2339     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2340       if node != instance.primary_node:
2341         continue
2342       lu.cfg.SetDiskID(node_disk, node)
2343       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2344       if result.failed or not result:
2345         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2346                            " (is_primary=True, pass=2)",
2347                            inst_disk.iv_name, node)
2348         disks_ok = False
2349     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2350
2351   # leave the disks configured for the primary node
2352   # this is a workaround that would be fixed better by
2353   # improving the logical/physical id handling
2354   for disk in instance.disks:
2355     lu.cfg.SetDiskID(disk, instance.primary_node)
2356
2357   return disks_ok, device_info
2358
2359
2360 def _StartInstanceDisks(lu, instance, force):
2361   """Start the disks of an instance.
2362
2363   """
2364   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2365                                            ignore_secondaries=force)
2366   if not disks_ok:
2367     _ShutdownInstanceDisks(lu, instance)
2368     if force is not None and not force:
2369       lu.proc.LogWarning("", hint="If the message above refers to a"
2370                          " secondary node,"
2371                          " you can retry the operation using '--force'.")
2372     raise errors.OpExecError("Disk consistency error")
2373
2374
2375 class LUDeactivateInstanceDisks(NoHooksLU):
2376   """Shutdown an instance's disks.
2377
2378   """
2379   _OP_REQP = ["instance_name"]
2380   REQ_BGL = False
2381
2382   def ExpandNames(self):
2383     self._ExpandAndLockInstance()
2384     self.needed_locks[locking.LEVEL_NODE] = []
2385     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2386
2387   def DeclareLocks(self, level):
2388     if level == locking.LEVEL_NODE:
2389       self._LockInstancesNodes()
2390
2391   def CheckPrereq(self):
2392     """Check prerequisites.
2393
2394     This checks that the instance is in the cluster.
2395
2396     """
2397     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2398     assert self.instance is not None, \
2399       "Cannot retrieve locked instance %s" % self.op.instance_name
2400
2401   def Exec(self, feedback_fn):
2402     """Deactivate the disks
2403
2404     """
2405     instance = self.instance
2406     _SafeShutdownInstanceDisks(self, instance)
2407
2408
2409 def _SafeShutdownInstanceDisks(lu, instance):
2410   """Shutdown block devices of an instance.
2411
2412   This function checks if an instance is running, before calling
2413   _ShutdownInstanceDisks.
2414
2415   """
2416   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2417                                       [instance.hypervisor])
2418   ins_l = ins_l[instance.primary_node]
2419   if ins_l.failed or not isinstance(ins_l.data, list):
2420     raise errors.OpExecError("Can't contact node '%s'" %
2421                              instance.primary_node)
2422
2423   if instance.name in ins_l.data:
2424     raise errors.OpExecError("Instance is running, can't shutdown"
2425                              " block devices.")
2426
2427   _ShutdownInstanceDisks(lu, instance)
2428
2429
2430 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2431   """Shutdown block devices of an instance.
2432
2433   This does the shutdown on all nodes of the instance.
2434
2435   If the ignore_primary is false, errors on the primary node are
2436   ignored.
2437
2438   """
2439   result = True
2440   for disk in instance.disks:
2441     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2442       lu.cfg.SetDiskID(top_disk, node)
2443       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2444       if result.failed or not result.data:
2445         logging.error("Could not shutdown block device %s on node %s",
2446                       disk.iv_name, node)
2447         if not ignore_primary or node != instance.primary_node:
2448           result = False
2449   return result
2450
2451
2452 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2453   """Checks if a node has enough free memory.
2454
2455   This function check if a given node has the needed amount of free
2456   memory. In case the node has less memory or we cannot get the
2457   information from the node, this function raise an OpPrereqError
2458   exception.
2459
2460   @type lu: C{LogicalUnit}
2461   @param lu: a logical unit from which we get configuration data
2462   @type node: C{str}
2463   @param node: the node to check
2464   @type reason: C{str}
2465   @param reason: string to use in the error message
2466   @type requested: C{int}
2467   @param requested: the amount of memory in MiB to check for
2468   @type hypervisor: C{str}
2469   @param hypervisor: the hypervisor to ask for memory stats
2470   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2471       we cannot check the node
2472
2473   """
2474   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2475   nodeinfo[node].Raise()
2476   free_mem = nodeinfo[node].data.get('memory_free')
2477   if not isinstance(free_mem, int):
2478     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2479                              " was '%s'" % (node, free_mem))
2480   if requested > free_mem:
2481     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2482                              " needed %s MiB, available %s MiB" %
2483                              (node, reason, requested, free_mem))
2484
2485
2486 class LUStartupInstance(LogicalUnit):
2487   """Starts an instance.
2488
2489   """
2490   HPATH = "instance-start"
2491   HTYPE = constants.HTYPE_INSTANCE
2492   _OP_REQP = ["instance_name", "force"]
2493   REQ_BGL = False
2494
2495   def ExpandNames(self):
2496     self._ExpandAndLockInstance()
2497
2498   def BuildHooksEnv(self):
2499     """Build hooks env.
2500
2501     This runs on master, primary and secondary nodes of the instance.
2502
2503     """
2504     env = {
2505       "FORCE": self.op.force,
2506       }
2507     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2508     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509           list(self.instance.secondary_nodes))
2510     return env, nl, nl
2511
2512   def CheckPrereq(self):
2513     """Check prerequisites.
2514
2515     This checks that the instance is in the cluster.
2516
2517     """
2518     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2519     assert self.instance is not None, \
2520       "Cannot retrieve locked instance %s" % self.op.instance_name
2521
2522     bep = self.cfg.GetClusterInfo().FillBE(instance)
2523     # check bridges existance
2524     _CheckInstanceBridgesExist(self, instance)
2525
2526     _CheckNodeFreeMemory(self, instance.primary_node,
2527                          "starting instance %s" % instance.name,
2528                          bep[constants.BE_MEMORY], instance.hypervisor)
2529
2530   def Exec(self, feedback_fn):
2531     """Start the instance.
2532
2533     """
2534     instance = self.instance
2535     force = self.op.force
2536     extra_args = getattr(self.op, "extra_args", "")
2537
2538     self.cfg.MarkInstanceUp(instance.name)
2539
2540     node_current = instance.primary_node
2541
2542     _StartInstanceDisks(self, instance, force)
2543
2544     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2545     if result.failed or not result.data:
2546       _ShutdownInstanceDisks(self, instance)
2547       raise errors.OpExecError("Could not start instance")
2548
2549
2550 class LURebootInstance(LogicalUnit):
2551   """Reboot an instance.
2552
2553   """
2554   HPATH = "instance-reboot"
2555   HTYPE = constants.HTYPE_INSTANCE
2556   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2557   REQ_BGL = False
2558
2559   def ExpandNames(self):
2560     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2561                                    constants.INSTANCE_REBOOT_HARD,
2562                                    constants.INSTANCE_REBOOT_FULL]:
2563       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2564                                   (constants.INSTANCE_REBOOT_SOFT,
2565                                    constants.INSTANCE_REBOOT_HARD,
2566                                    constants.INSTANCE_REBOOT_FULL))
2567     self._ExpandAndLockInstance()
2568
2569   def BuildHooksEnv(self):
2570     """Build hooks env.
2571
2572     This runs on master, primary and secondary nodes of the instance.
2573
2574     """
2575     env = {
2576       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2577       }
2578     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2579     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2580           list(self.instance.secondary_nodes))
2581     return env, nl, nl
2582
2583   def CheckPrereq(self):
2584     """Check prerequisites.
2585
2586     This checks that the instance is in the cluster.
2587
2588     """
2589     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2590     assert self.instance is not None, \
2591       "Cannot retrieve locked instance %s" % self.op.instance_name
2592
2593     # check bridges existance
2594     _CheckInstanceBridgesExist(self, instance)
2595
2596   def Exec(self, feedback_fn):
2597     """Reboot the instance.
2598
2599     """
2600     instance = self.instance
2601     ignore_secondaries = self.op.ignore_secondaries
2602     reboot_type = self.op.reboot_type
2603     extra_args = getattr(self.op, "extra_args", "")
2604
2605     node_current = instance.primary_node
2606
2607     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2608                        constants.INSTANCE_REBOOT_HARD]:
2609       result = self.rpc.call_instance_reboot(node_current, instance,
2610                                              reboot_type, extra_args)
2611       if result.failed or not result.data:
2612         raise errors.OpExecError("Could not reboot instance")
2613     else:
2614       if not self.rpc.call_instance_shutdown(node_current, instance):
2615         raise errors.OpExecError("could not shutdown instance for full reboot")
2616       _ShutdownInstanceDisks(self, instance)
2617       _StartInstanceDisks(self, instance, ignore_secondaries)
2618       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2619       if result.failed or not result.data:
2620         _ShutdownInstanceDisks(self, instance)
2621         raise errors.OpExecError("Could not start instance for full reboot")
2622
2623     self.cfg.MarkInstanceUp(instance.name)
2624
2625
2626 class LUShutdownInstance(LogicalUnit):
2627   """Shutdown an instance.
2628
2629   """
2630   HPATH = "instance-stop"
2631   HTYPE = constants.HTYPE_INSTANCE
2632   _OP_REQP = ["instance_name"]
2633   REQ_BGL = False
2634
2635   def ExpandNames(self):
2636     self._ExpandAndLockInstance()
2637
2638   def BuildHooksEnv(self):
2639     """Build hooks env.
2640
2641     This runs on master, primary and secondary nodes of the instance.
2642
2643     """
2644     env = _BuildInstanceHookEnvByObject(self, self.instance)
2645     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2646           list(self.instance.secondary_nodes))
2647     return env, nl, nl
2648
2649   def CheckPrereq(self):
2650     """Check prerequisites.
2651
2652     This checks that the instance is in the cluster.
2653
2654     """
2655     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2656     assert self.instance is not None, \
2657       "Cannot retrieve locked instance %s" % self.op.instance_name
2658
2659   def Exec(self, feedback_fn):
2660     """Shutdown the instance.
2661
2662     """
2663     instance = self.instance
2664     node_current = instance.primary_node
2665     self.cfg.MarkInstanceDown(instance.name)
2666     result = self.rpc.call_instance_shutdown(node_current, instance)
2667     if result.failed or not result.data:
2668       self.proc.LogWarning("Could not shutdown instance")
2669
2670     _ShutdownInstanceDisks(self, instance)
2671
2672
2673 class LUReinstallInstance(LogicalUnit):
2674   """Reinstall an instance.
2675
2676   """
2677   HPATH = "instance-reinstall"
2678   HTYPE = constants.HTYPE_INSTANCE
2679   _OP_REQP = ["instance_name"]
2680   REQ_BGL = False
2681
2682   def ExpandNames(self):
2683     self._ExpandAndLockInstance()
2684
2685   def BuildHooksEnv(self):
2686     """Build hooks env.
2687
2688     This runs on master, primary and secondary nodes of the instance.
2689
2690     """
2691     env = _BuildInstanceHookEnvByObject(self, self.instance)
2692     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2693           list(self.instance.secondary_nodes))
2694     return env, nl, nl
2695
2696   def CheckPrereq(self):
2697     """Check prerequisites.
2698
2699     This checks that the instance is in the cluster and is not running.
2700
2701     """
2702     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2703     assert instance is not None, \
2704       "Cannot retrieve locked instance %s" % self.op.instance_name
2705
2706     if instance.disk_template == constants.DT_DISKLESS:
2707       raise errors.OpPrereqError("Instance '%s' has no disks" %
2708                                  self.op.instance_name)
2709     if instance.status != "down":
2710       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2711                                  self.op.instance_name)
2712     remote_info = self.rpc.call_instance_info(instance.primary_node,
2713                                               instance.name,
2714                                               instance.hypervisor)
2715     if remote_info.failed or remote_info.data:
2716       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2717                                  (self.op.instance_name,
2718                                   instance.primary_node))
2719
2720     self.op.os_type = getattr(self.op, "os_type", None)
2721     if self.op.os_type is not None:
2722       # OS verification
2723       pnode = self.cfg.GetNodeInfo(
2724         self.cfg.ExpandNodeName(instance.primary_node))
2725       if pnode is None:
2726         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2727                                    self.op.pnode)
2728       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2729       result.Raise()
2730       if not isinstance(result.data, objects.OS):
2731         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2732                                    " primary node"  % self.op.os_type)
2733
2734     self.instance = instance
2735
2736   def Exec(self, feedback_fn):
2737     """Reinstall the instance.
2738
2739     """
2740     inst = self.instance
2741
2742     if self.op.os_type is not None:
2743       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2744       inst.os = self.op.os_type
2745       self.cfg.Update(inst)
2746
2747     _StartInstanceDisks(self, inst, None)
2748     try:
2749       feedback_fn("Running the instance OS create scripts...")
2750       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2751       result.Raise()
2752       if not result.data:
2753         raise errors.OpExecError("Could not install OS for instance %s"
2754                                  " on node %s" %
2755                                  (inst.name, inst.primary_node))
2756     finally:
2757       _ShutdownInstanceDisks(self, inst)
2758
2759
2760 class LURenameInstance(LogicalUnit):
2761   """Rename an instance.
2762
2763   """
2764   HPATH = "instance-rename"
2765   HTYPE = constants.HTYPE_INSTANCE
2766   _OP_REQP = ["instance_name", "new_name"]
2767
2768   def BuildHooksEnv(self):
2769     """Build hooks env.
2770
2771     This runs on master, primary and secondary nodes of the instance.
2772
2773     """
2774     env = _BuildInstanceHookEnvByObject(self, self.instance)
2775     env["INSTANCE_NEW_NAME"] = self.op.new_name
2776     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2777           list(self.instance.secondary_nodes))
2778     return env, nl, nl
2779
2780   def CheckPrereq(self):
2781     """Check prerequisites.
2782
2783     This checks that the instance is in the cluster and is not running.
2784
2785     """
2786     instance = self.cfg.GetInstanceInfo(
2787       self.cfg.ExpandInstanceName(self.op.instance_name))
2788     if instance is None:
2789       raise errors.OpPrereqError("Instance '%s' not known" %
2790                                  self.op.instance_name)
2791     if instance.status != "down":
2792       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2793                                  self.op.instance_name)
2794     remote_info = self.rpc.call_instance_info(instance.primary_node,
2795                                               instance.name,
2796                                               instance.hypervisor)
2797     remote_info.Raise()
2798     if remote_info.data:
2799       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2800                                  (self.op.instance_name,
2801                                   instance.primary_node))
2802     self.instance = instance
2803
2804     # new name verification
2805     name_info = utils.HostInfo(self.op.new_name)
2806
2807     self.op.new_name = new_name = name_info.name
2808     instance_list = self.cfg.GetInstanceList()
2809     if new_name in instance_list:
2810       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2811                                  new_name)
2812
2813     if not getattr(self.op, "ignore_ip", False):
2814       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2815         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2816                                    (name_info.ip, new_name))
2817
2818
2819   def Exec(self, feedback_fn):
2820     """Reinstall the instance.
2821
2822     """
2823     inst = self.instance
2824     old_name = inst.name
2825
2826     if inst.disk_template == constants.DT_FILE:
2827       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2828
2829     self.cfg.RenameInstance(inst.name, self.op.new_name)
2830     # Change the instance lock. This is definitely safe while we hold the BGL
2831     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2832     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2833
2834     # re-read the instance from the configuration after rename
2835     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2836
2837     if inst.disk_template == constants.DT_FILE:
2838       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2839       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2840                                                      old_file_storage_dir,
2841                                                      new_file_storage_dir)
2842       result.Raise()
2843       if not result.data:
2844         raise errors.OpExecError("Could not connect to node '%s' to rename"
2845                                  " directory '%s' to '%s' (but the instance"
2846                                  " has been renamed in Ganeti)" % (
2847                                  inst.primary_node, old_file_storage_dir,
2848                                  new_file_storage_dir))
2849
2850       if not result.data[0]:
2851         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2852                                  " (but the instance has been renamed in"
2853                                  " Ganeti)" % (old_file_storage_dir,
2854                                                new_file_storage_dir))
2855
2856     _StartInstanceDisks(self, inst, None)
2857     try:
2858       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2859                                                  old_name)
2860       if result.failed or not result.data:
2861         msg = ("Could not run OS rename script for instance %s on node %s"
2862                " (but the instance has been renamed in Ganeti)" %
2863                (inst.name, inst.primary_node))
2864         self.proc.LogWarning(msg)
2865     finally:
2866       _ShutdownInstanceDisks(self, inst)
2867
2868
2869 class LURemoveInstance(LogicalUnit):
2870   """Remove an instance.
2871
2872   """
2873   HPATH = "instance-remove"
2874   HTYPE = constants.HTYPE_INSTANCE
2875   _OP_REQP = ["instance_name", "ignore_failures"]
2876   REQ_BGL = False
2877
2878   def ExpandNames(self):
2879     self._ExpandAndLockInstance()
2880     self.needed_locks[locking.LEVEL_NODE] = []
2881     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2882
2883   def DeclareLocks(self, level):
2884     if level == locking.LEVEL_NODE:
2885       self._LockInstancesNodes()
2886
2887   def BuildHooksEnv(self):
2888     """Build hooks env.
2889
2890     This runs on master, primary and secondary nodes of the instance.
2891
2892     """
2893     env = _BuildInstanceHookEnvByObject(self, self.instance)
2894     nl = [self.cfg.GetMasterNode()]
2895     return env, nl, nl
2896
2897   def CheckPrereq(self):
2898     """Check prerequisites.
2899
2900     This checks that the instance is in the cluster.
2901
2902     """
2903     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2904     assert self.instance is not None, \
2905       "Cannot retrieve locked instance %s" % self.op.instance_name
2906
2907   def Exec(self, feedback_fn):
2908     """Remove the instance.
2909
2910     """
2911     instance = self.instance
2912     logging.info("Shutting down instance %s on node %s",
2913                  instance.name, instance.primary_node)
2914
2915     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2916     if result.failed or not result.data:
2917       if self.op.ignore_failures:
2918         feedback_fn("Warning: can't shutdown instance")
2919       else:
2920         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2921                                  (instance.name, instance.primary_node))
2922
2923     logging.info("Removing block devices for instance %s", instance.name)
2924
2925     if not _RemoveDisks(self, instance):
2926       if self.op.ignore_failures:
2927         feedback_fn("Warning: can't remove instance's disks")
2928       else:
2929         raise errors.OpExecError("Can't remove instance's disks")
2930
2931     logging.info("Removing instance %s out of cluster config", instance.name)
2932
2933     self.cfg.RemoveInstance(instance.name)
2934     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2935
2936
2937 class LUQueryInstances(NoHooksLU):
2938   """Logical unit for querying instances.
2939
2940   """
2941   _OP_REQP = ["output_fields", "names"]
2942   REQ_BGL = False
2943   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2944                                     "admin_state", "admin_ram",
2945                                     "disk_template", "ip", "mac", "bridge",
2946                                     "sda_size", "sdb_size", "vcpus", "tags",
2947                                     "network_port", "beparams",
2948                                     "(disk).(size)/([0-9]+)",
2949                                     "(disk).(sizes)",
2950                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2951                                     "(nic).(macs|ips|bridges)",
2952                                     "(disk|nic).(count)",
2953                                     "serial_no", "hypervisor", "hvparams",] +
2954                                   ["hv/%s" % name
2955                                    for name in constants.HVS_PARAMETERS] +
2956                                   ["be/%s" % name
2957                                    for name in constants.BES_PARAMETERS])
2958   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2959
2960
2961   def ExpandNames(self):
2962     _CheckOutputFields(static=self._FIELDS_STATIC,
2963                        dynamic=self._FIELDS_DYNAMIC,
2964                        selected=self.op.output_fields)
2965
2966     self.needed_locks = {}
2967     self.share_locks[locking.LEVEL_INSTANCE] = 1
2968     self.share_locks[locking.LEVEL_NODE] = 1
2969
2970     if self.op.names:
2971       self.wanted = _GetWantedInstances(self, self.op.names)
2972     else:
2973       self.wanted = locking.ALL_SET
2974
2975     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2976     if self.do_locking:
2977       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2978       self.needed_locks[locking.LEVEL_NODE] = []
2979       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2980
2981   def DeclareLocks(self, level):
2982     if level == locking.LEVEL_NODE and self.do_locking:
2983       self._LockInstancesNodes()
2984
2985   def CheckPrereq(self):
2986     """Check prerequisites.
2987
2988     """
2989     pass
2990
2991   def Exec(self, feedback_fn):
2992     """Computes the list of nodes and their attributes.
2993
2994     """
2995     all_info = self.cfg.GetAllInstancesInfo()
2996     if self.do_locking:
2997       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2998     elif self.wanted != locking.ALL_SET:
2999       instance_names = self.wanted
3000       missing = set(instance_names).difference(all_info.keys())
3001       if missing:
3002         raise errors.OpExecError(
3003           "Some instances were removed before retrieving their data: %s"
3004           % missing)
3005     else:
3006       instance_names = all_info.keys()
3007
3008     instance_names = utils.NiceSort(instance_names)
3009     instance_list = [all_info[iname] for iname in instance_names]
3010
3011     # begin data gathering
3012
3013     nodes = frozenset([inst.primary_node for inst in instance_list])
3014     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3015
3016     bad_nodes = []
3017     off_nodes = []
3018     if self.do_locking:
3019       live_data = {}
3020       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3021       for name in nodes:
3022         result = node_data[name]
3023         if result.offline:
3024           # offline nodes will be in both lists
3025           off_nodes.append(name)
3026         if result.failed:
3027           bad_nodes.append(name)
3028         else:
3029           if result.data:
3030             live_data.update(result.data)
3031             # else no instance is alive
3032     else:
3033       live_data = dict([(name, {}) for name in instance_names])
3034
3035     # end data gathering
3036
3037     HVPREFIX = "hv/"
3038     BEPREFIX = "be/"
3039     output = []
3040     for instance in instance_list:
3041       iout = []
3042       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3043       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3044       for field in self.op.output_fields:
3045         st_match = self._FIELDS_STATIC.Matches(field)
3046         if field == "name":
3047           val = instance.name
3048         elif field == "os":
3049           val = instance.os
3050         elif field == "pnode":
3051           val = instance.primary_node
3052         elif field == "snodes":
3053           val = list(instance.secondary_nodes)
3054         elif field == "admin_state":
3055           val = (instance.status != "down")
3056         elif field == "oper_state":
3057           if instance.primary_node in bad_nodes:
3058             val = None
3059           else:
3060             val = bool(live_data.get(instance.name))
3061         elif field == "status":
3062           if instance.primary_node in off_nodes:
3063             val = "ERROR_nodeoffline"
3064           elif instance.primary_node in bad_nodes:
3065             val = "ERROR_nodedown"
3066           else:
3067             running = bool(live_data.get(instance.name))
3068             if running:
3069               if instance.status != "down":
3070                 val = "running"
3071               else:
3072                 val = "ERROR_up"
3073             else:
3074               if instance.status != "down":
3075                 val = "ERROR_down"
3076               else:
3077                 val = "ADMIN_down"
3078         elif field == "oper_ram":
3079           if instance.primary_node in bad_nodes:
3080             val = None
3081           elif instance.name in live_data:
3082             val = live_data[instance.name].get("memory", "?")
3083           else:
3084             val = "-"
3085         elif field == "disk_template":
3086           val = instance.disk_template
3087         elif field == "ip":
3088           val = instance.nics[0].ip
3089         elif field == "bridge":
3090           val = instance.nics[0].bridge
3091         elif field == "mac":
3092           val = instance.nics[0].mac
3093         elif field == "sda_size" or field == "sdb_size":
3094           idx = ord(field[2]) - ord('a')
3095           try:
3096             val = instance.FindDisk(idx).size
3097           except errors.OpPrereqError:
3098             val = None
3099         elif field == "tags":
3100           val = list(instance.GetTags())
3101         elif field == "serial_no":
3102           val = instance.serial_no
3103         elif field == "network_port":
3104           val = instance.network_port
3105         elif field == "hypervisor":
3106           val = instance.hypervisor
3107         elif field == "hvparams":
3108           val = i_hv
3109         elif (field.startswith(HVPREFIX) and
3110               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3111           val = i_hv.get(field[len(HVPREFIX):], None)
3112         elif field == "beparams":
3113           val = i_be
3114         elif (field.startswith(BEPREFIX) and
3115               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3116           val = i_be.get(field[len(BEPREFIX):], None)
3117         elif st_match and st_match.groups():
3118           # matches a variable list
3119           st_groups = st_match.groups()
3120           if st_groups and st_groups[0] == "disk":
3121             if st_groups[1] == "count":
3122               val = len(instance.disks)
3123             elif st_groups[1] == "sizes":
3124               val = [disk.size for disk in instance.disks]
3125             elif st_groups[1] == "size":
3126               try:
3127                 val = instance.FindDisk(st_groups[2]).size
3128               except errors.OpPrereqError:
3129                 val = None
3130             else:
3131               assert False, "Unhandled disk parameter"
3132           elif st_groups[0] == "nic":
3133             if st_groups[1] == "count":
3134               val = len(instance.nics)
3135             elif st_groups[1] == "macs":
3136               val = [nic.mac for nic in instance.nics]
3137             elif st_groups[1] == "ips":
3138               val = [nic.ip for nic in instance.nics]
3139             elif st_groups[1] == "bridges":
3140               val = [nic.bridge for nic in instance.nics]
3141             else:
3142               # index-based item
3143               nic_idx = int(st_groups[2])
3144               if nic_idx >= len(instance.nics):
3145                 val = None
3146               else:
3147                 if st_groups[1] == "mac":
3148                   val = instance.nics[nic_idx].mac
3149                 elif st_groups[1] == "ip":
3150                   val = instance.nics[nic_idx].ip
3151                 elif st_groups[1] == "bridge":
3152                   val = instance.nics[nic_idx].bridge
3153                 else:
3154                   assert False, "Unhandled NIC parameter"
3155           else:
3156             assert False, "Unhandled variable parameter"
3157         else:
3158           raise errors.ParameterError(field)
3159         iout.append(val)
3160       output.append(iout)
3161
3162     return output
3163
3164
3165 class LUFailoverInstance(LogicalUnit):
3166   """Failover an instance.
3167
3168   """
3169   HPATH = "instance-failover"
3170   HTYPE = constants.HTYPE_INSTANCE
3171   _OP_REQP = ["instance_name", "ignore_consistency"]
3172   REQ_BGL = False
3173
3174   def ExpandNames(self):
3175     self._ExpandAndLockInstance()
3176     self.needed_locks[locking.LEVEL_NODE] = []
3177     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3178
3179   def DeclareLocks(self, level):
3180     if level == locking.LEVEL_NODE:
3181       self._LockInstancesNodes()
3182
3183   def BuildHooksEnv(self):
3184     """Build hooks env.
3185
3186     This runs on master, primary and secondary nodes of the instance.
3187
3188     """
3189     env = {
3190       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3191       }
3192     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3193     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3194     return env, nl, nl
3195
3196   def CheckPrereq(self):
3197     """Check prerequisites.
3198
3199     This checks that the instance is in the cluster.
3200
3201     """
3202     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3203     assert self.instance is not None, \
3204       "Cannot retrieve locked instance %s" % self.op.instance_name
3205
3206     bep = self.cfg.GetClusterInfo().FillBE(instance)
3207     if instance.disk_template not in constants.DTS_NET_MIRROR:
3208       raise errors.OpPrereqError("Instance's disk layout is not"
3209                                  " network mirrored, cannot failover.")
3210
3211     secondary_nodes = instance.secondary_nodes
3212     if not secondary_nodes:
3213       raise errors.ProgrammerError("no secondary node but using "
3214                                    "a mirrored disk template")
3215
3216     target_node = secondary_nodes[0]
3217     # check memory requirements on the secondary node
3218     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3219                          instance.name, bep[constants.BE_MEMORY],
3220                          instance.hypervisor)
3221
3222     # check bridge existance
3223     brlist = [nic.bridge for nic in instance.nics]
3224     result = self.rpc.call_bridges_exist(target_node, brlist)
3225     result.Raise()
3226     if not result.data:
3227       raise errors.OpPrereqError("One or more target bridges %s does not"
3228                                  " exist on destination node '%s'" %
3229                                  (brlist, target_node))
3230
3231   def Exec(self, feedback_fn):
3232     """Failover an instance.
3233
3234     The failover is done by shutting it down on its present node and
3235     starting it on the secondary.
3236
3237     """
3238     instance = self.instance
3239
3240     source_node = instance.primary_node
3241     target_node = instance.secondary_nodes[0]
3242
3243     feedback_fn("* checking disk consistency between source and target")
3244     for dev in instance.disks:
3245       # for drbd, these are drbd over lvm
3246       if not _CheckDiskConsistency(self, dev, target_node, False):
3247         if instance.status == "up" and not self.op.ignore_consistency:
3248           raise errors.OpExecError("Disk %s is degraded on target node,"
3249                                    " aborting failover." % dev.iv_name)
3250
3251     feedback_fn("* shutting down instance on source node")
3252     logging.info("Shutting down instance %s on node %s",
3253                  instance.name, source_node)
3254
3255     result = self.rpc.call_instance_shutdown(source_node, instance)
3256     if result.failed or not result.data:
3257       if self.op.ignore_consistency:
3258         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3259                              " Proceeding"
3260                              " anyway. Please make sure node %s is down",
3261                              instance.name, source_node, source_node)
3262       else:
3263         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3264                                  (instance.name, source_node))
3265
3266     feedback_fn("* deactivating the instance's disks on source node")
3267     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3268       raise errors.OpExecError("Can't shut down the instance's disks.")
3269
3270     instance.primary_node = target_node
3271     # distribute new instance config to the other nodes
3272     self.cfg.Update(instance)
3273
3274     # Only start the instance if it's marked as up
3275     if instance.status == "up":
3276       feedback_fn("* activating the instance's disks on target node")
3277       logging.info("Starting instance %s on node %s",
3278                    instance.name, target_node)
3279
3280       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3281                                                ignore_secondaries=True)
3282       if not disks_ok:
3283         _ShutdownInstanceDisks(self, instance)
3284         raise errors.OpExecError("Can't activate the instance's disks")
3285
3286       feedback_fn("* starting the instance on the target node")
3287       result = self.rpc.call_instance_start(target_node, instance, None)
3288       if result.failed or not result.data:
3289         _ShutdownInstanceDisks(self, instance)
3290         raise errors.OpExecError("Could not start instance %s on node %s." %
3291                                  (instance.name, target_node))
3292
3293
3294 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3295   """Create a tree of block devices on the primary node.
3296
3297   This always creates all devices.
3298
3299   """
3300   if device.children:
3301     for child in device.children:
3302       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3303         return False
3304
3305   lu.cfg.SetDiskID(device, node)
3306   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3307                                        instance.name, True, info)
3308   if new_id.failed or not new_id.data:
3309     return False
3310   if device.physical_id is None:
3311     device.physical_id = new_id
3312   return True
3313
3314
3315 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3316   """Create a tree of block devices on a secondary node.
3317
3318   If this device type has to be created on secondaries, create it and
3319   all its children.
3320
3321   If not, just recurse to children keeping the same 'force' value.
3322
3323   """
3324   if device.CreateOnSecondary():
3325     force = True
3326   if device.children:
3327     for child in device.children:
3328       if not _CreateBlockDevOnSecondary(lu, node, instance,
3329                                         child, force, info):
3330         return False
3331
3332   if not force:
3333     return True
3334   lu.cfg.SetDiskID(device, node)
3335   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3336                                        instance.name, False, info)
3337   if new_id.failed or not new_id.data:
3338     return False
3339   if device.physical_id is None:
3340     device.physical_id = new_id
3341   return True
3342
3343
3344 def _GenerateUniqueNames(lu, exts):
3345   """Generate a suitable LV name.
3346
3347   This will generate a logical volume name for the given instance.
3348
3349   """
3350   results = []
3351   for val in exts:
3352     new_id = lu.cfg.GenerateUniqueID()
3353     results.append("%s%s" % (new_id, val))
3354   return results
3355
3356
3357 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3358                          p_minor, s_minor):
3359   """Generate a drbd8 device complete with its children.
3360
3361   """
3362   port = lu.cfg.AllocatePort()
3363   vgname = lu.cfg.GetVGName()
3364   shared_secret = lu.cfg.GenerateDRBDSecret()
3365   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3366                           logical_id=(vgname, names[0]))
3367   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3368                           logical_id=(vgname, names[1]))
3369   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3370                           logical_id=(primary, secondary, port,
3371                                       p_minor, s_minor,
3372                                       shared_secret),
3373                           children=[dev_data, dev_meta],
3374                           iv_name=iv_name)
3375   return drbd_dev
3376
3377
3378 def _GenerateDiskTemplate(lu, template_name,
3379                           instance_name, primary_node,
3380                           secondary_nodes, disk_info,
3381                           file_storage_dir, file_driver,
3382                           base_index):
3383   """Generate the entire disk layout for a given template type.
3384
3385   """
3386   #TODO: compute space requirements
3387
3388   vgname = lu.cfg.GetVGName()
3389   disk_count = len(disk_info)
3390   disks = []
3391   if template_name == constants.DT_DISKLESS:
3392     pass
3393   elif template_name == constants.DT_PLAIN:
3394     if len(secondary_nodes) != 0:
3395       raise errors.ProgrammerError("Wrong template configuration")
3396
3397     names = _GenerateUniqueNames(lu, [".disk%d" % i
3398                                       for i in range(disk_count)])
3399     for idx, disk in enumerate(disk_info):
3400       disk_index = idx + base_index
3401       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3402                               logical_id=(vgname, names[idx]),
3403                               iv_name="disk/%d" % disk_index)
3404       disks.append(disk_dev)
3405   elif template_name == constants.DT_DRBD8:
3406     if len(secondary_nodes) != 1:
3407       raise errors.ProgrammerError("Wrong template configuration")
3408     remote_node = secondary_nodes[0]
3409     minors = lu.cfg.AllocateDRBDMinor(
3410       [primary_node, remote_node] * len(disk_info), instance_name)
3411
3412     names = _GenerateUniqueNames(lu,
3413                                  [".disk%d_%s" % (i, s)
3414                                   for i in range(disk_count)
3415                                   for s in ("data", "meta")
3416                                   ])
3417     for idx, disk in enumerate(disk_info):
3418       disk_index = idx + base_index
3419       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3420                                       disk["size"], names[idx*2:idx*2+2],
3421                                       "disk/%d" % disk_index,
3422                                       minors[idx*2], minors[idx*2+1])
3423       disks.append(disk_dev)
3424   elif template_name == constants.DT_FILE:
3425     if len(secondary_nodes) != 0:
3426       raise errors.ProgrammerError("Wrong template configuration")
3427
3428     for idx, disk in enumerate(disk_info):
3429       disk_index = idx + base_index
3430       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3431                               iv_name="disk/%d" % disk_index,
3432                               logical_id=(file_driver,
3433                                           "%s/disk%d" % (file_storage_dir,
3434                                                          idx)))
3435       disks.append(disk_dev)
3436   else:
3437     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3438   return disks
3439
3440
3441 def _GetInstanceInfoText(instance):
3442   """Compute that text that should be added to the disk's metadata.
3443
3444   """
3445   return "originstname+%s" % instance.name
3446
3447
3448 def _CreateDisks(lu, instance):
3449   """Create all disks for an instance.
3450
3451   This abstracts away some work from AddInstance.
3452
3453   @type lu: L{LogicalUnit}
3454   @param lu: the logical unit on whose behalf we execute
3455   @type instance: L{objects.Instance}
3456   @param instance: the instance whose disks we should create
3457   @rtype: boolean
3458   @return: the success of the creation
3459
3460   """
3461   info = _GetInstanceInfoText(instance)
3462
3463   if instance.disk_template == constants.DT_FILE:
3464     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3465     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3466                                                  file_storage_dir)
3467
3468     if result.failed or not result.data:
3469       logging.error("Could not connect to node '%s'", instance.primary_node)
3470       return False
3471
3472     if not result.data[0]:
3473       logging.error("Failed to create directory '%s'", file_storage_dir)
3474       return False
3475
3476   # Note: this needs to be kept in sync with adding of disks in
3477   # LUSetInstanceParams
3478   for device in instance.disks:
3479     logging.info("Creating volume %s for instance %s",
3480                  device.iv_name, instance.name)
3481     #HARDCODE
3482     for secondary_node in instance.secondary_nodes:
3483       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3484                                         device, False, info):
3485         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3486                       device.iv_name, device, secondary_node)
3487         return False
3488     #HARDCODE
3489     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3490                                     instance, device, info):
3491       logging.error("Failed to create volume %s on primary!", device.iv_name)
3492       return False
3493
3494   return True
3495
3496
3497 def _RemoveDisks(lu, instance):
3498   """Remove all disks for an instance.
3499
3500   This abstracts away some work from `AddInstance()` and
3501   `RemoveInstance()`. Note that in case some of the devices couldn't
3502   be removed, the removal will continue with the other ones (compare
3503   with `_CreateDisks()`).
3504
3505   @type lu: L{LogicalUnit}
3506   @param lu: the logical unit on whose behalf we execute
3507   @type instance: L{objects.Instance}
3508   @param instance: the instance whose disks we should remove
3509   @rtype: boolean
3510   @return: the success of the removal
3511
3512   """
3513   logging.info("Removing block devices for instance %s", instance.name)
3514
3515   result = True
3516   for device in instance.disks:
3517     for node, disk in device.ComputeNodeTree(instance.primary_node):
3518       lu.cfg.SetDiskID(disk, node)
3519       result = lu.rpc.call_blockdev_remove(node, disk)
3520       if result.failed or not result.data:
3521         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3522                            " continuing anyway", device.iv_name, node)
3523         result = False
3524
3525   if instance.disk_template == constants.DT_FILE:
3526     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3527     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3528                                                  file_storage_dir)
3529     if result.failed or not result.data:
3530       logging.error("Could not remove directory '%s'", file_storage_dir)
3531       result = False
3532
3533   return result
3534
3535
3536 def _ComputeDiskSize(disk_template, disks):
3537   """Compute disk size requirements in the volume group
3538
3539   """
3540   # Required free disk space as a function of disk and swap space
3541   req_size_dict = {
3542     constants.DT_DISKLESS: None,
3543     constants.DT_PLAIN: sum(d["size"] for d in disks),
3544     # 128 MB are added for drbd metadata for each disk
3545     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3546     constants.DT_FILE: None,
3547   }
3548
3549   if disk_template not in req_size_dict:
3550     raise errors.ProgrammerError("Disk template '%s' size requirement"
3551                                  " is unknown" %  disk_template)
3552
3553   return req_size_dict[disk_template]
3554
3555
3556 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3557   """Hypervisor parameter validation.
3558
3559   This function abstract the hypervisor parameter validation to be
3560   used in both instance create and instance modify.
3561
3562   @type lu: L{LogicalUnit}
3563   @param lu: the logical unit for which we check
3564   @type nodenames: list
3565   @param nodenames: the list of nodes on which we should check
3566   @type hvname: string
3567   @param hvname: the name of the hypervisor we should use
3568   @type hvparams: dict
3569   @param hvparams: the parameters which we need to check
3570   @raise errors.OpPrereqError: if the parameters are not valid
3571
3572   """
3573   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3574                                                   hvname,
3575                                                   hvparams)
3576   for node in nodenames:
3577     info = hvinfo[node]
3578     info.Raise()
3579     if not info.data or not isinstance(info.data, (tuple, list)):
3580       raise errors.OpPrereqError("Cannot get current information"
3581                                  " from node '%s' (%s)" % (node, info.data))
3582     if not info.data[0]:
3583       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3584                                  " %s" % info.data[1])
3585
3586
3587 class LUCreateInstance(LogicalUnit):
3588   """Create an instance.
3589
3590   """
3591   HPATH = "instance-add"
3592   HTYPE = constants.HTYPE_INSTANCE
3593   _OP_REQP = ["instance_name", "disks", "disk_template",
3594               "mode", "start",
3595               "wait_for_sync", "ip_check", "nics",
3596               "hvparams", "beparams"]
3597   REQ_BGL = False
3598
3599   def _ExpandNode(self, node):
3600     """Expands and checks one node name.
3601
3602     """
3603     node_full = self.cfg.ExpandNodeName(node)
3604     if node_full is None:
3605       raise errors.OpPrereqError("Unknown node %s" % node)
3606     return node_full
3607
3608   def ExpandNames(self):
3609     """ExpandNames for CreateInstance.
3610
3611     Figure out the right locks for instance creation.
3612
3613     """
3614     self.needed_locks = {}
3615
3616     # set optional parameters to none if they don't exist
3617     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3618       if not hasattr(self.op, attr):
3619         setattr(self.op, attr, None)
3620
3621     # cheap checks, mostly valid constants given
3622
3623     # verify creation mode
3624     if self.op.mode not in (constants.INSTANCE_CREATE,
3625                             constants.INSTANCE_IMPORT):
3626       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3627                                  self.op.mode)
3628
3629     # disk template and mirror node verification
3630     if self.op.disk_template not in constants.DISK_TEMPLATES:
3631       raise errors.OpPrereqError("Invalid disk template name")
3632
3633     if self.op.hypervisor is None:
3634       self.op.hypervisor = self.cfg.GetHypervisorType()
3635
3636     cluster = self.cfg.GetClusterInfo()
3637     enabled_hvs = cluster.enabled_hypervisors
3638     if self.op.hypervisor not in enabled_hvs:
3639       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3640                                  " cluster (%s)" % (self.op.hypervisor,
3641                                   ",".join(enabled_hvs)))
3642
3643     # check hypervisor parameter syntax (locally)
3644
3645     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3646                                   self.op.hvparams)
3647     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3648     hv_type.CheckParameterSyntax(filled_hvp)
3649
3650     # fill and remember the beparams dict
3651     utils.CheckBEParams(self.op.beparams)
3652     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3653                                     self.op.beparams)
3654
3655     #### instance parameters check
3656
3657     # instance name verification
3658     hostname1 = utils.HostInfo(self.op.instance_name)
3659     self.op.instance_name = instance_name = hostname1.name
3660
3661     # this is just a preventive check, but someone might still add this
3662     # instance in the meantime, and creation will fail at lock-add time
3663     if instance_name in self.cfg.GetInstanceList():
3664       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3665                                  instance_name)
3666
3667     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3668
3669     # NIC buildup
3670     self.nics = []
3671     for nic in self.op.nics:
3672       # ip validity checks
3673       ip = nic.get("ip", None)
3674       if ip is None or ip.lower() == "none":
3675         nic_ip = None
3676       elif ip.lower() == constants.VALUE_AUTO:
3677         nic_ip = hostname1.ip
3678       else:
3679         if not utils.IsValidIP(ip):
3680           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3681                                      " like a valid IP" % ip)
3682         nic_ip = ip
3683
3684       # MAC address verification
3685       mac = nic.get("mac", constants.VALUE_AUTO)
3686       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3687         if not utils.IsValidMac(mac.lower()):
3688           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3689                                      mac)
3690       # bridge verification
3691       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3692       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3693
3694     # disk checks/pre-build
3695     self.disks = []
3696     for disk in self.op.disks:
3697       mode = disk.get("mode", constants.DISK_RDWR)
3698       if mode not in constants.DISK_ACCESS_SET:
3699         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3700                                    mode)
3701       size = disk.get("size", None)
3702       if size is None:
3703         raise errors.OpPrereqError("Missing disk size")
3704       try:
3705         size = int(size)
3706       except ValueError:
3707         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3708       self.disks.append({"size": size, "mode": mode})
3709
3710     # used in CheckPrereq for ip ping check
3711     self.check_ip = hostname1.ip
3712
3713     # file storage checks
3714     if (self.op.file_driver and
3715         not self.op.file_driver in constants.FILE_DRIVER):
3716       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3717                                  self.op.file_driver)
3718
3719     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3720       raise errors.OpPrereqError("File storage directory path not absolute")
3721
3722     ### Node/iallocator related checks
3723     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3724       raise errors.OpPrereqError("One and only one of iallocator and primary"
3725                                  " node must be given")
3726
3727     if self.op.iallocator:
3728       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3729     else:
3730       self.op.pnode = self._ExpandNode(self.op.pnode)
3731       nodelist = [self.op.pnode]
3732       if self.op.snode is not None:
3733         self.op.snode = self._ExpandNode(self.op.snode)
3734         nodelist.append(self.op.snode)
3735       self.needed_locks[locking.LEVEL_NODE] = nodelist
3736
3737     # in case of import lock the source node too
3738     if self.op.mode == constants.INSTANCE_IMPORT:
3739       src_node = getattr(self.op, "src_node", None)
3740       src_path = getattr(self.op, "src_path", None)
3741
3742       if src_path is None:
3743         self.op.src_path = src_path = self.op.instance_name
3744
3745       if src_node is None:
3746         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3747         self.op.src_node = None
3748         if os.path.isabs(src_path):
3749           raise errors.OpPrereqError("Importing an instance from an absolute"
3750                                      " path requires a source node option.")
3751       else:
3752         self.op.src_node = src_node = self._ExpandNode(src_node)
3753         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3754           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3755         if not os.path.isabs(src_path):
3756           self.op.src_path = src_path = \
3757             os.path.join(constants.EXPORT_DIR, src_path)
3758
3759     else: # INSTANCE_CREATE
3760       if getattr(self.op, "os_type", None) is None:
3761         raise errors.OpPrereqError("No guest OS specified")
3762
3763   def _RunAllocator(self):
3764     """Run the allocator based on input opcode.
3765
3766     """
3767     nics = [n.ToDict() for n in self.nics]
3768     ial = IAllocator(self,
3769                      mode=constants.IALLOCATOR_MODE_ALLOC,
3770                      name=self.op.instance_name,
3771                      disk_template=self.op.disk_template,
3772                      tags=[],
3773                      os=self.op.os_type,
3774                      vcpus=self.be_full[constants.BE_VCPUS],
3775                      mem_size=self.be_full[constants.BE_MEMORY],
3776                      disks=self.disks,
3777                      nics=nics,
3778                      hypervisor=self.op.hypervisor,
3779                      )
3780
3781     ial.Run(self.op.iallocator)
3782
3783     if not ial.success:
3784       raise errors.OpPrereqError("Can't compute nodes using"
3785                                  " iallocator '%s': %s" % (self.op.iallocator,
3786                                                            ial.info))
3787     if len(ial.nodes) != ial.required_nodes:
3788       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3789                                  " of nodes (%s), required %s" %
3790                                  (self.op.iallocator, len(ial.nodes),
3791                                   ial.required_nodes))
3792     self.op.pnode = ial.nodes[0]
3793     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3794                  self.op.instance_name, self.op.iallocator,
3795                  ", ".join(ial.nodes))
3796     if ial.required_nodes == 2:
3797       self.op.snode = ial.nodes[1]
3798
3799   def BuildHooksEnv(self):
3800     """Build hooks env.
3801
3802     This runs on master, primary and secondary nodes of the instance.
3803
3804     """
3805     env = {
3806       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3807       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3808       "INSTANCE_ADD_MODE": self.op.mode,
3809       }
3810     if self.op.mode == constants.INSTANCE_IMPORT:
3811       env["INSTANCE_SRC_NODE"] = self.op.src_node
3812       env["INSTANCE_SRC_PATH"] = self.op.src_path
3813       env["INSTANCE_SRC_IMAGES"] = self.src_images
3814
3815     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3816       primary_node=self.op.pnode,
3817       secondary_nodes=self.secondaries,
3818       status=self.instance_status,
3819       os_type=self.op.os_type,
3820       memory=self.be_full[constants.BE_MEMORY],
3821       vcpus=self.be_full[constants.BE_VCPUS],
3822       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3823     ))
3824
3825     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3826           self.secondaries)
3827     return env, nl, nl
3828
3829
3830   def CheckPrereq(self):
3831     """Check prerequisites.
3832
3833     """
3834     if (not self.cfg.GetVGName() and
3835         self.op.disk_template not in constants.DTS_NOT_LVM):
3836       raise errors.OpPrereqError("Cluster does not support lvm-based"
3837                                  " instances")
3838
3839
3840     if self.op.mode == constants.INSTANCE_IMPORT:
3841       src_node = self.op.src_node
3842       src_path = self.op.src_path
3843
3844       if src_node is None:
3845         exp_list = self.rpc.call_export_list(
3846           self.acquired_locks[locking.LEVEL_NODE])
3847         found = False
3848         for node in exp_list:
3849           if not exp_list[node].failed and src_path in exp_list[node].data:
3850             found = True
3851             self.op.src_node = src_node = node
3852             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3853                                                        src_path)
3854             break
3855         if not found:
3856           raise errors.OpPrereqError("No export found for relative path %s" %
3857                                       src_path)
3858
3859       result = self.rpc.call_export_info(src_node, src_path)
3860       result.Raise()
3861       if not result.data:
3862         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3863
3864       export_info = result.data
3865       if not export_info.has_section(constants.INISECT_EXP):
3866         raise errors.ProgrammerError("Corrupted export config")
3867
3868       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3869       if (int(ei_version) != constants.EXPORT_VERSION):
3870         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3871                                    (ei_version, constants.EXPORT_VERSION))
3872
3873       # Check that the new instance doesn't have less disks than the export
3874       instance_disks = len(self.disks)
3875       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3876       if instance_disks < export_disks:
3877         raise errors.OpPrereqError("Not enough disks to import."
3878                                    " (instance: %d, export: %d)" %
3879                                    (instance_disks, export_disks))
3880
3881       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3882       disk_images = []
3883       for idx in range(export_disks):
3884         option = 'disk%d_dump' % idx
3885         if export_info.has_option(constants.INISECT_INS, option):
3886           # FIXME: are the old os-es, disk sizes, etc. useful?
3887           export_name = export_info.get(constants.INISECT_INS, option)
3888           image = os.path.join(src_path, export_name)
3889           disk_images.append(image)
3890         else:
3891           disk_images.append(False)
3892
3893       self.src_images = disk_images
3894
3895       old_name = export_info.get(constants.INISECT_INS, 'name')
3896       # FIXME: int() here could throw a ValueError on broken exports
3897       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3898       if self.op.instance_name == old_name:
3899         for idx, nic in enumerate(self.nics):
3900           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3901             nic_mac_ini = 'nic%d_mac' % idx
3902             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3903
3904     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3905     if self.op.start and not self.op.ip_check:
3906       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3907                                  " adding an instance in start mode")
3908
3909     if self.op.ip_check:
3910       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3911         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3912                                    (self.check_ip, self.op.instance_name))
3913
3914     #### allocator run
3915
3916     if self.op.iallocator is not None:
3917       self._RunAllocator()
3918
3919     #### node related checks
3920
3921     # check primary node
3922     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3923     assert self.pnode is not None, \
3924       "Cannot retrieve locked node %s" % self.op.pnode
3925     self.secondaries = []
3926
3927     # mirror node verification
3928     if self.op.disk_template in constants.DTS_NET_MIRROR:
3929       if self.op.snode is None:
3930         raise errors.OpPrereqError("The networked disk templates need"
3931                                    " a mirror node")
3932       if self.op.snode == pnode.name:
3933         raise errors.OpPrereqError("The secondary node cannot be"
3934                                    " the primary node.")
3935       self.secondaries.append(self.op.snode)
3936
3937     nodenames = [pnode.name] + self.secondaries
3938
3939     req_size = _ComputeDiskSize(self.op.disk_template,
3940                                 self.disks)
3941
3942     # Check lv size requirements
3943     if req_size is not None:
3944       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3945                                          self.op.hypervisor)
3946       for node in nodenames:
3947         info = nodeinfo[node]
3948         info.Raise()
3949         info = info.data
3950         if not info:
3951           raise errors.OpPrereqError("Cannot get current information"
3952                                      " from node '%s'" % node)
3953         vg_free = info.get('vg_free', None)
3954         if not isinstance(vg_free, int):
3955           raise errors.OpPrereqError("Can't compute free disk space on"
3956                                      " node %s" % node)
3957         if req_size > info['vg_free']:
3958           raise errors.OpPrereqError("Not enough disk space on target node %s."
3959                                      " %d MB available, %d MB required" %
3960                                      (node, info['vg_free'], req_size))
3961
3962     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3963
3964     # os verification
3965     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3966     result.Raise()
3967     if not isinstance(result.data, objects.OS):
3968       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3969                                  " primary node"  % self.op.os_type)
3970
3971     # bridge check on primary node
3972     bridges = [n.bridge for n in self.nics]
3973     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3974     result.Raise()
3975     if not result.data:
3976       raise errors.OpPrereqError("One of the target bridges '%s' does not"
3977                                  " exist on destination node '%s'" %
3978                                  (",".join(bridges), pnode.name))
3979
3980     # memory check on primary node
3981     if self.op.start:
3982       _CheckNodeFreeMemory(self, self.pnode.name,
3983                            "creating instance %s" % self.op.instance_name,
3984                            self.be_full[constants.BE_MEMORY],
3985                            self.op.hypervisor)
3986
3987     if self.op.start:
3988       self.instance_status = 'up'
3989     else:
3990       self.instance_status = 'down'
3991
3992   def Exec(self, feedback_fn):
3993     """Create and add the instance to the cluster.
3994
3995     """
3996     instance = self.op.instance_name
3997     pnode_name = self.pnode.name
3998
3999     for nic in self.nics:
4000       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4001         nic.mac = self.cfg.GenerateMAC()
4002
4003     ht_kind = self.op.hypervisor
4004     if ht_kind in constants.HTS_REQ_PORT:
4005       network_port = self.cfg.AllocatePort()
4006     else:
4007       network_port = None
4008
4009     ##if self.op.vnc_bind_address is None:
4010     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4011
4012     # this is needed because os.path.join does not accept None arguments
4013     if self.op.file_storage_dir is None:
4014       string_file_storage_dir = ""
4015     else:
4016       string_file_storage_dir = self.op.file_storage_dir
4017
4018     # build the full file storage dir path
4019     file_storage_dir = os.path.normpath(os.path.join(
4020                                         self.cfg.GetFileStorageDir(),
4021                                         string_file_storage_dir, instance))
4022
4023
4024     disks = _GenerateDiskTemplate(self,
4025                                   self.op.disk_template,
4026                                   instance, pnode_name,
4027                                   self.secondaries,
4028                                   self.disks,
4029                                   file_storage_dir,
4030                                   self.op.file_driver,
4031                                   0)
4032
4033     iobj = objects.Instance(name=instance, os=self.op.os_type,
4034                             primary_node=pnode_name,
4035                             nics=self.nics, disks=disks,
4036                             disk_template=self.op.disk_template,
4037                             status=self.instance_status,
4038                             network_port=network_port,
4039                             beparams=self.op.beparams,
4040                             hvparams=self.op.hvparams,
4041                             hypervisor=self.op.hypervisor,
4042                             )
4043
4044     feedback_fn("* creating instance disks...")
4045     if not _CreateDisks(self, iobj):
4046       _RemoveDisks(self, iobj)
4047       self.cfg.ReleaseDRBDMinors(instance)
4048       raise errors.OpExecError("Device creation failed, reverting...")
4049
4050     feedback_fn("adding instance %s to cluster config" % instance)
4051
4052     self.cfg.AddInstance(iobj)
4053     # Declare that we don't want to remove the instance lock anymore, as we've
4054     # added the instance to the config
4055     del self.remove_locks[locking.LEVEL_INSTANCE]
4056     # Remove the temp. assignements for the instance's drbds
4057     self.cfg.ReleaseDRBDMinors(instance)
4058     # Unlock all the nodes
4059     if self.op.mode == constants.INSTANCE_IMPORT:
4060       nodes_keep = [self.op.src_node]
4061       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4062                        if node != self.op.src_node]
4063       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4064       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4065     else:
4066       self.context.glm.release(locking.LEVEL_NODE)
4067       del self.acquired_locks[locking.LEVEL_NODE]
4068
4069     if self.op.wait_for_sync:
4070       disk_abort = not _WaitForSync(self, iobj)
4071     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4072       # make sure the disks are not degraded (still sync-ing is ok)
4073       time.sleep(15)
4074       feedback_fn("* checking mirrors status")
4075       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4076     else:
4077       disk_abort = False
4078
4079     if disk_abort:
4080       _RemoveDisks(self, iobj)
4081       self.cfg.RemoveInstance(iobj.name)
4082       # Make sure the instance lock gets removed
4083       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4084       raise errors.OpExecError("There are some degraded disks for"
4085                                " this instance")
4086
4087     feedback_fn("creating os for instance %s on node %s" %
4088                 (instance, pnode_name))
4089
4090     if iobj.disk_template != constants.DT_DISKLESS:
4091       if self.op.mode == constants.INSTANCE_CREATE:
4092         feedback_fn("* running the instance OS create scripts...")
4093         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4094         result.Raise()
4095         if not result.data:
4096           raise errors.OpExecError("Could not add os for instance %s"
4097                                    " on node %s" %
4098                                    (instance, pnode_name))
4099
4100       elif self.op.mode == constants.INSTANCE_IMPORT:
4101         feedback_fn("* running the instance OS import scripts...")
4102         src_node = self.op.src_node
4103         src_images = self.src_images
4104         cluster_name = self.cfg.GetClusterName()
4105         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4106                                                          src_node, src_images,
4107                                                          cluster_name)
4108         import_result.Raise()
4109         for idx, result in enumerate(import_result.data):
4110           if not result:
4111             self.LogWarning("Could not import the image %s for instance"
4112                             " %s, disk %d, on node %s" %
4113                             (src_images[idx], instance, idx, pnode_name))
4114       else:
4115         # also checked in the prereq part
4116         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4117                                      % self.op.mode)
4118
4119     if self.op.start:
4120       logging.info("Starting instance %s on node %s", instance, pnode_name)
4121       feedback_fn("* starting instance...")
4122       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4123       result.Raise()
4124       if not result.data:
4125         raise errors.OpExecError("Could not start instance")
4126
4127
4128 class LUConnectConsole(NoHooksLU):
4129   """Connect to an instance's console.
4130
4131   This is somewhat special in that it returns the command line that
4132   you need to run on the master node in order to connect to the
4133   console.
4134
4135   """
4136   _OP_REQP = ["instance_name"]
4137   REQ_BGL = False
4138
4139   def ExpandNames(self):
4140     self._ExpandAndLockInstance()
4141
4142   def CheckPrereq(self):
4143     """Check prerequisites.
4144
4145     This checks that the instance is in the cluster.
4146
4147     """
4148     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4149     assert self.instance is not None, \
4150       "Cannot retrieve locked instance %s" % self.op.instance_name
4151
4152   def Exec(self, feedback_fn):
4153     """Connect to the console of an instance
4154
4155     """
4156     instance = self.instance
4157     node = instance.primary_node
4158
4159     node_insts = self.rpc.call_instance_list([node],
4160                                              [instance.hypervisor])[node]
4161     node_insts.Raise()
4162
4163     if instance.name not in node_insts.data:
4164       raise errors.OpExecError("Instance %s is not running." % instance.name)
4165
4166     logging.debug("Connecting to console of %s on %s", instance.name, node)
4167
4168     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4169     console_cmd = hyper.GetShellCommandForConsole(instance)
4170
4171     # build ssh cmdline
4172     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4173
4174
4175 class LUReplaceDisks(LogicalUnit):
4176   """Replace the disks of an instance.
4177
4178   """
4179   HPATH = "mirrors-replace"
4180   HTYPE = constants.HTYPE_INSTANCE
4181   _OP_REQP = ["instance_name", "mode", "disks"]
4182   REQ_BGL = False
4183
4184   def ExpandNames(self):
4185     self._ExpandAndLockInstance()
4186
4187     if not hasattr(self.op, "remote_node"):
4188       self.op.remote_node = None
4189
4190     ia_name = getattr(self.op, "iallocator", None)
4191     if ia_name is not None:
4192       if self.op.remote_node is not None:
4193         raise errors.OpPrereqError("Give either the iallocator or the new"
4194                                    " secondary, not both")
4195       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4196     elif self.op.remote_node is not None:
4197       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4198       if remote_node is None:
4199         raise errors.OpPrereqError("Node '%s' not known" %
4200                                    self.op.remote_node)
4201       self.op.remote_node = remote_node
4202       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4203       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4204     else:
4205       self.needed_locks[locking.LEVEL_NODE] = []
4206       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4207
4208   def DeclareLocks(self, level):
4209     # If we're not already locking all nodes in the set we have to declare the
4210     # instance's primary/secondary nodes.
4211     if (level == locking.LEVEL_NODE and
4212         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4213       self._LockInstancesNodes()
4214
4215   def _RunAllocator(self):
4216     """Compute a new secondary node using an IAllocator.
4217
4218     """
4219     ial = IAllocator(self,
4220                      mode=constants.IALLOCATOR_MODE_RELOC,
4221                      name=self.op.instance_name,
4222                      relocate_from=[self.sec_node])
4223
4224     ial.Run(self.op.iallocator)
4225
4226     if not ial.success:
4227       raise errors.OpPrereqError("Can't compute nodes using"
4228                                  " iallocator '%s': %s" % (self.op.iallocator,
4229                                                            ial.info))
4230     if len(ial.nodes) != ial.required_nodes:
4231       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4232                                  " of nodes (%s), required %s" %
4233                                  (len(ial.nodes), ial.required_nodes))
4234     self.op.remote_node = ial.nodes[0]
4235     self.LogInfo("Selected new secondary for the instance: %s",
4236                  self.op.remote_node)
4237
4238   def BuildHooksEnv(self):
4239     """Build hooks env.
4240
4241     This runs on the master, the primary and all the secondaries.
4242
4243     """
4244     env = {
4245       "MODE": self.op.mode,
4246       "NEW_SECONDARY": self.op.remote_node,
4247       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4248       }
4249     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4250     nl = [
4251       self.cfg.GetMasterNode(),
4252       self.instance.primary_node,
4253       ]
4254     if self.op.remote_node is not None:
4255       nl.append(self.op.remote_node)
4256     return env, nl, nl
4257
4258   def CheckPrereq(self):
4259     """Check prerequisites.
4260
4261     This checks that the instance is in the cluster.
4262
4263     """
4264     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4265     assert instance is not None, \
4266       "Cannot retrieve locked instance %s" % self.op.instance_name
4267     self.instance = instance
4268
4269     if instance.disk_template not in constants.DTS_NET_MIRROR:
4270       raise errors.OpPrereqError("Instance's disk layout is not"
4271                                  " network mirrored.")
4272
4273     if len(instance.secondary_nodes) != 1:
4274       raise errors.OpPrereqError("The instance has a strange layout,"
4275                                  " expected one secondary but found %d" %
4276                                  len(instance.secondary_nodes))
4277
4278     self.sec_node = instance.secondary_nodes[0]
4279
4280     ia_name = getattr(self.op, "iallocator", None)
4281     if ia_name is not None:
4282       self._RunAllocator()
4283
4284     remote_node = self.op.remote_node
4285     if remote_node is not None:
4286       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4287       assert self.remote_node_info is not None, \
4288         "Cannot retrieve locked node %s" % remote_node
4289     else:
4290       self.remote_node_info = None
4291     if remote_node == instance.primary_node:
4292       raise errors.OpPrereqError("The specified node is the primary node of"
4293                                  " the instance.")
4294     elif remote_node == self.sec_node:
4295       if self.op.mode == constants.REPLACE_DISK_SEC:
4296         # this is for DRBD8, where we can't execute the same mode of
4297         # replacement as for drbd7 (no different port allocated)
4298         raise errors.OpPrereqError("Same secondary given, cannot execute"
4299                                    " replacement")
4300     if instance.disk_template == constants.DT_DRBD8:
4301       if (self.op.mode == constants.REPLACE_DISK_ALL and
4302           remote_node is not None):
4303         # switch to replace secondary mode
4304         self.op.mode = constants.REPLACE_DISK_SEC
4305
4306       if self.op.mode == constants.REPLACE_DISK_ALL:
4307         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4308                                    " secondary disk replacement, not"
4309                                    " both at once")
4310       elif self.op.mode == constants.REPLACE_DISK_PRI:
4311         if remote_node is not None:
4312           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4313                                      " the secondary while doing a primary"
4314                                      " node disk replacement")
4315         self.tgt_node = instance.primary_node
4316         self.oth_node = instance.secondary_nodes[0]
4317       elif self.op.mode == constants.REPLACE_DISK_SEC:
4318         self.new_node = remote_node # this can be None, in which case
4319                                     # we don't change the secondary
4320         self.tgt_node = instance.secondary_nodes[0]
4321         self.oth_node = instance.primary_node
4322       else:
4323         raise errors.ProgrammerError("Unhandled disk replace mode")
4324
4325     if not self.op.disks:
4326       self.op.disks = range(len(instance.disks))
4327
4328     for disk_idx in self.op.disks:
4329       instance.FindDisk(disk_idx)
4330
4331   def _ExecD8DiskOnly(self, feedback_fn):
4332     """Replace a disk on the primary or secondary for dbrd8.
4333
4334     The algorithm for replace is quite complicated:
4335
4336       1. for each disk to be replaced:
4337
4338         1. create new LVs on the target node with unique names
4339         1. detach old LVs from the drbd device
4340         1. rename old LVs to name_replaced.<time_t>
4341         1. rename new LVs to old LVs
4342         1. attach the new LVs (with the old names now) to the drbd device
4343
4344       1. wait for sync across all devices
4345
4346       1. for each modified disk:
4347
4348         1. remove old LVs (which have the name name_replaces.<time_t>)
4349
4350     Failures are not very well handled.
4351
4352     """
4353     steps_total = 6
4354     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4355     instance = self.instance
4356     iv_names = {}
4357     vgname = self.cfg.GetVGName()
4358     # start of work
4359     cfg = self.cfg
4360     tgt_node = self.tgt_node
4361     oth_node = self.oth_node
4362
4363     # Step: check device activation
4364     self.proc.LogStep(1, steps_total, "check device existence")
4365     info("checking volume groups")
4366     my_vg = cfg.GetVGName()
4367     results = self.rpc.call_vg_list([oth_node, tgt_node])
4368     if not results:
4369       raise errors.OpExecError("Can't list volume groups on the nodes")
4370     for node in oth_node, tgt_node:
4371       res = results[node]
4372       if res.failed or not res.data or my_vg not in res.data:
4373         raise errors.OpExecError("Volume group '%s' not found on %s" %
4374                                  (my_vg, node))
4375     for idx, dev in enumerate(instance.disks):
4376       if idx not in self.op.disks:
4377         continue
4378       for node in tgt_node, oth_node:
4379         info("checking disk/%d on %s" % (idx, node))
4380         cfg.SetDiskID(dev, node)
4381         if not self.rpc.call_blockdev_find(node, dev):
4382           raise errors.OpExecError("Can't find disk/%d on node %s" %
4383                                    (idx, node))
4384
4385     # Step: check other node consistency
4386     self.proc.LogStep(2, steps_total, "check peer consistency")
4387     for idx, dev in enumerate(instance.disks):
4388       if idx not in self.op.disks:
4389         continue
4390       info("checking disk/%d consistency on %s" % (idx, oth_node))
4391       if not _CheckDiskConsistency(self, dev, oth_node,
4392                                    oth_node==instance.primary_node):
4393         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4394                                  " to replace disks on this node (%s)" %
4395                                  (oth_node, tgt_node))
4396
4397     # Step: create new storage
4398     self.proc.LogStep(3, steps_total, "allocate new storage")
4399     for idx, dev in enumerate(instance.disks):
4400       if idx not in self.op.disks:
4401         continue
4402       size = dev.size
4403       cfg.SetDiskID(dev, tgt_node)
4404       lv_names = [".disk%d_%s" % (idx, suf)
4405                   for suf in ["data", "meta"]]
4406       names = _GenerateUniqueNames(self, lv_names)
4407       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4408                              logical_id=(vgname, names[0]))
4409       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4410                              logical_id=(vgname, names[1]))
4411       new_lvs = [lv_data, lv_meta]
4412       old_lvs = dev.children
4413       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4414       info("creating new local storage on %s for %s" %
4415            (tgt_node, dev.iv_name))
4416       # since we *always* want to create this LV, we use the
4417       # _Create...OnPrimary (which forces the creation), even if we
4418       # are talking about the secondary node
4419       for new_lv in new_lvs:
4420         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4421                                         _GetInstanceInfoText(instance)):
4422           raise errors.OpExecError("Failed to create new LV named '%s' on"
4423                                    " node '%s'" %
4424                                    (new_lv.logical_id[1], tgt_node))
4425
4426     # Step: for each lv, detach+rename*2+attach
4427     self.proc.LogStep(4, steps_total, "change drbd configuration")
4428     for dev, old_lvs, new_lvs in iv_names.itervalues():
4429       info("detaching %s drbd from local storage" % dev.iv_name)
4430       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4431       result.Raise()
4432       if not result.data:
4433         raise errors.OpExecError("Can't detach drbd from local storage on node"
4434                                  " %s for device %s" % (tgt_node, dev.iv_name))
4435       #dev.children = []
4436       #cfg.Update(instance)
4437
4438       # ok, we created the new LVs, so now we know we have the needed
4439       # storage; as such, we proceed on the target node to rename
4440       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4441       # using the assumption that logical_id == physical_id (which in
4442       # turn is the unique_id on that node)
4443
4444       # FIXME(iustin): use a better name for the replaced LVs
4445       temp_suffix = int(time.time())
4446       ren_fn = lambda d, suff: (d.physical_id[0],
4447                                 d.physical_id[1] + "_replaced-%s" % suff)
4448       # build the rename list based on what LVs exist on the node
4449       rlist = []
4450       for to_ren in old_lvs:
4451         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4452         if not find_res.failed and find_res.data is not None: # device exists
4453           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4454
4455       info("renaming the old LVs on the target node")
4456       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4457       result.Raise()
4458       if not result.data:
4459         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4460       # now we rename the new LVs to the old LVs
4461       info("renaming the new LVs on the target node")
4462       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4463       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4464       result.Raise()
4465       if not result.data:
4466         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4467
4468       for old, new in zip(old_lvs, new_lvs):
4469         new.logical_id = old.logical_id
4470         cfg.SetDiskID(new, tgt_node)
4471
4472       for disk in old_lvs:
4473         disk.logical_id = ren_fn(disk, temp_suffix)
4474         cfg.SetDiskID(disk, tgt_node)
4475
4476       # now that the new lvs have the old name, we can add them to the device
4477       info("adding new mirror component on %s" % tgt_node)
4478       result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4479       if result.failed or not result.data:
4480         for new_lv in new_lvs:
4481           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4482           if result.failed or not result.data:
4483             warning("Can't rollback device %s", hint="manually cleanup unused"
4484                     " logical volumes")
4485         raise errors.OpExecError("Can't add local storage to drbd")
4486
4487       dev.children = new_lvs
4488       cfg.Update(instance)
4489
4490     # Step: wait for sync
4491
4492     # this can fail as the old devices are degraded and _WaitForSync
4493     # does a combined result over all disks, so we don't check its
4494     # return value
4495     self.proc.LogStep(5, steps_total, "sync devices")
4496     _WaitForSync(self, instance, unlock=True)
4497
4498     # so check manually all the devices
4499     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4500       cfg.SetDiskID(dev, instance.primary_node)
4501       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4502       if result.failed or result.data[5]:
4503         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4504
4505     # Step: remove old storage
4506     self.proc.LogStep(6, steps_total, "removing old storage")
4507     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4508       info("remove logical volumes for %s" % name)
4509       for lv in old_lvs:
4510         cfg.SetDiskID(lv, tgt_node)
4511         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4512         if result.failed or not result.data:
4513           warning("Can't remove old LV", hint="manually remove unused LVs")
4514           continue
4515
4516   def _ExecD8Secondary(self, feedback_fn):
4517     """Replace the secondary node for drbd8.
4518
4519     The algorithm for replace is quite complicated:
4520       - for all disks of the instance:
4521         - create new LVs on the new node with same names
4522         - shutdown the drbd device on the old secondary
4523         - disconnect the drbd network on the primary
4524         - create the drbd device on the new secondary
4525         - network attach the drbd on the primary, using an artifice:
4526           the drbd code for Attach() will connect to the network if it
4527           finds a device which is connected to the good local disks but
4528           not network enabled
4529       - wait for sync across all devices
4530       - remove all disks from the old secondary
4531
4532     Failures are not very well handled.
4533
4534     """
4535     steps_total = 6
4536     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4537     instance = self.instance
4538     iv_names = {}
4539     vgname = self.cfg.GetVGName()
4540     # start of work
4541     cfg = self.cfg
4542     old_node = self.tgt_node
4543     new_node = self.new_node
4544     pri_node = instance.primary_node
4545
4546     # Step: check device activation
4547     self.proc.LogStep(1, steps_total, "check device existence")
4548     info("checking volume groups")
4549     my_vg = cfg.GetVGName()
4550     results = self.rpc.call_vg_list([pri_node, new_node])
4551     for node in pri_node, new_node:
4552       res = results[node]
4553       if res.failed or not res.data or my_vg not in res.data:
4554         raise errors.OpExecError("Volume group '%s' not found on %s" %
4555                                  (my_vg, node))
4556     for idx, dev in enumerate(instance.disks):
4557       if idx not in self.op.disks:
4558         continue
4559       info("checking disk/%d on %s" % (idx, pri_node))
4560       cfg.SetDiskID(dev, pri_node)
4561       result = self.rpc.call_blockdev_find(pri_node, dev)
4562       result.Raise()
4563       if not result.data:
4564         raise errors.OpExecError("Can't find disk/%d on node %s" %
4565                                  (idx, pri_node))
4566
4567     # Step: check other node consistency
4568     self.proc.LogStep(2, steps_total, "check peer consistency")
4569     for idx, dev in enumerate(instance.disks):
4570       if idx not in self.op.disks:
4571         continue
4572       info("checking disk/%d consistency on %s" % (idx, pri_node))
4573       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4574         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4575                                  " unsafe to replace the secondary" %
4576                                  pri_node)
4577
4578     # Step: create new storage
4579     self.proc.LogStep(3, steps_total, "allocate new storage")
4580     for idx, dev in enumerate(instance.disks):
4581       size = dev.size
4582       info("adding new local storage on %s for disk/%d" %
4583            (new_node, idx))
4584       # since we *always* want to create this LV, we use the
4585       # _Create...OnPrimary (which forces the creation), even if we
4586       # are talking about the secondary node
4587       for new_lv in dev.children:
4588         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4589                                         _GetInstanceInfoText(instance)):
4590           raise errors.OpExecError("Failed to create new LV named '%s' on"
4591                                    " node '%s'" %
4592                                    (new_lv.logical_id[1], new_node))
4593
4594     # Step 4: dbrd minors and drbd setups changes
4595     # after this, we must manually remove the drbd minors on both the
4596     # error and the success paths
4597     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4598                                    instance.name)
4599     logging.debug("Allocated minors %s" % (minors,))
4600     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4601     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4602       size = dev.size
4603       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4604       # create new devices on new_node
4605       if pri_node == dev.logical_id[0]:
4606         new_logical_id = (pri_node, new_node,
4607                           dev.logical_id[2], dev.logical_id[3], new_minor,
4608                           dev.logical_id[5])
4609       else:
4610         new_logical_id = (new_node, pri_node,
4611                           dev.logical_id[2], new_minor, dev.logical_id[4],
4612                           dev.logical_id[5])
4613       iv_names[idx] = (dev, dev.children, new_logical_id)
4614       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4615                     new_logical_id)
4616       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4617                               logical_id=new_logical_id,
4618                               children=dev.children)
4619       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4620                                         new_drbd, False,
4621                                         _GetInstanceInfoText(instance)):
4622         self.cfg.ReleaseDRBDMinors(instance.name)
4623         raise errors.OpExecError("Failed to create new DRBD on"
4624                                  " node '%s'" % new_node)
4625
4626     for idx, dev in enumerate(instance.disks):
4627       # we have new devices, shutdown the drbd on the old secondary
4628       info("shutting down drbd for disk/%d on old node" % idx)
4629       cfg.SetDiskID(dev, old_node)
4630       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4631       if result.failed or not result.data:
4632         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4633                 hint="Please cleanup this device manually as soon as possible")
4634
4635     info("detaching primary drbds from the network (=> standalone)")
4636     done = 0
4637     for idx, dev in enumerate(instance.disks):
4638       cfg.SetDiskID(dev, pri_node)
4639       # set the network part of the physical (unique in bdev terms) id
4640       # to None, meaning detach from network
4641       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4642       # and 'find' the device, which will 'fix' it to match the
4643       # standalone state
4644       result = self.rpc.call_blockdev_find(pri_node, dev)
4645       if not result.failed and result.data:
4646         done += 1
4647       else:
4648         warning("Failed to detach drbd disk/%d from network, unusual case" %
4649                 idx)
4650
4651     if not done:
4652       # no detaches succeeded (very unlikely)
4653       self.cfg.ReleaseDRBDMinors(instance.name)
4654       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4655
4656     # if we managed to detach at least one, we update all the disks of
4657     # the instance to point to the new secondary
4658     info("updating instance configuration")
4659     for dev, _, new_logical_id in iv_names.itervalues():
4660       dev.logical_id = new_logical_id
4661       cfg.SetDiskID(dev, pri_node)
4662     cfg.Update(instance)
4663     # we can remove now the temp minors as now the new values are
4664     # written to the config file (and therefore stable)
4665     self.cfg.ReleaseDRBDMinors(instance.name)
4666
4667     # and now perform the drbd attach
4668     info("attaching primary drbds to new secondary (standalone => connected)")
4669     failures = []
4670     for idx, dev in enumerate(instance.disks):
4671       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4672       # since the attach is smart, it's enough to 'find' the device,
4673       # it will automatically activate the network, if the physical_id
4674       # is correct
4675       cfg.SetDiskID(dev, pri_node)
4676       logging.debug("Disk to attach: %s", dev)
4677       result = self.rpc.call_blockdev_find(pri_node, dev)
4678       if result.failed or not result.data:
4679         warning("can't attach drbd disk/%d to new secondary!" % idx,
4680                 "please do a gnt-instance info to see the status of disks")
4681
4682     # this can fail as the old devices are degraded and _WaitForSync
4683     # does a combined result over all disks, so we don't check its
4684     # return value
4685     self.proc.LogStep(5, steps_total, "sync devices")
4686     _WaitForSync(self, instance, unlock=True)
4687
4688     # so check manually all the devices
4689     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4690       cfg.SetDiskID(dev, pri_node)
4691       result = self.rpc.call_blockdev_find(pri_node, dev)
4692       result.Raise()
4693       if result.data[5]:
4694         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4695
4696     self.proc.LogStep(6, steps_total, "removing old storage")
4697     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4698       info("remove logical volumes for disk/%d" % idx)
4699       for lv in old_lvs:
4700         cfg.SetDiskID(lv, old_node)
4701         result = self.rpc.call_blockdev_remove(old_node, lv)
4702         if result.failed or not result.data:
4703           warning("Can't remove LV on old secondary",
4704                   hint="Cleanup stale volumes by hand")
4705
4706   def Exec(self, feedback_fn):
4707     """Execute disk replacement.
4708
4709     This dispatches the disk replacement to the appropriate handler.
4710
4711     """
4712     instance = self.instance
4713
4714     # Activate the instance disks if we're replacing them on a down instance
4715     if instance.status == "down":
4716       _StartInstanceDisks(self, instance, True)
4717
4718     if instance.disk_template == constants.DT_DRBD8:
4719       if self.op.remote_node is None:
4720         fn = self._ExecD8DiskOnly
4721       else:
4722         fn = self._ExecD8Secondary
4723     else:
4724       raise errors.ProgrammerError("Unhandled disk replacement case")
4725
4726     ret = fn(feedback_fn)
4727
4728     # Deactivate the instance disks if we're replacing them on a down instance
4729     if instance.status == "down":
4730       _SafeShutdownInstanceDisks(self, instance)
4731
4732     return ret
4733
4734
4735 class LUGrowDisk(LogicalUnit):
4736   """Grow a disk of an instance.
4737
4738   """
4739   HPATH = "disk-grow"
4740   HTYPE = constants.HTYPE_INSTANCE
4741   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4742   REQ_BGL = False
4743
4744   def ExpandNames(self):
4745     self._ExpandAndLockInstance()
4746     self.needed_locks[locking.LEVEL_NODE] = []
4747     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4748
4749   def DeclareLocks(self, level):
4750     if level == locking.LEVEL_NODE:
4751       self._LockInstancesNodes()
4752
4753   def BuildHooksEnv(self):
4754     """Build hooks env.
4755
4756     This runs on the master, the primary and all the secondaries.
4757
4758     """
4759     env = {
4760       "DISK": self.op.disk,
4761       "AMOUNT": self.op.amount,
4762       }
4763     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4764     nl = [
4765       self.cfg.GetMasterNode(),
4766       self.instance.primary_node,
4767       ]
4768     return env, nl, nl
4769
4770   def CheckPrereq(self):
4771     """Check prerequisites.
4772
4773     This checks that the instance is in the cluster.
4774
4775     """
4776     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4777     assert instance is not None, \
4778       "Cannot retrieve locked instance %s" % self.op.instance_name
4779
4780     self.instance = instance
4781
4782     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4783       raise errors.OpPrereqError("Instance's disk layout does not support"
4784                                  " growing.")
4785
4786     self.disk = instance.FindDisk(self.op.disk)
4787
4788     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4789     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4790                                        instance.hypervisor)
4791     for node in nodenames:
4792       info = nodeinfo[node]
4793       if info.failed or not info.data:
4794         raise errors.OpPrereqError("Cannot get current information"
4795                                    " from node '%s'" % node)
4796       vg_free = info.data.get('vg_free', None)
4797       if not isinstance(vg_free, int):
4798         raise errors.OpPrereqError("Can't compute free disk space on"
4799                                    " node %s" % node)
4800       if self.op.amount > vg_free:
4801         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4802                                    " %d MiB available, %d MiB required" %
4803                                    (node, vg_free, self.op.amount))
4804
4805   def Exec(self, feedback_fn):
4806     """Execute disk grow.
4807
4808     """
4809     instance = self.instance
4810     disk = self.disk
4811     for node in (instance.secondary_nodes + (instance.primary_node,)):
4812       self.cfg.SetDiskID(disk, node)
4813       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4814       result.Raise()
4815       if (not result.data or not isinstance(result.data, (list, tuple)) or
4816           len(result.data) != 2):
4817         raise errors.OpExecError("Grow request failed to node %s" % node)
4818       elif not result.data[0]:
4819         raise errors.OpExecError("Grow request failed to node %s: %s" %
4820                                  (node, result.data[1]))
4821     disk.RecordGrow(self.op.amount)
4822     self.cfg.Update(instance)
4823     if self.op.wait_for_sync:
4824       disk_abort = not _WaitForSync(self, instance)
4825       if disk_abort:
4826         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4827                              " status.\nPlease check the instance.")
4828
4829
4830 class LUQueryInstanceData(NoHooksLU):
4831   """Query runtime instance data.
4832
4833   """
4834   _OP_REQP = ["instances", "static"]
4835   REQ_BGL = False
4836
4837   def ExpandNames(self):
4838     self.needed_locks = {}
4839     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4840
4841     if not isinstance(self.op.instances, list):
4842       raise errors.OpPrereqError("Invalid argument type 'instances'")
4843
4844     if self.op.instances:
4845       self.wanted_names = []
4846       for name in self.op.instances:
4847         full_name = self.cfg.ExpandInstanceName(name)
4848         if full_name is None:
4849           raise errors.OpPrereqError("Instance '%s' not known" %
4850                                      self.op.instance_name)
4851         self.wanted_names.append(full_name)
4852       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4853     else:
4854       self.wanted_names = None
4855       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4856
4857     self.needed_locks[locking.LEVEL_NODE] = []
4858     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4859
4860   def DeclareLocks(self, level):
4861     if level == locking.LEVEL_NODE:
4862       self._LockInstancesNodes()
4863
4864   def CheckPrereq(self):
4865     """Check prerequisites.
4866
4867     This only checks the optional instance list against the existing names.
4868
4869     """
4870     if self.wanted_names is None:
4871       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4872
4873     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4874                              in self.wanted_names]
4875     return
4876
4877   def _ComputeDiskStatus(self, instance, snode, dev):
4878     """Compute block device status.
4879
4880     """
4881     static = self.op.static
4882     if not static:
4883       self.cfg.SetDiskID(dev, instance.primary_node)
4884       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4885       dev_pstatus.Raise()
4886       dev_pstatus = dev_pstatus.data
4887     else:
4888       dev_pstatus = None
4889
4890     if dev.dev_type in constants.LDS_DRBD:
4891       # we change the snode then (otherwise we use the one passed in)
4892       if dev.logical_id[0] == instance.primary_node:
4893         snode = dev.logical_id[1]
4894       else:
4895         snode = dev.logical_id[0]
4896
4897     if snode and not static:
4898       self.cfg.SetDiskID(dev, snode)
4899       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4900       dev_sstatus.Raise()
4901       dev_sstatus = dev_sstatus.data
4902     else:
4903       dev_sstatus = None
4904
4905     if dev.children:
4906       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4907                       for child in dev.children]
4908     else:
4909       dev_children = []
4910
4911     data = {
4912       "iv_name": dev.iv_name,
4913       "dev_type": dev.dev_type,
4914       "logical_id": dev.logical_id,
4915       "physical_id": dev.physical_id,
4916       "pstatus": dev_pstatus,
4917       "sstatus": dev_sstatus,
4918       "children": dev_children,
4919       "mode": dev.mode,
4920       }
4921
4922     return data
4923
4924   def Exec(self, feedback_fn):
4925     """Gather and return data"""
4926     result = {}
4927
4928     cluster = self.cfg.GetClusterInfo()
4929
4930     for instance in self.wanted_instances:
4931       if not self.op.static:
4932         remote_info = self.rpc.call_instance_info(instance.primary_node,
4933                                                   instance.name,
4934                                                   instance.hypervisor)
4935         remote_info.Raise()
4936         remote_info = remote_info.data
4937         if remote_info and "state" in remote_info:
4938           remote_state = "up"
4939         else:
4940           remote_state = "down"
4941       else:
4942         remote_state = None
4943       if instance.status == "down":
4944         config_state = "down"
4945       else:
4946         config_state = "up"
4947
4948       disks = [self._ComputeDiskStatus(instance, None, device)
4949                for device in instance.disks]
4950
4951       idict = {
4952         "name": instance.name,
4953         "config_state": config_state,
4954         "run_state": remote_state,
4955         "pnode": instance.primary_node,
4956         "snodes": instance.secondary_nodes,
4957         "os": instance.os,
4958         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4959         "disks": disks,
4960         "hypervisor": instance.hypervisor,
4961         "network_port": instance.network_port,
4962         "hv_instance": instance.hvparams,
4963         "hv_actual": cluster.FillHV(instance),
4964         "be_instance": instance.beparams,
4965         "be_actual": cluster.FillBE(instance),
4966         }
4967
4968       result[instance.name] = idict
4969
4970     return result
4971
4972
4973 class LUSetInstanceParams(LogicalUnit):
4974   """Modifies an instances's parameters.
4975
4976   """
4977   HPATH = "instance-modify"
4978   HTYPE = constants.HTYPE_INSTANCE
4979   _OP_REQP = ["instance_name"]
4980   REQ_BGL = False
4981
4982   def CheckArguments(self):
4983     if not hasattr(self.op, 'nics'):
4984       self.op.nics = []
4985     if not hasattr(self.op, 'disks'):
4986       self.op.disks = []
4987     if not hasattr(self.op, 'beparams'):
4988       self.op.beparams = {}
4989     if not hasattr(self.op, 'hvparams'):
4990       self.op.hvparams = {}
4991     self.op.force = getattr(self.op, "force", False)
4992     if not (self.op.nics or self.op.disks or
4993             self.op.hvparams or self.op.beparams):
4994       raise errors.OpPrereqError("No changes submitted")
4995
4996     utils.CheckBEParams(self.op.beparams)
4997
4998     # Disk validation
4999     disk_addremove = 0
5000     for disk_op, disk_dict in self.op.disks:
5001       if disk_op == constants.DDM_REMOVE:
5002         disk_addremove += 1
5003         continue
5004       elif disk_op == constants.DDM_ADD:
5005         disk_addremove += 1
5006       else:
5007         if not isinstance(disk_op, int):
5008           raise errors.OpPrereqError("Invalid disk index")
5009       if disk_op == constants.DDM_ADD:
5010         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5011         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5012           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5013         size = disk_dict.get('size', None)
5014         if size is None:
5015           raise errors.OpPrereqError("Required disk parameter size missing")
5016         try:
5017           size = int(size)
5018         except ValueError, err:
5019           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5020                                      str(err))
5021         disk_dict['size'] = size
5022       else:
5023         # modification of disk
5024         if 'size' in disk_dict:
5025           raise errors.OpPrereqError("Disk size change not possible, use"
5026                                      " grow-disk")
5027
5028     if disk_addremove > 1:
5029       raise errors.OpPrereqError("Only one disk add or remove operation"
5030                                  " supported at a time")
5031
5032     # NIC validation
5033     nic_addremove = 0
5034     for nic_op, nic_dict in self.op.nics:
5035       if nic_op == constants.DDM_REMOVE:
5036         nic_addremove += 1
5037         continue
5038       elif nic_op == constants.DDM_ADD:
5039         nic_addremove += 1
5040       else:
5041         if not isinstance(nic_op, int):
5042           raise errors.OpPrereqError("Invalid nic index")
5043
5044       # nic_dict should be a dict
5045       nic_ip = nic_dict.get('ip', None)
5046       if nic_ip is not None:
5047         if nic_ip.lower() == "none":
5048           nic_dict['ip'] = None
5049         else:
5050           if not utils.IsValidIP(nic_ip):
5051             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5052       # we can only check None bridges and assign the default one
5053       nic_bridge = nic_dict.get('bridge', None)
5054       if nic_bridge is None:
5055         nic_dict['bridge'] = self.cfg.GetDefBridge()
5056       # but we can validate MACs
5057       nic_mac = nic_dict.get('mac', None)
5058       if nic_mac is not None:
5059         if self.cfg.IsMacInUse(nic_mac):
5060           raise errors.OpPrereqError("MAC address %s already in use"
5061                                      " in cluster" % nic_mac)
5062         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5063           if not utils.IsValidMac(nic_mac):
5064             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5065     if nic_addremove > 1:
5066       raise errors.OpPrereqError("Only one NIC add or remove operation"
5067                                  " supported at a time")
5068
5069   def ExpandNames(self):
5070     self._ExpandAndLockInstance()
5071     self.needed_locks[locking.LEVEL_NODE] = []
5072     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5073
5074   def DeclareLocks(self, level):
5075     if level == locking.LEVEL_NODE:
5076       self._LockInstancesNodes()
5077
5078   def BuildHooksEnv(self):
5079     """Build hooks env.
5080
5081     This runs on the master, primary and secondaries.
5082
5083     """
5084     args = dict()
5085     if constants.BE_MEMORY in self.be_new:
5086       args['memory'] = self.be_new[constants.BE_MEMORY]
5087     if constants.BE_VCPUS in self.be_new:
5088       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5089     # FIXME: readd disk/nic changes
5090     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5091     nl = [self.cfg.GetMasterNode(),
5092           self.instance.primary_node] + list(self.instance.secondary_nodes)
5093     return env, nl, nl
5094
5095   def CheckPrereq(self):
5096     """Check prerequisites.
5097
5098     This only checks the instance list against the existing names.
5099
5100     """
5101     force = self.force = self.op.force
5102
5103     # checking the new params on the primary/secondary nodes
5104
5105     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5106     assert self.instance is not None, \
5107       "Cannot retrieve locked instance %s" % self.op.instance_name
5108     pnode = self.instance.primary_node
5109     nodelist = [pnode]
5110     nodelist.extend(instance.secondary_nodes)
5111
5112     # hvparams processing
5113     if self.op.hvparams:
5114       i_hvdict = copy.deepcopy(instance.hvparams)
5115       for key, val in self.op.hvparams.iteritems():
5116         if val == constants.VALUE_DEFAULT:
5117           try:
5118             del i_hvdict[key]
5119           except KeyError:
5120             pass
5121         elif val == constants.VALUE_NONE:
5122           i_hvdict[key] = None
5123         else:
5124           i_hvdict[key] = val
5125       cluster = self.cfg.GetClusterInfo()
5126       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5127                                 i_hvdict)
5128       # local check
5129       hypervisor.GetHypervisor(
5130         instance.hypervisor).CheckParameterSyntax(hv_new)
5131       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5132       self.hv_new = hv_new # the new actual values
5133       self.hv_inst = i_hvdict # the new dict (without defaults)
5134     else:
5135       self.hv_new = self.hv_inst = {}
5136
5137     # beparams processing
5138     if self.op.beparams:
5139       i_bedict = copy.deepcopy(instance.beparams)
5140       for key, val in self.op.beparams.iteritems():
5141         if val == constants.VALUE_DEFAULT:
5142           try:
5143             del i_bedict[key]
5144           except KeyError:
5145             pass
5146         else:
5147           i_bedict[key] = val
5148       cluster = self.cfg.GetClusterInfo()
5149       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5150                                 i_bedict)
5151       self.be_new = be_new # the new actual values
5152       self.be_inst = i_bedict # the new dict (without defaults)
5153     else:
5154       self.be_new = self.be_inst = {}
5155
5156     self.warn = []
5157
5158     if constants.BE_MEMORY in self.op.beparams and not self.force:
5159       mem_check_list = [pnode]
5160       if be_new[constants.BE_AUTO_BALANCE]:
5161         # either we changed auto_balance to yes or it was from before
5162         mem_check_list.extend(instance.secondary_nodes)
5163       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5164                                                   instance.hypervisor)
5165       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5166                                          instance.hypervisor)
5167       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5168         # Assume the primary node is unreachable and go ahead
5169         self.warn.append("Can't get info from primary node %s" % pnode)
5170       else:
5171         if not instance_info.failed and instance_info.data:
5172           current_mem = instance_info.data['memory']
5173         else:
5174           # Assume instance not running
5175           # (there is a slight race condition here, but it's not very probable,
5176           # and we have no other way to check)
5177           current_mem = 0
5178         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5179                     nodeinfo[pnode].data['memory_free'])
5180         if miss_mem > 0:
5181           raise errors.OpPrereqError("This change will prevent the instance"
5182                                      " from starting, due to %d MB of memory"
5183                                      " missing on its primary node" % miss_mem)
5184
5185       if be_new[constants.BE_AUTO_BALANCE]:
5186         for node, nres in instance.secondary_nodes.iteritems():
5187           if nres.failed or not isinstance(nres.data, dict):
5188             self.warn.append("Can't get info from secondary node %s" % node)
5189           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5190             self.warn.append("Not enough memory to failover instance to"
5191                              " secondary node %s" % node)
5192
5193     # NIC processing
5194     for nic_op, nic_dict in self.op.nics:
5195       if nic_op == constants.DDM_REMOVE:
5196         if not instance.nics:
5197           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5198         continue
5199       if nic_op != constants.DDM_ADD:
5200         # an existing nic
5201         if nic_op < 0 or nic_op >= len(instance.nics):
5202           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5203                                      " are 0 to %d" %
5204                                      (nic_op, len(instance.nics)))
5205       nic_bridge = nic_dict.get('bridge', None)
5206       if nic_bridge is not None:
5207         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5208           msg = ("Bridge '%s' doesn't exist on one of"
5209                  " the instance nodes" % nic_bridge)
5210           if self.force:
5211             self.warn.append(msg)
5212           else:
5213             raise errors.OpPrereqError(msg)
5214
5215     # DISK processing
5216     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5217       raise errors.OpPrereqError("Disk operations not supported for"
5218                                  " diskless instances")
5219     for disk_op, disk_dict in self.op.disks:
5220       if disk_op == constants.DDM_REMOVE:
5221         if len(instance.disks) == 1:
5222           raise errors.OpPrereqError("Cannot remove the last disk of"
5223                                      " an instance")
5224         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5225         ins_l = ins_l[pnode]
5226         if not type(ins_l) is list:
5227           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5228         if instance.name in ins_l:
5229           raise errors.OpPrereqError("Instance is running, can't remove"
5230                                      " disks.")
5231
5232       if (disk_op == constants.DDM_ADD and
5233           len(instance.nics) >= constants.MAX_DISKS):
5234         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5235                                    " add more" % constants.MAX_DISKS)
5236       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5237         # an existing disk
5238         if disk_op < 0 or disk_op >= len(instance.disks):
5239           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5240                                      " are 0 to %d" %
5241                                      (disk_op, len(instance.disks)))
5242
5243     return
5244
5245   def Exec(self, feedback_fn):
5246     """Modifies an instance.
5247
5248     All parameters take effect only at the next restart of the instance.
5249
5250     """
5251     # Process here the warnings from CheckPrereq, as we don't have a
5252     # feedback_fn there.
5253     for warn in self.warn:
5254       feedback_fn("WARNING: %s" % warn)
5255
5256     result = []
5257     instance = self.instance
5258     # disk changes
5259     for disk_op, disk_dict in self.op.disks:
5260       if disk_op == constants.DDM_REMOVE:
5261         # remove the last disk
5262         device = instance.disks.pop()
5263         device_idx = len(instance.disks)
5264         for node, disk in device.ComputeNodeTree(instance.primary_node):
5265           self.cfg.SetDiskID(disk, node)
5266           result = self.rpc.call_blockdev_remove(node, disk)
5267           if result.failed or not result.data:
5268             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5269                                  " continuing anyway", device_idx, node)
5270         result.append(("disk/%d" % device_idx, "remove"))
5271       elif disk_op == constants.DDM_ADD:
5272         # add a new disk
5273         if instance.disk_template == constants.DT_FILE:
5274           file_driver, file_path = instance.disks[0].logical_id
5275           file_path = os.path.dirname(file_path)
5276         else:
5277           file_driver = file_path = None
5278         disk_idx_base = len(instance.disks)
5279         new_disk = _GenerateDiskTemplate(self,
5280                                          instance.disk_template,
5281                                          instance, instance.primary_node,
5282                                          instance.secondary_nodes,
5283                                          [disk_dict],
5284                                          file_path,
5285                                          file_driver,
5286                                          disk_idx_base)[0]
5287         new_disk.mode = disk_dict['mode']
5288         instance.disks.append(new_disk)
5289         info = _GetInstanceInfoText(instance)
5290
5291         logging.info("Creating volume %s for instance %s",
5292                      new_disk.iv_name, instance.name)
5293         # Note: this needs to be kept in sync with _CreateDisks
5294         #HARDCODE
5295         for secondary_node in instance.secondary_nodes:
5296           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5297                                             new_disk, False, info):
5298             self.LogWarning("Failed to create volume %s (%s) on"
5299                             " secondary node %s!",
5300                             new_disk.iv_name, new_disk, secondary_node)
5301         #HARDCODE
5302         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5303                                         instance, new_disk, info):
5304           self.LogWarning("Failed to create volume %s on primary!",
5305                           new_disk.iv_name)
5306         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5307                        (new_disk.size, new_disk.mode)))
5308       else:
5309         # change a given disk
5310         instance.disks[disk_op].mode = disk_dict['mode']
5311         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5312     # NIC changes
5313     for nic_op, nic_dict in self.op.nics:
5314       if nic_op == constants.DDM_REMOVE:
5315         # remove the last nic
5316         del instance.nics[-1]
5317         result.append(("nic.%d" % len(instance.nics), "remove"))
5318       elif nic_op == constants.DDM_ADD:
5319         # add a new nic
5320         if 'mac' not in nic_dict:
5321           mac = constants.VALUE_GENERATE
5322         else:
5323           mac = nic_dict['mac']
5324         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5325           mac = self.cfg.GenerateMAC()
5326         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5327                               bridge=nic_dict.get('bridge', None))
5328         instance.nics.append(new_nic)
5329         result.append(("nic.%d" % (len(instance.nics) - 1),
5330                        "add:mac=%s,ip=%s,bridge=%s" %
5331                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5332       else:
5333         # change a given nic
5334         for key in 'mac', 'ip', 'bridge':
5335           if key in nic_dict:
5336             setattr(instance.nics[nic_op], key, nic_dict[key])
5337             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5338
5339     # hvparams changes
5340     if self.op.hvparams:
5341       instance.hvparams = self.hv_new
5342       for key, val in self.op.hvparams.iteritems():
5343         result.append(("hv/%s" % key, val))
5344
5345     # beparams changes
5346     if self.op.beparams:
5347       instance.beparams = self.be_inst
5348       for key, val in self.op.beparams.iteritems():
5349         result.append(("be/%s" % key, val))
5350
5351     self.cfg.Update(instance)
5352
5353     return result
5354
5355
5356 class LUQueryExports(NoHooksLU):
5357   """Query the exports list
5358
5359   """
5360   _OP_REQP = ['nodes']
5361   REQ_BGL = False
5362
5363   def ExpandNames(self):
5364     self.needed_locks = {}
5365     self.share_locks[locking.LEVEL_NODE] = 1
5366     if not self.op.nodes:
5367       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5368     else:
5369       self.needed_locks[locking.LEVEL_NODE] = \
5370         _GetWantedNodes(self, self.op.nodes)
5371
5372   def CheckPrereq(self):
5373     """Check prerequisites.
5374
5375     """
5376     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5377
5378   def Exec(self, feedback_fn):
5379     """Compute the list of all the exported system images.
5380
5381     @rtype: dict
5382     @return: a dictionary with the structure node->(export-list)
5383         where export-list is a list of the instances exported on
5384         that node.
5385
5386     """
5387     rpcresult = self.rpc.call_export_list(self.nodes)
5388     result = {}
5389     for node in rpcresult:
5390       if rpcresult[node].failed:
5391         result[node] = False
5392       else:
5393         result[node] = rpcresult[node].data
5394
5395     return result
5396
5397
5398 class LUExportInstance(LogicalUnit):
5399   """Export an instance to an image in the cluster.
5400
5401   """
5402   HPATH = "instance-export"
5403   HTYPE = constants.HTYPE_INSTANCE
5404   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5405   REQ_BGL = False
5406
5407   def ExpandNames(self):
5408     self._ExpandAndLockInstance()
5409     # FIXME: lock only instance primary and destination node
5410     #
5411     # Sad but true, for now we have do lock all nodes, as we don't know where
5412     # the previous export might be, and and in this LU we search for it and
5413     # remove it from its current node. In the future we could fix this by:
5414     #  - making a tasklet to search (share-lock all), then create the new one,
5415     #    then one to remove, after
5416     #  - removing the removal operation altoghether
5417     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5418
5419   def DeclareLocks(self, level):
5420     """Last minute lock declaration."""
5421     # All nodes are locked anyway, so nothing to do here.
5422
5423   def BuildHooksEnv(self):
5424     """Build hooks env.
5425
5426     This will run on the master, primary node and target node.
5427
5428     """
5429     env = {
5430       "EXPORT_NODE": self.op.target_node,
5431       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5432       }
5433     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5434     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5435           self.op.target_node]
5436     return env, nl, nl
5437
5438   def CheckPrereq(self):
5439     """Check prerequisites.
5440
5441     This checks that the instance and node names are valid.
5442
5443     """
5444     instance_name = self.op.instance_name
5445     self.instance = self.cfg.GetInstanceInfo(instance_name)
5446     assert self.instance is not None, \
5447           "Cannot retrieve locked instance %s" % self.op.instance_name
5448
5449     self.dst_node = self.cfg.GetNodeInfo(
5450       self.cfg.ExpandNodeName(self.op.target_node))
5451
5452     if self.dst_node is None:
5453       # This is wrong node name, not a non-locked node
5454       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5455
5456     # instance disk type verification
5457     for disk in self.instance.disks:
5458       if disk.dev_type == constants.LD_FILE:
5459         raise errors.OpPrereqError("Export not supported for instances with"
5460                                    " file-based disks")
5461
5462   def Exec(self, feedback_fn):
5463     """Export an instance to an image in the cluster.
5464
5465     """
5466     instance = self.instance
5467     dst_node = self.dst_node
5468     src_node = instance.primary_node
5469     if self.op.shutdown:
5470       # shutdown the instance, but not the disks
5471       result = self.rpc.call_instance_shutdown(src_node, instance)
5472       result.Raise()
5473       if not result.data:
5474         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5475                                  (instance.name, src_node))
5476
5477     vgname = self.cfg.GetVGName()
5478
5479     snap_disks = []
5480
5481     try:
5482       for disk in instance.disks:
5483         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5484         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5485         if new_dev_name.failed or not new_dev_name.data:
5486           self.LogWarning("Could not snapshot block device %s on node %s",
5487                           disk.logical_id[1], src_node)
5488           snap_disks.append(False)
5489         else:
5490           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5491                                  logical_id=(vgname, new_dev_name.data),
5492                                  physical_id=(vgname, new_dev_name.data),
5493                                  iv_name=disk.iv_name)
5494           snap_disks.append(new_dev)
5495
5496     finally:
5497       if self.op.shutdown and instance.status == "up":
5498         result = self.rpc.call_instance_start(src_node, instance, None)
5499         if result.failed or not result.data:
5500           _ShutdownInstanceDisks(self, instance)
5501           raise errors.OpExecError("Could not start instance")
5502
5503     # TODO: check for size
5504
5505     cluster_name = self.cfg.GetClusterName()
5506     for idx, dev in enumerate(snap_disks):
5507       if dev:
5508         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5509                                                instance, cluster_name, idx)
5510         if result.failed or not result.data:
5511           self.LogWarning("Could not export block device %s from node %s to"
5512                           " node %s", dev.logical_id[1], src_node,
5513                           dst_node.name)
5514         result = self.rpc.call_blockdev_remove(src_node, dev)
5515         if result.failed or not result.data:
5516           self.LogWarning("Could not remove snapshot block device %s from node"
5517                           " %s", dev.logical_id[1], src_node)
5518
5519     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5520     if result.failed or not result.data:
5521       self.LogWarning("Could not finalize export for instance %s on node %s",
5522                       instance.name, dst_node.name)
5523
5524     nodelist = self.cfg.GetNodeList()
5525     nodelist.remove(dst_node.name)
5526
5527     # on one-node clusters nodelist will be empty after the removal
5528     # if we proceed the backup would be removed because OpQueryExports
5529     # substitutes an empty list with the full cluster node list.
5530     if nodelist:
5531       exportlist = self.rpc.call_export_list(nodelist)
5532       for node in exportlist:
5533         if exportlist[node].failed:
5534           continue
5535         if instance.name in exportlist[node].data:
5536           if not self.rpc.call_export_remove(node, instance.name):
5537             self.LogWarning("Could not remove older export for instance %s"
5538                             " on node %s", instance.name, node)
5539
5540
5541 class LURemoveExport(NoHooksLU):
5542   """Remove exports related to the named instance.
5543
5544   """
5545   _OP_REQP = ["instance_name"]
5546   REQ_BGL = False
5547
5548   def ExpandNames(self):
5549     self.needed_locks = {}
5550     # We need all nodes to be locked in order for RemoveExport to work, but we
5551     # don't need to lock the instance itself, as nothing will happen to it (and
5552     # we can remove exports also for a removed instance)
5553     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5554
5555   def CheckPrereq(self):
5556     """Check prerequisites.
5557     """
5558     pass
5559
5560   def Exec(self, feedback_fn):
5561     """Remove any export.
5562
5563     """
5564     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5565     # If the instance was not found we'll try with the name that was passed in.
5566     # This will only work if it was an FQDN, though.
5567     fqdn_warn = False
5568     if not instance_name:
5569       fqdn_warn = True
5570       instance_name = self.op.instance_name
5571
5572     exportlist = self.rpc.call_export_list(self.acquired_locks[
5573       locking.LEVEL_NODE])
5574     found = False
5575     for node in exportlist:
5576       if exportlist[node].failed:
5577         self.LogWarning("Failed to query node %s, continuing" % node)
5578         continue
5579       if instance_name in exportlist[node].data:
5580         found = True
5581         result = self.rpc.call_export_remove(node, instance_name)
5582         if result.failed or not result.data:
5583           logging.error("Could not remove export for instance %s"
5584                         " on node %s", instance_name, node)
5585
5586     if fqdn_warn and not found:
5587       feedback_fn("Export not found. If trying to remove an export belonging"
5588                   " to a deleted instance please use its Fully Qualified"
5589                   " Domain Name.")
5590
5591
5592 class TagsLU(NoHooksLU):
5593   """Generic tags LU.
5594
5595   This is an abstract class which is the parent of all the other tags LUs.
5596
5597   """
5598
5599   def ExpandNames(self):
5600     self.needed_locks = {}
5601     if self.op.kind == constants.TAG_NODE:
5602       name = self.cfg.ExpandNodeName(self.op.name)
5603       if name is None:
5604         raise errors.OpPrereqError("Invalid node name (%s)" %
5605                                    (self.op.name,))
5606       self.op.name = name
5607       self.needed_locks[locking.LEVEL_NODE] = name
5608     elif self.op.kind == constants.TAG_INSTANCE:
5609       name = self.cfg.ExpandInstanceName(self.op.name)
5610       if name is None:
5611         raise errors.OpPrereqError("Invalid instance name (%s)" %
5612                                    (self.op.name,))
5613       self.op.name = name
5614       self.needed_locks[locking.LEVEL_INSTANCE] = name
5615
5616   def CheckPrereq(self):
5617     """Check prerequisites.
5618
5619     """
5620     if self.op.kind == constants.TAG_CLUSTER:
5621       self.target = self.cfg.GetClusterInfo()
5622     elif self.op.kind == constants.TAG_NODE:
5623       self.target = self.cfg.GetNodeInfo(self.op.name)
5624     elif self.op.kind == constants.TAG_INSTANCE:
5625       self.target = self.cfg.GetInstanceInfo(self.op.name)
5626     else:
5627       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5628                                  str(self.op.kind))
5629
5630
5631 class LUGetTags(TagsLU):
5632   """Returns the tags of a given object.
5633
5634   """
5635   _OP_REQP = ["kind", "name"]
5636   REQ_BGL = False
5637
5638   def Exec(self, feedback_fn):
5639     """Returns the tag list.
5640
5641     """
5642     return list(self.target.GetTags())
5643
5644
5645 class LUSearchTags(NoHooksLU):
5646   """Searches the tags for a given pattern.
5647
5648   """
5649   _OP_REQP = ["pattern"]
5650   REQ_BGL = False
5651
5652   def ExpandNames(self):
5653     self.needed_locks = {}
5654
5655   def CheckPrereq(self):
5656     """Check prerequisites.
5657
5658     This checks the pattern passed for validity by compiling it.
5659
5660     """
5661     try:
5662       self.re = re.compile(self.op.pattern)
5663     except re.error, err:
5664       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5665                                  (self.op.pattern, err))
5666
5667   def Exec(self, feedback_fn):
5668     """Returns the tag list.
5669
5670     """
5671     cfg = self.cfg
5672     tgts = [("/cluster", cfg.GetClusterInfo())]
5673     ilist = cfg.GetAllInstancesInfo().values()
5674     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5675     nlist = cfg.GetAllNodesInfo().values()
5676     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5677     results = []
5678     for path, target in tgts:
5679       for tag in target.GetTags():
5680         if self.re.search(tag):
5681           results.append((path, tag))
5682     return results
5683
5684
5685 class LUAddTags(TagsLU):
5686   """Sets a tag on a given object.
5687
5688   """
5689   _OP_REQP = ["kind", "name", "tags"]
5690   REQ_BGL = False
5691
5692   def CheckPrereq(self):
5693     """Check prerequisites.
5694
5695     This checks the type and length of the tag name and value.
5696
5697     """
5698     TagsLU.CheckPrereq(self)
5699     for tag in self.op.tags:
5700       objects.TaggableObject.ValidateTag(tag)
5701
5702   def Exec(self, feedback_fn):
5703     """Sets the tag.
5704
5705     """
5706     try:
5707       for tag in self.op.tags:
5708         self.target.AddTag(tag)
5709     except errors.TagError, err:
5710       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5711     try:
5712       self.cfg.Update(self.target)
5713     except errors.ConfigurationError:
5714       raise errors.OpRetryError("There has been a modification to the"
5715                                 " config file and the operation has been"
5716                                 " aborted. Please retry.")
5717
5718
5719 class LUDelTags(TagsLU):
5720   """Delete a list of tags from a given object.
5721
5722   """
5723   _OP_REQP = ["kind", "name", "tags"]
5724   REQ_BGL = False
5725
5726   def CheckPrereq(self):
5727     """Check prerequisites.
5728
5729     This checks that we have the given tag.
5730
5731     """
5732     TagsLU.CheckPrereq(self)
5733     for tag in self.op.tags:
5734       objects.TaggableObject.ValidateTag(tag)
5735     del_tags = frozenset(self.op.tags)
5736     cur_tags = self.target.GetTags()
5737     if not del_tags <= cur_tags:
5738       diff_tags = del_tags - cur_tags
5739       diff_names = ["'%s'" % tag for tag in diff_tags]
5740       diff_names.sort()
5741       raise errors.OpPrereqError("Tag(s) %s not found" %
5742                                  (",".join(diff_names)))
5743
5744   def Exec(self, feedback_fn):
5745     """Remove the tag from the object.
5746
5747     """
5748     for tag in self.op.tags:
5749       self.target.RemoveTag(tag)
5750     try:
5751       self.cfg.Update(self.target)
5752     except errors.ConfigurationError:
5753       raise errors.OpRetryError("There has been a modification to the"
5754                                 " config file and the operation has been"
5755                                 " aborted. Please retry.")
5756
5757
5758 class LUTestDelay(NoHooksLU):
5759   """Sleep for a specified amount of time.
5760
5761   This LU sleeps on the master and/or nodes for a specified amount of
5762   time.
5763
5764   """
5765   _OP_REQP = ["duration", "on_master", "on_nodes"]
5766   REQ_BGL = False
5767
5768   def ExpandNames(self):
5769     """Expand names and set required locks.
5770
5771     This expands the node list, if any.
5772
5773     """
5774     self.needed_locks = {}
5775     if self.op.on_nodes:
5776       # _GetWantedNodes can be used here, but is not always appropriate to use
5777       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5778       # more information.
5779       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5780       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5781
5782   def CheckPrereq(self):
5783     """Check prerequisites.
5784
5785     """
5786
5787   def Exec(self, feedback_fn):
5788     """Do the actual sleep.
5789
5790     """
5791     if self.op.on_master:
5792       if not utils.TestDelay(self.op.duration):
5793         raise errors.OpExecError("Error during master delay test")
5794     if self.op.on_nodes:
5795       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5796       if not result:
5797         raise errors.OpExecError("Complete failure from rpc call")
5798       for node, node_result in result.items():
5799         node_result.Raise()
5800         if not node_result.data:
5801           raise errors.OpExecError("Failure during rpc call to node %s,"
5802                                    " result: %s" % (node, node_result.data))
5803
5804
5805 class IAllocator(object):
5806   """IAllocator framework.
5807
5808   An IAllocator instance has three sets of attributes:
5809     - cfg that is needed to query the cluster
5810     - input data (all members of the _KEYS class attribute are required)
5811     - four buffer attributes (in|out_data|text), that represent the
5812       input (to the external script) in text and data structure format,
5813       and the output from it, again in two formats
5814     - the result variables from the script (success, info, nodes) for
5815       easy usage
5816
5817   """
5818   _ALLO_KEYS = [
5819     "mem_size", "disks", "disk_template",
5820     "os", "tags", "nics", "vcpus", "hypervisor",
5821     ]
5822   _RELO_KEYS = [
5823     "relocate_from",
5824     ]
5825
5826   def __init__(self, lu, mode, name, **kwargs):
5827     self.lu = lu
5828     # init buffer variables
5829     self.in_text = self.out_text = self.in_data = self.out_data = None
5830     # init all input fields so that pylint is happy
5831     self.mode = mode
5832     self.name = name
5833     self.mem_size = self.disks = self.disk_template = None
5834     self.os = self.tags = self.nics = self.vcpus = None
5835     self.relocate_from = None
5836     # computed fields
5837     self.required_nodes = None
5838     # init result fields
5839     self.success = self.info = self.nodes = None
5840     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5841       keyset = self._ALLO_KEYS
5842     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5843       keyset = self._RELO_KEYS
5844     else:
5845       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5846                                    " IAllocator" % self.mode)
5847     for key in kwargs:
5848       if key not in keyset:
5849         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5850                                      " IAllocator" % key)
5851       setattr(self, key, kwargs[key])
5852     for key in keyset:
5853       if key not in kwargs:
5854         raise errors.ProgrammerError("Missing input parameter '%s' to"
5855                                      " IAllocator" % key)
5856     self._BuildInputData()
5857
5858   def _ComputeClusterData(self):
5859     """Compute the generic allocator input data.
5860
5861     This is the data that is independent of the actual operation.
5862
5863     """
5864     cfg = self.lu.cfg
5865     cluster_info = cfg.GetClusterInfo()
5866     # cluster data
5867     data = {
5868       "version": 1,
5869       "cluster_name": cfg.GetClusterName(),
5870       "cluster_tags": list(cluster_info.GetTags()),
5871       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5872       # we don't have job IDs
5873       }
5874     iinfo = cfg.GetAllInstancesInfo().values()
5875     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5876
5877     # node data
5878     node_results = {}
5879     node_list = cfg.GetNodeList()
5880
5881     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5882       hypervisor = self.hypervisor
5883     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5884       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5885
5886     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5887                                            hypervisor)
5888     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5889                        cluster_info.enabled_hypervisors)
5890     for nname in node_list:
5891       ninfo = cfg.GetNodeInfo(nname)
5892       node_data[nname].Raise()
5893       if not isinstance(node_data[nname].data, dict):
5894         raise errors.OpExecError("Can't get data for node %s" % nname)
5895       remote_info = node_data[nname].data
5896       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5897                    'vg_size', 'vg_free', 'cpu_total']:
5898         if attr not in remote_info:
5899           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5900                                    (nname, attr))
5901         try:
5902           remote_info[attr] = int(remote_info[attr])
5903         except ValueError, err:
5904           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5905                                    " %s" % (nname, attr, str(err)))
5906       # compute memory used by primary instances
5907       i_p_mem = i_p_up_mem = 0
5908       for iinfo, beinfo in i_list:
5909         if iinfo.primary_node == nname:
5910           i_p_mem += beinfo[constants.BE_MEMORY]
5911           if iinfo.name not in node_iinfo[nname]:
5912             i_used_mem = 0
5913           else:
5914             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5915           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5916           remote_info['memory_free'] -= max(0, i_mem_diff)
5917
5918           if iinfo.status == "up":
5919             i_p_up_mem += beinfo[constants.BE_MEMORY]
5920
5921       # compute memory used by instances
5922       pnr = {
5923         "tags": list(ninfo.GetTags()),
5924         "total_memory": remote_info['memory_total'],
5925         "reserved_memory": remote_info['memory_dom0'],
5926         "free_memory": remote_info['memory_free'],
5927         "i_pri_memory": i_p_mem,
5928         "i_pri_up_memory": i_p_up_mem,
5929         "total_disk": remote_info['vg_size'],
5930         "free_disk": remote_info['vg_free'],
5931         "primary_ip": ninfo.primary_ip,
5932         "secondary_ip": ninfo.secondary_ip,
5933         "total_cpus": remote_info['cpu_total'],
5934         "offline": ninfo.offline,
5935         }
5936       node_results[nname] = pnr
5937     data["nodes"] = node_results
5938
5939     # instance data
5940     instance_data = {}
5941     for iinfo, beinfo in i_list:
5942       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5943                   for n in iinfo.nics]
5944       pir = {
5945         "tags": list(iinfo.GetTags()),
5946         "should_run": iinfo.status == "up",
5947         "vcpus": beinfo[constants.BE_VCPUS],
5948         "memory": beinfo[constants.BE_MEMORY],
5949         "os": iinfo.os,
5950         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5951         "nics": nic_data,
5952         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5953         "disk_template": iinfo.disk_template,
5954         "hypervisor": iinfo.hypervisor,
5955         }
5956       instance_data[iinfo.name] = pir
5957
5958     data["instances"] = instance_data
5959
5960     self.in_data = data
5961
5962   def _AddNewInstance(self):
5963     """Add new instance data to allocator structure.
5964
5965     This in combination with _AllocatorGetClusterData will create the
5966     correct structure needed as input for the allocator.
5967
5968     The checks for the completeness of the opcode must have already been
5969     done.
5970
5971     """
5972     data = self.in_data
5973     if len(self.disks) != 2:
5974       raise errors.OpExecError("Only two-disk configurations supported")
5975
5976     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5977
5978     if self.disk_template in constants.DTS_NET_MIRROR:
5979       self.required_nodes = 2
5980     else:
5981       self.required_nodes = 1
5982     request = {
5983       "type": "allocate",
5984       "name": self.name,
5985       "disk_template": self.disk_template,
5986       "tags": self.tags,
5987       "os": self.os,
5988       "vcpus": self.vcpus,
5989       "memory": self.mem_size,
5990       "disks": self.disks,
5991       "disk_space_total": disk_space,
5992       "nics": self.nics,
5993       "required_nodes": self.required_nodes,
5994       }
5995     data["request"] = request
5996
5997   def _AddRelocateInstance(self):
5998     """Add relocate instance data to allocator structure.
5999
6000     This in combination with _IAllocatorGetClusterData will create the
6001     correct structure needed as input for the allocator.
6002
6003     The checks for the completeness of the opcode must have already been
6004     done.
6005
6006     """
6007     instance = self.lu.cfg.GetInstanceInfo(self.name)
6008     if instance is None:
6009       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6010                                    " IAllocator" % self.name)
6011
6012     if instance.disk_template not in constants.DTS_NET_MIRROR:
6013       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6014
6015     if len(instance.secondary_nodes) != 1:
6016       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6017
6018     self.required_nodes = 1
6019     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6020     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6021
6022     request = {
6023       "type": "relocate",
6024       "name": self.name,
6025       "disk_space_total": disk_space,
6026       "required_nodes": self.required_nodes,
6027       "relocate_from": self.relocate_from,
6028       }
6029     self.in_data["request"] = request
6030
6031   def _BuildInputData(self):
6032     """Build input data structures.
6033
6034     """
6035     self._ComputeClusterData()
6036
6037     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6038       self._AddNewInstance()
6039     else:
6040       self._AddRelocateInstance()
6041
6042     self.in_text = serializer.Dump(self.in_data)
6043
6044   def Run(self, name, validate=True, call_fn=None):
6045     """Run an instance allocator and return the results.
6046
6047     """
6048     if call_fn is None:
6049       call_fn = self.lu.rpc.call_iallocator_runner
6050     data = self.in_text
6051
6052     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6053     result.Raise()
6054
6055     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6056       raise errors.OpExecError("Invalid result from master iallocator runner")
6057
6058     rcode, stdout, stderr, fail = result.data
6059
6060     if rcode == constants.IARUN_NOTFOUND:
6061       raise errors.OpExecError("Can't find allocator '%s'" % name)
6062     elif rcode == constants.IARUN_FAILURE:
6063       raise errors.OpExecError("Instance allocator call failed: %s,"
6064                                " output: %s" % (fail, stdout+stderr))
6065     self.out_text = stdout
6066     if validate:
6067       self._ValidateResult()
6068
6069   def _ValidateResult(self):
6070     """Process the allocator results.
6071
6072     This will process and if successful save the result in
6073     self.out_data and the other parameters.
6074
6075     """
6076     try:
6077       rdict = serializer.Load(self.out_text)
6078     except Exception, err:
6079       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6080
6081     if not isinstance(rdict, dict):
6082       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6083
6084     for key in "success", "info", "nodes":
6085       if key not in rdict:
6086         raise errors.OpExecError("Can't parse iallocator results:"
6087                                  " missing key '%s'" % key)
6088       setattr(self, key, rdict[key])
6089
6090     if not isinstance(rdict["nodes"], list):
6091       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6092                                " is not a list")
6093     self.out_data = rdict
6094
6095
6096 class LUTestAllocator(NoHooksLU):
6097   """Run allocator tests.
6098
6099   This LU runs the allocator tests
6100
6101   """
6102   _OP_REQP = ["direction", "mode", "name"]
6103
6104   def CheckPrereq(self):
6105     """Check prerequisites.
6106
6107     This checks the opcode parameters depending on the director and mode test.
6108
6109     """
6110     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6111       for attr in ["name", "mem_size", "disks", "disk_template",
6112                    "os", "tags", "nics", "vcpus"]:
6113         if not hasattr(self.op, attr):
6114           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6115                                      attr)
6116       iname = self.cfg.ExpandInstanceName(self.op.name)
6117       if iname is not None:
6118         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6119                                    iname)
6120       if not isinstance(self.op.nics, list):
6121         raise errors.OpPrereqError("Invalid parameter 'nics'")
6122       for row in self.op.nics:
6123         if (not isinstance(row, dict) or
6124             "mac" not in row or
6125             "ip" not in row or
6126             "bridge" not in row):
6127           raise errors.OpPrereqError("Invalid contents of the"
6128                                      " 'nics' parameter")
6129       if not isinstance(self.op.disks, list):
6130         raise errors.OpPrereqError("Invalid parameter 'disks'")
6131       if len(self.op.disks) != 2:
6132         raise errors.OpPrereqError("Only two-disk configurations supported")
6133       for row in self.op.disks:
6134         if (not isinstance(row, dict) or
6135             "size" not in row or
6136             not isinstance(row["size"], int) or
6137             "mode" not in row or
6138             row["mode"] not in ['r', 'w']):
6139           raise errors.OpPrereqError("Invalid contents of the"
6140                                      " 'disks' parameter")
6141       if self.op.hypervisor is None:
6142         self.op.hypervisor = self.cfg.GetHypervisorType()
6143     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6144       if not hasattr(self.op, "name"):
6145         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6146       fname = self.cfg.ExpandInstanceName(self.op.name)
6147       if fname is None:
6148         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6149                                    self.op.name)
6150       self.op.name = fname
6151       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6152     else:
6153       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6154                                  self.op.mode)
6155
6156     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6157       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6158         raise errors.OpPrereqError("Missing allocator name")
6159     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6160       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6161                                  self.op.direction)
6162
6163   def Exec(self, feedback_fn):
6164     """Run the allocator test.
6165
6166     """
6167     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6168       ial = IAllocator(self,
6169                        mode=self.op.mode,
6170                        name=self.op.name,
6171                        mem_size=self.op.mem_size,
6172                        disks=self.op.disks,
6173                        disk_template=self.op.disk_template,
6174                        os=self.op.os,
6175                        tags=self.op.tags,
6176                        nics=self.op.nics,
6177                        vcpus=self.op.vcpus,
6178                        hypervisor=self.op.hypervisor,
6179                        )
6180     else:
6181       ial = IAllocator(self,
6182                        mode=self.op.mode,
6183                        name=self.op.name,
6184                        relocate_from=list(self.relocate_from),
6185                        )
6186
6187     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6188       result = ial.in_text
6189     else:
6190       ial.Run(self.op.allocator, validate=False)
6191       result = ial.out_text
6192     return result