Convert cmdlib.py to _FieldSet
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import itertools
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91     # logging
92     self.LogWarning = processor.LogWarning
93     self.LogInfo = processor.LogInfo
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100
101     if not self.cfg.IsCluster():
102       raise errors.OpPrereqError("Cluster not initialized yet,"
103                                  " use 'gnt-cluster init' first.")
104     if self.REQ_MASTER:
105       master = self.cfg.GetMasterNode()
106       if master != utils.HostInfo().name:
107         raise errors.OpPrereqError("Commands must be run on the master"
108                                    " node %s" % master)
109
110   def __GetSSH(self):
111     """Returns the SshRunner object
112
113     """
114     if not self.__ssh:
115       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
116     return self.__ssh
117
118   ssh = property(fget=__GetSSH)
119
120   def ExpandNames(self):
121     """Expand names for this LU.
122
123     This method is called before starting to execute the opcode, and it should
124     update all the parameters of the opcode to their canonical form (e.g. a
125     short node name must be fully expanded after this method has successfully
126     completed). This way locking, hooks, logging, ecc. can work correctly.
127
128     LUs which implement this method must also populate the self.needed_locks
129     member, as a dict with lock levels as keys, and a list of needed lock names
130     as values. Rules:
131       - Use an empty dict if you don't need any lock
132       - If you don't need any lock at a particular level omit that level
133       - Don't put anything for the BGL level
134       - If you want all locks at a level use locking.ALL_SET as a value
135
136     If you need to share locks (rather than acquire them exclusively) at one
137     level you can modify self.share_locks, setting a true value (usually 1) for
138     that level. By default locks are not shared.
139
140     Examples:
141     # Acquire all nodes and one instance
142     self.needed_locks = {
143       locking.LEVEL_NODE: locking.ALL_SET,
144       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
145     }
146     # Acquire just two nodes
147     self.needed_locks = {
148       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
149     }
150     # Acquire no locks
151     self.needed_locks = {} # No, you can't leave it to the default value None
152
153     """
154     # The implementation of this method is mandatory only if the new LU is
155     # concurrent, so that old LUs don't need to be changed all at the same
156     # time.
157     if self.REQ_BGL:
158       self.needed_locks = {} # Exclusive LUs don't need locks.
159     else:
160       raise NotImplementedError
161
162   def DeclareLocks(self, level):
163     """Declare LU locking needs for a level
164
165     While most LUs can just declare their locking needs at ExpandNames time,
166     sometimes there's the need to calculate some locks after having acquired
167     the ones before. This function is called just before acquiring locks at a
168     particular level, but after acquiring the ones at lower levels, and permits
169     such calculations. It can be used to modify self.needed_locks, and by
170     default it does nothing.
171
172     This function is only called if you have something already set in
173     self.needed_locks for the level.
174
175     @param level: Locking level which is going to be locked
176     @type level: member of ganeti.locking.LEVELS
177
178     """
179
180   def CheckPrereq(self):
181     """Check prerequisites for this LU.
182
183     This method should check that the prerequisites for the execution
184     of this LU are fulfilled. It can do internode communication, but
185     it should be idempotent - no cluster or system changes are
186     allowed.
187
188     The method should raise errors.OpPrereqError in case something is
189     not fulfilled. Its return value is ignored.
190
191     This method should also update all the parameters of the opcode to
192     their canonical form if it hasn't been done by ExpandNames before.
193
194     """
195     raise NotImplementedError
196
197   def Exec(self, feedback_fn):
198     """Execute the LU.
199
200     This method should implement the actual work. It should raise
201     errors.OpExecError for failures that are somewhat dealt with in
202     code, or expected.
203
204     """
205     raise NotImplementedError
206
207   def BuildHooksEnv(self):
208     """Build hooks environment for this LU.
209
210     This method should return a three-node tuple consisting of: a dict
211     containing the environment that will be used for running the
212     specific hook for this LU, a list of node names on which the hook
213     should run before the execution, and a list of node names on which
214     the hook should run after the execution.
215
216     The keys of the dict must not have 'GANETI_' prefixed as this will
217     be handled in the hooks runner. Also note additional keys will be
218     added by the hooks runner. If the LU doesn't define any
219     environment, an empty dict (and not None) should be returned.
220
221     No nodes should be returned as an empty list (and not None).
222
223     Note that if the HPATH for a LU class is None, this function will
224     not be called.
225
226     """
227     raise NotImplementedError
228
229   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
230     """Notify the LU about the results of its hooks.
231
232     This method is called every time a hooks phase is executed, and notifies
233     the Logical Unit about the hooks' result. The LU can then use it to alter
234     its result based on the hooks.  By default the method does nothing and the
235     previous result is passed back unchanged but any LU can define it if it
236     wants to use the local cluster hook-scripts somehow.
237
238     Args:
239       phase: the hooks phase that has just been run
240       hooks_results: the results of the multi-node hooks rpc call
241       feedback_fn: function to send feedback back to the caller
242       lu_result: the previous result this LU had, or None in the PRE phase.
243
244     """
245     return lu_result
246
247   def _ExpandAndLockInstance(self):
248     """Helper function to expand and lock an instance.
249
250     Many LUs that work on an instance take its name in self.op.instance_name
251     and need to expand it and then declare the expanded name for locking. This
252     function does it, and then updates self.op.instance_name to the expanded
253     name. It also initializes needed_locks as a dict, if this hasn't been done
254     before.
255
256     """
257     if self.needed_locks is None:
258       self.needed_locks = {}
259     else:
260       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
261         "_ExpandAndLockInstance called with instance-level locks set"
262     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
263     if expanded_name is None:
264       raise errors.OpPrereqError("Instance '%s' not known" %
265                                   self.op.instance_name)
266     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
267     self.op.instance_name = expanded_name
268
269   def _LockInstancesNodes(self, primary_only=False):
270     """Helper function to declare instances' nodes for locking.
271
272     This function should be called after locking one or more instances to lock
273     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
274     with all primary or secondary nodes for instances already locked and
275     present in self.needed_locks[locking.LEVEL_INSTANCE].
276
277     It should be called from DeclareLocks, and for safety only works if
278     self.recalculate_locks[locking.LEVEL_NODE] is set.
279
280     In the future it may grow parameters to just lock some instance's nodes, or
281     to just lock primaries or secondary nodes, if needed.
282
283     If should be called in DeclareLocks in a way similar to:
284
285     if level == locking.LEVEL_NODE:
286       self._LockInstancesNodes()
287
288     @type primary_only: boolean
289     @param primary_only: only lock primary nodes of locked instances
290
291     """
292     assert locking.LEVEL_NODE in self.recalculate_locks, \
293       "_LockInstancesNodes helper function called with no nodes to recalculate"
294
295     # TODO: check if we're really been called with the instance locks held
296
297     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
298     # future we might want to have different behaviors depending on the value
299     # of self.recalculate_locks[locking.LEVEL_NODE]
300     wanted_nodes = []
301     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
302       instance = self.context.cfg.GetInstanceInfo(instance_name)
303       wanted_nodes.append(instance.primary_node)
304       if not primary_only:
305         wanted_nodes.extend(instance.secondary_nodes)
306
307     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
308       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
309     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
310       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
311
312     del self.recalculate_locks[locking.LEVEL_NODE]
313
314
315 class NoHooksLU(LogicalUnit):
316   """Simple LU which runs no hooks.
317
318   This LU is intended as a parent for other LogicalUnits which will
319   run no hooks, in order to reduce duplicate code.
320
321   """
322   HPATH = None
323   HTYPE = None
324
325
326 class _FieldSet(object):
327   """A simple field set.
328
329   Among the features are:
330     - checking if a string is among a list of static string or regex objects
331     - checking if a whole list of string matches
332     - returning the matching groups from a regex match
333
334   Internally, all fields are held as regular expression objects.
335
336   """
337   def __init__(self, *items):
338     self.items = [re.compile("^%s$" % value) for value in items]
339
340   def Extend(self, other_set):
341     """Extend the field set with the items from another one"""
342     self.items.extend(other_set.items)
343
344   def Matches(self, field):
345     """Checks if a field matches the current set
346
347     @type field: str
348     @param field: the string to match
349     @return: either False or a regular expression match object
350
351     """
352     for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
353       return m
354     return False
355
356   def NonMatching(self, items):
357     """Returns the list of fields not matching the current set
358
359     @type items: list
360     @param items: the list of fields to check
361     @rtype: list
362     @return: list of non-matching fields
363
364     """
365     return [val for val in items if not self.Matches(val)]
366
367
368 def _GetWantedNodes(lu, nodes):
369   """Returns list of checked and expanded node names.
370
371   Args:
372     nodes: List of nodes (strings) or None for all
373
374   """
375   if not isinstance(nodes, list):
376     raise errors.OpPrereqError("Invalid argument type 'nodes'")
377
378   if not nodes:
379     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
380       " non-empty list of nodes whose name is to be expanded.")
381
382   wanted = []
383   for name in nodes:
384     node = lu.cfg.ExpandNodeName(name)
385     if node is None:
386       raise errors.OpPrereqError("No such node name '%s'" % name)
387     wanted.append(node)
388
389   return utils.NiceSort(wanted)
390
391
392 def _GetWantedInstances(lu, instances):
393   """Returns list of checked and expanded instance names.
394
395   Args:
396     instances: List of instances (strings) or None for all
397
398   """
399   if not isinstance(instances, list):
400     raise errors.OpPrereqError("Invalid argument type 'instances'")
401
402   if instances:
403     wanted = []
404
405     for name in instances:
406       instance = lu.cfg.ExpandInstanceName(name)
407       if instance is None:
408         raise errors.OpPrereqError("No such instance name '%s'" % name)
409       wanted.append(instance)
410
411   else:
412     wanted = lu.cfg.GetInstanceList()
413   return utils.NiceSort(wanted)
414
415
416 def _CheckOutputFields(static, dynamic, selected):
417   """Checks whether all selected fields are valid.
418
419   @type static: L{_FieldSet}
420   @param static: static fields set
421   @type dynamic: L{_FieldSet}
422   @param dynamic: dynamic fields set
423
424   """
425   f = _FieldSet()
426   f.Extend(static)
427   f.Extend(dynamic)
428
429   delta = f.NonMatching(selected)
430   if delta:
431     raise errors.OpPrereqError("Unknown output fields selected: %s"
432                                % ",".join(delta))
433
434
435 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
436                           memory, vcpus, nics):
437   """Builds instance related env variables for hooks from single variables.
438
439   Args:
440     secondary_nodes: List of secondary nodes as strings
441   """
442   env = {
443     "OP_TARGET": name,
444     "INSTANCE_NAME": name,
445     "INSTANCE_PRIMARY": primary_node,
446     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
447     "INSTANCE_OS_TYPE": os_type,
448     "INSTANCE_STATUS": status,
449     "INSTANCE_MEMORY": memory,
450     "INSTANCE_VCPUS": vcpus,
451   }
452
453   if nics:
454     nic_count = len(nics)
455     for idx, (ip, bridge, mac) in enumerate(nics):
456       if ip is None:
457         ip = ""
458       env["INSTANCE_NIC%d_IP" % idx] = ip
459       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
460       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
461   else:
462     nic_count = 0
463
464   env["INSTANCE_NIC_COUNT"] = nic_count
465
466   return env
467
468
469 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
470   """Builds instance related env variables for hooks from an object.
471
472   Args:
473     instance: objects.Instance object of instance
474     override: dict of values to override
475   """
476   bep = lu.cfg.GetClusterInfo().FillBE(instance)
477   args = {
478     'name': instance.name,
479     'primary_node': instance.primary_node,
480     'secondary_nodes': instance.secondary_nodes,
481     'os_type': instance.os,
482     'status': instance.os,
483     'memory': bep[constants.BE_MEMORY],
484     'vcpus': bep[constants.BE_VCPUS],
485     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
486   }
487   if override:
488     args.update(override)
489   return _BuildInstanceHookEnv(**args)
490
491
492 def _CheckInstanceBridgesExist(lu, instance):
493   """Check that the brigdes needed by an instance exist.
494
495   """
496   # check bridges existance
497   brlist = [nic.bridge for nic in instance.nics]
498   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
499     raise errors.OpPrereqError("one or more target bridges %s does not"
500                                " exist on destination node '%s'" %
501                                (brlist, instance.primary_node))
502
503
504 class LUDestroyCluster(NoHooksLU):
505   """Logical unit for destroying the cluster.
506
507   """
508   _OP_REQP = []
509
510   def CheckPrereq(self):
511     """Check prerequisites.
512
513     This checks whether the cluster is empty.
514
515     Any errors are signalled by raising errors.OpPrereqError.
516
517     """
518     master = self.cfg.GetMasterNode()
519
520     nodelist = self.cfg.GetNodeList()
521     if len(nodelist) != 1 or nodelist[0] != master:
522       raise errors.OpPrereqError("There are still %d node(s) in"
523                                  " this cluster." % (len(nodelist) - 1))
524     instancelist = self.cfg.GetInstanceList()
525     if instancelist:
526       raise errors.OpPrereqError("There are still %d instance(s) in"
527                                  " this cluster." % len(instancelist))
528
529   def Exec(self, feedback_fn):
530     """Destroys the cluster.
531
532     """
533     master = self.cfg.GetMasterNode()
534     if not self.rpc.call_node_stop_master(master, False):
535       raise errors.OpExecError("Could not disable the master role")
536     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
537     utils.CreateBackup(priv_key)
538     utils.CreateBackup(pub_key)
539     return master
540
541
542 class LUVerifyCluster(LogicalUnit):
543   """Verifies the cluster status.
544
545   """
546   HPATH = "cluster-verify"
547   HTYPE = constants.HTYPE_CLUSTER
548   _OP_REQP = ["skip_checks"]
549   REQ_BGL = False
550
551   def ExpandNames(self):
552     self.needed_locks = {
553       locking.LEVEL_NODE: locking.ALL_SET,
554       locking.LEVEL_INSTANCE: locking.ALL_SET,
555     }
556     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
557
558   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
559                   remote_version, feedback_fn):
560     """Run multiple tests against a node.
561
562     Test list:
563       - compares ganeti version
564       - checks vg existance and size > 20G
565       - checks config file checksum
566       - checks ssh to other nodes
567
568     Args:
569       node: name of the node to check
570       file_list: required list of files
571       local_cksum: dictionary of local files and their checksums
572
573     """
574     # compares ganeti version
575     local_version = constants.PROTOCOL_VERSION
576     if not remote_version:
577       feedback_fn("  - ERROR: connection to %s failed" % (node))
578       return True
579
580     if local_version != remote_version:
581       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
582                       (local_version, node, remote_version))
583       return True
584
585     # checks vg existance and size > 20G
586
587     bad = False
588     if not vglist:
589       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
590                       (node,))
591       bad = True
592     else:
593       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
594                                             constants.MIN_VG_SIZE)
595       if vgstatus:
596         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
597         bad = True
598
599     if not node_result:
600       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
601       return True
602
603     # checks config file checksum
604     # checks ssh to any
605
606     if 'filelist' not in node_result:
607       bad = True
608       feedback_fn("  - ERROR: node hasn't returned file checksum data")
609     else:
610       remote_cksum = node_result['filelist']
611       for file_name in file_list:
612         if file_name not in remote_cksum:
613           bad = True
614           feedback_fn("  - ERROR: file '%s' missing" % file_name)
615         elif remote_cksum[file_name] != local_cksum[file_name]:
616           bad = True
617           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
618
619     if 'nodelist' not in node_result:
620       bad = True
621       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
622     else:
623       if node_result['nodelist']:
624         bad = True
625         for node in node_result['nodelist']:
626           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
627                           (node, node_result['nodelist'][node]))
628     if 'node-net-test' not in node_result:
629       bad = True
630       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
631     else:
632       if node_result['node-net-test']:
633         bad = True
634         nlist = utils.NiceSort(node_result['node-net-test'].keys())
635         for node in nlist:
636           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
637                           (node, node_result['node-net-test'][node]))
638
639     hyp_result = node_result.get('hypervisor', None)
640     if isinstance(hyp_result, dict):
641       for hv_name, hv_result in hyp_result.iteritems():
642         if hv_result is not None:
643           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
644                       (hv_name, hv_result))
645     return bad
646
647   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
648                       node_instance, feedback_fn):
649     """Verify an instance.
650
651     This function checks to see if the required block devices are
652     available on the instance's node.
653
654     """
655     bad = False
656
657     node_current = instanceconfig.primary_node
658
659     node_vol_should = {}
660     instanceconfig.MapLVsByNode(node_vol_should)
661
662     for node in node_vol_should:
663       for volume in node_vol_should[node]:
664         if node not in node_vol_is or volume not in node_vol_is[node]:
665           feedback_fn("  - ERROR: volume %s missing on node %s" %
666                           (volume, node))
667           bad = True
668
669     if not instanceconfig.status == 'down':
670       if (node_current not in node_instance or
671           not instance in node_instance[node_current]):
672         feedback_fn("  - ERROR: instance %s not running on node %s" %
673                         (instance, node_current))
674         bad = True
675
676     for node in node_instance:
677       if (not node == node_current):
678         if instance in node_instance[node]:
679           feedback_fn("  - ERROR: instance %s should not run on node %s" %
680                           (instance, node))
681           bad = True
682
683     return bad
684
685   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
686     """Verify if there are any unknown volumes in the cluster.
687
688     The .os, .swap and backup volumes are ignored. All other volumes are
689     reported as unknown.
690
691     """
692     bad = False
693
694     for node in node_vol_is:
695       for volume in node_vol_is[node]:
696         if node not in node_vol_should or volume not in node_vol_should[node]:
697           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
698                       (volume, node))
699           bad = True
700     return bad
701
702   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
703     """Verify the list of running instances.
704
705     This checks what instances are running but unknown to the cluster.
706
707     """
708     bad = False
709     for node in node_instance:
710       for runninginstance in node_instance[node]:
711         if runninginstance not in instancelist:
712           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
713                           (runninginstance, node))
714           bad = True
715     return bad
716
717   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
718     """Verify N+1 Memory Resilience.
719
720     Check that if one single node dies we can still start all the instances it
721     was primary for.
722
723     """
724     bad = False
725
726     for node, nodeinfo in node_info.iteritems():
727       # This code checks that every node which is now listed as secondary has
728       # enough memory to host all instances it is supposed to should a single
729       # other node in the cluster fail.
730       # FIXME: not ready for failover to an arbitrary node
731       # FIXME: does not support file-backed instances
732       # WARNING: we currently take into account down instances as well as up
733       # ones, considering that even if they're down someone might want to start
734       # them even in the event of a node failure.
735       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
736         needed_mem = 0
737         for instance in instances:
738           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
739           if bep[constants.BE_AUTO_BALANCE]:
740             needed_mem += bep[constants.BE_MEMORY]
741         if nodeinfo['mfree'] < needed_mem:
742           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
743                       " failovers should node %s fail" % (node, prinode))
744           bad = True
745     return bad
746
747   def CheckPrereq(self):
748     """Check prerequisites.
749
750     Transform the list of checks we're going to skip into a set and check that
751     all its members are valid.
752
753     """
754     self.skip_set = frozenset(self.op.skip_checks)
755     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
756       raise errors.OpPrereqError("Invalid checks to be skipped specified")
757
758   def BuildHooksEnv(self):
759     """Build hooks env.
760
761     Cluster-Verify hooks just rone in the post phase and their failure makes
762     the output be logged in the verify output and the verification to fail.
763
764     """
765     all_nodes = self.cfg.GetNodeList()
766     # TODO: populate the environment with useful information for verify hooks
767     env = {}
768     return env, [], all_nodes
769
770   def Exec(self, feedback_fn):
771     """Verify integrity of cluster, performing various test on nodes.
772
773     """
774     bad = False
775     feedback_fn("* Verifying global settings")
776     for msg in self.cfg.VerifyConfig():
777       feedback_fn("  - ERROR: %s" % msg)
778
779     vg_name = self.cfg.GetVGName()
780     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
781     nodelist = utils.NiceSort(self.cfg.GetNodeList())
782     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
783     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
784     i_non_redundant = [] # Non redundant instances
785     i_non_a_balanced = [] # Non auto-balanced instances
786     node_volume = {}
787     node_instance = {}
788     node_info = {}
789     instance_cfg = {}
790
791     # FIXME: verify OS list
792     # do local checksums
793     file_names = []
794     file_names.append(constants.SSL_CERT_FILE)
795     file_names.append(constants.CLUSTER_CONF_FILE)
796     local_checksums = utils.FingerprintFiles(file_names)
797
798     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
799     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
800     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
801     all_vglist = self.rpc.call_vg_list(nodelist)
802     node_verify_param = {
803       'filelist': file_names,
804       'nodelist': nodelist,
805       'hypervisor': hypervisors,
806       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
807                         for node in nodeinfo]
808       }
809     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
810                                            self.cfg.GetClusterName())
811     all_rversion = self.rpc.call_version(nodelist)
812     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
813                                         self.cfg.GetHypervisorType())
814
815     cluster = self.cfg.GetClusterInfo()
816     for node in nodelist:
817       feedback_fn("* Verifying node %s" % node)
818       result = self._VerifyNode(node, file_names, local_checksums,
819                                 all_vglist[node], all_nvinfo[node],
820                                 all_rversion[node], feedback_fn)
821       bad = bad or result
822
823       # node_volume
824       volumeinfo = all_volumeinfo[node]
825
826       if isinstance(volumeinfo, basestring):
827         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
828                     (node, volumeinfo[-400:].encode('string_escape')))
829         bad = True
830         node_volume[node] = {}
831       elif not isinstance(volumeinfo, dict):
832         feedback_fn("  - ERROR: connection to %s failed" % (node,))
833         bad = True
834         continue
835       else:
836         node_volume[node] = volumeinfo
837
838       # node_instance
839       nodeinstance = all_instanceinfo[node]
840       if type(nodeinstance) != list:
841         feedback_fn("  - ERROR: connection to %s failed" % (node,))
842         bad = True
843         continue
844
845       node_instance[node] = nodeinstance
846
847       # node_info
848       nodeinfo = all_ninfo[node]
849       if not isinstance(nodeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853
854       try:
855         node_info[node] = {
856           "mfree": int(nodeinfo['memory_free']),
857           "dfree": int(nodeinfo['vg_free']),
858           "pinst": [],
859           "sinst": [],
860           # dictionary holding all instances this node is secondary for,
861           # grouped by their primary node. Each key is a cluster node, and each
862           # value is a list of instances which have the key as primary and the
863           # current node as secondary.  this is handy to calculate N+1 memory
864           # availability if you can only failover from a primary to its
865           # secondary.
866           "sinst-by-pnode": {},
867         }
868       except ValueError:
869         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
870         bad = True
871         continue
872
873     node_vol_should = {}
874
875     for instance in instancelist:
876       feedback_fn("* Verifying instance %s" % instance)
877       inst_config = self.cfg.GetInstanceInfo(instance)
878       result =  self._VerifyInstance(instance, inst_config, node_volume,
879                                      node_instance, feedback_fn)
880       bad = bad or result
881
882       inst_config.MapLVsByNode(node_vol_should)
883
884       instance_cfg[instance] = inst_config
885
886       pnode = inst_config.primary_node
887       if pnode in node_info:
888         node_info[pnode]['pinst'].append(instance)
889       else:
890         feedback_fn("  - ERROR: instance %s, connection to primary node"
891                     " %s failed" % (instance, pnode))
892         bad = True
893
894       # If the instance is non-redundant we cannot survive losing its primary
895       # node, so we are not N+1 compliant. On the other hand we have no disk
896       # templates with more than one secondary so that situation is not well
897       # supported either.
898       # FIXME: does not support file-backed instances
899       if len(inst_config.secondary_nodes) == 0:
900         i_non_redundant.append(instance)
901       elif len(inst_config.secondary_nodes) > 1:
902         feedback_fn("  - WARNING: multiple secondaries for instance %s"
903                     % instance)
904
905       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
906         i_non_a_balanced.append(instance)
907
908       for snode in inst_config.secondary_nodes:
909         if snode in node_info:
910           node_info[snode]['sinst'].append(instance)
911           if pnode not in node_info[snode]['sinst-by-pnode']:
912             node_info[snode]['sinst-by-pnode'][pnode] = []
913           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
914         else:
915           feedback_fn("  - ERROR: instance %s, connection to secondary node"
916                       " %s failed" % (instance, snode))
917
918     feedback_fn("* Verifying orphan volumes")
919     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
920                                        feedback_fn)
921     bad = bad or result
922
923     feedback_fn("* Verifying remaining instances")
924     result = self._VerifyOrphanInstances(instancelist, node_instance,
925                                          feedback_fn)
926     bad = bad or result
927
928     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
929       feedback_fn("* Verifying N+1 Memory redundancy")
930       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
931       bad = bad or result
932
933     feedback_fn("* Other Notes")
934     if i_non_redundant:
935       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
936                   % len(i_non_redundant))
937
938     if i_non_a_balanced:
939       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
940                   % len(i_non_a_balanced))
941
942     return not bad
943
944   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
945     """Analize the post-hooks' result, handle it, and send some
946     nicely-formatted feedback back to the user.
947
948     Args:
949       phase: the hooks phase that has just been run
950       hooks_results: the results of the multi-node hooks rpc call
951       feedback_fn: function to send feedback back to the caller
952       lu_result: previous Exec result
953
954     """
955     # We only really run POST phase hooks, and are only interested in
956     # their results
957     if phase == constants.HOOKS_PHASE_POST:
958       # Used to change hooks' output to proper indentation
959       indent_re = re.compile('^', re.M)
960       feedback_fn("* Hooks Results")
961       if not hooks_results:
962         feedback_fn("  - ERROR: general communication failure")
963         lu_result = 1
964       else:
965         for node_name in hooks_results:
966           show_node_header = True
967           res = hooks_results[node_name]
968           if res is False or not isinstance(res, list):
969             feedback_fn("    Communication failure")
970             lu_result = 1
971             continue
972           for script, hkr, output in res:
973             if hkr == constants.HKR_FAIL:
974               # The node header is only shown once, if there are
975               # failing hooks on that node
976               if show_node_header:
977                 feedback_fn("  Node %s:" % node_name)
978                 show_node_header = False
979               feedback_fn("    ERROR: Script %s failed, output:" % script)
980               output = indent_re.sub('      ', output)
981               feedback_fn("%s" % output)
982               lu_result = 1
983
984       return lu_result
985
986
987 class LUVerifyDisks(NoHooksLU):
988   """Verifies the cluster disks status.
989
990   """
991   _OP_REQP = []
992   REQ_BGL = False
993
994   def ExpandNames(self):
995     self.needed_locks = {
996       locking.LEVEL_NODE: locking.ALL_SET,
997       locking.LEVEL_INSTANCE: locking.ALL_SET,
998     }
999     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1000
1001   def CheckPrereq(self):
1002     """Check prerequisites.
1003
1004     This has no prerequisites.
1005
1006     """
1007     pass
1008
1009   def Exec(self, feedback_fn):
1010     """Verify integrity of cluster disks.
1011
1012     """
1013     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1014
1015     vg_name = self.cfg.GetVGName()
1016     nodes = utils.NiceSort(self.cfg.GetNodeList())
1017     instances = [self.cfg.GetInstanceInfo(name)
1018                  for name in self.cfg.GetInstanceList()]
1019
1020     nv_dict = {}
1021     for inst in instances:
1022       inst_lvs = {}
1023       if (inst.status != "up" or
1024           inst.disk_template not in constants.DTS_NET_MIRROR):
1025         continue
1026       inst.MapLVsByNode(inst_lvs)
1027       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1028       for node, vol_list in inst_lvs.iteritems():
1029         for vol in vol_list:
1030           nv_dict[(node, vol)] = inst
1031
1032     if not nv_dict:
1033       return result
1034
1035     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1036
1037     to_act = set()
1038     for node in nodes:
1039       # node_volume
1040       lvs = node_lvs[node]
1041
1042       if isinstance(lvs, basestring):
1043         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1044         res_nlvm[node] = lvs
1045       elif not isinstance(lvs, dict):
1046         logging.warning("Connection to node %s failed or invalid data"
1047                         " returned", node)
1048         res_nodes.append(node)
1049         continue
1050
1051       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1052         inst = nv_dict.pop((node, lv_name), None)
1053         if (not lv_online and inst is not None
1054             and inst.name not in res_instances):
1055           res_instances.append(inst.name)
1056
1057     # any leftover items in nv_dict are missing LVs, let's arrange the
1058     # data better
1059     for key, inst in nv_dict.iteritems():
1060       if inst.name not in res_missing:
1061         res_missing[inst.name] = []
1062       res_missing[inst.name].append(key)
1063
1064     return result
1065
1066
1067 class LURenameCluster(LogicalUnit):
1068   """Rename the cluster.
1069
1070   """
1071   HPATH = "cluster-rename"
1072   HTYPE = constants.HTYPE_CLUSTER
1073   _OP_REQP = ["name"]
1074
1075   def BuildHooksEnv(self):
1076     """Build hooks env.
1077
1078     """
1079     env = {
1080       "OP_TARGET": self.cfg.GetClusterName(),
1081       "NEW_NAME": self.op.name,
1082       }
1083     mn = self.cfg.GetMasterNode()
1084     return env, [mn], [mn]
1085
1086   def CheckPrereq(self):
1087     """Verify that the passed name is a valid one.
1088
1089     """
1090     hostname = utils.HostInfo(self.op.name)
1091
1092     new_name = hostname.name
1093     self.ip = new_ip = hostname.ip
1094     old_name = self.cfg.GetClusterName()
1095     old_ip = self.cfg.GetMasterIP()
1096     if new_name == old_name and new_ip == old_ip:
1097       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1098                                  " cluster has changed")
1099     if new_ip != old_ip:
1100       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1101         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102                                    " reachable on the network. Aborting." %
1103                                    new_ip)
1104
1105     self.op.name = new_name
1106
1107   def Exec(self, feedback_fn):
1108     """Rename the cluster.
1109
1110     """
1111     clustername = self.op.name
1112     ip = self.ip
1113
1114     # shutdown the master IP
1115     master = self.cfg.GetMasterNode()
1116     if not self.rpc.call_node_stop_master(master, False):
1117       raise errors.OpExecError("Could not disable the master role")
1118
1119     try:
1120       # modify the sstore
1121       # TODO: sstore
1122       ss.SetKey(ss.SS_MASTER_IP, ip)
1123       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1124
1125       # Distribute updated ss config to all nodes
1126       myself = self.cfg.GetNodeInfo(master)
1127       dist_nodes = self.cfg.GetNodeList()
1128       if myself.name in dist_nodes:
1129         dist_nodes.remove(myself.name)
1130
1131       logging.debug("Copying updated ssconf data to all nodes")
1132       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133         fname = ss.KeyToFilename(keyname)
1134         result = self.rpc.call_upload_file(dist_nodes, fname)
1135         for to_node in dist_nodes:
1136           if not result[to_node]:
1137             self.LogWarning("Copy of file %s to node %s failed",
1138                             fname, to_node)
1139     finally:
1140       if not self.rpc.call_node_start_master(master, False):
1141         self.LogWarning("Could not re-enable the master role on"
1142                         " the master, please restart manually.")
1143
1144
1145 def _RecursiveCheckIfLVMBased(disk):
1146   """Check if the given disk or its children are lvm-based.
1147
1148   Args:
1149     disk: ganeti.objects.Disk object
1150
1151   Returns:
1152     boolean indicating whether a LD_LV dev_type was found or not
1153
1154   """
1155   if disk.children:
1156     for chdisk in disk.children:
1157       if _RecursiveCheckIfLVMBased(chdisk):
1158         return True
1159   return disk.dev_type == constants.LD_LV
1160
1161
1162 class LUSetClusterParams(LogicalUnit):
1163   """Change the parameters of the cluster.
1164
1165   """
1166   HPATH = "cluster-modify"
1167   HTYPE = constants.HTYPE_CLUSTER
1168   _OP_REQP = []
1169   REQ_BGL = False
1170
1171   def ExpandNames(self):
1172     # FIXME: in the future maybe other cluster params won't require checking on
1173     # all nodes to be modified.
1174     self.needed_locks = {
1175       locking.LEVEL_NODE: locking.ALL_SET,
1176     }
1177     self.share_locks[locking.LEVEL_NODE] = 1
1178
1179   def BuildHooksEnv(self):
1180     """Build hooks env.
1181
1182     """
1183     env = {
1184       "OP_TARGET": self.cfg.GetClusterName(),
1185       "NEW_VG_NAME": self.op.vg_name,
1186       }
1187     mn = self.cfg.GetMasterNode()
1188     return env, [mn], [mn]
1189
1190   def CheckPrereq(self):
1191     """Check prerequisites.
1192
1193     This checks whether the given params don't conflict and
1194     if the given volume group is valid.
1195
1196     """
1197     # FIXME: This only works because there is only one parameter that can be
1198     # changed or removed.
1199     if self.op.vg_name is not None and not self.op.vg_name:
1200       instances = self.cfg.GetAllInstancesInfo().values()
1201       for inst in instances:
1202         for disk in inst.disks:
1203           if _RecursiveCheckIfLVMBased(disk):
1204             raise errors.OpPrereqError("Cannot disable lvm storage while"
1205                                        " lvm-based instances exist")
1206
1207     node_list = self.acquired_locks[locking.LEVEL_NODE]
1208
1209     # if vg_name not None, checks given volume group on all nodes
1210     if self.op.vg_name:
1211       vglist = self.rpc.call_vg_list(node_list)
1212       for node in node_list:
1213         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1214                                               constants.MIN_VG_SIZE)
1215         if vgstatus:
1216           raise errors.OpPrereqError("Error on node '%s': %s" %
1217                                      (node, vgstatus))
1218
1219     self.cluster = cluster = self.cfg.GetClusterInfo()
1220     # beparams changes do not need validation (we can't validate?),
1221     # but we still process here
1222     if self.op.beparams:
1223       self.new_beparams = cluster.FillDict(
1224         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1225
1226     # hypervisor list/parameters
1227     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1228     if self.op.hvparams:
1229       if not isinstance(self.op.hvparams, dict):
1230         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1231       for hv_name, hv_dict in self.op.hvparams.items():
1232         if hv_name not in self.new_hvparams:
1233           self.new_hvparams[hv_name] = hv_dict
1234         else:
1235           self.new_hvparams[hv_name].update(hv_dict)
1236
1237     if self.op.enabled_hypervisors is not None:
1238       self.hv_list = self.op.enabled_hypervisors
1239     else:
1240       self.hv_list = cluster.enabled_hypervisors
1241
1242     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1243       # either the enabled list has changed, or the parameters have, validate
1244       for hv_name, hv_params in self.new_hvparams.items():
1245         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1246             (self.op.enabled_hypervisors and
1247              hv_name in self.op.enabled_hypervisors)):
1248           # either this is a new hypervisor, or its parameters have changed
1249           hv_class = hypervisor.GetHypervisor(hv_name)
1250           hv_class.CheckParameterSyntax(hv_params)
1251           _CheckHVParams(self, node_list, hv_name, hv_params)
1252
1253   def Exec(self, feedback_fn):
1254     """Change the parameters of the cluster.
1255
1256     """
1257     if self.op.vg_name is not None:
1258       if self.op.vg_name != self.cfg.GetVGName():
1259         self.cfg.SetVGName(self.op.vg_name)
1260       else:
1261         feedback_fn("Cluster LVM configuration already in desired"
1262                     " state, not changing")
1263     if self.op.hvparams:
1264       self.cluster.hvparams = self.new_hvparams
1265     if self.op.enabled_hypervisors is not None:
1266       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1267     if self.op.beparams:
1268       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1269     self.cfg.Update(self.cluster)
1270
1271
1272 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1273   """Sleep and poll for an instance's disk to sync.
1274
1275   """
1276   if not instance.disks:
1277     return True
1278
1279   if not oneshot:
1280     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1281
1282   node = instance.primary_node
1283
1284   for dev in instance.disks:
1285     lu.cfg.SetDiskID(dev, node)
1286
1287   retries = 0
1288   while True:
1289     max_time = 0
1290     done = True
1291     cumul_degraded = False
1292     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1293     if not rstats:
1294       lu.LogWarning("Can't get any data from node %s", node)
1295       retries += 1
1296       if retries >= 10:
1297         raise errors.RemoteError("Can't contact node %s for mirror data,"
1298                                  " aborting." % node)
1299       time.sleep(6)
1300       continue
1301     retries = 0
1302     for i in range(len(rstats)):
1303       mstat = rstats[i]
1304       if mstat is None:
1305         lu.LogWarning("Can't compute data for node %s/%s",
1306                            node, instance.disks[i].iv_name)
1307         continue
1308       # we ignore the ldisk parameter
1309       perc_done, est_time, is_degraded, _ = mstat
1310       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1311       if perc_done is not None:
1312         done = False
1313         if est_time is not None:
1314           rem_time = "%d estimated seconds remaining" % est_time
1315           max_time = est_time
1316         else:
1317           rem_time = "no time estimate"
1318         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1319                         (instance.disks[i].iv_name, perc_done, rem_time))
1320     if done or oneshot:
1321       break
1322
1323     time.sleep(min(60, max_time))
1324
1325   if done:
1326     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1327   return not cumul_degraded
1328
1329
1330 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1331   """Check that mirrors are not degraded.
1332
1333   The ldisk parameter, if True, will change the test from the
1334   is_degraded attribute (which represents overall non-ok status for
1335   the device(s)) to the ldisk (representing the local storage status).
1336
1337   """
1338   lu.cfg.SetDiskID(dev, node)
1339   if ldisk:
1340     idx = 6
1341   else:
1342     idx = 5
1343
1344   result = True
1345   if on_primary or dev.AssembleOnSecondary():
1346     rstats = lu.rpc.call_blockdev_find(node, dev)
1347     if not rstats:
1348       logging.warning("Node %s: disk degraded, not found or node down", node)
1349       result = False
1350     else:
1351       result = result and (not rstats[idx])
1352   if dev.children:
1353     for child in dev.children:
1354       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1355
1356   return result
1357
1358
1359 class LUDiagnoseOS(NoHooksLU):
1360   """Logical unit for OS diagnose/query.
1361
1362   """
1363   _OP_REQP = ["output_fields", "names"]
1364   REQ_BGL = False
1365   _FIELDS_STATIC = _FieldSet()
1366   _FIELDS_DYNAMIC = _FieldSet("name", "valid", "node_status")
1367
1368   def ExpandNames(self):
1369     if self.op.names:
1370       raise errors.OpPrereqError("Selective OS query not supported")
1371
1372     _CheckOutputFields(static=self._FIELDS_STATIC,
1373                        dynamic=self._FIELDS_DYNAMIC,
1374                        selected=self.op.output_fields)
1375
1376     # Lock all nodes, in shared mode
1377     self.needed_locks = {}
1378     self.share_locks[locking.LEVEL_NODE] = 1
1379     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1380
1381   def CheckPrereq(self):
1382     """Check prerequisites.
1383
1384     """
1385
1386   @staticmethod
1387   def _DiagnoseByOS(node_list, rlist):
1388     """Remaps a per-node return list into an a per-os per-node dictionary
1389
1390       Args:
1391         node_list: a list with the names of all nodes
1392         rlist: a map with node names as keys and OS objects as values
1393
1394       Returns:
1395         map: a map with osnames as keys and as value another map, with
1396              nodes as
1397              keys and list of OS objects as values
1398              e.g. {"debian-etch": {"node1": [<object>,...],
1399                                    "node2": [<object>,]}
1400                   }
1401
1402     """
1403     all_os = {}
1404     for node_name, nr in rlist.iteritems():
1405       if not nr:
1406         continue
1407       for os_obj in nr:
1408         if os_obj.name not in all_os:
1409           # build a list of nodes for this os containing empty lists
1410           # for each node in node_list
1411           all_os[os_obj.name] = {}
1412           for nname in node_list:
1413             all_os[os_obj.name][nname] = []
1414         all_os[os_obj.name][node_name].append(os_obj)
1415     return all_os
1416
1417   def Exec(self, feedback_fn):
1418     """Compute the list of OSes.
1419
1420     """
1421     node_list = self.acquired_locks[locking.LEVEL_NODE]
1422     node_data = self.rpc.call_os_diagnose(node_list)
1423     if node_data == False:
1424       raise errors.OpExecError("Can't gather the list of OSes")
1425     pol = self._DiagnoseByOS(node_list, node_data)
1426     output = []
1427     for os_name, os_data in pol.iteritems():
1428       row = []
1429       for field in self.op.output_fields:
1430         if field == "name":
1431           val = os_name
1432         elif field == "valid":
1433           val = utils.all([osl and osl[0] for osl in os_data.values()])
1434         elif field == "node_status":
1435           val = {}
1436           for node_name, nos_list in os_data.iteritems():
1437             val[node_name] = [(v.status, v.path) for v in nos_list]
1438         else:
1439           raise errors.ParameterError(field)
1440         row.append(val)
1441       output.append(row)
1442
1443     return output
1444
1445
1446 class LURemoveNode(LogicalUnit):
1447   """Logical unit for removing a node.
1448
1449   """
1450   HPATH = "node-remove"
1451   HTYPE = constants.HTYPE_NODE
1452   _OP_REQP = ["node_name"]
1453
1454   def BuildHooksEnv(self):
1455     """Build hooks env.
1456
1457     This doesn't run on the target node in the pre phase as a failed
1458     node would then be impossible to remove.
1459
1460     """
1461     env = {
1462       "OP_TARGET": self.op.node_name,
1463       "NODE_NAME": self.op.node_name,
1464       }
1465     all_nodes = self.cfg.GetNodeList()
1466     all_nodes.remove(self.op.node_name)
1467     return env, all_nodes, all_nodes
1468
1469   def CheckPrereq(self):
1470     """Check prerequisites.
1471
1472     This checks:
1473      - the node exists in the configuration
1474      - it does not have primary or secondary instances
1475      - it's not the master
1476
1477     Any errors are signalled by raising errors.OpPrereqError.
1478
1479     """
1480     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1481     if node is None:
1482       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1483
1484     instance_list = self.cfg.GetInstanceList()
1485
1486     masternode = self.cfg.GetMasterNode()
1487     if node.name == masternode:
1488       raise errors.OpPrereqError("Node is the master node,"
1489                                  " you need to failover first.")
1490
1491     for instance_name in instance_list:
1492       instance = self.cfg.GetInstanceInfo(instance_name)
1493       if node.name == instance.primary_node:
1494         raise errors.OpPrereqError("Instance %s still running on the node,"
1495                                    " please remove first." % instance_name)
1496       if node.name in instance.secondary_nodes:
1497         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1498                                    " please remove first." % instance_name)
1499     self.op.node_name = node.name
1500     self.node = node
1501
1502   def Exec(self, feedback_fn):
1503     """Removes the node from the cluster.
1504
1505     """
1506     node = self.node
1507     logging.info("Stopping the node daemon and removing configs from node %s",
1508                  node.name)
1509
1510     self.context.RemoveNode(node.name)
1511
1512     self.rpc.call_node_leave_cluster(node.name)
1513
1514
1515 class LUQueryNodes(NoHooksLU):
1516   """Logical unit for querying nodes.
1517
1518   """
1519   _OP_REQP = ["output_fields", "names"]
1520   REQ_BGL = False
1521   _FIELDS_DYNAMIC = _FieldSet(
1522     "dtotal", "dfree",
1523     "mtotal", "mnode", "mfree",
1524     "bootid",
1525     "ctotal",
1526     )
1527
1528   _FIELDS_STATIC = _FieldSet(
1529     "name", "pinst_cnt", "sinst_cnt",
1530     "pinst_list", "sinst_list",
1531     "pip", "sip", "tags",
1532     "serial_no",
1533     )
1534
1535   def ExpandNames(self):
1536     _CheckOutputFields(static=self._FIELDS_STATIC,
1537                        dynamic=self._FIELDS_DYNAMIC,
1538                        selected=self.op.output_fields)
1539
1540     self.needed_locks = {}
1541     self.share_locks[locking.LEVEL_NODE] = 1
1542
1543     if self.op.names:
1544       self.wanted = _GetWantedNodes(self, self.op.names)
1545     else:
1546       self.wanted = locking.ALL_SET
1547
1548     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1549     if self.do_locking:
1550       # if we don't request only static fields, we need to lock the nodes
1551       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1552
1553
1554   def CheckPrereq(self):
1555     """Check prerequisites.
1556
1557     """
1558     # The validation of the node list is done in the _GetWantedNodes,
1559     # if non empty, and if empty, there's no validation to do
1560     pass
1561
1562   def Exec(self, feedback_fn):
1563     """Computes the list of nodes and their attributes.
1564
1565     """
1566     all_info = self.cfg.GetAllNodesInfo()
1567     if self.do_locking:
1568       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1569     elif self.wanted != locking.ALL_SET:
1570       nodenames = self.wanted
1571       missing = set(nodenames).difference(all_info.keys())
1572       if missing:
1573         raise errors.OpExecError(
1574           "Some nodes were removed before retrieving their data: %s" % missing)
1575     else:
1576       nodenames = all_info.keys()
1577
1578     nodenames = utils.NiceSort(nodenames)
1579     nodelist = [all_info[name] for name in nodenames]
1580
1581     # begin data gathering
1582
1583     if self.do_locking:
1584       live_data = {}
1585       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1586                                           self.cfg.GetHypervisorType())
1587       for name in nodenames:
1588         nodeinfo = node_data.get(name, None)
1589         if nodeinfo:
1590           live_data[name] = {
1591             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1592             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1593             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1594             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1595             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1596             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1597             "bootid": nodeinfo['bootid'],
1598             }
1599         else:
1600           live_data[name] = {}
1601     else:
1602       live_data = dict.fromkeys(nodenames, {})
1603
1604     node_to_primary = dict([(name, set()) for name in nodenames])
1605     node_to_secondary = dict([(name, set()) for name in nodenames])
1606
1607     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1608                              "sinst_cnt", "sinst_list"))
1609     if inst_fields & frozenset(self.op.output_fields):
1610       instancelist = self.cfg.GetInstanceList()
1611
1612       for instance_name in instancelist:
1613         inst = self.cfg.GetInstanceInfo(instance_name)
1614         if inst.primary_node in node_to_primary:
1615           node_to_primary[inst.primary_node].add(inst.name)
1616         for secnode in inst.secondary_nodes:
1617           if secnode in node_to_secondary:
1618             node_to_secondary[secnode].add(inst.name)
1619
1620     # end data gathering
1621
1622     output = []
1623     for node in nodelist:
1624       node_output = []
1625       for field in self.op.output_fields:
1626         if field == "name":
1627           val = node.name
1628         elif field == "pinst_list":
1629           val = list(node_to_primary[node.name])
1630         elif field == "sinst_list":
1631           val = list(node_to_secondary[node.name])
1632         elif field == "pinst_cnt":
1633           val = len(node_to_primary[node.name])
1634         elif field == "sinst_cnt":
1635           val = len(node_to_secondary[node.name])
1636         elif field == "pip":
1637           val = node.primary_ip
1638         elif field == "sip":
1639           val = node.secondary_ip
1640         elif field == "tags":
1641           val = list(node.GetTags())
1642         elif field == "serial_no":
1643           val = node.serial_no
1644         elif self._FIELDS_DYNAMIC.Matches(field):
1645           val = live_data[node.name].get(field, None)
1646         else:
1647           raise errors.ParameterError(field)
1648         node_output.append(val)
1649       output.append(node_output)
1650
1651     return output
1652
1653
1654 class LUQueryNodeVolumes(NoHooksLU):
1655   """Logical unit for getting volumes on node(s).
1656
1657   """
1658   _OP_REQP = ["nodes", "output_fields"]
1659   REQ_BGL = False
1660   _FIELDS_DYNAMIC = _FieldSet("phys", "vg", "name", "size", "instance")
1661   _FIELDS_STATIC = _FieldSet("node")
1662
1663   def ExpandNames(self):
1664     _CheckOutputFields(static=self._FIELDS_STATIC,
1665                        dynamic=self._FIELDS_DYNAMIC,
1666                        selected=self.op.output_fields)
1667
1668     self.needed_locks = {}
1669     self.share_locks[locking.LEVEL_NODE] = 1
1670     if not self.op.nodes:
1671       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1672     else:
1673       self.needed_locks[locking.LEVEL_NODE] = \
1674         _GetWantedNodes(self, self.op.nodes)
1675
1676   def CheckPrereq(self):
1677     """Check prerequisites.
1678
1679     This checks that the fields required are valid output fields.
1680
1681     """
1682     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1683
1684   def Exec(self, feedback_fn):
1685     """Computes the list of nodes and their attributes.
1686
1687     """
1688     nodenames = self.nodes
1689     volumes = self.rpc.call_node_volumes(nodenames)
1690
1691     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1692              in self.cfg.GetInstanceList()]
1693
1694     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1695
1696     output = []
1697     for node in nodenames:
1698       if node not in volumes or not volumes[node]:
1699         continue
1700
1701       node_vols = volumes[node][:]
1702       node_vols.sort(key=lambda vol: vol['dev'])
1703
1704       for vol in node_vols:
1705         node_output = []
1706         for field in self.op.output_fields:
1707           if field == "node":
1708             val = node
1709           elif field == "phys":
1710             val = vol['dev']
1711           elif field == "vg":
1712             val = vol['vg']
1713           elif field == "name":
1714             val = vol['name']
1715           elif field == "size":
1716             val = int(float(vol['size']))
1717           elif field == "instance":
1718             for inst in ilist:
1719               if node not in lv_by_node[inst]:
1720                 continue
1721               if vol['name'] in lv_by_node[inst][node]:
1722                 val = inst.name
1723                 break
1724             else:
1725               val = '-'
1726           else:
1727             raise errors.ParameterError(field)
1728           node_output.append(str(val))
1729
1730         output.append(node_output)
1731
1732     return output
1733
1734
1735 class LUAddNode(LogicalUnit):
1736   """Logical unit for adding node to the cluster.
1737
1738   """
1739   HPATH = "node-add"
1740   HTYPE = constants.HTYPE_NODE
1741   _OP_REQP = ["node_name"]
1742
1743   def BuildHooksEnv(self):
1744     """Build hooks env.
1745
1746     This will run on all nodes before, and on all nodes + the new node after.
1747
1748     """
1749     env = {
1750       "OP_TARGET": self.op.node_name,
1751       "NODE_NAME": self.op.node_name,
1752       "NODE_PIP": self.op.primary_ip,
1753       "NODE_SIP": self.op.secondary_ip,
1754       }
1755     nodes_0 = self.cfg.GetNodeList()
1756     nodes_1 = nodes_0 + [self.op.node_name, ]
1757     return env, nodes_0, nodes_1
1758
1759   def CheckPrereq(self):
1760     """Check prerequisites.
1761
1762     This checks:
1763      - the new node is not already in the config
1764      - it is resolvable
1765      - its parameters (single/dual homed) matches the cluster
1766
1767     Any errors are signalled by raising errors.OpPrereqError.
1768
1769     """
1770     node_name = self.op.node_name
1771     cfg = self.cfg
1772
1773     dns_data = utils.HostInfo(node_name)
1774
1775     node = dns_data.name
1776     primary_ip = self.op.primary_ip = dns_data.ip
1777     secondary_ip = getattr(self.op, "secondary_ip", None)
1778     if secondary_ip is None:
1779       secondary_ip = primary_ip
1780     if not utils.IsValidIP(secondary_ip):
1781       raise errors.OpPrereqError("Invalid secondary IP given")
1782     self.op.secondary_ip = secondary_ip
1783
1784     node_list = cfg.GetNodeList()
1785     if not self.op.readd and node in node_list:
1786       raise errors.OpPrereqError("Node %s is already in the configuration" %
1787                                  node)
1788     elif self.op.readd and node not in node_list:
1789       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1790
1791     for existing_node_name in node_list:
1792       existing_node = cfg.GetNodeInfo(existing_node_name)
1793
1794       if self.op.readd and node == existing_node_name:
1795         if (existing_node.primary_ip != primary_ip or
1796             existing_node.secondary_ip != secondary_ip):
1797           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1798                                      " address configuration as before")
1799         continue
1800
1801       if (existing_node.primary_ip == primary_ip or
1802           existing_node.secondary_ip == primary_ip or
1803           existing_node.primary_ip == secondary_ip or
1804           existing_node.secondary_ip == secondary_ip):
1805         raise errors.OpPrereqError("New node ip address(es) conflict with"
1806                                    " existing node %s" % existing_node.name)
1807
1808     # check that the type of the node (single versus dual homed) is the
1809     # same as for the master
1810     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1811     master_singlehomed = myself.secondary_ip == myself.primary_ip
1812     newbie_singlehomed = secondary_ip == primary_ip
1813     if master_singlehomed != newbie_singlehomed:
1814       if master_singlehomed:
1815         raise errors.OpPrereqError("The master has no private ip but the"
1816                                    " new node has one")
1817       else:
1818         raise errors.OpPrereqError("The master has a private ip but the"
1819                                    " new node doesn't have one")
1820
1821     # checks reachablity
1822     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1823       raise errors.OpPrereqError("Node not reachable by ping")
1824
1825     if not newbie_singlehomed:
1826       # check reachability from my secondary ip to newbie's secondary ip
1827       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1828                            source=myself.secondary_ip):
1829         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1830                                    " based ping to noded port")
1831
1832     self.new_node = objects.Node(name=node,
1833                                  primary_ip=primary_ip,
1834                                  secondary_ip=secondary_ip)
1835
1836   def Exec(self, feedback_fn):
1837     """Adds the new node to the cluster.
1838
1839     """
1840     new_node = self.new_node
1841     node = new_node.name
1842
1843     # check connectivity
1844     result = self.rpc.call_version([node])[node]
1845     if result:
1846       if constants.PROTOCOL_VERSION == result:
1847         logging.info("Communication to node %s fine, sw version %s match",
1848                      node, result)
1849       else:
1850         raise errors.OpExecError("Version mismatch master version %s,"
1851                                  " node version %s" %
1852                                  (constants.PROTOCOL_VERSION, result))
1853     else:
1854       raise errors.OpExecError("Cannot get version from the new node")
1855
1856     # setup ssh on node
1857     logging.info("Copy ssh key to node %s", node)
1858     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1859     keyarray = []
1860     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1861                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1862                 priv_key, pub_key]
1863
1864     for i in keyfiles:
1865       f = open(i, 'r')
1866       try:
1867         keyarray.append(f.read())
1868       finally:
1869         f.close()
1870
1871     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1872                                     keyarray[2],
1873                                     keyarray[3], keyarray[4], keyarray[5])
1874
1875     if not result:
1876       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1877
1878     # Add node to our /etc/hosts, and add key to known_hosts
1879     utils.AddHostToEtcHosts(new_node.name)
1880
1881     if new_node.secondary_ip != new_node.primary_ip:
1882       if not self.rpc.call_node_has_ip_address(new_node.name,
1883                                                new_node.secondary_ip):
1884         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1885                                  " you gave (%s). Please fix and re-run this"
1886                                  " command." % new_node.secondary_ip)
1887
1888     node_verify_list = [self.cfg.GetMasterNode()]
1889     node_verify_param = {
1890       'nodelist': [node],
1891       # TODO: do a node-net-test as well?
1892     }
1893
1894     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1895                                        self.cfg.GetClusterName())
1896     for verifier in node_verify_list:
1897       if not result[verifier]:
1898         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1899                                  " for remote verification" % verifier)
1900       if result[verifier]['nodelist']:
1901         for failed in result[verifier]['nodelist']:
1902           feedback_fn("ssh/hostname verification failed %s -> %s" %
1903                       (verifier, result[verifier]['nodelist'][failed]))
1904         raise errors.OpExecError("ssh/hostname verification failed.")
1905
1906     # Distribute updated /etc/hosts and known_hosts to all nodes,
1907     # including the node just added
1908     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1909     dist_nodes = self.cfg.GetNodeList()
1910     if not self.op.readd:
1911       dist_nodes.append(node)
1912     if myself.name in dist_nodes:
1913       dist_nodes.remove(myself.name)
1914
1915     logging.debug("Copying hosts and known_hosts to all nodes")
1916     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1917       result = self.rpc.call_upload_file(dist_nodes, fname)
1918       for to_node in dist_nodes:
1919         if not result[to_node]:
1920           logging.error("Copy of file %s to node %s failed", fname, to_node)
1921
1922     to_copy = []
1923     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1924       to_copy.append(constants.VNC_PASSWORD_FILE)
1925     for fname in to_copy:
1926       result = self.rpc.call_upload_file([node], fname)
1927       if not result[node]:
1928         logging.error("Could not copy file %s to node %s", fname, node)
1929
1930     if self.op.readd:
1931       self.context.ReaddNode(new_node)
1932     else:
1933       self.context.AddNode(new_node)
1934
1935
1936 class LUQueryClusterInfo(NoHooksLU):
1937   """Query cluster configuration.
1938
1939   """
1940   _OP_REQP = []
1941   REQ_MASTER = False
1942   REQ_BGL = False
1943
1944   def ExpandNames(self):
1945     self.needed_locks = {}
1946
1947   def CheckPrereq(self):
1948     """No prerequsites needed for this LU.
1949
1950     """
1951     pass
1952
1953   def Exec(self, feedback_fn):
1954     """Return cluster config.
1955
1956     """
1957     cluster = self.cfg.GetClusterInfo()
1958     result = {
1959       "software_version": constants.RELEASE_VERSION,
1960       "protocol_version": constants.PROTOCOL_VERSION,
1961       "config_version": constants.CONFIG_VERSION,
1962       "os_api_version": constants.OS_API_VERSION,
1963       "export_version": constants.EXPORT_VERSION,
1964       "architecture": (platform.architecture()[0], platform.machine()),
1965       "name": cluster.cluster_name,
1966       "master": cluster.master_node,
1967       "default_hypervisor": cluster.default_hypervisor,
1968       "enabled_hypervisors": cluster.enabled_hypervisors,
1969       "hvparams": cluster.hvparams,
1970       "beparams": cluster.beparams,
1971       }
1972
1973     return result
1974
1975
1976 class LUQueryConfigValues(NoHooksLU):
1977   """Return configuration values.
1978
1979   """
1980   _OP_REQP = []
1981   REQ_BGL = False
1982   _FIELDS_DYNAMIC = _FieldSet()
1983   _FIELDS_STATIC = _FieldSet("cluster_name", "master_node", "drain_flag")
1984
1985   def ExpandNames(self):
1986     self.needed_locks = {}
1987
1988     _CheckOutputFields(static=self._FIELDS_STATIC,
1989                        dynamic=self._FIELDS_DYNAMIC,
1990                        selected=self.op.output_fields)
1991
1992   def CheckPrereq(self):
1993     """No prerequisites.
1994
1995     """
1996     pass
1997
1998   def Exec(self, feedback_fn):
1999     """Dump a representation of the cluster config to the standard output.
2000
2001     """
2002     values = []
2003     for field in self.op.output_fields:
2004       if field == "cluster_name":
2005         entry = self.cfg.GetClusterName()
2006       elif field == "master_node":
2007         entry = self.cfg.GetMasterNode()
2008       elif field == "drain_flag":
2009         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2010       else:
2011         raise errors.ParameterError(field)
2012       values.append(entry)
2013     return values
2014
2015
2016 class LUActivateInstanceDisks(NoHooksLU):
2017   """Bring up an instance's disks.
2018
2019   """
2020   _OP_REQP = ["instance_name"]
2021   REQ_BGL = False
2022
2023   def ExpandNames(self):
2024     self._ExpandAndLockInstance()
2025     self.needed_locks[locking.LEVEL_NODE] = []
2026     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2027
2028   def DeclareLocks(self, level):
2029     if level == locking.LEVEL_NODE:
2030       self._LockInstancesNodes()
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2039     assert self.instance is not None, \
2040       "Cannot retrieve locked instance %s" % self.op.instance_name
2041
2042   def Exec(self, feedback_fn):
2043     """Activate the disks.
2044
2045     """
2046     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2047     if not disks_ok:
2048       raise errors.OpExecError("Cannot activate block devices")
2049
2050     return disks_info
2051
2052
2053 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2054   """Prepare the block devices for an instance.
2055
2056   This sets up the block devices on all nodes.
2057
2058   Args:
2059     instance: a ganeti.objects.Instance object
2060     ignore_secondaries: if true, errors on secondary nodes won't result
2061                         in an error return from the function
2062
2063   Returns:
2064     false if the operation failed
2065     list of (host, instance_visible_name, node_visible_name) if the operation
2066          suceeded with the mapping from node devices to instance devices
2067   """
2068   device_info = []
2069   disks_ok = True
2070   iname = instance.name
2071   # With the two passes mechanism we try to reduce the window of
2072   # opportunity for the race condition of switching DRBD to primary
2073   # before handshaking occured, but we do not eliminate it
2074
2075   # The proper fix would be to wait (with some limits) until the
2076   # connection has been made and drbd transitions from WFConnection
2077   # into any other network-connected state (Connected, SyncTarget,
2078   # SyncSource, etc.)
2079
2080   # 1st pass, assemble on all nodes in secondary mode
2081   for inst_disk in instance.disks:
2082     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2083       lu.cfg.SetDiskID(node_disk, node)
2084       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2085       if not result:
2086         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2087                            " (is_primary=False, pass=1)",
2088                            inst_disk.iv_name, node)
2089         if not ignore_secondaries:
2090           disks_ok = False
2091
2092   # FIXME: race condition on drbd migration to primary
2093
2094   # 2nd pass, do only the primary node
2095   for inst_disk in instance.disks:
2096     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2097       if node != instance.primary_node:
2098         continue
2099       lu.cfg.SetDiskID(node_disk, node)
2100       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2101       if not result:
2102         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2103                            " (is_primary=True, pass=2)",
2104                            inst_disk.iv_name, node)
2105         disks_ok = False
2106     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2107
2108   # leave the disks configured for the primary node
2109   # this is a workaround that would be fixed better by
2110   # improving the logical/physical id handling
2111   for disk in instance.disks:
2112     lu.cfg.SetDiskID(disk, instance.primary_node)
2113
2114   return disks_ok, device_info
2115
2116
2117 def _StartInstanceDisks(lu, instance, force):
2118   """Start the disks of an instance.
2119
2120   """
2121   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2122                                            ignore_secondaries=force)
2123   if not disks_ok:
2124     _ShutdownInstanceDisks(lu, instance)
2125     if force is not None and not force:
2126       lu.proc.LogWarning("", hint="If the message above refers to a"
2127                          " secondary node,"
2128                          " you can retry the operation using '--force'.")
2129     raise errors.OpExecError("Disk consistency error")
2130
2131
2132 class LUDeactivateInstanceDisks(NoHooksLU):
2133   """Shutdown an instance's disks.
2134
2135   """
2136   _OP_REQP = ["instance_name"]
2137   REQ_BGL = False
2138
2139   def ExpandNames(self):
2140     self._ExpandAndLockInstance()
2141     self.needed_locks[locking.LEVEL_NODE] = []
2142     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2143
2144   def DeclareLocks(self, level):
2145     if level == locking.LEVEL_NODE:
2146       self._LockInstancesNodes()
2147
2148   def CheckPrereq(self):
2149     """Check prerequisites.
2150
2151     This checks that the instance is in the cluster.
2152
2153     """
2154     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2155     assert self.instance is not None, \
2156       "Cannot retrieve locked instance %s" % self.op.instance_name
2157
2158   def Exec(self, feedback_fn):
2159     """Deactivate the disks
2160
2161     """
2162     instance = self.instance
2163     _SafeShutdownInstanceDisks(self, instance)
2164
2165
2166 def _SafeShutdownInstanceDisks(lu, instance):
2167   """Shutdown block devices of an instance.
2168
2169   This function checks if an instance is running, before calling
2170   _ShutdownInstanceDisks.
2171
2172   """
2173   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2174                                       [instance.hypervisor])
2175   ins_l = ins_l[instance.primary_node]
2176   if not type(ins_l) is list:
2177     raise errors.OpExecError("Can't contact node '%s'" %
2178                              instance.primary_node)
2179
2180   if instance.name in ins_l:
2181     raise errors.OpExecError("Instance is running, can't shutdown"
2182                              " block devices.")
2183
2184   _ShutdownInstanceDisks(lu, instance)
2185
2186
2187 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2188   """Shutdown block devices of an instance.
2189
2190   This does the shutdown on all nodes of the instance.
2191
2192   If the ignore_primary is false, errors on the primary node are
2193   ignored.
2194
2195   """
2196   result = True
2197   for disk in instance.disks:
2198     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2199       lu.cfg.SetDiskID(top_disk, node)
2200       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2201         logging.error("Could not shutdown block device %s on node %s",
2202                       disk.iv_name, node)
2203         if not ignore_primary or node != instance.primary_node:
2204           result = False
2205   return result
2206
2207
2208 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2209   """Checks if a node has enough free memory.
2210
2211   This function check if a given node has the needed amount of free
2212   memory. In case the node has less memory or we cannot get the
2213   information from the node, this function raise an OpPrereqError
2214   exception.
2215
2216   @type lu: C{LogicalUnit}
2217   @param lu: a logical unit from which we get configuration data
2218   @type node: C{str}
2219   @param node: the node to check
2220   @type reason: C{str}
2221   @param reason: string to use in the error message
2222   @type requested: C{int}
2223   @param requested: the amount of memory in MiB to check for
2224   @type hypervisor: C{str}
2225   @param hypervisor: the hypervisor to ask for memory stats
2226   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2227       we cannot check the node
2228
2229   """
2230   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2231   if not nodeinfo or not isinstance(nodeinfo, dict):
2232     raise errors.OpPrereqError("Could not contact node %s for resource"
2233                              " information" % (node,))
2234
2235   free_mem = nodeinfo[node].get('memory_free')
2236   if not isinstance(free_mem, int):
2237     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2238                              " was '%s'" % (node, free_mem))
2239   if requested > free_mem:
2240     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2241                              " needed %s MiB, available %s MiB" %
2242                              (node, reason, requested, free_mem))
2243
2244
2245 class LUStartupInstance(LogicalUnit):
2246   """Starts an instance.
2247
2248   """
2249   HPATH = "instance-start"
2250   HTYPE = constants.HTYPE_INSTANCE
2251   _OP_REQP = ["instance_name", "force"]
2252   REQ_BGL = False
2253
2254   def ExpandNames(self):
2255     self._ExpandAndLockInstance()
2256     self.needed_locks[locking.LEVEL_NODE] = []
2257     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2258
2259   def DeclareLocks(self, level):
2260     if level == locking.LEVEL_NODE:
2261       self._LockInstancesNodes()
2262
2263   def BuildHooksEnv(self):
2264     """Build hooks env.
2265
2266     This runs on master, primary and secondary nodes of the instance.
2267
2268     """
2269     env = {
2270       "FORCE": self.op.force,
2271       }
2272     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2273     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2274           list(self.instance.secondary_nodes))
2275     return env, nl, nl
2276
2277   def CheckPrereq(self):
2278     """Check prerequisites.
2279
2280     This checks that the instance is in the cluster.
2281
2282     """
2283     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2284     assert self.instance is not None, \
2285       "Cannot retrieve locked instance %s" % self.op.instance_name
2286
2287     bep = self.cfg.GetClusterInfo().FillBE(instance)
2288     # check bridges existance
2289     _CheckInstanceBridgesExist(self, instance)
2290
2291     _CheckNodeFreeMemory(self, instance.primary_node,
2292                          "starting instance %s" % instance.name,
2293                          bep[constants.BE_MEMORY], instance.hypervisor)
2294
2295   def Exec(self, feedback_fn):
2296     """Start the instance.
2297
2298     """
2299     instance = self.instance
2300     force = self.op.force
2301     extra_args = getattr(self.op, "extra_args", "")
2302
2303     self.cfg.MarkInstanceUp(instance.name)
2304
2305     node_current = instance.primary_node
2306
2307     _StartInstanceDisks(self, instance, force)
2308
2309     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2310       _ShutdownInstanceDisks(self, instance)
2311       raise errors.OpExecError("Could not start instance")
2312
2313
2314 class LURebootInstance(LogicalUnit):
2315   """Reboot an instance.
2316
2317   """
2318   HPATH = "instance-reboot"
2319   HTYPE = constants.HTYPE_INSTANCE
2320   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2321   REQ_BGL = False
2322
2323   def ExpandNames(self):
2324     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2325                                    constants.INSTANCE_REBOOT_HARD,
2326                                    constants.INSTANCE_REBOOT_FULL]:
2327       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2328                                   (constants.INSTANCE_REBOOT_SOFT,
2329                                    constants.INSTANCE_REBOOT_HARD,
2330                                    constants.INSTANCE_REBOOT_FULL))
2331     self._ExpandAndLockInstance()
2332     self.needed_locks[locking.LEVEL_NODE] = []
2333     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2334
2335   def DeclareLocks(self, level):
2336     if level == locking.LEVEL_NODE:
2337       primary_only = not constants.INSTANCE_REBOOT_FULL
2338       self._LockInstancesNodes(primary_only=primary_only)
2339
2340   def BuildHooksEnv(self):
2341     """Build hooks env.
2342
2343     This runs on master, primary and secondary nodes of the instance.
2344
2345     """
2346     env = {
2347       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2348       }
2349     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2350     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2351           list(self.instance.secondary_nodes))
2352     return env, nl, nl
2353
2354   def CheckPrereq(self):
2355     """Check prerequisites.
2356
2357     This checks that the instance is in the cluster.
2358
2359     """
2360     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2361     assert self.instance is not None, \
2362       "Cannot retrieve locked instance %s" % self.op.instance_name
2363
2364     # check bridges existance
2365     _CheckInstanceBridgesExist(self, instance)
2366
2367   def Exec(self, feedback_fn):
2368     """Reboot the instance.
2369
2370     """
2371     instance = self.instance
2372     ignore_secondaries = self.op.ignore_secondaries
2373     reboot_type = self.op.reboot_type
2374     extra_args = getattr(self.op, "extra_args", "")
2375
2376     node_current = instance.primary_node
2377
2378     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2379                        constants.INSTANCE_REBOOT_HARD]:
2380       if not self.rpc.call_instance_reboot(node_current, instance,
2381                                            reboot_type, extra_args):
2382         raise errors.OpExecError("Could not reboot instance")
2383     else:
2384       if not self.rpc.call_instance_shutdown(node_current, instance):
2385         raise errors.OpExecError("could not shutdown instance for full reboot")
2386       _ShutdownInstanceDisks(self, instance)
2387       _StartInstanceDisks(self, instance, ignore_secondaries)
2388       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2389         _ShutdownInstanceDisks(self, instance)
2390         raise errors.OpExecError("Could not start instance for full reboot")
2391
2392     self.cfg.MarkInstanceUp(instance.name)
2393
2394
2395 class LUShutdownInstance(LogicalUnit):
2396   """Shutdown an instance.
2397
2398   """
2399   HPATH = "instance-stop"
2400   HTYPE = constants.HTYPE_INSTANCE
2401   _OP_REQP = ["instance_name"]
2402   REQ_BGL = False
2403
2404   def ExpandNames(self):
2405     self._ExpandAndLockInstance()
2406     self.needed_locks[locking.LEVEL_NODE] = []
2407     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2408
2409   def DeclareLocks(self, level):
2410     if level == locking.LEVEL_NODE:
2411       self._LockInstancesNodes()
2412
2413   def BuildHooksEnv(self):
2414     """Build hooks env.
2415
2416     This runs on master, primary and secondary nodes of the instance.
2417
2418     """
2419     env = _BuildInstanceHookEnvByObject(self, self.instance)
2420     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2421           list(self.instance.secondary_nodes))
2422     return env, nl, nl
2423
2424   def CheckPrereq(self):
2425     """Check prerequisites.
2426
2427     This checks that the instance is in the cluster.
2428
2429     """
2430     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2431     assert self.instance is not None, \
2432       "Cannot retrieve locked instance %s" % self.op.instance_name
2433
2434   def Exec(self, feedback_fn):
2435     """Shutdown the instance.
2436
2437     """
2438     instance = self.instance
2439     node_current = instance.primary_node
2440     self.cfg.MarkInstanceDown(instance.name)
2441     if not self.rpc.call_instance_shutdown(node_current, instance):
2442       self.proc.LogWarning("Could not shutdown instance")
2443
2444     _ShutdownInstanceDisks(self, instance)
2445
2446
2447 class LUReinstallInstance(LogicalUnit):
2448   """Reinstall an instance.
2449
2450   """
2451   HPATH = "instance-reinstall"
2452   HTYPE = constants.HTYPE_INSTANCE
2453   _OP_REQP = ["instance_name"]
2454   REQ_BGL = False
2455
2456   def ExpandNames(self):
2457     self._ExpandAndLockInstance()
2458     self.needed_locks[locking.LEVEL_NODE] = []
2459     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2460
2461   def DeclareLocks(self, level):
2462     if level == locking.LEVEL_NODE:
2463       self._LockInstancesNodes()
2464
2465   def BuildHooksEnv(self):
2466     """Build hooks env.
2467
2468     This runs on master, primary and secondary nodes of the instance.
2469
2470     """
2471     env = _BuildInstanceHookEnvByObject(self, self.instance)
2472     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2473           list(self.instance.secondary_nodes))
2474     return env, nl, nl
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This checks that the instance is in the cluster and is not running.
2480
2481     """
2482     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2483     assert instance is not None, \
2484       "Cannot retrieve locked instance %s" % self.op.instance_name
2485
2486     if instance.disk_template == constants.DT_DISKLESS:
2487       raise errors.OpPrereqError("Instance '%s' has no disks" %
2488                                  self.op.instance_name)
2489     if instance.status != "down":
2490       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2491                                  self.op.instance_name)
2492     remote_info = self.rpc.call_instance_info(instance.primary_node,
2493                                               instance.name,
2494                                               instance.hypervisor)
2495     if remote_info:
2496       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2497                                  (self.op.instance_name,
2498                                   instance.primary_node))
2499
2500     self.op.os_type = getattr(self.op, "os_type", None)
2501     if self.op.os_type is not None:
2502       # OS verification
2503       pnode = self.cfg.GetNodeInfo(
2504         self.cfg.ExpandNodeName(instance.primary_node))
2505       if pnode is None:
2506         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2507                                    self.op.pnode)
2508       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2509       if not os_obj:
2510         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2511                                    " primary node"  % self.op.os_type)
2512
2513     self.instance = instance
2514
2515   def Exec(self, feedback_fn):
2516     """Reinstall the instance.
2517
2518     """
2519     inst = self.instance
2520
2521     if self.op.os_type is not None:
2522       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2523       inst.os = self.op.os_type
2524       self.cfg.Update(inst)
2525
2526     _StartInstanceDisks(self, inst, None)
2527     try:
2528       feedback_fn("Running the instance OS create scripts...")
2529       if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2530         raise errors.OpExecError("Could not install OS for instance %s"
2531                                  " on node %s" %
2532                                  (inst.name, inst.primary_node))
2533     finally:
2534       _ShutdownInstanceDisks(self, inst)
2535
2536
2537 class LURenameInstance(LogicalUnit):
2538   """Rename an instance.
2539
2540   """
2541   HPATH = "instance-rename"
2542   HTYPE = constants.HTYPE_INSTANCE
2543   _OP_REQP = ["instance_name", "new_name"]
2544
2545   def BuildHooksEnv(self):
2546     """Build hooks env.
2547
2548     This runs on master, primary and secondary nodes of the instance.
2549
2550     """
2551     env = _BuildInstanceHookEnvByObject(self, self.instance)
2552     env["INSTANCE_NEW_NAME"] = self.op.new_name
2553     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2554           list(self.instance.secondary_nodes))
2555     return env, nl, nl
2556
2557   def CheckPrereq(self):
2558     """Check prerequisites.
2559
2560     This checks that the instance is in the cluster and is not running.
2561
2562     """
2563     instance = self.cfg.GetInstanceInfo(
2564       self.cfg.ExpandInstanceName(self.op.instance_name))
2565     if instance is None:
2566       raise errors.OpPrereqError("Instance '%s' not known" %
2567                                  self.op.instance_name)
2568     if instance.status != "down":
2569       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2570                                  self.op.instance_name)
2571     remote_info = self.rpc.call_instance_info(instance.primary_node,
2572                                               instance.name,
2573                                               instance.hypervisor)
2574     if remote_info:
2575       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2576                                  (self.op.instance_name,
2577                                   instance.primary_node))
2578     self.instance = instance
2579
2580     # new name verification
2581     name_info = utils.HostInfo(self.op.new_name)
2582
2583     self.op.new_name = new_name = name_info.name
2584     instance_list = self.cfg.GetInstanceList()
2585     if new_name in instance_list:
2586       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2587                                  new_name)
2588
2589     if not getattr(self.op, "ignore_ip", False):
2590       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2591         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2592                                    (name_info.ip, new_name))
2593
2594
2595   def Exec(self, feedback_fn):
2596     """Reinstall the instance.
2597
2598     """
2599     inst = self.instance
2600     old_name = inst.name
2601
2602     if inst.disk_template == constants.DT_FILE:
2603       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2604
2605     self.cfg.RenameInstance(inst.name, self.op.new_name)
2606     # Change the instance lock. This is definitely safe while we hold the BGL
2607     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2608     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2609
2610     # re-read the instance from the configuration after rename
2611     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2612
2613     if inst.disk_template == constants.DT_FILE:
2614       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2615       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2616                                                      old_file_storage_dir,
2617                                                      new_file_storage_dir)
2618
2619       if not result:
2620         raise errors.OpExecError("Could not connect to node '%s' to rename"
2621                                  " directory '%s' to '%s' (but the instance"
2622                                  " has been renamed in Ganeti)" % (
2623                                  inst.primary_node, old_file_storage_dir,
2624                                  new_file_storage_dir))
2625
2626       if not result[0]:
2627         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2628                                  " (but the instance has been renamed in"
2629                                  " Ganeti)" % (old_file_storage_dir,
2630                                                new_file_storage_dir))
2631
2632     _StartInstanceDisks(self, inst, None)
2633     try:
2634       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2635                                                old_name):
2636         msg = ("Could not run OS rename script for instance %s on node %s"
2637                " (but the instance has been renamed in Ganeti)" %
2638                (inst.name, inst.primary_node))
2639         self.proc.LogWarning(msg)
2640     finally:
2641       _ShutdownInstanceDisks(self, inst)
2642
2643
2644 class LURemoveInstance(LogicalUnit):
2645   """Remove an instance.
2646
2647   """
2648   HPATH = "instance-remove"
2649   HTYPE = constants.HTYPE_INSTANCE
2650   _OP_REQP = ["instance_name", "ignore_failures"]
2651   REQ_BGL = False
2652
2653   def ExpandNames(self):
2654     self._ExpandAndLockInstance()
2655     self.needed_locks[locking.LEVEL_NODE] = []
2656     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2657
2658   def DeclareLocks(self, level):
2659     if level == locking.LEVEL_NODE:
2660       self._LockInstancesNodes()
2661
2662   def BuildHooksEnv(self):
2663     """Build hooks env.
2664
2665     This runs on master, primary and secondary nodes of the instance.
2666
2667     """
2668     env = _BuildInstanceHookEnvByObject(self, self.instance)
2669     nl = [self.cfg.GetMasterNode()]
2670     return env, nl, nl
2671
2672   def CheckPrereq(self):
2673     """Check prerequisites.
2674
2675     This checks that the instance is in the cluster.
2676
2677     """
2678     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2679     assert self.instance is not None, \
2680       "Cannot retrieve locked instance %s" % self.op.instance_name
2681
2682   def Exec(self, feedback_fn):
2683     """Remove the instance.
2684
2685     """
2686     instance = self.instance
2687     logging.info("Shutting down instance %s on node %s",
2688                  instance.name, instance.primary_node)
2689
2690     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2691       if self.op.ignore_failures:
2692         feedback_fn("Warning: can't shutdown instance")
2693       else:
2694         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2695                                  (instance.name, instance.primary_node))
2696
2697     logging.info("Removing block devices for instance %s", instance.name)
2698
2699     if not _RemoveDisks(self, instance):
2700       if self.op.ignore_failures:
2701         feedback_fn("Warning: can't remove instance's disks")
2702       else:
2703         raise errors.OpExecError("Can't remove instance's disks")
2704
2705     logging.info("Removing instance %s out of cluster config", instance.name)
2706
2707     self.cfg.RemoveInstance(instance.name)
2708     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2709
2710
2711 class LUQueryInstances(NoHooksLU):
2712   """Logical unit for querying instances.
2713
2714   """
2715   _OP_REQP = ["output_fields", "names"]
2716   REQ_BGL = False
2717   _FIELDS_STATIC = _FieldSet(*["name", "os", "pnode", "snodes",
2718                                "admin_state", "admin_ram",
2719                                "disk_template", "ip", "mac", "bridge",
2720                                "sda_size", "sdb_size", "vcpus", "tags",
2721                                "network_port", "beparams",
2722                                "serial_no", "hypervisor", "hvparams",] +
2723                              ["hv/%s" % name
2724                               for name in constants.HVS_PARAMETERS] +
2725                              ["be/%s" % name
2726                               for name in constants.BES_PARAMETERS])
2727   _FIELDS_DYNAMIC = _FieldSet("oper_state", "oper_ram", "status")
2728
2729
2730   def ExpandNames(self):
2731     _CheckOutputFields(static=self._FIELDS_STATIC,
2732                        dynamic=self._FIELDS_DYNAMIC,
2733                        selected=self.op.output_fields)
2734
2735     self.needed_locks = {}
2736     self.share_locks[locking.LEVEL_INSTANCE] = 1
2737     self.share_locks[locking.LEVEL_NODE] = 1
2738
2739     if self.op.names:
2740       self.wanted = _GetWantedInstances(self, self.op.names)
2741     else:
2742       self.wanted = locking.ALL_SET
2743
2744     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2745     if self.do_locking:
2746       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2747       self.needed_locks[locking.LEVEL_NODE] = []
2748       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2749
2750   def DeclareLocks(self, level):
2751     if level == locking.LEVEL_NODE and self.do_locking:
2752       self._LockInstancesNodes()
2753
2754   def CheckPrereq(self):
2755     """Check prerequisites.
2756
2757     """
2758     pass
2759
2760   def Exec(self, feedback_fn):
2761     """Computes the list of nodes and their attributes.
2762
2763     """
2764     all_info = self.cfg.GetAllInstancesInfo()
2765     if self.do_locking:
2766       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2767     elif self.wanted != locking.ALL_SET:
2768       instance_names = self.wanted
2769       missing = set(instance_names).difference(all_info.keys())
2770       if missing:
2771         raise errors.OpExecError(
2772           "Some instances were removed before retrieving their data: %s"
2773           % missing)
2774     else:
2775       instance_names = all_info.keys()
2776
2777     instance_names = utils.NiceSort(instance_names)
2778     instance_list = [all_info[iname] for iname in instance_names]
2779
2780     # begin data gathering
2781
2782     nodes = frozenset([inst.primary_node for inst in instance_list])
2783     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2784
2785     bad_nodes = []
2786     if self.do_locking:
2787       live_data = {}
2788       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2789       for name in nodes:
2790         result = node_data[name]
2791         if result:
2792           live_data.update(result)
2793         elif result == False:
2794           bad_nodes.append(name)
2795         # else no instance is alive
2796     else:
2797       live_data = dict([(name, {}) for name in instance_names])
2798
2799     # end data gathering
2800
2801     HVPREFIX = "hv/"
2802     BEPREFIX = "be/"
2803     output = []
2804     for instance in instance_list:
2805       iout = []
2806       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2807       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2808       for field in self.op.output_fields:
2809         if field == "name":
2810           val = instance.name
2811         elif field == "os":
2812           val = instance.os
2813         elif field == "pnode":
2814           val = instance.primary_node
2815         elif field == "snodes":
2816           val = list(instance.secondary_nodes)
2817         elif field == "admin_state":
2818           val = (instance.status != "down")
2819         elif field == "oper_state":
2820           if instance.primary_node in bad_nodes:
2821             val = None
2822           else:
2823             val = bool(live_data.get(instance.name))
2824         elif field == "status":
2825           if instance.primary_node in bad_nodes:
2826             val = "ERROR_nodedown"
2827           else:
2828             running = bool(live_data.get(instance.name))
2829             if running:
2830               if instance.status != "down":
2831                 val = "running"
2832               else:
2833                 val = "ERROR_up"
2834             else:
2835               if instance.status != "down":
2836                 val = "ERROR_down"
2837               else:
2838                 val = "ADMIN_down"
2839         elif field == "oper_ram":
2840           if instance.primary_node in bad_nodes:
2841             val = None
2842           elif instance.name in live_data:
2843             val = live_data[instance.name].get("memory", "?")
2844           else:
2845             val = "-"
2846         elif field == "disk_template":
2847           val = instance.disk_template
2848         elif field == "ip":
2849           val = instance.nics[0].ip
2850         elif field == "bridge":
2851           val = instance.nics[0].bridge
2852         elif field == "mac":
2853           val = instance.nics[0].mac
2854         elif field == "sda_size" or field == "sdb_size":
2855           disk = instance.FindDisk(field[:3])
2856           if disk is None:
2857             val = None
2858           else:
2859             val = disk.size
2860         elif field == "tags":
2861           val = list(instance.GetTags())
2862         elif field == "serial_no":
2863           val = instance.serial_no
2864         elif field == "network_port":
2865           val = instance.network_port
2866         elif field == "hypervisor":
2867           val = instance.hypervisor
2868         elif field == "hvparams":
2869           val = i_hv
2870         elif (field.startswith(HVPREFIX) and
2871               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2872           val = i_hv.get(field[len(HVPREFIX):], None)
2873         elif field == "beparams":
2874           val = i_be
2875         elif (field.startswith(BEPREFIX) and
2876               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2877           val = i_be.get(field[len(BEPREFIX):], None)
2878         else:
2879           raise errors.ParameterError(field)
2880         iout.append(val)
2881       output.append(iout)
2882
2883     return output
2884
2885
2886 class LUFailoverInstance(LogicalUnit):
2887   """Failover an instance.
2888
2889   """
2890   HPATH = "instance-failover"
2891   HTYPE = constants.HTYPE_INSTANCE
2892   _OP_REQP = ["instance_name", "ignore_consistency"]
2893   REQ_BGL = False
2894
2895   def ExpandNames(self):
2896     self._ExpandAndLockInstance()
2897     self.needed_locks[locking.LEVEL_NODE] = []
2898     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2899
2900   def DeclareLocks(self, level):
2901     if level == locking.LEVEL_NODE:
2902       self._LockInstancesNodes()
2903
2904   def BuildHooksEnv(self):
2905     """Build hooks env.
2906
2907     This runs on master, primary and secondary nodes of the instance.
2908
2909     """
2910     env = {
2911       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2912       }
2913     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2914     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2915     return env, nl, nl
2916
2917   def CheckPrereq(self):
2918     """Check prerequisites.
2919
2920     This checks that the instance is in the cluster.
2921
2922     """
2923     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2924     assert self.instance is not None, \
2925       "Cannot retrieve locked instance %s" % self.op.instance_name
2926
2927     bep = self.cfg.GetClusterInfo().FillBE(instance)
2928     if instance.disk_template not in constants.DTS_NET_MIRROR:
2929       raise errors.OpPrereqError("Instance's disk layout is not"
2930                                  " network mirrored, cannot failover.")
2931
2932     secondary_nodes = instance.secondary_nodes
2933     if not secondary_nodes:
2934       raise errors.ProgrammerError("no secondary node but using "
2935                                    "a mirrored disk template")
2936
2937     target_node = secondary_nodes[0]
2938     # check memory requirements on the secondary node
2939     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2940                          instance.name, bep[constants.BE_MEMORY],
2941                          instance.hypervisor)
2942
2943     # check bridge existance
2944     brlist = [nic.bridge for nic in instance.nics]
2945     if not self.rpc.call_bridges_exist(target_node, brlist):
2946       raise errors.OpPrereqError("One or more target bridges %s does not"
2947                                  " exist on destination node '%s'" %
2948                                  (brlist, target_node))
2949
2950   def Exec(self, feedback_fn):
2951     """Failover an instance.
2952
2953     The failover is done by shutting it down on its present node and
2954     starting it on the secondary.
2955
2956     """
2957     instance = self.instance
2958
2959     source_node = instance.primary_node
2960     target_node = instance.secondary_nodes[0]
2961
2962     feedback_fn("* checking disk consistency between source and target")
2963     for dev in instance.disks:
2964       # for drbd, these are drbd over lvm
2965       if not _CheckDiskConsistency(self, dev, target_node, False):
2966         if instance.status == "up" and not self.op.ignore_consistency:
2967           raise errors.OpExecError("Disk %s is degraded on target node,"
2968                                    " aborting failover." % dev.iv_name)
2969
2970     feedback_fn("* shutting down instance on source node")
2971     logging.info("Shutting down instance %s on node %s",
2972                  instance.name, source_node)
2973
2974     if not self.rpc.call_instance_shutdown(source_node, instance):
2975       if self.op.ignore_consistency:
2976         self.proc.LogWarning("Could not shutdown instance %s on node %s."
2977                              " Proceeding"
2978                              " anyway. Please make sure node %s is down",
2979                              instance.name, source_node, source_node)
2980       else:
2981         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2982                                  (instance.name, source_node))
2983
2984     feedback_fn("* deactivating the instance's disks on source node")
2985     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2986       raise errors.OpExecError("Can't shut down the instance's disks.")
2987
2988     instance.primary_node = target_node
2989     # distribute new instance config to the other nodes
2990     self.cfg.Update(instance)
2991
2992     # Only start the instance if it's marked as up
2993     if instance.status == "up":
2994       feedback_fn("* activating the instance's disks on target node")
2995       logging.info("Starting instance %s on node %s",
2996                    instance.name, target_node)
2997
2998       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2999                                                ignore_secondaries=True)
3000       if not disks_ok:
3001         _ShutdownInstanceDisks(self, instance)
3002         raise errors.OpExecError("Can't activate the instance's disks")
3003
3004       feedback_fn("* starting the instance on the target node")
3005       if not self.rpc.call_instance_start(target_node, instance, None):
3006         _ShutdownInstanceDisks(self, instance)
3007         raise errors.OpExecError("Could not start instance %s on node %s." %
3008                                  (instance.name, target_node))
3009
3010
3011 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3012   """Create a tree of block devices on the primary node.
3013
3014   This always creates all devices.
3015
3016   """
3017   if device.children:
3018     for child in device.children:
3019       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3020         return False
3021
3022   lu.cfg.SetDiskID(device, node)
3023   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3024                                        instance.name, True, info)
3025   if not new_id:
3026     return False
3027   if device.physical_id is None:
3028     device.physical_id = new_id
3029   return True
3030
3031
3032 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3033   """Create a tree of block devices on a secondary node.
3034
3035   If this device type has to be created on secondaries, create it and
3036   all its children.
3037
3038   If not, just recurse to children keeping the same 'force' value.
3039
3040   """
3041   if device.CreateOnSecondary():
3042     force = True
3043   if device.children:
3044     for child in device.children:
3045       if not _CreateBlockDevOnSecondary(lu, node, instance,
3046                                         child, force, info):
3047         return False
3048
3049   if not force:
3050     return True
3051   lu.cfg.SetDiskID(device, node)
3052   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3053                                        instance.name, False, info)
3054   if not new_id:
3055     return False
3056   if device.physical_id is None:
3057     device.physical_id = new_id
3058   return True
3059
3060
3061 def _GenerateUniqueNames(lu, exts):
3062   """Generate a suitable LV name.
3063
3064   This will generate a logical volume name for the given instance.
3065
3066   """
3067   results = []
3068   for val in exts:
3069     new_id = lu.cfg.GenerateUniqueID()
3070     results.append("%s%s" % (new_id, val))
3071   return results
3072
3073
3074 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3075                          p_minor, s_minor):
3076   """Generate a drbd8 device complete with its children.
3077
3078   """
3079   port = lu.cfg.AllocatePort()
3080   vgname = lu.cfg.GetVGName()
3081   shared_secret = lu.cfg.GenerateDRBDSecret()
3082   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3083                           logical_id=(vgname, names[0]))
3084   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3085                           logical_id=(vgname, names[1]))
3086   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3087                           logical_id=(primary, secondary, port,
3088                                       p_minor, s_minor,
3089                                       shared_secret),
3090                           children=[dev_data, dev_meta],
3091                           iv_name=iv_name)
3092   return drbd_dev
3093
3094
3095 def _GenerateDiskTemplate(lu, template_name,
3096                           instance_name, primary_node,
3097                           secondary_nodes, disk_sz, swap_sz,
3098                           file_storage_dir, file_driver):
3099   """Generate the entire disk layout for a given template type.
3100
3101   """
3102   #TODO: compute space requirements
3103
3104   vgname = lu.cfg.GetVGName()
3105   if template_name == constants.DT_DISKLESS:
3106     disks = []
3107   elif template_name == constants.DT_PLAIN:
3108     if len(secondary_nodes) != 0:
3109       raise errors.ProgrammerError("Wrong template configuration")
3110
3111     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3112     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3113                            logical_id=(vgname, names[0]),
3114                            iv_name = "sda")
3115     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3116                            logical_id=(vgname, names[1]),
3117                            iv_name = "sdb")
3118     disks = [sda_dev, sdb_dev]
3119   elif template_name == constants.DT_DRBD8:
3120     if len(secondary_nodes) != 1:
3121       raise errors.ProgrammerError("Wrong template configuration")
3122     remote_node = secondary_nodes[0]
3123     (minor_pa, minor_pb,
3124      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3125       [primary_node, primary_node, remote_node, remote_node], instance_name)
3126
3127     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3128                                       ".sdb_data", ".sdb_meta"])
3129     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3130                                         disk_sz, names[0:2], "sda",
3131                                         minor_pa, minor_sa)
3132     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3133                                         swap_sz, names[2:4], "sdb",
3134                                         minor_pb, minor_sb)
3135     disks = [drbd_sda_dev, drbd_sdb_dev]
3136   elif template_name == constants.DT_FILE:
3137     if len(secondary_nodes) != 0:
3138       raise errors.ProgrammerError("Wrong template configuration")
3139
3140     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3141                                 iv_name="sda", logical_id=(file_driver,
3142                                 "%s/sda" % file_storage_dir))
3143     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3144                                 iv_name="sdb", logical_id=(file_driver,
3145                                 "%s/sdb" % file_storage_dir))
3146     disks = [file_sda_dev, file_sdb_dev]
3147   else:
3148     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3149   return disks
3150
3151
3152 def _GetInstanceInfoText(instance):
3153   """Compute that text that should be added to the disk's metadata.
3154
3155   """
3156   return "originstname+%s" % instance.name
3157
3158
3159 def _CreateDisks(lu, instance):
3160   """Create all disks for an instance.
3161
3162   This abstracts away some work from AddInstance.
3163
3164   Args:
3165     instance: the instance object
3166
3167   Returns:
3168     True or False showing the success of the creation process
3169
3170   """
3171   info = _GetInstanceInfoText(instance)
3172
3173   if instance.disk_template == constants.DT_FILE:
3174     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3175     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3176                                                  file_storage_dir)
3177
3178     if not result:
3179       logging.error("Could not connect to node '%s'", instance.primary_node)
3180       return False
3181
3182     if not result[0]:
3183       logging.error("Failed to create directory '%s'", file_storage_dir)
3184       return False
3185
3186   for device in instance.disks:
3187     logging.info("Creating volume %s for instance %s",
3188                  device.iv_name, instance.name)
3189     #HARDCODE
3190     for secondary_node in instance.secondary_nodes:
3191       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3192                                         device, False, info):
3193         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3194                       device.iv_name, device, secondary_node)
3195         return False
3196     #HARDCODE
3197     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3198                                     instance, device, info):
3199       logging.error("Failed to create volume %s on primary!", device.iv_name)
3200       return False
3201
3202   return True
3203
3204
3205 def _RemoveDisks(lu, instance):
3206   """Remove all disks for an instance.
3207
3208   This abstracts away some work from `AddInstance()` and
3209   `RemoveInstance()`. Note that in case some of the devices couldn't
3210   be removed, the removal will continue with the other ones (compare
3211   with `_CreateDisks()`).
3212
3213   Args:
3214     instance: the instance object
3215
3216   Returns:
3217     True or False showing the success of the removal proces
3218
3219   """
3220   logging.info("Removing block devices for instance %s", instance.name)
3221
3222   result = True
3223   for device in instance.disks:
3224     for node, disk in device.ComputeNodeTree(instance.primary_node):
3225       lu.cfg.SetDiskID(disk, node)
3226       if not lu.rpc.call_blockdev_remove(node, disk):
3227         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3228                            " continuing anyway", device.iv_name, node)
3229         result = False
3230
3231   if instance.disk_template == constants.DT_FILE:
3232     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3233     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3234                                                file_storage_dir):
3235       logging.error("Could not remove directory '%s'", file_storage_dir)
3236       result = False
3237
3238   return result
3239
3240
3241 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3242   """Compute disk size requirements in the volume group
3243
3244   This is currently hard-coded for the two-drive layout.
3245
3246   """
3247   # Required free disk space as a function of disk and swap space
3248   req_size_dict = {
3249     constants.DT_DISKLESS: None,
3250     constants.DT_PLAIN: disk_size + swap_size,
3251     # 256 MB are added for drbd metadata, 128MB for each drbd device
3252     constants.DT_DRBD8: disk_size + swap_size + 256,
3253     constants.DT_FILE: None,
3254   }
3255
3256   if disk_template not in req_size_dict:
3257     raise errors.ProgrammerError("Disk template '%s' size requirement"
3258                                  " is unknown" %  disk_template)
3259
3260   return req_size_dict[disk_template]
3261
3262
3263 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3264   """Hypervisor parameter validation.
3265
3266   This function abstract the hypervisor parameter validation to be
3267   used in both instance create and instance modify.
3268
3269   @type lu: L{LogicalUnit}
3270   @param lu: the logical unit for which we check
3271   @type nodenames: list
3272   @param nodenames: the list of nodes on which we should check
3273   @type hvname: string
3274   @param hvname: the name of the hypervisor we should use
3275   @type hvparams: dict
3276   @param hvparams: the parameters which we need to check
3277   @raise errors.OpPrereqError: if the parameters are not valid
3278
3279   """
3280   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3281                                                   hvname,
3282                                                   hvparams)
3283   for node in nodenames:
3284     info = hvinfo.get(node, None)
3285     if not info or not isinstance(info, (tuple, list)):
3286       raise errors.OpPrereqError("Cannot get current information"
3287                                  " from node '%s' (%s)" % (node, info))
3288     if not info[0]:
3289       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3290                                  " %s" % info[1])
3291
3292
3293 class LUCreateInstance(LogicalUnit):
3294   """Create an instance.
3295
3296   """
3297   HPATH = "instance-add"
3298   HTYPE = constants.HTYPE_INSTANCE
3299   _OP_REQP = ["instance_name", "disk_size",
3300               "disk_template", "swap_size", "mode", "start",
3301               "wait_for_sync", "ip_check", "mac",
3302               "hvparams", "beparams"]
3303   REQ_BGL = False
3304
3305   def _ExpandNode(self, node):
3306     """Expands and checks one node name.
3307
3308     """
3309     node_full = self.cfg.ExpandNodeName(node)
3310     if node_full is None:
3311       raise errors.OpPrereqError("Unknown node %s" % node)
3312     return node_full
3313
3314   def ExpandNames(self):
3315     """ExpandNames for CreateInstance.
3316
3317     Figure out the right locks for instance creation.
3318
3319     """
3320     self.needed_locks = {}
3321
3322     # set optional parameters to none if they don't exist
3323     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3324       if not hasattr(self.op, attr):
3325         setattr(self.op, attr, None)
3326
3327     # cheap checks, mostly valid constants given
3328
3329     # verify creation mode
3330     if self.op.mode not in (constants.INSTANCE_CREATE,
3331                             constants.INSTANCE_IMPORT):
3332       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3333                                  self.op.mode)
3334
3335     # disk template and mirror node verification
3336     if self.op.disk_template not in constants.DISK_TEMPLATES:
3337       raise errors.OpPrereqError("Invalid disk template name")
3338
3339     if self.op.hypervisor is None:
3340       self.op.hypervisor = self.cfg.GetHypervisorType()
3341
3342     cluster = self.cfg.GetClusterInfo()
3343     enabled_hvs = cluster.enabled_hypervisors
3344     if self.op.hypervisor not in enabled_hvs:
3345       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3346                                  " cluster (%s)" % (self.op.hypervisor,
3347                                   ",".join(enabled_hvs)))
3348
3349     # check hypervisor parameter syntax (locally)
3350
3351     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3352                                   self.op.hvparams)
3353     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3354     hv_type.CheckParameterSyntax(filled_hvp)
3355
3356     # fill and remember the beparams dict
3357     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3358                                     self.op.beparams)
3359
3360     #### instance parameters check
3361
3362     # instance name verification
3363     hostname1 = utils.HostInfo(self.op.instance_name)
3364     self.op.instance_name = instance_name = hostname1.name
3365
3366     # this is just a preventive check, but someone might still add this
3367     # instance in the meantime, and creation will fail at lock-add time
3368     if instance_name in self.cfg.GetInstanceList():
3369       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3370                                  instance_name)
3371
3372     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3373
3374     # ip validity checks
3375     ip = getattr(self.op, "ip", None)
3376     if ip is None or ip.lower() == "none":
3377       inst_ip = None
3378     elif ip.lower() == constants.VALUE_AUTO:
3379       inst_ip = hostname1.ip
3380     else:
3381       if not utils.IsValidIP(ip):
3382         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3383                                    " like a valid IP" % ip)
3384       inst_ip = ip
3385     self.inst_ip = self.op.ip = inst_ip
3386     # used in CheckPrereq for ip ping check
3387     self.check_ip = hostname1.ip
3388
3389     # MAC address verification
3390     if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3391       if not utils.IsValidMac(self.op.mac.lower()):
3392         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3393                                    self.op.mac)
3394
3395     # file storage checks
3396     if (self.op.file_driver and
3397         not self.op.file_driver in constants.FILE_DRIVER):
3398       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3399                                  self.op.file_driver)
3400
3401     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3402       raise errors.OpPrereqError("File storage directory path not absolute")
3403
3404     ### Node/iallocator related checks
3405     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3406       raise errors.OpPrereqError("One and only one of iallocator and primary"
3407                                  " node must be given")
3408
3409     if self.op.iallocator:
3410       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3411     else:
3412       self.op.pnode = self._ExpandNode(self.op.pnode)
3413       nodelist = [self.op.pnode]
3414       if self.op.snode is not None:
3415         self.op.snode = self._ExpandNode(self.op.snode)
3416         nodelist.append(self.op.snode)
3417       self.needed_locks[locking.LEVEL_NODE] = nodelist
3418
3419     # in case of import lock the source node too
3420     if self.op.mode == constants.INSTANCE_IMPORT:
3421       src_node = getattr(self.op, "src_node", None)
3422       src_path = getattr(self.op, "src_path", None)
3423
3424       if src_node is None or src_path is None:
3425         raise errors.OpPrereqError("Importing an instance requires source"
3426                                    " node and path options")
3427
3428       if not os.path.isabs(src_path):
3429         raise errors.OpPrereqError("The source path must be absolute")
3430
3431       self.op.src_node = src_node = self._ExpandNode(src_node)
3432       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3433         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3434
3435     else: # INSTANCE_CREATE
3436       if getattr(self.op, "os_type", None) is None:
3437         raise errors.OpPrereqError("No guest OS specified")
3438
3439   def _RunAllocator(self):
3440     """Run the allocator based on input opcode.
3441
3442     """
3443     disks = [{"size": self.op.disk_size, "mode": "w"},
3444              {"size": self.op.swap_size, "mode": "w"}]
3445     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3446              "bridge": self.op.bridge}]
3447     ial = IAllocator(self,
3448                      mode=constants.IALLOCATOR_MODE_ALLOC,
3449                      name=self.op.instance_name,
3450                      disk_template=self.op.disk_template,
3451                      tags=[],
3452                      os=self.op.os_type,
3453                      vcpus=self.be_full[constants.BE_VCPUS],
3454                      mem_size=self.be_full[constants.BE_MEMORY],
3455                      disks=disks,
3456                      nics=nics,
3457                      )
3458
3459     ial.Run(self.op.iallocator)
3460
3461     if not ial.success:
3462       raise errors.OpPrereqError("Can't compute nodes using"
3463                                  " iallocator '%s': %s" % (self.op.iallocator,
3464                                                            ial.info))
3465     if len(ial.nodes) != ial.required_nodes:
3466       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3467                                  " of nodes (%s), required %s" %
3468                                  (self.op.iallocator, len(ial.nodes),
3469                                   ial.required_nodes))
3470     self.op.pnode = ial.nodes[0]
3471     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3472                  self.op.instance_name, self.op.iallocator,
3473                  ", ".join(ial.nodes))
3474     if ial.required_nodes == 2:
3475       self.op.snode = ial.nodes[1]
3476
3477   def BuildHooksEnv(self):
3478     """Build hooks env.
3479
3480     This runs on master, primary and secondary nodes of the instance.
3481
3482     """
3483     env = {
3484       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3485       "INSTANCE_DISK_SIZE": self.op.disk_size,
3486       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3487       "INSTANCE_ADD_MODE": self.op.mode,
3488       }
3489     if self.op.mode == constants.INSTANCE_IMPORT:
3490       env["INSTANCE_SRC_NODE"] = self.op.src_node
3491       env["INSTANCE_SRC_PATH"] = self.op.src_path
3492       env["INSTANCE_SRC_IMAGES"] = self.src_images
3493
3494     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3495       primary_node=self.op.pnode,
3496       secondary_nodes=self.secondaries,
3497       status=self.instance_status,
3498       os_type=self.op.os_type,
3499       memory=self.be_full[constants.BE_MEMORY],
3500       vcpus=self.be_full[constants.BE_VCPUS],
3501       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3502     ))
3503
3504     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3505           self.secondaries)
3506     return env, nl, nl
3507
3508
3509   def CheckPrereq(self):
3510     """Check prerequisites.
3511
3512     """
3513     if (not self.cfg.GetVGName() and
3514         self.op.disk_template not in constants.DTS_NOT_LVM):
3515       raise errors.OpPrereqError("Cluster does not support lvm-based"
3516                                  " instances")
3517
3518
3519     if self.op.mode == constants.INSTANCE_IMPORT:
3520       src_node = self.op.src_node
3521       src_path = self.op.src_path
3522
3523       export_info = self.rpc.call_export_info(src_node, src_path)
3524
3525       if not export_info:
3526         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3527
3528       if not export_info.has_section(constants.INISECT_EXP):
3529         raise errors.ProgrammerError("Corrupted export config")
3530
3531       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3532       if (int(ei_version) != constants.EXPORT_VERSION):
3533         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3534                                    (ei_version, constants.EXPORT_VERSION))
3535
3536       # Check that the new instance doesn't have less disks than the export
3537       # TODO: substitute "2" with the actual number of disks requested
3538       instance_disks = 2
3539       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3540       if instance_disks < export_disks:
3541         raise errors.OpPrereqError("Not enough disks to import."
3542                                    " (instance: %d, export: %d)" %
3543                                    (2, export_disks))
3544
3545       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3546       disk_images = []
3547       for idx in range(export_disks):
3548         option = 'disk%d_dump' % idx
3549         if export_info.has_option(constants.INISECT_INS, option):
3550           # FIXME: are the old os-es, disk sizes, etc. useful?
3551           export_name = export_info.get(constants.INISECT_INS, option)
3552           image = os.path.join(src_path, export_name)
3553           disk_images.append(image)
3554         else:
3555           disk_images.append(False)
3556
3557       self.src_images = disk_images
3558
3559       if self.op.mac == constants.VALUE_AUTO:
3560         old_name = export_info.get(constants.INISECT_INS, 'name')
3561         if self.op.instance_name == old_name:
3562           # FIXME: adjust every nic, when we'll be able to create instances
3563           # with more than one
3564           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3565             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3566
3567     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3568
3569     if self.op.start and not self.op.ip_check:
3570       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3571                                  " adding an instance in start mode")
3572
3573     if self.op.ip_check:
3574       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3575         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3576                                    (self.check_ip, self.op.instance_name))
3577
3578     # bridge verification
3579     bridge = getattr(self.op, "bridge", None)
3580     if bridge is None:
3581       self.op.bridge = self.cfg.GetDefBridge()
3582     else:
3583       self.op.bridge = bridge
3584
3585     #### allocator run
3586
3587     if self.op.iallocator is not None:
3588       self._RunAllocator()
3589
3590     #### node related checks
3591
3592     # check primary node
3593     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3594     assert self.pnode is not None, \
3595       "Cannot retrieve locked node %s" % self.op.pnode
3596     self.secondaries = []
3597
3598     # mirror node verification
3599     if self.op.disk_template in constants.DTS_NET_MIRROR:
3600       if self.op.snode is None:
3601         raise errors.OpPrereqError("The networked disk templates need"
3602                                    " a mirror node")
3603       if self.op.snode == pnode.name:
3604         raise errors.OpPrereqError("The secondary node cannot be"
3605                                    " the primary node.")
3606       self.secondaries.append(self.op.snode)
3607
3608     nodenames = [pnode.name] + self.secondaries
3609
3610     req_size = _ComputeDiskSize(self.op.disk_template,
3611                                 self.op.disk_size, self.op.swap_size)
3612
3613     # Check lv size requirements
3614     if req_size is not None:
3615       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3616                                          self.op.hypervisor)
3617       for node in nodenames:
3618         info = nodeinfo.get(node, None)
3619         if not info:
3620           raise errors.OpPrereqError("Cannot get current information"
3621                                      " from node '%s'" % node)
3622         vg_free = info.get('vg_free', None)
3623         if not isinstance(vg_free, int):
3624           raise errors.OpPrereqError("Can't compute free disk space on"
3625                                      " node %s" % node)
3626         if req_size > info['vg_free']:
3627           raise errors.OpPrereqError("Not enough disk space on target node %s."
3628                                      " %d MB available, %d MB required" %
3629                                      (node, info['vg_free'], req_size))
3630
3631     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3632
3633     # os verification
3634     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3635     if not os_obj:
3636       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3637                                  " primary node"  % self.op.os_type)
3638
3639     # bridge check on primary node
3640     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3641       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3642                                  " destination node '%s'" %
3643                                  (self.op.bridge, pnode.name))
3644
3645     # memory check on primary node
3646     if self.op.start:
3647       _CheckNodeFreeMemory(self, self.pnode.name,
3648                            "creating instance %s" % self.op.instance_name,
3649                            self.be_full[constants.BE_MEMORY],
3650                            self.op.hypervisor)
3651
3652     if self.op.start:
3653       self.instance_status = 'up'
3654     else:
3655       self.instance_status = 'down'
3656
3657   def Exec(self, feedback_fn):
3658     """Create and add the instance to the cluster.
3659
3660     """
3661     instance = self.op.instance_name
3662     pnode_name = self.pnode.name
3663
3664     if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3665       mac_address = self.cfg.GenerateMAC()
3666     else:
3667       mac_address = self.op.mac
3668
3669     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3670     if self.inst_ip is not None:
3671       nic.ip = self.inst_ip
3672
3673     ht_kind = self.op.hypervisor
3674     if ht_kind in constants.HTS_REQ_PORT:
3675       network_port = self.cfg.AllocatePort()
3676     else:
3677       network_port = None
3678
3679     ##if self.op.vnc_bind_address is None:
3680     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3681
3682     # this is needed because os.path.join does not accept None arguments
3683     if self.op.file_storage_dir is None:
3684       string_file_storage_dir = ""
3685     else:
3686       string_file_storage_dir = self.op.file_storage_dir
3687
3688     # build the full file storage dir path
3689     file_storage_dir = os.path.normpath(os.path.join(
3690                                         self.cfg.GetFileStorageDir(),
3691                                         string_file_storage_dir, instance))
3692
3693
3694     disks = _GenerateDiskTemplate(self,
3695                                   self.op.disk_template,
3696                                   instance, pnode_name,
3697                                   self.secondaries, self.op.disk_size,
3698                                   self.op.swap_size,
3699                                   file_storage_dir,
3700                                   self.op.file_driver)
3701
3702     iobj = objects.Instance(name=instance, os=self.op.os_type,
3703                             primary_node=pnode_name,
3704                             nics=[nic], disks=disks,
3705                             disk_template=self.op.disk_template,
3706                             status=self.instance_status,
3707                             network_port=network_port,
3708                             beparams=self.op.beparams,
3709                             hvparams=self.op.hvparams,
3710                             hypervisor=self.op.hypervisor,
3711                             )
3712
3713     feedback_fn("* creating instance disks...")
3714     if not _CreateDisks(self, iobj):
3715       _RemoveDisks(self, iobj)
3716       self.cfg.ReleaseDRBDMinors(instance)
3717       raise errors.OpExecError("Device creation failed, reverting...")
3718
3719     feedback_fn("adding instance %s to cluster config" % instance)
3720
3721     self.cfg.AddInstance(iobj)
3722     # Declare that we don't want to remove the instance lock anymore, as we've
3723     # added the instance to the config
3724     del self.remove_locks[locking.LEVEL_INSTANCE]
3725     # Remove the temp. assignements for the instance's drbds
3726     self.cfg.ReleaseDRBDMinors(instance)
3727
3728     if self.op.wait_for_sync:
3729       disk_abort = not _WaitForSync(self, iobj)
3730     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3731       # make sure the disks are not degraded (still sync-ing is ok)
3732       time.sleep(15)
3733       feedback_fn("* checking mirrors status")
3734       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3735     else:
3736       disk_abort = False
3737
3738     if disk_abort:
3739       _RemoveDisks(self, iobj)
3740       self.cfg.RemoveInstance(iobj.name)
3741       # Make sure the instance lock gets removed
3742       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3743       raise errors.OpExecError("There are some degraded disks for"
3744                                " this instance")
3745
3746     feedback_fn("creating os for instance %s on node %s" %
3747                 (instance, pnode_name))
3748
3749     if iobj.disk_template != constants.DT_DISKLESS:
3750       if self.op.mode == constants.INSTANCE_CREATE:
3751         feedback_fn("* running the instance OS create scripts...")
3752         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3753           raise errors.OpExecError("could not add os for instance %s"
3754                                    " on node %s" %
3755                                    (instance, pnode_name))
3756
3757       elif self.op.mode == constants.INSTANCE_IMPORT:
3758         feedback_fn("* running the instance OS import scripts...")
3759         src_node = self.op.src_node
3760         src_images = self.src_images
3761         cluster_name = self.cfg.GetClusterName()
3762         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3763                                                          src_node, src_images,
3764                                                          cluster_name)
3765         for idx, result in enumerate(import_result):
3766           if not result:
3767             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3768                             " on node %s" % (src_images[idx], instance, idx,
3769                                              pnode_name))
3770       else:
3771         # also checked in the prereq part
3772         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3773                                      % self.op.mode)
3774
3775     if self.op.start:
3776       logging.info("Starting instance %s on node %s", instance, pnode_name)
3777       feedback_fn("* starting instance...")
3778       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3779         raise errors.OpExecError("Could not start instance")
3780
3781
3782 class LUConnectConsole(NoHooksLU):
3783   """Connect to an instance's console.
3784
3785   This is somewhat special in that it returns the command line that
3786   you need to run on the master node in order to connect to the
3787   console.
3788
3789   """
3790   _OP_REQP = ["instance_name"]
3791   REQ_BGL = False
3792
3793   def ExpandNames(self):
3794     self._ExpandAndLockInstance()
3795
3796   def CheckPrereq(self):
3797     """Check prerequisites.
3798
3799     This checks that the instance is in the cluster.
3800
3801     """
3802     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3803     assert self.instance is not None, \
3804       "Cannot retrieve locked instance %s" % self.op.instance_name
3805
3806   def Exec(self, feedback_fn):
3807     """Connect to the console of an instance
3808
3809     """
3810     instance = self.instance
3811     node = instance.primary_node
3812
3813     node_insts = self.rpc.call_instance_list([node],
3814                                              [instance.hypervisor])[node]
3815     if node_insts is False:
3816       raise errors.OpExecError("Can't connect to node %s." % node)
3817
3818     if instance.name not in node_insts:
3819       raise errors.OpExecError("Instance %s is not running." % instance.name)
3820
3821     logging.debug("Connecting to console of %s on %s", instance.name, node)
3822
3823     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3824     console_cmd = hyper.GetShellCommandForConsole(instance)
3825
3826     # build ssh cmdline
3827     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3828
3829
3830 class LUReplaceDisks(LogicalUnit):
3831   """Replace the disks of an instance.
3832
3833   """
3834   HPATH = "mirrors-replace"
3835   HTYPE = constants.HTYPE_INSTANCE
3836   _OP_REQP = ["instance_name", "mode", "disks"]
3837   REQ_BGL = False
3838
3839   def ExpandNames(self):
3840     self._ExpandAndLockInstance()
3841
3842     if not hasattr(self.op, "remote_node"):
3843       self.op.remote_node = None
3844
3845     ia_name = getattr(self.op, "iallocator", None)
3846     if ia_name is not None:
3847       if self.op.remote_node is not None:
3848         raise errors.OpPrereqError("Give either the iallocator or the new"
3849                                    " secondary, not both")
3850       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3851     elif self.op.remote_node is not None:
3852       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3853       if remote_node is None:
3854         raise errors.OpPrereqError("Node '%s' not known" %
3855                                    self.op.remote_node)
3856       self.op.remote_node = remote_node
3857       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3858       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3859     else:
3860       self.needed_locks[locking.LEVEL_NODE] = []
3861       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3862
3863   def DeclareLocks(self, level):
3864     # If we're not already locking all nodes in the set we have to declare the
3865     # instance's primary/secondary nodes.
3866     if (level == locking.LEVEL_NODE and
3867         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3868       self._LockInstancesNodes()
3869
3870   def _RunAllocator(self):
3871     """Compute a new secondary node using an IAllocator.
3872
3873     """
3874     ial = IAllocator(self,
3875                      mode=constants.IALLOCATOR_MODE_RELOC,
3876                      name=self.op.instance_name,
3877                      relocate_from=[self.sec_node])
3878
3879     ial.Run(self.op.iallocator)
3880
3881     if not ial.success:
3882       raise errors.OpPrereqError("Can't compute nodes using"
3883                                  " iallocator '%s': %s" % (self.op.iallocator,
3884                                                            ial.info))
3885     if len(ial.nodes) != ial.required_nodes:
3886       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3887                                  " of nodes (%s), required %s" %
3888                                  (len(ial.nodes), ial.required_nodes))
3889     self.op.remote_node = ial.nodes[0]
3890     self.LogInfo("Selected new secondary for the instance: %s",
3891                  self.op.remote_node)
3892
3893   def BuildHooksEnv(self):
3894     """Build hooks env.
3895
3896     This runs on the master, the primary and all the secondaries.
3897
3898     """
3899     env = {
3900       "MODE": self.op.mode,
3901       "NEW_SECONDARY": self.op.remote_node,
3902       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3903       }
3904     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3905     nl = [
3906       self.cfg.GetMasterNode(),
3907       self.instance.primary_node,
3908       ]
3909     if self.op.remote_node is not None:
3910       nl.append(self.op.remote_node)
3911     return env, nl, nl
3912
3913   def CheckPrereq(self):
3914     """Check prerequisites.
3915
3916     This checks that the instance is in the cluster.
3917
3918     """
3919     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3920     assert instance is not None, \
3921       "Cannot retrieve locked instance %s" % self.op.instance_name
3922     self.instance = instance
3923
3924     if instance.disk_template not in constants.DTS_NET_MIRROR:
3925       raise errors.OpPrereqError("Instance's disk layout is not"
3926                                  " network mirrored.")
3927
3928     if len(instance.secondary_nodes) != 1:
3929       raise errors.OpPrereqError("The instance has a strange layout,"
3930                                  " expected one secondary but found %d" %
3931                                  len(instance.secondary_nodes))
3932
3933     self.sec_node = instance.secondary_nodes[0]
3934
3935     ia_name = getattr(self.op, "iallocator", None)
3936     if ia_name is not None:
3937       self._RunAllocator()
3938
3939     remote_node = self.op.remote_node
3940     if remote_node is not None:
3941       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3942       assert self.remote_node_info is not None, \
3943         "Cannot retrieve locked node %s" % remote_node
3944     else:
3945       self.remote_node_info = None
3946     if remote_node == instance.primary_node:
3947       raise errors.OpPrereqError("The specified node is the primary node of"
3948                                  " the instance.")
3949     elif remote_node == self.sec_node:
3950       if self.op.mode == constants.REPLACE_DISK_SEC:
3951         # this is for DRBD8, where we can't execute the same mode of
3952         # replacement as for drbd7 (no different port allocated)
3953         raise errors.OpPrereqError("Same secondary given, cannot execute"
3954                                    " replacement")
3955     if instance.disk_template == constants.DT_DRBD8:
3956       if (self.op.mode == constants.REPLACE_DISK_ALL and
3957           remote_node is not None):
3958         # switch to replace secondary mode
3959         self.op.mode = constants.REPLACE_DISK_SEC
3960
3961       if self.op.mode == constants.REPLACE_DISK_ALL:
3962         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3963                                    " secondary disk replacement, not"
3964                                    " both at once")
3965       elif self.op.mode == constants.REPLACE_DISK_PRI:
3966         if remote_node is not None:
3967           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3968                                      " the secondary while doing a primary"
3969                                      " node disk replacement")
3970         self.tgt_node = instance.primary_node
3971         self.oth_node = instance.secondary_nodes[0]
3972       elif self.op.mode == constants.REPLACE_DISK_SEC:
3973         self.new_node = remote_node # this can be None, in which case
3974                                     # we don't change the secondary
3975         self.tgt_node = instance.secondary_nodes[0]
3976         self.oth_node = instance.primary_node
3977       else:
3978         raise errors.ProgrammerError("Unhandled disk replace mode")
3979
3980     for name in self.op.disks:
3981       if instance.FindDisk(name) is None:
3982         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3983                                    (name, instance.name))
3984
3985   def _ExecD8DiskOnly(self, feedback_fn):
3986     """Replace a disk on the primary or secondary for dbrd8.
3987
3988     The algorithm for replace is quite complicated:
3989       - for each disk to be replaced:
3990         - create new LVs on the target node with unique names
3991         - detach old LVs from the drbd device
3992         - rename old LVs to name_replaced.<time_t>
3993         - rename new LVs to old LVs
3994         - attach the new LVs (with the old names now) to the drbd device
3995       - wait for sync across all devices
3996       - for each modified disk:
3997         - remove old LVs (which have the name name_replaces.<time_t>)
3998
3999     Failures are not very well handled.
4000
4001     """
4002     steps_total = 6
4003     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4004     instance = self.instance
4005     iv_names = {}
4006     vgname = self.cfg.GetVGName()
4007     # start of work
4008     cfg = self.cfg
4009     tgt_node = self.tgt_node
4010     oth_node = self.oth_node
4011
4012     # Step: check device activation
4013     self.proc.LogStep(1, steps_total, "check device existence")
4014     info("checking volume groups")
4015     my_vg = cfg.GetVGName()
4016     results = self.rpc.call_vg_list([oth_node, tgt_node])
4017     if not results:
4018       raise errors.OpExecError("Can't list volume groups on the nodes")
4019     for node in oth_node, tgt_node:
4020       res = results.get(node, False)
4021       if not res or my_vg not in res:
4022         raise errors.OpExecError("Volume group '%s' not found on %s" %
4023                                  (my_vg, node))
4024     for dev in instance.disks:
4025       if not dev.iv_name in self.op.disks:
4026         continue
4027       for node in tgt_node, oth_node:
4028         info("checking %s on %s" % (dev.iv_name, node))
4029         cfg.SetDiskID(dev, node)
4030         if not self.rpc.call_blockdev_find(node, dev):
4031           raise errors.OpExecError("Can't find device %s on node %s" %
4032                                    (dev.iv_name, node))
4033
4034     # Step: check other node consistency
4035     self.proc.LogStep(2, steps_total, "check peer consistency")
4036     for dev in instance.disks:
4037       if not dev.iv_name in self.op.disks:
4038         continue
4039       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4040       if not _CheckDiskConsistency(self, dev, oth_node,
4041                                    oth_node==instance.primary_node):
4042         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4043                                  " to replace disks on this node (%s)" %
4044                                  (oth_node, tgt_node))
4045
4046     # Step: create new storage
4047     self.proc.LogStep(3, steps_total, "allocate new storage")
4048     for dev in instance.disks:
4049       if not dev.iv_name in self.op.disks:
4050         continue
4051       size = dev.size
4052       cfg.SetDiskID(dev, tgt_node)
4053       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4054       names = _GenerateUniqueNames(self, lv_names)
4055       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4056                              logical_id=(vgname, names[0]))
4057       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4058                              logical_id=(vgname, names[1]))
4059       new_lvs = [lv_data, lv_meta]
4060       old_lvs = dev.children
4061       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4062       info("creating new local storage on %s for %s" %
4063            (tgt_node, dev.iv_name))
4064       # since we *always* want to create this LV, we use the
4065       # _Create...OnPrimary (which forces the creation), even if we
4066       # are talking about the secondary node
4067       for new_lv in new_lvs:
4068         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4069                                         _GetInstanceInfoText(instance)):
4070           raise errors.OpExecError("Failed to create new LV named '%s' on"
4071                                    " node '%s'" %
4072                                    (new_lv.logical_id[1], tgt_node))
4073
4074     # Step: for each lv, detach+rename*2+attach
4075     self.proc.LogStep(4, steps_total, "change drbd configuration")
4076     for dev, old_lvs, new_lvs in iv_names.itervalues():
4077       info("detaching %s drbd from local storage" % dev.iv_name)
4078       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4079         raise errors.OpExecError("Can't detach drbd from local storage on node"
4080                                  " %s for device %s" % (tgt_node, dev.iv_name))
4081       #dev.children = []
4082       #cfg.Update(instance)
4083
4084       # ok, we created the new LVs, so now we know we have the needed
4085       # storage; as such, we proceed on the target node to rename
4086       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4087       # using the assumption that logical_id == physical_id (which in
4088       # turn is the unique_id on that node)
4089
4090       # FIXME(iustin): use a better name for the replaced LVs
4091       temp_suffix = int(time.time())
4092       ren_fn = lambda d, suff: (d.physical_id[0],
4093                                 d.physical_id[1] + "_replaced-%s" % suff)
4094       # build the rename list based on what LVs exist on the node
4095       rlist = []
4096       for to_ren in old_lvs:
4097         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4098         if find_res is not None: # device exists
4099           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4100
4101       info("renaming the old LVs on the target node")
4102       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4103         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4104       # now we rename the new LVs to the old LVs
4105       info("renaming the new LVs on the target node")
4106       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4107       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4108         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4109
4110       for old, new in zip(old_lvs, new_lvs):
4111         new.logical_id = old.logical_id
4112         cfg.SetDiskID(new, tgt_node)
4113
4114       for disk in old_lvs:
4115         disk.logical_id = ren_fn(disk, temp_suffix)
4116         cfg.SetDiskID(disk, tgt_node)
4117
4118       # now that the new lvs have the old name, we can add them to the device
4119       info("adding new mirror component on %s" % tgt_node)
4120       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4121         for new_lv in new_lvs:
4122           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4123             warning("Can't rollback device %s", hint="manually cleanup unused"
4124                     " logical volumes")
4125         raise errors.OpExecError("Can't add local storage to drbd")
4126
4127       dev.children = new_lvs
4128       cfg.Update(instance)
4129
4130     # Step: wait for sync
4131
4132     # this can fail as the old devices are degraded and _WaitForSync
4133     # does a combined result over all disks, so we don't check its
4134     # return value
4135     self.proc.LogStep(5, steps_total, "sync devices")
4136     _WaitForSync(self, instance, unlock=True)
4137
4138     # so check manually all the devices
4139     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4140       cfg.SetDiskID(dev, instance.primary_node)
4141       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4142       if is_degr:
4143         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4144
4145     # Step: remove old storage
4146     self.proc.LogStep(6, steps_total, "removing old storage")
4147     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4148       info("remove logical volumes for %s" % name)
4149       for lv in old_lvs:
4150         cfg.SetDiskID(lv, tgt_node)
4151         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4152           warning("Can't remove old LV", hint="manually remove unused LVs")
4153           continue
4154
4155   def _ExecD8Secondary(self, feedback_fn):
4156     """Replace the secondary node for drbd8.
4157
4158     The algorithm for replace is quite complicated:
4159       - for all disks of the instance:
4160         - create new LVs on the new node with same names
4161         - shutdown the drbd device on the old secondary
4162         - disconnect the drbd network on the primary
4163         - create the drbd device on the new secondary
4164         - network attach the drbd on the primary, using an artifice:
4165           the drbd code for Attach() will connect to the network if it
4166           finds a device which is connected to the good local disks but
4167           not network enabled
4168       - wait for sync across all devices
4169       - remove all disks from the old secondary
4170
4171     Failures are not very well handled.
4172
4173     """
4174     steps_total = 6
4175     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4176     instance = self.instance
4177     iv_names = {}
4178     vgname = self.cfg.GetVGName()
4179     # start of work
4180     cfg = self.cfg
4181     old_node = self.tgt_node
4182     new_node = self.new_node
4183     pri_node = instance.primary_node
4184
4185     # Step: check device activation
4186     self.proc.LogStep(1, steps_total, "check device existence")
4187     info("checking volume groups")
4188     my_vg = cfg.GetVGName()
4189     results = self.rpc.call_vg_list([pri_node, new_node])
4190     if not results:
4191       raise errors.OpExecError("Can't list volume groups on the nodes")
4192     for node in pri_node, new_node:
4193       res = results.get(node, False)
4194       if not res or my_vg not in res:
4195         raise errors.OpExecError("Volume group '%s' not found on %s" %
4196                                  (my_vg, node))
4197     for dev in instance.disks:
4198       if not dev.iv_name in self.op.disks:
4199         continue
4200       info("checking %s on %s" % (dev.iv_name, pri_node))
4201       cfg.SetDiskID(dev, pri_node)
4202       if not self.rpc.call_blockdev_find(pri_node, dev):
4203         raise errors.OpExecError("Can't find device %s on node %s" %
4204                                  (dev.iv_name, pri_node))
4205
4206     # Step: check other node consistency
4207     self.proc.LogStep(2, steps_total, "check peer consistency")
4208     for dev in instance.disks:
4209       if not dev.iv_name in self.op.disks:
4210         continue
4211       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4212       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4213         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4214                                  " unsafe to replace the secondary" %
4215                                  pri_node)
4216
4217     # Step: create new storage
4218     self.proc.LogStep(3, steps_total, "allocate new storage")
4219     for dev in instance.disks:
4220       size = dev.size
4221       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4222       # since we *always* want to create this LV, we use the
4223       # _Create...OnPrimary (which forces the creation), even if we
4224       # are talking about the secondary node
4225       for new_lv in dev.children:
4226         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4227                                         _GetInstanceInfoText(instance)):
4228           raise errors.OpExecError("Failed to create new LV named '%s' on"
4229                                    " node '%s'" %
4230                                    (new_lv.logical_id[1], new_node))
4231
4232
4233     # Step 4: dbrd minors and drbd setups changes
4234     # after this, we must manually remove the drbd minors on both the
4235     # error and the success paths
4236     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4237                                    instance.name)
4238     logging.debug("Allocated minors %s" % (minors,))
4239     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4240     for dev, new_minor in zip(instance.disks, minors):
4241       size = dev.size
4242       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4243       # create new devices on new_node
4244       if pri_node == dev.logical_id[0]:
4245         new_logical_id = (pri_node, new_node,
4246                           dev.logical_id[2], dev.logical_id[3], new_minor,
4247                           dev.logical_id[5])
4248       else:
4249         new_logical_id = (new_node, pri_node,
4250                           dev.logical_id[2], new_minor, dev.logical_id[4],
4251                           dev.logical_id[5])
4252       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4253       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4254                     new_logical_id)
4255       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4256                               logical_id=new_logical_id,
4257                               children=dev.children)
4258       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4259                                         new_drbd, False,
4260                                         _GetInstanceInfoText(instance)):
4261         self.cfg.ReleaseDRBDMinors(instance.name)
4262         raise errors.OpExecError("Failed to create new DRBD on"
4263                                  " node '%s'" % new_node)
4264
4265     for dev in instance.disks:
4266       # we have new devices, shutdown the drbd on the old secondary
4267       info("shutting down drbd for %s on old node" % dev.iv_name)
4268       cfg.SetDiskID(dev, old_node)
4269       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4270         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4271                 hint="Please cleanup this device manually as soon as possible")
4272
4273     info("detaching primary drbds from the network (=> standalone)")
4274     done = 0
4275     for dev in instance.disks:
4276       cfg.SetDiskID(dev, pri_node)
4277       # set the network part of the physical (unique in bdev terms) id
4278       # to None, meaning detach from network
4279       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4280       # and 'find' the device, which will 'fix' it to match the
4281       # standalone state
4282       if self.rpc.call_blockdev_find(pri_node, dev):
4283         done += 1
4284       else:
4285         warning("Failed to detach drbd %s from network, unusual case" %
4286                 dev.iv_name)
4287
4288     if not done:
4289       # no detaches succeeded (very unlikely)
4290       self.cfg.ReleaseDRBDMinors(instance.name)
4291       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4292
4293     # if we managed to detach at least one, we update all the disks of
4294     # the instance to point to the new secondary
4295     info("updating instance configuration")
4296     for dev, _, new_logical_id in iv_names.itervalues():
4297       dev.logical_id = new_logical_id
4298       cfg.SetDiskID(dev, pri_node)
4299     cfg.Update(instance)
4300     # we can remove now the temp minors as now the new values are
4301     # written to the config file (and therefore stable)
4302     self.cfg.ReleaseDRBDMinors(instance.name)
4303
4304     # and now perform the drbd attach
4305     info("attaching primary drbds to new secondary (standalone => connected)")
4306     failures = []
4307     for dev in instance.disks:
4308       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4309       # since the attach is smart, it's enough to 'find' the device,
4310       # it will automatically activate the network, if the physical_id
4311       # is correct
4312       cfg.SetDiskID(dev, pri_node)
4313       logging.debug("Disk to attach: %s", dev)
4314       if not self.rpc.call_blockdev_find(pri_node, dev):
4315         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4316                 "please do a gnt-instance info to see the status of disks")
4317
4318     # this can fail as the old devices are degraded and _WaitForSync
4319     # does a combined result over all disks, so we don't check its
4320     # return value
4321     self.proc.LogStep(5, steps_total, "sync devices")
4322     _WaitForSync(self, instance, unlock=True)
4323
4324     # so check manually all the devices
4325     for name, (dev, old_lvs, _) in iv_names.iteritems():
4326       cfg.SetDiskID(dev, pri_node)
4327       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4328       if is_degr:
4329         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4330
4331     self.proc.LogStep(6, steps_total, "removing old storage")
4332     for name, (dev, old_lvs, _) in iv_names.iteritems():
4333       info("remove logical volumes for %s" % name)
4334       for lv in old_lvs:
4335         cfg.SetDiskID(lv, old_node)
4336         if not self.rpc.call_blockdev_remove(old_node, lv):
4337           warning("Can't remove LV on old secondary",
4338                   hint="Cleanup stale volumes by hand")
4339
4340   def Exec(self, feedback_fn):
4341     """Execute disk replacement.
4342
4343     This dispatches the disk replacement to the appropriate handler.
4344
4345     """
4346     instance = self.instance
4347
4348     # Activate the instance disks if we're replacing them on a down instance
4349     if instance.status == "down":
4350       _StartInstanceDisks(self, instance, True)
4351
4352     if instance.disk_template == constants.DT_DRBD8:
4353       if self.op.remote_node is None:
4354         fn = self._ExecD8DiskOnly
4355       else:
4356         fn = self._ExecD8Secondary
4357     else:
4358       raise errors.ProgrammerError("Unhandled disk replacement case")
4359
4360     ret = fn(feedback_fn)
4361
4362     # Deactivate the instance disks if we're replacing them on a down instance
4363     if instance.status == "down":
4364       _SafeShutdownInstanceDisks(self, instance)
4365
4366     return ret
4367
4368
4369 class LUGrowDisk(LogicalUnit):
4370   """Grow a disk of an instance.
4371
4372   """
4373   HPATH = "disk-grow"
4374   HTYPE = constants.HTYPE_INSTANCE
4375   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4376   REQ_BGL = False
4377
4378   def ExpandNames(self):
4379     self._ExpandAndLockInstance()
4380     self.needed_locks[locking.LEVEL_NODE] = []
4381     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4382
4383   def DeclareLocks(self, level):
4384     if level == locking.LEVEL_NODE:
4385       self._LockInstancesNodes()
4386
4387   def BuildHooksEnv(self):
4388     """Build hooks env.
4389
4390     This runs on the master, the primary and all the secondaries.
4391
4392     """
4393     env = {
4394       "DISK": self.op.disk,
4395       "AMOUNT": self.op.amount,
4396       }
4397     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4398     nl = [
4399       self.cfg.GetMasterNode(),
4400       self.instance.primary_node,
4401       ]
4402     return env, nl, nl
4403
4404   def CheckPrereq(self):
4405     """Check prerequisites.
4406
4407     This checks that the instance is in the cluster.
4408
4409     """
4410     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4411     assert instance is not None, \
4412       "Cannot retrieve locked instance %s" % self.op.instance_name
4413
4414     self.instance = instance
4415
4416     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4417       raise errors.OpPrereqError("Instance's disk layout does not support"
4418                                  " growing.")
4419
4420     if instance.FindDisk(self.op.disk) is None:
4421       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4422                                  (self.op.disk, instance.name))
4423
4424     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4425     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4426                                        instance.hypervisor)
4427     for node in nodenames:
4428       info = nodeinfo.get(node, None)
4429       if not info:
4430         raise errors.OpPrereqError("Cannot get current information"
4431                                    " from node '%s'" % node)
4432       vg_free = info.get('vg_free', None)
4433       if not isinstance(vg_free, int):
4434         raise errors.OpPrereqError("Can't compute free disk space on"
4435                                    " node %s" % node)
4436       if self.op.amount > info['vg_free']:
4437         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4438                                    " %d MiB available, %d MiB required" %
4439                                    (node, info['vg_free'], self.op.amount))
4440
4441   def Exec(self, feedback_fn):
4442     """Execute disk grow.
4443
4444     """
4445     instance = self.instance
4446     disk = instance.FindDisk(self.op.disk)
4447     for node in (instance.secondary_nodes + (instance.primary_node,)):
4448       self.cfg.SetDiskID(disk, node)
4449       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4450       if (not result or not isinstance(result, (list, tuple)) or
4451           len(result) != 2):
4452         raise errors.OpExecError("grow request failed to node %s" % node)
4453       elif not result[0]:
4454         raise errors.OpExecError("grow request failed to node %s: %s" %
4455                                  (node, result[1]))
4456     disk.RecordGrow(self.op.amount)
4457     self.cfg.Update(instance)
4458     if self.op.wait_for_sync:
4459       disk_abort = not _WaitForSync(self, instance)
4460       if disk_abort:
4461         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4462                              " status.\nPlease check the instance.")
4463
4464
4465 class LUQueryInstanceData(NoHooksLU):
4466   """Query runtime instance data.
4467
4468   """
4469   _OP_REQP = ["instances", "static"]
4470   REQ_BGL = False
4471
4472   def ExpandNames(self):
4473     self.needed_locks = {}
4474     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4475
4476     if not isinstance(self.op.instances, list):
4477       raise errors.OpPrereqError("Invalid argument type 'instances'")
4478
4479     if self.op.instances:
4480       self.wanted_names = []
4481       for name in self.op.instances:
4482         full_name = self.cfg.ExpandInstanceName(name)
4483         if full_name is None:
4484           raise errors.OpPrereqError("Instance '%s' not known" %
4485                                      self.op.instance_name)
4486         self.wanted_names.append(full_name)
4487       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4488     else:
4489       self.wanted_names = None
4490       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4491
4492     self.needed_locks[locking.LEVEL_NODE] = []
4493     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4494
4495   def DeclareLocks(self, level):
4496     if level == locking.LEVEL_NODE:
4497       self._LockInstancesNodes()
4498
4499   def CheckPrereq(self):
4500     """Check prerequisites.
4501
4502     This only checks the optional instance list against the existing names.
4503
4504     """
4505     if self.wanted_names is None:
4506       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4507
4508     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4509                              in self.wanted_names]
4510     return
4511
4512   def _ComputeDiskStatus(self, instance, snode, dev):
4513     """Compute block device status.
4514
4515     """
4516     static = self.op.static
4517     if not static:
4518       self.cfg.SetDiskID(dev, instance.primary_node)
4519       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4520     else:
4521       dev_pstatus = None
4522
4523     if dev.dev_type in constants.LDS_DRBD:
4524       # we change the snode then (otherwise we use the one passed in)
4525       if dev.logical_id[0] == instance.primary_node:
4526         snode = dev.logical_id[1]
4527       else:
4528         snode = dev.logical_id[0]
4529
4530     if snode and not static:
4531       self.cfg.SetDiskID(dev, snode)
4532       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4533     else:
4534       dev_sstatus = None
4535
4536     if dev.children:
4537       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4538                       for child in dev.children]
4539     else:
4540       dev_children = []
4541
4542     data = {
4543       "iv_name": dev.iv_name,
4544       "dev_type": dev.dev_type,
4545       "logical_id": dev.logical_id,
4546       "physical_id": dev.physical_id,
4547       "pstatus": dev_pstatus,
4548       "sstatus": dev_sstatus,
4549       "children": dev_children,
4550       }
4551
4552     return data
4553
4554   def Exec(self, feedback_fn):
4555     """Gather and return data"""
4556     result = {}
4557
4558     cluster = self.cfg.GetClusterInfo()
4559
4560     for instance in self.wanted_instances:
4561       if not self.op.static:
4562         remote_info = self.rpc.call_instance_info(instance.primary_node,
4563                                                   instance.name,
4564                                                   instance.hypervisor)
4565         if remote_info and "state" in remote_info:
4566           remote_state = "up"
4567         else:
4568           remote_state = "down"
4569       else:
4570         remote_state = None
4571       if instance.status == "down":
4572         config_state = "down"
4573       else:
4574         config_state = "up"
4575
4576       disks = [self._ComputeDiskStatus(instance, None, device)
4577                for device in instance.disks]
4578
4579       idict = {
4580         "name": instance.name,
4581         "config_state": config_state,
4582         "run_state": remote_state,
4583         "pnode": instance.primary_node,
4584         "snodes": instance.secondary_nodes,
4585         "os": instance.os,
4586         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4587         "disks": disks,
4588         "hypervisor": instance.hypervisor,
4589         "network_port": instance.network_port,
4590         "hv_instance": instance.hvparams,
4591         "hv_actual": cluster.FillHV(instance),
4592         "be_instance": instance.beparams,
4593         "be_actual": cluster.FillBE(instance),
4594         }
4595
4596       result[instance.name] = idict
4597
4598     return result
4599
4600
4601 class LUSetInstanceParams(LogicalUnit):
4602   """Modifies an instances's parameters.
4603
4604   """
4605   HPATH = "instance-modify"
4606   HTYPE = constants.HTYPE_INSTANCE
4607   _OP_REQP = ["instance_name", "hvparams"]
4608   REQ_BGL = False
4609
4610   def ExpandNames(self):
4611     self._ExpandAndLockInstance()
4612     self.needed_locks[locking.LEVEL_NODE] = []
4613     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4614
4615
4616   def DeclareLocks(self, level):
4617     if level == locking.LEVEL_NODE:
4618       self._LockInstancesNodes()
4619
4620   def BuildHooksEnv(self):
4621     """Build hooks env.
4622
4623     This runs on the master, primary and secondaries.
4624
4625     """
4626     args = dict()
4627     if constants.BE_MEMORY in self.be_new:
4628       args['memory'] = self.be_new[constants.BE_MEMORY]
4629     if constants.BE_VCPUS in self.be_new:
4630       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4631     if self.do_ip or self.do_bridge or self.mac:
4632       if self.do_ip:
4633         ip = self.ip
4634       else:
4635         ip = self.instance.nics[0].ip
4636       if self.bridge:
4637         bridge = self.bridge
4638       else:
4639         bridge = self.instance.nics[0].bridge
4640       if self.mac:
4641         mac = self.mac
4642       else:
4643         mac = self.instance.nics[0].mac
4644       args['nics'] = [(ip, bridge, mac)]
4645     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4646     nl = [self.cfg.GetMasterNode(),
4647           self.instance.primary_node] + list(self.instance.secondary_nodes)
4648     return env, nl, nl
4649
4650   def CheckPrereq(self):
4651     """Check prerequisites.
4652
4653     This only checks the instance list against the existing names.
4654
4655     """
4656     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4657     # a separate CheckArguments function, if we implement one, so the operation
4658     # can be aborted without waiting for any lock, should it have an error...
4659     self.ip = getattr(self.op, "ip", None)
4660     self.mac = getattr(self.op, "mac", None)
4661     self.bridge = getattr(self.op, "bridge", None)
4662     self.kernel_path = getattr(self.op, "kernel_path", None)
4663     self.initrd_path = getattr(self.op, "initrd_path", None)
4664     self.force = getattr(self.op, "force", None)
4665     all_parms = [self.ip, self.bridge, self.mac]
4666     if (all_parms.count(None) == len(all_parms) and
4667         not self.op.hvparams and
4668         not self.op.beparams):
4669       raise errors.OpPrereqError("No changes submitted")
4670     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4671       val = self.op.beparams.get(item, None)
4672       if val is not None:
4673         try:
4674           val = int(val)
4675         except ValueError, err:
4676           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4677         self.op.beparams[item] = val
4678     if self.ip is not None:
4679       self.do_ip = True
4680       if self.ip.lower() == "none":
4681         self.ip = None
4682       else:
4683         if not utils.IsValidIP(self.ip):
4684           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4685     else:
4686       self.do_ip = False
4687     self.do_bridge = (self.bridge is not None)
4688     if self.mac is not None:
4689       if self.cfg.IsMacInUse(self.mac):
4690         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4691                                    self.mac)
4692       if not utils.IsValidMac(self.mac):
4693         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4694
4695     # checking the new params on the primary/secondary nodes
4696
4697     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4698     assert self.instance is not None, \
4699       "Cannot retrieve locked instance %s" % self.op.instance_name
4700     pnode = self.instance.primary_node
4701     nodelist = [pnode]
4702     nodelist.extend(instance.secondary_nodes)
4703
4704     # hvparams processing
4705     if self.op.hvparams:
4706       i_hvdict = copy.deepcopy(instance.hvparams)
4707       for key, val in self.op.hvparams.iteritems():
4708         if val is None:
4709           try:
4710             del i_hvdict[key]
4711           except KeyError:
4712             pass
4713         else:
4714           i_hvdict[key] = val
4715       cluster = self.cfg.GetClusterInfo()
4716       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4717                                 i_hvdict)
4718       # local check
4719       hypervisor.GetHypervisor(
4720         instance.hypervisor).CheckParameterSyntax(hv_new)
4721       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4722       self.hv_new = hv_new # the new actual values
4723       self.hv_inst = i_hvdict # the new dict (without defaults)
4724     else:
4725       self.hv_new = self.hv_inst = {}
4726
4727     # beparams processing
4728     if self.op.beparams:
4729       i_bedict = copy.deepcopy(instance.beparams)
4730       for key, val in self.op.beparams.iteritems():
4731         if val is None:
4732           try:
4733             del i_bedict[key]
4734           except KeyError:
4735             pass
4736         else:
4737           i_bedict[key] = val
4738       cluster = self.cfg.GetClusterInfo()
4739       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4740                                 i_bedict)
4741       self.be_new = be_new # the new actual values
4742       self.be_inst = i_bedict # the new dict (without defaults)
4743     else:
4744       self.hv_new = self.hv_inst = {}
4745
4746     self.warn = []
4747
4748     if constants.BE_MEMORY in self.op.beparams and not self.force:
4749       mem_check_list = [pnode]
4750       if be_new[constants.BE_AUTO_BALANCE]:
4751         # either we changed auto_balance to yes or it was from before
4752         mem_check_list.extend(instance.secondary_nodes)
4753       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4754                                                   instance.hypervisor)
4755       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4756                                          instance.hypervisor)
4757
4758       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4759         # Assume the primary node is unreachable and go ahead
4760         self.warn.append("Can't get info from primary node %s" % pnode)
4761       else:
4762         if instance_info:
4763           current_mem = instance_info['memory']
4764         else:
4765           # Assume instance not running
4766           # (there is a slight race condition here, but it's not very probable,
4767           # and we have no other way to check)
4768           current_mem = 0
4769         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4770                     nodeinfo[pnode]['memory_free'])
4771         if miss_mem > 0:
4772           raise errors.OpPrereqError("This change will prevent the instance"
4773                                      " from starting, due to %d MB of memory"
4774                                      " missing on its primary node" % miss_mem)
4775
4776       if be_new[constants.BE_AUTO_BALANCE]:
4777         for node in instance.secondary_nodes:
4778           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4779             self.warn.append("Can't get info from secondary node %s" % node)
4780           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4781             self.warn.append("Not enough memory to failover instance to"
4782                              " secondary node %s" % node)
4783
4784     return
4785
4786   def Exec(self, feedback_fn):
4787     """Modifies an instance.
4788
4789     All parameters take effect only at the next restart of the instance.
4790     """
4791     # Process here the warnings from CheckPrereq, as we don't have a
4792     # feedback_fn there.
4793     for warn in self.warn:
4794       feedback_fn("WARNING: %s" % warn)
4795
4796     result = []
4797     instance = self.instance
4798     if self.do_ip:
4799       instance.nics[0].ip = self.ip
4800       result.append(("ip", self.ip))
4801     if self.bridge:
4802       instance.nics[0].bridge = self.bridge
4803       result.append(("bridge", self.bridge))
4804     if self.mac:
4805       instance.nics[0].mac = self.mac
4806       result.append(("mac", self.mac))
4807     if self.op.hvparams:
4808       instance.hvparams = self.hv_new
4809       for key, val in self.op.hvparams.iteritems():
4810         result.append(("hv/%s" % key, val))
4811     if self.op.beparams:
4812       instance.beparams = self.be_inst
4813       for key, val in self.op.beparams.iteritems():
4814         result.append(("be/%s" % key, val))
4815
4816     self.cfg.Update(instance)
4817
4818     return result
4819
4820
4821 class LUQueryExports(NoHooksLU):
4822   """Query the exports list
4823
4824   """
4825   _OP_REQP = ['nodes']
4826   REQ_BGL = False
4827
4828   def ExpandNames(self):
4829     self.needed_locks = {}
4830     self.share_locks[locking.LEVEL_NODE] = 1
4831     if not self.op.nodes:
4832       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4833     else:
4834       self.needed_locks[locking.LEVEL_NODE] = \
4835         _GetWantedNodes(self, self.op.nodes)
4836
4837   def CheckPrereq(self):
4838     """Check prerequisites.
4839
4840     """
4841     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4842
4843   def Exec(self, feedback_fn):
4844     """Compute the list of all the exported system images.
4845
4846     Returns:
4847       a dictionary with the structure node->(export-list)
4848       where export-list is a list of the instances exported on
4849       that node.
4850
4851     """
4852     return self.rpc.call_export_list(self.nodes)
4853
4854
4855 class LUExportInstance(LogicalUnit):
4856   """Export an instance to an image in the cluster.
4857
4858   """
4859   HPATH = "instance-export"
4860   HTYPE = constants.HTYPE_INSTANCE
4861   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4862   REQ_BGL = False
4863
4864   def ExpandNames(self):
4865     self._ExpandAndLockInstance()
4866     # FIXME: lock only instance primary and destination node
4867     #
4868     # Sad but true, for now we have do lock all nodes, as we don't know where
4869     # the previous export might be, and and in this LU we search for it and
4870     # remove it from its current node. In the future we could fix this by:
4871     #  - making a tasklet to search (share-lock all), then create the new one,
4872     #    then one to remove, after
4873     #  - removing the removal operation altoghether
4874     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4875
4876   def DeclareLocks(self, level):
4877     """Last minute lock declaration."""
4878     # All nodes are locked anyway, so nothing to do here.
4879
4880   def BuildHooksEnv(self):
4881     """Build hooks env.
4882
4883     This will run on the master, primary node and target node.
4884
4885     """
4886     env = {
4887       "EXPORT_NODE": self.op.target_node,
4888       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4889       }
4890     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4891     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4892           self.op.target_node]
4893     return env, nl, nl
4894
4895   def CheckPrereq(self):
4896     """Check prerequisites.
4897
4898     This checks that the instance and node names are valid.
4899
4900     """
4901     instance_name = self.op.instance_name
4902     self.instance = self.cfg.GetInstanceInfo(instance_name)
4903     assert self.instance is not None, \
4904           "Cannot retrieve locked instance %s" % self.op.instance_name
4905
4906     self.dst_node = self.cfg.GetNodeInfo(
4907       self.cfg.ExpandNodeName(self.op.target_node))
4908
4909     assert self.dst_node is not None, \
4910           "Cannot retrieve locked node %s" % self.op.target_node
4911
4912     # instance disk type verification
4913     for disk in self.instance.disks:
4914       if disk.dev_type == constants.LD_FILE:
4915         raise errors.OpPrereqError("Export not supported for instances with"
4916                                    " file-based disks")
4917
4918   def Exec(self, feedback_fn):
4919     """Export an instance to an image in the cluster.
4920
4921     """
4922     instance = self.instance
4923     dst_node = self.dst_node
4924     src_node = instance.primary_node
4925     if self.op.shutdown:
4926       # shutdown the instance, but not the disks
4927       if not self.rpc.call_instance_shutdown(src_node, instance):
4928         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4929                                  (instance.name, src_node))
4930
4931     vgname = self.cfg.GetVGName()
4932
4933     snap_disks = []
4934
4935     try:
4936       for disk in instance.disks:
4937         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4938         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4939
4940         if not new_dev_name:
4941           self.LogWarning("Could not snapshot block device %s on node %s",
4942                           disk.logical_id[1], src_node)
4943           snap_disks.append(False)
4944         else:
4945           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4946                                  logical_id=(vgname, new_dev_name),
4947                                  physical_id=(vgname, new_dev_name),
4948                                  iv_name=disk.iv_name)
4949           snap_disks.append(new_dev)
4950
4951     finally:
4952       if self.op.shutdown and instance.status == "up":
4953         if not self.rpc.call_instance_start(src_node, instance, None):
4954           _ShutdownInstanceDisks(self, instance)
4955           raise errors.OpExecError("Could not start instance")
4956
4957     # TODO: check for size
4958
4959     cluster_name = self.cfg.GetClusterName()
4960     for idx, dev in enumerate(snap_disks):
4961       if dev:
4962         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4963                                              instance, cluster_name, idx):
4964           self.LogWarning("Could not export block device %s from node %s to"
4965                           " node %s", dev.logical_id[1], src_node,
4966                           dst_node.name)
4967         if not self.rpc.call_blockdev_remove(src_node, dev):
4968           self.LogWarning("Could not remove snapshot block device %s from node"
4969                           " %s", dev.logical_id[1], src_node)
4970
4971     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4972       self.LogWarning("Could not finalize export for instance %s on node %s",
4973                       instance.name, dst_node.name)
4974
4975     nodelist = self.cfg.GetNodeList()
4976     nodelist.remove(dst_node.name)
4977
4978     # on one-node clusters nodelist will be empty after the removal
4979     # if we proceed the backup would be removed because OpQueryExports
4980     # substitutes an empty list with the full cluster node list.
4981     if nodelist:
4982       exportlist = self.rpc.call_export_list(nodelist)
4983       for node in exportlist:
4984         if instance.name in exportlist[node]:
4985           if not self.rpc.call_export_remove(node, instance.name):
4986             self.LogWarning("Could not remove older export for instance %s"
4987                             " on node %s", instance.name, node)
4988
4989
4990 class LURemoveExport(NoHooksLU):
4991   """Remove exports related to the named instance.
4992
4993   """
4994   _OP_REQP = ["instance_name"]
4995   REQ_BGL = False
4996
4997   def ExpandNames(self):
4998     self.needed_locks = {}
4999     # We need all nodes to be locked in order for RemoveExport to work, but we
5000     # don't need to lock the instance itself, as nothing will happen to it (and
5001     # we can remove exports also for a removed instance)
5002     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5003
5004   def CheckPrereq(self):
5005     """Check prerequisites.
5006     """
5007     pass
5008
5009   def Exec(self, feedback_fn):
5010     """Remove any export.
5011
5012     """
5013     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5014     # If the instance was not found we'll try with the name that was passed in.
5015     # This will only work if it was an FQDN, though.
5016     fqdn_warn = False
5017     if not instance_name:
5018       fqdn_warn = True
5019       instance_name = self.op.instance_name
5020
5021     exportlist = self.rpc.call_export_list(self.acquired_locks[
5022       locking.LEVEL_NODE])
5023     found = False
5024     for node in exportlist:
5025       if instance_name in exportlist[node]:
5026         found = True
5027         if not self.rpc.call_export_remove(node, instance_name):
5028           logging.error("Could not remove export for instance %s"
5029                         " on node %s", instance_name, node)
5030
5031     if fqdn_warn and not found:
5032       feedback_fn("Export not found. If trying to remove an export belonging"
5033                   " to a deleted instance please use its Fully Qualified"
5034                   " Domain Name.")
5035
5036
5037 class TagsLU(NoHooksLU):
5038   """Generic tags LU.
5039
5040   This is an abstract class which is the parent of all the other tags LUs.
5041
5042   """
5043
5044   def ExpandNames(self):
5045     self.needed_locks = {}
5046     if self.op.kind == constants.TAG_NODE:
5047       name = self.cfg.ExpandNodeName(self.op.name)
5048       if name is None:
5049         raise errors.OpPrereqError("Invalid node name (%s)" %
5050                                    (self.op.name,))
5051       self.op.name = name
5052       self.needed_locks[locking.LEVEL_NODE] = name
5053     elif self.op.kind == constants.TAG_INSTANCE:
5054       name = self.cfg.ExpandInstanceName(self.op.name)
5055       if name is None:
5056         raise errors.OpPrereqError("Invalid instance name (%s)" %
5057                                    (self.op.name,))
5058       self.op.name = name
5059       self.needed_locks[locking.LEVEL_INSTANCE] = name
5060
5061   def CheckPrereq(self):
5062     """Check prerequisites.
5063
5064     """
5065     if self.op.kind == constants.TAG_CLUSTER:
5066       self.target = self.cfg.GetClusterInfo()
5067     elif self.op.kind == constants.TAG_NODE:
5068       self.target = self.cfg.GetNodeInfo(self.op.name)
5069     elif self.op.kind == constants.TAG_INSTANCE:
5070       self.target = self.cfg.GetInstanceInfo(self.op.name)
5071     else:
5072       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5073                                  str(self.op.kind))
5074
5075
5076 class LUGetTags(TagsLU):
5077   """Returns the tags of a given object.
5078
5079   """
5080   _OP_REQP = ["kind", "name"]
5081   REQ_BGL = False
5082
5083   def Exec(self, feedback_fn):
5084     """Returns the tag list.
5085
5086     """
5087     return list(self.target.GetTags())
5088
5089
5090 class LUSearchTags(NoHooksLU):
5091   """Searches the tags for a given pattern.
5092
5093   """
5094   _OP_REQP = ["pattern"]
5095   REQ_BGL = False
5096
5097   def ExpandNames(self):
5098     self.needed_locks = {}
5099
5100   def CheckPrereq(self):
5101     """Check prerequisites.
5102
5103     This checks the pattern passed for validity by compiling it.
5104
5105     """
5106     try:
5107       self.re = re.compile(self.op.pattern)
5108     except re.error, err:
5109       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5110                                  (self.op.pattern, err))
5111
5112   def Exec(self, feedback_fn):
5113     """Returns the tag list.
5114
5115     """
5116     cfg = self.cfg
5117     tgts = [("/cluster", cfg.GetClusterInfo())]
5118     ilist = cfg.GetAllInstancesInfo().values()
5119     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5120     nlist = cfg.GetAllNodesInfo().values()
5121     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5122     results = []
5123     for path, target in tgts:
5124       for tag in target.GetTags():
5125         if self.re.search(tag):
5126           results.append((path, tag))
5127     return results
5128
5129
5130 class LUAddTags(TagsLU):
5131   """Sets a tag on a given object.
5132
5133   """
5134   _OP_REQP = ["kind", "name", "tags"]
5135   REQ_BGL = False
5136
5137   def CheckPrereq(self):
5138     """Check prerequisites.
5139
5140     This checks the type and length of the tag name and value.
5141
5142     """
5143     TagsLU.CheckPrereq(self)
5144     for tag in self.op.tags:
5145       objects.TaggableObject.ValidateTag(tag)
5146
5147   def Exec(self, feedback_fn):
5148     """Sets the tag.
5149
5150     """
5151     try:
5152       for tag in self.op.tags:
5153         self.target.AddTag(tag)
5154     except errors.TagError, err:
5155       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5156     try:
5157       self.cfg.Update(self.target)
5158     except errors.ConfigurationError:
5159       raise errors.OpRetryError("There has been a modification to the"
5160                                 " config file and the operation has been"
5161                                 " aborted. Please retry.")
5162
5163
5164 class LUDelTags(TagsLU):
5165   """Delete a list of tags from a given object.
5166
5167   """
5168   _OP_REQP = ["kind", "name", "tags"]
5169   REQ_BGL = False
5170
5171   def CheckPrereq(self):
5172     """Check prerequisites.
5173
5174     This checks that we have the given tag.
5175
5176     """
5177     TagsLU.CheckPrereq(self)
5178     for tag in self.op.tags:
5179       objects.TaggableObject.ValidateTag(tag)
5180     del_tags = frozenset(self.op.tags)
5181     cur_tags = self.target.GetTags()
5182     if not del_tags <= cur_tags:
5183       diff_tags = del_tags - cur_tags
5184       diff_names = ["'%s'" % tag for tag in diff_tags]
5185       diff_names.sort()
5186       raise errors.OpPrereqError("Tag(s) %s not found" %
5187                                  (",".join(diff_names)))
5188
5189   def Exec(self, feedback_fn):
5190     """Remove the tag from the object.
5191
5192     """
5193     for tag in self.op.tags:
5194       self.target.RemoveTag(tag)
5195     try:
5196       self.cfg.Update(self.target)
5197     except errors.ConfigurationError:
5198       raise errors.OpRetryError("There has been a modification to the"
5199                                 " config file and the operation has been"
5200                                 " aborted. Please retry.")
5201
5202
5203 class LUTestDelay(NoHooksLU):
5204   """Sleep for a specified amount of time.
5205
5206   This LU sleeps on the master and/or nodes for a specified amount of
5207   time.
5208
5209   """
5210   _OP_REQP = ["duration", "on_master", "on_nodes"]
5211   REQ_BGL = False
5212
5213   def ExpandNames(self):
5214     """Expand names and set required locks.
5215
5216     This expands the node list, if any.
5217
5218     """
5219     self.needed_locks = {}
5220     if self.op.on_nodes:
5221       # _GetWantedNodes can be used here, but is not always appropriate to use
5222       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5223       # more information.
5224       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5225       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5226
5227   def CheckPrereq(self):
5228     """Check prerequisites.
5229
5230     """
5231
5232   def Exec(self, feedback_fn):
5233     """Do the actual sleep.
5234
5235     """
5236     if self.op.on_master:
5237       if not utils.TestDelay(self.op.duration):
5238         raise errors.OpExecError("Error during master delay test")
5239     if self.op.on_nodes:
5240       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5241       if not result:
5242         raise errors.OpExecError("Complete failure from rpc call")
5243       for node, node_result in result.items():
5244         if not node_result:
5245           raise errors.OpExecError("Failure during rpc call to node %s,"
5246                                    " result: %s" % (node, node_result))
5247
5248
5249 class IAllocator(object):
5250   """IAllocator framework.
5251
5252   An IAllocator instance has three sets of attributes:
5253     - cfg that is needed to query the cluster
5254     - input data (all members of the _KEYS class attribute are required)
5255     - four buffer attributes (in|out_data|text), that represent the
5256       input (to the external script) in text and data structure format,
5257       and the output from it, again in two formats
5258     - the result variables from the script (success, info, nodes) for
5259       easy usage
5260
5261   """
5262   _ALLO_KEYS = [
5263     "mem_size", "disks", "disk_template",
5264     "os", "tags", "nics", "vcpus",
5265     ]
5266   _RELO_KEYS = [
5267     "relocate_from",
5268     ]
5269
5270   def __init__(self, lu, mode, name, **kwargs):
5271     self.lu = lu
5272     # init buffer variables
5273     self.in_text = self.out_text = self.in_data = self.out_data = None
5274     # init all input fields so that pylint is happy
5275     self.mode = mode
5276     self.name = name
5277     self.mem_size = self.disks = self.disk_template = None
5278     self.os = self.tags = self.nics = self.vcpus = None
5279     self.relocate_from = None
5280     # computed fields
5281     self.required_nodes = None
5282     # init result fields
5283     self.success = self.info = self.nodes = None
5284     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5285       keyset = self._ALLO_KEYS
5286     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5287       keyset = self._RELO_KEYS
5288     else:
5289       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5290                                    " IAllocator" % self.mode)
5291     for key in kwargs:
5292       if key not in keyset:
5293         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5294                                      " IAllocator" % key)
5295       setattr(self, key, kwargs[key])
5296     for key in keyset:
5297       if key not in kwargs:
5298         raise errors.ProgrammerError("Missing input parameter '%s' to"
5299                                      " IAllocator" % key)
5300     self._BuildInputData()
5301
5302   def _ComputeClusterData(self):
5303     """Compute the generic allocator input data.
5304
5305     This is the data that is independent of the actual operation.
5306
5307     """
5308     cfg = self.lu.cfg
5309     cluster_info = cfg.GetClusterInfo()
5310     # cluster data
5311     data = {
5312       "version": 1,
5313       "cluster_name": cfg.GetClusterName(),
5314       "cluster_tags": list(cluster_info.GetTags()),
5315       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5316       # we don't have job IDs
5317       }
5318
5319     i_list = []
5320     cluster = self.cfg.GetClusterInfo()
5321     for iname in cfg.GetInstanceList():
5322       i_obj = cfg.GetInstanceInfo(iname)
5323       i_list.append((i_obj, cluster.FillBE(i_obj)))
5324
5325     # node data
5326     node_results = {}
5327     node_list = cfg.GetNodeList()
5328     # FIXME: here we have only one hypervisor information, but
5329     # instance can belong to different hypervisors
5330     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5331                                            cfg.GetHypervisorType())
5332     for nname in node_list:
5333       ninfo = cfg.GetNodeInfo(nname)
5334       if nname not in node_data or not isinstance(node_data[nname], dict):
5335         raise errors.OpExecError("Can't get data for node %s" % nname)
5336       remote_info = node_data[nname]
5337       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5338                    'vg_size', 'vg_free', 'cpu_total']:
5339         if attr not in remote_info:
5340           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5341                                    (nname, attr))
5342         try:
5343           remote_info[attr] = int(remote_info[attr])
5344         except ValueError, err:
5345           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5346                                    " %s" % (nname, attr, str(err)))
5347       # compute memory used by primary instances
5348       i_p_mem = i_p_up_mem = 0
5349       for iinfo, beinfo in i_list:
5350         if iinfo.primary_node == nname:
5351           i_p_mem += beinfo[constants.BE_MEMORY]
5352           if iinfo.status == "up":
5353             i_p_up_mem += beinfo[constants.BE_MEMORY]
5354
5355       # compute memory used by instances
5356       pnr = {
5357         "tags": list(ninfo.GetTags()),
5358         "total_memory": remote_info['memory_total'],
5359         "reserved_memory": remote_info['memory_dom0'],
5360         "free_memory": remote_info['memory_free'],
5361         "i_pri_memory": i_p_mem,
5362         "i_pri_up_memory": i_p_up_mem,
5363         "total_disk": remote_info['vg_size'],
5364         "free_disk": remote_info['vg_free'],
5365         "primary_ip": ninfo.primary_ip,
5366         "secondary_ip": ninfo.secondary_ip,
5367         "total_cpus": remote_info['cpu_total'],
5368         }
5369       node_results[nname] = pnr
5370     data["nodes"] = node_results
5371
5372     # instance data
5373     instance_data = {}
5374     for iinfo, beinfo in i_list:
5375       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5376                   for n in iinfo.nics]
5377       pir = {
5378         "tags": list(iinfo.GetTags()),
5379         "should_run": iinfo.status == "up",
5380         "vcpus": beinfo[constants.BE_VCPUS],
5381         "memory": beinfo[constants.BE_MEMORY],
5382         "os": iinfo.os,
5383         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5384         "nics": nic_data,
5385         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5386         "disk_template": iinfo.disk_template,
5387         "hypervisor": iinfo.hypervisor,
5388         }
5389       instance_data[iinfo.name] = pir
5390
5391     data["instances"] = instance_data
5392
5393     self.in_data = data
5394
5395   def _AddNewInstance(self):
5396     """Add new instance data to allocator structure.
5397
5398     This in combination with _AllocatorGetClusterData will create the
5399     correct structure needed as input for the allocator.
5400
5401     The checks for the completeness of the opcode must have already been
5402     done.
5403
5404     """
5405     data = self.in_data
5406     if len(self.disks) != 2:
5407       raise errors.OpExecError("Only two-disk configurations supported")
5408
5409     disk_space = _ComputeDiskSize(self.disk_template,
5410                                   self.disks[0]["size"], self.disks[1]["size"])
5411
5412     if self.disk_template in constants.DTS_NET_MIRROR:
5413       self.required_nodes = 2
5414     else:
5415       self.required_nodes = 1
5416     request = {
5417       "type": "allocate",
5418       "name": self.name,
5419       "disk_template": self.disk_template,
5420       "tags": self.tags,
5421       "os": self.os,
5422       "vcpus": self.vcpus,
5423       "memory": self.mem_size,
5424       "disks": self.disks,
5425       "disk_space_total": disk_space,
5426       "nics": self.nics,
5427       "required_nodes": self.required_nodes,
5428       }
5429     data["request"] = request
5430
5431   def _AddRelocateInstance(self):
5432     """Add relocate instance data to allocator structure.
5433
5434     This in combination with _IAllocatorGetClusterData will create the
5435     correct structure needed as input for the allocator.
5436
5437     The checks for the completeness of the opcode must have already been
5438     done.
5439
5440     """
5441     instance = self.lu.cfg.GetInstanceInfo(self.name)
5442     if instance is None:
5443       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5444                                    " IAllocator" % self.name)
5445
5446     if instance.disk_template not in constants.DTS_NET_MIRROR:
5447       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5448
5449     if len(instance.secondary_nodes) != 1:
5450       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5451
5452     self.required_nodes = 1
5453
5454     disk_space = _ComputeDiskSize(instance.disk_template,
5455                                   instance.disks[0].size,
5456                                   instance.disks[1].size)
5457
5458     request = {
5459       "type": "relocate",
5460       "name": self.name,
5461       "disk_space_total": disk_space,
5462       "required_nodes": self.required_nodes,
5463       "relocate_from": self.relocate_from,
5464       }
5465     self.in_data["request"] = request
5466
5467   def _BuildInputData(self):
5468     """Build input data structures.
5469
5470     """
5471     self._ComputeClusterData()
5472
5473     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5474       self._AddNewInstance()
5475     else:
5476       self._AddRelocateInstance()
5477
5478     self.in_text = serializer.Dump(self.in_data)
5479
5480   def Run(self, name, validate=True, call_fn=None):
5481     """Run an instance allocator and return the results.
5482
5483     """
5484     if call_fn is None:
5485       call_fn = self.lu.rpc.call_iallocator_runner
5486     data = self.in_text
5487
5488     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5489
5490     if not isinstance(result, (list, tuple)) or len(result) != 4:
5491       raise errors.OpExecError("Invalid result from master iallocator runner")
5492
5493     rcode, stdout, stderr, fail = result
5494
5495     if rcode == constants.IARUN_NOTFOUND:
5496       raise errors.OpExecError("Can't find allocator '%s'" % name)
5497     elif rcode == constants.IARUN_FAILURE:
5498       raise errors.OpExecError("Instance allocator call failed: %s,"
5499                                " output: %s" % (fail, stdout+stderr))
5500     self.out_text = stdout
5501     if validate:
5502       self._ValidateResult()
5503
5504   def _ValidateResult(self):
5505     """Process the allocator results.
5506
5507     This will process and if successful save the result in
5508     self.out_data and the other parameters.
5509
5510     """
5511     try:
5512       rdict = serializer.Load(self.out_text)
5513     except Exception, err:
5514       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5515
5516     if not isinstance(rdict, dict):
5517       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5518
5519     for key in "success", "info", "nodes":
5520       if key not in rdict:
5521         raise errors.OpExecError("Can't parse iallocator results:"
5522                                  " missing key '%s'" % key)
5523       setattr(self, key, rdict[key])
5524
5525     if not isinstance(rdict["nodes"], list):
5526       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5527                                " is not a list")
5528     self.out_data = rdict
5529
5530
5531 class LUTestAllocator(NoHooksLU):
5532   """Run allocator tests.
5533
5534   This LU runs the allocator tests
5535
5536   """
5537   _OP_REQP = ["direction", "mode", "name"]
5538
5539   def CheckPrereq(self):
5540     """Check prerequisites.
5541
5542     This checks the opcode parameters depending on the director and mode test.
5543
5544     """
5545     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5546       for attr in ["name", "mem_size", "disks", "disk_template",
5547                    "os", "tags", "nics", "vcpus"]:
5548         if not hasattr(self.op, attr):
5549           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5550                                      attr)
5551       iname = self.cfg.ExpandInstanceName(self.op.name)
5552       if iname is not None:
5553         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5554                                    iname)
5555       if not isinstance(self.op.nics, list):
5556         raise errors.OpPrereqError("Invalid parameter 'nics'")
5557       for row in self.op.nics:
5558         if (not isinstance(row, dict) or
5559             "mac" not in row or
5560             "ip" not in row or
5561             "bridge" not in row):
5562           raise errors.OpPrereqError("Invalid contents of the"
5563                                      " 'nics' parameter")
5564       if not isinstance(self.op.disks, list):
5565         raise errors.OpPrereqError("Invalid parameter 'disks'")
5566       if len(self.op.disks) != 2:
5567         raise errors.OpPrereqError("Only two-disk configurations supported")
5568       for row in self.op.disks:
5569         if (not isinstance(row, dict) or
5570             "size" not in row or
5571             not isinstance(row["size"], int) or
5572             "mode" not in row or
5573             row["mode"] not in ['r', 'w']):
5574           raise errors.OpPrereqError("Invalid contents of the"
5575                                      " 'disks' parameter")
5576     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5577       if not hasattr(self.op, "name"):
5578         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5579       fname = self.cfg.ExpandInstanceName(self.op.name)
5580       if fname is None:
5581         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5582                                    self.op.name)
5583       self.op.name = fname
5584       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5585     else:
5586       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5587                                  self.op.mode)
5588
5589     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5590       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5591         raise errors.OpPrereqError("Missing allocator name")
5592     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5593       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5594                                  self.op.direction)
5595
5596   def Exec(self, feedback_fn):
5597     """Run the allocator test.
5598
5599     """
5600     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5601       ial = IAllocator(self,
5602                        mode=self.op.mode,
5603                        name=self.op.name,
5604                        mem_size=self.op.mem_size,
5605                        disks=self.op.disks,
5606                        disk_template=self.op.disk_template,
5607                        os=self.op.os,
5608                        tags=self.op.tags,
5609                        nics=self.op.nics,
5610                        vcpus=self.op.vcpus,
5611                        )
5612     else:
5613       ial = IAllocator(self,
5614                        mode=self.op.mode,
5615                        name=self.op.name,
5616                        relocate_from=list(self.relocate_from),
5617                        )
5618
5619     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5620       result = ial.in_text
5621     else:
5622       ial.Run(self.op.allocator, validate=False)
5623       result = ial.out_text
5624     return result