Use constants.VALUE_AUTO for ip comparison too
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_MASTER = True
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90
91     for attr_name in self._OP_REQP:
92       attr_val = getattr(op, attr_name, None)
93       if attr_val is None:
94         raise errors.OpPrereqError("Required parameter '%s' missing" %
95                                    attr_name)
96
97     if not self.cfg.IsCluster():
98       raise errors.OpPrereqError("Cluster not initialized yet,"
99                                  " use 'gnt-cluster init' first.")
100     if self.REQ_MASTER:
101       master = self.cfg.GetMasterNode()
102       if master != utils.HostInfo().name:
103         raise errors.OpPrereqError("Commands must be run on the master"
104                                    " node %s" % master)
105
106   def __GetSSH(self):
107     """Returns the SshRunner object
108
109     """
110     if not self.__ssh:
111       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112     return self.__ssh
113
114   ssh = property(fget=__GetSSH)
115
116   def ExpandNames(self):
117     """Expand names for this LU.
118
119     This method is called before starting to execute the opcode, and it should
120     update all the parameters of the opcode to their canonical form (e.g. a
121     short node name must be fully expanded after this method has successfully
122     completed). This way locking, hooks, logging, ecc. can work correctly.
123
124     LUs which implement this method must also populate the self.needed_locks
125     member, as a dict with lock levels as keys, and a list of needed lock names
126     as values. Rules:
127       - Use an empty dict if you don't need any lock
128       - If you don't need any lock at a particular level omit that level
129       - Don't put anything for the BGL level
130       - If you want all locks at a level use locking.ALL_SET as a value
131
132     If you need to share locks (rather than acquire them exclusively) at one
133     level you can modify self.share_locks, setting a true value (usually 1) for
134     that level. By default locks are not shared.
135
136     Examples:
137     # Acquire all nodes and one instance
138     self.needed_locks = {
139       locking.LEVEL_NODE: locking.ALL_SET,
140       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141     }
142     # Acquire just two nodes
143     self.needed_locks = {
144       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145     }
146     # Acquire no locks
147     self.needed_locks = {} # No, you can't leave it to the default value None
148
149     """
150     # The implementation of this method is mandatory only if the new LU is
151     # concurrent, so that old LUs don't need to be changed all at the same
152     # time.
153     if self.REQ_BGL:
154       self.needed_locks = {} # Exclusive LUs don't need locks.
155     else:
156       raise NotImplementedError
157
158   def DeclareLocks(self, level):
159     """Declare LU locking needs for a level
160
161     While most LUs can just declare their locking needs at ExpandNames time,
162     sometimes there's the need to calculate some locks after having acquired
163     the ones before. This function is called just before acquiring locks at a
164     particular level, but after acquiring the ones at lower levels, and permits
165     such calculations. It can be used to modify self.needed_locks, and by
166     default it does nothing.
167
168     This function is only called if you have something already set in
169     self.needed_locks for the level.
170
171     @param level: Locking level which is going to be locked
172     @type level: member of ganeti.locking.LEVELS
173
174     """
175
176   def CheckPrereq(self):
177     """Check prerequisites for this LU.
178
179     This method should check that the prerequisites for the execution
180     of this LU are fulfilled. It can do internode communication, but
181     it should be idempotent - no cluster or system changes are
182     allowed.
183
184     The method should raise errors.OpPrereqError in case something is
185     not fulfilled. Its return value is ignored.
186
187     This method should also update all the parameters of the opcode to
188     their canonical form if it hasn't been done by ExpandNames before.
189
190     """
191     raise NotImplementedError
192
193   def Exec(self, feedback_fn):
194     """Execute the LU.
195
196     This method should implement the actual work. It should raise
197     errors.OpExecError for failures that are somewhat dealt with in
198     code, or expected.
199
200     """
201     raise NotImplementedError
202
203   def BuildHooksEnv(self):
204     """Build hooks environment for this LU.
205
206     This method should return a three-node tuple consisting of: a dict
207     containing the environment that will be used for running the
208     specific hook for this LU, a list of node names on which the hook
209     should run before the execution, and a list of node names on which
210     the hook should run after the execution.
211
212     The keys of the dict must not have 'GANETI_' prefixed as this will
213     be handled in the hooks runner. Also note additional keys will be
214     added by the hooks runner. If the LU doesn't define any
215     environment, an empty dict (and not None) should be returned.
216
217     No nodes should be returned as an empty list (and not None).
218
219     Note that if the HPATH for a LU class is None, this function will
220     not be called.
221
222     """
223     raise NotImplementedError
224
225   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226     """Notify the LU about the results of its hooks.
227
228     This method is called every time a hooks phase is executed, and notifies
229     the Logical Unit about the hooks' result. The LU can then use it to alter
230     its result based on the hooks.  By default the method does nothing and the
231     previous result is passed back unchanged but any LU can define it if it
232     wants to use the local cluster hook-scripts somehow.
233
234     Args:
235       phase: the hooks phase that has just been run
236       hooks_results: the results of the multi-node hooks rpc call
237       feedback_fn: function to send feedback back to the caller
238       lu_result: the previous result this LU had, or None in the PRE phase.
239
240     """
241     return lu_result
242
243   def _ExpandAndLockInstance(self):
244     """Helper function to expand and lock an instance.
245
246     Many LUs that work on an instance take its name in self.op.instance_name
247     and need to expand it and then declare the expanded name for locking. This
248     function does it, and then updates self.op.instance_name to the expanded
249     name. It also initializes needed_locks as a dict, if this hasn't been done
250     before.
251
252     """
253     if self.needed_locks is None:
254       self.needed_locks = {}
255     else:
256       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257         "_ExpandAndLockInstance called with instance-level locks set"
258     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259     if expanded_name is None:
260       raise errors.OpPrereqError("Instance '%s' not known" %
261                                   self.op.instance_name)
262     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263     self.op.instance_name = expanded_name
264
265   def _LockInstancesNodes(self, primary_only=False):
266     """Helper function to declare instances' nodes for locking.
267
268     This function should be called after locking one or more instances to lock
269     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270     with all primary or secondary nodes for instances already locked and
271     present in self.needed_locks[locking.LEVEL_INSTANCE].
272
273     It should be called from DeclareLocks, and for safety only works if
274     self.recalculate_locks[locking.LEVEL_NODE] is set.
275
276     In the future it may grow parameters to just lock some instance's nodes, or
277     to just lock primaries or secondary nodes, if needed.
278
279     If should be called in DeclareLocks in a way similar to:
280
281     if level == locking.LEVEL_NODE:
282       self._LockInstancesNodes()
283
284     @type primary_only: boolean
285     @param primary_only: only lock primary nodes of locked instances
286
287     """
288     assert locking.LEVEL_NODE in self.recalculate_locks, \
289       "_LockInstancesNodes helper function called with no nodes to recalculate"
290
291     # TODO: check if we're really been called with the instance locks held
292
293     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294     # future we might want to have different behaviors depending on the value
295     # of self.recalculate_locks[locking.LEVEL_NODE]
296     wanted_nodes = []
297     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298       instance = self.context.cfg.GetInstanceInfo(instance_name)
299       wanted_nodes.append(instance.primary_node)
300       if not primary_only:
301         wanted_nodes.extend(instance.secondary_nodes)
302
303     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307
308     del self.recalculate_locks[locking.LEVEL_NODE]
309
310
311 class NoHooksLU(LogicalUnit):
312   """Simple LU which runs no hooks.
313
314   This LU is intended as a parent for other LogicalUnits which will
315   run no hooks, in order to reduce duplicate code.
316
317   """
318   HPATH = None
319   HTYPE = None
320
321
322 def _GetWantedNodes(lu, nodes):
323   """Returns list of checked and expanded node names.
324
325   Args:
326     nodes: List of nodes (strings) or None for all
327
328   """
329   if not isinstance(nodes, list):
330     raise errors.OpPrereqError("Invalid argument type 'nodes'")
331
332   if not nodes:
333     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334       " non-empty list of nodes whose name is to be expanded.")
335
336   wanted = []
337   for name in nodes:
338     node = lu.cfg.ExpandNodeName(name)
339     if node is None:
340       raise errors.OpPrereqError("No such node name '%s'" % name)
341     wanted.append(node)
342
343   return utils.NiceSort(wanted)
344
345
346 def _GetWantedInstances(lu, instances):
347   """Returns list of checked and expanded instance names.
348
349   Args:
350     instances: List of instances (strings) or None for all
351
352   """
353   if not isinstance(instances, list):
354     raise errors.OpPrereqError("Invalid argument type 'instances'")
355
356   if instances:
357     wanted = []
358
359     for name in instances:
360       instance = lu.cfg.ExpandInstanceName(name)
361       if instance is None:
362         raise errors.OpPrereqError("No such instance name '%s'" % name)
363       wanted.append(instance)
364
365   else:
366     wanted = lu.cfg.GetInstanceList()
367   return utils.NiceSort(wanted)
368
369
370 def _CheckOutputFields(static, dynamic, selected):
371   """Checks whether all selected fields are valid.
372
373   Args:
374     static: Static fields
375     dynamic: Dynamic fields
376
377   """
378   static_fields = frozenset(static)
379   dynamic_fields = frozenset(dynamic)
380
381   all_fields = static_fields | dynamic_fields
382
383   if not all_fields.issuperset(selected):
384     raise errors.OpPrereqError("Unknown output fields selected: %s"
385                                % ",".join(frozenset(selected).
386                                           difference(all_fields)))
387
388
389 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390                           memory, vcpus, nics):
391   """Builds instance related env variables for hooks from single variables.
392
393   Args:
394     secondary_nodes: List of secondary nodes as strings
395   """
396   env = {
397     "OP_TARGET": name,
398     "INSTANCE_NAME": name,
399     "INSTANCE_PRIMARY": primary_node,
400     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401     "INSTANCE_OS_TYPE": os_type,
402     "INSTANCE_STATUS": status,
403     "INSTANCE_MEMORY": memory,
404     "INSTANCE_VCPUS": vcpus,
405   }
406
407   if nics:
408     nic_count = len(nics)
409     for idx, (ip, bridge, mac) in enumerate(nics):
410       if ip is None:
411         ip = ""
412       env["INSTANCE_NIC%d_IP" % idx] = ip
413       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415   else:
416     nic_count = 0
417
418   env["INSTANCE_NIC_COUNT"] = nic_count
419
420   return env
421
422
423 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
424   """Builds instance related env variables for hooks from an object.
425
426   Args:
427     instance: objects.Instance object of instance
428     override: dict of values to override
429   """
430   bep = lu.cfg.GetClusterInfo().FillBE(instance)
431   args = {
432     'name': instance.name,
433     'primary_node': instance.primary_node,
434     'secondary_nodes': instance.secondary_nodes,
435     'os_type': instance.os,
436     'status': instance.os,
437     'memory': bep[constants.BE_MEMORY],
438     'vcpus': bep[constants.BE_VCPUS],
439     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
440   }
441   if override:
442     args.update(override)
443   return _BuildInstanceHookEnv(**args)
444
445
446 def _CheckInstanceBridgesExist(lu, instance):
447   """Check that the brigdes needed by an instance exist.
448
449   """
450   # check bridges existance
451   brlist = [nic.bridge for nic in instance.nics]
452   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
453     raise errors.OpPrereqError("one or more target bridges %s does not"
454                                " exist on destination node '%s'" %
455                                (brlist, instance.primary_node))
456
457
458 class LUDestroyCluster(NoHooksLU):
459   """Logical unit for destroying the cluster.
460
461   """
462   _OP_REQP = []
463
464   def CheckPrereq(self):
465     """Check prerequisites.
466
467     This checks whether the cluster is empty.
468
469     Any errors are signalled by raising errors.OpPrereqError.
470
471     """
472     master = self.cfg.GetMasterNode()
473
474     nodelist = self.cfg.GetNodeList()
475     if len(nodelist) != 1 or nodelist[0] != master:
476       raise errors.OpPrereqError("There are still %d node(s) in"
477                                  " this cluster." % (len(nodelist) - 1))
478     instancelist = self.cfg.GetInstanceList()
479     if instancelist:
480       raise errors.OpPrereqError("There are still %d instance(s) in"
481                                  " this cluster." % len(instancelist))
482
483   def Exec(self, feedback_fn):
484     """Destroys the cluster.
485
486     """
487     master = self.cfg.GetMasterNode()
488     if not self.rpc.call_node_stop_master(master, False):
489       raise errors.OpExecError("Could not disable the master role")
490     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
491     utils.CreateBackup(priv_key)
492     utils.CreateBackup(pub_key)
493     return master
494
495
496 class LUVerifyCluster(LogicalUnit):
497   """Verifies the cluster status.
498
499   """
500   HPATH = "cluster-verify"
501   HTYPE = constants.HTYPE_CLUSTER
502   _OP_REQP = ["skip_checks"]
503   REQ_BGL = False
504
505   def ExpandNames(self):
506     self.needed_locks = {
507       locking.LEVEL_NODE: locking.ALL_SET,
508       locking.LEVEL_INSTANCE: locking.ALL_SET,
509     }
510     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
511
512   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
513                   remote_version, feedback_fn):
514     """Run multiple tests against a node.
515
516     Test list:
517       - compares ganeti version
518       - checks vg existance and size > 20G
519       - checks config file checksum
520       - checks ssh to other nodes
521
522     Args:
523       node: name of the node to check
524       file_list: required list of files
525       local_cksum: dictionary of local files and their checksums
526
527     """
528     # compares ganeti version
529     local_version = constants.PROTOCOL_VERSION
530     if not remote_version:
531       feedback_fn("  - ERROR: connection to %s failed" % (node))
532       return True
533
534     if local_version != remote_version:
535       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
536                       (local_version, node, remote_version))
537       return True
538
539     # checks vg existance and size > 20G
540
541     bad = False
542     if not vglist:
543       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
544                       (node,))
545       bad = True
546     else:
547       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
548                                             constants.MIN_VG_SIZE)
549       if vgstatus:
550         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
551         bad = True
552
553     if not node_result:
554       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
555       return True
556
557     # checks config file checksum
558     # checks ssh to any
559
560     if 'filelist' not in node_result:
561       bad = True
562       feedback_fn("  - ERROR: node hasn't returned file checksum data")
563     else:
564       remote_cksum = node_result['filelist']
565       for file_name in file_list:
566         if file_name not in remote_cksum:
567           bad = True
568           feedback_fn("  - ERROR: file '%s' missing" % file_name)
569         elif remote_cksum[file_name] != local_cksum[file_name]:
570           bad = True
571           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
572
573     if 'nodelist' not in node_result:
574       bad = True
575       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
576     else:
577       if node_result['nodelist']:
578         bad = True
579         for node in node_result['nodelist']:
580           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
581                           (node, node_result['nodelist'][node]))
582     if 'node-net-test' not in node_result:
583       bad = True
584       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
585     else:
586       if node_result['node-net-test']:
587         bad = True
588         nlist = utils.NiceSort(node_result['node-net-test'].keys())
589         for node in nlist:
590           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
591                           (node, node_result['node-net-test'][node]))
592
593     hyp_result = node_result.get('hypervisor', None)
594     if isinstance(hyp_result, dict):
595       for hv_name, hv_result in hyp_result.iteritems():
596         if hv_result is not None:
597           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
598                       (hv_name, hv_result))
599     return bad
600
601   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602                       node_instance, feedback_fn):
603     """Verify an instance.
604
605     This function checks to see if the required block devices are
606     available on the instance's node.
607
608     """
609     bad = False
610
611     node_current = instanceconfig.primary_node
612
613     node_vol_should = {}
614     instanceconfig.MapLVsByNode(node_vol_should)
615
616     for node in node_vol_should:
617       for volume in node_vol_should[node]:
618         if node not in node_vol_is or volume not in node_vol_is[node]:
619           feedback_fn("  - ERROR: volume %s missing on node %s" %
620                           (volume, node))
621           bad = True
622
623     if not instanceconfig.status == 'down':
624       if (node_current not in node_instance or
625           not instance in node_instance[node_current]):
626         feedback_fn("  - ERROR: instance %s not running on node %s" %
627                         (instance, node_current))
628         bad = True
629
630     for node in node_instance:
631       if (not node == node_current):
632         if instance in node_instance[node]:
633           feedback_fn("  - ERROR: instance %s should not run on node %s" %
634                           (instance, node))
635           bad = True
636
637     return bad
638
639   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640     """Verify if there are any unknown volumes in the cluster.
641
642     The .os, .swap and backup volumes are ignored. All other volumes are
643     reported as unknown.
644
645     """
646     bad = False
647
648     for node in node_vol_is:
649       for volume in node_vol_is[node]:
650         if node not in node_vol_should or volume not in node_vol_should[node]:
651           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
652                       (volume, node))
653           bad = True
654     return bad
655
656   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657     """Verify the list of running instances.
658
659     This checks what instances are running but unknown to the cluster.
660
661     """
662     bad = False
663     for node in node_instance:
664       for runninginstance in node_instance[node]:
665         if runninginstance not in instancelist:
666           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
667                           (runninginstance, node))
668           bad = True
669     return bad
670
671   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672     """Verify N+1 Memory Resilience.
673
674     Check that if one single node dies we can still start all the instances it
675     was primary for.
676
677     """
678     bad = False
679
680     for node, nodeinfo in node_info.iteritems():
681       # This code checks that every node which is now listed as secondary has
682       # enough memory to host all instances it is supposed to should a single
683       # other node in the cluster fail.
684       # FIXME: not ready for failover to an arbitrary node
685       # FIXME: does not support file-backed instances
686       # WARNING: we currently take into account down instances as well as up
687       # ones, considering that even if they're down someone might want to start
688       # them even in the event of a node failure.
689       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
690         needed_mem = 0
691         for instance in instances:
692           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
693           if bep[constants.BE_AUTO_BALANCE]:
694             needed_mem += bep[constants.BE_MEMORY]
695         if nodeinfo['mfree'] < needed_mem:
696           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
697                       " failovers should node %s fail" % (node, prinode))
698           bad = True
699     return bad
700
701   def CheckPrereq(self):
702     """Check prerequisites.
703
704     Transform the list of checks we're going to skip into a set and check that
705     all its members are valid.
706
707     """
708     self.skip_set = frozenset(self.op.skip_checks)
709     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
710       raise errors.OpPrereqError("Invalid checks to be skipped specified")
711
712   def BuildHooksEnv(self):
713     """Build hooks env.
714
715     Cluster-Verify hooks just rone in the post phase and their failure makes
716     the output be logged in the verify output and the verification to fail.
717
718     """
719     all_nodes = self.cfg.GetNodeList()
720     # TODO: populate the environment with useful information for verify hooks
721     env = {}
722     return env, [], all_nodes
723
724   def Exec(self, feedback_fn):
725     """Verify integrity of cluster, performing various test on nodes.
726
727     """
728     bad = False
729     feedback_fn("* Verifying global settings")
730     for msg in self.cfg.VerifyConfig():
731       feedback_fn("  - ERROR: %s" % msg)
732
733     vg_name = self.cfg.GetVGName()
734     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
735     nodelist = utils.NiceSort(self.cfg.GetNodeList())
736     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
737     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
738     i_non_redundant = [] # Non redundant instances
739     i_non_a_balanced = [] # Non auto-balanced instances
740     node_volume = {}
741     node_instance = {}
742     node_info = {}
743     instance_cfg = {}
744
745     # FIXME: verify OS list
746     # do local checksums
747     file_names = []
748     file_names.append(constants.SSL_CERT_FILE)
749     file_names.append(constants.CLUSTER_CONF_FILE)
750     local_checksums = utils.FingerprintFiles(file_names)
751
752     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
753     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
754     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
755     all_vglist = self.rpc.call_vg_list(nodelist)
756     node_verify_param = {
757       'filelist': file_names,
758       'nodelist': nodelist,
759       'hypervisor': hypervisors,
760       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
761                         for node in nodeinfo]
762       }
763     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
764                                            self.cfg.GetClusterName())
765     all_rversion = self.rpc.call_version(nodelist)
766     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
767                                         self.cfg.GetHypervisorType())
768
769     cluster = self.cfg.GetClusterInfo()
770     for node in nodelist:
771       feedback_fn("* Verifying node %s" % node)
772       result = self._VerifyNode(node, file_names, local_checksums,
773                                 all_vglist[node], all_nvinfo[node],
774                                 all_rversion[node], feedback_fn)
775       bad = bad or result
776
777       # node_volume
778       volumeinfo = all_volumeinfo[node]
779
780       if isinstance(volumeinfo, basestring):
781         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
782                     (node, volumeinfo[-400:].encode('string_escape')))
783         bad = True
784         node_volume[node] = {}
785       elif not isinstance(volumeinfo, dict):
786         feedback_fn("  - ERROR: connection to %s failed" % (node,))
787         bad = True
788         continue
789       else:
790         node_volume[node] = volumeinfo
791
792       # node_instance
793       nodeinstance = all_instanceinfo[node]
794       if type(nodeinstance) != list:
795         feedback_fn("  - ERROR: connection to %s failed" % (node,))
796         bad = True
797         continue
798
799       node_instance[node] = nodeinstance
800
801       # node_info
802       nodeinfo = all_ninfo[node]
803       if not isinstance(nodeinfo, dict):
804         feedback_fn("  - ERROR: connection to %s failed" % (node,))
805         bad = True
806         continue
807
808       try:
809         node_info[node] = {
810           "mfree": int(nodeinfo['memory_free']),
811           "dfree": int(nodeinfo['vg_free']),
812           "pinst": [],
813           "sinst": [],
814           # dictionary holding all instances this node is secondary for,
815           # grouped by their primary node. Each key is a cluster node, and each
816           # value is a list of instances which have the key as primary and the
817           # current node as secondary.  this is handy to calculate N+1 memory
818           # availability if you can only failover from a primary to its
819           # secondary.
820           "sinst-by-pnode": {},
821         }
822       except ValueError:
823         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
824         bad = True
825         continue
826
827     node_vol_should = {}
828
829     for instance in instancelist:
830       feedback_fn("* Verifying instance %s" % instance)
831       inst_config = self.cfg.GetInstanceInfo(instance)
832       result =  self._VerifyInstance(instance, inst_config, node_volume,
833                                      node_instance, feedback_fn)
834       bad = bad or result
835
836       inst_config.MapLVsByNode(node_vol_should)
837
838       instance_cfg[instance] = inst_config
839
840       pnode = inst_config.primary_node
841       if pnode in node_info:
842         node_info[pnode]['pinst'].append(instance)
843       else:
844         feedback_fn("  - ERROR: instance %s, connection to primary node"
845                     " %s failed" % (instance, pnode))
846         bad = True
847
848       # If the instance is non-redundant we cannot survive losing its primary
849       # node, so we are not N+1 compliant. On the other hand we have no disk
850       # templates with more than one secondary so that situation is not well
851       # supported either.
852       # FIXME: does not support file-backed instances
853       if len(inst_config.secondary_nodes) == 0:
854         i_non_redundant.append(instance)
855       elif len(inst_config.secondary_nodes) > 1:
856         feedback_fn("  - WARNING: multiple secondaries for instance %s"
857                     % instance)
858
859       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
860         i_non_a_balanced.append(instance)
861
862       for snode in inst_config.secondary_nodes:
863         if snode in node_info:
864           node_info[snode]['sinst'].append(instance)
865           if pnode not in node_info[snode]['sinst-by-pnode']:
866             node_info[snode]['sinst-by-pnode'][pnode] = []
867           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
868         else:
869           feedback_fn("  - ERROR: instance %s, connection to secondary node"
870                       " %s failed" % (instance, snode))
871
872     feedback_fn("* Verifying orphan volumes")
873     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
874                                        feedback_fn)
875     bad = bad or result
876
877     feedback_fn("* Verifying remaining instances")
878     result = self._VerifyOrphanInstances(instancelist, node_instance,
879                                          feedback_fn)
880     bad = bad or result
881
882     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
883       feedback_fn("* Verifying N+1 Memory redundancy")
884       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
885       bad = bad or result
886
887     feedback_fn("* Other Notes")
888     if i_non_redundant:
889       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
890                   % len(i_non_redundant))
891
892     if i_non_a_balanced:
893       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
894                   % len(i_non_a_balanced))
895
896     return not bad
897
898   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
899     """Analize the post-hooks' result, handle it, and send some
900     nicely-formatted feedback back to the user.
901
902     Args:
903       phase: the hooks phase that has just been run
904       hooks_results: the results of the multi-node hooks rpc call
905       feedback_fn: function to send feedback back to the caller
906       lu_result: previous Exec result
907
908     """
909     # We only really run POST phase hooks, and are only interested in
910     # their results
911     if phase == constants.HOOKS_PHASE_POST:
912       # Used to change hooks' output to proper indentation
913       indent_re = re.compile('^', re.M)
914       feedback_fn("* Hooks Results")
915       if not hooks_results:
916         feedback_fn("  - ERROR: general communication failure")
917         lu_result = 1
918       else:
919         for node_name in hooks_results:
920           show_node_header = True
921           res = hooks_results[node_name]
922           if res is False or not isinstance(res, list):
923             feedback_fn("    Communication failure")
924             lu_result = 1
925             continue
926           for script, hkr, output in res:
927             if hkr == constants.HKR_FAIL:
928               # The node header is only shown once, if there are
929               # failing hooks on that node
930               if show_node_header:
931                 feedback_fn("  Node %s:" % node_name)
932                 show_node_header = False
933               feedback_fn("    ERROR: Script %s failed, output:" % script)
934               output = indent_re.sub('      ', output)
935               feedback_fn("%s" % output)
936               lu_result = 1
937
938       return lu_result
939
940
941 class LUVerifyDisks(NoHooksLU):
942   """Verifies the cluster disks status.
943
944   """
945   _OP_REQP = []
946   REQ_BGL = False
947
948   def ExpandNames(self):
949     self.needed_locks = {
950       locking.LEVEL_NODE: locking.ALL_SET,
951       locking.LEVEL_INSTANCE: locking.ALL_SET,
952     }
953     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
954
955   def CheckPrereq(self):
956     """Check prerequisites.
957
958     This has no prerequisites.
959
960     """
961     pass
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster disks.
965
966     """
967     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
968
969     vg_name = self.cfg.GetVGName()
970     nodes = utils.NiceSort(self.cfg.GetNodeList())
971     instances = [self.cfg.GetInstanceInfo(name)
972                  for name in self.cfg.GetInstanceList()]
973
974     nv_dict = {}
975     for inst in instances:
976       inst_lvs = {}
977       if (inst.status != "up" or
978           inst.disk_template not in constants.DTS_NET_MIRROR):
979         continue
980       inst.MapLVsByNode(inst_lvs)
981       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
982       for node, vol_list in inst_lvs.iteritems():
983         for vol in vol_list:
984           nv_dict[(node, vol)] = inst
985
986     if not nv_dict:
987       return result
988
989     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
990
991     to_act = set()
992     for node in nodes:
993       # node_volume
994       lvs = node_lvs[node]
995
996       if isinstance(lvs, basestring):
997         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
998         res_nlvm[node] = lvs
999       elif not isinstance(lvs, dict):
1000         logging.warning("Connection to node %s failed or invalid data"
1001                         " returned", node)
1002         res_nodes.append(node)
1003         continue
1004
1005       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1006         inst = nv_dict.pop((node, lv_name), None)
1007         if (not lv_online and inst is not None
1008             and inst.name not in res_instances):
1009           res_instances.append(inst.name)
1010
1011     # any leftover items in nv_dict are missing LVs, let's arrange the
1012     # data better
1013     for key, inst in nv_dict.iteritems():
1014       if inst.name not in res_missing:
1015         res_missing[inst.name] = []
1016       res_missing[inst.name].append(key)
1017
1018     return result
1019
1020
1021 class LURenameCluster(LogicalUnit):
1022   """Rename the cluster.
1023
1024   """
1025   HPATH = "cluster-rename"
1026   HTYPE = constants.HTYPE_CLUSTER
1027   _OP_REQP = ["name"]
1028
1029   def BuildHooksEnv(self):
1030     """Build hooks env.
1031
1032     """
1033     env = {
1034       "OP_TARGET": self.cfg.GetClusterName(),
1035       "NEW_NAME": self.op.name,
1036       }
1037     mn = self.cfg.GetMasterNode()
1038     return env, [mn], [mn]
1039
1040   def CheckPrereq(self):
1041     """Verify that the passed name is a valid one.
1042
1043     """
1044     hostname = utils.HostInfo(self.op.name)
1045
1046     new_name = hostname.name
1047     self.ip = new_ip = hostname.ip
1048     old_name = self.cfg.GetClusterName()
1049     old_ip = self.cfg.GetMasterIP()
1050     if new_name == old_name and new_ip == old_ip:
1051       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1052                                  " cluster has changed")
1053     if new_ip != old_ip:
1054       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1055         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1056                                    " reachable on the network. Aborting." %
1057                                    new_ip)
1058
1059     self.op.name = new_name
1060
1061   def Exec(self, feedback_fn):
1062     """Rename the cluster.
1063
1064     """
1065     clustername = self.op.name
1066     ip = self.ip
1067
1068     # shutdown the master IP
1069     master = self.cfg.GetMasterNode()
1070     if not self.rpc.call_node_stop_master(master, False):
1071       raise errors.OpExecError("Could not disable the master role")
1072
1073     try:
1074       # modify the sstore
1075       # TODO: sstore
1076       ss.SetKey(ss.SS_MASTER_IP, ip)
1077       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1078
1079       # Distribute updated ss config to all nodes
1080       myself = self.cfg.GetNodeInfo(master)
1081       dist_nodes = self.cfg.GetNodeList()
1082       if myself.name in dist_nodes:
1083         dist_nodes.remove(myself.name)
1084
1085       logging.debug("Copying updated ssconf data to all nodes")
1086       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1087         fname = ss.KeyToFilename(keyname)
1088         result = self.rpc.call_upload_file(dist_nodes, fname)
1089         for to_node in dist_nodes:
1090           if not result[to_node]:
1091             logging.error("Copy of file %s to node %s failed", fname, to_node)
1092     finally:
1093       if not self.rpc.call_node_start_master(master, False):
1094         logging.error("Could not re-enable the master role on the master,"
1095                       " please restart manually.")
1096
1097
1098 def _RecursiveCheckIfLVMBased(disk):
1099   """Check if the given disk or its children are lvm-based.
1100
1101   Args:
1102     disk: ganeti.objects.Disk object
1103
1104   Returns:
1105     boolean indicating whether a LD_LV dev_type was found or not
1106
1107   """
1108   if disk.children:
1109     for chdisk in disk.children:
1110       if _RecursiveCheckIfLVMBased(chdisk):
1111         return True
1112   return disk.dev_type == constants.LD_LV
1113
1114
1115 class LUSetClusterParams(LogicalUnit):
1116   """Change the parameters of the cluster.
1117
1118   """
1119   HPATH = "cluster-modify"
1120   HTYPE = constants.HTYPE_CLUSTER
1121   _OP_REQP = []
1122   REQ_BGL = False
1123
1124   def ExpandNames(self):
1125     # FIXME: in the future maybe other cluster params won't require checking on
1126     # all nodes to be modified.
1127     self.needed_locks = {
1128       locking.LEVEL_NODE: locking.ALL_SET,
1129     }
1130     self.share_locks[locking.LEVEL_NODE] = 1
1131
1132   def BuildHooksEnv(self):
1133     """Build hooks env.
1134
1135     """
1136     env = {
1137       "OP_TARGET": self.cfg.GetClusterName(),
1138       "NEW_VG_NAME": self.op.vg_name,
1139       }
1140     mn = self.cfg.GetMasterNode()
1141     return env, [mn], [mn]
1142
1143   def CheckPrereq(self):
1144     """Check prerequisites.
1145
1146     This checks whether the given params don't conflict and
1147     if the given volume group is valid.
1148
1149     """
1150     # FIXME: This only works because there is only one parameter that can be
1151     # changed or removed.
1152     if self.op.vg_name is not None and not self.op.vg_name:
1153       instances = self.cfg.GetAllInstancesInfo().values()
1154       for inst in instances:
1155         for disk in inst.disks:
1156           if _RecursiveCheckIfLVMBased(disk):
1157             raise errors.OpPrereqError("Cannot disable lvm storage while"
1158                                        " lvm-based instances exist")
1159
1160     node_list = self.acquired_locks[locking.LEVEL_NODE]
1161
1162     # if vg_name not None, checks given volume group on all nodes
1163     if self.op.vg_name:
1164       vglist = self.rpc.call_vg_list(node_list)
1165       for node in node_list:
1166         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1167                                               constants.MIN_VG_SIZE)
1168         if vgstatus:
1169           raise errors.OpPrereqError("Error on node '%s': %s" %
1170                                      (node, vgstatus))
1171
1172     self.cluster = cluster = self.cfg.GetClusterInfo()
1173     # beparams changes do not need validation (we can't validate?),
1174     # but we still process here
1175     if self.op.beparams:
1176       self.new_beparams = cluster.FillDict(
1177         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1178
1179     # hypervisor list/parameters
1180     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1181     if self.op.hvparams:
1182       if not isinstance(self.op.hvparams, dict):
1183         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1184       for hv_name, hv_dict in self.op.hvparams.items():
1185         if hv_name not in self.new_hvparams:
1186           self.new_hvparams[hv_name] = hv_dict
1187         else:
1188           self.new_hvparams[hv_name].update(hv_dict)
1189
1190     if self.op.enabled_hypervisors is not None:
1191       self.hv_list = self.op.enabled_hypervisors
1192     else:
1193       self.hv_list = cluster.enabled_hypervisors
1194
1195     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1196       # either the enabled list has changed, or the parameters have, validate
1197       for hv_name, hv_params in self.new_hvparams.items():
1198         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1199             (self.op.enabled_hypervisors and
1200              hv_name in self.op.enabled_hypervisors)):
1201           # either this is a new hypervisor, or its parameters have changed
1202           hv_class = hypervisor.GetHypervisor(hv_name)
1203           hv_class.CheckParameterSyntax(hv_params)
1204           _CheckHVParams(self, node_list, hv_name, hv_params)
1205
1206   def Exec(self, feedback_fn):
1207     """Change the parameters of the cluster.
1208
1209     """
1210     if self.op.vg_name is not None:
1211       if self.op.vg_name != self.cfg.GetVGName():
1212         self.cfg.SetVGName(self.op.vg_name)
1213       else:
1214         feedback_fn("Cluster LVM configuration already in desired"
1215                     " state, not changing")
1216     if self.op.hvparams:
1217       self.cluster.hvparams = self.new_hvparams
1218     if self.op.enabled_hypervisors is not None:
1219       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1220     if self.op.beparams:
1221       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1222     self.cfg.Update(self.cluster)
1223
1224
1225 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1226   """Sleep and poll for an instance's disk to sync.
1227
1228   """
1229   if not instance.disks:
1230     return True
1231
1232   if not oneshot:
1233     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1234
1235   node = instance.primary_node
1236
1237   for dev in instance.disks:
1238     lu.cfg.SetDiskID(dev, node)
1239
1240   retries = 0
1241   while True:
1242     max_time = 0
1243     done = True
1244     cumul_degraded = False
1245     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1246     if not rstats:
1247       lu.proc.LogWarning("Can't get any data from node %s" % node)
1248       retries += 1
1249       if retries >= 10:
1250         raise errors.RemoteError("Can't contact node %s for mirror data,"
1251                                  " aborting." % node)
1252       time.sleep(6)
1253       continue
1254     retries = 0
1255     for i in range(len(rstats)):
1256       mstat = rstats[i]
1257       if mstat is None:
1258         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1259                            (node, instance.disks[i].iv_name))
1260         continue
1261       # we ignore the ldisk parameter
1262       perc_done, est_time, is_degraded, _ = mstat
1263       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1264       if perc_done is not None:
1265         done = False
1266         if est_time is not None:
1267           rem_time = "%d estimated seconds remaining" % est_time
1268           max_time = est_time
1269         else:
1270           rem_time = "no time estimate"
1271         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1272                         (instance.disks[i].iv_name, perc_done, rem_time))
1273     if done or oneshot:
1274       break
1275
1276     time.sleep(min(60, max_time))
1277
1278   if done:
1279     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1280   return not cumul_degraded
1281
1282
1283 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1284   """Check that mirrors are not degraded.
1285
1286   The ldisk parameter, if True, will change the test from the
1287   is_degraded attribute (which represents overall non-ok status for
1288   the device(s)) to the ldisk (representing the local storage status).
1289
1290   """
1291   lu.cfg.SetDiskID(dev, node)
1292   if ldisk:
1293     idx = 6
1294   else:
1295     idx = 5
1296
1297   result = True
1298   if on_primary or dev.AssembleOnSecondary():
1299     rstats = lu.rpc.call_blockdev_find(node, dev)
1300     if not rstats:
1301       logging.warning("Node %s: disk degraded, not found or node down", node)
1302       result = False
1303     else:
1304       result = result and (not rstats[idx])
1305   if dev.children:
1306     for child in dev.children:
1307       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1308
1309   return result
1310
1311
1312 class LUDiagnoseOS(NoHooksLU):
1313   """Logical unit for OS diagnose/query.
1314
1315   """
1316   _OP_REQP = ["output_fields", "names"]
1317   REQ_BGL = False
1318
1319   def ExpandNames(self):
1320     if self.op.names:
1321       raise errors.OpPrereqError("Selective OS query not supported")
1322
1323     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1324     _CheckOutputFields(static=[],
1325                        dynamic=self.dynamic_fields,
1326                        selected=self.op.output_fields)
1327
1328     # Lock all nodes, in shared mode
1329     self.needed_locks = {}
1330     self.share_locks[locking.LEVEL_NODE] = 1
1331     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1332
1333   def CheckPrereq(self):
1334     """Check prerequisites.
1335
1336     """
1337
1338   @staticmethod
1339   def _DiagnoseByOS(node_list, rlist):
1340     """Remaps a per-node return list into an a per-os per-node dictionary
1341
1342       Args:
1343         node_list: a list with the names of all nodes
1344         rlist: a map with node names as keys and OS objects as values
1345
1346       Returns:
1347         map: a map with osnames as keys and as value another map, with
1348              nodes as
1349              keys and list of OS objects as values
1350              e.g. {"debian-etch": {"node1": [<object>,...],
1351                                    "node2": [<object>,]}
1352                   }
1353
1354     """
1355     all_os = {}
1356     for node_name, nr in rlist.iteritems():
1357       if not nr:
1358         continue
1359       for os_obj in nr:
1360         if os_obj.name not in all_os:
1361           # build a list of nodes for this os containing empty lists
1362           # for each node in node_list
1363           all_os[os_obj.name] = {}
1364           for nname in node_list:
1365             all_os[os_obj.name][nname] = []
1366         all_os[os_obj.name][node_name].append(os_obj)
1367     return all_os
1368
1369   def Exec(self, feedback_fn):
1370     """Compute the list of OSes.
1371
1372     """
1373     node_list = self.acquired_locks[locking.LEVEL_NODE]
1374     node_data = self.rpc.call_os_diagnose(node_list)
1375     if node_data == False:
1376       raise errors.OpExecError("Can't gather the list of OSes")
1377     pol = self._DiagnoseByOS(node_list, node_data)
1378     output = []
1379     for os_name, os_data in pol.iteritems():
1380       row = []
1381       for field in self.op.output_fields:
1382         if field == "name":
1383           val = os_name
1384         elif field == "valid":
1385           val = utils.all([osl and osl[0] for osl in os_data.values()])
1386         elif field == "node_status":
1387           val = {}
1388           for node_name, nos_list in os_data.iteritems():
1389             val[node_name] = [(v.status, v.path) for v in nos_list]
1390         else:
1391           raise errors.ParameterError(field)
1392         row.append(val)
1393       output.append(row)
1394
1395     return output
1396
1397
1398 class LURemoveNode(LogicalUnit):
1399   """Logical unit for removing a node.
1400
1401   """
1402   HPATH = "node-remove"
1403   HTYPE = constants.HTYPE_NODE
1404   _OP_REQP = ["node_name"]
1405
1406   def BuildHooksEnv(self):
1407     """Build hooks env.
1408
1409     This doesn't run on the target node in the pre phase as a failed
1410     node would then be impossible to remove.
1411
1412     """
1413     env = {
1414       "OP_TARGET": self.op.node_name,
1415       "NODE_NAME": self.op.node_name,
1416       }
1417     all_nodes = self.cfg.GetNodeList()
1418     all_nodes.remove(self.op.node_name)
1419     return env, all_nodes, all_nodes
1420
1421   def CheckPrereq(self):
1422     """Check prerequisites.
1423
1424     This checks:
1425      - the node exists in the configuration
1426      - it does not have primary or secondary instances
1427      - it's not the master
1428
1429     Any errors are signalled by raising errors.OpPrereqError.
1430
1431     """
1432     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1433     if node is None:
1434       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1435
1436     instance_list = self.cfg.GetInstanceList()
1437
1438     masternode = self.cfg.GetMasterNode()
1439     if node.name == masternode:
1440       raise errors.OpPrereqError("Node is the master node,"
1441                                  " you need to failover first.")
1442
1443     for instance_name in instance_list:
1444       instance = self.cfg.GetInstanceInfo(instance_name)
1445       if node.name == instance.primary_node:
1446         raise errors.OpPrereqError("Instance %s still running on the node,"
1447                                    " please remove first." % instance_name)
1448       if node.name in instance.secondary_nodes:
1449         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1450                                    " please remove first." % instance_name)
1451     self.op.node_name = node.name
1452     self.node = node
1453
1454   def Exec(self, feedback_fn):
1455     """Removes the node from the cluster.
1456
1457     """
1458     node = self.node
1459     logging.info("Stopping the node daemon and removing configs from node %s",
1460                  node.name)
1461
1462     self.context.RemoveNode(node.name)
1463
1464     self.rpc.call_node_leave_cluster(node.name)
1465
1466
1467 class LUQueryNodes(NoHooksLU):
1468   """Logical unit for querying nodes.
1469
1470   """
1471   _OP_REQP = ["output_fields", "names"]
1472   REQ_BGL = False
1473
1474   def ExpandNames(self):
1475     self.dynamic_fields = frozenset([
1476       "dtotal", "dfree",
1477       "mtotal", "mnode", "mfree",
1478       "bootid",
1479       "ctotal",
1480       ])
1481
1482     self.static_fields = frozenset([
1483       "name", "pinst_cnt", "sinst_cnt",
1484       "pinst_list", "sinst_list",
1485       "pip", "sip", "tags",
1486       "serial_no",
1487       ])
1488
1489     _CheckOutputFields(static=self.static_fields,
1490                        dynamic=self.dynamic_fields,
1491                        selected=self.op.output_fields)
1492
1493     self.needed_locks = {}
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496     if self.op.names:
1497       self.wanted = _GetWantedNodes(self, self.op.names)
1498     else:
1499       self.wanted = locking.ALL_SET
1500
1501     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1502     if self.do_locking:
1503       # if we don't request only static fields, we need to lock the nodes
1504       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1505
1506
1507   def CheckPrereq(self):
1508     """Check prerequisites.
1509
1510     """
1511     # The validation of the node list is done in the _GetWantedNodes,
1512     # if non empty, and if empty, there's no validation to do
1513     pass
1514
1515   def Exec(self, feedback_fn):
1516     """Computes the list of nodes and their attributes.
1517
1518     """
1519     all_info = self.cfg.GetAllNodesInfo()
1520     if self.do_locking:
1521       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1522     elif self.wanted != locking.ALL_SET:
1523       nodenames = self.wanted
1524       missing = set(nodenames).difference(all_info.keys())
1525       if missing:
1526         raise errors.OpExecError(
1527           "Some nodes were removed before retrieving their data: %s" % missing)
1528     else:
1529       nodenames = all_info.keys()
1530
1531     nodenames = utils.NiceSort(nodenames)
1532     nodelist = [all_info[name] for name in nodenames]
1533
1534     # begin data gathering
1535
1536     if self.dynamic_fields.intersection(self.op.output_fields):
1537       live_data = {}
1538       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1539                                           self.cfg.GetHypervisorType())
1540       for name in nodenames:
1541         nodeinfo = node_data.get(name, None)
1542         if nodeinfo:
1543           live_data[name] = {
1544             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1545             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1546             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1547             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1548             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1549             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1550             "bootid": nodeinfo['bootid'],
1551             }
1552         else:
1553           live_data[name] = {}
1554     else:
1555       live_data = dict.fromkeys(nodenames, {})
1556
1557     node_to_primary = dict([(name, set()) for name in nodenames])
1558     node_to_secondary = dict([(name, set()) for name in nodenames])
1559
1560     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1561                              "sinst_cnt", "sinst_list"))
1562     if inst_fields & frozenset(self.op.output_fields):
1563       instancelist = self.cfg.GetInstanceList()
1564
1565       for instance_name in instancelist:
1566         inst = self.cfg.GetInstanceInfo(instance_name)
1567         if inst.primary_node in node_to_primary:
1568           node_to_primary[inst.primary_node].add(inst.name)
1569         for secnode in inst.secondary_nodes:
1570           if secnode in node_to_secondary:
1571             node_to_secondary[secnode].add(inst.name)
1572
1573     # end data gathering
1574
1575     output = []
1576     for node in nodelist:
1577       node_output = []
1578       for field in self.op.output_fields:
1579         if field == "name":
1580           val = node.name
1581         elif field == "pinst_list":
1582           val = list(node_to_primary[node.name])
1583         elif field == "sinst_list":
1584           val = list(node_to_secondary[node.name])
1585         elif field == "pinst_cnt":
1586           val = len(node_to_primary[node.name])
1587         elif field == "sinst_cnt":
1588           val = len(node_to_secondary[node.name])
1589         elif field == "pip":
1590           val = node.primary_ip
1591         elif field == "sip":
1592           val = node.secondary_ip
1593         elif field == "tags":
1594           val = list(node.GetTags())
1595         elif field == "serial_no":
1596           val = node.serial_no
1597         elif field in self.dynamic_fields:
1598           val = live_data[node.name].get(field, None)
1599         else:
1600           raise errors.ParameterError(field)
1601         node_output.append(val)
1602       output.append(node_output)
1603
1604     return output
1605
1606
1607 class LUQueryNodeVolumes(NoHooksLU):
1608   """Logical unit for getting volumes on node(s).
1609
1610   """
1611   _OP_REQP = ["nodes", "output_fields"]
1612   REQ_BGL = False
1613
1614   def ExpandNames(self):
1615     _CheckOutputFields(static=["node"],
1616                        dynamic=["phys", "vg", "name", "size", "instance"],
1617                        selected=self.op.output_fields)
1618
1619     self.needed_locks = {}
1620     self.share_locks[locking.LEVEL_NODE] = 1
1621     if not self.op.nodes:
1622       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1623     else:
1624       self.needed_locks[locking.LEVEL_NODE] = \
1625         _GetWantedNodes(self, self.op.nodes)
1626
1627   def CheckPrereq(self):
1628     """Check prerequisites.
1629
1630     This checks that the fields required are valid output fields.
1631
1632     """
1633     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1634
1635   def Exec(self, feedback_fn):
1636     """Computes the list of nodes and their attributes.
1637
1638     """
1639     nodenames = self.nodes
1640     volumes = self.rpc.call_node_volumes(nodenames)
1641
1642     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1643              in self.cfg.GetInstanceList()]
1644
1645     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1646
1647     output = []
1648     for node in nodenames:
1649       if node not in volumes or not volumes[node]:
1650         continue
1651
1652       node_vols = volumes[node][:]
1653       node_vols.sort(key=lambda vol: vol['dev'])
1654
1655       for vol in node_vols:
1656         node_output = []
1657         for field in self.op.output_fields:
1658           if field == "node":
1659             val = node
1660           elif field == "phys":
1661             val = vol['dev']
1662           elif field == "vg":
1663             val = vol['vg']
1664           elif field == "name":
1665             val = vol['name']
1666           elif field == "size":
1667             val = int(float(vol['size']))
1668           elif field == "instance":
1669             for inst in ilist:
1670               if node not in lv_by_node[inst]:
1671                 continue
1672               if vol['name'] in lv_by_node[inst][node]:
1673                 val = inst.name
1674                 break
1675             else:
1676               val = '-'
1677           else:
1678             raise errors.ParameterError(field)
1679           node_output.append(str(val))
1680
1681         output.append(node_output)
1682
1683     return output
1684
1685
1686 class LUAddNode(LogicalUnit):
1687   """Logical unit for adding node to the cluster.
1688
1689   """
1690   HPATH = "node-add"
1691   HTYPE = constants.HTYPE_NODE
1692   _OP_REQP = ["node_name"]
1693
1694   def BuildHooksEnv(self):
1695     """Build hooks env.
1696
1697     This will run on all nodes before, and on all nodes + the new node after.
1698
1699     """
1700     env = {
1701       "OP_TARGET": self.op.node_name,
1702       "NODE_NAME": self.op.node_name,
1703       "NODE_PIP": self.op.primary_ip,
1704       "NODE_SIP": self.op.secondary_ip,
1705       }
1706     nodes_0 = self.cfg.GetNodeList()
1707     nodes_1 = nodes_0 + [self.op.node_name, ]
1708     return env, nodes_0, nodes_1
1709
1710   def CheckPrereq(self):
1711     """Check prerequisites.
1712
1713     This checks:
1714      - the new node is not already in the config
1715      - it is resolvable
1716      - its parameters (single/dual homed) matches the cluster
1717
1718     Any errors are signalled by raising errors.OpPrereqError.
1719
1720     """
1721     node_name = self.op.node_name
1722     cfg = self.cfg
1723
1724     dns_data = utils.HostInfo(node_name)
1725
1726     node = dns_data.name
1727     primary_ip = self.op.primary_ip = dns_data.ip
1728     secondary_ip = getattr(self.op, "secondary_ip", None)
1729     if secondary_ip is None:
1730       secondary_ip = primary_ip
1731     if not utils.IsValidIP(secondary_ip):
1732       raise errors.OpPrereqError("Invalid secondary IP given")
1733     self.op.secondary_ip = secondary_ip
1734
1735     node_list = cfg.GetNodeList()
1736     if not self.op.readd and node in node_list:
1737       raise errors.OpPrereqError("Node %s is already in the configuration" %
1738                                  node)
1739     elif self.op.readd and node not in node_list:
1740       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1741
1742     for existing_node_name in node_list:
1743       existing_node = cfg.GetNodeInfo(existing_node_name)
1744
1745       if self.op.readd and node == existing_node_name:
1746         if (existing_node.primary_ip != primary_ip or
1747             existing_node.secondary_ip != secondary_ip):
1748           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1749                                      " address configuration as before")
1750         continue
1751
1752       if (existing_node.primary_ip == primary_ip or
1753           existing_node.secondary_ip == primary_ip or
1754           existing_node.primary_ip == secondary_ip or
1755           existing_node.secondary_ip == secondary_ip):
1756         raise errors.OpPrereqError("New node ip address(es) conflict with"
1757                                    " existing node %s" % existing_node.name)
1758
1759     # check that the type of the node (single versus dual homed) is the
1760     # same as for the master
1761     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1762     master_singlehomed = myself.secondary_ip == myself.primary_ip
1763     newbie_singlehomed = secondary_ip == primary_ip
1764     if master_singlehomed != newbie_singlehomed:
1765       if master_singlehomed:
1766         raise errors.OpPrereqError("The master has no private ip but the"
1767                                    " new node has one")
1768       else:
1769         raise errors.OpPrereqError("The master has a private ip but the"
1770                                    " new node doesn't have one")
1771
1772     # checks reachablity
1773     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1774       raise errors.OpPrereqError("Node not reachable by ping")
1775
1776     if not newbie_singlehomed:
1777       # check reachability from my secondary ip to newbie's secondary ip
1778       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1779                            source=myself.secondary_ip):
1780         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1781                                    " based ping to noded port")
1782
1783     self.new_node = objects.Node(name=node,
1784                                  primary_ip=primary_ip,
1785                                  secondary_ip=secondary_ip)
1786
1787   def Exec(self, feedback_fn):
1788     """Adds the new node to the cluster.
1789
1790     """
1791     new_node = self.new_node
1792     node = new_node.name
1793
1794     # check connectivity
1795     result = self.rpc.call_version([node])[node]
1796     if result:
1797       if constants.PROTOCOL_VERSION == result:
1798         logging.info("Communication to node %s fine, sw version %s match",
1799                      node, result)
1800       else:
1801         raise errors.OpExecError("Version mismatch master version %s,"
1802                                  " node version %s" %
1803                                  (constants.PROTOCOL_VERSION, result))
1804     else:
1805       raise errors.OpExecError("Cannot get version from the new node")
1806
1807     # setup ssh on node
1808     logging.info("Copy ssh key to node %s", node)
1809     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1810     keyarray = []
1811     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1812                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1813                 priv_key, pub_key]
1814
1815     for i in keyfiles:
1816       f = open(i, 'r')
1817       try:
1818         keyarray.append(f.read())
1819       finally:
1820         f.close()
1821
1822     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1823                                     keyarray[2],
1824                                     keyarray[3], keyarray[4], keyarray[5])
1825
1826     if not result:
1827       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1828
1829     # Add node to our /etc/hosts, and add key to known_hosts
1830     utils.AddHostToEtcHosts(new_node.name)
1831
1832     if new_node.secondary_ip != new_node.primary_ip:
1833       if not self.rpc.call_node_has_ip_address(new_node.name,
1834                                                new_node.secondary_ip):
1835         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1836                                  " you gave (%s). Please fix and re-run this"
1837                                  " command." % new_node.secondary_ip)
1838
1839     node_verify_list = [self.cfg.GetMasterNode()]
1840     node_verify_param = {
1841       'nodelist': [node],
1842       # TODO: do a node-net-test as well?
1843     }
1844
1845     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1846                                        self.cfg.GetClusterName())
1847     for verifier in node_verify_list:
1848       if not result[verifier]:
1849         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1850                                  " for remote verification" % verifier)
1851       if result[verifier]['nodelist']:
1852         for failed in result[verifier]['nodelist']:
1853           feedback_fn("ssh/hostname verification failed %s -> %s" %
1854                       (verifier, result[verifier]['nodelist'][failed]))
1855         raise errors.OpExecError("ssh/hostname verification failed.")
1856
1857     # Distribute updated /etc/hosts and known_hosts to all nodes,
1858     # including the node just added
1859     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1860     dist_nodes = self.cfg.GetNodeList()
1861     if not self.op.readd:
1862       dist_nodes.append(node)
1863     if myself.name in dist_nodes:
1864       dist_nodes.remove(myself.name)
1865
1866     logging.debug("Copying hosts and known_hosts to all nodes")
1867     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1868       result = self.rpc.call_upload_file(dist_nodes, fname)
1869       for to_node in dist_nodes:
1870         if not result[to_node]:
1871           logging.error("Copy of file %s to node %s failed", fname, to_node)
1872
1873     to_copy = []
1874     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1875       to_copy.append(constants.VNC_PASSWORD_FILE)
1876     for fname in to_copy:
1877       result = self.rpc.call_upload_file([node], fname)
1878       if not result[node]:
1879         logging.error("Could not copy file %s to node %s", fname, node)
1880
1881     if self.op.readd:
1882       self.context.ReaddNode(new_node)
1883     else:
1884       self.context.AddNode(new_node)
1885
1886
1887 class LUQueryClusterInfo(NoHooksLU):
1888   """Query cluster configuration.
1889
1890   """
1891   _OP_REQP = []
1892   REQ_MASTER = False
1893   REQ_BGL = False
1894
1895   def ExpandNames(self):
1896     self.needed_locks = {}
1897
1898   def CheckPrereq(self):
1899     """No prerequsites needed for this LU.
1900
1901     """
1902     pass
1903
1904   def Exec(self, feedback_fn):
1905     """Return cluster config.
1906
1907     """
1908     cluster = self.cfg.GetClusterInfo()
1909     result = {
1910       "software_version": constants.RELEASE_VERSION,
1911       "protocol_version": constants.PROTOCOL_VERSION,
1912       "config_version": constants.CONFIG_VERSION,
1913       "os_api_version": constants.OS_API_VERSION,
1914       "export_version": constants.EXPORT_VERSION,
1915       "architecture": (platform.architecture()[0], platform.machine()),
1916       "name": cluster.cluster_name,
1917       "master": cluster.master_node,
1918       "hypervisor_type": cluster.hypervisor,
1919       "enabled_hypervisors": cluster.enabled_hypervisors,
1920       "hvparams": cluster.hvparams,
1921       "beparams": cluster.beparams,
1922       }
1923
1924     return result
1925
1926
1927 class LUQueryConfigValues(NoHooksLU):
1928   """Return configuration values.
1929
1930   """
1931   _OP_REQP = []
1932   REQ_BGL = False
1933
1934   def ExpandNames(self):
1935     self.needed_locks = {}
1936
1937     static_fields = ["cluster_name", "master_node", "drain_flag"]
1938     _CheckOutputFields(static=static_fields,
1939                        dynamic=[],
1940                        selected=self.op.output_fields)
1941
1942   def CheckPrereq(self):
1943     """No prerequisites.
1944
1945     """
1946     pass
1947
1948   def Exec(self, feedback_fn):
1949     """Dump a representation of the cluster config to the standard output.
1950
1951     """
1952     values = []
1953     for field in self.op.output_fields:
1954       if field == "cluster_name":
1955         entry = self.cfg.GetClusterName()
1956       elif field == "master_node":
1957         entry = self.cfg.GetMasterNode()
1958       elif field == "drain_flag":
1959         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1960       else:
1961         raise errors.ParameterError(field)
1962       values.append(entry)
1963     return values
1964
1965
1966 class LUActivateInstanceDisks(NoHooksLU):
1967   """Bring up an instance's disks.
1968
1969   """
1970   _OP_REQP = ["instance_name"]
1971   REQ_BGL = False
1972
1973   def ExpandNames(self):
1974     self._ExpandAndLockInstance()
1975     self.needed_locks[locking.LEVEL_NODE] = []
1976     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1977
1978   def DeclareLocks(self, level):
1979     if level == locking.LEVEL_NODE:
1980       self._LockInstancesNodes()
1981
1982   def CheckPrereq(self):
1983     """Check prerequisites.
1984
1985     This checks that the instance is in the cluster.
1986
1987     """
1988     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1989     assert self.instance is not None, \
1990       "Cannot retrieve locked instance %s" % self.op.instance_name
1991
1992   def Exec(self, feedback_fn):
1993     """Activate the disks.
1994
1995     """
1996     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1997     if not disks_ok:
1998       raise errors.OpExecError("Cannot activate block devices")
1999
2000     return disks_info
2001
2002
2003 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2004   """Prepare the block devices for an instance.
2005
2006   This sets up the block devices on all nodes.
2007
2008   Args:
2009     instance: a ganeti.objects.Instance object
2010     ignore_secondaries: if true, errors on secondary nodes won't result
2011                         in an error return from the function
2012
2013   Returns:
2014     false if the operation failed
2015     list of (host, instance_visible_name, node_visible_name) if the operation
2016          suceeded with the mapping from node devices to instance devices
2017   """
2018   device_info = []
2019   disks_ok = True
2020   iname = instance.name
2021   # With the two passes mechanism we try to reduce the window of
2022   # opportunity for the race condition of switching DRBD to primary
2023   # before handshaking occured, but we do not eliminate it
2024
2025   # The proper fix would be to wait (with some limits) until the
2026   # connection has been made and drbd transitions from WFConnection
2027   # into any other network-connected state (Connected, SyncTarget,
2028   # SyncSource, etc.)
2029
2030   # 1st pass, assemble on all nodes in secondary mode
2031   for inst_disk in instance.disks:
2032     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2033       lu.cfg.SetDiskID(node_disk, node)
2034       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2035       if not result:
2036         logging.error("Could not prepare block device %s on node %s"
2037                       " (is_primary=False, pass=1)", inst_disk.iv_name, node)
2038         if not ignore_secondaries:
2039           disks_ok = False
2040
2041   # FIXME: race condition on drbd migration to primary
2042
2043   # 2nd pass, do only the primary node
2044   for inst_disk in instance.disks:
2045     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2046       if node != instance.primary_node:
2047         continue
2048       lu.cfg.SetDiskID(node_disk, node)
2049       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2050       if not result:
2051         logging.error("Could not prepare block device %s on node %s"
2052                       " (is_primary=True, pass=2)", inst_disk.iv_name, node)
2053         disks_ok = False
2054     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2055
2056   # leave the disks configured for the primary node
2057   # this is a workaround that would be fixed better by
2058   # improving the logical/physical id handling
2059   for disk in instance.disks:
2060     lu.cfg.SetDiskID(disk, instance.primary_node)
2061
2062   return disks_ok, device_info
2063
2064
2065 def _StartInstanceDisks(lu, instance, force):
2066   """Start the disks of an instance.
2067
2068   """
2069   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2070                                            ignore_secondaries=force)
2071   if not disks_ok:
2072     _ShutdownInstanceDisks(lu, instance)
2073     if force is not None and not force:
2074       logging.error("If the message above refers to a secondary node,"
2075                     " you can retry the operation using '--force'.")
2076     raise errors.OpExecError("Disk consistency error")
2077
2078
2079 class LUDeactivateInstanceDisks(NoHooksLU):
2080   """Shutdown an instance's disks.
2081
2082   """
2083   _OP_REQP = ["instance_name"]
2084   REQ_BGL = False
2085
2086   def ExpandNames(self):
2087     self._ExpandAndLockInstance()
2088     self.needed_locks[locking.LEVEL_NODE] = []
2089     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2090
2091   def DeclareLocks(self, level):
2092     if level == locking.LEVEL_NODE:
2093       self._LockInstancesNodes()
2094
2095   def CheckPrereq(self):
2096     """Check prerequisites.
2097
2098     This checks that the instance is in the cluster.
2099
2100     """
2101     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2102     assert self.instance is not None, \
2103       "Cannot retrieve locked instance %s" % self.op.instance_name
2104
2105   def Exec(self, feedback_fn):
2106     """Deactivate the disks
2107
2108     """
2109     instance = self.instance
2110     _SafeShutdownInstanceDisks(self, instance)
2111
2112
2113 def _SafeShutdownInstanceDisks(lu, instance):
2114   """Shutdown block devices of an instance.
2115
2116   This function checks if an instance is running, before calling
2117   _ShutdownInstanceDisks.
2118
2119   """
2120   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2121                                       [instance.hypervisor])
2122   ins_l = ins_l[instance.primary_node]
2123   if not type(ins_l) is list:
2124     raise errors.OpExecError("Can't contact node '%s'" %
2125                              instance.primary_node)
2126
2127   if instance.name in ins_l:
2128     raise errors.OpExecError("Instance is running, can't shutdown"
2129                              " block devices.")
2130
2131   _ShutdownInstanceDisks(lu, instance)
2132
2133
2134 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2135   """Shutdown block devices of an instance.
2136
2137   This does the shutdown on all nodes of the instance.
2138
2139   If the ignore_primary is false, errors on the primary node are
2140   ignored.
2141
2142   """
2143   result = True
2144   for disk in instance.disks:
2145     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2146       lu.cfg.SetDiskID(top_disk, node)
2147       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2148         logging.error("Could not shutdown block device %s on node %s",
2149                       disk.iv_name, node)
2150         if not ignore_primary or node != instance.primary_node:
2151           result = False
2152   return result
2153
2154
2155 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2156   """Checks if a node has enough free memory.
2157
2158   This function check if a given node has the needed amount of free
2159   memory. In case the node has less memory or we cannot get the
2160   information from the node, this function raise an OpPrereqError
2161   exception.
2162
2163   @type lu: C{LogicalUnit}
2164   @param lu: a logical unit from which we get configuration data
2165   @type node: C{str}
2166   @param node: the node to check
2167   @type reason: C{str}
2168   @param reason: string to use in the error message
2169   @type requested: C{int}
2170   @param requested: the amount of memory in MiB to check for
2171   @type hypervisor: C{str}
2172   @param hypervisor: the hypervisor to ask for memory stats
2173   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2174       we cannot check the node
2175
2176   """
2177   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2178   if not nodeinfo or not isinstance(nodeinfo, dict):
2179     raise errors.OpPrereqError("Could not contact node %s for resource"
2180                              " information" % (node,))
2181
2182   free_mem = nodeinfo[node].get('memory_free')
2183   if not isinstance(free_mem, int):
2184     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2185                              " was '%s'" % (node, free_mem))
2186   if requested > free_mem:
2187     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2188                              " needed %s MiB, available %s MiB" %
2189                              (node, reason, requested, free_mem))
2190
2191
2192 class LUStartupInstance(LogicalUnit):
2193   """Starts an instance.
2194
2195   """
2196   HPATH = "instance-start"
2197   HTYPE = constants.HTYPE_INSTANCE
2198   _OP_REQP = ["instance_name", "force"]
2199   REQ_BGL = False
2200
2201   def ExpandNames(self):
2202     self._ExpandAndLockInstance()
2203     self.needed_locks[locking.LEVEL_NODE] = []
2204     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2205
2206   def DeclareLocks(self, level):
2207     if level == locking.LEVEL_NODE:
2208       self._LockInstancesNodes()
2209
2210   def BuildHooksEnv(self):
2211     """Build hooks env.
2212
2213     This runs on master, primary and secondary nodes of the instance.
2214
2215     """
2216     env = {
2217       "FORCE": self.op.force,
2218       }
2219     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2220     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2221           list(self.instance.secondary_nodes))
2222     return env, nl, nl
2223
2224   def CheckPrereq(self):
2225     """Check prerequisites.
2226
2227     This checks that the instance is in the cluster.
2228
2229     """
2230     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2231     assert self.instance is not None, \
2232       "Cannot retrieve locked instance %s" % self.op.instance_name
2233
2234     bep = self.cfg.GetClusterInfo().FillBE(instance)
2235     # check bridges existance
2236     _CheckInstanceBridgesExist(self, instance)
2237
2238     _CheckNodeFreeMemory(self, instance.primary_node,
2239                          "starting instance %s" % instance.name,
2240                          bep[constants.BE_MEMORY], instance.hypervisor)
2241
2242   def Exec(self, feedback_fn):
2243     """Start the instance.
2244
2245     """
2246     instance = self.instance
2247     force = self.op.force
2248     extra_args = getattr(self.op, "extra_args", "")
2249
2250     self.cfg.MarkInstanceUp(instance.name)
2251
2252     node_current = instance.primary_node
2253
2254     _StartInstanceDisks(self, instance, force)
2255
2256     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2257       _ShutdownInstanceDisks(self, instance)
2258       raise errors.OpExecError("Could not start instance")
2259
2260
2261 class LURebootInstance(LogicalUnit):
2262   """Reboot an instance.
2263
2264   """
2265   HPATH = "instance-reboot"
2266   HTYPE = constants.HTYPE_INSTANCE
2267   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2268   REQ_BGL = False
2269
2270   def ExpandNames(self):
2271     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2272                                    constants.INSTANCE_REBOOT_HARD,
2273                                    constants.INSTANCE_REBOOT_FULL]:
2274       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2275                                   (constants.INSTANCE_REBOOT_SOFT,
2276                                    constants.INSTANCE_REBOOT_HARD,
2277                                    constants.INSTANCE_REBOOT_FULL))
2278     self._ExpandAndLockInstance()
2279     self.needed_locks[locking.LEVEL_NODE] = []
2280     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2281
2282   def DeclareLocks(self, level):
2283     if level == locking.LEVEL_NODE:
2284       primary_only = not constants.INSTANCE_REBOOT_FULL
2285       self._LockInstancesNodes(primary_only=primary_only)
2286
2287   def BuildHooksEnv(self):
2288     """Build hooks env.
2289
2290     This runs on master, primary and secondary nodes of the instance.
2291
2292     """
2293     env = {
2294       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2295       }
2296     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2297     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2298           list(self.instance.secondary_nodes))
2299     return env, nl, nl
2300
2301   def CheckPrereq(self):
2302     """Check prerequisites.
2303
2304     This checks that the instance is in the cluster.
2305
2306     """
2307     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2308     assert self.instance is not None, \
2309       "Cannot retrieve locked instance %s" % self.op.instance_name
2310
2311     # check bridges existance
2312     _CheckInstanceBridgesExist(self, instance)
2313
2314   def Exec(self, feedback_fn):
2315     """Reboot the instance.
2316
2317     """
2318     instance = self.instance
2319     ignore_secondaries = self.op.ignore_secondaries
2320     reboot_type = self.op.reboot_type
2321     extra_args = getattr(self.op, "extra_args", "")
2322
2323     node_current = instance.primary_node
2324
2325     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2326                        constants.INSTANCE_REBOOT_HARD]:
2327       if not self.rpc.call_instance_reboot(node_current, instance,
2328                                            reboot_type, extra_args):
2329         raise errors.OpExecError("Could not reboot instance")
2330     else:
2331       if not self.rpc.call_instance_shutdown(node_current, instance):
2332         raise errors.OpExecError("could not shutdown instance for full reboot")
2333       _ShutdownInstanceDisks(self, instance)
2334       _StartInstanceDisks(self, instance, ignore_secondaries)
2335       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2336         _ShutdownInstanceDisks(self, instance)
2337         raise errors.OpExecError("Could not start instance for full reboot")
2338
2339     self.cfg.MarkInstanceUp(instance.name)
2340
2341
2342 class LUShutdownInstance(LogicalUnit):
2343   """Shutdown an instance.
2344
2345   """
2346   HPATH = "instance-stop"
2347   HTYPE = constants.HTYPE_INSTANCE
2348   _OP_REQP = ["instance_name"]
2349   REQ_BGL = False
2350
2351   def ExpandNames(self):
2352     self._ExpandAndLockInstance()
2353     self.needed_locks[locking.LEVEL_NODE] = []
2354     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2355
2356   def DeclareLocks(self, level):
2357     if level == locking.LEVEL_NODE:
2358       self._LockInstancesNodes()
2359
2360   def BuildHooksEnv(self):
2361     """Build hooks env.
2362
2363     This runs on master, primary and secondary nodes of the instance.
2364
2365     """
2366     env = _BuildInstanceHookEnvByObject(self, self.instance)
2367     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2368           list(self.instance.secondary_nodes))
2369     return env, nl, nl
2370
2371   def CheckPrereq(self):
2372     """Check prerequisites.
2373
2374     This checks that the instance is in the cluster.
2375
2376     """
2377     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2378     assert self.instance is not None, \
2379       "Cannot retrieve locked instance %s" % self.op.instance_name
2380
2381   def Exec(self, feedback_fn):
2382     """Shutdown the instance.
2383
2384     """
2385     instance = self.instance
2386     node_current = instance.primary_node
2387     self.cfg.MarkInstanceDown(instance.name)
2388     if not self.rpc.call_instance_shutdown(node_current, instance):
2389       logging.error("Could not shutdown instance")
2390
2391     _ShutdownInstanceDisks(self, instance)
2392
2393
2394 class LUReinstallInstance(LogicalUnit):
2395   """Reinstall an instance.
2396
2397   """
2398   HPATH = "instance-reinstall"
2399   HTYPE = constants.HTYPE_INSTANCE
2400   _OP_REQP = ["instance_name"]
2401   REQ_BGL = False
2402
2403   def ExpandNames(self):
2404     self._ExpandAndLockInstance()
2405     self.needed_locks[locking.LEVEL_NODE] = []
2406     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2407
2408   def DeclareLocks(self, level):
2409     if level == locking.LEVEL_NODE:
2410       self._LockInstancesNodes()
2411
2412   def BuildHooksEnv(self):
2413     """Build hooks env.
2414
2415     This runs on master, primary and secondary nodes of the instance.
2416
2417     """
2418     env = _BuildInstanceHookEnvByObject(self, self.instance)
2419     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420           list(self.instance.secondary_nodes))
2421     return env, nl, nl
2422
2423   def CheckPrereq(self):
2424     """Check prerequisites.
2425
2426     This checks that the instance is in the cluster and is not running.
2427
2428     """
2429     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430     assert instance is not None, \
2431       "Cannot retrieve locked instance %s" % self.op.instance_name
2432
2433     if instance.disk_template == constants.DT_DISKLESS:
2434       raise errors.OpPrereqError("Instance '%s' has no disks" %
2435                                  self.op.instance_name)
2436     if instance.status != "down":
2437       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2438                                  self.op.instance_name)
2439     remote_info = self.rpc.call_instance_info(instance.primary_node,
2440                                               instance.name,
2441                                               instance.hypervisor)
2442     if remote_info:
2443       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2444                                  (self.op.instance_name,
2445                                   instance.primary_node))
2446
2447     self.op.os_type = getattr(self.op, "os_type", None)
2448     if self.op.os_type is not None:
2449       # OS verification
2450       pnode = self.cfg.GetNodeInfo(
2451         self.cfg.ExpandNodeName(instance.primary_node))
2452       if pnode is None:
2453         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2454                                    self.op.pnode)
2455       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2456       if not os_obj:
2457         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2458                                    " primary node"  % self.op.os_type)
2459
2460     self.instance = instance
2461
2462   def Exec(self, feedback_fn):
2463     """Reinstall the instance.
2464
2465     """
2466     inst = self.instance
2467
2468     if self.op.os_type is not None:
2469       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2470       inst.os = self.op.os_type
2471       self.cfg.Update(inst)
2472
2473     _StartInstanceDisks(self, inst, None)
2474     try:
2475       feedback_fn("Running the instance OS create scripts...")
2476       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2477                                            "sda", "sdb"):
2478         raise errors.OpExecError("Could not install OS for instance %s"
2479                                  " on node %s" %
2480                                  (inst.name, inst.primary_node))
2481     finally:
2482       _ShutdownInstanceDisks(self, inst)
2483
2484
2485 class LURenameInstance(LogicalUnit):
2486   """Rename an instance.
2487
2488   """
2489   HPATH = "instance-rename"
2490   HTYPE = constants.HTYPE_INSTANCE
2491   _OP_REQP = ["instance_name", "new_name"]
2492
2493   def BuildHooksEnv(self):
2494     """Build hooks env.
2495
2496     This runs on master, primary and secondary nodes of the instance.
2497
2498     """
2499     env = _BuildInstanceHookEnvByObject(self, self.instance)
2500     env["INSTANCE_NEW_NAME"] = self.op.new_name
2501     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502           list(self.instance.secondary_nodes))
2503     return env, nl, nl
2504
2505   def CheckPrereq(self):
2506     """Check prerequisites.
2507
2508     This checks that the instance is in the cluster and is not running.
2509
2510     """
2511     instance = self.cfg.GetInstanceInfo(
2512       self.cfg.ExpandInstanceName(self.op.instance_name))
2513     if instance is None:
2514       raise errors.OpPrereqError("Instance '%s' not known" %
2515                                  self.op.instance_name)
2516     if instance.status != "down":
2517       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2518                                  self.op.instance_name)
2519     remote_info = self.rpc.call_instance_info(instance.primary_node,
2520                                               instance.name,
2521                                               instance.hypervisor)
2522     if remote_info:
2523       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2524                                  (self.op.instance_name,
2525                                   instance.primary_node))
2526     self.instance = instance
2527
2528     # new name verification
2529     name_info = utils.HostInfo(self.op.new_name)
2530
2531     self.op.new_name = new_name = name_info.name
2532     instance_list = self.cfg.GetInstanceList()
2533     if new_name in instance_list:
2534       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2535                                  new_name)
2536
2537     if not getattr(self.op, "ignore_ip", False):
2538       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2539         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2540                                    (name_info.ip, new_name))
2541
2542
2543   def Exec(self, feedback_fn):
2544     """Reinstall the instance.
2545
2546     """
2547     inst = self.instance
2548     old_name = inst.name
2549
2550     if inst.disk_template == constants.DT_FILE:
2551       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2552
2553     self.cfg.RenameInstance(inst.name, self.op.new_name)
2554     # Change the instance lock. This is definitely safe while we hold the BGL
2555     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2556     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2557
2558     # re-read the instance from the configuration after rename
2559     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2560
2561     if inst.disk_template == constants.DT_FILE:
2562       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2563       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2564                                                      old_file_storage_dir,
2565                                                      new_file_storage_dir)
2566
2567       if not result:
2568         raise errors.OpExecError("Could not connect to node '%s' to rename"
2569                                  " directory '%s' to '%s' (but the instance"
2570                                  " has been renamed in Ganeti)" % (
2571                                  inst.primary_node, old_file_storage_dir,
2572                                  new_file_storage_dir))
2573
2574       if not result[0]:
2575         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2576                                  " (but the instance has been renamed in"
2577                                  " Ganeti)" % (old_file_storage_dir,
2578                                                new_file_storage_dir))
2579
2580     _StartInstanceDisks(self, inst, None)
2581     try:
2582       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2583                                                old_name):
2584         msg = ("Could not run OS rename script for instance %s on node %s"
2585                " (but the instance has been renamed in Ganeti)" %
2586                (inst.name, inst.primary_node))
2587         logging.error(msg)
2588     finally:
2589       _ShutdownInstanceDisks(self, inst)
2590
2591
2592 class LURemoveInstance(LogicalUnit):
2593   """Remove an instance.
2594
2595   """
2596   HPATH = "instance-remove"
2597   HTYPE = constants.HTYPE_INSTANCE
2598   _OP_REQP = ["instance_name", "ignore_failures"]
2599   REQ_BGL = False
2600
2601   def ExpandNames(self):
2602     self._ExpandAndLockInstance()
2603     self.needed_locks[locking.LEVEL_NODE] = []
2604     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2605
2606   def DeclareLocks(self, level):
2607     if level == locking.LEVEL_NODE:
2608       self._LockInstancesNodes()
2609
2610   def BuildHooksEnv(self):
2611     """Build hooks env.
2612
2613     This runs on master, primary and secondary nodes of the instance.
2614
2615     """
2616     env = _BuildInstanceHookEnvByObject(self, self.instance)
2617     nl = [self.cfg.GetMasterNode()]
2618     return env, nl, nl
2619
2620   def CheckPrereq(self):
2621     """Check prerequisites.
2622
2623     This checks that the instance is in the cluster.
2624
2625     """
2626     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2627     assert self.instance is not None, \
2628       "Cannot retrieve locked instance %s" % self.op.instance_name
2629
2630   def Exec(self, feedback_fn):
2631     """Remove the instance.
2632
2633     """
2634     instance = self.instance
2635     logging.info("Shutting down instance %s on node %s",
2636                  instance.name, instance.primary_node)
2637
2638     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2639       if self.op.ignore_failures:
2640         feedback_fn("Warning: can't shutdown instance")
2641       else:
2642         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2643                                  (instance.name, instance.primary_node))
2644
2645     logging.info("Removing block devices for instance %s", instance.name)
2646
2647     if not _RemoveDisks(self, instance):
2648       if self.op.ignore_failures:
2649         feedback_fn("Warning: can't remove instance's disks")
2650       else:
2651         raise errors.OpExecError("Can't remove instance's disks")
2652
2653     logging.info("Removing instance %s out of cluster config", instance.name)
2654
2655     self.cfg.RemoveInstance(instance.name)
2656     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2657
2658
2659 class LUQueryInstances(NoHooksLU):
2660   """Logical unit for querying instances.
2661
2662   """
2663   _OP_REQP = ["output_fields", "names"]
2664   REQ_BGL = False
2665
2666   def ExpandNames(self):
2667     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2668     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2669     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2670     self.static_fields = frozenset([
2671       "name", "os", "pnode", "snodes",
2672       "admin_state", "admin_ram",
2673       "disk_template", "ip", "mac", "bridge",
2674       "sda_size", "sdb_size", "vcpus", "tags",
2675       "network_port",
2676       "serial_no", "hypervisor", "hvparams",
2677       ] + hvp + bep)
2678
2679     _CheckOutputFields(static=self.static_fields,
2680                        dynamic=self.dynamic_fields,
2681                        selected=self.op.output_fields)
2682
2683     self.needed_locks = {}
2684     self.share_locks[locking.LEVEL_INSTANCE] = 1
2685     self.share_locks[locking.LEVEL_NODE] = 1
2686
2687     if self.op.names:
2688       self.wanted = _GetWantedInstances(self, self.op.names)
2689     else:
2690       self.wanted = locking.ALL_SET
2691
2692     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2693     if self.do_locking:
2694       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2695       self.needed_locks[locking.LEVEL_NODE] = []
2696       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2697
2698   def DeclareLocks(self, level):
2699     if level == locking.LEVEL_NODE and self.do_locking:
2700       self._LockInstancesNodes()
2701
2702   def CheckPrereq(self):
2703     """Check prerequisites.
2704
2705     """
2706     pass
2707
2708   def Exec(self, feedback_fn):
2709     """Computes the list of nodes and their attributes.
2710
2711     """
2712     all_info = self.cfg.GetAllInstancesInfo()
2713     if self.do_locking:
2714       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2715     elif self.wanted != locking.ALL_SET:
2716       instance_names = self.wanted
2717       missing = set(instance_names).difference(all_info.keys())
2718       if missing:
2719         raise errors.OpExecError(
2720           "Some instances were removed before retrieving their data: %s"
2721           % missing)
2722     else:
2723       instance_names = all_info.keys()
2724
2725     instance_names = utils.NiceSort(instance_names)
2726     instance_list = [all_info[iname] for iname in instance_names]
2727
2728     # begin data gathering
2729
2730     nodes = frozenset([inst.primary_node for inst in instance_list])
2731     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2732
2733     bad_nodes = []
2734     if self.dynamic_fields.intersection(self.op.output_fields):
2735       live_data = {}
2736       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2737       for name in nodes:
2738         result = node_data[name]
2739         if result:
2740           live_data.update(result)
2741         elif result == False:
2742           bad_nodes.append(name)
2743         # else no instance is alive
2744     else:
2745       live_data = dict([(name, {}) for name in instance_names])
2746
2747     # end data gathering
2748
2749     HVPREFIX = "hv/"
2750     BEPREFIX = "be/"
2751     output = []
2752     for instance in instance_list:
2753       iout = []
2754       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2755       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2756       for field in self.op.output_fields:
2757         if field == "name":
2758           val = instance.name
2759         elif field == "os":
2760           val = instance.os
2761         elif field == "pnode":
2762           val = instance.primary_node
2763         elif field == "snodes":
2764           val = list(instance.secondary_nodes)
2765         elif field == "admin_state":
2766           val = (instance.status != "down")
2767         elif field == "oper_state":
2768           if instance.primary_node in bad_nodes:
2769             val = None
2770           else:
2771             val = bool(live_data.get(instance.name))
2772         elif field == "status":
2773           if instance.primary_node in bad_nodes:
2774             val = "ERROR_nodedown"
2775           else:
2776             running = bool(live_data.get(instance.name))
2777             if running:
2778               if instance.status != "down":
2779                 val = "running"
2780               else:
2781                 val = "ERROR_up"
2782             else:
2783               if instance.status != "down":
2784                 val = "ERROR_down"
2785               else:
2786                 val = "ADMIN_down"
2787         elif field == "oper_ram":
2788           if instance.primary_node in bad_nodes:
2789             val = None
2790           elif instance.name in live_data:
2791             val = live_data[instance.name].get("memory", "?")
2792           else:
2793             val = "-"
2794         elif field == "disk_template":
2795           val = instance.disk_template
2796         elif field == "ip":
2797           val = instance.nics[0].ip
2798         elif field == "bridge":
2799           val = instance.nics[0].bridge
2800         elif field == "mac":
2801           val = instance.nics[0].mac
2802         elif field == "sda_size" or field == "sdb_size":
2803           disk = instance.FindDisk(field[:3])
2804           if disk is None:
2805             val = None
2806           else:
2807             val = disk.size
2808         elif field == "tags":
2809           val = list(instance.GetTags())
2810         elif field == "serial_no":
2811           val = instance.serial_no
2812         elif field == "network_port":
2813           val = instance.network_port
2814         elif field == "hypervisor":
2815           val = instance.hypervisor
2816         elif field == "hvparams":
2817           val = i_hv
2818         elif (field.startswith(HVPREFIX) and
2819               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2820           val = i_hv.get(field[len(HVPREFIX):], None)
2821         elif field == "beparams":
2822           val = i_be
2823         elif (field.startswith(BEPREFIX) and
2824               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2825           val = i_be.get(field[len(BEPREFIX):], None)
2826         else:
2827           raise errors.ParameterError(field)
2828         iout.append(val)
2829       output.append(iout)
2830
2831     return output
2832
2833
2834 class LUFailoverInstance(LogicalUnit):
2835   """Failover an instance.
2836
2837   """
2838   HPATH = "instance-failover"
2839   HTYPE = constants.HTYPE_INSTANCE
2840   _OP_REQP = ["instance_name", "ignore_consistency"]
2841   REQ_BGL = False
2842
2843   def ExpandNames(self):
2844     self._ExpandAndLockInstance()
2845     self.needed_locks[locking.LEVEL_NODE] = []
2846     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2847
2848   def DeclareLocks(self, level):
2849     if level == locking.LEVEL_NODE:
2850       self._LockInstancesNodes()
2851
2852   def BuildHooksEnv(self):
2853     """Build hooks env.
2854
2855     This runs on master, primary and secondary nodes of the instance.
2856
2857     """
2858     env = {
2859       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2860       }
2861     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2863     return env, nl, nl
2864
2865   def CheckPrereq(self):
2866     """Check prerequisites.
2867
2868     This checks that the instance is in the cluster.
2869
2870     """
2871     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872     assert self.instance is not None, \
2873       "Cannot retrieve locked instance %s" % self.op.instance_name
2874
2875     bep = self.cfg.GetClusterInfo().FillBE(instance)
2876     if instance.disk_template not in constants.DTS_NET_MIRROR:
2877       raise errors.OpPrereqError("Instance's disk layout is not"
2878                                  " network mirrored, cannot failover.")
2879
2880     secondary_nodes = instance.secondary_nodes
2881     if not secondary_nodes:
2882       raise errors.ProgrammerError("no secondary node but using "
2883                                    "a mirrored disk template")
2884
2885     target_node = secondary_nodes[0]
2886     # check memory requirements on the secondary node
2887     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2888                          instance.name, bep[constants.BE_MEMORY],
2889                          instance.hypervisor)
2890
2891     # check bridge existance
2892     brlist = [nic.bridge for nic in instance.nics]
2893     if not self.rpc.call_bridges_exist(target_node, brlist):
2894       raise errors.OpPrereqError("One or more target bridges %s does not"
2895                                  " exist on destination node '%s'" %
2896                                  (brlist, target_node))
2897
2898   def Exec(self, feedback_fn):
2899     """Failover an instance.
2900
2901     The failover is done by shutting it down on its present node and
2902     starting it on the secondary.
2903
2904     """
2905     instance = self.instance
2906
2907     source_node = instance.primary_node
2908     target_node = instance.secondary_nodes[0]
2909
2910     feedback_fn("* checking disk consistency between source and target")
2911     for dev in instance.disks:
2912       # for drbd, these are drbd over lvm
2913       if not _CheckDiskConsistency(self, dev, target_node, False):
2914         if instance.status == "up" and not self.op.ignore_consistency:
2915           raise errors.OpExecError("Disk %s is degraded on target node,"
2916                                    " aborting failover." % dev.iv_name)
2917
2918     feedback_fn("* shutting down instance on source node")
2919     logging.info("Shutting down instance %s on node %s",
2920                  instance.name, source_node)
2921
2922     if not self.rpc.call_instance_shutdown(source_node, instance):
2923       if self.op.ignore_consistency:
2924         logging.error("Could not shutdown instance %s on node %s. Proceeding"
2925                       " anyway. Please make sure node %s is down",
2926                       instance.name, source_node, source_node)
2927       else:
2928         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2929                                  (instance.name, source_node))
2930
2931     feedback_fn("* deactivating the instance's disks on source node")
2932     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2933       raise errors.OpExecError("Can't shut down the instance's disks.")
2934
2935     instance.primary_node = target_node
2936     # distribute new instance config to the other nodes
2937     self.cfg.Update(instance)
2938
2939     # Only start the instance if it's marked as up
2940     if instance.status == "up":
2941       feedback_fn("* activating the instance's disks on target node")
2942       logging.info("Starting instance %s on node %s",
2943                    instance.name, target_node)
2944
2945       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2946                                                ignore_secondaries=True)
2947       if not disks_ok:
2948         _ShutdownInstanceDisks(self, instance)
2949         raise errors.OpExecError("Can't activate the instance's disks")
2950
2951       feedback_fn("* starting the instance on the target node")
2952       if not self.rpc.call_instance_start(target_node, instance, None):
2953         _ShutdownInstanceDisks(self, instance)
2954         raise errors.OpExecError("Could not start instance %s on node %s." %
2955                                  (instance.name, target_node))
2956
2957
2958 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2959   """Create a tree of block devices on the primary node.
2960
2961   This always creates all devices.
2962
2963   """
2964   if device.children:
2965     for child in device.children:
2966       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2967         return False
2968
2969   lu.cfg.SetDiskID(device, node)
2970   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2971                                        instance.name, True, info)
2972   if not new_id:
2973     return False
2974   if device.physical_id is None:
2975     device.physical_id = new_id
2976   return True
2977
2978
2979 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2980   """Create a tree of block devices on a secondary node.
2981
2982   If this device type has to be created on secondaries, create it and
2983   all its children.
2984
2985   If not, just recurse to children keeping the same 'force' value.
2986
2987   """
2988   if device.CreateOnSecondary():
2989     force = True
2990   if device.children:
2991     for child in device.children:
2992       if not _CreateBlockDevOnSecondary(lu, node, instance,
2993                                         child, force, info):
2994         return False
2995
2996   if not force:
2997     return True
2998   lu.cfg.SetDiskID(device, node)
2999   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3000                                        instance.name, False, info)
3001   if not new_id:
3002     return False
3003   if device.physical_id is None:
3004     device.physical_id = new_id
3005   return True
3006
3007
3008 def _GenerateUniqueNames(lu, exts):
3009   """Generate a suitable LV name.
3010
3011   This will generate a logical volume name for the given instance.
3012
3013   """
3014   results = []
3015   for val in exts:
3016     new_id = lu.cfg.GenerateUniqueID()
3017     results.append("%s%s" % (new_id, val))
3018   return results
3019
3020
3021 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3022                          p_minor, s_minor):
3023   """Generate a drbd8 device complete with its children.
3024
3025   """
3026   port = lu.cfg.AllocatePort()
3027   vgname = lu.cfg.GetVGName()
3028   shared_secret = lu.cfg.GenerateDRBDSecret()
3029   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3030                           logical_id=(vgname, names[0]))
3031   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3032                           logical_id=(vgname, names[1]))
3033   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3034                           logical_id=(primary, secondary, port,
3035                                       p_minor, s_minor,
3036                                       shared_secret),
3037                           children=[dev_data, dev_meta],
3038                           iv_name=iv_name)
3039   return drbd_dev
3040
3041
3042 def _GenerateDiskTemplate(lu, template_name,
3043                           instance_name, primary_node,
3044                           secondary_nodes, disk_sz, swap_sz,
3045                           file_storage_dir, file_driver):
3046   """Generate the entire disk layout for a given template type.
3047
3048   """
3049   #TODO: compute space requirements
3050
3051   vgname = lu.cfg.GetVGName()
3052   if template_name == constants.DT_DISKLESS:
3053     disks = []
3054   elif template_name == constants.DT_PLAIN:
3055     if len(secondary_nodes) != 0:
3056       raise errors.ProgrammerError("Wrong template configuration")
3057
3058     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3059     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3060                            logical_id=(vgname, names[0]),
3061                            iv_name = "sda")
3062     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3063                            logical_id=(vgname, names[1]),
3064                            iv_name = "sdb")
3065     disks = [sda_dev, sdb_dev]
3066   elif template_name == constants.DT_DRBD8:
3067     if len(secondary_nodes) != 1:
3068       raise errors.ProgrammerError("Wrong template configuration")
3069     remote_node = secondary_nodes[0]
3070     (minor_pa, minor_pb,
3071      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3072       [primary_node, primary_node, remote_node, remote_node], instance_name)
3073
3074     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3075                                       ".sdb_data", ".sdb_meta"])
3076     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3077                                         disk_sz, names[0:2], "sda",
3078                                         minor_pa, minor_sa)
3079     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3080                                         swap_sz, names[2:4], "sdb",
3081                                         minor_pb, minor_sb)
3082     disks = [drbd_sda_dev, drbd_sdb_dev]
3083   elif template_name == constants.DT_FILE:
3084     if len(secondary_nodes) != 0:
3085       raise errors.ProgrammerError("Wrong template configuration")
3086
3087     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3088                                 iv_name="sda", logical_id=(file_driver,
3089                                 "%s/sda" % file_storage_dir))
3090     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3091                                 iv_name="sdb", logical_id=(file_driver,
3092                                 "%s/sdb" % file_storage_dir))
3093     disks = [file_sda_dev, file_sdb_dev]
3094   else:
3095     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3096   return disks
3097
3098
3099 def _GetInstanceInfoText(instance):
3100   """Compute that text that should be added to the disk's metadata.
3101
3102   """
3103   return "originstname+%s" % instance.name
3104
3105
3106 def _CreateDisks(lu, instance):
3107   """Create all disks for an instance.
3108
3109   This abstracts away some work from AddInstance.
3110
3111   Args:
3112     instance: the instance object
3113
3114   Returns:
3115     True or False showing the success of the creation process
3116
3117   """
3118   info = _GetInstanceInfoText(instance)
3119
3120   if instance.disk_template == constants.DT_FILE:
3121     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3122     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3123                                                  file_storage_dir)
3124
3125     if not result:
3126       logging.error("Could not connect to node '%s'", instance.primary_node)
3127       return False
3128
3129     if not result[0]:
3130       logging.error("Failed to create directory '%s'", file_storage_dir)
3131       return False
3132
3133   for device in instance.disks:
3134     logging.info("Creating volume %s for instance %s",
3135                  device.iv_name, instance.name)
3136     #HARDCODE
3137     for secondary_node in instance.secondary_nodes:
3138       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3139                                         device, False, info):
3140         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3141                       device.iv_name, device, secondary_node)
3142         return False
3143     #HARDCODE
3144     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3145                                     instance, device, info):
3146       logging.error("Failed to create volume %s on primary!", device.iv_name)
3147       return False
3148
3149   return True
3150
3151
3152 def _RemoveDisks(lu, instance):
3153   """Remove all disks for an instance.
3154
3155   This abstracts away some work from `AddInstance()` and
3156   `RemoveInstance()`. Note that in case some of the devices couldn't
3157   be removed, the removal will continue with the other ones (compare
3158   with `_CreateDisks()`).
3159
3160   Args:
3161     instance: the instance object
3162
3163   Returns:
3164     True or False showing the success of the removal proces
3165
3166   """
3167   logging.info("Removing block devices for instance %s", instance.name)
3168
3169   result = True
3170   for device in instance.disks:
3171     for node, disk in device.ComputeNodeTree(instance.primary_node):
3172       lu.cfg.SetDiskID(disk, node)
3173       if not lu.rpc.call_blockdev_remove(node, disk):
3174         logging.error("Could not remove block device %s on node %s,"
3175                       " continuing anyway", device.iv_name, node)
3176         result = False
3177
3178   if instance.disk_template == constants.DT_FILE:
3179     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3180     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3181                                                file_storage_dir):
3182       logging.error("Could not remove directory '%s'", file_storage_dir)
3183       result = False
3184
3185   return result
3186
3187
3188 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3189   """Compute disk size requirements in the volume group
3190
3191   This is currently hard-coded for the two-drive layout.
3192
3193   """
3194   # Required free disk space as a function of disk and swap space
3195   req_size_dict = {
3196     constants.DT_DISKLESS: None,
3197     constants.DT_PLAIN: disk_size + swap_size,
3198     # 256 MB are added for drbd metadata, 128MB for each drbd device
3199     constants.DT_DRBD8: disk_size + swap_size + 256,
3200     constants.DT_FILE: None,
3201   }
3202
3203   if disk_template not in req_size_dict:
3204     raise errors.ProgrammerError("Disk template '%s' size requirement"
3205                                  " is unknown" %  disk_template)
3206
3207   return req_size_dict[disk_template]
3208
3209
3210 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3211   """Hypervisor parameter validation.
3212
3213   This function abstract the hypervisor parameter validation to be
3214   used in both instance create and instance modify.
3215
3216   @type lu: L{LogicalUnit}
3217   @param lu: the logical unit for which we check
3218   @type nodenames: list
3219   @param nodenames: the list of nodes on which we should check
3220   @type hvname: string
3221   @param hvname: the name of the hypervisor we should use
3222   @type hvparams: dict
3223   @param hvparams: the parameters which we need to check
3224   @raise errors.OpPrereqError: if the parameters are not valid
3225
3226   """
3227   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3228                                                   hvname,
3229                                                   hvparams)
3230   for node in nodenames:
3231     info = hvinfo.get(node, None)
3232     if not info or not isinstance(info, (tuple, list)):
3233       raise errors.OpPrereqError("Cannot get current information"
3234                                  " from node '%s' (%s)" % (node, info))
3235     if not info[0]:
3236       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3237                                  " %s" % info[1])
3238
3239
3240 class LUCreateInstance(LogicalUnit):
3241   """Create an instance.
3242
3243   """
3244   HPATH = "instance-add"
3245   HTYPE = constants.HTYPE_INSTANCE
3246   _OP_REQP = ["instance_name", "disk_size",
3247               "disk_template", "swap_size", "mode", "start",
3248               "wait_for_sync", "ip_check", "mac",
3249               "hvparams", "beparams"]
3250   REQ_BGL = False
3251
3252   def _ExpandNode(self, node):
3253     """Expands and checks one node name.
3254
3255     """
3256     node_full = self.cfg.ExpandNodeName(node)
3257     if node_full is None:
3258       raise errors.OpPrereqError("Unknown node %s" % node)
3259     return node_full
3260
3261   def ExpandNames(self):
3262     """ExpandNames for CreateInstance.
3263
3264     Figure out the right locks for instance creation.
3265
3266     """
3267     self.needed_locks = {}
3268
3269     # set optional parameters to none if they don't exist
3270     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3271       if not hasattr(self.op, attr):
3272         setattr(self.op, attr, None)
3273
3274     # cheap checks, mostly valid constants given
3275
3276     # verify creation mode
3277     if self.op.mode not in (constants.INSTANCE_CREATE,
3278                             constants.INSTANCE_IMPORT):
3279       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3280                                  self.op.mode)
3281
3282     # disk template and mirror node verification
3283     if self.op.disk_template not in constants.DISK_TEMPLATES:
3284       raise errors.OpPrereqError("Invalid disk template name")
3285
3286     if self.op.hypervisor is None:
3287       self.op.hypervisor = self.cfg.GetHypervisorType()
3288
3289     cluster = self.cfg.GetClusterInfo()
3290     enabled_hvs = cluster.enabled_hypervisors
3291     if self.op.hypervisor not in enabled_hvs:
3292       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3293                                  " cluster (%s)" % (self.op.hypervisor,
3294                                   ",".join(enabled_hvs)))
3295
3296     # check hypervisor parameter syntax (locally)
3297
3298     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3299                                   self.op.hvparams)
3300     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3301     hv_type.CheckParameterSyntax(filled_hvp)
3302
3303     # fill and remember the beparams dict
3304     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3305                                     self.op.beparams)
3306
3307     #### instance parameters check
3308
3309     # instance name verification
3310     hostname1 = utils.HostInfo(self.op.instance_name)
3311     self.op.instance_name = instance_name = hostname1.name
3312
3313     # this is just a preventive check, but someone might still add this
3314     # instance in the meantime, and creation will fail at lock-add time
3315     if instance_name in self.cfg.GetInstanceList():
3316       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3317                                  instance_name)
3318
3319     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3320
3321     # ip validity checks
3322     ip = getattr(self.op, "ip", None)
3323     if ip is None or ip.lower() == "none":
3324       inst_ip = None
3325     elif ip.lower() == constants.VALUE_AUTO:
3326       inst_ip = hostname1.ip
3327     else:
3328       if not utils.IsValidIP(ip):
3329         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3330                                    " like a valid IP" % ip)
3331       inst_ip = ip
3332     self.inst_ip = self.op.ip = inst_ip
3333     # used in CheckPrereq for ip ping check
3334     self.check_ip = hostname1.ip
3335
3336     # MAC address verification
3337     if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3338       if not utils.IsValidMac(self.op.mac.lower()):
3339         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3340                                    self.op.mac)
3341
3342     # file storage checks
3343     if (self.op.file_driver and
3344         not self.op.file_driver in constants.FILE_DRIVER):
3345       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3346                                  self.op.file_driver)
3347
3348     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3349       raise errors.OpPrereqError("File storage directory path not absolute")
3350
3351     ### Node/iallocator related checks
3352     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3353       raise errors.OpPrereqError("One and only one of iallocator and primary"
3354                                  " node must be given")
3355
3356     if self.op.iallocator:
3357       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3358     else:
3359       self.op.pnode = self._ExpandNode(self.op.pnode)
3360       nodelist = [self.op.pnode]
3361       if self.op.snode is not None:
3362         self.op.snode = self._ExpandNode(self.op.snode)
3363         nodelist.append(self.op.snode)
3364       self.needed_locks[locking.LEVEL_NODE] = nodelist
3365
3366     # in case of import lock the source node too
3367     if self.op.mode == constants.INSTANCE_IMPORT:
3368       src_node = getattr(self.op, "src_node", None)
3369       src_path = getattr(self.op, "src_path", None)
3370
3371       if src_node is None or src_path is None:
3372         raise errors.OpPrereqError("Importing an instance requires source"
3373                                    " node and path options")
3374
3375       if not os.path.isabs(src_path):
3376         raise errors.OpPrereqError("The source path must be absolute")
3377
3378       self.op.src_node = src_node = self._ExpandNode(src_node)
3379       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3380         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3381
3382     else: # INSTANCE_CREATE
3383       if getattr(self.op, "os_type", None) is None:
3384         raise errors.OpPrereqError("No guest OS specified")
3385
3386   def _RunAllocator(self):
3387     """Run the allocator based on input opcode.
3388
3389     """
3390     disks = [{"size": self.op.disk_size, "mode": "w"},
3391              {"size": self.op.swap_size, "mode": "w"}]
3392     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3393              "bridge": self.op.bridge}]
3394     ial = IAllocator(self,
3395                      mode=constants.IALLOCATOR_MODE_ALLOC,
3396                      name=self.op.instance_name,
3397                      disk_template=self.op.disk_template,
3398                      tags=[],
3399                      os=self.op.os_type,
3400                      vcpus=self.be_full[constants.BE_VCPUS],
3401                      mem_size=self.be_full[constants.BE_MEMORY],
3402                      disks=disks,
3403                      nics=nics,
3404                      )
3405
3406     ial.Run(self.op.iallocator)
3407
3408     if not ial.success:
3409       raise errors.OpPrereqError("Can't compute nodes using"
3410                                  " iallocator '%s': %s" % (self.op.iallocator,
3411                                                            ial.info))
3412     if len(ial.nodes) != ial.required_nodes:
3413       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3414                                  " of nodes (%s), required %s" %
3415                                  (self.op.iallocator, len(ial.nodes),
3416                                   ial.required_nodes))
3417     self.op.pnode = ial.nodes[0]
3418     feedback_fn("Selected nodes for the instance: %s" %
3419                 (", ".join(ial.nodes),))
3420     logging.info("Selected nodes for instance %s via iallocator %s: %s",
3421                  self.op.instance_name, self.op.iallocator, ial.nodes)
3422     if ial.required_nodes == 2:
3423       self.op.snode = ial.nodes[1]
3424
3425   def BuildHooksEnv(self):
3426     """Build hooks env.
3427
3428     This runs on master, primary and secondary nodes of the instance.
3429
3430     """
3431     env = {
3432       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3433       "INSTANCE_DISK_SIZE": self.op.disk_size,
3434       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3435       "INSTANCE_ADD_MODE": self.op.mode,
3436       }
3437     if self.op.mode == constants.INSTANCE_IMPORT:
3438       env["INSTANCE_SRC_NODE"] = self.op.src_node
3439       env["INSTANCE_SRC_PATH"] = self.op.src_path
3440       env["INSTANCE_SRC_IMAGE"] = self.src_image
3441
3442     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3443       primary_node=self.op.pnode,
3444       secondary_nodes=self.secondaries,
3445       status=self.instance_status,
3446       os_type=self.op.os_type,
3447       memory=self.be_full[constants.BE_MEMORY],
3448       vcpus=self.be_full[constants.BE_VCPUS],
3449       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3450     ))
3451
3452     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3453           self.secondaries)
3454     return env, nl, nl
3455
3456
3457   def CheckPrereq(self):
3458     """Check prerequisites.
3459
3460     """
3461     if (not self.cfg.GetVGName() and
3462         self.op.disk_template not in constants.DTS_NOT_LVM):
3463       raise errors.OpPrereqError("Cluster does not support lvm-based"
3464                                  " instances")
3465
3466
3467     if self.op.mode == constants.INSTANCE_IMPORT:
3468       src_node = self.op.src_node
3469       src_path = self.op.src_path
3470
3471       export_info = self.rpc.call_export_info(src_node, src_path)
3472
3473       if not export_info:
3474         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3475
3476       if not export_info.has_section(constants.INISECT_EXP):
3477         raise errors.ProgrammerError("Corrupted export config")
3478
3479       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3480       if (int(ei_version) != constants.EXPORT_VERSION):
3481         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3482                                    (ei_version, constants.EXPORT_VERSION))
3483
3484       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3485         raise errors.OpPrereqError("Can't import instance with more than"
3486                                    " one data disk")
3487
3488       # FIXME: are the old os-es, disk sizes, etc. useful?
3489       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3490       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3491                                                          'disk0_dump'))
3492       self.src_image = diskimage
3493
3494       if self.op.mac == constants.VALUE_AUTO:
3495         old_name = export_info.get(constants.INISECT_INS, 'name')
3496         if self.op.instance_name == old_name:
3497           # FIXME: adjust every nic, when we'll be able to create instances
3498           # with more than one
3499           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3500             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3501
3502     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3503
3504     if self.op.start and not self.op.ip_check:
3505       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3506                                  " adding an instance in start mode")
3507
3508     if self.op.ip_check:
3509       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3510         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3511                                    (self.check_ip, self.op.instance_name))
3512
3513     # bridge verification
3514     bridge = getattr(self.op, "bridge", None)
3515     if bridge is None:
3516       self.op.bridge = self.cfg.GetDefBridge()
3517     else:
3518       self.op.bridge = bridge
3519
3520     #### allocator run
3521
3522     if self.op.iallocator is not None:
3523       self._RunAllocator()
3524
3525     #### node related checks
3526
3527     # check primary node
3528     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3529     assert self.pnode is not None, \
3530       "Cannot retrieve locked node %s" % self.op.pnode
3531     self.secondaries = []
3532
3533     # mirror node verification
3534     if self.op.disk_template in constants.DTS_NET_MIRROR:
3535       if self.op.snode is None:
3536         raise errors.OpPrereqError("The networked disk templates need"
3537                                    " a mirror node")
3538       if self.op.snode == pnode.name:
3539         raise errors.OpPrereqError("The secondary node cannot be"
3540                                    " the primary node.")
3541       self.secondaries.append(self.op.snode)
3542
3543     nodenames = [pnode.name] + self.secondaries
3544
3545     req_size = _ComputeDiskSize(self.op.disk_template,
3546                                 self.op.disk_size, self.op.swap_size)
3547
3548     # Check lv size requirements
3549     if req_size is not None:
3550       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3551                                          self.op.hypervisor)
3552       for node in nodenames:
3553         info = nodeinfo.get(node, None)
3554         if not info:
3555           raise errors.OpPrereqError("Cannot get current information"
3556                                      " from node '%s'" % node)
3557         vg_free = info.get('vg_free', None)
3558         if not isinstance(vg_free, int):
3559           raise errors.OpPrereqError("Can't compute free disk space on"
3560                                      " node %s" % node)
3561         if req_size > info['vg_free']:
3562           raise errors.OpPrereqError("Not enough disk space on target node %s."
3563                                      " %d MB available, %d MB required" %
3564                                      (node, info['vg_free'], req_size))
3565
3566     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3567
3568     # os verification
3569     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3570     if not os_obj:
3571       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3572                                  " primary node"  % self.op.os_type)
3573
3574     # bridge check on primary node
3575     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3576       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3577                                  " destination node '%s'" %
3578                                  (self.op.bridge, pnode.name))
3579
3580     # memory check on primary node
3581     if self.op.start:
3582       _CheckNodeFreeMemory(self, self.pnode.name,
3583                            "creating instance %s" % self.op.instance_name,
3584                            self.be_full[constants.BE_MEMORY],
3585                            self.op.hypervisor)
3586
3587     if self.op.start:
3588       self.instance_status = 'up'
3589     else:
3590       self.instance_status = 'down'
3591
3592   def Exec(self, feedback_fn):
3593     """Create and add the instance to the cluster.
3594
3595     """
3596     instance = self.op.instance_name
3597     pnode_name = self.pnode.name
3598
3599     if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3600       mac_address = self.cfg.GenerateMAC()
3601     else:
3602       mac_address = self.op.mac
3603
3604     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3605     if self.inst_ip is not None:
3606       nic.ip = self.inst_ip
3607
3608     ht_kind = self.op.hypervisor
3609     if ht_kind in constants.HTS_REQ_PORT:
3610       network_port = self.cfg.AllocatePort()
3611     else:
3612       network_port = None
3613
3614     ##if self.op.vnc_bind_address is None:
3615     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3616
3617     # this is needed because os.path.join does not accept None arguments
3618     if self.op.file_storage_dir is None:
3619       string_file_storage_dir = ""
3620     else:
3621       string_file_storage_dir = self.op.file_storage_dir
3622
3623     # build the full file storage dir path
3624     file_storage_dir = os.path.normpath(os.path.join(
3625                                         self.cfg.GetFileStorageDir(),
3626                                         string_file_storage_dir, instance))
3627
3628
3629     disks = _GenerateDiskTemplate(self,
3630                                   self.op.disk_template,
3631                                   instance, pnode_name,
3632                                   self.secondaries, self.op.disk_size,
3633                                   self.op.swap_size,
3634                                   file_storage_dir,
3635                                   self.op.file_driver)
3636
3637     iobj = objects.Instance(name=instance, os=self.op.os_type,
3638                             primary_node=pnode_name,
3639                             nics=[nic], disks=disks,
3640                             disk_template=self.op.disk_template,
3641                             status=self.instance_status,
3642                             network_port=network_port,
3643                             beparams=self.op.beparams,
3644                             hvparams=self.op.hvparams,
3645                             hypervisor=self.op.hypervisor,
3646                             )
3647
3648     feedback_fn("* creating instance disks...")
3649     if not _CreateDisks(self, iobj):
3650       _RemoveDisks(self, iobj)
3651       self.cfg.ReleaseDRBDMinors(instance)
3652       raise errors.OpExecError("Device creation failed, reverting...")
3653
3654     feedback_fn("adding instance %s to cluster config" % instance)
3655
3656     self.cfg.AddInstance(iobj)
3657     # Declare that we don't want to remove the instance lock anymore, as we've
3658     # added the instance to the config
3659     del self.remove_locks[locking.LEVEL_INSTANCE]
3660     # Remove the temp. assignements for the instance's drbds
3661     self.cfg.ReleaseDRBDMinors(instance)
3662
3663     if self.op.wait_for_sync:
3664       disk_abort = not _WaitForSync(self, iobj)
3665     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3666       # make sure the disks are not degraded (still sync-ing is ok)
3667       time.sleep(15)
3668       feedback_fn("* checking mirrors status")
3669       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3670     else:
3671       disk_abort = False
3672
3673     if disk_abort:
3674       _RemoveDisks(self, iobj)
3675       self.cfg.RemoveInstance(iobj.name)
3676       # Make sure the instance lock gets removed
3677       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3678       raise errors.OpExecError("There are some degraded disks for"
3679                                " this instance")
3680
3681     feedback_fn("creating os for instance %s on node %s" %
3682                 (instance, pnode_name))
3683
3684     if iobj.disk_template != constants.DT_DISKLESS:
3685       if self.op.mode == constants.INSTANCE_CREATE:
3686         feedback_fn("* running the instance OS create scripts...")
3687         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3688           raise errors.OpExecError("could not add os for instance %s"
3689                                    " on node %s" %
3690                                    (instance, pnode_name))
3691
3692       elif self.op.mode == constants.INSTANCE_IMPORT:
3693         feedback_fn("* running the instance OS import scripts...")
3694         src_node = self.op.src_node
3695         src_image = self.src_image
3696         cluster_name = self.cfg.GetClusterName()
3697         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3698                                                 src_node, src_image,
3699                                                 cluster_name):
3700           raise errors.OpExecError("Could not import os for instance"
3701                                    " %s on node %s" %
3702                                    (instance, pnode_name))
3703       else:
3704         # also checked in the prereq part
3705         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3706                                      % self.op.mode)
3707
3708     if self.op.start:
3709       logging.info("Starting instance %s on node %s", instance, pnode_name)
3710       feedback_fn("* starting instance...")
3711       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3712         raise errors.OpExecError("Could not start instance")
3713
3714
3715 class LUConnectConsole(NoHooksLU):
3716   """Connect to an instance's console.
3717
3718   This is somewhat special in that it returns the command line that
3719   you need to run on the master node in order to connect to the
3720   console.
3721
3722   """
3723   _OP_REQP = ["instance_name"]
3724   REQ_BGL = False
3725
3726   def ExpandNames(self):
3727     self._ExpandAndLockInstance()
3728
3729   def CheckPrereq(self):
3730     """Check prerequisites.
3731
3732     This checks that the instance is in the cluster.
3733
3734     """
3735     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3736     assert self.instance is not None, \
3737       "Cannot retrieve locked instance %s" % self.op.instance_name
3738
3739   def Exec(self, feedback_fn):
3740     """Connect to the console of an instance
3741
3742     """
3743     instance = self.instance
3744     node = instance.primary_node
3745
3746     node_insts = self.rpc.call_instance_list([node],
3747                                              [instance.hypervisor])[node]
3748     if node_insts is False:
3749       raise errors.OpExecError("Can't connect to node %s." % node)
3750
3751     if instance.name not in node_insts:
3752       raise errors.OpExecError("Instance %s is not running." % instance.name)
3753
3754     logging.debug("Connecting to console of %s on %s", instance.name, node)
3755
3756     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3757     console_cmd = hyper.GetShellCommandForConsole(instance)
3758
3759     # build ssh cmdline
3760     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3761
3762
3763 class LUReplaceDisks(LogicalUnit):
3764   """Replace the disks of an instance.
3765
3766   """
3767   HPATH = "mirrors-replace"
3768   HTYPE = constants.HTYPE_INSTANCE
3769   _OP_REQP = ["instance_name", "mode", "disks"]
3770   REQ_BGL = False
3771
3772   def ExpandNames(self):
3773     self._ExpandAndLockInstance()
3774
3775     if not hasattr(self.op, "remote_node"):
3776       self.op.remote_node = None
3777
3778     ia_name = getattr(self.op, "iallocator", None)
3779     if ia_name is not None:
3780       if self.op.remote_node is not None:
3781         raise errors.OpPrereqError("Give either the iallocator or the new"
3782                                    " secondary, not both")
3783       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3784     elif self.op.remote_node is not None:
3785       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3786       if remote_node is None:
3787         raise errors.OpPrereqError("Node '%s' not known" %
3788                                    self.op.remote_node)
3789       self.op.remote_node = remote_node
3790       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3791       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3792     else:
3793       self.needed_locks[locking.LEVEL_NODE] = []
3794       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3795
3796   def DeclareLocks(self, level):
3797     # If we're not already locking all nodes in the set we have to declare the
3798     # instance's primary/secondary nodes.
3799     if (level == locking.LEVEL_NODE and
3800         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3801       self._LockInstancesNodes()
3802
3803   def _RunAllocator(self):
3804     """Compute a new secondary node using an IAllocator.
3805
3806     """
3807     ial = IAllocator(self,
3808                      mode=constants.IALLOCATOR_MODE_RELOC,
3809                      name=self.op.instance_name,
3810                      relocate_from=[self.sec_node])
3811
3812     ial.Run(self.op.iallocator)
3813
3814     if not ial.success:
3815       raise errors.OpPrereqError("Can't compute nodes using"
3816                                  " iallocator '%s': %s" % (self.op.iallocator,
3817                                                            ial.info))
3818     if len(ial.nodes) != ial.required_nodes:
3819       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3820                                  " of nodes (%s), required %s" %
3821                                  (len(ial.nodes), ial.required_nodes))
3822     self.op.remote_node = ial.nodes[0]
3823     feedback_fn("Selected new secondary for the instance: %s" %
3824                 self.op.remote_node)
3825
3826   def BuildHooksEnv(self):
3827     """Build hooks env.
3828
3829     This runs on the master, the primary and all the secondaries.
3830
3831     """
3832     env = {
3833       "MODE": self.op.mode,
3834       "NEW_SECONDARY": self.op.remote_node,
3835       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3836       }
3837     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3838     nl = [
3839       self.cfg.GetMasterNode(),
3840       self.instance.primary_node,
3841       ]
3842     if self.op.remote_node is not None:
3843       nl.append(self.op.remote_node)
3844     return env, nl, nl
3845
3846   def CheckPrereq(self):
3847     """Check prerequisites.
3848
3849     This checks that the instance is in the cluster.
3850
3851     """
3852     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3853     assert instance is not None, \
3854       "Cannot retrieve locked instance %s" % self.op.instance_name
3855     self.instance = instance
3856
3857     if instance.disk_template not in constants.DTS_NET_MIRROR:
3858       raise errors.OpPrereqError("Instance's disk layout is not"
3859                                  " network mirrored.")
3860
3861     if len(instance.secondary_nodes) != 1:
3862       raise errors.OpPrereqError("The instance has a strange layout,"
3863                                  " expected one secondary but found %d" %
3864                                  len(instance.secondary_nodes))
3865
3866     self.sec_node = instance.secondary_nodes[0]
3867
3868     ia_name = getattr(self.op, "iallocator", None)
3869     if ia_name is not None:
3870       self._RunAllocator()
3871
3872     remote_node = self.op.remote_node
3873     if remote_node is not None:
3874       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3875       assert self.remote_node_info is not None, \
3876         "Cannot retrieve locked node %s" % remote_node
3877     else:
3878       self.remote_node_info = None
3879     if remote_node == instance.primary_node:
3880       raise errors.OpPrereqError("The specified node is the primary node of"
3881                                  " the instance.")
3882     elif remote_node == self.sec_node:
3883       if self.op.mode == constants.REPLACE_DISK_SEC:
3884         # this is for DRBD8, where we can't execute the same mode of
3885         # replacement as for drbd7 (no different port allocated)
3886         raise errors.OpPrereqError("Same secondary given, cannot execute"
3887                                    " replacement")
3888     if instance.disk_template == constants.DT_DRBD8:
3889       if (self.op.mode == constants.REPLACE_DISK_ALL and
3890           remote_node is not None):
3891         # switch to replace secondary mode
3892         self.op.mode = constants.REPLACE_DISK_SEC
3893
3894       if self.op.mode == constants.REPLACE_DISK_ALL:
3895         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3896                                    " secondary disk replacement, not"
3897                                    " both at once")
3898       elif self.op.mode == constants.REPLACE_DISK_PRI:
3899         if remote_node is not None:
3900           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3901                                      " the secondary while doing a primary"
3902                                      " node disk replacement")
3903         self.tgt_node = instance.primary_node
3904         self.oth_node = instance.secondary_nodes[0]
3905       elif self.op.mode == constants.REPLACE_DISK_SEC:
3906         self.new_node = remote_node # this can be None, in which case
3907                                     # we don't change the secondary
3908         self.tgt_node = instance.secondary_nodes[0]
3909         self.oth_node = instance.primary_node
3910       else:
3911         raise errors.ProgrammerError("Unhandled disk replace mode")
3912
3913     for name in self.op.disks:
3914       if instance.FindDisk(name) is None:
3915         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3916                                    (name, instance.name))
3917
3918   def _ExecD8DiskOnly(self, feedback_fn):
3919     """Replace a disk on the primary or secondary for dbrd8.
3920
3921     The algorithm for replace is quite complicated:
3922       - for each disk to be replaced:
3923         - create new LVs on the target node with unique names
3924         - detach old LVs from the drbd device
3925         - rename old LVs to name_replaced.<time_t>
3926         - rename new LVs to old LVs
3927         - attach the new LVs (with the old names now) to the drbd device
3928       - wait for sync across all devices
3929       - for each modified disk:
3930         - remove old LVs (which have the name name_replaces.<time_t>)
3931
3932     Failures are not very well handled.
3933
3934     """
3935     steps_total = 6
3936     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3937     instance = self.instance
3938     iv_names = {}
3939     vgname = self.cfg.GetVGName()
3940     # start of work
3941     cfg = self.cfg
3942     tgt_node = self.tgt_node
3943     oth_node = self.oth_node
3944
3945     # Step: check device activation
3946     self.proc.LogStep(1, steps_total, "check device existence")
3947     info("checking volume groups")
3948     my_vg = cfg.GetVGName()
3949     results = self.rpc.call_vg_list([oth_node, tgt_node])
3950     if not results:
3951       raise errors.OpExecError("Can't list volume groups on the nodes")
3952     for node in oth_node, tgt_node:
3953       res = results.get(node, False)
3954       if not res or my_vg not in res:
3955         raise errors.OpExecError("Volume group '%s' not found on %s" %
3956                                  (my_vg, node))
3957     for dev in instance.disks:
3958       if not dev.iv_name in self.op.disks:
3959         continue
3960       for node in tgt_node, oth_node:
3961         info("checking %s on %s" % (dev.iv_name, node))
3962         cfg.SetDiskID(dev, node)
3963         if not self.rpc.call_blockdev_find(node, dev):
3964           raise errors.OpExecError("Can't find device %s on node %s" %
3965                                    (dev.iv_name, node))
3966
3967     # Step: check other node consistency
3968     self.proc.LogStep(2, steps_total, "check peer consistency")
3969     for dev in instance.disks:
3970       if not dev.iv_name in self.op.disks:
3971         continue
3972       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3973       if not _CheckDiskConsistency(self, dev, oth_node,
3974                                    oth_node==instance.primary_node):
3975         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3976                                  " to replace disks on this node (%s)" %
3977                                  (oth_node, tgt_node))
3978
3979     # Step: create new storage
3980     self.proc.LogStep(3, steps_total, "allocate new storage")
3981     for dev in instance.disks:
3982       if not dev.iv_name in self.op.disks:
3983         continue
3984       size = dev.size
3985       cfg.SetDiskID(dev, tgt_node)
3986       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3987       names = _GenerateUniqueNames(self, lv_names)
3988       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3989                              logical_id=(vgname, names[0]))
3990       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3991                              logical_id=(vgname, names[1]))
3992       new_lvs = [lv_data, lv_meta]
3993       old_lvs = dev.children
3994       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3995       info("creating new local storage on %s for %s" %
3996            (tgt_node, dev.iv_name))
3997       # since we *always* want to create this LV, we use the
3998       # _Create...OnPrimary (which forces the creation), even if we
3999       # are talking about the secondary node
4000       for new_lv in new_lvs:
4001         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4002                                         _GetInstanceInfoText(instance)):
4003           raise errors.OpExecError("Failed to create new LV named '%s' on"
4004                                    " node '%s'" %
4005                                    (new_lv.logical_id[1], tgt_node))
4006
4007     # Step: for each lv, detach+rename*2+attach
4008     self.proc.LogStep(4, steps_total, "change drbd configuration")
4009     for dev, old_lvs, new_lvs in iv_names.itervalues():
4010       info("detaching %s drbd from local storage" % dev.iv_name)
4011       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4012         raise errors.OpExecError("Can't detach drbd from local storage on node"
4013                                  " %s for device %s" % (tgt_node, dev.iv_name))
4014       #dev.children = []
4015       #cfg.Update(instance)
4016
4017       # ok, we created the new LVs, so now we know we have the needed
4018       # storage; as such, we proceed on the target node to rename
4019       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4020       # using the assumption that logical_id == physical_id (which in
4021       # turn is the unique_id on that node)
4022
4023       # FIXME(iustin): use a better name for the replaced LVs
4024       temp_suffix = int(time.time())
4025       ren_fn = lambda d, suff: (d.physical_id[0],
4026                                 d.physical_id[1] + "_replaced-%s" % suff)
4027       # build the rename list based on what LVs exist on the node
4028       rlist = []
4029       for to_ren in old_lvs:
4030         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4031         if find_res is not None: # device exists
4032           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4033
4034       info("renaming the old LVs on the target node")
4035       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4036         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4037       # now we rename the new LVs to the old LVs
4038       info("renaming the new LVs on the target node")
4039       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4040       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4041         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4042
4043       for old, new in zip(old_lvs, new_lvs):
4044         new.logical_id = old.logical_id
4045         cfg.SetDiskID(new, tgt_node)
4046
4047       for disk in old_lvs:
4048         disk.logical_id = ren_fn(disk, temp_suffix)
4049         cfg.SetDiskID(disk, tgt_node)
4050
4051       # now that the new lvs have the old name, we can add them to the device
4052       info("adding new mirror component on %s" % tgt_node)
4053       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4054         for new_lv in new_lvs:
4055           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4056             warning("Can't rollback device %s", hint="manually cleanup unused"
4057                     " logical volumes")
4058         raise errors.OpExecError("Can't add local storage to drbd")
4059
4060       dev.children = new_lvs
4061       cfg.Update(instance)
4062
4063     # Step: wait for sync
4064
4065     # this can fail as the old devices are degraded and _WaitForSync
4066     # does a combined result over all disks, so we don't check its
4067     # return value
4068     self.proc.LogStep(5, steps_total, "sync devices")
4069     _WaitForSync(self, instance, unlock=True)
4070
4071     # so check manually all the devices
4072     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4073       cfg.SetDiskID(dev, instance.primary_node)
4074       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4075       if is_degr:
4076         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4077
4078     # Step: remove old storage
4079     self.proc.LogStep(6, steps_total, "removing old storage")
4080     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4081       info("remove logical volumes for %s" % name)
4082       for lv in old_lvs:
4083         cfg.SetDiskID(lv, tgt_node)
4084         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4085           warning("Can't remove old LV", hint="manually remove unused LVs")
4086           continue
4087
4088   def _ExecD8Secondary(self, feedback_fn):
4089     """Replace the secondary node for drbd8.
4090
4091     The algorithm for replace is quite complicated:
4092       - for all disks of the instance:
4093         - create new LVs on the new node with same names
4094         - shutdown the drbd device on the old secondary
4095         - disconnect the drbd network on the primary
4096         - create the drbd device on the new secondary
4097         - network attach the drbd on the primary, using an artifice:
4098           the drbd code for Attach() will connect to the network if it
4099           finds a device which is connected to the good local disks but
4100           not network enabled
4101       - wait for sync across all devices
4102       - remove all disks from the old secondary
4103
4104     Failures are not very well handled.
4105
4106     """
4107     steps_total = 6
4108     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4109     instance = self.instance
4110     iv_names = {}
4111     vgname = self.cfg.GetVGName()
4112     # start of work
4113     cfg = self.cfg
4114     old_node = self.tgt_node
4115     new_node = self.new_node
4116     pri_node = instance.primary_node
4117
4118     # Step: check device activation
4119     self.proc.LogStep(1, steps_total, "check device existence")
4120     info("checking volume groups")
4121     my_vg = cfg.GetVGName()
4122     results = self.rpc.call_vg_list([pri_node, new_node])
4123     if not results:
4124       raise errors.OpExecError("Can't list volume groups on the nodes")
4125     for node in pri_node, new_node:
4126       res = results.get(node, False)
4127       if not res or my_vg not in res:
4128         raise errors.OpExecError("Volume group '%s' not found on %s" %
4129                                  (my_vg, node))
4130     for dev in instance.disks:
4131       if not dev.iv_name in self.op.disks:
4132         continue
4133       info("checking %s on %s" % (dev.iv_name, pri_node))
4134       cfg.SetDiskID(dev, pri_node)
4135       if not self.rpc.call_blockdev_find(pri_node, dev):
4136         raise errors.OpExecError("Can't find device %s on node %s" %
4137                                  (dev.iv_name, pri_node))
4138
4139     # Step: check other node consistency
4140     self.proc.LogStep(2, steps_total, "check peer consistency")
4141     for dev in instance.disks:
4142       if not dev.iv_name in self.op.disks:
4143         continue
4144       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4145       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4146         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4147                                  " unsafe to replace the secondary" %
4148                                  pri_node)
4149
4150     # Step: create new storage
4151     self.proc.LogStep(3, steps_total, "allocate new storage")
4152     for dev in instance.disks:
4153       size = dev.size
4154       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4155       # since we *always* want to create this LV, we use the
4156       # _Create...OnPrimary (which forces the creation), even if we
4157       # are talking about the secondary node
4158       for new_lv in dev.children:
4159         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4160                                         _GetInstanceInfoText(instance)):
4161           raise errors.OpExecError("Failed to create new LV named '%s' on"
4162                                    " node '%s'" %
4163                                    (new_lv.logical_id[1], new_node))
4164
4165
4166     # Step 4: dbrd minors and drbd setups changes
4167     # after this, we must manually remove the drbd minors on both the
4168     # error and the success paths
4169     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4170                                    instance.name)
4171     logging.debug("Allocated minors %s" % (minors,))
4172     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4173     for dev, new_minor in zip(instance.disks, minors):
4174       size = dev.size
4175       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4176       # create new devices on new_node
4177       if pri_node == dev.logical_id[0]:
4178         new_logical_id = (pri_node, new_node,
4179                           dev.logical_id[2], dev.logical_id[3], new_minor,
4180                           dev.logical_id[5])
4181       else:
4182         new_logical_id = (new_node, pri_node,
4183                           dev.logical_id[2], new_minor, dev.logical_id[4],
4184                           dev.logical_id[5])
4185       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4186       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4187                     new_logical_id)
4188       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4189                               logical_id=new_logical_id,
4190                               children=dev.children)
4191       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4192                                         new_drbd, False,
4193                                         _GetInstanceInfoText(instance)):
4194         self.cfg.ReleaseDRBDMinors(instance.name)
4195         raise errors.OpExecError("Failed to create new DRBD on"
4196                                  " node '%s'" % new_node)
4197
4198     for dev in instance.disks:
4199       # we have new devices, shutdown the drbd on the old secondary
4200       info("shutting down drbd for %s on old node" % dev.iv_name)
4201       cfg.SetDiskID(dev, old_node)
4202       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4203         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4204                 hint="Please cleanup this device manually as soon as possible")
4205
4206     info("detaching primary drbds from the network (=> standalone)")
4207     done = 0
4208     for dev in instance.disks:
4209       cfg.SetDiskID(dev, pri_node)
4210       # set the network part of the physical (unique in bdev terms) id
4211       # to None, meaning detach from network
4212       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4213       # and 'find' the device, which will 'fix' it to match the
4214       # standalone state
4215       if self.rpc.call_blockdev_find(pri_node, dev):
4216         done += 1
4217       else:
4218         warning("Failed to detach drbd %s from network, unusual case" %
4219                 dev.iv_name)
4220
4221     if not done:
4222       # no detaches succeeded (very unlikely)
4223       self.cfg.ReleaseDRBDMinors(instance.name)
4224       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4225
4226     # if we managed to detach at least one, we update all the disks of
4227     # the instance to point to the new secondary
4228     info("updating instance configuration")
4229     for dev, _, new_logical_id in iv_names.itervalues():
4230       dev.logical_id = new_logical_id
4231       cfg.SetDiskID(dev, pri_node)
4232     cfg.Update(instance)
4233     # we can remove now the temp minors as now the new values are
4234     # written to the config file (and therefore stable)
4235     self.cfg.ReleaseDRBDMinors(instance.name)
4236
4237     # and now perform the drbd attach
4238     info("attaching primary drbds to new secondary (standalone => connected)")
4239     failures = []
4240     for dev in instance.disks:
4241       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4242       # since the attach is smart, it's enough to 'find' the device,
4243       # it will automatically activate the network, if the physical_id
4244       # is correct
4245       cfg.SetDiskID(dev, pri_node)
4246       logging.debug("Disk to attach: %s", dev)
4247       if not self.rpc.call_blockdev_find(pri_node, dev):
4248         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4249                 "please do a gnt-instance info to see the status of disks")
4250
4251     # this can fail as the old devices are degraded and _WaitForSync
4252     # does a combined result over all disks, so we don't check its
4253     # return value
4254     self.proc.LogStep(5, steps_total, "sync devices")
4255     _WaitForSync(self, instance, unlock=True)
4256
4257     # so check manually all the devices
4258     for name, (dev, old_lvs, _) in iv_names.iteritems():
4259       cfg.SetDiskID(dev, pri_node)
4260       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4261       if is_degr:
4262         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4263
4264     self.proc.LogStep(6, steps_total, "removing old storage")
4265     for name, (dev, old_lvs, _) in iv_names.iteritems():
4266       info("remove logical volumes for %s" % name)
4267       for lv in old_lvs:
4268         cfg.SetDiskID(lv, old_node)
4269         if not self.rpc.call_blockdev_remove(old_node, lv):
4270           warning("Can't remove LV on old secondary",
4271                   hint="Cleanup stale volumes by hand")
4272
4273   def Exec(self, feedback_fn):
4274     """Execute disk replacement.
4275
4276     This dispatches the disk replacement to the appropriate handler.
4277
4278     """
4279     instance = self.instance
4280
4281     # Activate the instance disks if we're replacing them on a down instance
4282     if instance.status == "down":
4283       _StartInstanceDisks(self, instance, True)
4284
4285     if instance.disk_template == constants.DT_DRBD8:
4286       if self.op.remote_node is None:
4287         fn = self._ExecD8DiskOnly
4288       else:
4289         fn = self._ExecD8Secondary
4290     else:
4291       raise errors.ProgrammerError("Unhandled disk replacement case")
4292
4293     ret = fn(feedback_fn)
4294
4295     # Deactivate the instance disks if we're replacing them on a down instance
4296     if instance.status == "down":
4297       _SafeShutdownInstanceDisks(self, instance)
4298
4299     return ret
4300
4301
4302 class LUGrowDisk(LogicalUnit):
4303   """Grow a disk of an instance.
4304
4305   """
4306   HPATH = "disk-grow"
4307   HTYPE = constants.HTYPE_INSTANCE
4308   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4309   REQ_BGL = False
4310
4311   def ExpandNames(self):
4312     self._ExpandAndLockInstance()
4313     self.needed_locks[locking.LEVEL_NODE] = []
4314     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4315
4316   def DeclareLocks(self, level):
4317     if level == locking.LEVEL_NODE:
4318       self._LockInstancesNodes()
4319
4320   def BuildHooksEnv(self):
4321     """Build hooks env.
4322
4323     This runs on the master, the primary and all the secondaries.
4324
4325     """
4326     env = {
4327       "DISK": self.op.disk,
4328       "AMOUNT": self.op.amount,
4329       }
4330     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4331     nl = [
4332       self.cfg.GetMasterNode(),
4333       self.instance.primary_node,
4334       ]
4335     return env, nl, nl
4336
4337   def CheckPrereq(self):
4338     """Check prerequisites.
4339
4340     This checks that the instance is in the cluster.
4341
4342     """
4343     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4344     assert instance is not None, \
4345       "Cannot retrieve locked instance %s" % self.op.instance_name
4346
4347     self.instance = instance
4348
4349     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4350       raise errors.OpPrereqError("Instance's disk layout does not support"
4351                                  " growing.")
4352
4353     if instance.FindDisk(self.op.disk) is None:
4354       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4355                                  (self.op.disk, instance.name))
4356
4357     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4358     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4359                                        instance.hypervisor)
4360     for node in nodenames:
4361       info = nodeinfo.get(node, None)
4362       if not info:
4363         raise errors.OpPrereqError("Cannot get current information"
4364                                    " from node '%s'" % node)
4365       vg_free = info.get('vg_free', None)
4366       if not isinstance(vg_free, int):
4367         raise errors.OpPrereqError("Can't compute free disk space on"
4368                                    " node %s" % node)
4369       if self.op.amount > info['vg_free']:
4370         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4371                                    " %d MiB available, %d MiB required" %
4372                                    (node, info['vg_free'], self.op.amount))
4373
4374   def Exec(self, feedback_fn):
4375     """Execute disk grow.
4376
4377     """
4378     instance = self.instance
4379     disk = instance.FindDisk(self.op.disk)
4380     for node in (instance.secondary_nodes + (instance.primary_node,)):
4381       self.cfg.SetDiskID(disk, node)
4382       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4383       if (not result or not isinstance(result, (list, tuple)) or
4384           len(result) != 2):
4385         raise errors.OpExecError("grow request failed to node %s" % node)
4386       elif not result[0]:
4387         raise errors.OpExecError("grow request failed to node %s: %s" %
4388                                  (node, result[1]))
4389     disk.RecordGrow(self.op.amount)
4390     self.cfg.Update(instance)
4391     if self.op.wait_for_sync:
4392       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4393       if disk_abort:
4394         logging.error("Warning: disk sync-ing has not returned a good"
4395                       " status.\nPlease check the instance.")
4396
4397
4398 class LUQueryInstanceData(NoHooksLU):
4399   """Query runtime instance data.
4400
4401   """
4402   _OP_REQP = ["instances", "static"]
4403   REQ_BGL = False
4404
4405   def ExpandNames(self):
4406     self.needed_locks = {}
4407     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4408
4409     if not isinstance(self.op.instances, list):
4410       raise errors.OpPrereqError("Invalid argument type 'instances'")
4411
4412     if self.op.instances:
4413       self.wanted_names = []
4414       for name in self.op.instances:
4415         full_name = self.cfg.ExpandInstanceName(name)
4416         if full_name is None:
4417           raise errors.OpPrereqError("Instance '%s' not known" %
4418                                      self.op.instance_name)
4419         self.wanted_names.append(full_name)
4420       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4421     else:
4422       self.wanted_names = None
4423       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4424
4425     self.needed_locks[locking.LEVEL_NODE] = []
4426     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4427
4428   def DeclareLocks(self, level):
4429     if level == locking.LEVEL_NODE:
4430       self._LockInstancesNodes()
4431
4432   def CheckPrereq(self):
4433     """Check prerequisites.
4434
4435     This only checks the optional instance list against the existing names.
4436
4437     """
4438     if self.wanted_names is None:
4439       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4440
4441     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4442                              in self.wanted_names]
4443     return
4444
4445   def _ComputeDiskStatus(self, instance, snode, dev):
4446     """Compute block device status.
4447
4448     """
4449     static = self.op.static
4450     if not static:
4451       self.cfg.SetDiskID(dev, instance.primary_node)
4452       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4453     else:
4454       dev_pstatus = None
4455
4456     if dev.dev_type in constants.LDS_DRBD:
4457       # we change the snode then (otherwise we use the one passed in)
4458       if dev.logical_id[0] == instance.primary_node:
4459         snode = dev.logical_id[1]
4460       else:
4461         snode = dev.logical_id[0]
4462
4463     if snode and not static:
4464       self.cfg.SetDiskID(dev, snode)
4465       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4466     else:
4467       dev_sstatus = None
4468
4469     if dev.children:
4470       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4471                       for child in dev.children]
4472     else:
4473       dev_children = []
4474
4475     data = {
4476       "iv_name": dev.iv_name,
4477       "dev_type": dev.dev_type,
4478       "logical_id": dev.logical_id,
4479       "physical_id": dev.physical_id,
4480       "pstatus": dev_pstatus,
4481       "sstatus": dev_sstatus,
4482       "children": dev_children,
4483       }
4484
4485     return data
4486
4487   def Exec(self, feedback_fn):
4488     """Gather and return data"""
4489     result = {}
4490
4491     cluster = self.cfg.GetClusterInfo()
4492
4493     for instance in self.wanted_instances:
4494       if not self.op.static:
4495         remote_info = self.rpc.call_instance_info(instance.primary_node,
4496                                                   instance.name,
4497                                                   instance.hypervisor)
4498         if remote_info and "state" in remote_info:
4499           remote_state = "up"
4500         else:
4501           remote_state = "down"
4502       else:
4503         remote_state = None
4504       if instance.status == "down":
4505         config_state = "down"
4506       else:
4507         config_state = "up"
4508
4509       disks = [self._ComputeDiskStatus(instance, None, device)
4510                for device in instance.disks]
4511
4512       idict = {
4513         "name": instance.name,
4514         "config_state": config_state,
4515         "run_state": remote_state,
4516         "pnode": instance.primary_node,
4517         "snodes": instance.secondary_nodes,
4518         "os": instance.os,
4519         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4520         "disks": disks,
4521         "hypervisor": instance.hypervisor,
4522         "network_port": instance.network_port,
4523         "hv_instance": instance.hvparams,
4524         "hv_actual": cluster.FillHV(instance),
4525         "be_instance": instance.beparams,
4526         "be_actual": cluster.FillBE(instance),
4527         }
4528
4529       result[instance.name] = idict
4530
4531     return result
4532
4533
4534 class LUSetInstanceParams(LogicalUnit):
4535   """Modifies an instances's parameters.
4536
4537   """
4538   HPATH = "instance-modify"
4539   HTYPE = constants.HTYPE_INSTANCE
4540   _OP_REQP = ["instance_name", "hvparams"]
4541   REQ_BGL = False
4542
4543   def ExpandNames(self):
4544     self._ExpandAndLockInstance()
4545     self.needed_locks[locking.LEVEL_NODE] = []
4546     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4547
4548
4549   def DeclareLocks(self, level):
4550     if level == locking.LEVEL_NODE:
4551       self._LockInstancesNodes()
4552
4553   def BuildHooksEnv(self):
4554     """Build hooks env.
4555
4556     This runs on the master, primary and secondaries.
4557
4558     """
4559     args = dict()
4560     if constants.BE_MEMORY in self.be_new:
4561       args['memory'] = self.be_new[constants.BE_MEMORY]
4562     if constants.BE_VCPUS in self.be_new:
4563       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4564     if self.do_ip or self.do_bridge or self.mac:
4565       if self.do_ip:
4566         ip = self.ip
4567       else:
4568         ip = self.instance.nics[0].ip
4569       if self.bridge:
4570         bridge = self.bridge
4571       else:
4572         bridge = self.instance.nics[0].bridge
4573       if self.mac:
4574         mac = self.mac
4575       else:
4576         mac = self.instance.nics[0].mac
4577       args['nics'] = [(ip, bridge, mac)]
4578     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4579     nl = [self.cfg.GetMasterNode(),
4580           self.instance.primary_node] + list(self.instance.secondary_nodes)
4581     return env, nl, nl
4582
4583   def CheckPrereq(self):
4584     """Check prerequisites.
4585
4586     This only checks the instance list against the existing names.
4587
4588     """
4589     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4590     # a separate CheckArguments function, if we implement one, so the operation
4591     # can be aborted without waiting for any lock, should it have an error...
4592     self.ip = getattr(self.op, "ip", None)
4593     self.mac = getattr(self.op, "mac", None)
4594     self.bridge = getattr(self.op, "bridge", None)
4595     self.kernel_path = getattr(self.op, "kernel_path", None)
4596     self.initrd_path = getattr(self.op, "initrd_path", None)
4597     self.force = getattr(self.op, "force", None)
4598     all_parms = [self.ip, self.bridge, self.mac]
4599     if (all_parms.count(None) == len(all_parms) and
4600         not self.op.hvparams and
4601         not self.op.beparams):
4602       raise errors.OpPrereqError("No changes submitted")
4603     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4604       val = self.op.beparams.get(item, None)
4605       if val is not None:
4606         try:
4607           val = int(val)
4608         except ValueError, err:
4609           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4610         self.op.beparams[item] = val
4611     if self.ip is not None:
4612       self.do_ip = True
4613       if self.ip.lower() == "none":
4614         self.ip = None
4615       else:
4616         if not utils.IsValidIP(self.ip):
4617           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4618     else:
4619       self.do_ip = False
4620     self.do_bridge = (self.bridge is not None)
4621     if self.mac is not None:
4622       if self.cfg.IsMacInUse(self.mac):
4623         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4624                                    self.mac)
4625       if not utils.IsValidMac(self.mac):
4626         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4627
4628     # checking the new params on the primary/secondary nodes
4629
4630     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4631     assert self.instance is not None, \
4632       "Cannot retrieve locked instance %s" % self.op.instance_name
4633     pnode = self.instance.primary_node
4634     nodelist = [pnode]
4635     nodelist.extend(instance.secondary_nodes)
4636
4637     # hvparams processing
4638     if self.op.hvparams:
4639       i_hvdict = copy.deepcopy(instance.hvparams)
4640       for key, val in self.op.hvparams.iteritems():
4641         if val is None:
4642           try:
4643             del i_hvdict[key]
4644           except KeyError:
4645             pass
4646         else:
4647           i_hvdict[key] = val
4648       cluster = self.cfg.GetClusterInfo()
4649       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4650                                 i_hvdict)
4651       # local check
4652       hypervisor.GetHypervisor(
4653         instance.hypervisor).CheckParameterSyntax(hv_new)
4654       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4655       self.hv_new = hv_new # the new actual values
4656       self.hv_inst = i_hvdict # the new dict (without defaults)
4657     else:
4658       self.hv_new = self.hv_inst = {}
4659
4660     # beparams processing
4661     if self.op.beparams:
4662       i_bedict = copy.deepcopy(instance.beparams)
4663       for key, val in self.op.beparams.iteritems():
4664         if val is None:
4665           try:
4666             del i_bedict[key]
4667           except KeyError:
4668             pass
4669         else:
4670           i_bedict[key] = val
4671       cluster = self.cfg.GetClusterInfo()
4672       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4673                                 i_bedict)
4674       self.be_new = be_new # the new actual values
4675       self.be_inst = i_bedict # the new dict (without defaults)
4676     else:
4677       self.hv_new = self.hv_inst = {}
4678
4679     self.warn = []
4680
4681     if constants.BE_MEMORY in self.op.beparams and not self.force:
4682       mem_check_list = [pnode]
4683       if be_new[constants.BE_AUTO_BALANCE]:
4684         # either we changed auto_balance to yes or it was from before
4685         mem_check_list.extend(instance.secondary_nodes)
4686       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4687                                                   instance.hypervisor)
4688       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4689                                          instance.hypervisor)
4690
4691       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4692         # Assume the primary node is unreachable and go ahead
4693         self.warn.append("Can't get info from primary node %s" % pnode)
4694       else:
4695         if instance_info:
4696           current_mem = instance_info['memory']
4697         else:
4698           # Assume instance not running
4699           # (there is a slight race condition here, but it's not very probable,
4700           # and we have no other way to check)
4701           current_mem = 0
4702         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4703                     nodeinfo[pnode]['memory_free'])
4704         if miss_mem > 0:
4705           raise errors.OpPrereqError("This change will prevent the instance"
4706                                      " from starting, due to %d MB of memory"
4707                                      " missing on its primary node" % miss_mem)
4708
4709       if be_new[constants.BE_AUTO_BALANCE]:
4710         for node in instance.secondary_nodes:
4711           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4712             self.warn.append("Can't get info from secondary node %s" % node)
4713           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4714             self.warn.append("Not enough memory to failover instance to"
4715                              " secondary node %s" % node)
4716
4717     return
4718
4719   def Exec(self, feedback_fn):
4720     """Modifies an instance.
4721
4722     All parameters take effect only at the next restart of the instance.
4723     """
4724     # Process here the warnings from CheckPrereq, as we don't have a
4725     # feedback_fn there.
4726     for warn in self.warn:
4727       feedback_fn("WARNING: %s" % warn)
4728
4729     result = []
4730     instance = self.instance
4731     if self.do_ip:
4732       instance.nics[0].ip = self.ip
4733       result.append(("ip", self.ip))
4734     if self.bridge:
4735       instance.nics[0].bridge = self.bridge
4736       result.append(("bridge", self.bridge))
4737     if self.mac:
4738       instance.nics[0].mac = self.mac
4739       result.append(("mac", self.mac))
4740     if self.op.hvparams:
4741       instance.hvparams = self.hv_new
4742       for key, val in self.op.hvparams.iteritems():
4743         result.append(("hv/%s" % key, val))
4744     if self.op.beparams:
4745       instance.beparams = self.be_inst
4746       for key, val in self.op.beparams.iteritems():
4747         result.append(("be/%s" % key, val))
4748
4749     self.cfg.Update(instance)
4750
4751     return result
4752
4753
4754 class LUQueryExports(NoHooksLU):
4755   """Query the exports list
4756
4757   """
4758   _OP_REQP = ['nodes']
4759   REQ_BGL = False
4760
4761   def ExpandNames(self):
4762     self.needed_locks = {}
4763     self.share_locks[locking.LEVEL_NODE] = 1
4764     if not self.op.nodes:
4765       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4766     else:
4767       self.needed_locks[locking.LEVEL_NODE] = \
4768         _GetWantedNodes(self, self.op.nodes)
4769
4770   def CheckPrereq(self):
4771     """Check prerequisites.
4772
4773     """
4774     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4775
4776   def Exec(self, feedback_fn):
4777     """Compute the list of all the exported system images.
4778
4779     Returns:
4780       a dictionary with the structure node->(export-list)
4781       where export-list is a list of the instances exported on
4782       that node.
4783
4784     """
4785     return self.rpc.call_export_list(self.nodes)
4786
4787
4788 class LUExportInstance(LogicalUnit):
4789   """Export an instance to an image in the cluster.
4790
4791   """
4792   HPATH = "instance-export"
4793   HTYPE = constants.HTYPE_INSTANCE
4794   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4795   REQ_BGL = False
4796
4797   def ExpandNames(self):
4798     self._ExpandAndLockInstance()
4799     # FIXME: lock only instance primary and destination node
4800     #
4801     # Sad but true, for now we have do lock all nodes, as we don't know where
4802     # the previous export might be, and and in this LU we search for it and
4803     # remove it from its current node. In the future we could fix this by:
4804     #  - making a tasklet to search (share-lock all), then create the new one,
4805     #    then one to remove, after
4806     #  - removing the removal operation altoghether
4807     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4808
4809   def DeclareLocks(self, level):
4810     """Last minute lock declaration."""
4811     # All nodes are locked anyway, so nothing to do here.
4812
4813   def BuildHooksEnv(self):
4814     """Build hooks env.
4815
4816     This will run on the master, primary node and target node.
4817
4818     """
4819     env = {
4820       "EXPORT_NODE": self.op.target_node,
4821       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4822       }
4823     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4824     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4825           self.op.target_node]
4826     return env, nl, nl
4827
4828   def CheckPrereq(self):
4829     """Check prerequisites.
4830
4831     This checks that the instance and node names are valid.
4832
4833     """
4834     instance_name = self.op.instance_name
4835     self.instance = self.cfg.GetInstanceInfo(instance_name)
4836     assert self.instance is not None, \
4837           "Cannot retrieve locked instance %s" % self.op.instance_name
4838
4839     self.dst_node = self.cfg.GetNodeInfo(
4840       self.cfg.ExpandNodeName(self.op.target_node))
4841
4842     assert self.dst_node is not None, \
4843           "Cannot retrieve locked node %s" % self.op.target_node
4844
4845     # instance disk type verification
4846     for disk in self.instance.disks:
4847       if disk.dev_type == constants.LD_FILE:
4848         raise errors.OpPrereqError("Export not supported for instances with"
4849                                    " file-based disks")
4850
4851   def Exec(self, feedback_fn):
4852     """Export an instance to an image in the cluster.
4853
4854     """
4855     instance = self.instance
4856     dst_node = self.dst_node
4857     src_node = instance.primary_node
4858     if self.op.shutdown:
4859       # shutdown the instance, but not the disks
4860       if not self.rpc.call_instance_shutdown(src_node, instance):
4861         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4862                                  (instance.name, src_node))
4863
4864     vgname = self.cfg.GetVGName()
4865
4866     snap_disks = []
4867
4868     try:
4869       for disk in instance.disks:
4870         if disk.iv_name == "sda":
4871           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4872           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4873
4874           if not new_dev_name:
4875             logging.error("Could not snapshot block device %s on node %s",
4876                           disk.logical_id[1], src_node)
4877           else:
4878             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4879                                       logical_id=(vgname, new_dev_name),
4880                                       physical_id=(vgname, new_dev_name),
4881                                       iv_name=disk.iv_name)
4882             snap_disks.append(new_dev)
4883
4884     finally:
4885       if self.op.shutdown and instance.status == "up":
4886         if not self.rpc.call_instance_start(src_node, instance, None):
4887           _ShutdownInstanceDisks(self, instance)
4888           raise errors.OpExecError("Could not start instance")
4889
4890     # TODO: check for size
4891
4892     cluster_name = self.cfg.GetClusterName()
4893     for dev in snap_disks:
4894       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4895                                       instance, cluster_name):
4896         logging.error("Could not export block device %s from node %s to"
4897                       " node %s", dev.logical_id[1], src_node, dst_node.name)
4898       if not self.rpc.call_blockdev_remove(src_node, dev):
4899         logging.error("Could not remove snapshot block device %s from node"
4900                       " %s", dev.logical_id[1], src_node)
4901
4902     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4903       logging.error("Could not finalize export for instance %s on node %s",
4904                     instance.name, dst_node.name)
4905
4906     nodelist = self.cfg.GetNodeList()
4907     nodelist.remove(dst_node.name)
4908
4909     # on one-node clusters nodelist will be empty after the removal
4910     # if we proceed the backup would be removed because OpQueryExports
4911     # substitutes an empty list with the full cluster node list.
4912     if nodelist:
4913       exportlist = self.rpc.call_export_list(nodelist)
4914       for node in exportlist:
4915         if instance.name in exportlist[node]:
4916           if not self.rpc.call_export_remove(node, instance.name):
4917             logging.error("Could not remove older export for instance %s"
4918                           " on node %s", instance.name, node)
4919
4920
4921 class LURemoveExport(NoHooksLU):
4922   """Remove exports related to the named instance.
4923
4924   """
4925   _OP_REQP = ["instance_name"]
4926   REQ_BGL = False
4927
4928   def ExpandNames(self):
4929     self.needed_locks = {}
4930     # We need all nodes to be locked in order for RemoveExport to work, but we
4931     # don't need to lock the instance itself, as nothing will happen to it (and
4932     # we can remove exports also for a removed instance)
4933     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4934
4935   def CheckPrereq(self):
4936     """Check prerequisites.
4937     """
4938     pass
4939
4940   def Exec(self, feedback_fn):
4941     """Remove any export.
4942
4943     """
4944     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4945     # If the instance was not found we'll try with the name that was passed in.
4946     # This will only work if it was an FQDN, though.
4947     fqdn_warn = False
4948     if not instance_name:
4949       fqdn_warn = True
4950       instance_name = self.op.instance_name
4951
4952     exportlist = self.rpc.call_export_list(self.acquired_locks[
4953       locking.LEVEL_NODE])
4954     found = False
4955     for node in exportlist:
4956       if instance_name in exportlist[node]:
4957         found = True
4958         if not self.rpc.call_export_remove(node, instance_name):
4959           logging.error("Could not remove export for instance %s"
4960                         " on node %s", instance_name, node)
4961
4962     if fqdn_warn and not found:
4963       feedback_fn("Export not found. If trying to remove an export belonging"
4964                   " to a deleted instance please use its Fully Qualified"
4965                   " Domain Name.")
4966
4967
4968 class TagsLU(NoHooksLU):
4969   """Generic tags LU.
4970
4971   This is an abstract class which is the parent of all the other tags LUs.
4972
4973   """
4974
4975   def ExpandNames(self):
4976     self.needed_locks = {}
4977     if self.op.kind == constants.TAG_NODE:
4978       name = self.cfg.ExpandNodeName(self.op.name)
4979       if name is None:
4980         raise errors.OpPrereqError("Invalid node name (%s)" %
4981                                    (self.op.name,))
4982       self.op.name = name
4983       self.needed_locks[locking.LEVEL_NODE] = name
4984     elif self.op.kind == constants.TAG_INSTANCE:
4985       name = self.cfg.ExpandInstanceName(self.op.name)
4986       if name is None:
4987         raise errors.OpPrereqError("Invalid instance name (%s)" %
4988                                    (self.op.name,))
4989       self.op.name = name
4990       self.needed_locks[locking.LEVEL_INSTANCE] = name
4991
4992   def CheckPrereq(self):
4993     """Check prerequisites.
4994
4995     """
4996     if self.op.kind == constants.TAG_CLUSTER:
4997       self.target = self.cfg.GetClusterInfo()
4998     elif self.op.kind == constants.TAG_NODE:
4999       self.target = self.cfg.GetNodeInfo(self.op.name)
5000     elif self.op.kind == constants.TAG_INSTANCE:
5001       self.target = self.cfg.GetInstanceInfo(self.op.name)
5002     else:
5003       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5004                                  str(self.op.kind))
5005
5006
5007 class LUGetTags(TagsLU):
5008   """Returns the tags of a given object.
5009
5010   """
5011   _OP_REQP = ["kind", "name"]
5012   REQ_BGL = False
5013
5014   def Exec(self, feedback_fn):
5015     """Returns the tag list.
5016
5017     """
5018     return list(self.target.GetTags())
5019
5020
5021 class LUSearchTags(NoHooksLU):
5022   """Searches the tags for a given pattern.
5023
5024   """
5025   _OP_REQP = ["pattern"]
5026   REQ_BGL = False
5027
5028   def ExpandNames(self):
5029     self.needed_locks = {}
5030
5031   def CheckPrereq(self):
5032     """Check prerequisites.
5033
5034     This checks the pattern passed for validity by compiling it.
5035
5036     """
5037     try:
5038       self.re = re.compile(self.op.pattern)
5039     except re.error, err:
5040       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5041                                  (self.op.pattern, err))
5042
5043   def Exec(self, feedback_fn):
5044     """Returns the tag list.
5045
5046     """
5047     cfg = self.cfg
5048     tgts = [("/cluster", cfg.GetClusterInfo())]
5049     ilist = cfg.GetAllInstancesInfo().values()
5050     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5051     nlist = cfg.GetAllNodesInfo().values()
5052     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5053     results = []
5054     for path, target in tgts:
5055       for tag in target.GetTags():
5056         if self.re.search(tag):
5057           results.append((path, tag))
5058     return results
5059
5060
5061 class LUAddTags(TagsLU):
5062   """Sets a tag on a given object.
5063
5064   """
5065   _OP_REQP = ["kind", "name", "tags"]
5066   REQ_BGL = False
5067
5068   def CheckPrereq(self):
5069     """Check prerequisites.
5070
5071     This checks the type and length of the tag name and value.
5072
5073     """
5074     TagsLU.CheckPrereq(self)
5075     for tag in self.op.tags:
5076       objects.TaggableObject.ValidateTag(tag)
5077
5078   def Exec(self, feedback_fn):
5079     """Sets the tag.
5080
5081     """
5082     try:
5083       for tag in self.op.tags:
5084         self.target.AddTag(tag)
5085     except errors.TagError, err:
5086       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5087     try:
5088       self.cfg.Update(self.target)
5089     except errors.ConfigurationError:
5090       raise errors.OpRetryError("There has been a modification to the"
5091                                 " config file and the operation has been"
5092                                 " aborted. Please retry.")
5093
5094
5095 class LUDelTags(TagsLU):
5096   """Delete a list of tags from a given object.
5097
5098   """
5099   _OP_REQP = ["kind", "name", "tags"]
5100   REQ_BGL = False
5101
5102   def CheckPrereq(self):
5103     """Check prerequisites.
5104
5105     This checks that we have the given tag.
5106
5107     """
5108     TagsLU.CheckPrereq(self)
5109     for tag in self.op.tags:
5110       objects.TaggableObject.ValidateTag(tag)
5111     del_tags = frozenset(self.op.tags)
5112     cur_tags = self.target.GetTags()
5113     if not del_tags <= cur_tags:
5114       diff_tags = del_tags - cur_tags
5115       diff_names = ["'%s'" % tag for tag in diff_tags]
5116       diff_names.sort()
5117       raise errors.OpPrereqError("Tag(s) %s not found" %
5118                                  (",".join(diff_names)))
5119
5120   def Exec(self, feedback_fn):
5121     """Remove the tag from the object.
5122
5123     """
5124     for tag in self.op.tags:
5125       self.target.RemoveTag(tag)
5126     try:
5127       self.cfg.Update(self.target)
5128     except errors.ConfigurationError:
5129       raise errors.OpRetryError("There has been a modification to the"
5130                                 " config file and the operation has been"
5131                                 " aborted. Please retry.")
5132
5133
5134 class LUTestDelay(NoHooksLU):
5135   """Sleep for a specified amount of time.
5136
5137   This LU sleeps on the master and/or nodes for a specified amount of
5138   time.
5139
5140   """
5141   _OP_REQP = ["duration", "on_master", "on_nodes"]
5142   REQ_BGL = False
5143
5144   def ExpandNames(self):
5145     """Expand names and set required locks.
5146
5147     This expands the node list, if any.
5148
5149     """
5150     self.needed_locks = {}
5151     if self.op.on_nodes:
5152       # _GetWantedNodes can be used here, but is not always appropriate to use
5153       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5154       # more information.
5155       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5156       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5157
5158   def CheckPrereq(self):
5159     """Check prerequisites.
5160
5161     """
5162
5163   def Exec(self, feedback_fn):
5164     """Do the actual sleep.
5165
5166     """
5167     if self.op.on_master:
5168       if not utils.TestDelay(self.op.duration):
5169         raise errors.OpExecError("Error during master delay test")
5170     if self.op.on_nodes:
5171       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5172       if not result:
5173         raise errors.OpExecError("Complete failure from rpc call")
5174       for node, node_result in result.items():
5175         if not node_result:
5176           raise errors.OpExecError("Failure during rpc call to node %s,"
5177                                    " result: %s" % (node, node_result))
5178
5179
5180 class IAllocator(object):
5181   """IAllocator framework.
5182
5183   An IAllocator instance has three sets of attributes:
5184     - cfg that is needed to query the cluster
5185     - input data (all members of the _KEYS class attribute are required)
5186     - four buffer attributes (in|out_data|text), that represent the
5187       input (to the external script) in text and data structure format,
5188       and the output from it, again in two formats
5189     - the result variables from the script (success, info, nodes) for
5190       easy usage
5191
5192   """
5193   _ALLO_KEYS = [
5194     "mem_size", "disks", "disk_template",
5195     "os", "tags", "nics", "vcpus",
5196     ]
5197   _RELO_KEYS = [
5198     "relocate_from",
5199     ]
5200
5201   def __init__(self, lu, mode, name, **kwargs):
5202     self.lu = lu
5203     # init buffer variables
5204     self.in_text = self.out_text = self.in_data = self.out_data = None
5205     # init all input fields so that pylint is happy
5206     self.mode = mode
5207     self.name = name
5208     self.mem_size = self.disks = self.disk_template = None
5209     self.os = self.tags = self.nics = self.vcpus = None
5210     self.relocate_from = None
5211     # computed fields
5212     self.required_nodes = None
5213     # init result fields
5214     self.success = self.info = self.nodes = None
5215     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5216       keyset = self._ALLO_KEYS
5217     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5218       keyset = self._RELO_KEYS
5219     else:
5220       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5221                                    " IAllocator" % self.mode)
5222     for key in kwargs:
5223       if key not in keyset:
5224         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5225                                      " IAllocator" % key)
5226       setattr(self, key, kwargs[key])
5227     for key in keyset:
5228       if key not in kwargs:
5229         raise errors.ProgrammerError("Missing input parameter '%s' to"
5230                                      " IAllocator" % key)
5231     self._BuildInputData()
5232
5233   def _ComputeClusterData(self):
5234     """Compute the generic allocator input data.
5235
5236     This is the data that is independent of the actual operation.
5237
5238     """
5239     cfg = self.lu.cfg
5240     cluster_info = cfg.GetClusterInfo()
5241     # cluster data
5242     data = {
5243       "version": 1,
5244       "cluster_name": cfg.GetClusterName(),
5245       "cluster_tags": list(cluster_info.GetTags()),
5246       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5247       # we don't have job IDs
5248       }
5249
5250     i_list = []
5251     cluster = self.cfg.GetClusterInfo()
5252     for iname in cfg.GetInstanceList():
5253       i_obj = cfg.GetInstanceInfo(iname)
5254       i_list.append((i_obj, cluster.FillBE(i_obj)))
5255
5256     # node data
5257     node_results = {}
5258     node_list = cfg.GetNodeList()
5259     # FIXME: here we have only one hypervisor information, but
5260     # instance can belong to different hypervisors
5261     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5262                                            cfg.GetHypervisorType())
5263     for nname in node_list:
5264       ninfo = cfg.GetNodeInfo(nname)
5265       if nname not in node_data or not isinstance(node_data[nname], dict):
5266         raise errors.OpExecError("Can't get data for node %s" % nname)
5267       remote_info = node_data[nname]
5268       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5269                    'vg_size', 'vg_free', 'cpu_total']:
5270         if attr not in remote_info:
5271           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5272                                    (nname, attr))
5273         try:
5274           remote_info[attr] = int(remote_info[attr])
5275         except ValueError, err:
5276           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5277                                    " %s" % (nname, attr, str(err)))
5278       # compute memory used by primary instances
5279       i_p_mem = i_p_up_mem = 0
5280       for iinfo, beinfo in i_list:
5281         if iinfo.primary_node == nname:
5282           i_p_mem += beinfo[constants.BE_MEMORY]
5283           if iinfo.status == "up":
5284             i_p_up_mem += beinfo[constants.BE_MEMORY]
5285
5286       # compute memory used by instances
5287       pnr = {
5288         "tags": list(ninfo.GetTags()),
5289         "total_memory": remote_info['memory_total'],
5290         "reserved_memory": remote_info['memory_dom0'],
5291         "free_memory": remote_info['memory_free'],
5292         "i_pri_memory": i_p_mem,
5293         "i_pri_up_memory": i_p_up_mem,
5294         "total_disk": remote_info['vg_size'],
5295         "free_disk": remote_info['vg_free'],
5296         "primary_ip": ninfo.primary_ip,
5297         "secondary_ip": ninfo.secondary_ip,
5298         "total_cpus": remote_info['cpu_total'],
5299         }
5300       node_results[nname] = pnr
5301     data["nodes"] = node_results
5302
5303     # instance data
5304     instance_data = {}
5305     for iinfo, beinfo in i_list:
5306       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5307                   for n in iinfo.nics]
5308       pir = {
5309         "tags": list(iinfo.GetTags()),
5310         "should_run": iinfo.status == "up",
5311         "vcpus": beinfo[constants.BE_VCPUS],
5312         "memory": beinfo[constants.BE_MEMORY],
5313         "os": iinfo.os,
5314         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5315         "nics": nic_data,
5316         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5317         "disk_template": iinfo.disk_template,
5318         "hypervisor": iinfo.hypervisor,
5319         }
5320       instance_data[iinfo.name] = pir
5321
5322     data["instances"] = instance_data
5323
5324     self.in_data = data
5325
5326   def _AddNewInstance(self):
5327     """Add new instance data to allocator structure.
5328
5329     This in combination with _AllocatorGetClusterData will create the
5330     correct structure needed as input for the allocator.
5331
5332     The checks for the completeness of the opcode must have already been
5333     done.
5334
5335     """
5336     data = self.in_data
5337     if len(self.disks) != 2:
5338       raise errors.OpExecError("Only two-disk configurations supported")
5339
5340     disk_space = _ComputeDiskSize(self.disk_template,
5341                                   self.disks[0]["size"], self.disks[1]["size"])
5342
5343     if self.disk_template in constants.DTS_NET_MIRROR:
5344       self.required_nodes = 2
5345     else:
5346       self.required_nodes = 1
5347     request = {
5348       "type": "allocate",
5349       "name": self.name,
5350       "disk_template": self.disk_template,
5351       "tags": self.tags,
5352       "os": self.os,
5353       "vcpus": self.vcpus,
5354       "memory": self.mem_size,
5355       "disks": self.disks,
5356       "disk_space_total": disk_space,
5357       "nics": self.nics,
5358       "required_nodes": self.required_nodes,
5359       }
5360     data["request"] = request
5361
5362   def _AddRelocateInstance(self):
5363     """Add relocate instance data to allocator structure.
5364
5365     This in combination with _IAllocatorGetClusterData will create the
5366     correct structure needed as input for the allocator.
5367
5368     The checks for the completeness of the opcode must have already been
5369     done.
5370
5371     """
5372     instance = self.lu.cfg.GetInstanceInfo(self.name)
5373     if instance is None:
5374       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5375                                    " IAllocator" % self.name)
5376
5377     if instance.disk_template not in constants.DTS_NET_MIRROR:
5378       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5379
5380     if len(instance.secondary_nodes) != 1:
5381       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5382
5383     self.required_nodes = 1
5384
5385     disk_space = _ComputeDiskSize(instance.disk_template,
5386                                   instance.disks[0].size,
5387                                   instance.disks[1].size)
5388
5389     request = {
5390       "type": "relocate",
5391       "name": self.name,
5392       "disk_space_total": disk_space,
5393       "required_nodes": self.required_nodes,
5394       "relocate_from": self.relocate_from,
5395       }
5396     self.in_data["request"] = request
5397
5398   def _BuildInputData(self):
5399     """Build input data structures.
5400
5401     """
5402     self._ComputeClusterData()
5403
5404     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5405       self._AddNewInstance()
5406     else:
5407       self._AddRelocateInstance()
5408
5409     self.in_text = serializer.Dump(self.in_data)
5410
5411   def Run(self, name, validate=True, call_fn=None):
5412     """Run an instance allocator and return the results.
5413
5414     """
5415     if call_fn is None:
5416       call_fn = self.lu.rpc.call_iallocator_runner
5417     data = self.in_text
5418
5419     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5420
5421     if not isinstance(result, (list, tuple)) or len(result) != 4:
5422       raise errors.OpExecError("Invalid result from master iallocator runner")
5423
5424     rcode, stdout, stderr, fail = result
5425
5426     if rcode == constants.IARUN_NOTFOUND:
5427       raise errors.OpExecError("Can't find allocator '%s'" % name)
5428     elif rcode == constants.IARUN_FAILURE:
5429       raise errors.OpExecError("Instance allocator call failed: %s,"
5430                                " output: %s" % (fail, stdout+stderr))
5431     self.out_text = stdout
5432     if validate:
5433       self._ValidateResult()
5434
5435   def _ValidateResult(self):
5436     """Process the allocator results.
5437
5438     This will process and if successful save the result in
5439     self.out_data and the other parameters.
5440
5441     """
5442     try:
5443       rdict = serializer.Load(self.out_text)
5444     except Exception, err:
5445       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5446
5447     if not isinstance(rdict, dict):
5448       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5449
5450     for key in "success", "info", "nodes":
5451       if key not in rdict:
5452         raise errors.OpExecError("Can't parse iallocator results:"
5453                                  " missing key '%s'" % key)
5454       setattr(self, key, rdict[key])
5455
5456     if not isinstance(rdict["nodes"], list):
5457       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5458                                " is not a list")
5459     self.out_data = rdict
5460
5461
5462 class LUTestAllocator(NoHooksLU):
5463   """Run allocator tests.
5464
5465   This LU runs the allocator tests
5466
5467   """
5468   _OP_REQP = ["direction", "mode", "name"]
5469
5470   def CheckPrereq(self):
5471     """Check prerequisites.
5472
5473     This checks the opcode parameters depending on the director and mode test.
5474
5475     """
5476     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5477       for attr in ["name", "mem_size", "disks", "disk_template",
5478                    "os", "tags", "nics", "vcpus"]:
5479         if not hasattr(self.op, attr):
5480           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5481                                      attr)
5482       iname = self.cfg.ExpandInstanceName(self.op.name)
5483       if iname is not None:
5484         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5485                                    iname)
5486       if not isinstance(self.op.nics, list):
5487         raise errors.OpPrereqError("Invalid parameter 'nics'")
5488       for row in self.op.nics:
5489         if (not isinstance(row, dict) or
5490             "mac" not in row or
5491             "ip" not in row or
5492             "bridge" not in row):
5493           raise errors.OpPrereqError("Invalid contents of the"
5494                                      " 'nics' parameter")
5495       if not isinstance(self.op.disks, list):
5496         raise errors.OpPrereqError("Invalid parameter 'disks'")
5497       if len(self.op.disks) != 2:
5498         raise errors.OpPrereqError("Only two-disk configurations supported")
5499       for row in self.op.disks:
5500         if (not isinstance(row, dict) or
5501             "size" not in row or
5502             not isinstance(row["size"], int) or
5503             "mode" not in row or
5504             row["mode"] not in ['r', 'w']):
5505           raise errors.OpPrereqError("Invalid contents of the"
5506                                      " 'disks' parameter")
5507     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5508       if not hasattr(self.op, "name"):
5509         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5510       fname = self.cfg.ExpandInstanceName(self.op.name)
5511       if fname is None:
5512         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5513                                    self.op.name)
5514       self.op.name = fname
5515       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5516     else:
5517       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5518                                  self.op.mode)
5519
5520     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5521       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5522         raise errors.OpPrereqError("Missing allocator name")
5523     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5524       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5525                                  self.op.direction)
5526
5527   def Exec(self, feedback_fn):
5528     """Run the allocator test.
5529
5530     """
5531     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5532       ial = IAllocator(self,
5533                        mode=self.op.mode,
5534                        name=self.op.name,
5535                        mem_size=self.op.mem_size,
5536                        disks=self.op.disks,
5537                        disk_template=self.op.disk_template,
5538                        os=self.op.os,
5539                        tags=self.op.tags,
5540                        nics=self.op.nics,
5541                        vcpus=self.op.vcpus,
5542                        )
5543     else:
5544       ial = IAllocator(self,
5545                        mode=self.op.mode,
5546                        name=self.op.name,
5547                        relocate_from=list(self.relocate_from),
5548                        )
5549
5550     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5551       result = ial.in_text
5552     else:
5553       ial.Run(self.op.allocator, validate=False)
5554       result = ial.out_text
5555     return result