Add a FieldSet class for variable parameter sets
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import itertools
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91     # logging
92     self.LogWarning = processor.LogWarning
93     self.LogInfo = processor.LogInfo
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100
101     if not self.cfg.IsCluster():
102       raise errors.OpPrereqError("Cluster not initialized yet,"
103                                  " use 'gnt-cluster init' first.")
104     if self.REQ_MASTER:
105       master = self.cfg.GetMasterNode()
106       if master != utils.HostInfo().name:
107         raise errors.OpPrereqError("Commands must be run on the master"
108                                    " node %s" % master)
109
110   def __GetSSH(self):
111     """Returns the SshRunner object
112
113     """
114     if not self.__ssh:
115       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
116     return self.__ssh
117
118   ssh = property(fget=__GetSSH)
119
120   def ExpandNames(self):
121     """Expand names for this LU.
122
123     This method is called before starting to execute the opcode, and it should
124     update all the parameters of the opcode to their canonical form (e.g. a
125     short node name must be fully expanded after this method has successfully
126     completed). This way locking, hooks, logging, ecc. can work correctly.
127
128     LUs which implement this method must also populate the self.needed_locks
129     member, as a dict with lock levels as keys, and a list of needed lock names
130     as values. Rules:
131       - Use an empty dict if you don't need any lock
132       - If you don't need any lock at a particular level omit that level
133       - Don't put anything for the BGL level
134       - If you want all locks at a level use locking.ALL_SET as a value
135
136     If you need to share locks (rather than acquire them exclusively) at one
137     level you can modify self.share_locks, setting a true value (usually 1) for
138     that level. By default locks are not shared.
139
140     Examples:
141     # Acquire all nodes and one instance
142     self.needed_locks = {
143       locking.LEVEL_NODE: locking.ALL_SET,
144       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
145     }
146     # Acquire just two nodes
147     self.needed_locks = {
148       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
149     }
150     # Acquire no locks
151     self.needed_locks = {} # No, you can't leave it to the default value None
152
153     """
154     # The implementation of this method is mandatory only if the new LU is
155     # concurrent, so that old LUs don't need to be changed all at the same
156     # time.
157     if self.REQ_BGL:
158       self.needed_locks = {} # Exclusive LUs don't need locks.
159     else:
160       raise NotImplementedError
161
162   def DeclareLocks(self, level):
163     """Declare LU locking needs for a level
164
165     While most LUs can just declare their locking needs at ExpandNames time,
166     sometimes there's the need to calculate some locks after having acquired
167     the ones before. This function is called just before acquiring locks at a
168     particular level, but after acquiring the ones at lower levels, and permits
169     such calculations. It can be used to modify self.needed_locks, and by
170     default it does nothing.
171
172     This function is only called if you have something already set in
173     self.needed_locks for the level.
174
175     @param level: Locking level which is going to be locked
176     @type level: member of ganeti.locking.LEVELS
177
178     """
179
180   def CheckPrereq(self):
181     """Check prerequisites for this LU.
182
183     This method should check that the prerequisites for the execution
184     of this LU are fulfilled. It can do internode communication, but
185     it should be idempotent - no cluster or system changes are
186     allowed.
187
188     The method should raise errors.OpPrereqError in case something is
189     not fulfilled. Its return value is ignored.
190
191     This method should also update all the parameters of the opcode to
192     their canonical form if it hasn't been done by ExpandNames before.
193
194     """
195     raise NotImplementedError
196
197   def Exec(self, feedback_fn):
198     """Execute the LU.
199
200     This method should implement the actual work. It should raise
201     errors.OpExecError for failures that are somewhat dealt with in
202     code, or expected.
203
204     """
205     raise NotImplementedError
206
207   def BuildHooksEnv(self):
208     """Build hooks environment for this LU.
209
210     This method should return a three-node tuple consisting of: a dict
211     containing the environment that will be used for running the
212     specific hook for this LU, a list of node names on which the hook
213     should run before the execution, and a list of node names on which
214     the hook should run after the execution.
215
216     The keys of the dict must not have 'GANETI_' prefixed as this will
217     be handled in the hooks runner. Also note additional keys will be
218     added by the hooks runner. If the LU doesn't define any
219     environment, an empty dict (and not None) should be returned.
220
221     No nodes should be returned as an empty list (and not None).
222
223     Note that if the HPATH for a LU class is None, this function will
224     not be called.
225
226     """
227     raise NotImplementedError
228
229   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
230     """Notify the LU about the results of its hooks.
231
232     This method is called every time a hooks phase is executed, and notifies
233     the Logical Unit about the hooks' result. The LU can then use it to alter
234     its result based on the hooks.  By default the method does nothing and the
235     previous result is passed back unchanged but any LU can define it if it
236     wants to use the local cluster hook-scripts somehow.
237
238     Args:
239       phase: the hooks phase that has just been run
240       hooks_results: the results of the multi-node hooks rpc call
241       feedback_fn: function to send feedback back to the caller
242       lu_result: the previous result this LU had, or None in the PRE phase.
243
244     """
245     return lu_result
246
247   def _ExpandAndLockInstance(self):
248     """Helper function to expand and lock an instance.
249
250     Many LUs that work on an instance take its name in self.op.instance_name
251     and need to expand it and then declare the expanded name for locking. This
252     function does it, and then updates self.op.instance_name to the expanded
253     name. It also initializes needed_locks as a dict, if this hasn't been done
254     before.
255
256     """
257     if self.needed_locks is None:
258       self.needed_locks = {}
259     else:
260       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
261         "_ExpandAndLockInstance called with instance-level locks set"
262     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
263     if expanded_name is None:
264       raise errors.OpPrereqError("Instance '%s' not known" %
265                                   self.op.instance_name)
266     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
267     self.op.instance_name = expanded_name
268
269   def _LockInstancesNodes(self, primary_only=False):
270     """Helper function to declare instances' nodes for locking.
271
272     This function should be called after locking one or more instances to lock
273     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
274     with all primary or secondary nodes for instances already locked and
275     present in self.needed_locks[locking.LEVEL_INSTANCE].
276
277     It should be called from DeclareLocks, and for safety only works if
278     self.recalculate_locks[locking.LEVEL_NODE] is set.
279
280     In the future it may grow parameters to just lock some instance's nodes, or
281     to just lock primaries or secondary nodes, if needed.
282
283     If should be called in DeclareLocks in a way similar to:
284
285     if level == locking.LEVEL_NODE:
286       self._LockInstancesNodes()
287
288     @type primary_only: boolean
289     @param primary_only: only lock primary nodes of locked instances
290
291     """
292     assert locking.LEVEL_NODE in self.recalculate_locks, \
293       "_LockInstancesNodes helper function called with no nodes to recalculate"
294
295     # TODO: check if we're really been called with the instance locks held
296
297     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
298     # future we might want to have different behaviors depending on the value
299     # of self.recalculate_locks[locking.LEVEL_NODE]
300     wanted_nodes = []
301     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
302       instance = self.context.cfg.GetInstanceInfo(instance_name)
303       wanted_nodes.append(instance.primary_node)
304       if not primary_only:
305         wanted_nodes.extend(instance.secondary_nodes)
306
307     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
308       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
309     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
310       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
311
312     del self.recalculate_locks[locking.LEVEL_NODE]
313
314
315 class NoHooksLU(LogicalUnit):
316   """Simple LU which runs no hooks.
317
318   This LU is intended as a parent for other LogicalUnits which will
319   run no hooks, in order to reduce duplicate code.
320
321   """
322   HPATH = None
323   HTYPE = None
324
325
326 class _FieldSet(object):
327   """A simple field set.
328
329   Among the features are:
330     - checking if a string is among a list of static string or regex objects
331     - checking if a whole list of string matches
332     - returning the matching groups from a regex match
333
334   Internally, all fields are held as regular expression objects.
335
336   """
337   def __init__(self, *items):
338     self.items = [re.compile("^%s$" % value) for value in items]
339
340   def Extend(self, other_set):
341     """Extend the field set with the items from another one"""
342     self.items.extend(other_set.items)
343
344   def Matches(self, field):
345     """Checks if a field matches the current set
346
347     @type field: str
348     @param field: the string to match
349     @return: either False or a regular expression match object
350
351     """
352     for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
353       return m
354     return False
355
356   def NonMatching(self, items):
357     """Returns the list of fields not matching the current set
358
359     @type items: list
360     @param items: the list of fields to check
361     @rtype: list
362     @return: list of non-matching fields
363
364     """
365     return [val for val in items if not self.Matches(val)]
366
367
368 def _GetWantedNodes(lu, nodes):
369   """Returns list of checked and expanded node names.
370
371   Args:
372     nodes: List of nodes (strings) or None for all
373
374   """
375   if not isinstance(nodes, list):
376     raise errors.OpPrereqError("Invalid argument type 'nodes'")
377
378   if not nodes:
379     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
380       " non-empty list of nodes whose name is to be expanded.")
381
382   wanted = []
383   for name in nodes:
384     node = lu.cfg.ExpandNodeName(name)
385     if node is None:
386       raise errors.OpPrereqError("No such node name '%s'" % name)
387     wanted.append(node)
388
389   return utils.NiceSort(wanted)
390
391
392 def _GetWantedInstances(lu, instances):
393   """Returns list of checked and expanded instance names.
394
395   Args:
396     instances: List of instances (strings) or None for all
397
398   """
399   if not isinstance(instances, list):
400     raise errors.OpPrereqError("Invalid argument type 'instances'")
401
402   if instances:
403     wanted = []
404
405     for name in instances:
406       instance = lu.cfg.ExpandInstanceName(name)
407       if instance is None:
408         raise errors.OpPrereqError("No such instance name '%s'" % name)
409       wanted.append(instance)
410
411   else:
412     wanted = lu.cfg.GetInstanceList()
413   return utils.NiceSort(wanted)
414
415
416 def _CheckOutputFields(static, dynamic, selected):
417   """Checks whether all selected fields are valid.
418
419   Args:
420     static: Static fields
421     dynamic: Dynamic fields
422
423   """
424   static_fields = frozenset(static)
425   dynamic_fields = frozenset(dynamic)
426
427   all_fields = static_fields | dynamic_fields
428
429   if not all_fields.issuperset(selected):
430     raise errors.OpPrereqError("Unknown output fields selected: %s"
431                                % ",".join(frozenset(selected).
432                                           difference(all_fields)))
433
434
435 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
436                           memory, vcpus, nics):
437   """Builds instance related env variables for hooks from single variables.
438
439   Args:
440     secondary_nodes: List of secondary nodes as strings
441   """
442   env = {
443     "OP_TARGET": name,
444     "INSTANCE_NAME": name,
445     "INSTANCE_PRIMARY": primary_node,
446     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
447     "INSTANCE_OS_TYPE": os_type,
448     "INSTANCE_STATUS": status,
449     "INSTANCE_MEMORY": memory,
450     "INSTANCE_VCPUS": vcpus,
451   }
452
453   if nics:
454     nic_count = len(nics)
455     for idx, (ip, bridge, mac) in enumerate(nics):
456       if ip is None:
457         ip = ""
458       env["INSTANCE_NIC%d_IP" % idx] = ip
459       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
460       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
461   else:
462     nic_count = 0
463
464   env["INSTANCE_NIC_COUNT"] = nic_count
465
466   return env
467
468
469 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
470   """Builds instance related env variables for hooks from an object.
471
472   Args:
473     instance: objects.Instance object of instance
474     override: dict of values to override
475   """
476   bep = lu.cfg.GetClusterInfo().FillBE(instance)
477   args = {
478     'name': instance.name,
479     'primary_node': instance.primary_node,
480     'secondary_nodes': instance.secondary_nodes,
481     'os_type': instance.os,
482     'status': instance.os,
483     'memory': bep[constants.BE_MEMORY],
484     'vcpus': bep[constants.BE_VCPUS],
485     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
486   }
487   if override:
488     args.update(override)
489   return _BuildInstanceHookEnv(**args)
490
491
492 def _CheckInstanceBridgesExist(lu, instance):
493   """Check that the brigdes needed by an instance exist.
494
495   """
496   # check bridges existance
497   brlist = [nic.bridge for nic in instance.nics]
498   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
499     raise errors.OpPrereqError("one or more target bridges %s does not"
500                                " exist on destination node '%s'" %
501                                (brlist, instance.primary_node))
502
503
504 class LUDestroyCluster(NoHooksLU):
505   """Logical unit for destroying the cluster.
506
507   """
508   _OP_REQP = []
509
510   def CheckPrereq(self):
511     """Check prerequisites.
512
513     This checks whether the cluster is empty.
514
515     Any errors are signalled by raising errors.OpPrereqError.
516
517     """
518     master = self.cfg.GetMasterNode()
519
520     nodelist = self.cfg.GetNodeList()
521     if len(nodelist) != 1 or nodelist[0] != master:
522       raise errors.OpPrereqError("There are still %d node(s) in"
523                                  " this cluster." % (len(nodelist) - 1))
524     instancelist = self.cfg.GetInstanceList()
525     if instancelist:
526       raise errors.OpPrereqError("There are still %d instance(s) in"
527                                  " this cluster." % len(instancelist))
528
529   def Exec(self, feedback_fn):
530     """Destroys the cluster.
531
532     """
533     master = self.cfg.GetMasterNode()
534     if not self.rpc.call_node_stop_master(master, False):
535       raise errors.OpExecError("Could not disable the master role")
536     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
537     utils.CreateBackup(priv_key)
538     utils.CreateBackup(pub_key)
539     return master
540
541
542 class LUVerifyCluster(LogicalUnit):
543   """Verifies the cluster status.
544
545   """
546   HPATH = "cluster-verify"
547   HTYPE = constants.HTYPE_CLUSTER
548   _OP_REQP = ["skip_checks"]
549   REQ_BGL = False
550
551   def ExpandNames(self):
552     self.needed_locks = {
553       locking.LEVEL_NODE: locking.ALL_SET,
554       locking.LEVEL_INSTANCE: locking.ALL_SET,
555     }
556     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
557
558   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
559                   remote_version, feedback_fn):
560     """Run multiple tests against a node.
561
562     Test list:
563       - compares ganeti version
564       - checks vg existance and size > 20G
565       - checks config file checksum
566       - checks ssh to other nodes
567
568     Args:
569       node: name of the node to check
570       file_list: required list of files
571       local_cksum: dictionary of local files and their checksums
572
573     """
574     # compares ganeti version
575     local_version = constants.PROTOCOL_VERSION
576     if not remote_version:
577       feedback_fn("  - ERROR: connection to %s failed" % (node))
578       return True
579
580     if local_version != remote_version:
581       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
582                       (local_version, node, remote_version))
583       return True
584
585     # checks vg existance and size > 20G
586
587     bad = False
588     if not vglist:
589       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
590                       (node,))
591       bad = True
592     else:
593       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
594                                             constants.MIN_VG_SIZE)
595       if vgstatus:
596         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
597         bad = True
598
599     if not node_result:
600       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
601       return True
602
603     # checks config file checksum
604     # checks ssh to any
605
606     if 'filelist' not in node_result:
607       bad = True
608       feedback_fn("  - ERROR: node hasn't returned file checksum data")
609     else:
610       remote_cksum = node_result['filelist']
611       for file_name in file_list:
612         if file_name not in remote_cksum:
613           bad = True
614           feedback_fn("  - ERROR: file '%s' missing" % file_name)
615         elif remote_cksum[file_name] != local_cksum[file_name]:
616           bad = True
617           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
618
619     if 'nodelist' not in node_result:
620       bad = True
621       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
622     else:
623       if node_result['nodelist']:
624         bad = True
625         for node in node_result['nodelist']:
626           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
627                           (node, node_result['nodelist'][node]))
628     if 'node-net-test' not in node_result:
629       bad = True
630       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
631     else:
632       if node_result['node-net-test']:
633         bad = True
634         nlist = utils.NiceSort(node_result['node-net-test'].keys())
635         for node in nlist:
636           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
637                           (node, node_result['node-net-test'][node]))
638
639     hyp_result = node_result.get('hypervisor', None)
640     if isinstance(hyp_result, dict):
641       for hv_name, hv_result in hyp_result.iteritems():
642         if hv_result is not None:
643           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
644                       (hv_name, hv_result))
645     return bad
646
647   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
648                       node_instance, feedback_fn):
649     """Verify an instance.
650
651     This function checks to see if the required block devices are
652     available on the instance's node.
653
654     """
655     bad = False
656
657     node_current = instanceconfig.primary_node
658
659     node_vol_should = {}
660     instanceconfig.MapLVsByNode(node_vol_should)
661
662     for node in node_vol_should:
663       for volume in node_vol_should[node]:
664         if node not in node_vol_is or volume not in node_vol_is[node]:
665           feedback_fn("  - ERROR: volume %s missing on node %s" %
666                           (volume, node))
667           bad = True
668
669     if not instanceconfig.status == 'down':
670       if (node_current not in node_instance or
671           not instance in node_instance[node_current]):
672         feedback_fn("  - ERROR: instance %s not running on node %s" %
673                         (instance, node_current))
674         bad = True
675
676     for node in node_instance:
677       if (not node == node_current):
678         if instance in node_instance[node]:
679           feedback_fn("  - ERROR: instance %s should not run on node %s" %
680                           (instance, node))
681           bad = True
682
683     return bad
684
685   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
686     """Verify if there are any unknown volumes in the cluster.
687
688     The .os, .swap and backup volumes are ignored. All other volumes are
689     reported as unknown.
690
691     """
692     bad = False
693
694     for node in node_vol_is:
695       for volume in node_vol_is[node]:
696         if node not in node_vol_should or volume not in node_vol_should[node]:
697           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
698                       (volume, node))
699           bad = True
700     return bad
701
702   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
703     """Verify the list of running instances.
704
705     This checks what instances are running but unknown to the cluster.
706
707     """
708     bad = False
709     for node in node_instance:
710       for runninginstance in node_instance[node]:
711         if runninginstance not in instancelist:
712           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
713                           (runninginstance, node))
714           bad = True
715     return bad
716
717   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
718     """Verify N+1 Memory Resilience.
719
720     Check that if one single node dies we can still start all the instances it
721     was primary for.
722
723     """
724     bad = False
725
726     for node, nodeinfo in node_info.iteritems():
727       # This code checks that every node which is now listed as secondary has
728       # enough memory to host all instances it is supposed to should a single
729       # other node in the cluster fail.
730       # FIXME: not ready for failover to an arbitrary node
731       # FIXME: does not support file-backed instances
732       # WARNING: we currently take into account down instances as well as up
733       # ones, considering that even if they're down someone might want to start
734       # them even in the event of a node failure.
735       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
736         needed_mem = 0
737         for instance in instances:
738           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
739           if bep[constants.BE_AUTO_BALANCE]:
740             needed_mem += bep[constants.BE_MEMORY]
741         if nodeinfo['mfree'] < needed_mem:
742           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
743                       " failovers should node %s fail" % (node, prinode))
744           bad = True
745     return bad
746
747   def CheckPrereq(self):
748     """Check prerequisites.
749
750     Transform the list of checks we're going to skip into a set and check that
751     all its members are valid.
752
753     """
754     self.skip_set = frozenset(self.op.skip_checks)
755     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
756       raise errors.OpPrereqError("Invalid checks to be skipped specified")
757
758   def BuildHooksEnv(self):
759     """Build hooks env.
760
761     Cluster-Verify hooks just rone in the post phase and their failure makes
762     the output be logged in the verify output and the verification to fail.
763
764     """
765     all_nodes = self.cfg.GetNodeList()
766     # TODO: populate the environment with useful information for verify hooks
767     env = {}
768     return env, [], all_nodes
769
770   def Exec(self, feedback_fn):
771     """Verify integrity of cluster, performing various test on nodes.
772
773     """
774     bad = False
775     feedback_fn("* Verifying global settings")
776     for msg in self.cfg.VerifyConfig():
777       feedback_fn("  - ERROR: %s" % msg)
778
779     vg_name = self.cfg.GetVGName()
780     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
781     nodelist = utils.NiceSort(self.cfg.GetNodeList())
782     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
783     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
784     i_non_redundant = [] # Non redundant instances
785     i_non_a_balanced = [] # Non auto-balanced instances
786     node_volume = {}
787     node_instance = {}
788     node_info = {}
789     instance_cfg = {}
790
791     # FIXME: verify OS list
792     # do local checksums
793     file_names = []
794     file_names.append(constants.SSL_CERT_FILE)
795     file_names.append(constants.CLUSTER_CONF_FILE)
796     local_checksums = utils.FingerprintFiles(file_names)
797
798     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
799     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
800     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
801     all_vglist = self.rpc.call_vg_list(nodelist)
802     node_verify_param = {
803       'filelist': file_names,
804       'nodelist': nodelist,
805       'hypervisor': hypervisors,
806       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
807                         for node in nodeinfo]
808       }
809     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
810                                            self.cfg.GetClusterName())
811     all_rversion = self.rpc.call_version(nodelist)
812     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
813                                         self.cfg.GetHypervisorType())
814
815     cluster = self.cfg.GetClusterInfo()
816     for node in nodelist:
817       feedback_fn("* Verifying node %s" % node)
818       result = self._VerifyNode(node, file_names, local_checksums,
819                                 all_vglist[node], all_nvinfo[node],
820                                 all_rversion[node], feedback_fn)
821       bad = bad or result
822
823       # node_volume
824       volumeinfo = all_volumeinfo[node]
825
826       if isinstance(volumeinfo, basestring):
827         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
828                     (node, volumeinfo[-400:].encode('string_escape')))
829         bad = True
830         node_volume[node] = {}
831       elif not isinstance(volumeinfo, dict):
832         feedback_fn("  - ERROR: connection to %s failed" % (node,))
833         bad = True
834         continue
835       else:
836         node_volume[node] = volumeinfo
837
838       # node_instance
839       nodeinstance = all_instanceinfo[node]
840       if type(nodeinstance) != list:
841         feedback_fn("  - ERROR: connection to %s failed" % (node,))
842         bad = True
843         continue
844
845       node_instance[node] = nodeinstance
846
847       # node_info
848       nodeinfo = all_ninfo[node]
849       if not isinstance(nodeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853
854       try:
855         node_info[node] = {
856           "mfree": int(nodeinfo['memory_free']),
857           "dfree": int(nodeinfo['vg_free']),
858           "pinst": [],
859           "sinst": [],
860           # dictionary holding all instances this node is secondary for,
861           # grouped by their primary node. Each key is a cluster node, and each
862           # value is a list of instances which have the key as primary and the
863           # current node as secondary.  this is handy to calculate N+1 memory
864           # availability if you can only failover from a primary to its
865           # secondary.
866           "sinst-by-pnode": {},
867         }
868       except ValueError:
869         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
870         bad = True
871         continue
872
873     node_vol_should = {}
874
875     for instance in instancelist:
876       feedback_fn("* Verifying instance %s" % instance)
877       inst_config = self.cfg.GetInstanceInfo(instance)
878       result =  self._VerifyInstance(instance, inst_config, node_volume,
879                                      node_instance, feedback_fn)
880       bad = bad or result
881
882       inst_config.MapLVsByNode(node_vol_should)
883
884       instance_cfg[instance] = inst_config
885
886       pnode = inst_config.primary_node
887       if pnode in node_info:
888         node_info[pnode]['pinst'].append(instance)
889       else:
890         feedback_fn("  - ERROR: instance %s, connection to primary node"
891                     " %s failed" % (instance, pnode))
892         bad = True
893
894       # If the instance is non-redundant we cannot survive losing its primary
895       # node, so we are not N+1 compliant. On the other hand we have no disk
896       # templates with more than one secondary so that situation is not well
897       # supported either.
898       # FIXME: does not support file-backed instances
899       if len(inst_config.secondary_nodes) == 0:
900         i_non_redundant.append(instance)
901       elif len(inst_config.secondary_nodes) > 1:
902         feedback_fn("  - WARNING: multiple secondaries for instance %s"
903                     % instance)
904
905       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
906         i_non_a_balanced.append(instance)
907
908       for snode in inst_config.secondary_nodes:
909         if snode in node_info:
910           node_info[snode]['sinst'].append(instance)
911           if pnode not in node_info[snode]['sinst-by-pnode']:
912             node_info[snode]['sinst-by-pnode'][pnode] = []
913           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
914         else:
915           feedback_fn("  - ERROR: instance %s, connection to secondary node"
916                       " %s failed" % (instance, snode))
917
918     feedback_fn("* Verifying orphan volumes")
919     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
920                                        feedback_fn)
921     bad = bad or result
922
923     feedback_fn("* Verifying remaining instances")
924     result = self._VerifyOrphanInstances(instancelist, node_instance,
925                                          feedback_fn)
926     bad = bad or result
927
928     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
929       feedback_fn("* Verifying N+1 Memory redundancy")
930       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
931       bad = bad or result
932
933     feedback_fn("* Other Notes")
934     if i_non_redundant:
935       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
936                   % len(i_non_redundant))
937
938     if i_non_a_balanced:
939       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
940                   % len(i_non_a_balanced))
941
942     return not bad
943
944   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
945     """Analize the post-hooks' result, handle it, and send some
946     nicely-formatted feedback back to the user.
947
948     Args:
949       phase: the hooks phase that has just been run
950       hooks_results: the results of the multi-node hooks rpc call
951       feedback_fn: function to send feedback back to the caller
952       lu_result: previous Exec result
953
954     """
955     # We only really run POST phase hooks, and are only interested in
956     # their results
957     if phase == constants.HOOKS_PHASE_POST:
958       # Used to change hooks' output to proper indentation
959       indent_re = re.compile('^', re.M)
960       feedback_fn("* Hooks Results")
961       if not hooks_results:
962         feedback_fn("  - ERROR: general communication failure")
963         lu_result = 1
964       else:
965         for node_name in hooks_results:
966           show_node_header = True
967           res = hooks_results[node_name]
968           if res is False or not isinstance(res, list):
969             feedback_fn("    Communication failure")
970             lu_result = 1
971             continue
972           for script, hkr, output in res:
973             if hkr == constants.HKR_FAIL:
974               # The node header is only shown once, if there are
975               # failing hooks on that node
976               if show_node_header:
977                 feedback_fn("  Node %s:" % node_name)
978                 show_node_header = False
979               feedback_fn("    ERROR: Script %s failed, output:" % script)
980               output = indent_re.sub('      ', output)
981               feedback_fn("%s" % output)
982               lu_result = 1
983
984       return lu_result
985
986
987 class LUVerifyDisks(NoHooksLU):
988   """Verifies the cluster disks status.
989
990   """
991   _OP_REQP = []
992   REQ_BGL = False
993
994   def ExpandNames(self):
995     self.needed_locks = {
996       locking.LEVEL_NODE: locking.ALL_SET,
997       locking.LEVEL_INSTANCE: locking.ALL_SET,
998     }
999     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1000
1001   def CheckPrereq(self):
1002     """Check prerequisites.
1003
1004     This has no prerequisites.
1005
1006     """
1007     pass
1008
1009   def Exec(self, feedback_fn):
1010     """Verify integrity of cluster disks.
1011
1012     """
1013     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1014
1015     vg_name = self.cfg.GetVGName()
1016     nodes = utils.NiceSort(self.cfg.GetNodeList())
1017     instances = [self.cfg.GetInstanceInfo(name)
1018                  for name in self.cfg.GetInstanceList()]
1019
1020     nv_dict = {}
1021     for inst in instances:
1022       inst_lvs = {}
1023       if (inst.status != "up" or
1024           inst.disk_template not in constants.DTS_NET_MIRROR):
1025         continue
1026       inst.MapLVsByNode(inst_lvs)
1027       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1028       for node, vol_list in inst_lvs.iteritems():
1029         for vol in vol_list:
1030           nv_dict[(node, vol)] = inst
1031
1032     if not nv_dict:
1033       return result
1034
1035     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1036
1037     to_act = set()
1038     for node in nodes:
1039       # node_volume
1040       lvs = node_lvs[node]
1041
1042       if isinstance(lvs, basestring):
1043         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1044         res_nlvm[node] = lvs
1045       elif not isinstance(lvs, dict):
1046         logging.warning("Connection to node %s failed or invalid data"
1047                         " returned", node)
1048         res_nodes.append(node)
1049         continue
1050
1051       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1052         inst = nv_dict.pop((node, lv_name), None)
1053         if (not lv_online and inst is not None
1054             and inst.name not in res_instances):
1055           res_instances.append(inst.name)
1056
1057     # any leftover items in nv_dict are missing LVs, let's arrange the
1058     # data better
1059     for key, inst in nv_dict.iteritems():
1060       if inst.name not in res_missing:
1061         res_missing[inst.name] = []
1062       res_missing[inst.name].append(key)
1063
1064     return result
1065
1066
1067 class LURenameCluster(LogicalUnit):
1068   """Rename the cluster.
1069
1070   """
1071   HPATH = "cluster-rename"
1072   HTYPE = constants.HTYPE_CLUSTER
1073   _OP_REQP = ["name"]
1074
1075   def BuildHooksEnv(self):
1076     """Build hooks env.
1077
1078     """
1079     env = {
1080       "OP_TARGET": self.cfg.GetClusterName(),
1081       "NEW_NAME": self.op.name,
1082       }
1083     mn = self.cfg.GetMasterNode()
1084     return env, [mn], [mn]
1085
1086   def CheckPrereq(self):
1087     """Verify that the passed name is a valid one.
1088
1089     """
1090     hostname = utils.HostInfo(self.op.name)
1091
1092     new_name = hostname.name
1093     self.ip = new_ip = hostname.ip
1094     old_name = self.cfg.GetClusterName()
1095     old_ip = self.cfg.GetMasterIP()
1096     if new_name == old_name and new_ip == old_ip:
1097       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1098                                  " cluster has changed")
1099     if new_ip != old_ip:
1100       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1101         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102                                    " reachable on the network. Aborting." %
1103                                    new_ip)
1104
1105     self.op.name = new_name
1106
1107   def Exec(self, feedback_fn):
1108     """Rename the cluster.
1109
1110     """
1111     clustername = self.op.name
1112     ip = self.ip
1113
1114     # shutdown the master IP
1115     master = self.cfg.GetMasterNode()
1116     if not self.rpc.call_node_stop_master(master, False):
1117       raise errors.OpExecError("Could not disable the master role")
1118
1119     try:
1120       # modify the sstore
1121       # TODO: sstore
1122       ss.SetKey(ss.SS_MASTER_IP, ip)
1123       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1124
1125       # Distribute updated ss config to all nodes
1126       myself = self.cfg.GetNodeInfo(master)
1127       dist_nodes = self.cfg.GetNodeList()
1128       if myself.name in dist_nodes:
1129         dist_nodes.remove(myself.name)
1130
1131       logging.debug("Copying updated ssconf data to all nodes")
1132       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133         fname = ss.KeyToFilename(keyname)
1134         result = self.rpc.call_upload_file(dist_nodes, fname)
1135         for to_node in dist_nodes:
1136           if not result[to_node]:
1137             self.LogWarning("Copy of file %s to node %s failed",
1138                             fname, to_node)
1139     finally:
1140       if not self.rpc.call_node_start_master(master, False):
1141         self.LogWarning("Could not re-enable the master role on"
1142                         " the master, please restart manually.")
1143
1144
1145 def _RecursiveCheckIfLVMBased(disk):
1146   """Check if the given disk or its children are lvm-based.
1147
1148   Args:
1149     disk: ganeti.objects.Disk object
1150
1151   Returns:
1152     boolean indicating whether a LD_LV dev_type was found or not
1153
1154   """
1155   if disk.children:
1156     for chdisk in disk.children:
1157       if _RecursiveCheckIfLVMBased(chdisk):
1158         return True
1159   return disk.dev_type == constants.LD_LV
1160
1161
1162 class LUSetClusterParams(LogicalUnit):
1163   """Change the parameters of the cluster.
1164
1165   """
1166   HPATH = "cluster-modify"
1167   HTYPE = constants.HTYPE_CLUSTER
1168   _OP_REQP = []
1169   REQ_BGL = False
1170
1171   def ExpandNames(self):
1172     # FIXME: in the future maybe other cluster params won't require checking on
1173     # all nodes to be modified.
1174     self.needed_locks = {
1175       locking.LEVEL_NODE: locking.ALL_SET,
1176     }
1177     self.share_locks[locking.LEVEL_NODE] = 1
1178
1179   def BuildHooksEnv(self):
1180     """Build hooks env.
1181
1182     """
1183     env = {
1184       "OP_TARGET": self.cfg.GetClusterName(),
1185       "NEW_VG_NAME": self.op.vg_name,
1186       }
1187     mn = self.cfg.GetMasterNode()
1188     return env, [mn], [mn]
1189
1190   def CheckPrereq(self):
1191     """Check prerequisites.
1192
1193     This checks whether the given params don't conflict and
1194     if the given volume group is valid.
1195
1196     """
1197     # FIXME: This only works because there is only one parameter that can be
1198     # changed or removed.
1199     if self.op.vg_name is not None and not self.op.vg_name:
1200       instances = self.cfg.GetAllInstancesInfo().values()
1201       for inst in instances:
1202         for disk in inst.disks:
1203           if _RecursiveCheckIfLVMBased(disk):
1204             raise errors.OpPrereqError("Cannot disable lvm storage while"
1205                                        " lvm-based instances exist")
1206
1207     node_list = self.acquired_locks[locking.LEVEL_NODE]
1208
1209     # if vg_name not None, checks given volume group on all nodes
1210     if self.op.vg_name:
1211       vglist = self.rpc.call_vg_list(node_list)
1212       for node in node_list:
1213         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1214                                               constants.MIN_VG_SIZE)
1215         if vgstatus:
1216           raise errors.OpPrereqError("Error on node '%s': %s" %
1217                                      (node, vgstatus))
1218
1219     self.cluster = cluster = self.cfg.GetClusterInfo()
1220     # beparams changes do not need validation (we can't validate?),
1221     # but we still process here
1222     if self.op.beparams:
1223       self.new_beparams = cluster.FillDict(
1224         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1225
1226     # hypervisor list/parameters
1227     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1228     if self.op.hvparams:
1229       if not isinstance(self.op.hvparams, dict):
1230         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1231       for hv_name, hv_dict in self.op.hvparams.items():
1232         if hv_name not in self.new_hvparams:
1233           self.new_hvparams[hv_name] = hv_dict
1234         else:
1235           self.new_hvparams[hv_name].update(hv_dict)
1236
1237     if self.op.enabled_hypervisors is not None:
1238       self.hv_list = self.op.enabled_hypervisors
1239     else:
1240       self.hv_list = cluster.enabled_hypervisors
1241
1242     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1243       # either the enabled list has changed, or the parameters have, validate
1244       for hv_name, hv_params in self.new_hvparams.items():
1245         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1246             (self.op.enabled_hypervisors and
1247              hv_name in self.op.enabled_hypervisors)):
1248           # either this is a new hypervisor, or its parameters have changed
1249           hv_class = hypervisor.GetHypervisor(hv_name)
1250           hv_class.CheckParameterSyntax(hv_params)
1251           _CheckHVParams(self, node_list, hv_name, hv_params)
1252
1253   def Exec(self, feedback_fn):
1254     """Change the parameters of the cluster.
1255
1256     """
1257     if self.op.vg_name is not None:
1258       if self.op.vg_name != self.cfg.GetVGName():
1259         self.cfg.SetVGName(self.op.vg_name)
1260       else:
1261         feedback_fn("Cluster LVM configuration already in desired"
1262                     " state, not changing")
1263     if self.op.hvparams:
1264       self.cluster.hvparams = self.new_hvparams
1265     if self.op.enabled_hypervisors is not None:
1266       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1267     if self.op.beparams:
1268       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1269     self.cfg.Update(self.cluster)
1270
1271
1272 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1273   """Sleep and poll for an instance's disk to sync.
1274
1275   """
1276   if not instance.disks:
1277     return True
1278
1279   if not oneshot:
1280     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1281
1282   node = instance.primary_node
1283
1284   for dev in instance.disks:
1285     lu.cfg.SetDiskID(dev, node)
1286
1287   retries = 0
1288   while True:
1289     max_time = 0
1290     done = True
1291     cumul_degraded = False
1292     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1293     if not rstats:
1294       lu.LogWarning("Can't get any data from node %s", node)
1295       retries += 1
1296       if retries >= 10:
1297         raise errors.RemoteError("Can't contact node %s for mirror data,"
1298                                  " aborting." % node)
1299       time.sleep(6)
1300       continue
1301     retries = 0
1302     for i in range(len(rstats)):
1303       mstat = rstats[i]
1304       if mstat is None:
1305         lu.LogWarning("Can't compute data for node %s/%s",
1306                            node, instance.disks[i].iv_name)
1307         continue
1308       # we ignore the ldisk parameter
1309       perc_done, est_time, is_degraded, _ = mstat
1310       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1311       if perc_done is not None:
1312         done = False
1313         if est_time is not None:
1314           rem_time = "%d estimated seconds remaining" % est_time
1315           max_time = est_time
1316         else:
1317           rem_time = "no time estimate"
1318         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1319                         (instance.disks[i].iv_name, perc_done, rem_time))
1320     if done or oneshot:
1321       break
1322
1323     time.sleep(min(60, max_time))
1324
1325   if done:
1326     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1327   return not cumul_degraded
1328
1329
1330 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1331   """Check that mirrors are not degraded.
1332
1333   The ldisk parameter, if True, will change the test from the
1334   is_degraded attribute (which represents overall non-ok status for
1335   the device(s)) to the ldisk (representing the local storage status).
1336
1337   """
1338   lu.cfg.SetDiskID(dev, node)
1339   if ldisk:
1340     idx = 6
1341   else:
1342     idx = 5
1343
1344   result = True
1345   if on_primary or dev.AssembleOnSecondary():
1346     rstats = lu.rpc.call_blockdev_find(node, dev)
1347     if not rstats:
1348       logging.warning("Node %s: disk degraded, not found or node down", node)
1349       result = False
1350     else:
1351       result = result and (not rstats[idx])
1352   if dev.children:
1353     for child in dev.children:
1354       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1355
1356   return result
1357
1358
1359 class LUDiagnoseOS(NoHooksLU):
1360   """Logical unit for OS diagnose/query.
1361
1362   """
1363   _OP_REQP = ["output_fields", "names"]
1364   REQ_BGL = False
1365
1366   def ExpandNames(self):
1367     if self.op.names:
1368       raise errors.OpPrereqError("Selective OS query not supported")
1369
1370     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1371     _CheckOutputFields(static=[],
1372                        dynamic=self.dynamic_fields,
1373                        selected=self.op.output_fields)
1374
1375     # Lock all nodes, in shared mode
1376     self.needed_locks = {}
1377     self.share_locks[locking.LEVEL_NODE] = 1
1378     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1379
1380   def CheckPrereq(self):
1381     """Check prerequisites.
1382
1383     """
1384
1385   @staticmethod
1386   def _DiagnoseByOS(node_list, rlist):
1387     """Remaps a per-node return list into an a per-os per-node dictionary
1388
1389       Args:
1390         node_list: a list with the names of all nodes
1391         rlist: a map with node names as keys and OS objects as values
1392
1393       Returns:
1394         map: a map with osnames as keys and as value another map, with
1395              nodes as
1396              keys and list of OS objects as values
1397              e.g. {"debian-etch": {"node1": [<object>,...],
1398                                    "node2": [<object>,]}
1399                   }
1400
1401     """
1402     all_os = {}
1403     for node_name, nr in rlist.iteritems():
1404       if not nr:
1405         continue
1406       for os_obj in nr:
1407         if os_obj.name not in all_os:
1408           # build a list of nodes for this os containing empty lists
1409           # for each node in node_list
1410           all_os[os_obj.name] = {}
1411           for nname in node_list:
1412             all_os[os_obj.name][nname] = []
1413         all_os[os_obj.name][node_name].append(os_obj)
1414     return all_os
1415
1416   def Exec(self, feedback_fn):
1417     """Compute the list of OSes.
1418
1419     """
1420     node_list = self.acquired_locks[locking.LEVEL_NODE]
1421     node_data = self.rpc.call_os_diagnose(node_list)
1422     if node_data == False:
1423       raise errors.OpExecError("Can't gather the list of OSes")
1424     pol = self._DiagnoseByOS(node_list, node_data)
1425     output = []
1426     for os_name, os_data in pol.iteritems():
1427       row = []
1428       for field in self.op.output_fields:
1429         if field == "name":
1430           val = os_name
1431         elif field == "valid":
1432           val = utils.all([osl and osl[0] for osl in os_data.values()])
1433         elif field == "node_status":
1434           val = {}
1435           for node_name, nos_list in os_data.iteritems():
1436             val[node_name] = [(v.status, v.path) for v in nos_list]
1437         else:
1438           raise errors.ParameterError(field)
1439         row.append(val)
1440       output.append(row)
1441
1442     return output
1443
1444
1445 class LURemoveNode(LogicalUnit):
1446   """Logical unit for removing a node.
1447
1448   """
1449   HPATH = "node-remove"
1450   HTYPE = constants.HTYPE_NODE
1451   _OP_REQP = ["node_name"]
1452
1453   def BuildHooksEnv(self):
1454     """Build hooks env.
1455
1456     This doesn't run on the target node in the pre phase as a failed
1457     node would then be impossible to remove.
1458
1459     """
1460     env = {
1461       "OP_TARGET": self.op.node_name,
1462       "NODE_NAME": self.op.node_name,
1463       }
1464     all_nodes = self.cfg.GetNodeList()
1465     all_nodes.remove(self.op.node_name)
1466     return env, all_nodes, all_nodes
1467
1468   def CheckPrereq(self):
1469     """Check prerequisites.
1470
1471     This checks:
1472      - the node exists in the configuration
1473      - it does not have primary or secondary instances
1474      - it's not the master
1475
1476     Any errors are signalled by raising errors.OpPrereqError.
1477
1478     """
1479     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1480     if node is None:
1481       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1482
1483     instance_list = self.cfg.GetInstanceList()
1484
1485     masternode = self.cfg.GetMasterNode()
1486     if node.name == masternode:
1487       raise errors.OpPrereqError("Node is the master node,"
1488                                  " you need to failover first.")
1489
1490     for instance_name in instance_list:
1491       instance = self.cfg.GetInstanceInfo(instance_name)
1492       if node.name == instance.primary_node:
1493         raise errors.OpPrereqError("Instance %s still running on the node,"
1494                                    " please remove first." % instance_name)
1495       if node.name in instance.secondary_nodes:
1496         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1497                                    " please remove first." % instance_name)
1498     self.op.node_name = node.name
1499     self.node = node
1500
1501   def Exec(self, feedback_fn):
1502     """Removes the node from the cluster.
1503
1504     """
1505     node = self.node
1506     logging.info("Stopping the node daemon and removing configs from node %s",
1507                  node.name)
1508
1509     self.context.RemoveNode(node.name)
1510
1511     self.rpc.call_node_leave_cluster(node.name)
1512
1513
1514 class LUQueryNodes(NoHooksLU):
1515   """Logical unit for querying nodes.
1516
1517   """
1518   _OP_REQP = ["output_fields", "names"]
1519   REQ_BGL = False
1520
1521   def ExpandNames(self):
1522     self.dynamic_fields = frozenset([
1523       "dtotal", "dfree",
1524       "mtotal", "mnode", "mfree",
1525       "bootid",
1526       "ctotal",
1527       ])
1528
1529     self.static_fields = frozenset([
1530       "name", "pinst_cnt", "sinst_cnt",
1531       "pinst_list", "sinst_list",
1532       "pip", "sip", "tags",
1533       "serial_no",
1534       ])
1535
1536     _CheckOutputFields(static=self.static_fields,
1537                        dynamic=self.dynamic_fields,
1538                        selected=self.op.output_fields)
1539
1540     self.needed_locks = {}
1541     self.share_locks[locking.LEVEL_NODE] = 1
1542
1543     if self.op.names:
1544       self.wanted = _GetWantedNodes(self, self.op.names)
1545     else:
1546       self.wanted = locking.ALL_SET
1547
1548     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1549     if self.do_locking:
1550       # if we don't request only static fields, we need to lock the nodes
1551       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1552
1553
1554   def CheckPrereq(self):
1555     """Check prerequisites.
1556
1557     """
1558     # The validation of the node list is done in the _GetWantedNodes,
1559     # if non empty, and if empty, there's no validation to do
1560     pass
1561
1562   def Exec(self, feedback_fn):
1563     """Computes the list of nodes and their attributes.
1564
1565     """
1566     all_info = self.cfg.GetAllNodesInfo()
1567     if self.do_locking:
1568       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1569     elif self.wanted != locking.ALL_SET:
1570       nodenames = self.wanted
1571       missing = set(nodenames).difference(all_info.keys())
1572       if missing:
1573         raise errors.OpExecError(
1574           "Some nodes were removed before retrieving their data: %s" % missing)
1575     else:
1576       nodenames = all_info.keys()
1577
1578     nodenames = utils.NiceSort(nodenames)
1579     nodelist = [all_info[name] for name in nodenames]
1580
1581     # begin data gathering
1582
1583     if self.dynamic_fields.intersection(self.op.output_fields):
1584       live_data = {}
1585       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1586                                           self.cfg.GetHypervisorType())
1587       for name in nodenames:
1588         nodeinfo = node_data.get(name, None)
1589         if nodeinfo:
1590           live_data[name] = {
1591             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1592             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1593             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1594             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1595             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1596             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1597             "bootid": nodeinfo['bootid'],
1598             }
1599         else:
1600           live_data[name] = {}
1601     else:
1602       live_data = dict.fromkeys(nodenames, {})
1603
1604     node_to_primary = dict([(name, set()) for name in nodenames])
1605     node_to_secondary = dict([(name, set()) for name in nodenames])
1606
1607     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1608                              "sinst_cnt", "sinst_list"))
1609     if inst_fields & frozenset(self.op.output_fields):
1610       instancelist = self.cfg.GetInstanceList()
1611
1612       for instance_name in instancelist:
1613         inst = self.cfg.GetInstanceInfo(instance_name)
1614         if inst.primary_node in node_to_primary:
1615           node_to_primary[inst.primary_node].add(inst.name)
1616         for secnode in inst.secondary_nodes:
1617           if secnode in node_to_secondary:
1618             node_to_secondary[secnode].add(inst.name)
1619
1620     # end data gathering
1621
1622     output = []
1623     for node in nodelist:
1624       node_output = []
1625       for field in self.op.output_fields:
1626         if field == "name":
1627           val = node.name
1628         elif field == "pinst_list":
1629           val = list(node_to_primary[node.name])
1630         elif field == "sinst_list":
1631           val = list(node_to_secondary[node.name])
1632         elif field == "pinst_cnt":
1633           val = len(node_to_primary[node.name])
1634         elif field == "sinst_cnt":
1635           val = len(node_to_secondary[node.name])
1636         elif field == "pip":
1637           val = node.primary_ip
1638         elif field == "sip":
1639           val = node.secondary_ip
1640         elif field == "tags":
1641           val = list(node.GetTags())
1642         elif field == "serial_no":
1643           val = node.serial_no
1644         elif field in self.dynamic_fields:
1645           val = live_data[node.name].get(field, None)
1646         else:
1647           raise errors.ParameterError(field)
1648         node_output.append(val)
1649       output.append(node_output)
1650
1651     return output
1652
1653
1654 class LUQueryNodeVolumes(NoHooksLU):
1655   """Logical unit for getting volumes on node(s).
1656
1657   """
1658   _OP_REQP = ["nodes", "output_fields"]
1659   REQ_BGL = False
1660
1661   def ExpandNames(self):
1662     _CheckOutputFields(static=["node"],
1663                        dynamic=["phys", "vg", "name", "size", "instance"],
1664                        selected=self.op.output_fields)
1665
1666     self.needed_locks = {}
1667     self.share_locks[locking.LEVEL_NODE] = 1
1668     if not self.op.nodes:
1669       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1670     else:
1671       self.needed_locks[locking.LEVEL_NODE] = \
1672         _GetWantedNodes(self, self.op.nodes)
1673
1674   def CheckPrereq(self):
1675     """Check prerequisites.
1676
1677     This checks that the fields required are valid output fields.
1678
1679     """
1680     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1681
1682   def Exec(self, feedback_fn):
1683     """Computes the list of nodes and their attributes.
1684
1685     """
1686     nodenames = self.nodes
1687     volumes = self.rpc.call_node_volumes(nodenames)
1688
1689     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1690              in self.cfg.GetInstanceList()]
1691
1692     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1693
1694     output = []
1695     for node in nodenames:
1696       if node not in volumes or not volumes[node]:
1697         continue
1698
1699       node_vols = volumes[node][:]
1700       node_vols.sort(key=lambda vol: vol['dev'])
1701
1702       for vol in node_vols:
1703         node_output = []
1704         for field in self.op.output_fields:
1705           if field == "node":
1706             val = node
1707           elif field == "phys":
1708             val = vol['dev']
1709           elif field == "vg":
1710             val = vol['vg']
1711           elif field == "name":
1712             val = vol['name']
1713           elif field == "size":
1714             val = int(float(vol['size']))
1715           elif field == "instance":
1716             for inst in ilist:
1717               if node not in lv_by_node[inst]:
1718                 continue
1719               if vol['name'] in lv_by_node[inst][node]:
1720                 val = inst.name
1721                 break
1722             else:
1723               val = '-'
1724           else:
1725             raise errors.ParameterError(field)
1726           node_output.append(str(val))
1727
1728         output.append(node_output)
1729
1730     return output
1731
1732
1733 class LUAddNode(LogicalUnit):
1734   """Logical unit for adding node to the cluster.
1735
1736   """
1737   HPATH = "node-add"
1738   HTYPE = constants.HTYPE_NODE
1739   _OP_REQP = ["node_name"]
1740
1741   def BuildHooksEnv(self):
1742     """Build hooks env.
1743
1744     This will run on all nodes before, and on all nodes + the new node after.
1745
1746     """
1747     env = {
1748       "OP_TARGET": self.op.node_name,
1749       "NODE_NAME": self.op.node_name,
1750       "NODE_PIP": self.op.primary_ip,
1751       "NODE_SIP": self.op.secondary_ip,
1752       }
1753     nodes_0 = self.cfg.GetNodeList()
1754     nodes_1 = nodes_0 + [self.op.node_name, ]
1755     return env, nodes_0, nodes_1
1756
1757   def CheckPrereq(self):
1758     """Check prerequisites.
1759
1760     This checks:
1761      - the new node is not already in the config
1762      - it is resolvable
1763      - its parameters (single/dual homed) matches the cluster
1764
1765     Any errors are signalled by raising errors.OpPrereqError.
1766
1767     """
1768     node_name = self.op.node_name
1769     cfg = self.cfg
1770
1771     dns_data = utils.HostInfo(node_name)
1772
1773     node = dns_data.name
1774     primary_ip = self.op.primary_ip = dns_data.ip
1775     secondary_ip = getattr(self.op, "secondary_ip", None)
1776     if secondary_ip is None:
1777       secondary_ip = primary_ip
1778     if not utils.IsValidIP(secondary_ip):
1779       raise errors.OpPrereqError("Invalid secondary IP given")
1780     self.op.secondary_ip = secondary_ip
1781
1782     node_list = cfg.GetNodeList()
1783     if not self.op.readd and node in node_list:
1784       raise errors.OpPrereqError("Node %s is already in the configuration" %
1785                                  node)
1786     elif self.op.readd and node not in node_list:
1787       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1788
1789     for existing_node_name in node_list:
1790       existing_node = cfg.GetNodeInfo(existing_node_name)
1791
1792       if self.op.readd and node == existing_node_name:
1793         if (existing_node.primary_ip != primary_ip or
1794             existing_node.secondary_ip != secondary_ip):
1795           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1796                                      " address configuration as before")
1797         continue
1798
1799       if (existing_node.primary_ip == primary_ip or
1800           existing_node.secondary_ip == primary_ip or
1801           existing_node.primary_ip == secondary_ip or
1802           existing_node.secondary_ip == secondary_ip):
1803         raise errors.OpPrereqError("New node ip address(es) conflict with"
1804                                    " existing node %s" % existing_node.name)
1805
1806     # check that the type of the node (single versus dual homed) is the
1807     # same as for the master
1808     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1809     master_singlehomed = myself.secondary_ip == myself.primary_ip
1810     newbie_singlehomed = secondary_ip == primary_ip
1811     if master_singlehomed != newbie_singlehomed:
1812       if master_singlehomed:
1813         raise errors.OpPrereqError("The master has no private ip but the"
1814                                    " new node has one")
1815       else:
1816         raise errors.OpPrereqError("The master has a private ip but the"
1817                                    " new node doesn't have one")
1818
1819     # checks reachablity
1820     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1821       raise errors.OpPrereqError("Node not reachable by ping")
1822
1823     if not newbie_singlehomed:
1824       # check reachability from my secondary ip to newbie's secondary ip
1825       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1826                            source=myself.secondary_ip):
1827         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1828                                    " based ping to noded port")
1829
1830     self.new_node = objects.Node(name=node,
1831                                  primary_ip=primary_ip,
1832                                  secondary_ip=secondary_ip)
1833
1834   def Exec(self, feedback_fn):
1835     """Adds the new node to the cluster.
1836
1837     """
1838     new_node = self.new_node
1839     node = new_node.name
1840
1841     # check connectivity
1842     result = self.rpc.call_version([node])[node]
1843     if result:
1844       if constants.PROTOCOL_VERSION == result:
1845         logging.info("Communication to node %s fine, sw version %s match",
1846                      node, result)
1847       else:
1848         raise errors.OpExecError("Version mismatch master version %s,"
1849                                  " node version %s" %
1850                                  (constants.PROTOCOL_VERSION, result))
1851     else:
1852       raise errors.OpExecError("Cannot get version from the new node")
1853
1854     # setup ssh on node
1855     logging.info("Copy ssh key to node %s", node)
1856     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1857     keyarray = []
1858     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1859                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1860                 priv_key, pub_key]
1861
1862     for i in keyfiles:
1863       f = open(i, 'r')
1864       try:
1865         keyarray.append(f.read())
1866       finally:
1867         f.close()
1868
1869     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1870                                     keyarray[2],
1871                                     keyarray[3], keyarray[4], keyarray[5])
1872
1873     if not result:
1874       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1875
1876     # Add node to our /etc/hosts, and add key to known_hosts
1877     utils.AddHostToEtcHosts(new_node.name)
1878
1879     if new_node.secondary_ip != new_node.primary_ip:
1880       if not self.rpc.call_node_has_ip_address(new_node.name,
1881                                                new_node.secondary_ip):
1882         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1883                                  " you gave (%s). Please fix and re-run this"
1884                                  " command." % new_node.secondary_ip)
1885
1886     node_verify_list = [self.cfg.GetMasterNode()]
1887     node_verify_param = {
1888       'nodelist': [node],
1889       # TODO: do a node-net-test as well?
1890     }
1891
1892     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1893                                        self.cfg.GetClusterName())
1894     for verifier in node_verify_list:
1895       if not result[verifier]:
1896         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1897                                  " for remote verification" % verifier)
1898       if result[verifier]['nodelist']:
1899         for failed in result[verifier]['nodelist']:
1900           feedback_fn("ssh/hostname verification failed %s -> %s" %
1901                       (verifier, result[verifier]['nodelist'][failed]))
1902         raise errors.OpExecError("ssh/hostname verification failed.")
1903
1904     # Distribute updated /etc/hosts and known_hosts to all nodes,
1905     # including the node just added
1906     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1907     dist_nodes = self.cfg.GetNodeList()
1908     if not self.op.readd:
1909       dist_nodes.append(node)
1910     if myself.name in dist_nodes:
1911       dist_nodes.remove(myself.name)
1912
1913     logging.debug("Copying hosts and known_hosts to all nodes")
1914     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1915       result = self.rpc.call_upload_file(dist_nodes, fname)
1916       for to_node in dist_nodes:
1917         if not result[to_node]:
1918           logging.error("Copy of file %s to node %s failed", fname, to_node)
1919
1920     to_copy = []
1921     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1922       to_copy.append(constants.VNC_PASSWORD_FILE)
1923     for fname in to_copy:
1924       result = self.rpc.call_upload_file([node], fname)
1925       if not result[node]:
1926         logging.error("Could not copy file %s to node %s", fname, node)
1927
1928     if self.op.readd:
1929       self.context.ReaddNode(new_node)
1930     else:
1931       self.context.AddNode(new_node)
1932
1933
1934 class LUQueryClusterInfo(NoHooksLU):
1935   """Query cluster configuration.
1936
1937   """
1938   _OP_REQP = []
1939   REQ_MASTER = False
1940   REQ_BGL = False
1941
1942   def ExpandNames(self):
1943     self.needed_locks = {}
1944
1945   def CheckPrereq(self):
1946     """No prerequsites needed for this LU.
1947
1948     """
1949     pass
1950
1951   def Exec(self, feedback_fn):
1952     """Return cluster config.
1953
1954     """
1955     cluster = self.cfg.GetClusterInfo()
1956     result = {
1957       "software_version": constants.RELEASE_VERSION,
1958       "protocol_version": constants.PROTOCOL_VERSION,
1959       "config_version": constants.CONFIG_VERSION,
1960       "os_api_version": constants.OS_API_VERSION,
1961       "export_version": constants.EXPORT_VERSION,
1962       "architecture": (platform.architecture()[0], platform.machine()),
1963       "name": cluster.cluster_name,
1964       "master": cluster.master_node,
1965       "default_hypervisor": cluster.default_hypervisor,
1966       "enabled_hypervisors": cluster.enabled_hypervisors,
1967       "hvparams": cluster.hvparams,
1968       "beparams": cluster.beparams,
1969       }
1970
1971     return result
1972
1973
1974 class LUQueryConfigValues(NoHooksLU):
1975   """Return configuration values.
1976
1977   """
1978   _OP_REQP = []
1979   REQ_BGL = False
1980
1981   def ExpandNames(self):
1982     self.needed_locks = {}
1983
1984     static_fields = ["cluster_name", "master_node", "drain_flag"]
1985     _CheckOutputFields(static=static_fields,
1986                        dynamic=[],
1987                        selected=self.op.output_fields)
1988
1989   def CheckPrereq(self):
1990     """No prerequisites.
1991
1992     """
1993     pass
1994
1995   def Exec(self, feedback_fn):
1996     """Dump a representation of the cluster config to the standard output.
1997
1998     """
1999     values = []
2000     for field in self.op.output_fields:
2001       if field == "cluster_name":
2002         entry = self.cfg.GetClusterName()
2003       elif field == "master_node":
2004         entry = self.cfg.GetMasterNode()
2005       elif field == "drain_flag":
2006         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2007       else:
2008         raise errors.ParameterError(field)
2009       values.append(entry)
2010     return values
2011
2012
2013 class LUActivateInstanceDisks(NoHooksLU):
2014   """Bring up an instance's disks.
2015
2016   """
2017   _OP_REQP = ["instance_name"]
2018   REQ_BGL = False
2019
2020   def ExpandNames(self):
2021     self._ExpandAndLockInstance()
2022     self.needed_locks[locking.LEVEL_NODE] = []
2023     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2024
2025   def DeclareLocks(self, level):
2026     if level == locking.LEVEL_NODE:
2027       self._LockInstancesNodes()
2028
2029   def CheckPrereq(self):
2030     """Check prerequisites.
2031
2032     This checks that the instance is in the cluster.
2033
2034     """
2035     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2036     assert self.instance is not None, \
2037       "Cannot retrieve locked instance %s" % self.op.instance_name
2038
2039   def Exec(self, feedback_fn):
2040     """Activate the disks.
2041
2042     """
2043     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2044     if not disks_ok:
2045       raise errors.OpExecError("Cannot activate block devices")
2046
2047     return disks_info
2048
2049
2050 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2051   """Prepare the block devices for an instance.
2052
2053   This sets up the block devices on all nodes.
2054
2055   Args:
2056     instance: a ganeti.objects.Instance object
2057     ignore_secondaries: if true, errors on secondary nodes won't result
2058                         in an error return from the function
2059
2060   Returns:
2061     false if the operation failed
2062     list of (host, instance_visible_name, node_visible_name) if the operation
2063          suceeded with the mapping from node devices to instance devices
2064   """
2065   device_info = []
2066   disks_ok = True
2067   iname = instance.name
2068   # With the two passes mechanism we try to reduce the window of
2069   # opportunity for the race condition of switching DRBD to primary
2070   # before handshaking occured, but we do not eliminate it
2071
2072   # The proper fix would be to wait (with some limits) until the
2073   # connection has been made and drbd transitions from WFConnection
2074   # into any other network-connected state (Connected, SyncTarget,
2075   # SyncSource, etc.)
2076
2077   # 1st pass, assemble on all nodes in secondary mode
2078   for inst_disk in instance.disks:
2079     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2080       lu.cfg.SetDiskID(node_disk, node)
2081       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2082       if not result:
2083         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2084                            " (is_primary=False, pass=1)",
2085                            inst_disk.iv_name, node)
2086         if not ignore_secondaries:
2087           disks_ok = False
2088
2089   # FIXME: race condition on drbd migration to primary
2090
2091   # 2nd pass, do only the primary node
2092   for inst_disk in instance.disks:
2093     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094       if node != instance.primary_node:
2095         continue
2096       lu.cfg.SetDiskID(node_disk, node)
2097       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2098       if not result:
2099         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2100                            " (is_primary=True, pass=2)",
2101                            inst_disk.iv_name, node)
2102         disks_ok = False
2103     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2104
2105   # leave the disks configured for the primary node
2106   # this is a workaround that would be fixed better by
2107   # improving the logical/physical id handling
2108   for disk in instance.disks:
2109     lu.cfg.SetDiskID(disk, instance.primary_node)
2110
2111   return disks_ok, device_info
2112
2113
2114 def _StartInstanceDisks(lu, instance, force):
2115   """Start the disks of an instance.
2116
2117   """
2118   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2119                                            ignore_secondaries=force)
2120   if not disks_ok:
2121     _ShutdownInstanceDisks(lu, instance)
2122     if force is not None and not force:
2123       lu.proc.LogWarning("", hint="If the message above refers to a"
2124                          " secondary node,"
2125                          " you can retry the operation using '--force'.")
2126     raise errors.OpExecError("Disk consistency error")
2127
2128
2129 class LUDeactivateInstanceDisks(NoHooksLU):
2130   """Shutdown an instance's disks.
2131
2132   """
2133   _OP_REQP = ["instance_name"]
2134   REQ_BGL = False
2135
2136   def ExpandNames(self):
2137     self._ExpandAndLockInstance()
2138     self.needed_locks[locking.LEVEL_NODE] = []
2139     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2140
2141   def DeclareLocks(self, level):
2142     if level == locking.LEVEL_NODE:
2143       self._LockInstancesNodes()
2144
2145   def CheckPrereq(self):
2146     """Check prerequisites.
2147
2148     This checks that the instance is in the cluster.
2149
2150     """
2151     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2152     assert self.instance is not None, \
2153       "Cannot retrieve locked instance %s" % self.op.instance_name
2154
2155   def Exec(self, feedback_fn):
2156     """Deactivate the disks
2157
2158     """
2159     instance = self.instance
2160     _SafeShutdownInstanceDisks(self, instance)
2161
2162
2163 def _SafeShutdownInstanceDisks(lu, instance):
2164   """Shutdown block devices of an instance.
2165
2166   This function checks if an instance is running, before calling
2167   _ShutdownInstanceDisks.
2168
2169   """
2170   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2171                                       [instance.hypervisor])
2172   ins_l = ins_l[instance.primary_node]
2173   if not type(ins_l) is list:
2174     raise errors.OpExecError("Can't contact node '%s'" %
2175                              instance.primary_node)
2176
2177   if instance.name in ins_l:
2178     raise errors.OpExecError("Instance is running, can't shutdown"
2179                              " block devices.")
2180
2181   _ShutdownInstanceDisks(lu, instance)
2182
2183
2184 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2185   """Shutdown block devices of an instance.
2186
2187   This does the shutdown on all nodes of the instance.
2188
2189   If the ignore_primary is false, errors on the primary node are
2190   ignored.
2191
2192   """
2193   result = True
2194   for disk in instance.disks:
2195     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2196       lu.cfg.SetDiskID(top_disk, node)
2197       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2198         logging.error("Could not shutdown block device %s on node %s",
2199                       disk.iv_name, node)
2200         if not ignore_primary or node != instance.primary_node:
2201           result = False
2202   return result
2203
2204
2205 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2206   """Checks if a node has enough free memory.
2207
2208   This function check if a given node has the needed amount of free
2209   memory. In case the node has less memory or we cannot get the
2210   information from the node, this function raise an OpPrereqError
2211   exception.
2212
2213   @type lu: C{LogicalUnit}
2214   @param lu: a logical unit from which we get configuration data
2215   @type node: C{str}
2216   @param node: the node to check
2217   @type reason: C{str}
2218   @param reason: string to use in the error message
2219   @type requested: C{int}
2220   @param requested: the amount of memory in MiB to check for
2221   @type hypervisor: C{str}
2222   @param hypervisor: the hypervisor to ask for memory stats
2223   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2224       we cannot check the node
2225
2226   """
2227   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2228   if not nodeinfo or not isinstance(nodeinfo, dict):
2229     raise errors.OpPrereqError("Could not contact node %s for resource"
2230                              " information" % (node,))
2231
2232   free_mem = nodeinfo[node].get('memory_free')
2233   if not isinstance(free_mem, int):
2234     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2235                              " was '%s'" % (node, free_mem))
2236   if requested > free_mem:
2237     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2238                              " needed %s MiB, available %s MiB" %
2239                              (node, reason, requested, free_mem))
2240
2241
2242 class LUStartupInstance(LogicalUnit):
2243   """Starts an instance.
2244
2245   """
2246   HPATH = "instance-start"
2247   HTYPE = constants.HTYPE_INSTANCE
2248   _OP_REQP = ["instance_name", "force"]
2249   REQ_BGL = False
2250
2251   def ExpandNames(self):
2252     self._ExpandAndLockInstance()
2253     self.needed_locks[locking.LEVEL_NODE] = []
2254     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2255
2256   def DeclareLocks(self, level):
2257     if level == locking.LEVEL_NODE:
2258       self._LockInstancesNodes()
2259
2260   def BuildHooksEnv(self):
2261     """Build hooks env.
2262
2263     This runs on master, primary and secondary nodes of the instance.
2264
2265     """
2266     env = {
2267       "FORCE": self.op.force,
2268       }
2269     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2270     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2271           list(self.instance.secondary_nodes))
2272     return env, nl, nl
2273
2274   def CheckPrereq(self):
2275     """Check prerequisites.
2276
2277     This checks that the instance is in the cluster.
2278
2279     """
2280     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2281     assert self.instance is not None, \
2282       "Cannot retrieve locked instance %s" % self.op.instance_name
2283
2284     bep = self.cfg.GetClusterInfo().FillBE(instance)
2285     # check bridges existance
2286     _CheckInstanceBridgesExist(self, instance)
2287
2288     _CheckNodeFreeMemory(self, instance.primary_node,
2289                          "starting instance %s" % instance.name,
2290                          bep[constants.BE_MEMORY], instance.hypervisor)
2291
2292   def Exec(self, feedback_fn):
2293     """Start the instance.
2294
2295     """
2296     instance = self.instance
2297     force = self.op.force
2298     extra_args = getattr(self.op, "extra_args", "")
2299
2300     self.cfg.MarkInstanceUp(instance.name)
2301
2302     node_current = instance.primary_node
2303
2304     _StartInstanceDisks(self, instance, force)
2305
2306     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2307       _ShutdownInstanceDisks(self, instance)
2308       raise errors.OpExecError("Could not start instance")
2309
2310
2311 class LURebootInstance(LogicalUnit):
2312   """Reboot an instance.
2313
2314   """
2315   HPATH = "instance-reboot"
2316   HTYPE = constants.HTYPE_INSTANCE
2317   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2318   REQ_BGL = False
2319
2320   def ExpandNames(self):
2321     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2322                                    constants.INSTANCE_REBOOT_HARD,
2323                                    constants.INSTANCE_REBOOT_FULL]:
2324       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2325                                   (constants.INSTANCE_REBOOT_SOFT,
2326                                    constants.INSTANCE_REBOOT_HARD,
2327                                    constants.INSTANCE_REBOOT_FULL))
2328     self._ExpandAndLockInstance()
2329     self.needed_locks[locking.LEVEL_NODE] = []
2330     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2331
2332   def DeclareLocks(self, level):
2333     if level == locking.LEVEL_NODE:
2334       primary_only = not constants.INSTANCE_REBOOT_FULL
2335       self._LockInstancesNodes(primary_only=primary_only)
2336
2337   def BuildHooksEnv(self):
2338     """Build hooks env.
2339
2340     This runs on master, primary and secondary nodes of the instance.
2341
2342     """
2343     env = {
2344       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2345       }
2346     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2347     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2348           list(self.instance.secondary_nodes))
2349     return env, nl, nl
2350
2351   def CheckPrereq(self):
2352     """Check prerequisites.
2353
2354     This checks that the instance is in the cluster.
2355
2356     """
2357     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2358     assert self.instance is not None, \
2359       "Cannot retrieve locked instance %s" % self.op.instance_name
2360
2361     # check bridges existance
2362     _CheckInstanceBridgesExist(self, instance)
2363
2364   def Exec(self, feedback_fn):
2365     """Reboot the instance.
2366
2367     """
2368     instance = self.instance
2369     ignore_secondaries = self.op.ignore_secondaries
2370     reboot_type = self.op.reboot_type
2371     extra_args = getattr(self.op, "extra_args", "")
2372
2373     node_current = instance.primary_node
2374
2375     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2376                        constants.INSTANCE_REBOOT_HARD]:
2377       if not self.rpc.call_instance_reboot(node_current, instance,
2378                                            reboot_type, extra_args):
2379         raise errors.OpExecError("Could not reboot instance")
2380     else:
2381       if not self.rpc.call_instance_shutdown(node_current, instance):
2382         raise errors.OpExecError("could not shutdown instance for full reboot")
2383       _ShutdownInstanceDisks(self, instance)
2384       _StartInstanceDisks(self, instance, ignore_secondaries)
2385       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2386         _ShutdownInstanceDisks(self, instance)
2387         raise errors.OpExecError("Could not start instance for full reboot")
2388
2389     self.cfg.MarkInstanceUp(instance.name)
2390
2391
2392 class LUShutdownInstance(LogicalUnit):
2393   """Shutdown an instance.
2394
2395   """
2396   HPATH = "instance-stop"
2397   HTYPE = constants.HTYPE_INSTANCE
2398   _OP_REQP = ["instance_name"]
2399   REQ_BGL = False
2400
2401   def ExpandNames(self):
2402     self._ExpandAndLockInstance()
2403     self.needed_locks[locking.LEVEL_NODE] = []
2404     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2405
2406   def DeclareLocks(self, level):
2407     if level == locking.LEVEL_NODE:
2408       self._LockInstancesNodes()
2409
2410   def BuildHooksEnv(self):
2411     """Build hooks env.
2412
2413     This runs on master, primary and secondary nodes of the instance.
2414
2415     """
2416     env = _BuildInstanceHookEnvByObject(self, self.instance)
2417     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2418           list(self.instance.secondary_nodes))
2419     return env, nl, nl
2420
2421   def CheckPrereq(self):
2422     """Check prerequisites.
2423
2424     This checks that the instance is in the cluster.
2425
2426     """
2427     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2428     assert self.instance is not None, \
2429       "Cannot retrieve locked instance %s" % self.op.instance_name
2430
2431   def Exec(self, feedback_fn):
2432     """Shutdown the instance.
2433
2434     """
2435     instance = self.instance
2436     node_current = instance.primary_node
2437     self.cfg.MarkInstanceDown(instance.name)
2438     if not self.rpc.call_instance_shutdown(node_current, instance):
2439       self.proc.LogWarning("Could not shutdown instance")
2440
2441     _ShutdownInstanceDisks(self, instance)
2442
2443
2444 class LUReinstallInstance(LogicalUnit):
2445   """Reinstall an instance.
2446
2447   """
2448   HPATH = "instance-reinstall"
2449   HTYPE = constants.HTYPE_INSTANCE
2450   _OP_REQP = ["instance_name"]
2451   REQ_BGL = False
2452
2453   def ExpandNames(self):
2454     self._ExpandAndLockInstance()
2455     self.needed_locks[locking.LEVEL_NODE] = []
2456     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2457
2458   def DeclareLocks(self, level):
2459     if level == locking.LEVEL_NODE:
2460       self._LockInstancesNodes()
2461
2462   def BuildHooksEnv(self):
2463     """Build hooks env.
2464
2465     This runs on master, primary and secondary nodes of the instance.
2466
2467     """
2468     env = _BuildInstanceHookEnvByObject(self, self.instance)
2469     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2470           list(self.instance.secondary_nodes))
2471     return env, nl, nl
2472
2473   def CheckPrereq(self):
2474     """Check prerequisites.
2475
2476     This checks that the instance is in the cluster and is not running.
2477
2478     """
2479     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2480     assert instance is not None, \
2481       "Cannot retrieve locked instance %s" % self.op.instance_name
2482
2483     if instance.disk_template == constants.DT_DISKLESS:
2484       raise errors.OpPrereqError("Instance '%s' has no disks" %
2485                                  self.op.instance_name)
2486     if instance.status != "down":
2487       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2488                                  self.op.instance_name)
2489     remote_info = self.rpc.call_instance_info(instance.primary_node,
2490                                               instance.name,
2491                                               instance.hypervisor)
2492     if remote_info:
2493       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2494                                  (self.op.instance_name,
2495                                   instance.primary_node))
2496
2497     self.op.os_type = getattr(self.op, "os_type", None)
2498     if self.op.os_type is not None:
2499       # OS verification
2500       pnode = self.cfg.GetNodeInfo(
2501         self.cfg.ExpandNodeName(instance.primary_node))
2502       if pnode is None:
2503         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2504                                    self.op.pnode)
2505       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2506       if not os_obj:
2507         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2508                                    " primary node"  % self.op.os_type)
2509
2510     self.instance = instance
2511
2512   def Exec(self, feedback_fn):
2513     """Reinstall the instance.
2514
2515     """
2516     inst = self.instance
2517
2518     if self.op.os_type is not None:
2519       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2520       inst.os = self.op.os_type
2521       self.cfg.Update(inst)
2522
2523     _StartInstanceDisks(self, inst, None)
2524     try:
2525       feedback_fn("Running the instance OS create scripts...")
2526       if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2527         raise errors.OpExecError("Could not install OS for instance %s"
2528                                  " on node %s" %
2529                                  (inst.name, inst.primary_node))
2530     finally:
2531       _ShutdownInstanceDisks(self, inst)
2532
2533
2534 class LURenameInstance(LogicalUnit):
2535   """Rename an instance.
2536
2537   """
2538   HPATH = "instance-rename"
2539   HTYPE = constants.HTYPE_INSTANCE
2540   _OP_REQP = ["instance_name", "new_name"]
2541
2542   def BuildHooksEnv(self):
2543     """Build hooks env.
2544
2545     This runs on master, primary and secondary nodes of the instance.
2546
2547     """
2548     env = _BuildInstanceHookEnvByObject(self, self.instance)
2549     env["INSTANCE_NEW_NAME"] = self.op.new_name
2550     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2551           list(self.instance.secondary_nodes))
2552     return env, nl, nl
2553
2554   def CheckPrereq(self):
2555     """Check prerequisites.
2556
2557     This checks that the instance is in the cluster and is not running.
2558
2559     """
2560     instance = self.cfg.GetInstanceInfo(
2561       self.cfg.ExpandInstanceName(self.op.instance_name))
2562     if instance is None:
2563       raise errors.OpPrereqError("Instance '%s' not known" %
2564                                  self.op.instance_name)
2565     if instance.status != "down":
2566       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2567                                  self.op.instance_name)
2568     remote_info = self.rpc.call_instance_info(instance.primary_node,
2569                                               instance.name,
2570                                               instance.hypervisor)
2571     if remote_info:
2572       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2573                                  (self.op.instance_name,
2574                                   instance.primary_node))
2575     self.instance = instance
2576
2577     # new name verification
2578     name_info = utils.HostInfo(self.op.new_name)
2579
2580     self.op.new_name = new_name = name_info.name
2581     instance_list = self.cfg.GetInstanceList()
2582     if new_name in instance_list:
2583       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2584                                  new_name)
2585
2586     if not getattr(self.op, "ignore_ip", False):
2587       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2588         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2589                                    (name_info.ip, new_name))
2590
2591
2592   def Exec(self, feedback_fn):
2593     """Reinstall the instance.
2594
2595     """
2596     inst = self.instance
2597     old_name = inst.name
2598
2599     if inst.disk_template == constants.DT_FILE:
2600       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601
2602     self.cfg.RenameInstance(inst.name, self.op.new_name)
2603     # Change the instance lock. This is definitely safe while we hold the BGL
2604     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2605     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2606
2607     # re-read the instance from the configuration after rename
2608     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2609
2610     if inst.disk_template == constants.DT_FILE:
2611       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2612       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2613                                                      old_file_storage_dir,
2614                                                      new_file_storage_dir)
2615
2616       if not result:
2617         raise errors.OpExecError("Could not connect to node '%s' to rename"
2618                                  " directory '%s' to '%s' (but the instance"
2619                                  " has been renamed in Ganeti)" % (
2620                                  inst.primary_node, old_file_storage_dir,
2621                                  new_file_storage_dir))
2622
2623       if not result[0]:
2624         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2625                                  " (but the instance has been renamed in"
2626                                  " Ganeti)" % (old_file_storage_dir,
2627                                                new_file_storage_dir))
2628
2629     _StartInstanceDisks(self, inst, None)
2630     try:
2631       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2632                                                old_name):
2633         msg = ("Could not run OS rename script for instance %s on node %s"
2634                " (but the instance has been renamed in Ganeti)" %
2635                (inst.name, inst.primary_node))
2636         self.proc.LogWarning(msg)
2637     finally:
2638       _ShutdownInstanceDisks(self, inst)
2639
2640
2641 class LURemoveInstance(LogicalUnit):
2642   """Remove an instance.
2643
2644   """
2645   HPATH = "instance-remove"
2646   HTYPE = constants.HTYPE_INSTANCE
2647   _OP_REQP = ["instance_name", "ignore_failures"]
2648   REQ_BGL = False
2649
2650   def ExpandNames(self):
2651     self._ExpandAndLockInstance()
2652     self.needed_locks[locking.LEVEL_NODE] = []
2653     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2654
2655   def DeclareLocks(self, level):
2656     if level == locking.LEVEL_NODE:
2657       self._LockInstancesNodes()
2658
2659   def BuildHooksEnv(self):
2660     """Build hooks env.
2661
2662     This runs on master, primary and secondary nodes of the instance.
2663
2664     """
2665     env = _BuildInstanceHookEnvByObject(self, self.instance)
2666     nl = [self.cfg.GetMasterNode()]
2667     return env, nl, nl
2668
2669   def CheckPrereq(self):
2670     """Check prerequisites.
2671
2672     This checks that the instance is in the cluster.
2673
2674     """
2675     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2676     assert self.instance is not None, \
2677       "Cannot retrieve locked instance %s" % self.op.instance_name
2678
2679   def Exec(self, feedback_fn):
2680     """Remove the instance.
2681
2682     """
2683     instance = self.instance
2684     logging.info("Shutting down instance %s on node %s",
2685                  instance.name, instance.primary_node)
2686
2687     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2688       if self.op.ignore_failures:
2689         feedback_fn("Warning: can't shutdown instance")
2690       else:
2691         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2692                                  (instance.name, instance.primary_node))
2693
2694     logging.info("Removing block devices for instance %s", instance.name)
2695
2696     if not _RemoveDisks(self, instance):
2697       if self.op.ignore_failures:
2698         feedback_fn("Warning: can't remove instance's disks")
2699       else:
2700         raise errors.OpExecError("Can't remove instance's disks")
2701
2702     logging.info("Removing instance %s out of cluster config", instance.name)
2703
2704     self.cfg.RemoveInstance(instance.name)
2705     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2706
2707
2708 class LUQueryInstances(NoHooksLU):
2709   """Logical unit for querying instances.
2710
2711   """
2712   _OP_REQP = ["output_fields", "names"]
2713   REQ_BGL = False
2714
2715   def ExpandNames(self):
2716     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2717     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2718     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2719     self.static_fields = frozenset([
2720       "name", "os", "pnode", "snodes",
2721       "admin_state", "admin_ram",
2722       "disk_template", "ip", "mac", "bridge",
2723       "sda_size", "sdb_size", "vcpus", "tags",
2724       "network_port", "beparams",
2725       "serial_no", "hypervisor", "hvparams",
2726       ] + hvp + bep)
2727
2728     _CheckOutputFields(static=self.static_fields,
2729                        dynamic=self.dynamic_fields,
2730                        selected=self.op.output_fields)
2731
2732     self.needed_locks = {}
2733     self.share_locks[locking.LEVEL_INSTANCE] = 1
2734     self.share_locks[locking.LEVEL_NODE] = 1
2735
2736     if self.op.names:
2737       self.wanted = _GetWantedInstances(self, self.op.names)
2738     else:
2739       self.wanted = locking.ALL_SET
2740
2741     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2742     if self.do_locking:
2743       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2744       self.needed_locks[locking.LEVEL_NODE] = []
2745       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2746
2747   def DeclareLocks(self, level):
2748     if level == locking.LEVEL_NODE and self.do_locking:
2749       self._LockInstancesNodes()
2750
2751   def CheckPrereq(self):
2752     """Check prerequisites.
2753
2754     """
2755     pass
2756
2757   def Exec(self, feedback_fn):
2758     """Computes the list of nodes and their attributes.
2759
2760     """
2761     all_info = self.cfg.GetAllInstancesInfo()
2762     if self.do_locking:
2763       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2764     elif self.wanted != locking.ALL_SET:
2765       instance_names = self.wanted
2766       missing = set(instance_names).difference(all_info.keys())
2767       if missing:
2768         raise errors.OpExecError(
2769           "Some instances were removed before retrieving their data: %s"
2770           % missing)
2771     else:
2772       instance_names = all_info.keys()
2773
2774     instance_names = utils.NiceSort(instance_names)
2775     instance_list = [all_info[iname] for iname in instance_names]
2776
2777     # begin data gathering
2778
2779     nodes = frozenset([inst.primary_node for inst in instance_list])
2780     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2781
2782     bad_nodes = []
2783     if self.dynamic_fields.intersection(self.op.output_fields):
2784       live_data = {}
2785       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2786       for name in nodes:
2787         result = node_data[name]
2788         if result:
2789           live_data.update(result)
2790         elif result == False:
2791           bad_nodes.append(name)
2792         # else no instance is alive
2793     else:
2794       live_data = dict([(name, {}) for name in instance_names])
2795
2796     # end data gathering
2797
2798     HVPREFIX = "hv/"
2799     BEPREFIX = "be/"
2800     output = []
2801     for instance in instance_list:
2802       iout = []
2803       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2804       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2805       for field in self.op.output_fields:
2806         if field == "name":
2807           val = instance.name
2808         elif field == "os":
2809           val = instance.os
2810         elif field == "pnode":
2811           val = instance.primary_node
2812         elif field == "snodes":
2813           val = list(instance.secondary_nodes)
2814         elif field == "admin_state":
2815           val = (instance.status != "down")
2816         elif field == "oper_state":
2817           if instance.primary_node in bad_nodes:
2818             val = None
2819           else:
2820             val = bool(live_data.get(instance.name))
2821         elif field == "status":
2822           if instance.primary_node in bad_nodes:
2823             val = "ERROR_nodedown"
2824           else:
2825             running = bool(live_data.get(instance.name))
2826             if running:
2827               if instance.status != "down":
2828                 val = "running"
2829               else:
2830                 val = "ERROR_up"
2831             else:
2832               if instance.status != "down":
2833                 val = "ERROR_down"
2834               else:
2835                 val = "ADMIN_down"
2836         elif field == "oper_ram":
2837           if instance.primary_node in bad_nodes:
2838             val = None
2839           elif instance.name in live_data:
2840             val = live_data[instance.name].get("memory", "?")
2841           else:
2842             val = "-"
2843         elif field == "disk_template":
2844           val = instance.disk_template
2845         elif field == "ip":
2846           val = instance.nics[0].ip
2847         elif field == "bridge":
2848           val = instance.nics[0].bridge
2849         elif field == "mac":
2850           val = instance.nics[0].mac
2851         elif field == "sda_size" or field == "sdb_size":
2852           disk = instance.FindDisk(field[:3])
2853           if disk is None:
2854             val = None
2855           else:
2856             val = disk.size
2857         elif field == "tags":
2858           val = list(instance.GetTags())
2859         elif field == "serial_no":
2860           val = instance.serial_no
2861         elif field == "network_port":
2862           val = instance.network_port
2863         elif field == "hypervisor":
2864           val = instance.hypervisor
2865         elif field == "hvparams":
2866           val = i_hv
2867         elif (field.startswith(HVPREFIX) and
2868               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2869           val = i_hv.get(field[len(HVPREFIX):], None)
2870         elif field == "beparams":
2871           val = i_be
2872         elif (field.startswith(BEPREFIX) and
2873               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2874           val = i_be.get(field[len(BEPREFIX):], None)
2875         else:
2876           raise errors.ParameterError(field)
2877         iout.append(val)
2878       output.append(iout)
2879
2880     return output
2881
2882
2883 class LUFailoverInstance(LogicalUnit):
2884   """Failover an instance.
2885
2886   """
2887   HPATH = "instance-failover"
2888   HTYPE = constants.HTYPE_INSTANCE
2889   _OP_REQP = ["instance_name", "ignore_consistency"]
2890   REQ_BGL = False
2891
2892   def ExpandNames(self):
2893     self._ExpandAndLockInstance()
2894     self.needed_locks[locking.LEVEL_NODE] = []
2895     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2896
2897   def DeclareLocks(self, level):
2898     if level == locking.LEVEL_NODE:
2899       self._LockInstancesNodes()
2900
2901   def BuildHooksEnv(self):
2902     """Build hooks env.
2903
2904     This runs on master, primary and secondary nodes of the instance.
2905
2906     """
2907     env = {
2908       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2909       }
2910     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2911     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2912     return env, nl, nl
2913
2914   def CheckPrereq(self):
2915     """Check prerequisites.
2916
2917     This checks that the instance is in the cluster.
2918
2919     """
2920     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2921     assert self.instance is not None, \
2922       "Cannot retrieve locked instance %s" % self.op.instance_name
2923
2924     bep = self.cfg.GetClusterInfo().FillBE(instance)
2925     if instance.disk_template not in constants.DTS_NET_MIRROR:
2926       raise errors.OpPrereqError("Instance's disk layout is not"
2927                                  " network mirrored, cannot failover.")
2928
2929     secondary_nodes = instance.secondary_nodes
2930     if not secondary_nodes:
2931       raise errors.ProgrammerError("no secondary node but using "
2932                                    "a mirrored disk template")
2933
2934     target_node = secondary_nodes[0]
2935     # check memory requirements on the secondary node
2936     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2937                          instance.name, bep[constants.BE_MEMORY],
2938                          instance.hypervisor)
2939
2940     # check bridge existance
2941     brlist = [nic.bridge for nic in instance.nics]
2942     if not self.rpc.call_bridges_exist(target_node, brlist):
2943       raise errors.OpPrereqError("One or more target bridges %s does not"
2944                                  " exist on destination node '%s'" %
2945                                  (brlist, target_node))
2946
2947   def Exec(self, feedback_fn):
2948     """Failover an instance.
2949
2950     The failover is done by shutting it down on its present node and
2951     starting it on the secondary.
2952
2953     """
2954     instance = self.instance
2955
2956     source_node = instance.primary_node
2957     target_node = instance.secondary_nodes[0]
2958
2959     feedback_fn("* checking disk consistency between source and target")
2960     for dev in instance.disks:
2961       # for drbd, these are drbd over lvm
2962       if not _CheckDiskConsistency(self, dev, target_node, False):
2963         if instance.status == "up" and not self.op.ignore_consistency:
2964           raise errors.OpExecError("Disk %s is degraded on target node,"
2965                                    " aborting failover." % dev.iv_name)
2966
2967     feedback_fn("* shutting down instance on source node")
2968     logging.info("Shutting down instance %s on node %s",
2969                  instance.name, source_node)
2970
2971     if not self.rpc.call_instance_shutdown(source_node, instance):
2972       if self.op.ignore_consistency:
2973         self.proc.LogWarning("Could not shutdown instance %s on node %s."
2974                              " Proceeding"
2975                              " anyway. Please make sure node %s is down",
2976                              instance.name, source_node, source_node)
2977       else:
2978         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2979                                  (instance.name, source_node))
2980
2981     feedback_fn("* deactivating the instance's disks on source node")
2982     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2983       raise errors.OpExecError("Can't shut down the instance's disks.")
2984
2985     instance.primary_node = target_node
2986     # distribute new instance config to the other nodes
2987     self.cfg.Update(instance)
2988
2989     # Only start the instance if it's marked as up
2990     if instance.status == "up":
2991       feedback_fn("* activating the instance's disks on target node")
2992       logging.info("Starting instance %s on node %s",
2993                    instance.name, target_node)
2994
2995       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2996                                                ignore_secondaries=True)
2997       if not disks_ok:
2998         _ShutdownInstanceDisks(self, instance)
2999         raise errors.OpExecError("Can't activate the instance's disks")
3000
3001       feedback_fn("* starting the instance on the target node")
3002       if not self.rpc.call_instance_start(target_node, instance, None):
3003         _ShutdownInstanceDisks(self, instance)
3004         raise errors.OpExecError("Could not start instance %s on node %s." %
3005                                  (instance.name, target_node))
3006
3007
3008 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3009   """Create a tree of block devices on the primary node.
3010
3011   This always creates all devices.
3012
3013   """
3014   if device.children:
3015     for child in device.children:
3016       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3017         return False
3018
3019   lu.cfg.SetDiskID(device, node)
3020   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3021                                        instance.name, True, info)
3022   if not new_id:
3023     return False
3024   if device.physical_id is None:
3025     device.physical_id = new_id
3026   return True
3027
3028
3029 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3030   """Create a tree of block devices on a secondary node.
3031
3032   If this device type has to be created on secondaries, create it and
3033   all its children.
3034
3035   If not, just recurse to children keeping the same 'force' value.
3036
3037   """
3038   if device.CreateOnSecondary():
3039     force = True
3040   if device.children:
3041     for child in device.children:
3042       if not _CreateBlockDevOnSecondary(lu, node, instance,
3043                                         child, force, info):
3044         return False
3045
3046   if not force:
3047     return True
3048   lu.cfg.SetDiskID(device, node)
3049   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3050                                        instance.name, False, info)
3051   if not new_id:
3052     return False
3053   if device.physical_id is None:
3054     device.physical_id = new_id
3055   return True
3056
3057
3058 def _GenerateUniqueNames(lu, exts):
3059   """Generate a suitable LV name.
3060
3061   This will generate a logical volume name for the given instance.
3062
3063   """
3064   results = []
3065   for val in exts:
3066     new_id = lu.cfg.GenerateUniqueID()
3067     results.append("%s%s" % (new_id, val))
3068   return results
3069
3070
3071 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3072                          p_minor, s_minor):
3073   """Generate a drbd8 device complete with its children.
3074
3075   """
3076   port = lu.cfg.AllocatePort()
3077   vgname = lu.cfg.GetVGName()
3078   shared_secret = lu.cfg.GenerateDRBDSecret()
3079   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3080                           logical_id=(vgname, names[0]))
3081   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3082                           logical_id=(vgname, names[1]))
3083   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3084                           logical_id=(primary, secondary, port,
3085                                       p_minor, s_minor,
3086                                       shared_secret),
3087                           children=[dev_data, dev_meta],
3088                           iv_name=iv_name)
3089   return drbd_dev
3090
3091
3092 def _GenerateDiskTemplate(lu, template_name,
3093                           instance_name, primary_node,
3094                           secondary_nodes, disk_sz, swap_sz,
3095                           file_storage_dir, file_driver):
3096   """Generate the entire disk layout for a given template type.
3097
3098   """
3099   #TODO: compute space requirements
3100
3101   vgname = lu.cfg.GetVGName()
3102   if template_name == constants.DT_DISKLESS:
3103     disks = []
3104   elif template_name == constants.DT_PLAIN:
3105     if len(secondary_nodes) != 0:
3106       raise errors.ProgrammerError("Wrong template configuration")
3107
3108     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3109     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3110                            logical_id=(vgname, names[0]),
3111                            iv_name = "sda")
3112     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3113                            logical_id=(vgname, names[1]),
3114                            iv_name = "sdb")
3115     disks = [sda_dev, sdb_dev]
3116   elif template_name == constants.DT_DRBD8:
3117     if len(secondary_nodes) != 1:
3118       raise errors.ProgrammerError("Wrong template configuration")
3119     remote_node = secondary_nodes[0]
3120     (minor_pa, minor_pb,
3121      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3122       [primary_node, primary_node, remote_node, remote_node], instance_name)
3123
3124     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3125                                       ".sdb_data", ".sdb_meta"])
3126     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3127                                         disk_sz, names[0:2], "sda",
3128                                         minor_pa, minor_sa)
3129     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3130                                         swap_sz, names[2:4], "sdb",
3131                                         minor_pb, minor_sb)
3132     disks = [drbd_sda_dev, drbd_sdb_dev]
3133   elif template_name == constants.DT_FILE:
3134     if len(secondary_nodes) != 0:
3135       raise errors.ProgrammerError("Wrong template configuration")
3136
3137     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3138                                 iv_name="sda", logical_id=(file_driver,
3139                                 "%s/sda" % file_storage_dir))
3140     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3141                                 iv_name="sdb", logical_id=(file_driver,
3142                                 "%s/sdb" % file_storage_dir))
3143     disks = [file_sda_dev, file_sdb_dev]
3144   else:
3145     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3146   return disks
3147
3148
3149 def _GetInstanceInfoText(instance):
3150   """Compute that text that should be added to the disk's metadata.
3151
3152   """
3153   return "originstname+%s" % instance.name
3154
3155
3156 def _CreateDisks(lu, instance):
3157   """Create all disks for an instance.
3158
3159   This abstracts away some work from AddInstance.
3160
3161   Args:
3162     instance: the instance object
3163
3164   Returns:
3165     True or False showing the success of the creation process
3166
3167   """
3168   info = _GetInstanceInfoText(instance)
3169
3170   if instance.disk_template == constants.DT_FILE:
3171     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3172     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3173                                                  file_storage_dir)
3174
3175     if not result:
3176       logging.error("Could not connect to node '%s'", instance.primary_node)
3177       return False
3178
3179     if not result[0]:
3180       logging.error("Failed to create directory '%s'", file_storage_dir)
3181       return False
3182
3183   for device in instance.disks:
3184     logging.info("Creating volume %s for instance %s",
3185                  device.iv_name, instance.name)
3186     #HARDCODE
3187     for secondary_node in instance.secondary_nodes:
3188       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3189                                         device, False, info):
3190         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3191                       device.iv_name, device, secondary_node)
3192         return False
3193     #HARDCODE
3194     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3195                                     instance, device, info):
3196       logging.error("Failed to create volume %s on primary!", device.iv_name)
3197       return False
3198
3199   return True
3200
3201
3202 def _RemoveDisks(lu, instance):
3203   """Remove all disks for an instance.
3204
3205   This abstracts away some work from `AddInstance()` and
3206   `RemoveInstance()`. Note that in case some of the devices couldn't
3207   be removed, the removal will continue with the other ones (compare
3208   with `_CreateDisks()`).
3209
3210   Args:
3211     instance: the instance object
3212
3213   Returns:
3214     True or False showing the success of the removal proces
3215
3216   """
3217   logging.info("Removing block devices for instance %s", instance.name)
3218
3219   result = True
3220   for device in instance.disks:
3221     for node, disk in device.ComputeNodeTree(instance.primary_node):
3222       lu.cfg.SetDiskID(disk, node)
3223       if not lu.rpc.call_blockdev_remove(node, disk):
3224         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3225                            " continuing anyway", device.iv_name, node)
3226         result = False
3227
3228   if instance.disk_template == constants.DT_FILE:
3229     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3230     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3231                                                file_storage_dir):
3232       logging.error("Could not remove directory '%s'", file_storage_dir)
3233       result = False
3234
3235   return result
3236
3237
3238 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3239   """Compute disk size requirements in the volume group
3240
3241   This is currently hard-coded for the two-drive layout.
3242
3243   """
3244   # Required free disk space as a function of disk and swap space
3245   req_size_dict = {
3246     constants.DT_DISKLESS: None,
3247     constants.DT_PLAIN: disk_size + swap_size,
3248     # 256 MB are added for drbd metadata, 128MB for each drbd device
3249     constants.DT_DRBD8: disk_size + swap_size + 256,
3250     constants.DT_FILE: None,
3251   }
3252
3253   if disk_template not in req_size_dict:
3254     raise errors.ProgrammerError("Disk template '%s' size requirement"
3255                                  " is unknown" %  disk_template)
3256
3257   return req_size_dict[disk_template]
3258
3259
3260 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3261   """Hypervisor parameter validation.
3262
3263   This function abstract the hypervisor parameter validation to be
3264   used in both instance create and instance modify.
3265
3266   @type lu: L{LogicalUnit}
3267   @param lu: the logical unit for which we check
3268   @type nodenames: list
3269   @param nodenames: the list of nodes on which we should check
3270   @type hvname: string
3271   @param hvname: the name of the hypervisor we should use
3272   @type hvparams: dict
3273   @param hvparams: the parameters which we need to check
3274   @raise errors.OpPrereqError: if the parameters are not valid
3275
3276   """
3277   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3278                                                   hvname,
3279                                                   hvparams)
3280   for node in nodenames:
3281     info = hvinfo.get(node, None)
3282     if not info or not isinstance(info, (tuple, list)):
3283       raise errors.OpPrereqError("Cannot get current information"
3284                                  " from node '%s' (%s)" % (node, info))
3285     if not info[0]:
3286       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3287                                  " %s" % info[1])
3288
3289
3290 class LUCreateInstance(LogicalUnit):
3291   """Create an instance.
3292
3293   """
3294   HPATH = "instance-add"
3295   HTYPE = constants.HTYPE_INSTANCE
3296   _OP_REQP = ["instance_name", "disk_size",
3297               "disk_template", "swap_size", "mode", "start",
3298               "wait_for_sync", "ip_check", "mac",
3299               "hvparams", "beparams"]
3300   REQ_BGL = False
3301
3302   def _ExpandNode(self, node):
3303     """Expands and checks one node name.
3304
3305     """
3306     node_full = self.cfg.ExpandNodeName(node)
3307     if node_full is None:
3308       raise errors.OpPrereqError("Unknown node %s" % node)
3309     return node_full
3310
3311   def ExpandNames(self):
3312     """ExpandNames for CreateInstance.
3313
3314     Figure out the right locks for instance creation.
3315
3316     """
3317     self.needed_locks = {}
3318
3319     # set optional parameters to none if they don't exist
3320     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3321       if not hasattr(self.op, attr):
3322         setattr(self.op, attr, None)
3323
3324     # cheap checks, mostly valid constants given
3325
3326     # verify creation mode
3327     if self.op.mode not in (constants.INSTANCE_CREATE,
3328                             constants.INSTANCE_IMPORT):
3329       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3330                                  self.op.mode)
3331
3332     # disk template and mirror node verification
3333     if self.op.disk_template not in constants.DISK_TEMPLATES:
3334       raise errors.OpPrereqError("Invalid disk template name")
3335
3336     if self.op.hypervisor is None:
3337       self.op.hypervisor = self.cfg.GetHypervisorType()
3338
3339     cluster = self.cfg.GetClusterInfo()
3340     enabled_hvs = cluster.enabled_hypervisors
3341     if self.op.hypervisor not in enabled_hvs:
3342       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3343                                  " cluster (%s)" % (self.op.hypervisor,
3344                                   ",".join(enabled_hvs)))
3345
3346     # check hypervisor parameter syntax (locally)
3347
3348     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3349                                   self.op.hvparams)
3350     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3351     hv_type.CheckParameterSyntax(filled_hvp)
3352
3353     # fill and remember the beparams dict
3354     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3355                                     self.op.beparams)
3356
3357     #### instance parameters check
3358
3359     # instance name verification
3360     hostname1 = utils.HostInfo(self.op.instance_name)
3361     self.op.instance_name = instance_name = hostname1.name
3362
3363     # this is just a preventive check, but someone might still add this
3364     # instance in the meantime, and creation will fail at lock-add time
3365     if instance_name in self.cfg.GetInstanceList():
3366       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3367                                  instance_name)
3368
3369     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3370
3371     # ip validity checks
3372     ip = getattr(self.op, "ip", None)
3373     if ip is None or ip.lower() == "none":
3374       inst_ip = None
3375     elif ip.lower() == constants.VALUE_AUTO:
3376       inst_ip = hostname1.ip
3377     else:
3378       if not utils.IsValidIP(ip):
3379         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3380                                    " like a valid IP" % ip)
3381       inst_ip = ip
3382     self.inst_ip = self.op.ip = inst_ip
3383     # used in CheckPrereq for ip ping check
3384     self.check_ip = hostname1.ip
3385
3386     # MAC address verification
3387     if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3388       if not utils.IsValidMac(self.op.mac.lower()):
3389         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3390                                    self.op.mac)
3391
3392     # file storage checks
3393     if (self.op.file_driver and
3394         not self.op.file_driver in constants.FILE_DRIVER):
3395       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3396                                  self.op.file_driver)
3397
3398     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3399       raise errors.OpPrereqError("File storage directory path not absolute")
3400
3401     ### Node/iallocator related checks
3402     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3403       raise errors.OpPrereqError("One and only one of iallocator and primary"
3404                                  " node must be given")
3405
3406     if self.op.iallocator:
3407       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3408     else:
3409       self.op.pnode = self._ExpandNode(self.op.pnode)
3410       nodelist = [self.op.pnode]
3411       if self.op.snode is not None:
3412         self.op.snode = self._ExpandNode(self.op.snode)
3413         nodelist.append(self.op.snode)
3414       self.needed_locks[locking.LEVEL_NODE] = nodelist
3415
3416     # in case of import lock the source node too
3417     if self.op.mode == constants.INSTANCE_IMPORT:
3418       src_node = getattr(self.op, "src_node", None)
3419       src_path = getattr(self.op, "src_path", None)
3420
3421       if src_node is None or src_path is None:
3422         raise errors.OpPrereqError("Importing an instance requires source"
3423                                    " node and path options")
3424
3425       if not os.path.isabs(src_path):
3426         raise errors.OpPrereqError("The source path must be absolute")
3427
3428       self.op.src_node = src_node = self._ExpandNode(src_node)
3429       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3430         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3431
3432     else: # INSTANCE_CREATE
3433       if getattr(self.op, "os_type", None) is None:
3434         raise errors.OpPrereqError("No guest OS specified")
3435
3436   def _RunAllocator(self):
3437     """Run the allocator based on input opcode.
3438
3439     """
3440     disks = [{"size": self.op.disk_size, "mode": "w"},
3441              {"size": self.op.swap_size, "mode": "w"}]
3442     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3443              "bridge": self.op.bridge}]
3444     ial = IAllocator(self,
3445                      mode=constants.IALLOCATOR_MODE_ALLOC,
3446                      name=self.op.instance_name,
3447                      disk_template=self.op.disk_template,
3448                      tags=[],
3449                      os=self.op.os_type,
3450                      vcpus=self.be_full[constants.BE_VCPUS],
3451                      mem_size=self.be_full[constants.BE_MEMORY],
3452                      disks=disks,
3453                      nics=nics,
3454                      )
3455
3456     ial.Run(self.op.iallocator)
3457
3458     if not ial.success:
3459       raise errors.OpPrereqError("Can't compute nodes using"
3460                                  " iallocator '%s': %s" % (self.op.iallocator,
3461                                                            ial.info))
3462     if len(ial.nodes) != ial.required_nodes:
3463       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3464                                  " of nodes (%s), required %s" %
3465                                  (self.op.iallocator, len(ial.nodes),
3466                                   ial.required_nodes))
3467     self.op.pnode = ial.nodes[0]
3468     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3469                  self.op.instance_name, self.op.iallocator,
3470                  ", ".join(ial.nodes))
3471     if ial.required_nodes == 2:
3472       self.op.snode = ial.nodes[1]
3473
3474   def BuildHooksEnv(self):
3475     """Build hooks env.
3476
3477     This runs on master, primary and secondary nodes of the instance.
3478
3479     """
3480     env = {
3481       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3482       "INSTANCE_DISK_SIZE": self.op.disk_size,
3483       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3484       "INSTANCE_ADD_MODE": self.op.mode,
3485       }
3486     if self.op.mode == constants.INSTANCE_IMPORT:
3487       env["INSTANCE_SRC_NODE"] = self.op.src_node
3488       env["INSTANCE_SRC_PATH"] = self.op.src_path
3489       env["INSTANCE_SRC_IMAGES"] = self.src_images
3490
3491     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3492       primary_node=self.op.pnode,
3493       secondary_nodes=self.secondaries,
3494       status=self.instance_status,
3495       os_type=self.op.os_type,
3496       memory=self.be_full[constants.BE_MEMORY],
3497       vcpus=self.be_full[constants.BE_VCPUS],
3498       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3499     ))
3500
3501     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3502           self.secondaries)
3503     return env, nl, nl
3504
3505
3506   def CheckPrereq(self):
3507     """Check prerequisites.
3508
3509     """
3510     if (not self.cfg.GetVGName() and
3511         self.op.disk_template not in constants.DTS_NOT_LVM):
3512       raise errors.OpPrereqError("Cluster does not support lvm-based"
3513                                  " instances")
3514
3515
3516     if self.op.mode == constants.INSTANCE_IMPORT:
3517       src_node = self.op.src_node
3518       src_path = self.op.src_path
3519
3520       export_info = self.rpc.call_export_info(src_node, src_path)
3521
3522       if not export_info:
3523         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3524
3525       if not export_info.has_section(constants.INISECT_EXP):
3526         raise errors.ProgrammerError("Corrupted export config")
3527
3528       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3529       if (int(ei_version) != constants.EXPORT_VERSION):
3530         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3531                                    (ei_version, constants.EXPORT_VERSION))
3532
3533       # Check that the new instance doesn't have less disks than the export
3534       # TODO: substitute "2" with the actual number of disks requested
3535       instance_disks = 2
3536       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3537       if instance_disks < export_disks:
3538         raise errors.OpPrereqError("Not enough disks to import."
3539                                    " (instance: %d, export: %d)" %
3540                                    (2, export_disks))
3541
3542       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3543       disk_images = []
3544       for idx in range(export_disks):
3545         option = 'disk%d_dump' % idx
3546         if export_info.has_option(constants.INISECT_INS, option):
3547           # FIXME: are the old os-es, disk sizes, etc. useful?
3548           export_name = export_info.get(constants.INISECT_INS, option)
3549           image = os.path.join(src_path, export_name)
3550           disk_images.append(image)
3551         else:
3552           disk_images.append(False)
3553
3554       self.src_images = disk_images
3555
3556       if self.op.mac == constants.VALUE_AUTO:
3557         old_name = export_info.get(constants.INISECT_INS, 'name')
3558         if self.op.instance_name == old_name:
3559           # FIXME: adjust every nic, when we'll be able to create instances
3560           # with more than one
3561           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3562             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3563
3564     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3565
3566     if self.op.start and not self.op.ip_check:
3567       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3568                                  " adding an instance in start mode")
3569
3570     if self.op.ip_check:
3571       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3572         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3573                                    (self.check_ip, self.op.instance_name))
3574
3575     # bridge verification
3576     bridge = getattr(self.op, "bridge", None)
3577     if bridge is None:
3578       self.op.bridge = self.cfg.GetDefBridge()
3579     else:
3580       self.op.bridge = bridge
3581
3582     #### allocator run
3583
3584     if self.op.iallocator is not None:
3585       self._RunAllocator()
3586
3587     #### node related checks
3588
3589     # check primary node
3590     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3591     assert self.pnode is not None, \
3592       "Cannot retrieve locked node %s" % self.op.pnode
3593     self.secondaries = []
3594
3595     # mirror node verification
3596     if self.op.disk_template in constants.DTS_NET_MIRROR:
3597       if self.op.snode is None:
3598         raise errors.OpPrereqError("The networked disk templates need"
3599                                    " a mirror node")
3600       if self.op.snode == pnode.name:
3601         raise errors.OpPrereqError("The secondary node cannot be"
3602                                    " the primary node.")
3603       self.secondaries.append(self.op.snode)
3604
3605     nodenames = [pnode.name] + self.secondaries
3606
3607     req_size = _ComputeDiskSize(self.op.disk_template,
3608                                 self.op.disk_size, self.op.swap_size)
3609
3610     # Check lv size requirements
3611     if req_size is not None:
3612       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3613                                          self.op.hypervisor)
3614       for node in nodenames:
3615         info = nodeinfo.get(node, None)
3616         if not info:
3617           raise errors.OpPrereqError("Cannot get current information"
3618                                      " from node '%s'" % node)
3619         vg_free = info.get('vg_free', None)
3620         if not isinstance(vg_free, int):
3621           raise errors.OpPrereqError("Can't compute free disk space on"
3622                                      " node %s" % node)
3623         if req_size > info['vg_free']:
3624           raise errors.OpPrereqError("Not enough disk space on target node %s."
3625                                      " %d MB available, %d MB required" %
3626                                      (node, info['vg_free'], req_size))
3627
3628     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3629
3630     # os verification
3631     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3632     if not os_obj:
3633       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3634                                  " primary node"  % self.op.os_type)
3635
3636     # bridge check on primary node
3637     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3638       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3639                                  " destination node '%s'" %
3640                                  (self.op.bridge, pnode.name))
3641
3642     # memory check on primary node
3643     if self.op.start:
3644       _CheckNodeFreeMemory(self, self.pnode.name,
3645                            "creating instance %s" % self.op.instance_name,
3646                            self.be_full[constants.BE_MEMORY],
3647                            self.op.hypervisor)
3648
3649     if self.op.start:
3650       self.instance_status = 'up'
3651     else:
3652       self.instance_status = 'down'
3653
3654   def Exec(self, feedback_fn):
3655     """Create and add the instance to the cluster.
3656
3657     """
3658     instance = self.op.instance_name
3659     pnode_name = self.pnode.name
3660
3661     if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3662       mac_address = self.cfg.GenerateMAC()
3663     else:
3664       mac_address = self.op.mac
3665
3666     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3667     if self.inst_ip is not None:
3668       nic.ip = self.inst_ip
3669
3670     ht_kind = self.op.hypervisor
3671     if ht_kind in constants.HTS_REQ_PORT:
3672       network_port = self.cfg.AllocatePort()
3673     else:
3674       network_port = None
3675
3676     ##if self.op.vnc_bind_address is None:
3677     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3678
3679     # this is needed because os.path.join does not accept None arguments
3680     if self.op.file_storage_dir is None:
3681       string_file_storage_dir = ""
3682     else:
3683       string_file_storage_dir = self.op.file_storage_dir
3684
3685     # build the full file storage dir path
3686     file_storage_dir = os.path.normpath(os.path.join(
3687                                         self.cfg.GetFileStorageDir(),
3688                                         string_file_storage_dir, instance))
3689
3690
3691     disks = _GenerateDiskTemplate(self,
3692                                   self.op.disk_template,
3693                                   instance, pnode_name,
3694                                   self.secondaries, self.op.disk_size,
3695                                   self.op.swap_size,
3696                                   file_storage_dir,
3697                                   self.op.file_driver)
3698
3699     iobj = objects.Instance(name=instance, os=self.op.os_type,
3700                             primary_node=pnode_name,
3701                             nics=[nic], disks=disks,
3702                             disk_template=self.op.disk_template,
3703                             status=self.instance_status,
3704                             network_port=network_port,
3705                             beparams=self.op.beparams,
3706                             hvparams=self.op.hvparams,
3707                             hypervisor=self.op.hypervisor,
3708                             )
3709
3710     feedback_fn("* creating instance disks...")
3711     if not _CreateDisks(self, iobj):
3712       _RemoveDisks(self, iobj)
3713       self.cfg.ReleaseDRBDMinors(instance)
3714       raise errors.OpExecError("Device creation failed, reverting...")
3715
3716     feedback_fn("adding instance %s to cluster config" % instance)
3717
3718     self.cfg.AddInstance(iobj)
3719     # Declare that we don't want to remove the instance lock anymore, as we've
3720     # added the instance to the config
3721     del self.remove_locks[locking.LEVEL_INSTANCE]
3722     # Remove the temp. assignements for the instance's drbds
3723     self.cfg.ReleaseDRBDMinors(instance)
3724
3725     if self.op.wait_for_sync:
3726       disk_abort = not _WaitForSync(self, iobj)
3727     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3728       # make sure the disks are not degraded (still sync-ing is ok)
3729       time.sleep(15)
3730       feedback_fn("* checking mirrors status")
3731       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3732     else:
3733       disk_abort = False
3734
3735     if disk_abort:
3736       _RemoveDisks(self, iobj)
3737       self.cfg.RemoveInstance(iobj.name)
3738       # Make sure the instance lock gets removed
3739       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3740       raise errors.OpExecError("There are some degraded disks for"
3741                                " this instance")
3742
3743     feedback_fn("creating os for instance %s on node %s" %
3744                 (instance, pnode_name))
3745
3746     if iobj.disk_template != constants.DT_DISKLESS:
3747       if self.op.mode == constants.INSTANCE_CREATE:
3748         feedback_fn("* running the instance OS create scripts...")
3749         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3750           raise errors.OpExecError("could not add os for instance %s"
3751                                    " on node %s" %
3752                                    (instance, pnode_name))
3753
3754       elif self.op.mode == constants.INSTANCE_IMPORT:
3755         feedback_fn("* running the instance OS import scripts...")
3756         src_node = self.op.src_node
3757         src_images = self.src_images
3758         cluster_name = self.cfg.GetClusterName()
3759         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3760                                                          src_node, src_images,
3761                                                          cluster_name)
3762         for idx, result in enumerate(import_result):
3763           if not result:
3764             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3765                             " on node %s" % (src_images[idx], instance, idx,
3766                                              pnode_name))
3767       else:
3768         # also checked in the prereq part
3769         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3770                                      % self.op.mode)
3771
3772     if self.op.start:
3773       logging.info("Starting instance %s on node %s", instance, pnode_name)
3774       feedback_fn("* starting instance...")
3775       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3776         raise errors.OpExecError("Could not start instance")
3777
3778
3779 class LUConnectConsole(NoHooksLU):
3780   """Connect to an instance's console.
3781
3782   This is somewhat special in that it returns the command line that
3783   you need to run on the master node in order to connect to the
3784   console.
3785
3786   """
3787   _OP_REQP = ["instance_name"]
3788   REQ_BGL = False
3789
3790   def ExpandNames(self):
3791     self._ExpandAndLockInstance()
3792
3793   def CheckPrereq(self):
3794     """Check prerequisites.
3795
3796     This checks that the instance is in the cluster.
3797
3798     """
3799     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3800     assert self.instance is not None, \
3801       "Cannot retrieve locked instance %s" % self.op.instance_name
3802
3803   def Exec(self, feedback_fn):
3804     """Connect to the console of an instance
3805
3806     """
3807     instance = self.instance
3808     node = instance.primary_node
3809
3810     node_insts = self.rpc.call_instance_list([node],
3811                                              [instance.hypervisor])[node]
3812     if node_insts is False:
3813       raise errors.OpExecError("Can't connect to node %s." % node)
3814
3815     if instance.name not in node_insts:
3816       raise errors.OpExecError("Instance %s is not running." % instance.name)
3817
3818     logging.debug("Connecting to console of %s on %s", instance.name, node)
3819
3820     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3821     console_cmd = hyper.GetShellCommandForConsole(instance)
3822
3823     # build ssh cmdline
3824     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3825
3826
3827 class LUReplaceDisks(LogicalUnit):
3828   """Replace the disks of an instance.
3829
3830   """
3831   HPATH = "mirrors-replace"
3832   HTYPE = constants.HTYPE_INSTANCE
3833   _OP_REQP = ["instance_name", "mode", "disks"]
3834   REQ_BGL = False
3835
3836   def ExpandNames(self):
3837     self._ExpandAndLockInstance()
3838
3839     if not hasattr(self.op, "remote_node"):
3840       self.op.remote_node = None
3841
3842     ia_name = getattr(self.op, "iallocator", None)
3843     if ia_name is not None:
3844       if self.op.remote_node is not None:
3845         raise errors.OpPrereqError("Give either the iallocator or the new"
3846                                    " secondary, not both")
3847       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3848     elif self.op.remote_node is not None:
3849       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3850       if remote_node is None:
3851         raise errors.OpPrereqError("Node '%s' not known" %
3852                                    self.op.remote_node)
3853       self.op.remote_node = remote_node
3854       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3855       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3856     else:
3857       self.needed_locks[locking.LEVEL_NODE] = []
3858       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3859
3860   def DeclareLocks(self, level):
3861     # If we're not already locking all nodes in the set we have to declare the
3862     # instance's primary/secondary nodes.
3863     if (level == locking.LEVEL_NODE and
3864         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3865       self._LockInstancesNodes()
3866
3867   def _RunAllocator(self):
3868     """Compute a new secondary node using an IAllocator.
3869
3870     """
3871     ial = IAllocator(self,
3872                      mode=constants.IALLOCATOR_MODE_RELOC,
3873                      name=self.op.instance_name,
3874                      relocate_from=[self.sec_node])
3875
3876     ial.Run(self.op.iallocator)
3877
3878     if not ial.success:
3879       raise errors.OpPrereqError("Can't compute nodes using"
3880                                  " iallocator '%s': %s" % (self.op.iallocator,
3881                                                            ial.info))
3882     if len(ial.nodes) != ial.required_nodes:
3883       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3884                                  " of nodes (%s), required %s" %
3885                                  (len(ial.nodes), ial.required_nodes))
3886     self.op.remote_node = ial.nodes[0]
3887     self.LogInfo("Selected new secondary for the instance: %s",
3888                  self.op.remote_node)
3889
3890   def BuildHooksEnv(self):
3891     """Build hooks env.
3892
3893     This runs on the master, the primary and all the secondaries.
3894
3895     """
3896     env = {
3897       "MODE": self.op.mode,
3898       "NEW_SECONDARY": self.op.remote_node,
3899       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3900       }
3901     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3902     nl = [
3903       self.cfg.GetMasterNode(),
3904       self.instance.primary_node,
3905       ]
3906     if self.op.remote_node is not None:
3907       nl.append(self.op.remote_node)
3908     return env, nl, nl
3909
3910   def CheckPrereq(self):
3911     """Check prerequisites.
3912
3913     This checks that the instance is in the cluster.
3914
3915     """
3916     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3917     assert instance is not None, \
3918       "Cannot retrieve locked instance %s" % self.op.instance_name
3919     self.instance = instance
3920
3921     if instance.disk_template not in constants.DTS_NET_MIRROR:
3922       raise errors.OpPrereqError("Instance's disk layout is not"
3923                                  " network mirrored.")
3924
3925     if len(instance.secondary_nodes) != 1:
3926       raise errors.OpPrereqError("The instance has a strange layout,"
3927                                  " expected one secondary but found %d" %
3928                                  len(instance.secondary_nodes))
3929
3930     self.sec_node = instance.secondary_nodes[0]
3931
3932     ia_name = getattr(self.op, "iallocator", None)
3933     if ia_name is not None:
3934       self._RunAllocator()
3935
3936     remote_node = self.op.remote_node
3937     if remote_node is not None:
3938       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3939       assert self.remote_node_info is not None, \
3940         "Cannot retrieve locked node %s" % remote_node
3941     else:
3942       self.remote_node_info = None
3943     if remote_node == instance.primary_node:
3944       raise errors.OpPrereqError("The specified node is the primary node of"
3945                                  " the instance.")
3946     elif remote_node == self.sec_node:
3947       if self.op.mode == constants.REPLACE_DISK_SEC:
3948         # this is for DRBD8, where we can't execute the same mode of
3949         # replacement as for drbd7 (no different port allocated)
3950         raise errors.OpPrereqError("Same secondary given, cannot execute"
3951                                    " replacement")
3952     if instance.disk_template == constants.DT_DRBD8:
3953       if (self.op.mode == constants.REPLACE_DISK_ALL and
3954           remote_node is not None):
3955         # switch to replace secondary mode
3956         self.op.mode = constants.REPLACE_DISK_SEC
3957
3958       if self.op.mode == constants.REPLACE_DISK_ALL:
3959         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3960                                    " secondary disk replacement, not"
3961                                    " both at once")
3962       elif self.op.mode == constants.REPLACE_DISK_PRI:
3963         if remote_node is not None:
3964           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3965                                      " the secondary while doing a primary"
3966                                      " node disk replacement")
3967         self.tgt_node = instance.primary_node
3968         self.oth_node = instance.secondary_nodes[0]
3969       elif self.op.mode == constants.REPLACE_DISK_SEC:
3970         self.new_node = remote_node # this can be None, in which case
3971                                     # we don't change the secondary
3972         self.tgt_node = instance.secondary_nodes[0]
3973         self.oth_node = instance.primary_node
3974       else:
3975         raise errors.ProgrammerError("Unhandled disk replace mode")
3976
3977     for name in self.op.disks:
3978       if instance.FindDisk(name) is None:
3979         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3980                                    (name, instance.name))
3981
3982   def _ExecD8DiskOnly(self, feedback_fn):
3983     """Replace a disk on the primary or secondary for dbrd8.
3984
3985     The algorithm for replace is quite complicated:
3986       - for each disk to be replaced:
3987         - create new LVs on the target node with unique names
3988         - detach old LVs from the drbd device
3989         - rename old LVs to name_replaced.<time_t>
3990         - rename new LVs to old LVs
3991         - attach the new LVs (with the old names now) to the drbd device
3992       - wait for sync across all devices
3993       - for each modified disk:
3994         - remove old LVs (which have the name name_replaces.<time_t>)
3995
3996     Failures are not very well handled.
3997
3998     """
3999     steps_total = 6
4000     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4001     instance = self.instance
4002     iv_names = {}
4003     vgname = self.cfg.GetVGName()
4004     # start of work
4005     cfg = self.cfg
4006     tgt_node = self.tgt_node
4007     oth_node = self.oth_node
4008
4009     # Step: check device activation
4010     self.proc.LogStep(1, steps_total, "check device existence")
4011     info("checking volume groups")
4012     my_vg = cfg.GetVGName()
4013     results = self.rpc.call_vg_list([oth_node, tgt_node])
4014     if not results:
4015       raise errors.OpExecError("Can't list volume groups on the nodes")
4016     for node in oth_node, tgt_node:
4017       res = results.get(node, False)
4018       if not res or my_vg not in res:
4019         raise errors.OpExecError("Volume group '%s' not found on %s" %
4020                                  (my_vg, node))
4021     for dev in instance.disks:
4022       if not dev.iv_name in self.op.disks:
4023         continue
4024       for node in tgt_node, oth_node:
4025         info("checking %s on %s" % (dev.iv_name, node))
4026         cfg.SetDiskID(dev, node)
4027         if not self.rpc.call_blockdev_find(node, dev):
4028           raise errors.OpExecError("Can't find device %s on node %s" %
4029                                    (dev.iv_name, node))
4030
4031     # Step: check other node consistency
4032     self.proc.LogStep(2, steps_total, "check peer consistency")
4033     for dev in instance.disks:
4034       if not dev.iv_name in self.op.disks:
4035         continue
4036       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4037       if not _CheckDiskConsistency(self, dev, oth_node,
4038                                    oth_node==instance.primary_node):
4039         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4040                                  " to replace disks on this node (%s)" %
4041                                  (oth_node, tgt_node))
4042
4043     # Step: create new storage
4044     self.proc.LogStep(3, steps_total, "allocate new storage")
4045     for dev in instance.disks:
4046       if not dev.iv_name in self.op.disks:
4047         continue
4048       size = dev.size
4049       cfg.SetDiskID(dev, tgt_node)
4050       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4051       names = _GenerateUniqueNames(self, lv_names)
4052       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4053                              logical_id=(vgname, names[0]))
4054       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4055                              logical_id=(vgname, names[1]))
4056       new_lvs = [lv_data, lv_meta]
4057       old_lvs = dev.children
4058       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4059       info("creating new local storage on %s for %s" %
4060            (tgt_node, dev.iv_name))
4061       # since we *always* want to create this LV, we use the
4062       # _Create...OnPrimary (which forces the creation), even if we
4063       # are talking about the secondary node
4064       for new_lv in new_lvs:
4065         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4066                                         _GetInstanceInfoText(instance)):
4067           raise errors.OpExecError("Failed to create new LV named '%s' on"
4068                                    " node '%s'" %
4069                                    (new_lv.logical_id[1], tgt_node))
4070
4071     # Step: for each lv, detach+rename*2+attach
4072     self.proc.LogStep(4, steps_total, "change drbd configuration")
4073     for dev, old_lvs, new_lvs in iv_names.itervalues():
4074       info("detaching %s drbd from local storage" % dev.iv_name)
4075       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4076         raise errors.OpExecError("Can't detach drbd from local storage on node"
4077                                  " %s for device %s" % (tgt_node, dev.iv_name))
4078       #dev.children = []
4079       #cfg.Update(instance)
4080
4081       # ok, we created the new LVs, so now we know we have the needed
4082       # storage; as such, we proceed on the target node to rename
4083       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4084       # using the assumption that logical_id == physical_id (which in
4085       # turn is the unique_id on that node)
4086
4087       # FIXME(iustin): use a better name for the replaced LVs
4088       temp_suffix = int(time.time())
4089       ren_fn = lambda d, suff: (d.physical_id[0],
4090                                 d.physical_id[1] + "_replaced-%s" % suff)
4091       # build the rename list based on what LVs exist on the node
4092       rlist = []
4093       for to_ren in old_lvs:
4094         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4095         if find_res is not None: # device exists
4096           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4097
4098       info("renaming the old LVs on the target node")
4099       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4100         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4101       # now we rename the new LVs to the old LVs
4102       info("renaming the new LVs on the target node")
4103       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4104       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4105         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4106
4107       for old, new in zip(old_lvs, new_lvs):
4108         new.logical_id = old.logical_id
4109         cfg.SetDiskID(new, tgt_node)
4110
4111       for disk in old_lvs:
4112         disk.logical_id = ren_fn(disk, temp_suffix)
4113         cfg.SetDiskID(disk, tgt_node)
4114
4115       # now that the new lvs have the old name, we can add them to the device
4116       info("adding new mirror component on %s" % tgt_node)
4117       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4118         for new_lv in new_lvs:
4119           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4120             warning("Can't rollback device %s", hint="manually cleanup unused"
4121                     " logical volumes")
4122         raise errors.OpExecError("Can't add local storage to drbd")
4123
4124       dev.children = new_lvs
4125       cfg.Update(instance)
4126
4127     # Step: wait for sync
4128
4129     # this can fail as the old devices are degraded and _WaitForSync
4130     # does a combined result over all disks, so we don't check its
4131     # return value
4132     self.proc.LogStep(5, steps_total, "sync devices")
4133     _WaitForSync(self, instance, unlock=True)
4134
4135     # so check manually all the devices
4136     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4137       cfg.SetDiskID(dev, instance.primary_node)
4138       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4139       if is_degr:
4140         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4141
4142     # Step: remove old storage
4143     self.proc.LogStep(6, steps_total, "removing old storage")
4144     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4145       info("remove logical volumes for %s" % name)
4146       for lv in old_lvs:
4147         cfg.SetDiskID(lv, tgt_node)
4148         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4149           warning("Can't remove old LV", hint="manually remove unused LVs")
4150           continue
4151
4152   def _ExecD8Secondary(self, feedback_fn):
4153     """Replace the secondary node for drbd8.
4154
4155     The algorithm for replace is quite complicated:
4156       - for all disks of the instance:
4157         - create new LVs on the new node with same names
4158         - shutdown the drbd device on the old secondary
4159         - disconnect the drbd network on the primary
4160         - create the drbd device on the new secondary
4161         - network attach the drbd on the primary, using an artifice:
4162           the drbd code for Attach() will connect to the network if it
4163           finds a device which is connected to the good local disks but
4164           not network enabled
4165       - wait for sync across all devices
4166       - remove all disks from the old secondary
4167
4168     Failures are not very well handled.
4169
4170     """
4171     steps_total = 6
4172     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4173     instance = self.instance
4174     iv_names = {}
4175     vgname = self.cfg.GetVGName()
4176     # start of work
4177     cfg = self.cfg
4178     old_node = self.tgt_node
4179     new_node = self.new_node
4180     pri_node = instance.primary_node
4181
4182     # Step: check device activation
4183     self.proc.LogStep(1, steps_total, "check device existence")
4184     info("checking volume groups")
4185     my_vg = cfg.GetVGName()
4186     results = self.rpc.call_vg_list([pri_node, new_node])
4187     if not results:
4188       raise errors.OpExecError("Can't list volume groups on the nodes")
4189     for node in pri_node, new_node:
4190       res = results.get(node, False)
4191       if not res or my_vg not in res:
4192         raise errors.OpExecError("Volume group '%s' not found on %s" %
4193                                  (my_vg, node))
4194     for dev in instance.disks:
4195       if not dev.iv_name in self.op.disks:
4196         continue
4197       info("checking %s on %s" % (dev.iv_name, pri_node))
4198       cfg.SetDiskID(dev, pri_node)
4199       if not self.rpc.call_blockdev_find(pri_node, dev):
4200         raise errors.OpExecError("Can't find device %s on node %s" %
4201                                  (dev.iv_name, pri_node))
4202
4203     # Step: check other node consistency
4204     self.proc.LogStep(2, steps_total, "check peer consistency")
4205     for dev in instance.disks:
4206       if not dev.iv_name in self.op.disks:
4207         continue
4208       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4209       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4210         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4211                                  " unsafe to replace the secondary" %
4212                                  pri_node)
4213
4214     # Step: create new storage
4215     self.proc.LogStep(3, steps_total, "allocate new storage")
4216     for dev in instance.disks:
4217       size = dev.size
4218       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4219       # since we *always* want to create this LV, we use the
4220       # _Create...OnPrimary (which forces the creation), even if we
4221       # are talking about the secondary node
4222       for new_lv in dev.children:
4223         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4224                                         _GetInstanceInfoText(instance)):
4225           raise errors.OpExecError("Failed to create new LV named '%s' on"
4226                                    " node '%s'" %
4227                                    (new_lv.logical_id[1], new_node))
4228
4229
4230     # Step 4: dbrd minors and drbd setups changes
4231     # after this, we must manually remove the drbd minors on both the
4232     # error and the success paths
4233     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4234                                    instance.name)
4235     logging.debug("Allocated minors %s" % (minors,))
4236     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4237     for dev, new_minor in zip(instance.disks, minors):
4238       size = dev.size
4239       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4240       # create new devices on new_node
4241       if pri_node == dev.logical_id[0]:
4242         new_logical_id = (pri_node, new_node,
4243                           dev.logical_id[2], dev.logical_id[3], new_minor,
4244                           dev.logical_id[5])
4245       else:
4246         new_logical_id = (new_node, pri_node,
4247                           dev.logical_id[2], new_minor, dev.logical_id[4],
4248                           dev.logical_id[5])
4249       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4250       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4251                     new_logical_id)
4252       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4253                               logical_id=new_logical_id,
4254                               children=dev.children)
4255       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4256                                         new_drbd, False,
4257                                         _GetInstanceInfoText(instance)):
4258         self.cfg.ReleaseDRBDMinors(instance.name)
4259         raise errors.OpExecError("Failed to create new DRBD on"
4260                                  " node '%s'" % new_node)
4261
4262     for dev in instance.disks:
4263       # we have new devices, shutdown the drbd on the old secondary
4264       info("shutting down drbd for %s on old node" % dev.iv_name)
4265       cfg.SetDiskID(dev, old_node)
4266       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4267         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4268                 hint="Please cleanup this device manually as soon as possible")
4269
4270     info("detaching primary drbds from the network (=> standalone)")
4271     done = 0
4272     for dev in instance.disks:
4273       cfg.SetDiskID(dev, pri_node)
4274       # set the network part of the physical (unique in bdev terms) id
4275       # to None, meaning detach from network
4276       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4277       # and 'find' the device, which will 'fix' it to match the
4278       # standalone state
4279       if self.rpc.call_blockdev_find(pri_node, dev):
4280         done += 1
4281       else:
4282         warning("Failed to detach drbd %s from network, unusual case" %
4283                 dev.iv_name)
4284
4285     if not done:
4286       # no detaches succeeded (very unlikely)
4287       self.cfg.ReleaseDRBDMinors(instance.name)
4288       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4289
4290     # if we managed to detach at least one, we update all the disks of
4291     # the instance to point to the new secondary
4292     info("updating instance configuration")
4293     for dev, _, new_logical_id in iv_names.itervalues():
4294       dev.logical_id = new_logical_id
4295       cfg.SetDiskID(dev, pri_node)
4296     cfg.Update(instance)
4297     # we can remove now the temp minors as now the new values are
4298     # written to the config file (and therefore stable)
4299     self.cfg.ReleaseDRBDMinors(instance.name)
4300
4301     # and now perform the drbd attach
4302     info("attaching primary drbds to new secondary (standalone => connected)")
4303     failures = []
4304     for dev in instance.disks:
4305       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4306       # since the attach is smart, it's enough to 'find' the device,
4307       # it will automatically activate the network, if the physical_id
4308       # is correct
4309       cfg.SetDiskID(dev, pri_node)
4310       logging.debug("Disk to attach: %s", dev)
4311       if not self.rpc.call_blockdev_find(pri_node, dev):
4312         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4313                 "please do a gnt-instance info to see the status of disks")
4314
4315     # this can fail as the old devices are degraded and _WaitForSync
4316     # does a combined result over all disks, so we don't check its
4317     # return value
4318     self.proc.LogStep(5, steps_total, "sync devices")
4319     _WaitForSync(self, instance, unlock=True)
4320
4321     # so check manually all the devices
4322     for name, (dev, old_lvs, _) in iv_names.iteritems():
4323       cfg.SetDiskID(dev, pri_node)
4324       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4325       if is_degr:
4326         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4327
4328     self.proc.LogStep(6, steps_total, "removing old storage")
4329     for name, (dev, old_lvs, _) in iv_names.iteritems():
4330       info("remove logical volumes for %s" % name)
4331       for lv in old_lvs:
4332         cfg.SetDiskID(lv, old_node)
4333         if not self.rpc.call_blockdev_remove(old_node, lv):
4334           warning("Can't remove LV on old secondary",
4335                   hint="Cleanup stale volumes by hand")
4336
4337   def Exec(self, feedback_fn):
4338     """Execute disk replacement.
4339
4340     This dispatches the disk replacement to the appropriate handler.
4341
4342     """
4343     instance = self.instance
4344
4345     # Activate the instance disks if we're replacing them on a down instance
4346     if instance.status == "down":
4347       _StartInstanceDisks(self, instance, True)
4348
4349     if instance.disk_template == constants.DT_DRBD8:
4350       if self.op.remote_node is None:
4351         fn = self._ExecD8DiskOnly
4352       else:
4353         fn = self._ExecD8Secondary
4354     else:
4355       raise errors.ProgrammerError("Unhandled disk replacement case")
4356
4357     ret = fn(feedback_fn)
4358
4359     # Deactivate the instance disks if we're replacing them on a down instance
4360     if instance.status == "down":
4361       _SafeShutdownInstanceDisks(self, instance)
4362
4363     return ret
4364
4365
4366 class LUGrowDisk(LogicalUnit):
4367   """Grow a disk of an instance.
4368
4369   """
4370   HPATH = "disk-grow"
4371   HTYPE = constants.HTYPE_INSTANCE
4372   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4373   REQ_BGL = False
4374
4375   def ExpandNames(self):
4376     self._ExpandAndLockInstance()
4377     self.needed_locks[locking.LEVEL_NODE] = []
4378     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4379
4380   def DeclareLocks(self, level):
4381     if level == locking.LEVEL_NODE:
4382       self._LockInstancesNodes()
4383
4384   def BuildHooksEnv(self):
4385     """Build hooks env.
4386
4387     This runs on the master, the primary and all the secondaries.
4388
4389     """
4390     env = {
4391       "DISK": self.op.disk,
4392       "AMOUNT": self.op.amount,
4393       }
4394     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4395     nl = [
4396       self.cfg.GetMasterNode(),
4397       self.instance.primary_node,
4398       ]
4399     return env, nl, nl
4400
4401   def CheckPrereq(self):
4402     """Check prerequisites.
4403
4404     This checks that the instance is in the cluster.
4405
4406     """
4407     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4408     assert instance is not None, \
4409       "Cannot retrieve locked instance %s" % self.op.instance_name
4410
4411     self.instance = instance
4412
4413     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4414       raise errors.OpPrereqError("Instance's disk layout does not support"
4415                                  " growing.")
4416
4417     if instance.FindDisk(self.op.disk) is None:
4418       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4419                                  (self.op.disk, instance.name))
4420
4421     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4422     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4423                                        instance.hypervisor)
4424     for node in nodenames:
4425       info = nodeinfo.get(node, None)
4426       if not info:
4427         raise errors.OpPrereqError("Cannot get current information"
4428                                    " from node '%s'" % node)
4429       vg_free = info.get('vg_free', None)
4430       if not isinstance(vg_free, int):
4431         raise errors.OpPrereqError("Can't compute free disk space on"
4432                                    " node %s" % node)
4433       if self.op.amount > info['vg_free']:
4434         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4435                                    " %d MiB available, %d MiB required" %
4436                                    (node, info['vg_free'], self.op.amount))
4437
4438   def Exec(self, feedback_fn):
4439     """Execute disk grow.
4440
4441     """
4442     instance = self.instance
4443     disk = instance.FindDisk(self.op.disk)
4444     for node in (instance.secondary_nodes + (instance.primary_node,)):
4445       self.cfg.SetDiskID(disk, node)
4446       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4447       if (not result or not isinstance(result, (list, tuple)) or
4448           len(result) != 2):
4449         raise errors.OpExecError("grow request failed to node %s" % node)
4450       elif not result[0]:
4451         raise errors.OpExecError("grow request failed to node %s: %s" %
4452                                  (node, result[1]))
4453     disk.RecordGrow(self.op.amount)
4454     self.cfg.Update(instance)
4455     if self.op.wait_for_sync:
4456       disk_abort = not _WaitForSync(self, instance)
4457       if disk_abort:
4458         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4459                              " status.\nPlease check the instance.")
4460
4461
4462 class LUQueryInstanceData(NoHooksLU):
4463   """Query runtime instance data.
4464
4465   """
4466   _OP_REQP = ["instances", "static"]
4467   REQ_BGL = False
4468
4469   def ExpandNames(self):
4470     self.needed_locks = {}
4471     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4472
4473     if not isinstance(self.op.instances, list):
4474       raise errors.OpPrereqError("Invalid argument type 'instances'")
4475
4476     if self.op.instances:
4477       self.wanted_names = []
4478       for name in self.op.instances:
4479         full_name = self.cfg.ExpandInstanceName(name)
4480         if full_name is None:
4481           raise errors.OpPrereqError("Instance '%s' not known" %
4482                                      self.op.instance_name)
4483         self.wanted_names.append(full_name)
4484       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4485     else:
4486       self.wanted_names = None
4487       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4488
4489     self.needed_locks[locking.LEVEL_NODE] = []
4490     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4491
4492   def DeclareLocks(self, level):
4493     if level == locking.LEVEL_NODE:
4494       self._LockInstancesNodes()
4495
4496   def CheckPrereq(self):
4497     """Check prerequisites.
4498
4499     This only checks the optional instance list against the existing names.
4500
4501     """
4502     if self.wanted_names is None:
4503       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4504
4505     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4506                              in self.wanted_names]
4507     return
4508
4509   def _ComputeDiskStatus(self, instance, snode, dev):
4510     """Compute block device status.
4511
4512     """
4513     static = self.op.static
4514     if not static:
4515       self.cfg.SetDiskID(dev, instance.primary_node)
4516       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4517     else:
4518       dev_pstatus = None
4519
4520     if dev.dev_type in constants.LDS_DRBD:
4521       # we change the snode then (otherwise we use the one passed in)
4522       if dev.logical_id[0] == instance.primary_node:
4523         snode = dev.logical_id[1]
4524       else:
4525         snode = dev.logical_id[0]
4526
4527     if snode and not static:
4528       self.cfg.SetDiskID(dev, snode)
4529       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4530     else:
4531       dev_sstatus = None
4532
4533     if dev.children:
4534       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4535                       for child in dev.children]
4536     else:
4537       dev_children = []
4538
4539     data = {
4540       "iv_name": dev.iv_name,
4541       "dev_type": dev.dev_type,
4542       "logical_id": dev.logical_id,
4543       "physical_id": dev.physical_id,
4544       "pstatus": dev_pstatus,
4545       "sstatus": dev_sstatus,
4546       "children": dev_children,
4547       }
4548
4549     return data
4550
4551   def Exec(self, feedback_fn):
4552     """Gather and return data"""
4553     result = {}
4554
4555     cluster = self.cfg.GetClusterInfo()
4556
4557     for instance in self.wanted_instances:
4558       if not self.op.static:
4559         remote_info = self.rpc.call_instance_info(instance.primary_node,
4560                                                   instance.name,
4561                                                   instance.hypervisor)
4562         if remote_info and "state" in remote_info:
4563           remote_state = "up"
4564         else:
4565           remote_state = "down"
4566       else:
4567         remote_state = None
4568       if instance.status == "down":
4569         config_state = "down"
4570       else:
4571         config_state = "up"
4572
4573       disks = [self._ComputeDiskStatus(instance, None, device)
4574                for device in instance.disks]
4575
4576       idict = {
4577         "name": instance.name,
4578         "config_state": config_state,
4579         "run_state": remote_state,
4580         "pnode": instance.primary_node,
4581         "snodes": instance.secondary_nodes,
4582         "os": instance.os,
4583         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4584         "disks": disks,
4585         "hypervisor": instance.hypervisor,
4586         "network_port": instance.network_port,
4587         "hv_instance": instance.hvparams,
4588         "hv_actual": cluster.FillHV(instance),
4589         "be_instance": instance.beparams,
4590         "be_actual": cluster.FillBE(instance),
4591         }
4592
4593       result[instance.name] = idict
4594
4595     return result
4596
4597
4598 class LUSetInstanceParams(LogicalUnit):
4599   """Modifies an instances's parameters.
4600
4601   """
4602   HPATH = "instance-modify"
4603   HTYPE = constants.HTYPE_INSTANCE
4604   _OP_REQP = ["instance_name", "hvparams"]
4605   REQ_BGL = False
4606
4607   def ExpandNames(self):
4608     self._ExpandAndLockInstance()
4609     self.needed_locks[locking.LEVEL_NODE] = []
4610     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4611
4612
4613   def DeclareLocks(self, level):
4614     if level == locking.LEVEL_NODE:
4615       self._LockInstancesNodes()
4616
4617   def BuildHooksEnv(self):
4618     """Build hooks env.
4619
4620     This runs on the master, primary and secondaries.
4621
4622     """
4623     args = dict()
4624     if constants.BE_MEMORY in self.be_new:
4625       args['memory'] = self.be_new[constants.BE_MEMORY]
4626     if constants.BE_VCPUS in self.be_new:
4627       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4628     if self.do_ip or self.do_bridge or self.mac:
4629       if self.do_ip:
4630         ip = self.ip
4631       else:
4632         ip = self.instance.nics[0].ip
4633       if self.bridge:
4634         bridge = self.bridge
4635       else:
4636         bridge = self.instance.nics[0].bridge
4637       if self.mac:
4638         mac = self.mac
4639       else:
4640         mac = self.instance.nics[0].mac
4641       args['nics'] = [(ip, bridge, mac)]
4642     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4643     nl = [self.cfg.GetMasterNode(),
4644           self.instance.primary_node] + list(self.instance.secondary_nodes)
4645     return env, nl, nl
4646
4647   def CheckPrereq(self):
4648     """Check prerequisites.
4649
4650     This only checks the instance list against the existing names.
4651
4652     """
4653     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4654     # a separate CheckArguments function, if we implement one, so the operation
4655     # can be aborted without waiting for any lock, should it have an error...
4656     self.ip = getattr(self.op, "ip", None)
4657     self.mac = getattr(self.op, "mac", None)
4658     self.bridge = getattr(self.op, "bridge", None)
4659     self.kernel_path = getattr(self.op, "kernel_path", None)
4660     self.initrd_path = getattr(self.op, "initrd_path", None)
4661     self.force = getattr(self.op, "force", None)
4662     all_parms = [self.ip, self.bridge, self.mac]
4663     if (all_parms.count(None) == len(all_parms) and
4664         not self.op.hvparams and
4665         not self.op.beparams):
4666       raise errors.OpPrereqError("No changes submitted")
4667     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4668       val = self.op.beparams.get(item, None)
4669       if val is not None:
4670         try:
4671           val = int(val)
4672         except ValueError, err:
4673           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4674         self.op.beparams[item] = val
4675     if self.ip is not None:
4676       self.do_ip = True
4677       if self.ip.lower() == "none":
4678         self.ip = None
4679       else:
4680         if not utils.IsValidIP(self.ip):
4681           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4682     else:
4683       self.do_ip = False
4684     self.do_bridge = (self.bridge is not None)
4685     if self.mac is not None:
4686       if self.cfg.IsMacInUse(self.mac):
4687         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4688                                    self.mac)
4689       if not utils.IsValidMac(self.mac):
4690         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4691
4692     # checking the new params on the primary/secondary nodes
4693
4694     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4695     assert self.instance is not None, \
4696       "Cannot retrieve locked instance %s" % self.op.instance_name
4697     pnode = self.instance.primary_node
4698     nodelist = [pnode]
4699     nodelist.extend(instance.secondary_nodes)
4700
4701     # hvparams processing
4702     if self.op.hvparams:
4703       i_hvdict = copy.deepcopy(instance.hvparams)
4704       for key, val in self.op.hvparams.iteritems():
4705         if val is None:
4706           try:
4707             del i_hvdict[key]
4708           except KeyError:
4709             pass
4710         else:
4711           i_hvdict[key] = val
4712       cluster = self.cfg.GetClusterInfo()
4713       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4714                                 i_hvdict)
4715       # local check
4716       hypervisor.GetHypervisor(
4717         instance.hypervisor).CheckParameterSyntax(hv_new)
4718       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4719       self.hv_new = hv_new # the new actual values
4720       self.hv_inst = i_hvdict # the new dict (without defaults)
4721     else:
4722       self.hv_new = self.hv_inst = {}
4723
4724     # beparams processing
4725     if self.op.beparams:
4726       i_bedict = copy.deepcopy(instance.beparams)
4727       for key, val in self.op.beparams.iteritems():
4728         if val is None:
4729           try:
4730             del i_bedict[key]
4731           except KeyError:
4732             pass
4733         else:
4734           i_bedict[key] = val
4735       cluster = self.cfg.GetClusterInfo()
4736       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4737                                 i_bedict)
4738       self.be_new = be_new # the new actual values
4739       self.be_inst = i_bedict # the new dict (without defaults)
4740     else:
4741       self.hv_new = self.hv_inst = {}
4742
4743     self.warn = []
4744
4745     if constants.BE_MEMORY in self.op.beparams and not self.force:
4746       mem_check_list = [pnode]
4747       if be_new[constants.BE_AUTO_BALANCE]:
4748         # either we changed auto_balance to yes or it was from before
4749         mem_check_list.extend(instance.secondary_nodes)
4750       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4751                                                   instance.hypervisor)
4752       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4753                                          instance.hypervisor)
4754
4755       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4756         # Assume the primary node is unreachable and go ahead
4757         self.warn.append("Can't get info from primary node %s" % pnode)
4758       else:
4759         if instance_info:
4760           current_mem = instance_info['memory']
4761         else:
4762           # Assume instance not running
4763           # (there is a slight race condition here, but it's not very probable,
4764           # and we have no other way to check)
4765           current_mem = 0
4766         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4767                     nodeinfo[pnode]['memory_free'])
4768         if miss_mem > 0:
4769           raise errors.OpPrereqError("This change will prevent the instance"
4770                                      " from starting, due to %d MB of memory"
4771                                      " missing on its primary node" % miss_mem)
4772
4773       if be_new[constants.BE_AUTO_BALANCE]:
4774         for node in instance.secondary_nodes:
4775           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4776             self.warn.append("Can't get info from secondary node %s" % node)
4777           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4778             self.warn.append("Not enough memory to failover instance to"
4779                              " secondary node %s" % node)
4780
4781     return
4782
4783   def Exec(self, feedback_fn):
4784     """Modifies an instance.
4785
4786     All parameters take effect only at the next restart of the instance.
4787     """
4788     # Process here the warnings from CheckPrereq, as we don't have a
4789     # feedback_fn there.
4790     for warn in self.warn:
4791       feedback_fn("WARNING: %s" % warn)
4792
4793     result = []
4794     instance = self.instance
4795     if self.do_ip:
4796       instance.nics[0].ip = self.ip
4797       result.append(("ip", self.ip))
4798     if self.bridge:
4799       instance.nics[0].bridge = self.bridge
4800       result.append(("bridge", self.bridge))
4801     if self.mac:
4802       instance.nics[0].mac = self.mac
4803       result.append(("mac", self.mac))
4804     if self.op.hvparams:
4805       instance.hvparams = self.hv_new
4806       for key, val in self.op.hvparams.iteritems():
4807         result.append(("hv/%s" % key, val))
4808     if self.op.beparams:
4809       instance.beparams = self.be_inst
4810       for key, val in self.op.beparams.iteritems():
4811         result.append(("be/%s" % key, val))
4812
4813     self.cfg.Update(instance)
4814
4815     return result
4816
4817
4818 class LUQueryExports(NoHooksLU):
4819   """Query the exports list
4820
4821   """
4822   _OP_REQP = ['nodes']
4823   REQ_BGL = False
4824
4825   def ExpandNames(self):
4826     self.needed_locks = {}
4827     self.share_locks[locking.LEVEL_NODE] = 1
4828     if not self.op.nodes:
4829       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4830     else:
4831       self.needed_locks[locking.LEVEL_NODE] = \
4832         _GetWantedNodes(self, self.op.nodes)
4833
4834   def CheckPrereq(self):
4835     """Check prerequisites.
4836
4837     """
4838     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4839
4840   def Exec(self, feedback_fn):
4841     """Compute the list of all the exported system images.
4842
4843     Returns:
4844       a dictionary with the structure node->(export-list)
4845       where export-list is a list of the instances exported on
4846       that node.
4847
4848     """
4849     return self.rpc.call_export_list(self.nodes)
4850
4851
4852 class LUExportInstance(LogicalUnit):
4853   """Export an instance to an image in the cluster.
4854
4855   """
4856   HPATH = "instance-export"
4857   HTYPE = constants.HTYPE_INSTANCE
4858   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4859   REQ_BGL = False
4860
4861   def ExpandNames(self):
4862     self._ExpandAndLockInstance()
4863     # FIXME: lock only instance primary and destination node
4864     #
4865     # Sad but true, for now we have do lock all nodes, as we don't know where
4866     # the previous export might be, and and in this LU we search for it and
4867     # remove it from its current node. In the future we could fix this by:
4868     #  - making a tasklet to search (share-lock all), then create the new one,
4869     #    then one to remove, after
4870     #  - removing the removal operation altoghether
4871     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4872
4873   def DeclareLocks(self, level):
4874     """Last minute lock declaration."""
4875     # All nodes are locked anyway, so nothing to do here.
4876
4877   def BuildHooksEnv(self):
4878     """Build hooks env.
4879
4880     This will run on the master, primary node and target node.
4881
4882     """
4883     env = {
4884       "EXPORT_NODE": self.op.target_node,
4885       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4886       }
4887     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4888     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4889           self.op.target_node]
4890     return env, nl, nl
4891
4892   def CheckPrereq(self):
4893     """Check prerequisites.
4894
4895     This checks that the instance and node names are valid.
4896
4897     """
4898     instance_name = self.op.instance_name
4899     self.instance = self.cfg.GetInstanceInfo(instance_name)
4900     assert self.instance is not None, \
4901           "Cannot retrieve locked instance %s" % self.op.instance_name
4902
4903     self.dst_node = self.cfg.GetNodeInfo(
4904       self.cfg.ExpandNodeName(self.op.target_node))
4905
4906     assert self.dst_node is not None, \
4907           "Cannot retrieve locked node %s" % self.op.target_node
4908
4909     # instance disk type verification
4910     for disk in self.instance.disks:
4911       if disk.dev_type == constants.LD_FILE:
4912         raise errors.OpPrereqError("Export not supported for instances with"
4913                                    " file-based disks")
4914
4915   def Exec(self, feedback_fn):
4916     """Export an instance to an image in the cluster.
4917
4918     """
4919     instance = self.instance
4920     dst_node = self.dst_node
4921     src_node = instance.primary_node
4922     if self.op.shutdown:
4923       # shutdown the instance, but not the disks
4924       if not self.rpc.call_instance_shutdown(src_node, instance):
4925         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4926                                  (instance.name, src_node))
4927
4928     vgname = self.cfg.GetVGName()
4929
4930     snap_disks = []
4931
4932     try:
4933       for disk in instance.disks:
4934         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4935         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4936
4937         if not new_dev_name:
4938           self.LogWarning("Could not snapshot block device %s on node %s",
4939                           disk.logical_id[1], src_node)
4940           snap_disks.append(False)
4941         else:
4942           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4943                                  logical_id=(vgname, new_dev_name),
4944                                  physical_id=(vgname, new_dev_name),
4945                                  iv_name=disk.iv_name)
4946           snap_disks.append(new_dev)
4947
4948     finally:
4949       if self.op.shutdown and instance.status == "up":
4950         if not self.rpc.call_instance_start(src_node, instance, None):
4951           _ShutdownInstanceDisks(self, instance)
4952           raise errors.OpExecError("Could not start instance")
4953
4954     # TODO: check for size
4955
4956     cluster_name = self.cfg.GetClusterName()
4957     for idx, dev in enumerate(snap_disks):
4958       if dev:
4959         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4960                                              instance, cluster_name, idx):
4961           self.LogWarning("Could not export block device %s from node %s to"
4962                           " node %s", dev.logical_id[1], src_node,
4963                           dst_node.name)
4964         if not self.rpc.call_blockdev_remove(src_node, dev):
4965           self.LogWarning("Could not remove snapshot block device %s from node"
4966                           " %s", dev.logical_id[1], src_node)
4967
4968     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4969       self.LogWarning("Could not finalize export for instance %s on node %s",
4970                       instance.name, dst_node.name)
4971
4972     nodelist = self.cfg.GetNodeList()
4973     nodelist.remove(dst_node.name)
4974
4975     # on one-node clusters nodelist will be empty after the removal
4976     # if we proceed the backup would be removed because OpQueryExports
4977     # substitutes an empty list with the full cluster node list.
4978     if nodelist:
4979       exportlist = self.rpc.call_export_list(nodelist)
4980       for node in exportlist:
4981         if instance.name in exportlist[node]:
4982           if not self.rpc.call_export_remove(node, instance.name):
4983             self.LogWarning("Could not remove older export for instance %s"
4984                             " on node %s", instance.name, node)
4985
4986
4987 class LURemoveExport(NoHooksLU):
4988   """Remove exports related to the named instance.
4989
4990   """
4991   _OP_REQP = ["instance_name"]
4992   REQ_BGL = False
4993
4994   def ExpandNames(self):
4995     self.needed_locks = {}
4996     # We need all nodes to be locked in order for RemoveExport to work, but we
4997     # don't need to lock the instance itself, as nothing will happen to it (and
4998     # we can remove exports also for a removed instance)
4999     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5000
5001   def CheckPrereq(self):
5002     """Check prerequisites.
5003     """
5004     pass
5005
5006   def Exec(self, feedback_fn):
5007     """Remove any export.
5008
5009     """
5010     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5011     # If the instance was not found we'll try with the name that was passed in.
5012     # This will only work if it was an FQDN, though.
5013     fqdn_warn = False
5014     if not instance_name:
5015       fqdn_warn = True
5016       instance_name = self.op.instance_name
5017
5018     exportlist = self.rpc.call_export_list(self.acquired_locks[
5019       locking.LEVEL_NODE])
5020     found = False
5021     for node in exportlist:
5022       if instance_name in exportlist[node]:
5023         found = True
5024         if not self.rpc.call_export_remove(node, instance_name):
5025           logging.error("Could not remove export for instance %s"
5026                         " on node %s", instance_name, node)
5027
5028     if fqdn_warn and not found:
5029       feedback_fn("Export not found. If trying to remove an export belonging"
5030                   " to a deleted instance please use its Fully Qualified"
5031                   " Domain Name.")
5032
5033
5034 class TagsLU(NoHooksLU):
5035   """Generic tags LU.
5036
5037   This is an abstract class which is the parent of all the other tags LUs.
5038
5039   """
5040
5041   def ExpandNames(self):
5042     self.needed_locks = {}
5043     if self.op.kind == constants.TAG_NODE:
5044       name = self.cfg.ExpandNodeName(self.op.name)
5045       if name is None:
5046         raise errors.OpPrereqError("Invalid node name (%s)" %
5047                                    (self.op.name,))
5048       self.op.name = name
5049       self.needed_locks[locking.LEVEL_NODE] = name
5050     elif self.op.kind == constants.TAG_INSTANCE:
5051       name = self.cfg.ExpandInstanceName(self.op.name)
5052       if name is None:
5053         raise errors.OpPrereqError("Invalid instance name (%s)" %
5054                                    (self.op.name,))
5055       self.op.name = name
5056       self.needed_locks[locking.LEVEL_INSTANCE] = name
5057
5058   def CheckPrereq(self):
5059     """Check prerequisites.
5060
5061     """
5062     if self.op.kind == constants.TAG_CLUSTER:
5063       self.target = self.cfg.GetClusterInfo()
5064     elif self.op.kind == constants.TAG_NODE:
5065       self.target = self.cfg.GetNodeInfo(self.op.name)
5066     elif self.op.kind == constants.TAG_INSTANCE:
5067       self.target = self.cfg.GetInstanceInfo(self.op.name)
5068     else:
5069       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5070                                  str(self.op.kind))
5071
5072
5073 class LUGetTags(TagsLU):
5074   """Returns the tags of a given object.
5075
5076   """
5077   _OP_REQP = ["kind", "name"]
5078   REQ_BGL = False
5079
5080   def Exec(self, feedback_fn):
5081     """Returns the tag list.
5082
5083     """
5084     return list(self.target.GetTags())
5085
5086
5087 class LUSearchTags(NoHooksLU):
5088   """Searches the tags for a given pattern.
5089
5090   """
5091   _OP_REQP = ["pattern"]
5092   REQ_BGL = False
5093
5094   def ExpandNames(self):
5095     self.needed_locks = {}
5096
5097   def CheckPrereq(self):
5098     """Check prerequisites.
5099
5100     This checks the pattern passed for validity by compiling it.
5101
5102     """
5103     try:
5104       self.re = re.compile(self.op.pattern)
5105     except re.error, err:
5106       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5107                                  (self.op.pattern, err))
5108
5109   def Exec(self, feedback_fn):
5110     """Returns the tag list.
5111
5112     """
5113     cfg = self.cfg
5114     tgts = [("/cluster", cfg.GetClusterInfo())]
5115     ilist = cfg.GetAllInstancesInfo().values()
5116     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5117     nlist = cfg.GetAllNodesInfo().values()
5118     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5119     results = []
5120     for path, target in tgts:
5121       for tag in target.GetTags():
5122         if self.re.search(tag):
5123           results.append((path, tag))
5124     return results
5125
5126
5127 class LUAddTags(TagsLU):
5128   """Sets a tag on a given object.
5129
5130   """
5131   _OP_REQP = ["kind", "name", "tags"]
5132   REQ_BGL = False
5133
5134   def CheckPrereq(self):
5135     """Check prerequisites.
5136
5137     This checks the type and length of the tag name and value.
5138
5139     """
5140     TagsLU.CheckPrereq(self)
5141     for tag in self.op.tags:
5142       objects.TaggableObject.ValidateTag(tag)
5143
5144   def Exec(self, feedback_fn):
5145     """Sets the tag.
5146
5147     """
5148     try:
5149       for tag in self.op.tags:
5150         self.target.AddTag(tag)
5151     except errors.TagError, err:
5152       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5153     try:
5154       self.cfg.Update(self.target)
5155     except errors.ConfigurationError:
5156       raise errors.OpRetryError("There has been a modification to the"
5157                                 " config file and the operation has been"
5158                                 " aborted. Please retry.")
5159
5160
5161 class LUDelTags(TagsLU):
5162   """Delete a list of tags from a given object.
5163
5164   """
5165   _OP_REQP = ["kind", "name", "tags"]
5166   REQ_BGL = False
5167
5168   def CheckPrereq(self):
5169     """Check prerequisites.
5170
5171     This checks that we have the given tag.
5172
5173     """
5174     TagsLU.CheckPrereq(self)
5175     for tag in self.op.tags:
5176       objects.TaggableObject.ValidateTag(tag)
5177     del_tags = frozenset(self.op.tags)
5178     cur_tags = self.target.GetTags()
5179     if not del_tags <= cur_tags:
5180       diff_tags = del_tags - cur_tags
5181       diff_names = ["'%s'" % tag for tag in diff_tags]
5182       diff_names.sort()
5183       raise errors.OpPrereqError("Tag(s) %s not found" %
5184                                  (",".join(diff_names)))
5185
5186   def Exec(self, feedback_fn):
5187     """Remove the tag from the object.
5188
5189     """
5190     for tag in self.op.tags:
5191       self.target.RemoveTag(tag)
5192     try:
5193       self.cfg.Update(self.target)
5194     except errors.ConfigurationError:
5195       raise errors.OpRetryError("There has been a modification to the"
5196                                 " config file and the operation has been"
5197                                 " aborted. Please retry.")
5198
5199
5200 class LUTestDelay(NoHooksLU):
5201   """Sleep for a specified amount of time.
5202
5203   This LU sleeps on the master and/or nodes for a specified amount of
5204   time.
5205
5206   """
5207   _OP_REQP = ["duration", "on_master", "on_nodes"]
5208   REQ_BGL = False
5209
5210   def ExpandNames(self):
5211     """Expand names and set required locks.
5212
5213     This expands the node list, if any.
5214
5215     """
5216     self.needed_locks = {}
5217     if self.op.on_nodes:
5218       # _GetWantedNodes can be used here, but is not always appropriate to use
5219       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5220       # more information.
5221       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5222       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5223
5224   def CheckPrereq(self):
5225     """Check prerequisites.
5226
5227     """
5228
5229   def Exec(self, feedback_fn):
5230     """Do the actual sleep.
5231
5232     """
5233     if self.op.on_master:
5234       if not utils.TestDelay(self.op.duration):
5235         raise errors.OpExecError("Error during master delay test")
5236     if self.op.on_nodes:
5237       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5238       if not result:
5239         raise errors.OpExecError("Complete failure from rpc call")
5240       for node, node_result in result.items():
5241         if not node_result:
5242           raise errors.OpExecError("Failure during rpc call to node %s,"
5243                                    " result: %s" % (node, node_result))
5244
5245
5246 class IAllocator(object):
5247   """IAllocator framework.
5248
5249   An IAllocator instance has three sets of attributes:
5250     - cfg that is needed to query the cluster
5251     - input data (all members of the _KEYS class attribute are required)
5252     - four buffer attributes (in|out_data|text), that represent the
5253       input (to the external script) in text and data structure format,
5254       and the output from it, again in two formats
5255     - the result variables from the script (success, info, nodes) for
5256       easy usage
5257
5258   """
5259   _ALLO_KEYS = [
5260     "mem_size", "disks", "disk_template",
5261     "os", "tags", "nics", "vcpus",
5262     ]
5263   _RELO_KEYS = [
5264     "relocate_from",
5265     ]
5266
5267   def __init__(self, lu, mode, name, **kwargs):
5268     self.lu = lu
5269     # init buffer variables
5270     self.in_text = self.out_text = self.in_data = self.out_data = None
5271     # init all input fields so that pylint is happy
5272     self.mode = mode
5273     self.name = name
5274     self.mem_size = self.disks = self.disk_template = None
5275     self.os = self.tags = self.nics = self.vcpus = None
5276     self.relocate_from = None
5277     # computed fields
5278     self.required_nodes = None
5279     # init result fields
5280     self.success = self.info = self.nodes = None
5281     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5282       keyset = self._ALLO_KEYS
5283     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5284       keyset = self._RELO_KEYS
5285     else:
5286       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5287                                    " IAllocator" % self.mode)
5288     for key in kwargs:
5289       if key not in keyset:
5290         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5291                                      " IAllocator" % key)
5292       setattr(self, key, kwargs[key])
5293     for key in keyset:
5294       if key not in kwargs:
5295         raise errors.ProgrammerError("Missing input parameter '%s' to"
5296                                      " IAllocator" % key)
5297     self._BuildInputData()
5298
5299   def _ComputeClusterData(self):
5300     """Compute the generic allocator input data.
5301
5302     This is the data that is independent of the actual operation.
5303
5304     """
5305     cfg = self.lu.cfg
5306     cluster_info = cfg.GetClusterInfo()
5307     # cluster data
5308     data = {
5309       "version": 1,
5310       "cluster_name": cfg.GetClusterName(),
5311       "cluster_tags": list(cluster_info.GetTags()),
5312       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5313       # we don't have job IDs
5314       }
5315
5316     i_list = []
5317     cluster = self.cfg.GetClusterInfo()
5318     for iname in cfg.GetInstanceList():
5319       i_obj = cfg.GetInstanceInfo(iname)
5320       i_list.append((i_obj, cluster.FillBE(i_obj)))
5321
5322     # node data
5323     node_results = {}
5324     node_list = cfg.GetNodeList()
5325     # FIXME: here we have only one hypervisor information, but
5326     # instance can belong to different hypervisors
5327     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5328                                            cfg.GetHypervisorType())
5329     for nname in node_list:
5330       ninfo = cfg.GetNodeInfo(nname)
5331       if nname not in node_data or not isinstance(node_data[nname], dict):
5332         raise errors.OpExecError("Can't get data for node %s" % nname)
5333       remote_info = node_data[nname]
5334       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5335                    'vg_size', 'vg_free', 'cpu_total']:
5336         if attr not in remote_info:
5337           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5338                                    (nname, attr))
5339         try:
5340           remote_info[attr] = int(remote_info[attr])
5341         except ValueError, err:
5342           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5343                                    " %s" % (nname, attr, str(err)))
5344       # compute memory used by primary instances
5345       i_p_mem = i_p_up_mem = 0
5346       for iinfo, beinfo in i_list:
5347         if iinfo.primary_node == nname:
5348           i_p_mem += beinfo[constants.BE_MEMORY]
5349           if iinfo.status == "up":
5350             i_p_up_mem += beinfo[constants.BE_MEMORY]
5351
5352       # compute memory used by instances
5353       pnr = {
5354         "tags": list(ninfo.GetTags()),
5355         "total_memory": remote_info['memory_total'],
5356         "reserved_memory": remote_info['memory_dom0'],
5357         "free_memory": remote_info['memory_free'],
5358         "i_pri_memory": i_p_mem,
5359         "i_pri_up_memory": i_p_up_mem,
5360         "total_disk": remote_info['vg_size'],
5361         "free_disk": remote_info['vg_free'],
5362         "primary_ip": ninfo.primary_ip,
5363         "secondary_ip": ninfo.secondary_ip,
5364         "total_cpus": remote_info['cpu_total'],
5365         }
5366       node_results[nname] = pnr
5367     data["nodes"] = node_results
5368
5369     # instance data
5370     instance_data = {}
5371     for iinfo, beinfo in i_list:
5372       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5373                   for n in iinfo.nics]
5374       pir = {
5375         "tags": list(iinfo.GetTags()),
5376         "should_run": iinfo.status == "up",
5377         "vcpus": beinfo[constants.BE_VCPUS],
5378         "memory": beinfo[constants.BE_MEMORY],
5379         "os": iinfo.os,
5380         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5381         "nics": nic_data,
5382         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5383         "disk_template": iinfo.disk_template,
5384         "hypervisor": iinfo.hypervisor,
5385         }
5386       instance_data[iinfo.name] = pir
5387
5388     data["instances"] = instance_data
5389
5390     self.in_data = data
5391
5392   def _AddNewInstance(self):
5393     """Add new instance data to allocator structure.
5394
5395     This in combination with _AllocatorGetClusterData will create the
5396     correct structure needed as input for the allocator.
5397
5398     The checks for the completeness of the opcode must have already been
5399     done.
5400
5401     """
5402     data = self.in_data
5403     if len(self.disks) != 2:
5404       raise errors.OpExecError("Only two-disk configurations supported")
5405
5406     disk_space = _ComputeDiskSize(self.disk_template,
5407                                   self.disks[0]["size"], self.disks[1]["size"])
5408
5409     if self.disk_template in constants.DTS_NET_MIRROR:
5410       self.required_nodes = 2
5411     else:
5412       self.required_nodes = 1
5413     request = {
5414       "type": "allocate",
5415       "name": self.name,
5416       "disk_template": self.disk_template,
5417       "tags": self.tags,
5418       "os": self.os,
5419       "vcpus": self.vcpus,
5420       "memory": self.mem_size,
5421       "disks": self.disks,
5422       "disk_space_total": disk_space,
5423       "nics": self.nics,
5424       "required_nodes": self.required_nodes,
5425       }
5426     data["request"] = request
5427
5428   def _AddRelocateInstance(self):
5429     """Add relocate instance data to allocator structure.
5430
5431     This in combination with _IAllocatorGetClusterData will create the
5432     correct structure needed as input for the allocator.
5433
5434     The checks for the completeness of the opcode must have already been
5435     done.
5436
5437     """
5438     instance = self.lu.cfg.GetInstanceInfo(self.name)
5439     if instance is None:
5440       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5441                                    " IAllocator" % self.name)
5442
5443     if instance.disk_template not in constants.DTS_NET_MIRROR:
5444       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5445
5446     if len(instance.secondary_nodes) != 1:
5447       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5448
5449     self.required_nodes = 1
5450
5451     disk_space = _ComputeDiskSize(instance.disk_template,
5452                                   instance.disks[0].size,
5453                                   instance.disks[1].size)
5454
5455     request = {
5456       "type": "relocate",
5457       "name": self.name,
5458       "disk_space_total": disk_space,
5459       "required_nodes": self.required_nodes,
5460       "relocate_from": self.relocate_from,
5461       }
5462     self.in_data["request"] = request
5463
5464   def _BuildInputData(self):
5465     """Build input data structures.
5466
5467     """
5468     self._ComputeClusterData()
5469
5470     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5471       self._AddNewInstance()
5472     else:
5473       self._AddRelocateInstance()
5474
5475     self.in_text = serializer.Dump(self.in_data)
5476
5477   def Run(self, name, validate=True, call_fn=None):
5478     """Run an instance allocator and return the results.
5479
5480     """
5481     if call_fn is None:
5482       call_fn = self.lu.rpc.call_iallocator_runner
5483     data = self.in_text
5484
5485     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5486
5487     if not isinstance(result, (list, tuple)) or len(result) != 4:
5488       raise errors.OpExecError("Invalid result from master iallocator runner")
5489
5490     rcode, stdout, stderr, fail = result
5491
5492     if rcode == constants.IARUN_NOTFOUND:
5493       raise errors.OpExecError("Can't find allocator '%s'" % name)
5494     elif rcode == constants.IARUN_FAILURE:
5495       raise errors.OpExecError("Instance allocator call failed: %s,"
5496                                " output: %s" % (fail, stdout+stderr))
5497     self.out_text = stdout
5498     if validate:
5499       self._ValidateResult()
5500
5501   def _ValidateResult(self):
5502     """Process the allocator results.
5503
5504     This will process and if successful save the result in
5505     self.out_data and the other parameters.
5506
5507     """
5508     try:
5509       rdict = serializer.Load(self.out_text)
5510     except Exception, err:
5511       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5512
5513     if not isinstance(rdict, dict):
5514       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5515
5516     for key in "success", "info", "nodes":
5517       if key not in rdict:
5518         raise errors.OpExecError("Can't parse iallocator results:"
5519                                  " missing key '%s'" % key)
5520       setattr(self, key, rdict[key])
5521
5522     if not isinstance(rdict["nodes"], list):
5523       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5524                                " is not a list")
5525     self.out_data = rdict
5526
5527
5528 class LUTestAllocator(NoHooksLU):
5529   """Run allocator tests.
5530
5531   This LU runs the allocator tests
5532
5533   """
5534   _OP_REQP = ["direction", "mode", "name"]
5535
5536   def CheckPrereq(self):
5537     """Check prerequisites.
5538
5539     This checks the opcode parameters depending on the director and mode test.
5540
5541     """
5542     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5543       for attr in ["name", "mem_size", "disks", "disk_template",
5544                    "os", "tags", "nics", "vcpus"]:
5545         if not hasattr(self.op, attr):
5546           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5547                                      attr)
5548       iname = self.cfg.ExpandInstanceName(self.op.name)
5549       if iname is not None:
5550         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5551                                    iname)
5552       if not isinstance(self.op.nics, list):
5553         raise errors.OpPrereqError("Invalid parameter 'nics'")
5554       for row in self.op.nics:
5555         if (not isinstance(row, dict) or
5556             "mac" not in row or
5557             "ip" not in row or
5558             "bridge" not in row):
5559           raise errors.OpPrereqError("Invalid contents of the"
5560                                      " 'nics' parameter")
5561       if not isinstance(self.op.disks, list):
5562         raise errors.OpPrereqError("Invalid parameter 'disks'")
5563       if len(self.op.disks) != 2:
5564         raise errors.OpPrereqError("Only two-disk configurations supported")
5565       for row in self.op.disks:
5566         if (not isinstance(row, dict) or
5567             "size" not in row or
5568             not isinstance(row["size"], int) or
5569             "mode" not in row or
5570             row["mode"] not in ['r', 'w']):
5571           raise errors.OpPrereqError("Invalid contents of the"
5572                                      " 'disks' parameter")
5573     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5574       if not hasattr(self.op, "name"):
5575         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5576       fname = self.cfg.ExpandInstanceName(self.op.name)
5577       if fname is None:
5578         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5579                                    self.op.name)
5580       self.op.name = fname
5581       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5582     else:
5583       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5584                                  self.op.mode)
5585
5586     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5587       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5588         raise errors.OpPrereqError("Missing allocator name")
5589     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5590       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5591                                  self.op.direction)
5592
5593   def Exec(self, feedback_fn):
5594     """Run the allocator test.
5595
5596     """
5597     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5598       ial = IAllocator(self,
5599                        mode=self.op.mode,
5600                        name=self.op.name,
5601                        mem_size=self.op.mem_size,
5602                        disks=self.op.disks,
5603                        disk_template=self.op.disk_template,
5604                        os=self.op.os,
5605                        tags=self.op.tags,
5606                        nics=self.op.nics,
5607                        vcpus=self.op.vcpus,
5608                        )
5609     else:
5610       ial = IAllocator(self,
5611                        mode=self.op.mode,
5612                        name=self.op.name,
5613                        relocate_from=list(self.relocate_from),
5614                        )
5615
5616     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5617       result = ial.in_text
5618     else:
5619       ial.Run(self.op.allocator, validate=False)
5620       result = ial.out_text
5621     return result