Allow querying of variable number of parameters
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import itertools
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91     # logging
92     self.LogWarning = processor.LogWarning
93     self.LogInfo = processor.LogInfo
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100
101     if not self.cfg.IsCluster():
102       raise errors.OpPrereqError("Cluster not initialized yet,"
103                                  " use 'gnt-cluster init' first.")
104     if self.REQ_MASTER:
105       master = self.cfg.GetMasterNode()
106       if master != utils.HostInfo().name:
107         raise errors.OpPrereqError("Commands must be run on the master"
108                                    " node %s" % master)
109
110   def __GetSSH(self):
111     """Returns the SshRunner object
112
113     """
114     if not self.__ssh:
115       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
116     return self.__ssh
117
118   ssh = property(fget=__GetSSH)
119
120   def ExpandNames(self):
121     """Expand names for this LU.
122
123     This method is called before starting to execute the opcode, and it should
124     update all the parameters of the opcode to their canonical form (e.g. a
125     short node name must be fully expanded after this method has successfully
126     completed). This way locking, hooks, logging, ecc. can work correctly.
127
128     LUs which implement this method must also populate the self.needed_locks
129     member, as a dict with lock levels as keys, and a list of needed lock names
130     as values. Rules:
131       - Use an empty dict if you don't need any lock
132       - If you don't need any lock at a particular level omit that level
133       - Don't put anything for the BGL level
134       - If you want all locks at a level use locking.ALL_SET as a value
135
136     If you need to share locks (rather than acquire them exclusively) at one
137     level you can modify self.share_locks, setting a true value (usually 1) for
138     that level. By default locks are not shared.
139
140     Examples:
141     # Acquire all nodes and one instance
142     self.needed_locks = {
143       locking.LEVEL_NODE: locking.ALL_SET,
144       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
145     }
146     # Acquire just two nodes
147     self.needed_locks = {
148       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
149     }
150     # Acquire no locks
151     self.needed_locks = {} # No, you can't leave it to the default value None
152
153     """
154     # The implementation of this method is mandatory only if the new LU is
155     # concurrent, so that old LUs don't need to be changed all at the same
156     # time.
157     if self.REQ_BGL:
158       self.needed_locks = {} # Exclusive LUs don't need locks.
159     else:
160       raise NotImplementedError
161
162   def DeclareLocks(self, level):
163     """Declare LU locking needs for a level
164
165     While most LUs can just declare their locking needs at ExpandNames time,
166     sometimes there's the need to calculate some locks after having acquired
167     the ones before. This function is called just before acquiring locks at a
168     particular level, but after acquiring the ones at lower levels, and permits
169     such calculations. It can be used to modify self.needed_locks, and by
170     default it does nothing.
171
172     This function is only called if you have something already set in
173     self.needed_locks for the level.
174
175     @param level: Locking level which is going to be locked
176     @type level: member of ganeti.locking.LEVELS
177
178     """
179
180   def CheckPrereq(self):
181     """Check prerequisites for this LU.
182
183     This method should check that the prerequisites for the execution
184     of this LU are fulfilled. It can do internode communication, but
185     it should be idempotent - no cluster or system changes are
186     allowed.
187
188     The method should raise errors.OpPrereqError in case something is
189     not fulfilled. Its return value is ignored.
190
191     This method should also update all the parameters of the opcode to
192     their canonical form if it hasn't been done by ExpandNames before.
193
194     """
195     raise NotImplementedError
196
197   def Exec(self, feedback_fn):
198     """Execute the LU.
199
200     This method should implement the actual work. It should raise
201     errors.OpExecError for failures that are somewhat dealt with in
202     code, or expected.
203
204     """
205     raise NotImplementedError
206
207   def BuildHooksEnv(self):
208     """Build hooks environment for this LU.
209
210     This method should return a three-node tuple consisting of: a dict
211     containing the environment that will be used for running the
212     specific hook for this LU, a list of node names on which the hook
213     should run before the execution, and a list of node names on which
214     the hook should run after the execution.
215
216     The keys of the dict must not have 'GANETI_' prefixed as this will
217     be handled in the hooks runner. Also note additional keys will be
218     added by the hooks runner. If the LU doesn't define any
219     environment, an empty dict (and not None) should be returned.
220
221     No nodes should be returned as an empty list (and not None).
222
223     Note that if the HPATH for a LU class is None, this function will
224     not be called.
225
226     """
227     raise NotImplementedError
228
229   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
230     """Notify the LU about the results of its hooks.
231
232     This method is called every time a hooks phase is executed, and notifies
233     the Logical Unit about the hooks' result. The LU can then use it to alter
234     its result based on the hooks.  By default the method does nothing and the
235     previous result is passed back unchanged but any LU can define it if it
236     wants to use the local cluster hook-scripts somehow.
237
238     Args:
239       phase: the hooks phase that has just been run
240       hooks_results: the results of the multi-node hooks rpc call
241       feedback_fn: function to send feedback back to the caller
242       lu_result: the previous result this LU had, or None in the PRE phase.
243
244     """
245     return lu_result
246
247   def _ExpandAndLockInstance(self):
248     """Helper function to expand and lock an instance.
249
250     Many LUs that work on an instance take its name in self.op.instance_name
251     and need to expand it and then declare the expanded name for locking. This
252     function does it, and then updates self.op.instance_name to the expanded
253     name. It also initializes needed_locks as a dict, if this hasn't been done
254     before.
255
256     """
257     if self.needed_locks is None:
258       self.needed_locks = {}
259     else:
260       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
261         "_ExpandAndLockInstance called with instance-level locks set"
262     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
263     if expanded_name is None:
264       raise errors.OpPrereqError("Instance '%s' not known" %
265                                   self.op.instance_name)
266     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
267     self.op.instance_name = expanded_name
268
269   def _LockInstancesNodes(self, primary_only=False):
270     """Helper function to declare instances' nodes for locking.
271
272     This function should be called after locking one or more instances to lock
273     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
274     with all primary or secondary nodes for instances already locked and
275     present in self.needed_locks[locking.LEVEL_INSTANCE].
276
277     It should be called from DeclareLocks, and for safety only works if
278     self.recalculate_locks[locking.LEVEL_NODE] is set.
279
280     In the future it may grow parameters to just lock some instance's nodes, or
281     to just lock primaries or secondary nodes, if needed.
282
283     If should be called in DeclareLocks in a way similar to:
284
285     if level == locking.LEVEL_NODE:
286       self._LockInstancesNodes()
287
288     @type primary_only: boolean
289     @param primary_only: only lock primary nodes of locked instances
290
291     """
292     assert locking.LEVEL_NODE in self.recalculate_locks, \
293       "_LockInstancesNodes helper function called with no nodes to recalculate"
294
295     # TODO: check if we're really been called with the instance locks held
296
297     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
298     # future we might want to have different behaviors depending on the value
299     # of self.recalculate_locks[locking.LEVEL_NODE]
300     wanted_nodes = []
301     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
302       instance = self.context.cfg.GetInstanceInfo(instance_name)
303       wanted_nodes.append(instance.primary_node)
304       if not primary_only:
305         wanted_nodes.extend(instance.secondary_nodes)
306
307     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
308       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
309     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
310       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
311
312     del self.recalculate_locks[locking.LEVEL_NODE]
313
314
315 class NoHooksLU(LogicalUnit):
316   """Simple LU which runs no hooks.
317
318   This LU is intended as a parent for other LogicalUnits which will
319   run no hooks, in order to reduce duplicate code.
320
321   """
322   HPATH = None
323   HTYPE = None
324
325
326 class _FieldSet(object):
327   """A simple field set.
328
329   Among the features are:
330     - checking if a string is among a list of static string or regex objects
331     - checking if a whole list of string matches
332     - returning the matching groups from a regex match
333
334   Internally, all fields are held as regular expression objects.
335
336   """
337   def __init__(self, *items):
338     self.items = [re.compile("^%s$" % value) for value in items]
339
340   def Extend(self, other_set):
341     """Extend the field set with the items from another one"""
342     self.items.extend(other_set.items)
343
344   def Matches(self, field):
345     """Checks if a field matches the current set
346
347     @type field: str
348     @param field: the string to match
349     @return: either False or a regular expression match object
350
351     """
352     for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
353       return m
354     return False
355
356   def NonMatching(self, items):
357     """Returns the list of fields not matching the current set
358
359     @type items: list
360     @param items: the list of fields to check
361     @rtype: list
362     @return: list of non-matching fields
363
364     """
365     return [val for val in items if not self.Matches(val)]
366
367
368 def _GetWantedNodes(lu, nodes):
369   """Returns list of checked and expanded node names.
370
371   Args:
372     nodes: List of nodes (strings) or None for all
373
374   """
375   if not isinstance(nodes, list):
376     raise errors.OpPrereqError("Invalid argument type 'nodes'")
377
378   if not nodes:
379     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
380       " non-empty list of nodes whose name is to be expanded.")
381
382   wanted = []
383   for name in nodes:
384     node = lu.cfg.ExpandNodeName(name)
385     if node is None:
386       raise errors.OpPrereqError("No such node name '%s'" % name)
387     wanted.append(node)
388
389   return utils.NiceSort(wanted)
390
391
392 def _GetWantedInstances(lu, instances):
393   """Returns list of checked and expanded instance names.
394
395   Args:
396     instances: List of instances (strings) or None for all
397
398   """
399   if not isinstance(instances, list):
400     raise errors.OpPrereqError("Invalid argument type 'instances'")
401
402   if instances:
403     wanted = []
404
405     for name in instances:
406       instance = lu.cfg.ExpandInstanceName(name)
407       if instance is None:
408         raise errors.OpPrereqError("No such instance name '%s'" % name)
409       wanted.append(instance)
410
411   else:
412     wanted = lu.cfg.GetInstanceList()
413   return utils.NiceSort(wanted)
414
415
416 def _CheckOutputFields(static, dynamic, selected):
417   """Checks whether all selected fields are valid.
418
419   @type static: L{_FieldSet}
420   @param static: static fields set
421   @type dynamic: L{_FieldSet}
422   @param dynamic: dynamic fields set
423
424   """
425   f = _FieldSet()
426   f.Extend(static)
427   f.Extend(dynamic)
428
429   delta = f.NonMatching(selected)
430   if delta:
431     raise errors.OpPrereqError("Unknown output fields selected: %s"
432                                % ",".join(delta))
433
434
435 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
436                           memory, vcpus, nics):
437   """Builds instance related env variables for hooks from single variables.
438
439   Args:
440     secondary_nodes: List of secondary nodes as strings
441   """
442   env = {
443     "OP_TARGET": name,
444     "INSTANCE_NAME": name,
445     "INSTANCE_PRIMARY": primary_node,
446     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
447     "INSTANCE_OS_TYPE": os_type,
448     "INSTANCE_STATUS": status,
449     "INSTANCE_MEMORY": memory,
450     "INSTANCE_VCPUS": vcpus,
451   }
452
453   if nics:
454     nic_count = len(nics)
455     for idx, (ip, bridge, mac) in enumerate(nics):
456       if ip is None:
457         ip = ""
458       env["INSTANCE_NIC%d_IP" % idx] = ip
459       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
460       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
461   else:
462     nic_count = 0
463
464   env["INSTANCE_NIC_COUNT"] = nic_count
465
466   return env
467
468
469 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
470   """Builds instance related env variables for hooks from an object.
471
472   Args:
473     instance: objects.Instance object of instance
474     override: dict of values to override
475   """
476   bep = lu.cfg.GetClusterInfo().FillBE(instance)
477   args = {
478     'name': instance.name,
479     'primary_node': instance.primary_node,
480     'secondary_nodes': instance.secondary_nodes,
481     'os_type': instance.os,
482     'status': instance.os,
483     'memory': bep[constants.BE_MEMORY],
484     'vcpus': bep[constants.BE_VCPUS],
485     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
486   }
487   if override:
488     args.update(override)
489   return _BuildInstanceHookEnv(**args)
490
491
492 def _CheckInstanceBridgesExist(lu, instance):
493   """Check that the brigdes needed by an instance exist.
494
495   """
496   # check bridges existance
497   brlist = [nic.bridge for nic in instance.nics]
498   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
499     raise errors.OpPrereqError("one or more target bridges %s does not"
500                                " exist on destination node '%s'" %
501                                (brlist, instance.primary_node))
502
503
504 class LUDestroyCluster(NoHooksLU):
505   """Logical unit for destroying the cluster.
506
507   """
508   _OP_REQP = []
509
510   def CheckPrereq(self):
511     """Check prerequisites.
512
513     This checks whether the cluster is empty.
514
515     Any errors are signalled by raising errors.OpPrereqError.
516
517     """
518     master = self.cfg.GetMasterNode()
519
520     nodelist = self.cfg.GetNodeList()
521     if len(nodelist) != 1 or nodelist[0] != master:
522       raise errors.OpPrereqError("There are still %d node(s) in"
523                                  " this cluster." % (len(nodelist) - 1))
524     instancelist = self.cfg.GetInstanceList()
525     if instancelist:
526       raise errors.OpPrereqError("There are still %d instance(s) in"
527                                  " this cluster." % len(instancelist))
528
529   def Exec(self, feedback_fn):
530     """Destroys the cluster.
531
532     """
533     master = self.cfg.GetMasterNode()
534     if not self.rpc.call_node_stop_master(master, False):
535       raise errors.OpExecError("Could not disable the master role")
536     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
537     utils.CreateBackup(priv_key)
538     utils.CreateBackup(pub_key)
539     return master
540
541
542 class LUVerifyCluster(LogicalUnit):
543   """Verifies the cluster status.
544
545   """
546   HPATH = "cluster-verify"
547   HTYPE = constants.HTYPE_CLUSTER
548   _OP_REQP = ["skip_checks"]
549   REQ_BGL = False
550
551   def ExpandNames(self):
552     self.needed_locks = {
553       locking.LEVEL_NODE: locking.ALL_SET,
554       locking.LEVEL_INSTANCE: locking.ALL_SET,
555     }
556     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
557
558   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
559                   remote_version, feedback_fn):
560     """Run multiple tests against a node.
561
562     Test list:
563       - compares ganeti version
564       - checks vg existance and size > 20G
565       - checks config file checksum
566       - checks ssh to other nodes
567
568     Args:
569       node: name of the node to check
570       file_list: required list of files
571       local_cksum: dictionary of local files and their checksums
572
573     """
574     # compares ganeti version
575     local_version = constants.PROTOCOL_VERSION
576     if not remote_version:
577       feedback_fn("  - ERROR: connection to %s failed" % (node))
578       return True
579
580     if local_version != remote_version:
581       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
582                       (local_version, node, remote_version))
583       return True
584
585     # checks vg existance and size > 20G
586
587     bad = False
588     if not vglist:
589       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
590                       (node,))
591       bad = True
592     else:
593       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
594                                             constants.MIN_VG_SIZE)
595       if vgstatus:
596         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
597         bad = True
598
599     if not node_result:
600       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
601       return True
602
603     # checks config file checksum
604     # checks ssh to any
605
606     if 'filelist' not in node_result:
607       bad = True
608       feedback_fn("  - ERROR: node hasn't returned file checksum data")
609     else:
610       remote_cksum = node_result['filelist']
611       for file_name in file_list:
612         if file_name not in remote_cksum:
613           bad = True
614           feedback_fn("  - ERROR: file '%s' missing" % file_name)
615         elif remote_cksum[file_name] != local_cksum[file_name]:
616           bad = True
617           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
618
619     if 'nodelist' not in node_result:
620       bad = True
621       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
622     else:
623       if node_result['nodelist']:
624         bad = True
625         for node in node_result['nodelist']:
626           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
627                           (node, node_result['nodelist'][node]))
628     if 'node-net-test' not in node_result:
629       bad = True
630       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
631     else:
632       if node_result['node-net-test']:
633         bad = True
634         nlist = utils.NiceSort(node_result['node-net-test'].keys())
635         for node in nlist:
636           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
637                           (node, node_result['node-net-test'][node]))
638
639     hyp_result = node_result.get('hypervisor', None)
640     if isinstance(hyp_result, dict):
641       for hv_name, hv_result in hyp_result.iteritems():
642         if hv_result is not None:
643           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
644                       (hv_name, hv_result))
645     return bad
646
647   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
648                       node_instance, feedback_fn):
649     """Verify an instance.
650
651     This function checks to see if the required block devices are
652     available on the instance's node.
653
654     """
655     bad = False
656
657     node_current = instanceconfig.primary_node
658
659     node_vol_should = {}
660     instanceconfig.MapLVsByNode(node_vol_should)
661
662     for node in node_vol_should:
663       for volume in node_vol_should[node]:
664         if node not in node_vol_is or volume not in node_vol_is[node]:
665           feedback_fn("  - ERROR: volume %s missing on node %s" %
666                           (volume, node))
667           bad = True
668
669     if not instanceconfig.status == 'down':
670       if (node_current not in node_instance or
671           not instance in node_instance[node_current]):
672         feedback_fn("  - ERROR: instance %s not running on node %s" %
673                         (instance, node_current))
674         bad = True
675
676     for node in node_instance:
677       if (not node == node_current):
678         if instance in node_instance[node]:
679           feedback_fn("  - ERROR: instance %s should not run on node %s" %
680                           (instance, node))
681           bad = True
682
683     return bad
684
685   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
686     """Verify if there are any unknown volumes in the cluster.
687
688     The .os, .swap and backup volumes are ignored. All other volumes are
689     reported as unknown.
690
691     """
692     bad = False
693
694     for node in node_vol_is:
695       for volume in node_vol_is[node]:
696         if node not in node_vol_should or volume not in node_vol_should[node]:
697           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
698                       (volume, node))
699           bad = True
700     return bad
701
702   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
703     """Verify the list of running instances.
704
705     This checks what instances are running but unknown to the cluster.
706
707     """
708     bad = False
709     for node in node_instance:
710       for runninginstance in node_instance[node]:
711         if runninginstance not in instancelist:
712           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
713                           (runninginstance, node))
714           bad = True
715     return bad
716
717   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
718     """Verify N+1 Memory Resilience.
719
720     Check that if one single node dies we can still start all the instances it
721     was primary for.
722
723     """
724     bad = False
725
726     for node, nodeinfo in node_info.iteritems():
727       # This code checks that every node which is now listed as secondary has
728       # enough memory to host all instances it is supposed to should a single
729       # other node in the cluster fail.
730       # FIXME: not ready for failover to an arbitrary node
731       # FIXME: does not support file-backed instances
732       # WARNING: we currently take into account down instances as well as up
733       # ones, considering that even if they're down someone might want to start
734       # them even in the event of a node failure.
735       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
736         needed_mem = 0
737         for instance in instances:
738           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
739           if bep[constants.BE_AUTO_BALANCE]:
740             needed_mem += bep[constants.BE_MEMORY]
741         if nodeinfo['mfree'] < needed_mem:
742           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
743                       " failovers should node %s fail" % (node, prinode))
744           bad = True
745     return bad
746
747   def CheckPrereq(self):
748     """Check prerequisites.
749
750     Transform the list of checks we're going to skip into a set and check that
751     all its members are valid.
752
753     """
754     self.skip_set = frozenset(self.op.skip_checks)
755     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
756       raise errors.OpPrereqError("Invalid checks to be skipped specified")
757
758   def BuildHooksEnv(self):
759     """Build hooks env.
760
761     Cluster-Verify hooks just rone in the post phase and their failure makes
762     the output be logged in the verify output and the verification to fail.
763
764     """
765     all_nodes = self.cfg.GetNodeList()
766     # TODO: populate the environment with useful information for verify hooks
767     env = {}
768     return env, [], all_nodes
769
770   def Exec(self, feedback_fn):
771     """Verify integrity of cluster, performing various test on nodes.
772
773     """
774     bad = False
775     feedback_fn("* Verifying global settings")
776     for msg in self.cfg.VerifyConfig():
777       feedback_fn("  - ERROR: %s" % msg)
778
779     vg_name = self.cfg.GetVGName()
780     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
781     nodelist = utils.NiceSort(self.cfg.GetNodeList())
782     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
783     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
784     i_non_redundant = [] # Non redundant instances
785     i_non_a_balanced = [] # Non auto-balanced instances
786     node_volume = {}
787     node_instance = {}
788     node_info = {}
789     instance_cfg = {}
790
791     # FIXME: verify OS list
792     # do local checksums
793     file_names = []
794     file_names.append(constants.SSL_CERT_FILE)
795     file_names.append(constants.CLUSTER_CONF_FILE)
796     local_checksums = utils.FingerprintFiles(file_names)
797
798     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
799     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
800     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
801     all_vglist = self.rpc.call_vg_list(nodelist)
802     node_verify_param = {
803       'filelist': file_names,
804       'nodelist': nodelist,
805       'hypervisor': hypervisors,
806       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
807                         for node in nodeinfo]
808       }
809     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
810                                            self.cfg.GetClusterName())
811     all_rversion = self.rpc.call_version(nodelist)
812     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
813                                         self.cfg.GetHypervisorType())
814
815     cluster = self.cfg.GetClusterInfo()
816     for node in nodelist:
817       feedback_fn("* Verifying node %s" % node)
818       result = self._VerifyNode(node, file_names, local_checksums,
819                                 all_vglist[node], all_nvinfo[node],
820                                 all_rversion[node], feedback_fn)
821       bad = bad or result
822
823       # node_volume
824       volumeinfo = all_volumeinfo[node]
825
826       if isinstance(volumeinfo, basestring):
827         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
828                     (node, volumeinfo[-400:].encode('string_escape')))
829         bad = True
830         node_volume[node] = {}
831       elif not isinstance(volumeinfo, dict):
832         feedback_fn("  - ERROR: connection to %s failed" % (node,))
833         bad = True
834         continue
835       else:
836         node_volume[node] = volumeinfo
837
838       # node_instance
839       nodeinstance = all_instanceinfo[node]
840       if type(nodeinstance) != list:
841         feedback_fn("  - ERROR: connection to %s failed" % (node,))
842         bad = True
843         continue
844
845       node_instance[node] = nodeinstance
846
847       # node_info
848       nodeinfo = all_ninfo[node]
849       if not isinstance(nodeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853
854       try:
855         node_info[node] = {
856           "mfree": int(nodeinfo['memory_free']),
857           "dfree": int(nodeinfo['vg_free']),
858           "pinst": [],
859           "sinst": [],
860           # dictionary holding all instances this node is secondary for,
861           # grouped by their primary node. Each key is a cluster node, and each
862           # value is a list of instances which have the key as primary and the
863           # current node as secondary.  this is handy to calculate N+1 memory
864           # availability if you can only failover from a primary to its
865           # secondary.
866           "sinst-by-pnode": {},
867         }
868       except ValueError:
869         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
870         bad = True
871         continue
872
873     node_vol_should = {}
874
875     for instance in instancelist:
876       feedback_fn("* Verifying instance %s" % instance)
877       inst_config = self.cfg.GetInstanceInfo(instance)
878       result =  self._VerifyInstance(instance, inst_config, node_volume,
879                                      node_instance, feedback_fn)
880       bad = bad or result
881
882       inst_config.MapLVsByNode(node_vol_should)
883
884       instance_cfg[instance] = inst_config
885
886       pnode = inst_config.primary_node
887       if pnode in node_info:
888         node_info[pnode]['pinst'].append(instance)
889       else:
890         feedback_fn("  - ERROR: instance %s, connection to primary node"
891                     " %s failed" % (instance, pnode))
892         bad = True
893
894       # If the instance is non-redundant we cannot survive losing its primary
895       # node, so we are not N+1 compliant. On the other hand we have no disk
896       # templates with more than one secondary so that situation is not well
897       # supported either.
898       # FIXME: does not support file-backed instances
899       if len(inst_config.secondary_nodes) == 0:
900         i_non_redundant.append(instance)
901       elif len(inst_config.secondary_nodes) > 1:
902         feedback_fn("  - WARNING: multiple secondaries for instance %s"
903                     % instance)
904
905       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
906         i_non_a_balanced.append(instance)
907
908       for snode in inst_config.secondary_nodes:
909         if snode in node_info:
910           node_info[snode]['sinst'].append(instance)
911           if pnode not in node_info[snode]['sinst-by-pnode']:
912             node_info[snode]['sinst-by-pnode'][pnode] = []
913           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
914         else:
915           feedback_fn("  - ERROR: instance %s, connection to secondary node"
916                       " %s failed" % (instance, snode))
917
918     feedback_fn("* Verifying orphan volumes")
919     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
920                                        feedback_fn)
921     bad = bad or result
922
923     feedback_fn("* Verifying remaining instances")
924     result = self._VerifyOrphanInstances(instancelist, node_instance,
925                                          feedback_fn)
926     bad = bad or result
927
928     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
929       feedback_fn("* Verifying N+1 Memory redundancy")
930       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
931       bad = bad or result
932
933     feedback_fn("* Other Notes")
934     if i_non_redundant:
935       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
936                   % len(i_non_redundant))
937
938     if i_non_a_balanced:
939       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
940                   % len(i_non_a_balanced))
941
942     return not bad
943
944   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
945     """Analize the post-hooks' result, handle it, and send some
946     nicely-formatted feedback back to the user.
947
948     Args:
949       phase: the hooks phase that has just been run
950       hooks_results: the results of the multi-node hooks rpc call
951       feedback_fn: function to send feedback back to the caller
952       lu_result: previous Exec result
953
954     """
955     # We only really run POST phase hooks, and are only interested in
956     # their results
957     if phase == constants.HOOKS_PHASE_POST:
958       # Used to change hooks' output to proper indentation
959       indent_re = re.compile('^', re.M)
960       feedback_fn("* Hooks Results")
961       if not hooks_results:
962         feedback_fn("  - ERROR: general communication failure")
963         lu_result = 1
964       else:
965         for node_name in hooks_results:
966           show_node_header = True
967           res = hooks_results[node_name]
968           if res is False or not isinstance(res, list):
969             feedback_fn("    Communication failure")
970             lu_result = 1
971             continue
972           for script, hkr, output in res:
973             if hkr == constants.HKR_FAIL:
974               # The node header is only shown once, if there are
975               # failing hooks on that node
976               if show_node_header:
977                 feedback_fn("  Node %s:" % node_name)
978                 show_node_header = False
979               feedback_fn("    ERROR: Script %s failed, output:" % script)
980               output = indent_re.sub('      ', output)
981               feedback_fn("%s" % output)
982               lu_result = 1
983
984       return lu_result
985
986
987 class LUVerifyDisks(NoHooksLU):
988   """Verifies the cluster disks status.
989
990   """
991   _OP_REQP = []
992   REQ_BGL = False
993
994   def ExpandNames(self):
995     self.needed_locks = {
996       locking.LEVEL_NODE: locking.ALL_SET,
997       locking.LEVEL_INSTANCE: locking.ALL_SET,
998     }
999     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1000
1001   def CheckPrereq(self):
1002     """Check prerequisites.
1003
1004     This has no prerequisites.
1005
1006     """
1007     pass
1008
1009   def Exec(self, feedback_fn):
1010     """Verify integrity of cluster disks.
1011
1012     """
1013     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1014
1015     vg_name = self.cfg.GetVGName()
1016     nodes = utils.NiceSort(self.cfg.GetNodeList())
1017     instances = [self.cfg.GetInstanceInfo(name)
1018                  for name in self.cfg.GetInstanceList()]
1019
1020     nv_dict = {}
1021     for inst in instances:
1022       inst_lvs = {}
1023       if (inst.status != "up" or
1024           inst.disk_template not in constants.DTS_NET_MIRROR):
1025         continue
1026       inst.MapLVsByNode(inst_lvs)
1027       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1028       for node, vol_list in inst_lvs.iteritems():
1029         for vol in vol_list:
1030           nv_dict[(node, vol)] = inst
1031
1032     if not nv_dict:
1033       return result
1034
1035     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1036
1037     to_act = set()
1038     for node in nodes:
1039       # node_volume
1040       lvs = node_lvs[node]
1041
1042       if isinstance(lvs, basestring):
1043         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1044         res_nlvm[node] = lvs
1045       elif not isinstance(lvs, dict):
1046         logging.warning("Connection to node %s failed or invalid data"
1047                         " returned", node)
1048         res_nodes.append(node)
1049         continue
1050
1051       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1052         inst = nv_dict.pop((node, lv_name), None)
1053         if (not lv_online and inst is not None
1054             and inst.name not in res_instances):
1055           res_instances.append(inst.name)
1056
1057     # any leftover items in nv_dict are missing LVs, let's arrange the
1058     # data better
1059     for key, inst in nv_dict.iteritems():
1060       if inst.name not in res_missing:
1061         res_missing[inst.name] = []
1062       res_missing[inst.name].append(key)
1063
1064     return result
1065
1066
1067 class LURenameCluster(LogicalUnit):
1068   """Rename the cluster.
1069
1070   """
1071   HPATH = "cluster-rename"
1072   HTYPE = constants.HTYPE_CLUSTER
1073   _OP_REQP = ["name"]
1074
1075   def BuildHooksEnv(self):
1076     """Build hooks env.
1077
1078     """
1079     env = {
1080       "OP_TARGET": self.cfg.GetClusterName(),
1081       "NEW_NAME": self.op.name,
1082       }
1083     mn = self.cfg.GetMasterNode()
1084     return env, [mn], [mn]
1085
1086   def CheckPrereq(self):
1087     """Verify that the passed name is a valid one.
1088
1089     """
1090     hostname = utils.HostInfo(self.op.name)
1091
1092     new_name = hostname.name
1093     self.ip = new_ip = hostname.ip
1094     old_name = self.cfg.GetClusterName()
1095     old_ip = self.cfg.GetMasterIP()
1096     if new_name == old_name and new_ip == old_ip:
1097       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1098                                  " cluster has changed")
1099     if new_ip != old_ip:
1100       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1101         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102                                    " reachable on the network. Aborting." %
1103                                    new_ip)
1104
1105     self.op.name = new_name
1106
1107   def Exec(self, feedback_fn):
1108     """Rename the cluster.
1109
1110     """
1111     clustername = self.op.name
1112     ip = self.ip
1113
1114     # shutdown the master IP
1115     master = self.cfg.GetMasterNode()
1116     if not self.rpc.call_node_stop_master(master, False):
1117       raise errors.OpExecError("Could not disable the master role")
1118
1119     try:
1120       # modify the sstore
1121       # TODO: sstore
1122       ss.SetKey(ss.SS_MASTER_IP, ip)
1123       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1124
1125       # Distribute updated ss config to all nodes
1126       myself = self.cfg.GetNodeInfo(master)
1127       dist_nodes = self.cfg.GetNodeList()
1128       if myself.name in dist_nodes:
1129         dist_nodes.remove(myself.name)
1130
1131       logging.debug("Copying updated ssconf data to all nodes")
1132       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133         fname = ss.KeyToFilename(keyname)
1134         result = self.rpc.call_upload_file(dist_nodes, fname)
1135         for to_node in dist_nodes:
1136           if not result[to_node]:
1137             self.LogWarning("Copy of file %s to node %s failed",
1138                             fname, to_node)
1139     finally:
1140       if not self.rpc.call_node_start_master(master, False):
1141         self.LogWarning("Could not re-enable the master role on"
1142                         " the master, please restart manually.")
1143
1144
1145 def _RecursiveCheckIfLVMBased(disk):
1146   """Check if the given disk or its children are lvm-based.
1147
1148   Args:
1149     disk: ganeti.objects.Disk object
1150
1151   Returns:
1152     boolean indicating whether a LD_LV dev_type was found or not
1153
1154   """
1155   if disk.children:
1156     for chdisk in disk.children:
1157       if _RecursiveCheckIfLVMBased(chdisk):
1158         return True
1159   return disk.dev_type == constants.LD_LV
1160
1161
1162 class LUSetClusterParams(LogicalUnit):
1163   """Change the parameters of the cluster.
1164
1165   """
1166   HPATH = "cluster-modify"
1167   HTYPE = constants.HTYPE_CLUSTER
1168   _OP_REQP = []
1169   REQ_BGL = False
1170
1171   def ExpandNames(self):
1172     # FIXME: in the future maybe other cluster params won't require checking on
1173     # all nodes to be modified.
1174     self.needed_locks = {
1175       locking.LEVEL_NODE: locking.ALL_SET,
1176     }
1177     self.share_locks[locking.LEVEL_NODE] = 1
1178
1179   def BuildHooksEnv(self):
1180     """Build hooks env.
1181
1182     """
1183     env = {
1184       "OP_TARGET": self.cfg.GetClusterName(),
1185       "NEW_VG_NAME": self.op.vg_name,
1186       }
1187     mn = self.cfg.GetMasterNode()
1188     return env, [mn], [mn]
1189
1190   def CheckPrereq(self):
1191     """Check prerequisites.
1192
1193     This checks whether the given params don't conflict and
1194     if the given volume group is valid.
1195
1196     """
1197     # FIXME: This only works because there is only one parameter that can be
1198     # changed or removed.
1199     if self.op.vg_name is not None and not self.op.vg_name:
1200       instances = self.cfg.GetAllInstancesInfo().values()
1201       for inst in instances:
1202         for disk in inst.disks:
1203           if _RecursiveCheckIfLVMBased(disk):
1204             raise errors.OpPrereqError("Cannot disable lvm storage while"
1205                                        " lvm-based instances exist")
1206
1207     node_list = self.acquired_locks[locking.LEVEL_NODE]
1208
1209     # if vg_name not None, checks given volume group on all nodes
1210     if self.op.vg_name:
1211       vglist = self.rpc.call_vg_list(node_list)
1212       for node in node_list:
1213         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1214                                               constants.MIN_VG_SIZE)
1215         if vgstatus:
1216           raise errors.OpPrereqError("Error on node '%s': %s" %
1217                                      (node, vgstatus))
1218
1219     self.cluster = cluster = self.cfg.GetClusterInfo()
1220     # beparams changes do not need validation (we can't validate?),
1221     # but we still process here
1222     if self.op.beparams:
1223       self.new_beparams = cluster.FillDict(
1224         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1225
1226     # hypervisor list/parameters
1227     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1228     if self.op.hvparams:
1229       if not isinstance(self.op.hvparams, dict):
1230         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1231       for hv_name, hv_dict in self.op.hvparams.items():
1232         if hv_name not in self.new_hvparams:
1233           self.new_hvparams[hv_name] = hv_dict
1234         else:
1235           self.new_hvparams[hv_name].update(hv_dict)
1236
1237     if self.op.enabled_hypervisors is not None:
1238       self.hv_list = self.op.enabled_hypervisors
1239     else:
1240       self.hv_list = cluster.enabled_hypervisors
1241
1242     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1243       # either the enabled list has changed, or the parameters have, validate
1244       for hv_name, hv_params in self.new_hvparams.items():
1245         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1246             (self.op.enabled_hypervisors and
1247              hv_name in self.op.enabled_hypervisors)):
1248           # either this is a new hypervisor, or its parameters have changed
1249           hv_class = hypervisor.GetHypervisor(hv_name)
1250           hv_class.CheckParameterSyntax(hv_params)
1251           _CheckHVParams(self, node_list, hv_name, hv_params)
1252
1253   def Exec(self, feedback_fn):
1254     """Change the parameters of the cluster.
1255
1256     """
1257     if self.op.vg_name is not None:
1258       if self.op.vg_name != self.cfg.GetVGName():
1259         self.cfg.SetVGName(self.op.vg_name)
1260       else:
1261         feedback_fn("Cluster LVM configuration already in desired"
1262                     " state, not changing")
1263     if self.op.hvparams:
1264       self.cluster.hvparams = self.new_hvparams
1265     if self.op.enabled_hypervisors is not None:
1266       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1267     if self.op.beparams:
1268       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1269     self.cfg.Update(self.cluster)
1270
1271
1272 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1273   """Sleep and poll for an instance's disk to sync.
1274
1275   """
1276   if not instance.disks:
1277     return True
1278
1279   if not oneshot:
1280     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1281
1282   node = instance.primary_node
1283
1284   for dev in instance.disks:
1285     lu.cfg.SetDiskID(dev, node)
1286
1287   retries = 0
1288   while True:
1289     max_time = 0
1290     done = True
1291     cumul_degraded = False
1292     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1293     if not rstats:
1294       lu.LogWarning("Can't get any data from node %s", node)
1295       retries += 1
1296       if retries >= 10:
1297         raise errors.RemoteError("Can't contact node %s for mirror data,"
1298                                  " aborting." % node)
1299       time.sleep(6)
1300       continue
1301     retries = 0
1302     for i in range(len(rstats)):
1303       mstat = rstats[i]
1304       if mstat is None:
1305         lu.LogWarning("Can't compute data for node %s/%s",
1306                            node, instance.disks[i].iv_name)
1307         continue
1308       # we ignore the ldisk parameter
1309       perc_done, est_time, is_degraded, _ = mstat
1310       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1311       if perc_done is not None:
1312         done = False
1313         if est_time is not None:
1314           rem_time = "%d estimated seconds remaining" % est_time
1315           max_time = est_time
1316         else:
1317           rem_time = "no time estimate"
1318         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1319                         (instance.disks[i].iv_name, perc_done, rem_time))
1320     if done or oneshot:
1321       break
1322
1323     time.sleep(min(60, max_time))
1324
1325   if done:
1326     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1327   return not cumul_degraded
1328
1329
1330 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1331   """Check that mirrors are not degraded.
1332
1333   The ldisk parameter, if True, will change the test from the
1334   is_degraded attribute (which represents overall non-ok status for
1335   the device(s)) to the ldisk (representing the local storage status).
1336
1337   """
1338   lu.cfg.SetDiskID(dev, node)
1339   if ldisk:
1340     idx = 6
1341   else:
1342     idx = 5
1343
1344   result = True
1345   if on_primary or dev.AssembleOnSecondary():
1346     rstats = lu.rpc.call_blockdev_find(node, dev)
1347     if not rstats:
1348       logging.warning("Node %s: disk degraded, not found or node down", node)
1349       result = False
1350     else:
1351       result = result and (not rstats[idx])
1352   if dev.children:
1353     for child in dev.children:
1354       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1355
1356   return result
1357
1358
1359 class LUDiagnoseOS(NoHooksLU):
1360   """Logical unit for OS diagnose/query.
1361
1362   """
1363   _OP_REQP = ["output_fields", "names"]
1364   REQ_BGL = False
1365   _FIELDS_STATIC = _FieldSet()
1366   _FIELDS_DYNAMIC = _FieldSet("name", "valid", "node_status")
1367
1368   def ExpandNames(self):
1369     if self.op.names:
1370       raise errors.OpPrereqError("Selective OS query not supported")
1371
1372     _CheckOutputFields(static=self._FIELDS_STATIC,
1373                        dynamic=self._FIELDS_DYNAMIC,
1374                        selected=self.op.output_fields)
1375
1376     # Lock all nodes, in shared mode
1377     self.needed_locks = {}
1378     self.share_locks[locking.LEVEL_NODE] = 1
1379     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1380
1381   def CheckPrereq(self):
1382     """Check prerequisites.
1383
1384     """
1385
1386   @staticmethod
1387   def _DiagnoseByOS(node_list, rlist):
1388     """Remaps a per-node return list into an a per-os per-node dictionary
1389
1390       Args:
1391         node_list: a list with the names of all nodes
1392         rlist: a map with node names as keys and OS objects as values
1393
1394       Returns:
1395         map: a map with osnames as keys and as value another map, with
1396              nodes as
1397              keys and list of OS objects as values
1398              e.g. {"debian-etch": {"node1": [<object>,...],
1399                                    "node2": [<object>,]}
1400                   }
1401
1402     """
1403     all_os = {}
1404     for node_name, nr in rlist.iteritems():
1405       if not nr:
1406         continue
1407       for os_obj in nr:
1408         if os_obj.name not in all_os:
1409           # build a list of nodes for this os containing empty lists
1410           # for each node in node_list
1411           all_os[os_obj.name] = {}
1412           for nname in node_list:
1413             all_os[os_obj.name][nname] = []
1414         all_os[os_obj.name][node_name].append(os_obj)
1415     return all_os
1416
1417   def Exec(self, feedback_fn):
1418     """Compute the list of OSes.
1419
1420     """
1421     node_list = self.acquired_locks[locking.LEVEL_NODE]
1422     node_data = self.rpc.call_os_diagnose(node_list)
1423     if node_data == False:
1424       raise errors.OpExecError("Can't gather the list of OSes")
1425     pol = self._DiagnoseByOS(node_list, node_data)
1426     output = []
1427     for os_name, os_data in pol.iteritems():
1428       row = []
1429       for field in self.op.output_fields:
1430         if field == "name":
1431           val = os_name
1432         elif field == "valid":
1433           val = utils.all([osl and osl[0] for osl in os_data.values()])
1434         elif field == "node_status":
1435           val = {}
1436           for node_name, nos_list in os_data.iteritems():
1437             val[node_name] = [(v.status, v.path) for v in nos_list]
1438         else:
1439           raise errors.ParameterError(field)
1440         row.append(val)
1441       output.append(row)
1442
1443     return output
1444
1445
1446 class LURemoveNode(LogicalUnit):
1447   """Logical unit for removing a node.
1448
1449   """
1450   HPATH = "node-remove"
1451   HTYPE = constants.HTYPE_NODE
1452   _OP_REQP = ["node_name"]
1453
1454   def BuildHooksEnv(self):
1455     """Build hooks env.
1456
1457     This doesn't run on the target node in the pre phase as a failed
1458     node would then be impossible to remove.
1459
1460     """
1461     env = {
1462       "OP_TARGET": self.op.node_name,
1463       "NODE_NAME": self.op.node_name,
1464       }
1465     all_nodes = self.cfg.GetNodeList()
1466     all_nodes.remove(self.op.node_name)
1467     return env, all_nodes, all_nodes
1468
1469   def CheckPrereq(self):
1470     """Check prerequisites.
1471
1472     This checks:
1473      - the node exists in the configuration
1474      - it does not have primary or secondary instances
1475      - it's not the master
1476
1477     Any errors are signalled by raising errors.OpPrereqError.
1478
1479     """
1480     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1481     if node is None:
1482       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1483
1484     instance_list = self.cfg.GetInstanceList()
1485
1486     masternode = self.cfg.GetMasterNode()
1487     if node.name == masternode:
1488       raise errors.OpPrereqError("Node is the master node,"
1489                                  " you need to failover first.")
1490
1491     for instance_name in instance_list:
1492       instance = self.cfg.GetInstanceInfo(instance_name)
1493       if node.name == instance.primary_node:
1494         raise errors.OpPrereqError("Instance %s still running on the node,"
1495                                    " please remove first." % instance_name)
1496       if node.name in instance.secondary_nodes:
1497         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1498                                    " please remove first." % instance_name)
1499     self.op.node_name = node.name
1500     self.node = node
1501
1502   def Exec(self, feedback_fn):
1503     """Removes the node from the cluster.
1504
1505     """
1506     node = self.node
1507     logging.info("Stopping the node daemon and removing configs from node %s",
1508                  node.name)
1509
1510     self.context.RemoveNode(node.name)
1511
1512     self.rpc.call_node_leave_cluster(node.name)
1513
1514
1515 class LUQueryNodes(NoHooksLU):
1516   """Logical unit for querying nodes.
1517
1518   """
1519   _OP_REQP = ["output_fields", "names"]
1520   REQ_BGL = False
1521   _FIELDS_DYNAMIC = _FieldSet(
1522     "dtotal", "dfree",
1523     "mtotal", "mnode", "mfree",
1524     "bootid",
1525     "ctotal",
1526     )
1527
1528   _FIELDS_STATIC = _FieldSet(
1529     "name", "pinst_cnt", "sinst_cnt",
1530     "pinst_list", "sinst_list",
1531     "pip", "sip", "tags",
1532     "serial_no",
1533     )
1534
1535   def ExpandNames(self):
1536     _CheckOutputFields(static=self._FIELDS_STATIC,
1537                        dynamic=self._FIELDS_DYNAMIC,
1538                        selected=self.op.output_fields)
1539
1540     self.needed_locks = {}
1541     self.share_locks[locking.LEVEL_NODE] = 1
1542
1543     if self.op.names:
1544       self.wanted = _GetWantedNodes(self, self.op.names)
1545     else:
1546       self.wanted = locking.ALL_SET
1547
1548     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1549     if self.do_locking:
1550       # if we don't request only static fields, we need to lock the nodes
1551       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1552
1553
1554   def CheckPrereq(self):
1555     """Check prerequisites.
1556
1557     """
1558     # The validation of the node list is done in the _GetWantedNodes,
1559     # if non empty, and if empty, there's no validation to do
1560     pass
1561
1562   def Exec(self, feedback_fn):
1563     """Computes the list of nodes and their attributes.
1564
1565     """
1566     all_info = self.cfg.GetAllNodesInfo()
1567     if self.do_locking:
1568       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1569     elif self.wanted != locking.ALL_SET:
1570       nodenames = self.wanted
1571       missing = set(nodenames).difference(all_info.keys())
1572       if missing:
1573         raise errors.OpExecError(
1574           "Some nodes were removed before retrieving their data: %s" % missing)
1575     else:
1576       nodenames = all_info.keys()
1577
1578     nodenames = utils.NiceSort(nodenames)
1579     nodelist = [all_info[name] for name in nodenames]
1580
1581     # begin data gathering
1582
1583     if self.do_locking:
1584       live_data = {}
1585       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1586                                           self.cfg.GetHypervisorType())
1587       for name in nodenames:
1588         nodeinfo = node_data.get(name, None)
1589         if nodeinfo:
1590           live_data[name] = {
1591             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1592             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1593             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1594             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1595             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1596             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1597             "bootid": nodeinfo['bootid'],
1598             }
1599         else:
1600           live_data[name] = {}
1601     else:
1602       live_data = dict.fromkeys(nodenames, {})
1603
1604     node_to_primary = dict([(name, set()) for name in nodenames])
1605     node_to_secondary = dict([(name, set()) for name in nodenames])
1606
1607     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1608                              "sinst_cnt", "sinst_list"))
1609     if inst_fields & frozenset(self.op.output_fields):
1610       instancelist = self.cfg.GetInstanceList()
1611
1612       for instance_name in instancelist:
1613         inst = self.cfg.GetInstanceInfo(instance_name)
1614         if inst.primary_node in node_to_primary:
1615           node_to_primary[inst.primary_node].add(inst.name)
1616         for secnode in inst.secondary_nodes:
1617           if secnode in node_to_secondary:
1618             node_to_secondary[secnode].add(inst.name)
1619
1620     # end data gathering
1621
1622     output = []
1623     for node in nodelist:
1624       node_output = []
1625       for field in self.op.output_fields:
1626         if field == "name":
1627           val = node.name
1628         elif field == "pinst_list":
1629           val = list(node_to_primary[node.name])
1630         elif field == "sinst_list":
1631           val = list(node_to_secondary[node.name])
1632         elif field == "pinst_cnt":
1633           val = len(node_to_primary[node.name])
1634         elif field == "sinst_cnt":
1635           val = len(node_to_secondary[node.name])
1636         elif field == "pip":
1637           val = node.primary_ip
1638         elif field == "sip":
1639           val = node.secondary_ip
1640         elif field == "tags":
1641           val = list(node.GetTags())
1642         elif field == "serial_no":
1643           val = node.serial_no
1644         elif self._FIELDS_DYNAMIC.Matches(field):
1645           val = live_data[node.name].get(field, None)
1646         else:
1647           raise errors.ParameterError(field)
1648         node_output.append(val)
1649       output.append(node_output)
1650
1651     return output
1652
1653
1654 class LUQueryNodeVolumes(NoHooksLU):
1655   """Logical unit for getting volumes on node(s).
1656
1657   """
1658   _OP_REQP = ["nodes", "output_fields"]
1659   REQ_BGL = False
1660   _FIELDS_DYNAMIC = _FieldSet("phys", "vg", "name", "size", "instance")
1661   _FIELDS_STATIC = _FieldSet("node")
1662
1663   def ExpandNames(self):
1664     _CheckOutputFields(static=self._FIELDS_STATIC,
1665                        dynamic=self._FIELDS_DYNAMIC,
1666                        selected=self.op.output_fields)
1667
1668     self.needed_locks = {}
1669     self.share_locks[locking.LEVEL_NODE] = 1
1670     if not self.op.nodes:
1671       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1672     else:
1673       self.needed_locks[locking.LEVEL_NODE] = \
1674         _GetWantedNodes(self, self.op.nodes)
1675
1676   def CheckPrereq(self):
1677     """Check prerequisites.
1678
1679     This checks that the fields required are valid output fields.
1680
1681     """
1682     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1683
1684   def Exec(self, feedback_fn):
1685     """Computes the list of nodes and their attributes.
1686
1687     """
1688     nodenames = self.nodes
1689     volumes = self.rpc.call_node_volumes(nodenames)
1690
1691     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1692              in self.cfg.GetInstanceList()]
1693
1694     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1695
1696     output = []
1697     for node in nodenames:
1698       if node not in volumes or not volumes[node]:
1699         continue
1700
1701       node_vols = volumes[node][:]
1702       node_vols.sort(key=lambda vol: vol['dev'])
1703
1704       for vol in node_vols:
1705         node_output = []
1706         for field in self.op.output_fields:
1707           if field == "node":
1708             val = node
1709           elif field == "phys":
1710             val = vol['dev']
1711           elif field == "vg":
1712             val = vol['vg']
1713           elif field == "name":
1714             val = vol['name']
1715           elif field == "size":
1716             val = int(float(vol['size']))
1717           elif field == "instance":
1718             for inst in ilist:
1719               if node not in lv_by_node[inst]:
1720                 continue
1721               if vol['name'] in lv_by_node[inst][node]:
1722                 val = inst.name
1723                 break
1724             else:
1725               val = '-'
1726           else:
1727             raise errors.ParameterError(field)
1728           node_output.append(str(val))
1729
1730         output.append(node_output)
1731
1732     return output
1733
1734
1735 class LUAddNode(LogicalUnit):
1736   """Logical unit for adding node to the cluster.
1737
1738   """
1739   HPATH = "node-add"
1740   HTYPE = constants.HTYPE_NODE
1741   _OP_REQP = ["node_name"]
1742
1743   def BuildHooksEnv(self):
1744     """Build hooks env.
1745
1746     This will run on all nodes before, and on all nodes + the new node after.
1747
1748     """
1749     env = {
1750       "OP_TARGET": self.op.node_name,
1751       "NODE_NAME": self.op.node_name,
1752       "NODE_PIP": self.op.primary_ip,
1753       "NODE_SIP": self.op.secondary_ip,
1754       }
1755     nodes_0 = self.cfg.GetNodeList()
1756     nodes_1 = nodes_0 + [self.op.node_name, ]
1757     return env, nodes_0, nodes_1
1758
1759   def CheckPrereq(self):
1760     """Check prerequisites.
1761
1762     This checks:
1763      - the new node is not already in the config
1764      - it is resolvable
1765      - its parameters (single/dual homed) matches the cluster
1766
1767     Any errors are signalled by raising errors.OpPrereqError.
1768
1769     """
1770     node_name = self.op.node_name
1771     cfg = self.cfg
1772
1773     dns_data = utils.HostInfo(node_name)
1774
1775     node = dns_data.name
1776     primary_ip = self.op.primary_ip = dns_data.ip
1777     secondary_ip = getattr(self.op, "secondary_ip", None)
1778     if secondary_ip is None:
1779       secondary_ip = primary_ip
1780     if not utils.IsValidIP(secondary_ip):
1781       raise errors.OpPrereqError("Invalid secondary IP given")
1782     self.op.secondary_ip = secondary_ip
1783
1784     node_list = cfg.GetNodeList()
1785     if not self.op.readd and node in node_list:
1786       raise errors.OpPrereqError("Node %s is already in the configuration" %
1787                                  node)
1788     elif self.op.readd and node not in node_list:
1789       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1790
1791     for existing_node_name in node_list:
1792       existing_node = cfg.GetNodeInfo(existing_node_name)
1793
1794       if self.op.readd and node == existing_node_name:
1795         if (existing_node.primary_ip != primary_ip or
1796             existing_node.secondary_ip != secondary_ip):
1797           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1798                                      " address configuration as before")
1799         continue
1800
1801       if (existing_node.primary_ip == primary_ip or
1802           existing_node.secondary_ip == primary_ip or
1803           existing_node.primary_ip == secondary_ip or
1804           existing_node.secondary_ip == secondary_ip):
1805         raise errors.OpPrereqError("New node ip address(es) conflict with"
1806                                    " existing node %s" % existing_node.name)
1807
1808     # check that the type of the node (single versus dual homed) is the
1809     # same as for the master
1810     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1811     master_singlehomed = myself.secondary_ip == myself.primary_ip
1812     newbie_singlehomed = secondary_ip == primary_ip
1813     if master_singlehomed != newbie_singlehomed:
1814       if master_singlehomed:
1815         raise errors.OpPrereqError("The master has no private ip but the"
1816                                    " new node has one")
1817       else:
1818         raise errors.OpPrereqError("The master has a private ip but the"
1819                                    " new node doesn't have one")
1820
1821     # checks reachablity
1822     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1823       raise errors.OpPrereqError("Node not reachable by ping")
1824
1825     if not newbie_singlehomed:
1826       # check reachability from my secondary ip to newbie's secondary ip
1827       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1828                            source=myself.secondary_ip):
1829         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1830                                    " based ping to noded port")
1831
1832     self.new_node = objects.Node(name=node,
1833                                  primary_ip=primary_ip,
1834                                  secondary_ip=secondary_ip)
1835
1836   def Exec(self, feedback_fn):
1837     """Adds the new node to the cluster.
1838
1839     """
1840     new_node = self.new_node
1841     node = new_node.name
1842
1843     # check connectivity
1844     result = self.rpc.call_version([node])[node]
1845     if result:
1846       if constants.PROTOCOL_VERSION == result:
1847         logging.info("Communication to node %s fine, sw version %s match",
1848                      node, result)
1849       else:
1850         raise errors.OpExecError("Version mismatch master version %s,"
1851                                  " node version %s" %
1852                                  (constants.PROTOCOL_VERSION, result))
1853     else:
1854       raise errors.OpExecError("Cannot get version from the new node")
1855
1856     # setup ssh on node
1857     logging.info("Copy ssh key to node %s", node)
1858     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1859     keyarray = []
1860     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1861                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1862                 priv_key, pub_key]
1863
1864     for i in keyfiles:
1865       f = open(i, 'r')
1866       try:
1867         keyarray.append(f.read())
1868       finally:
1869         f.close()
1870
1871     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1872                                     keyarray[2],
1873                                     keyarray[3], keyarray[4], keyarray[5])
1874
1875     if not result:
1876       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1877
1878     # Add node to our /etc/hosts, and add key to known_hosts
1879     utils.AddHostToEtcHosts(new_node.name)
1880
1881     if new_node.secondary_ip != new_node.primary_ip:
1882       if not self.rpc.call_node_has_ip_address(new_node.name,
1883                                                new_node.secondary_ip):
1884         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1885                                  " you gave (%s). Please fix and re-run this"
1886                                  " command." % new_node.secondary_ip)
1887
1888     node_verify_list = [self.cfg.GetMasterNode()]
1889     node_verify_param = {
1890       'nodelist': [node],
1891       # TODO: do a node-net-test as well?
1892     }
1893
1894     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1895                                        self.cfg.GetClusterName())
1896     for verifier in node_verify_list:
1897       if not result[verifier]:
1898         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1899                                  " for remote verification" % verifier)
1900       if result[verifier]['nodelist']:
1901         for failed in result[verifier]['nodelist']:
1902           feedback_fn("ssh/hostname verification failed %s -> %s" %
1903                       (verifier, result[verifier]['nodelist'][failed]))
1904         raise errors.OpExecError("ssh/hostname verification failed.")
1905
1906     # Distribute updated /etc/hosts and known_hosts to all nodes,
1907     # including the node just added
1908     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1909     dist_nodes = self.cfg.GetNodeList()
1910     if not self.op.readd:
1911       dist_nodes.append(node)
1912     if myself.name in dist_nodes:
1913       dist_nodes.remove(myself.name)
1914
1915     logging.debug("Copying hosts and known_hosts to all nodes")
1916     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1917       result = self.rpc.call_upload_file(dist_nodes, fname)
1918       for to_node in dist_nodes:
1919         if not result[to_node]:
1920           logging.error("Copy of file %s to node %s failed", fname, to_node)
1921
1922     to_copy = []
1923     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1924       to_copy.append(constants.VNC_PASSWORD_FILE)
1925     for fname in to_copy:
1926       result = self.rpc.call_upload_file([node], fname)
1927       if not result[node]:
1928         logging.error("Could not copy file %s to node %s", fname, node)
1929
1930     if self.op.readd:
1931       self.context.ReaddNode(new_node)
1932     else:
1933       self.context.AddNode(new_node)
1934
1935
1936 class LUQueryClusterInfo(NoHooksLU):
1937   """Query cluster configuration.
1938
1939   """
1940   _OP_REQP = []
1941   REQ_MASTER = False
1942   REQ_BGL = False
1943
1944   def ExpandNames(self):
1945     self.needed_locks = {}
1946
1947   def CheckPrereq(self):
1948     """No prerequsites needed for this LU.
1949
1950     """
1951     pass
1952
1953   def Exec(self, feedback_fn):
1954     """Return cluster config.
1955
1956     """
1957     cluster = self.cfg.GetClusterInfo()
1958     result = {
1959       "software_version": constants.RELEASE_VERSION,
1960       "protocol_version": constants.PROTOCOL_VERSION,
1961       "config_version": constants.CONFIG_VERSION,
1962       "os_api_version": constants.OS_API_VERSION,
1963       "export_version": constants.EXPORT_VERSION,
1964       "architecture": (platform.architecture()[0], platform.machine()),
1965       "name": cluster.cluster_name,
1966       "master": cluster.master_node,
1967       "default_hypervisor": cluster.default_hypervisor,
1968       "enabled_hypervisors": cluster.enabled_hypervisors,
1969       "hvparams": cluster.hvparams,
1970       "beparams": cluster.beparams,
1971       }
1972
1973     return result
1974
1975
1976 class LUQueryConfigValues(NoHooksLU):
1977   """Return configuration values.
1978
1979   """
1980   _OP_REQP = []
1981   REQ_BGL = False
1982   _FIELDS_DYNAMIC = _FieldSet()
1983   _FIELDS_STATIC = _FieldSet("cluster_name", "master_node", "drain_flag")
1984
1985   def ExpandNames(self):
1986     self.needed_locks = {}
1987
1988     _CheckOutputFields(static=self._FIELDS_STATIC,
1989                        dynamic=self._FIELDS_DYNAMIC,
1990                        selected=self.op.output_fields)
1991
1992   def CheckPrereq(self):
1993     """No prerequisites.
1994
1995     """
1996     pass
1997
1998   def Exec(self, feedback_fn):
1999     """Dump a representation of the cluster config to the standard output.
2000
2001     """
2002     values = []
2003     for field in self.op.output_fields:
2004       if field == "cluster_name":
2005         entry = self.cfg.GetClusterName()
2006       elif field == "master_node":
2007         entry = self.cfg.GetMasterNode()
2008       elif field == "drain_flag":
2009         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2010       else:
2011         raise errors.ParameterError(field)
2012       values.append(entry)
2013     return values
2014
2015
2016 class LUActivateInstanceDisks(NoHooksLU):
2017   """Bring up an instance's disks.
2018
2019   """
2020   _OP_REQP = ["instance_name"]
2021   REQ_BGL = False
2022
2023   def ExpandNames(self):
2024     self._ExpandAndLockInstance()
2025     self.needed_locks[locking.LEVEL_NODE] = []
2026     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2027
2028   def DeclareLocks(self, level):
2029     if level == locking.LEVEL_NODE:
2030       self._LockInstancesNodes()
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2039     assert self.instance is not None, \
2040       "Cannot retrieve locked instance %s" % self.op.instance_name
2041
2042   def Exec(self, feedback_fn):
2043     """Activate the disks.
2044
2045     """
2046     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2047     if not disks_ok:
2048       raise errors.OpExecError("Cannot activate block devices")
2049
2050     return disks_info
2051
2052
2053 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2054   """Prepare the block devices for an instance.
2055
2056   This sets up the block devices on all nodes.
2057
2058   Args:
2059     instance: a ganeti.objects.Instance object
2060     ignore_secondaries: if true, errors on secondary nodes won't result
2061                         in an error return from the function
2062
2063   Returns:
2064     false if the operation failed
2065     list of (host, instance_visible_name, node_visible_name) if the operation
2066          suceeded with the mapping from node devices to instance devices
2067   """
2068   device_info = []
2069   disks_ok = True
2070   iname = instance.name
2071   # With the two passes mechanism we try to reduce the window of
2072   # opportunity for the race condition of switching DRBD to primary
2073   # before handshaking occured, but we do not eliminate it
2074
2075   # The proper fix would be to wait (with some limits) until the
2076   # connection has been made and drbd transitions from WFConnection
2077   # into any other network-connected state (Connected, SyncTarget,
2078   # SyncSource, etc.)
2079
2080   # 1st pass, assemble on all nodes in secondary mode
2081   for inst_disk in instance.disks:
2082     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2083       lu.cfg.SetDiskID(node_disk, node)
2084       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2085       if not result:
2086         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2087                            " (is_primary=False, pass=1)",
2088                            inst_disk.iv_name, node)
2089         if not ignore_secondaries:
2090           disks_ok = False
2091
2092   # FIXME: race condition on drbd migration to primary
2093
2094   # 2nd pass, do only the primary node
2095   for inst_disk in instance.disks:
2096     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2097       if node != instance.primary_node:
2098         continue
2099       lu.cfg.SetDiskID(node_disk, node)
2100       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2101       if not result:
2102         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2103                            " (is_primary=True, pass=2)",
2104                            inst_disk.iv_name, node)
2105         disks_ok = False
2106     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2107
2108   # leave the disks configured for the primary node
2109   # this is a workaround that would be fixed better by
2110   # improving the logical/physical id handling
2111   for disk in instance.disks:
2112     lu.cfg.SetDiskID(disk, instance.primary_node)
2113
2114   return disks_ok, device_info
2115
2116
2117 def _StartInstanceDisks(lu, instance, force):
2118   """Start the disks of an instance.
2119
2120   """
2121   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2122                                            ignore_secondaries=force)
2123   if not disks_ok:
2124     _ShutdownInstanceDisks(lu, instance)
2125     if force is not None and not force:
2126       lu.proc.LogWarning("", hint="If the message above refers to a"
2127                          " secondary node,"
2128                          " you can retry the operation using '--force'.")
2129     raise errors.OpExecError("Disk consistency error")
2130
2131
2132 class LUDeactivateInstanceDisks(NoHooksLU):
2133   """Shutdown an instance's disks.
2134
2135   """
2136   _OP_REQP = ["instance_name"]
2137   REQ_BGL = False
2138
2139   def ExpandNames(self):
2140     self._ExpandAndLockInstance()
2141     self.needed_locks[locking.LEVEL_NODE] = []
2142     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2143
2144   def DeclareLocks(self, level):
2145     if level == locking.LEVEL_NODE:
2146       self._LockInstancesNodes()
2147
2148   def CheckPrereq(self):
2149     """Check prerequisites.
2150
2151     This checks that the instance is in the cluster.
2152
2153     """
2154     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2155     assert self.instance is not None, \
2156       "Cannot retrieve locked instance %s" % self.op.instance_name
2157
2158   def Exec(self, feedback_fn):
2159     """Deactivate the disks
2160
2161     """
2162     instance = self.instance
2163     _SafeShutdownInstanceDisks(self, instance)
2164
2165
2166 def _SafeShutdownInstanceDisks(lu, instance):
2167   """Shutdown block devices of an instance.
2168
2169   This function checks if an instance is running, before calling
2170   _ShutdownInstanceDisks.
2171
2172   """
2173   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2174                                       [instance.hypervisor])
2175   ins_l = ins_l[instance.primary_node]
2176   if not type(ins_l) is list:
2177     raise errors.OpExecError("Can't contact node '%s'" %
2178                              instance.primary_node)
2179
2180   if instance.name in ins_l:
2181     raise errors.OpExecError("Instance is running, can't shutdown"
2182                              " block devices.")
2183
2184   _ShutdownInstanceDisks(lu, instance)
2185
2186
2187 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2188   """Shutdown block devices of an instance.
2189
2190   This does the shutdown on all nodes of the instance.
2191
2192   If the ignore_primary is false, errors on the primary node are
2193   ignored.
2194
2195   """
2196   result = True
2197   for disk in instance.disks:
2198     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2199       lu.cfg.SetDiskID(top_disk, node)
2200       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2201         logging.error("Could not shutdown block device %s on node %s",
2202                       disk.iv_name, node)
2203         if not ignore_primary or node != instance.primary_node:
2204           result = False
2205   return result
2206
2207
2208 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2209   """Checks if a node has enough free memory.
2210
2211   This function check if a given node has the needed amount of free
2212   memory. In case the node has less memory or we cannot get the
2213   information from the node, this function raise an OpPrereqError
2214   exception.
2215
2216   @type lu: C{LogicalUnit}
2217   @param lu: a logical unit from which we get configuration data
2218   @type node: C{str}
2219   @param node: the node to check
2220   @type reason: C{str}
2221   @param reason: string to use in the error message
2222   @type requested: C{int}
2223   @param requested: the amount of memory in MiB to check for
2224   @type hypervisor: C{str}
2225   @param hypervisor: the hypervisor to ask for memory stats
2226   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2227       we cannot check the node
2228
2229   """
2230   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2231   if not nodeinfo or not isinstance(nodeinfo, dict):
2232     raise errors.OpPrereqError("Could not contact node %s for resource"
2233                              " information" % (node,))
2234
2235   free_mem = nodeinfo[node].get('memory_free')
2236   if not isinstance(free_mem, int):
2237     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2238                              " was '%s'" % (node, free_mem))
2239   if requested > free_mem:
2240     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2241                              " needed %s MiB, available %s MiB" %
2242                              (node, reason, requested, free_mem))
2243
2244
2245 class LUStartupInstance(LogicalUnit):
2246   """Starts an instance.
2247
2248   """
2249   HPATH = "instance-start"
2250   HTYPE = constants.HTYPE_INSTANCE
2251   _OP_REQP = ["instance_name", "force"]
2252   REQ_BGL = False
2253
2254   def ExpandNames(self):
2255     self._ExpandAndLockInstance()
2256     self.needed_locks[locking.LEVEL_NODE] = []
2257     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2258
2259   def DeclareLocks(self, level):
2260     if level == locking.LEVEL_NODE:
2261       self._LockInstancesNodes()
2262
2263   def BuildHooksEnv(self):
2264     """Build hooks env.
2265
2266     This runs on master, primary and secondary nodes of the instance.
2267
2268     """
2269     env = {
2270       "FORCE": self.op.force,
2271       }
2272     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2273     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2274           list(self.instance.secondary_nodes))
2275     return env, nl, nl
2276
2277   def CheckPrereq(self):
2278     """Check prerequisites.
2279
2280     This checks that the instance is in the cluster.
2281
2282     """
2283     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2284     assert self.instance is not None, \
2285       "Cannot retrieve locked instance %s" % self.op.instance_name
2286
2287     bep = self.cfg.GetClusterInfo().FillBE(instance)
2288     # check bridges existance
2289     _CheckInstanceBridgesExist(self, instance)
2290
2291     _CheckNodeFreeMemory(self, instance.primary_node,
2292                          "starting instance %s" % instance.name,
2293                          bep[constants.BE_MEMORY], instance.hypervisor)
2294
2295   def Exec(self, feedback_fn):
2296     """Start the instance.
2297
2298     """
2299     instance = self.instance
2300     force = self.op.force
2301     extra_args = getattr(self.op, "extra_args", "")
2302
2303     self.cfg.MarkInstanceUp(instance.name)
2304
2305     node_current = instance.primary_node
2306
2307     _StartInstanceDisks(self, instance, force)
2308
2309     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2310       _ShutdownInstanceDisks(self, instance)
2311       raise errors.OpExecError("Could not start instance")
2312
2313
2314 class LURebootInstance(LogicalUnit):
2315   """Reboot an instance.
2316
2317   """
2318   HPATH = "instance-reboot"
2319   HTYPE = constants.HTYPE_INSTANCE
2320   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2321   REQ_BGL = False
2322
2323   def ExpandNames(self):
2324     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2325                                    constants.INSTANCE_REBOOT_HARD,
2326                                    constants.INSTANCE_REBOOT_FULL]:
2327       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2328                                   (constants.INSTANCE_REBOOT_SOFT,
2329                                    constants.INSTANCE_REBOOT_HARD,
2330                                    constants.INSTANCE_REBOOT_FULL))
2331     self._ExpandAndLockInstance()
2332     self.needed_locks[locking.LEVEL_NODE] = []
2333     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2334
2335   def DeclareLocks(self, level):
2336     if level == locking.LEVEL_NODE:
2337       primary_only = not constants.INSTANCE_REBOOT_FULL
2338       self._LockInstancesNodes(primary_only=primary_only)
2339
2340   def BuildHooksEnv(self):
2341     """Build hooks env.
2342
2343     This runs on master, primary and secondary nodes of the instance.
2344
2345     """
2346     env = {
2347       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2348       }
2349     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2350     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2351           list(self.instance.secondary_nodes))
2352     return env, nl, nl
2353
2354   def CheckPrereq(self):
2355     """Check prerequisites.
2356
2357     This checks that the instance is in the cluster.
2358
2359     """
2360     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2361     assert self.instance is not None, \
2362       "Cannot retrieve locked instance %s" % self.op.instance_name
2363
2364     # check bridges existance
2365     _CheckInstanceBridgesExist(self, instance)
2366
2367   def Exec(self, feedback_fn):
2368     """Reboot the instance.
2369
2370     """
2371     instance = self.instance
2372     ignore_secondaries = self.op.ignore_secondaries
2373     reboot_type = self.op.reboot_type
2374     extra_args = getattr(self.op, "extra_args", "")
2375
2376     node_current = instance.primary_node
2377
2378     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2379                        constants.INSTANCE_REBOOT_HARD]:
2380       if not self.rpc.call_instance_reboot(node_current, instance,
2381                                            reboot_type, extra_args):
2382         raise errors.OpExecError("Could not reboot instance")
2383     else:
2384       if not self.rpc.call_instance_shutdown(node_current, instance):
2385         raise errors.OpExecError("could not shutdown instance for full reboot")
2386       _ShutdownInstanceDisks(self, instance)
2387       _StartInstanceDisks(self, instance, ignore_secondaries)
2388       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2389         _ShutdownInstanceDisks(self, instance)
2390         raise errors.OpExecError("Could not start instance for full reboot")
2391
2392     self.cfg.MarkInstanceUp(instance.name)
2393
2394
2395 class LUShutdownInstance(LogicalUnit):
2396   """Shutdown an instance.
2397
2398   """
2399   HPATH = "instance-stop"
2400   HTYPE = constants.HTYPE_INSTANCE
2401   _OP_REQP = ["instance_name"]
2402   REQ_BGL = False
2403
2404   def ExpandNames(self):
2405     self._ExpandAndLockInstance()
2406     self.needed_locks[locking.LEVEL_NODE] = []
2407     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2408
2409   def DeclareLocks(self, level):
2410     if level == locking.LEVEL_NODE:
2411       self._LockInstancesNodes()
2412
2413   def BuildHooksEnv(self):
2414     """Build hooks env.
2415
2416     This runs on master, primary and secondary nodes of the instance.
2417
2418     """
2419     env = _BuildInstanceHookEnvByObject(self, self.instance)
2420     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2421           list(self.instance.secondary_nodes))
2422     return env, nl, nl
2423
2424   def CheckPrereq(self):
2425     """Check prerequisites.
2426
2427     This checks that the instance is in the cluster.
2428
2429     """
2430     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2431     assert self.instance is not None, \
2432       "Cannot retrieve locked instance %s" % self.op.instance_name
2433
2434   def Exec(self, feedback_fn):
2435     """Shutdown the instance.
2436
2437     """
2438     instance = self.instance
2439     node_current = instance.primary_node
2440     self.cfg.MarkInstanceDown(instance.name)
2441     if not self.rpc.call_instance_shutdown(node_current, instance):
2442       self.proc.LogWarning("Could not shutdown instance")
2443
2444     _ShutdownInstanceDisks(self, instance)
2445
2446
2447 class LUReinstallInstance(LogicalUnit):
2448   """Reinstall an instance.
2449
2450   """
2451   HPATH = "instance-reinstall"
2452   HTYPE = constants.HTYPE_INSTANCE
2453   _OP_REQP = ["instance_name"]
2454   REQ_BGL = False
2455
2456   def ExpandNames(self):
2457     self._ExpandAndLockInstance()
2458     self.needed_locks[locking.LEVEL_NODE] = []
2459     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2460
2461   def DeclareLocks(self, level):
2462     if level == locking.LEVEL_NODE:
2463       self._LockInstancesNodes()
2464
2465   def BuildHooksEnv(self):
2466     """Build hooks env.
2467
2468     This runs on master, primary and secondary nodes of the instance.
2469
2470     """
2471     env = _BuildInstanceHookEnvByObject(self, self.instance)
2472     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2473           list(self.instance.secondary_nodes))
2474     return env, nl, nl
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This checks that the instance is in the cluster and is not running.
2480
2481     """
2482     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2483     assert instance is not None, \
2484       "Cannot retrieve locked instance %s" % self.op.instance_name
2485
2486     if instance.disk_template == constants.DT_DISKLESS:
2487       raise errors.OpPrereqError("Instance '%s' has no disks" %
2488                                  self.op.instance_name)
2489     if instance.status != "down":
2490       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2491                                  self.op.instance_name)
2492     remote_info = self.rpc.call_instance_info(instance.primary_node,
2493                                               instance.name,
2494                                               instance.hypervisor)
2495     if remote_info:
2496       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2497                                  (self.op.instance_name,
2498                                   instance.primary_node))
2499
2500     self.op.os_type = getattr(self.op, "os_type", None)
2501     if self.op.os_type is not None:
2502       # OS verification
2503       pnode = self.cfg.GetNodeInfo(
2504         self.cfg.ExpandNodeName(instance.primary_node))
2505       if pnode is None:
2506         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2507                                    self.op.pnode)
2508       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2509       if not os_obj:
2510         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2511                                    " primary node"  % self.op.os_type)
2512
2513     self.instance = instance
2514
2515   def Exec(self, feedback_fn):
2516     """Reinstall the instance.
2517
2518     """
2519     inst = self.instance
2520
2521     if self.op.os_type is not None:
2522       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2523       inst.os = self.op.os_type
2524       self.cfg.Update(inst)
2525
2526     _StartInstanceDisks(self, inst, None)
2527     try:
2528       feedback_fn("Running the instance OS create scripts...")
2529       if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2530         raise errors.OpExecError("Could not install OS for instance %s"
2531                                  " on node %s" %
2532                                  (inst.name, inst.primary_node))
2533     finally:
2534       _ShutdownInstanceDisks(self, inst)
2535
2536
2537 class LURenameInstance(LogicalUnit):
2538   """Rename an instance.
2539
2540   """
2541   HPATH = "instance-rename"
2542   HTYPE = constants.HTYPE_INSTANCE
2543   _OP_REQP = ["instance_name", "new_name"]
2544
2545   def BuildHooksEnv(self):
2546     """Build hooks env.
2547
2548     This runs on master, primary and secondary nodes of the instance.
2549
2550     """
2551     env = _BuildInstanceHookEnvByObject(self, self.instance)
2552     env["INSTANCE_NEW_NAME"] = self.op.new_name
2553     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2554           list(self.instance.secondary_nodes))
2555     return env, nl, nl
2556
2557   def CheckPrereq(self):
2558     """Check prerequisites.
2559
2560     This checks that the instance is in the cluster and is not running.
2561
2562     """
2563     instance = self.cfg.GetInstanceInfo(
2564       self.cfg.ExpandInstanceName(self.op.instance_name))
2565     if instance is None:
2566       raise errors.OpPrereqError("Instance '%s' not known" %
2567                                  self.op.instance_name)
2568     if instance.status != "down":
2569       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2570                                  self.op.instance_name)
2571     remote_info = self.rpc.call_instance_info(instance.primary_node,
2572                                               instance.name,
2573                                               instance.hypervisor)
2574     if remote_info:
2575       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2576                                  (self.op.instance_name,
2577                                   instance.primary_node))
2578     self.instance = instance
2579
2580     # new name verification
2581     name_info = utils.HostInfo(self.op.new_name)
2582
2583     self.op.new_name = new_name = name_info.name
2584     instance_list = self.cfg.GetInstanceList()
2585     if new_name in instance_list:
2586       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2587                                  new_name)
2588
2589     if not getattr(self.op, "ignore_ip", False):
2590       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2591         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2592                                    (name_info.ip, new_name))
2593
2594
2595   def Exec(self, feedback_fn):
2596     """Reinstall the instance.
2597
2598     """
2599     inst = self.instance
2600     old_name = inst.name
2601
2602     if inst.disk_template == constants.DT_FILE:
2603       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2604
2605     self.cfg.RenameInstance(inst.name, self.op.new_name)
2606     # Change the instance lock. This is definitely safe while we hold the BGL
2607     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2608     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2609
2610     # re-read the instance from the configuration after rename
2611     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2612
2613     if inst.disk_template == constants.DT_FILE:
2614       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2615       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2616                                                      old_file_storage_dir,
2617                                                      new_file_storage_dir)
2618
2619       if not result:
2620         raise errors.OpExecError("Could not connect to node '%s' to rename"
2621                                  " directory '%s' to '%s' (but the instance"
2622                                  " has been renamed in Ganeti)" % (
2623                                  inst.primary_node, old_file_storage_dir,
2624                                  new_file_storage_dir))
2625
2626       if not result[0]:
2627         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2628                                  " (but the instance has been renamed in"
2629                                  " Ganeti)" % (old_file_storage_dir,
2630                                                new_file_storage_dir))
2631
2632     _StartInstanceDisks(self, inst, None)
2633     try:
2634       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2635                                                old_name):
2636         msg = ("Could not run OS rename script for instance %s on node %s"
2637                " (but the instance has been renamed in Ganeti)" %
2638                (inst.name, inst.primary_node))
2639         self.proc.LogWarning(msg)
2640     finally:
2641       _ShutdownInstanceDisks(self, inst)
2642
2643
2644 class LURemoveInstance(LogicalUnit):
2645   """Remove an instance.
2646
2647   """
2648   HPATH = "instance-remove"
2649   HTYPE = constants.HTYPE_INSTANCE
2650   _OP_REQP = ["instance_name", "ignore_failures"]
2651   REQ_BGL = False
2652
2653   def ExpandNames(self):
2654     self._ExpandAndLockInstance()
2655     self.needed_locks[locking.LEVEL_NODE] = []
2656     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2657
2658   def DeclareLocks(self, level):
2659     if level == locking.LEVEL_NODE:
2660       self._LockInstancesNodes()
2661
2662   def BuildHooksEnv(self):
2663     """Build hooks env.
2664
2665     This runs on master, primary and secondary nodes of the instance.
2666
2667     """
2668     env = _BuildInstanceHookEnvByObject(self, self.instance)
2669     nl = [self.cfg.GetMasterNode()]
2670     return env, nl, nl
2671
2672   def CheckPrereq(self):
2673     """Check prerequisites.
2674
2675     This checks that the instance is in the cluster.
2676
2677     """
2678     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2679     assert self.instance is not None, \
2680       "Cannot retrieve locked instance %s" % self.op.instance_name
2681
2682   def Exec(self, feedback_fn):
2683     """Remove the instance.
2684
2685     """
2686     instance = self.instance
2687     logging.info("Shutting down instance %s on node %s",
2688                  instance.name, instance.primary_node)
2689
2690     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2691       if self.op.ignore_failures:
2692         feedback_fn("Warning: can't shutdown instance")
2693       else:
2694         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2695                                  (instance.name, instance.primary_node))
2696
2697     logging.info("Removing block devices for instance %s", instance.name)
2698
2699     if not _RemoveDisks(self, instance):
2700       if self.op.ignore_failures:
2701         feedback_fn("Warning: can't remove instance's disks")
2702       else:
2703         raise errors.OpExecError("Can't remove instance's disks")
2704
2705     logging.info("Removing instance %s out of cluster config", instance.name)
2706
2707     self.cfg.RemoveInstance(instance.name)
2708     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2709
2710
2711 class LUQueryInstances(NoHooksLU):
2712   """Logical unit for querying instances.
2713
2714   """
2715   _OP_REQP = ["output_fields", "names"]
2716   REQ_BGL = False
2717   _FIELDS_STATIC = _FieldSet(*["name", "os", "pnode", "snodes",
2718                                "admin_state", "admin_ram",
2719                                "disk_template", "ip", "mac", "bridge",
2720                                "sda_size", "sdb_size", "vcpus", "tags",
2721                                "network_port", "beparams",
2722                                "(disk).(size)/([0-9]+)",
2723                                "(nic).(mac|ip|bridge)/([0-9]+)",
2724                                "(disk|nic).(count)",
2725                                "serial_no", "hypervisor", "hvparams",] +
2726                              ["hv/%s" % name
2727                               for name in constants.HVS_PARAMETERS] +
2728                              ["be/%s" % name
2729                               for name in constants.BES_PARAMETERS])
2730   _FIELDS_DYNAMIC = _FieldSet("oper_state", "oper_ram", "status")
2731
2732
2733   def ExpandNames(self):
2734     _CheckOutputFields(static=self._FIELDS_STATIC,
2735                        dynamic=self._FIELDS_DYNAMIC,
2736                        selected=self.op.output_fields)
2737
2738     self.needed_locks = {}
2739     self.share_locks[locking.LEVEL_INSTANCE] = 1
2740     self.share_locks[locking.LEVEL_NODE] = 1
2741
2742     if self.op.names:
2743       self.wanted = _GetWantedInstances(self, self.op.names)
2744     else:
2745       self.wanted = locking.ALL_SET
2746
2747     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2748     if self.do_locking:
2749       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2750       self.needed_locks[locking.LEVEL_NODE] = []
2751       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2752
2753   def DeclareLocks(self, level):
2754     if level == locking.LEVEL_NODE and self.do_locking:
2755       self._LockInstancesNodes()
2756
2757   def CheckPrereq(self):
2758     """Check prerequisites.
2759
2760     """
2761     pass
2762
2763   def Exec(self, feedback_fn):
2764     """Computes the list of nodes and their attributes.
2765
2766     """
2767     all_info = self.cfg.GetAllInstancesInfo()
2768     if self.do_locking:
2769       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2770     elif self.wanted != locking.ALL_SET:
2771       instance_names = self.wanted
2772       missing = set(instance_names).difference(all_info.keys())
2773       if missing:
2774         raise errors.OpExecError(
2775           "Some instances were removed before retrieving their data: %s"
2776           % missing)
2777     else:
2778       instance_names = all_info.keys()
2779
2780     instance_names = utils.NiceSort(instance_names)
2781     instance_list = [all_info[iname] for iname in instance_names]
2782
2783     # begin data gathering
2784
2785     nodes = frozenset([inst.primary_node for inst in instance_list])
2786     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2787
2788     bad_nodes = []
2789     if self.do_locking:
2790       live_data = {}
2791       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2792       for name in nodes:
2793         result = node_data[name]
2794         if result:
2795           live_data.update(result)
2796         elif result == False:
2797           bad_nodes.append(name)
2798         # else no instance is alive
2799     else:
2800       live_data = dict([(name, {}) for name in instance_names])
2801
2802     # end data gathering
2803
2804     HVPREFIX = "hv/"
2805     BEPREFIX = "be/"
2806     output = []
2807     for instance in instance_list:
2808       iout = []
2809       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2810       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2811       for field in self.op.output_fields:
2812         st_match = self._FIELDS_STATIC.Matches(field)
2813         if field == "name":
2814           val = instance.name
2815         elif field == "os":
2816           val = instance.os
2817         elif field == "pnode":
2818           val = instance.primary_node
2819         elif field == "snodes":
2820           val = list(instance.secondary_nodes)
2821         elif field == "admin_state":
2822           val = (instance.status != "down")
2823         elif field == "oper_state":
2824           if instance.primary_node in bad_nodes:
2825             val = None
2826           else:
2827             val = bool(live_data.get(instance.name))
2828         elif field == "status":
2829           if instance.primary_node in bad_nodes:
2830             val = "ERROR_nodedown"
2831           else:
2832             running = bool(live_data.get(instance.name))
2833             if running:
2834               if instance.status != "down":
2835                 val = "running"
2836               else:
2837                 val = "ERROR_up"
2838             else:
2839               if instance.status != "down":
2840                 val = "ERROR_down"
2841               else:
2842                 val = "ADMIN_down"
2843         elif field == "oper_ram":
2844           if instance.primary_node in bad_nodes:
2845             val = None
2846           elif instance.name in live_data:
2847             val = live_data[instance.name].get("memory", "?")
2848           else:
2849             val = "-"
2850         elif field == "disk_template":
2851           val = instance.disk_template
2852         elif field == "ip":
2853           val = instance.nics[0].ip
2854         elif field == "bridge":
2855           val = instance.nics[0].bridge
2856         elif field == "mac":
2857           val = instance.nics[0].mac
2858         elif field == "sda_size" or field == "sdb_size":
2859           disk = instance.FindDisk(field[:3])
2860           if disk is None:
2861             val = None
2862           else:
2863             val = disk.size
2864         elif field == "tags":
2865           val = list(instance.GetTags())
2866         elif field == "serial_no":
2867           val = instance.serial_no
2868         elif field == "network_port":
2869           val = instance.network_port
2870         elif field == "hypervisor":
2871           val = instance.hypervisor
2872         elif field == "hvparams":
2873           val = i_hv
2874         elif (field.startswith(HVPREFIX) and
2875               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2876           val = i_hv.get(field[len(HVPREFIX):], None)
2877         elif field == "beparams":
2878           val = i_be
2879         elif (field.startswith(BEPREFIX) and
2880               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2881           val = i_be.get(field[len(BEPREFIX):], None)
2882         elif st_match and st_match.groups():
2883           # matches a variable list
2884           st_groups = st_match.groups()
2885           if st_groups and st_groups[0] == "disk":
2886             if st_groups[1] == "count":
2887               val = len(instance.disks)
2888             elif st_groups[1] == "size":
2889               disk_idx = int(st_groups[2])
2890               if disk_idx >= len(instance.disks):
2891                 val = None
2892               else:
2893                 val = instance.disks[disk_idx].size
2894             else:
2895               assert False, "Unhandled disk parameter"
2896           elif st_groups[0] == "nic":
2897             if st_groups[1] == "count":
2898               val = len(instance.nics)
2899             else:
2900               # index-based item
2901               nic_idx = int(st_groups[2])
2902               if nic_idx >= len(instance.nics):
2903                 val = None
2904               else:
2905                 if st_groups[1] == "mac":
2906                   val = instance.nics[nic_idx].mac
2907                 elif st_groups[1] == "ip":
2908                   val = instance.nics[nic_idx].ip
2909                 elif st_groups[1] == "bridge":
2910                   val = instance.nics[nic_idx].bridge
2911                 else:
2912                   assert False, "Unhandled NIC parameter"
2913           else:
2914             assert False, "Unhandled variable parameter"
2915         else:
2916           raise errors.ParameterError(field)
2917         iout.append(val)
2918       output.append(iout)
2919
2920     return output
2921
2922
2923 class LUFailoverInstance(LogicalUnit):
2924   """Failover an instance.
2925
2926   """
2927   HPATH = "instance-failover"
2928   HTYPE = constants.HTYPE_INSTANCE
2929   _OP_REQP = ["instance_name", "ignore_consistency"]
2930   REQ_BGL = False
2931
2932   def ExpandNames(self):
2933     self._ExpandAndLockInstance()
2934     self.needed_locks[locking.LEVEL_NODE] = []
2935     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2936
2937   def DeclareLocks(self, level):
2938     if level == locking.LEVEL_NODE:
2939       self._LockInstancesNodes()
2940
2941   def BuildHooksEnv(self):
2942     """Build hooks env.
2943
2944     This runs on master, primary and secondary nodes of the instance.
2945
2946     """
2947     env = {
2948       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2949       }
2950     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2951     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2952     return env, nl, nl
2953
2954   def CheckPrereq(self):
2955     """Check prerequisites.
2956
2957     This checks that the instance is in the cluster.
2958
2959     """
2960     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2961     assert self.instance is not None, \
2962       "Cannot retrieve locked instance %s" % self.op.instance_name
2963
2964     bep = self.cfg.GetClusterInfo().FillBE(instance)
2965     if instance.disk_template not in constants.DTS_NET_MIRROR:
2966       raise errors.OpPrereqError("Instance's disk layout is not"
2967                                  " network mirrored, cannot failover.")
2968
2969     secondary_nodes = instance.secondary_nodes
2970     if not secondary_nodes:
2971       raise errors.ProgrammerError("no secondary node but using "
2972                                    "a mirrored disk template")
2973
2974     target_node = secondary_nodes[0]
2975     # check memory requirements on the secondary node
2976     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2977                          instance.name, bep[constants.BE_MEMORY],
2978                          instance.hypervisor)
2979
2980     # check bridge existance
2981     brlist = [nic.bridge for nic in instance.nics]
2982     if not self.rpc.call_bridges_exist(target_node, brlist):
2983       raise errors.OpPrereqError("One or more target bridges %s does not"
2984                                  " exist on destination node '%s'" %
2985                                  (brlist, target_node))
2986
2987   def Exec(self, feedback_fn):
2988     """Failover an instance.
2989
2990     The failover is done by shutting it down on its present node and
2991     starting it on the secondary.
2992
2993     """
2994     instance = self.instance
2995
2996     source_node = instance.primary_node
2997     target_node = instance.secondary_nodes[0]
2998
2999     feedback_fn("* checking disk consistency between source and target")
3000     for dev in instance.disks:
3001       # for drbd, these are drbd over lvm
3002       if not _CheckDiskConsistency(self, dev, target_node, False):
3003         if instance.status == "up" and not self.op.ignore_consistency:
3004           raise errors.OpExecError("Disk %s is degraded on target node,"
3005                                    " aborting failover." % dev.iv_name)
3006
3007     feedback_fn("* shutting down instance on source node")
3008     logging.info("Shutting down instance %s on node %s",
3009                  instance.name, source_node)
3010
3011     if not self.rpc.call_instance_shutdown(source_node, instance):
3012       if self.op.ignore_consistency:
3013         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3014                              " Proceeding"
3015                              " anyway. Please make sure node %s is down",
3016                              instance.name, source_node, source_node)
3017       else:
3018         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3019                                  (instance.name, source_node))
3020
3021     feedback_fn("* deactivating the instance's disks on source node")
3022     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3023       raise errors.OpExecError("Can't shut down the instance's disks.")
3024
3025     instance.primary_node = target_node
3026     # distribute new instance config to the other nodes
3027     self.cfg.Update(instance)
3028
3029     # Only start the instance if it's marked as up
3030     if instance.status == "up":
3031       feedback_fn("* activating the instance's disks on target node")
3032       logging.info("Starting instance %s on node %s",
3033                    instance.name, target_node)
3034
3035       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3036                                                ignore_secondaries=True)
3037       if not disks_ok:
3038         _ShutdownInstanceDisks(self, instance)
3039         raise errors.OpExecError("Can't activate the instance's disks")
3040
3041       feedback_fn("* starting the instance on the target node")
3042       if not self.rpc.call_instance_start(target_node, instance, None):
3043         _ShutdownInstanceDisks(self, instance)
3044         raise errors.OpExecError("Could not start instance %s on node %s." %
3045                                  (instance.name, target_node))
3046
3047
3048 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3049   """Create a tree of block devices on the primary node.
3050
3051   This always creates all devices.
3052
3053   """
3054   if device.children:
3055     for child in device.children:
3056       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3057         return False
3058
3059   lu.cfg.SetDiskID(device, node)
3060   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3061                                        instance.name, True, info)
3062   if not new_id:
3063     return False
3064   if device.physical_id is None:
3065     device.physical_id = new_id
3066   return True
3067
3068
3069 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3070   """Create a tree of block devices on a secondary node.
3071
3072   If this device type has to be created on secondaries, create it and
3073   all its children.
3074
3075   If not, just recurse to children keeping the same 'force' value.
3076
3077   """
3078   if device.CreateOnSecondary():
3079     force = True
3080   if device.children:
3081     for child in device.children:
3082       if not _CreateBlockDevOnSecondary(lu, node, instance,
3083                                         child, force, info):
3084         return False
3085
3086   if not force:
3087     return True
3088   lu.cfg.SetDiskID(device, node)
3089   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3090                                        instance.name, False, info)
3091   if not new_id:
3092     return False
3093   if device.physical_id is None:
3094     device.physical_id = new_id
3095   return True
3096
3097
3098 def _GenerateUniqueNames(lu, exts):
3099   """Generate a suitable LV name.
3100
3101   This will generate a logical volume name for the given instance.
3102
3103   """
3104   results = []
3105   for val in exts:
3106     new_id = lu.cfg.GenerateUniqueID()
3107     results.append("%s%s" % (new_id, val))
3108   return results
3109
3110
3111 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3112                          p_minor, s_minor):
3113   """Generate a drbd8 device complete with its children.
3114
3115   """
3116   port = lu.cfg.AllocatePort()
3117   vgname = lu.cfg.GetVGName()
3118   shared_secret = lu.cfg.GenerateDRBDSecret()
3119   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3120                           logical_id=(vgname, names[0]))
3121   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3122                           logical_id=(vgname, names[1]))
3123   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3124                           logical_id=(primary, secondary, port,
3125                                       p_minor, s_minor,
3126                                       shared_secret),
3127                           children=[dev_data, dev_meta],
3128                           iv_name=iv_name)
3129   return drbd_dev
3130
3131
3132 def _GenerateDiskTemplate(lu, template_name,
3133                           instance_name, primary_node,
3134                           secondary_nodes, disk_sz, swap_sz,
3135                           file_storage_dir, file_driver):
3136   """Generate the entire disk layout for a given template type.
3137
3138   """
3139   #TODO: compute space requirements
3140
3141   vgname = lu.cfg.GetVGName()
3142   if template_name == constants.DT_DISKLESS:
3143     disks = []
3144   elif template_name == constants.DT_PLAIN:
3145     if len(secondary_nodes) != 0:
3146       raise errors.ProgrammerError("Wrong template configuration")
3147
3148     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3149     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3150                            logical_id=(vgname, names[0]),
3151                            iv_name = "sda")
3152     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3153                            logical_id=(vgname, names[1]),
3154                            iv_name = "sdb")
3155     disks = [sda_dev, sdb_dev]
3156   elif template_name == constants.DT_DRBD8:
3157     if len(secondary_nodes) != 1:
3158       raise errors.ProgrammerError("Wrong template configuration")
3159     remote_node = secondary_nodes[0]
3160     (minor_pa, minor_pb,
3161      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3162       [primary_node, primary_node, remote_node, remote_node], instance_name)
3163
3164     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3165                                       ".sdb_data", ".sdb_meta"])
3166     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3167                                         disk_sz, names[0:2], "sda",
3168                                         minor_pa, minor_sa)
3169     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3170                                         swap_sz, names[2:4], "sdb",
3171                                         minor_pb, minor_sb)
3172     disks = [drbd_sda_dev, drbd_sdb_dev]
3173   elif template_name == constants.DT_FILE:
3174     if len(secondary_nodes) != 0:
3175       raise errors.ProgrammerError("Wrong template configuration")
3176
3177     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3178                                 iv_name="sda", logical_id=(file_driver,
3179                                 "%s/sda" % file_storage_dir))
3180     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3181                                 iv_name="sdb", logical_id=(file_driver,
3182                                 "%s/sdb" % file_storage_dir))
3183     disks = [file_sda_dev, file_sdb_dev]
3184   else:
3185     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3186   return disks
3187
3188
3189 def _GetInstanceInfoText(instance):
3190   """Compute that text that should be added to the disk's metadata.
3191
3192   """
3193   return "originstname+%s" % instance.name
3194
3195
3196 def _CreateDisks(lu, instance):
3197   """Create all disks for an instance.
3198
3199   This abstracts away some work from AddInstance.
3200
3201   Args:
3202     instance: the instance object
3203
3204   Returns:
3205     True or False showing the success of the creation process
3206
3207   """
3208   info = _GetInstanceInfoText(instance)
3209
3210   if instance.disk_template == constants.DT_FILE:
3211     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3212     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3213                                                  file_storage_dir)
3214
3215     if not result:
3216       logging.error("Could not connect to node '%s'", instance.primary_node)
3217       return False
3218
3219     if not result[0]:
3220       logging.error("Failed to create directory '%s'", file_storage_dir)
3221       return False
3222
3223   for device in instance.disks:
3224     logging.info("Creating volume %s for instance %s",
3225                  device.iv_name, instance.name)
3226     #HARDCODE
3227     for secondary_node in instance.secondary_nodes:
3228       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3229                                         device, False, info):
3230         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3231                       device.iv_name, device, secondary_node)
3232         return False
3233     #HARDCODE
3234     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3235                                     instance, device, info):
3236       logging.error("Failed to create volume %s on primary!", device.iv_name)
3237       return False
3238
3239   return True
3240
3241
3242 def _RemoveDisks(lu, instance):
3243   """Remove all disks for an instance.
3244
3245   This abstracts away some work from `AddInstance()` and
3246   `RemoveInstance()`. Note that in case some of the devices couldn't
3247   be removed, the removal will continue with the other ones (compare
3248   with `_CreateDisks()`).
3249
3250   Args:
3251     instance: the instance object
3252
3253   Returns:
3254     True or False showing the success of the removal proces
3255
3256   """
3257   logging.info("Removing block devices for instance %s", instance.name)
3258
3259   result = True
3260   for device in instance.disks:
3261     for node, disk in device.ComputeNodeTree(instance.primary_node):
3262       lu.cfg.SetDiskID(disk, node)
3263       if not lu.rpc.call_blockdev_remove(node, disk):
3264         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3265                            " continuing anyway", device.iv_name, node)
3266         result = False
3267
3268   if instance.disk_template == constants.DT_FILE:
3269     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3270     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3271                                                file_storage_dir):
3272       logging.error("Could not remove directory '%s'", file_storage_dir)
3273       result = False
3274
3275   return result
3276
3277
3278 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3279   """Compute disk size requirements in the volume group
3280
3281   This is currently hard-coded for the two-drive layout.
3282
3283   """
3284   # Required free disk space as a function of disk and swap space
3285   req_size_dict = {
3286     constants.DT_DISKLESS: None,
3287     constants.DT_PLAIN: disk_size + swap_size,
3288     # 256 MB are added for drbd metadata, 128MB for each drbd device
3289     constants.DT_DRBD8: disk_size + swap_size + 256,
3290     constants.DT_FILE: None,
3291   }
3292
3293   if disk_template not in req_size_dict:
3294     raise errors.ProgrammerError("Disk template '%s' size requirement"
3295                                  " is unknown" %  disk_template)
3296
3297   return req_size_dict[disk_template]
3298
3299
3300 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3301   """Hypervisor parameter validation.
3302
3303   This function abstract the hypervisor parameter validation to be
3304   used in both instance create and instance modify.
3305
3306   @type lu: L{LogicalUnit}
3307   @param lu: the logical unit for which we check
3308   @type nodenames: list
3309   @param nodenames: the list of nodes on which we should check
3310   @type hvname: string
3311   @param hvname: the name of the hypervisor we should use
3312   @type hvparams: dict
3313   @param hvparams: the parameters which we need to check
3314   @raise errors.OpPrereqError: if the parameters are not valid
3315
3316   """
3317   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3318                                                   hvname,
3319                                                   hvparams)
3320   for node in nodenames:
3321     info = hvinfo.get(node, None)
3322     if not info or not isinstance(info, (tuple, list)):
3323       raise errors.OpPrereqError("Cannot get current information"
3324                                  " from node '%s' (%s)" % (node, info))
3325     if not info[0]:
3326       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3327                                  " %s" % info[1])
3328
3329
3330 class LUCreateInstance(LogicalUnit):
3331   """Create an instance.
3332
3333   """
3334   HPATH = "instance-add"
3335   HTYPE = constants.HTYPE_INSTANCE
3336   _OP_REQP = ["instance_name", "disk_size",
3337               "disk_template", "swap_size", "mode", "start",
3338               "wait_for_sync", "ip_check", "mac",
3339               "hvparams", "beparams"]
3340   REQ_BGL = False
3341
3342   def _ExpandNode(self, node):
3343     """Expands and checks one node name.
3344
3345     """
3346     node_full = self.cfg.ExpandNodeName(node)
3347     if node_full is None:
3348       raise errors.OpPrereqError("Unknown node %s" % node)
3349     return node_full
3350
3351   def ExpandNames(self):
3352     """ExpandNames for CreateInstance.
3353
3354     Figure out the right locks for instance creation.
3355
3356     """
3357     self.needed_locks = {}
3358
3359     # set optional parameters to none if they don't exist
3360     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3361       if not hasattr(self.op, attr):
3362         setattr(self.op, attr, None)
3363
3364     # cheap checks, mostly valid constants given
3365
3366     # verify creation mode
3367     if self.op.mode not in (constants.INSTANCE_CREATE,
3368                             constants.INSTANCE_IMPORT):
3369       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3370                                  self.op.mode)
3371
3372     # disk template and mirror node verification
3373     if self.op.disk_template not in constants.DISK_TEMPLATES:
3374       raise errors.OpPrereqError("Invalid disk template name")
3375
3376     if self.op.hypervisor is None:
3377       self.op.hypervisor = self.cfg.GetHypervisorType()
3378
3379     cluster = self.cfg.GetClusterInfo()
3380     enabled_hvs = cluster.enabled_hypervisors
3381     if self.op.hypervisor not in enabled_hvs:
3382       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3383                                  " cluster (%s)" % (self.op.hypervisor,
3384                                   ",".join(enabled_hvs)))
3385
3386     # check hypervisor parameter syntax (locally)
3387
3388     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3389                                   self.op.hvparams)
3390     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3391     hv_type.CheckParameterSyntax(filled_hvp)
3392
3393     # fill and remember the beparams dict
3394     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3395                                     self.op.beparams)
3396
3397     #### instance parameters check
3398
3399     # instance name verification
3400     hostname1 = utils.HostInfo(self.op.instance_name)
3401     self.op.instance_name = instance_name = hostname1.name
3402
3403     # this is just a preventive check, but someone might still add this
3404     # instance in the meantime, and creation will fail at lock-add time
3405     if instance_name in self.cfg.GetInstanceList():
3406       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3407                                  instance_name)
3408
3409     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3410
3411     # ip validity checks
3412     ip = getattr(self.op, "ip", None)
3413     if ip is None or ip.lower() == "none":
3414       inst_ip = None
3415     elif ip.lower() == constants.VALUE_AUTO:
3416       inst_ip = hostname1.ip
3417     else:
3418       if not utils.IsValidIP(ip):
3419         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3420                                    " like a valid IP" % ip)
3421       inst_ip = ip
3422     self.inst_ip = self.op.ip = inst_ip
3423     # used in CheckPrereq for ip ping check
3424     self.check_ip = hostname1.ip
3425
3426     # MAC address verification
3427     if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3428       if not utils.IsValidMac(self.op.mac.lower()):
3429         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3430                                    self.op.mac)
3431
3432     # file storage checks
3433     if (self.op.file_driver and
3434         not self.op.file_driver in constants.FILE_DRIVER):
3435       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3436                                  self.op.file_driver)
3437
3438     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3439       raise errors.OpPrereqError("File storage directory path not absolute")
3440
3441     ### Node/iallocator related checks
3442     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3443       raise errors.OpPrereqError("One and only one of iallocator and primary"
3444                                  " node must be given")
3445
3446     if self.op.iallocator:
3447       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3448     else:
3449       self.op.pnode = self._ExpandNode(self.op.pnode)
3450       nodelist = [self.op.pnode]
3451       if self.op.snode is not None:
3452         self.op.snode = self._ExpandNode(self.op.snode)
3453         nodelist.append(self.op.snode)
3454       self.needed_locks[locking.LEVEL_NODE] = nodelist
3455
3456     # in case of import lock the source node too
3457     if self.op.mode == constants.INSTANCE_IMPORT:
3458       src_node = getattr(self.op, "src_node", None)
3459       src_path = getattr(self.op, "src_path", None)
3460
3461       if src_node is None or src_path is None:
3462         raise errors.OpPrereqError("Importing an instance requires source"
3463                                    " node and path options")
3464
3465       if not os.path.isabs(src_path):
3466         raise errors.OpPrereqError("The source path must be absolute")
3467
3468       self.op.src_node = src_node = self._ExpandNode(src_node)
3469       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3470         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3471
3472     else: # INSTANCE_CREATE
3473       if getattr(self.op, "os_type", None) is None:
3474         raise errors.OpPrereqError("No guest OS specified")
3475
3476   def _RunAllocator(self):
3477     """Run the allocator based on input opcode.
3478
3479     """
3480     disks = [{"size": self.op.disk_size, "mode": "w"},
3481              {"size": self.op.swap_size, "mode": "w"}]
3482     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3483              "bridge": self.op.bridge}]
3484     ial = IAllocator(self,
3485                      mode=constants.IALLOCATOR_MODE_ALLOC,
3486                      name=self.op.instance_name,
3487                      disk_template=self.op.disk_template,
3488                      tags=[],
3489                      os=self.op.os_type,
3490                      vcpus=self.be_full[constants.BE_VCPUS],
3491                      mem_size=self.be_full[constants.BE_MEMORY],
3492                      disks=disks,
3493                      nics=nics,
3494                      )
3495
3496     ial.Run(self.op.iallocator)
3497
3498     if not ial.success:
3499       raise errors.OpPrereqError("Can't compute nodes using"
3500                                  " iallocator '%s': %s" % (self.op.iallocator,
3501                                                            ial.info))
3502     if len(ial.nodes) != ial.required_nodes:
3503       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3504                                  " of nodes (%s), required %s" %
3505                                  (self.op.iallocator, len(ial.nodes),
3506                                   ial.required_nodes))
3507     self.op.pnode = ial.nodes[0]
3508     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3509                  self.op.instance_name, self.op.iallocator,
3510                  ", ".join(ial.nodes))
3511     if ial.required_nodes == 2:
3512       self.op.snode = ial.nodes[1]
3513
3514   def BuildHooksEnv(self):
3515     """Build hooks env.
3516
3517     This runs on master, primary and secondary nodes of the instance.
3518
3519     """
3520     env = {
3521       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3522       "INSTANCE_DISK_SIZE": self.op.disk_size,
3523       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3524       "INSTANCE_ADD_MODE": self.op.mode,
3525       }
3526     if self.op.mode == constants.INSTANCE_IMPORT:
3527       env["INSTANCE_SRC_NODE"] = self.op.src_node
3528       env["INSTANCE_SRC_PATH"] = self.op.src_path
3529       env["INSTANCE_SRC_IMAGES"] = self.src_images
3530
3531     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3532       primary_node=self.op.pnode,
3533       secondary_nodes=self.secondaries,
3534       status=self.instance_status,
3535       os_type=self.op.os_type,
3536       memory=self.be_full[constants.BE_MEMORY],
3537       vcpus=self.be_full[constants.BE_VCPUS],
3538       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3539     ))
3540
3541     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3542           self.secondaries)
3543     return env, nl, nl
3544
3545
3546   def CheckPrereq(self):
3547     """Check prerequisites.
3548
3549     """
3550     if (not self.cfg.GetVGName() and
3551         self.op.disk_template not in constants.DTS_NOT_LVM):
3552       raise errors.OpPrereqError("Cluster does not support lvm-based"
3553                                  " instances")
3554
3555
3556     if self.op.mode == constants.INSTANCE_IMPORT:
3557       src_node = self.op.src_node
3558       src_path = self.op.src_path
3559
3560       export_info = self.rpc.call_export_info(src_node, src_path)
3561
3562       if not export_info:
3563         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3564
3565       if not export_info.has_section(constants.INISECT_EXP):
3566         raise errors.ProgrammerError("Corrupted export config")
3567
3568       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3569       if (int(ei_version) != constants.EXPORT_VERSION):
3570         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3571                                    (ei_version, constants.EXPORT_VERSION))
3572
3573       # Check that the new instance doesn't have less disks than the export
3574       # TODO: substitute "2" with the actual number of disks requested
3575       instance_disks = 2
3576       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3577       if instance_disks < export_disks:
3578         raise errors.OpPrereqError("Not enough disks to import."
3579                                    " (instance: %d, export: %d)" %
3580                                    (2, export_disks))
3581
3582       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3583       disk_images = []
3584       for idx in range(export_disks):
3585         option = 'disk%d_dump' % idx
3586         if export_info.has_option(constants.INISECT_INS, option):
3587           # FIXME: are the old os-es, disk sizes, etc. useful?
3588           export_name = export_info.get(constants.INISECT_INS, option)
3589           image = os.path.join(src_path, export_name)
3590           disk_images.append(image)
3591         else:
3592           disk_images.append(False)
3593
3594       self.src_images = disk_images
3595
3596       if self.op.mac == constants.VALUE_AUTO:
3597         old_name = export_info.get(constants.INISECT_INS, 'name')
3598         if self.op.instance_name == old_name:
3599           # FIXME: adjust every nic, when we'll be able to create instances
3600           # with more than one
3601           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3602             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3603
3604     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3605
3606     if self.op.start and not self.op.ip_check:
3607       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3608                                  " adding an instance in start mode")
3609
3610     if self.op.ip_check:
3611       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3612         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3613                                    (self.check_ip, self.op.instance_name))
3614
3615     # bridge verification
3616     bridge = getattr(self.op, "bridge", None)
3617     if bridge is None:
3618       self.op.bridge = self.cfg.GetDefBridge()
3619     else:
3620       self.op.bridge = bridge
3621
3622     #### allocator run
3623
3624     if self.op.iallocator is not None:
3625       self._RunAllocator()
3626
3627     #### node related checks
3628
3629     # check primary node
3630     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3631     assert self.pnode is not None, \
3632       "Cannot retrieve locked node %s" % self.op.pnode
3633     self.secondaries = []
3634
3635     # mirror node verification
3636     if self.op.disk_template in constants.DTS_NET_MIRROR:
3637       if self.op.snode is None:
3638         raise errors.OpPrereqError("The networked disk templates need"
3639                                    " a mirror node")
3640       if self.op.snode == pnode.name:
3641         raise errors.OpPrereqError("The secondary node cannot be"
3642                                    " the primary node.")
3643       self.secondaries.append(self.op.snode)
3644
3645     nodenames = [pnode.name] + self.secondaries
3646
3647     req_size = _ComputeDiskSize(self.op.disk_template,
3648                                 self.op.disk_size, self.op.swap_size)
3649
3650     # Check lv size requirements
3651     if req_size is not None:
3652       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3653                                          self.op.hypervisor)
3654       for node in nodenames:
3655         info = nodeinfo.get(node, None)
3656         if not info:
3657           raise errors.OpPrereqError("Cannot get current information"
3658                                      " from node '%s'" % node)
3659         vg_free = info.get('vg_free', None)
3660         if not isinstance(vg_free, int):
3661           raise errors.OpPrereqError("Can't compute free disk space on"
3662                                      " node %s" % node)
3663         if req_size > info['vg_free']:
3664           raise errors.OpPrereqError("Not enough disk space on target node %s."
3665                                      " %d MB available, %d MB required" %
3666                                      (node, info['vg_free'], req_size))
3667
3668     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3669
3670     # os verification
3671     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3672     if not os_obj:
3673       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3674                                  " primary node"  % self.op.os_type)
3675
3676     # bridge check on primary node
3677     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3678       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3679                                  " destination node '%s'" %
3680                                  (self.op.bridge, pnode.name))
3681
3682     # memory check on primary node
3683     if self.op.start:
3684       _CheckNodeFreeMemory(self, self.pnode.name,
3685                            "creating instance %s" % self.op.instance_name,
3686                            self.be_full[constants.BE_MEMORY],
3687                            self.op.hypervisor)
3688
3689     if self.op.start:
3690       self.instance_status = 'up'
3691     else:
3692       self.instance_status = 'down'
3693
3694   def Exec(self, feedback_fn):
3695     """Create and add the instance to the cluster.
3696
3697     """
3698     instance = self.op.instance_name
3699     pnode_name = self.pnode.name
3700
3701     if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3702       mac_address = self.cfg.GenerateMAC()
3703     else:
3704       mac_address = self.op.mac
3705
3706     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3707     if self.inst_ip is not None:
3708       nic.ip = self.inst_ip
3709
3710     ht_kind = self.op.hypervisor
3711     if ht_kind in constants.HTS_REQ_PORT:
3712       network_port = self.cfg.AllocatePort()
3713     else:
3714       network_port = None
3715
3716     ##if self.op.vnc_bind_address is None:
3717     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3718
3719     # this is needed because os.path.join does not accept None arguments
3720     if self.op.file_storage_dir is None:
3721       string_file_storage_dir = ""
3722     else:
3723       string_file_storage_dir = self.op.file_storage_dir
3724
3725     # build the full file storage dir path
3726     file_storage_dir = os.path.normpath(os.path.join(
3727                                         self.cfg.GetFileStorageDir(),
3728                                         string_file_storage_dir, instance))
3729
3730
3731     disks = _GenerateDiskTemplate(self,
3732                                   self.op.disk_template,
3733                                   instance, pnode_name,
3734                                   self.secondaries, self.op.disk_size,
3735                                   self.op.swap_size,
3736                                   file_storage_dir,
3737                                   self.op.file_driver)
3738
3739     iobj = objects.Instance(name=instance, os=self.op.os_type,
3740                             primary_node=pnode_name,
3741                             nics=[nic], disks=disks,
3742                             disk_template=self.op.disk_template,
3743                             status=self.instance_status,
3744                             network_port=network_port,
3745                             beparams=self.op.beparams,
3746                             hvparams=self.op.hvparams,
3747                             hypervisor=self.op.hypervisor,
3748                             )
3749
3750     feedback_fn("* creating instance disks...")
3751     if not _CreateDisks(self, iobj):
3752       _RemoveDisks(self, iobj)
3753       self.cfg.ReleaseDRBDMinors(instance)
3754       raise errors.OpExecError("Device creation failed, reverting...")
3755
3756     feedback_fn("adding instance %s to cluster config" % instance)
3757
3758     self.cfg.AddInstance(iobj)
3759     # Declare that we don't want to remove the instance lock anymore, as we've
3760     # added the instance to the config
3761     del self.remove_locks[locking.LEVEL_INSTANCE]
3762     # Remove the temp. assignements for the instance's drbds
3763     self.cfg.ReleaseDRBDMinors(instance)
3764
3765     if self.op.wait_for_sync:
3766       disk_abort = not _WaitForSync(self, iobj)
3767     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3768       # make sure the disks are not degraded (still sync-ing is ok)
3769       time.sleep(15)
3770       feedback_fn("* checking mirrors status")
3771       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3772     else:
3773       disk_abort = False
3774
3775     if disk_abort:
3776       _RemoveDisks(self, iobj)
3777       self.cfg.RemoveInstance(iobj.name)
3778       # Make sure the instance lock gets removed
3779       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3780       raise errors.OpExecError("There are some degraded disks for"
3781                                " this instance")
3782
3783     feedback_fn("creating os for instance %s on node %s" %
3784                 (instance, pnode_name))
3785
3786     if iobj.disk_template != constants.DT_DISKLESS:
3787       if self.op.mode == constants.INSTANCE_CREATE:
3788         feedback_fn("* running the instance OS create scripts...")
3789         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3790           raise errors.OpExecError("could not add os for instance %s"
3791                                    " on node %s" %
3792                                    (instance, pnode_name))
3793
3794       elif self.op.mode == constants.INSTANCE_IMPORT:
3795         feedback_fn("* running the instance OS import scripts...")
3796         src_node = self.op.src_node
3797         src_images = self.src_images
3798         cluster_name = self.cfg.GetClusterName()
3799         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3800                                                          src_node, src_images,
3801                                                          cluster_name)
3802         for idx, result in enumerate(import_result):
3803           if not result:
3804             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3805                             " on node %s" % (src_images[idx], instance, idx,
3806                                              pnode_name))
3807       else:
3808         # also checked in the prereq part
3809         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3810                                      % self.op.mode)
3811
3812     if self.op.start:
3813       logging.info("Starting instance %s on node %s", instance, pnode_name)
3814       feedback_fn("* starting instance...")
3815       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3816         raise errors.OpExecError("Could not start instance")
3817
3818
3819 class LUConnectConsole(NoHooksLU):
3820   """Connect to an instance's console.
3821
3822   This is somewhat special in that it returns the command line that
3823   you need to run on the master node in order to connect to the
3824   console.
3825
3826   """
3827   _OP_REQP = ["instance_name"]
3828   REQ_BGL = False
3829
3830   def ExpandNames(self):
3831     self._ExpandAndLockInstance()
3832
3833   def CheckPrereq(self):
3834     """Check prerequisites.
3835
3836     This checks that the instance is in the cluster.
3837
3838     """
3839     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3840     assert self.instance is not None, \
3841       "Cannot retrieve locked instance %s" % self.op.instance_name
3842
3843   def Exec(self, feedback_fn):
3844     """Connect to the console of an instance
3845
3846     """
3847     instance = self.instance
3848     node = instance.primary_node
3849
3850     node_insts = self.rpc.call_instance_list([node],
3851                                              [instance.hypervisor])[node]
3852     if node_insts is False:
3853       raise errors.OpExecError("Can't connect to node %s." % node)
3854
3855     if instance.name not in node_insts:
3856       raise errors.OpExecError("Instance %s is not running." % instance.name)
3857
3858     logging.debug("Connecting to console of %s on %s", instance.name, node)
3859
3860     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3861     console_cmd = hyper.GetShellCommandForConsole(instance)
3862
3863     # build ssh cmdline
3864     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3865
3866
3867 class LUReplaceDisks(LogicalUnit):
3868   """Replace the disks of an instance.
3869
3870   """
3871   HPATH = "mirrors-replace"
3872   HTYPE = constants.HTYPE_INSTANCE
3873   _OP_REQP = ["instance_name", "mode", "disks"]
3874   REQ_BGL = False
3875
3876   def ExpandNames(self):
3877     self._ExpandAndLockInstance()
3878
3879     if not hasattr(self.op, "remote_node"):
3880       self.op.remote_node = None
3881
3882     ia_name = getattr(self.op, "iallocator", None)
3883     if ia_name is not None:
3884       if self.op.remote_node is not None:
3885         raise errors.OpPrereqError("Give either the iallocator or the new"
3886                                    " secondary, not both")
3887       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3888     elif self.op.remote_node is not None:
3889       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3890       if remote_node is None:
3891         raise errors.OpPrereqError("Node '%s' not known" %
3892                                    self.op.remote_node)
3893       self.op.remote_node = remote_node
3894       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3895       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3896     else:
3897       self.needed_locks[locking.LEVEL_NODE] = []
3898       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3899
3900   def DeclareLocks(self, level):
3901     # If we're not already locking all nodes in the set we have to declare the
3902     # instance's primary/secondary nodes.
3903     if (level == locking.LEVEL_NODE and
3904         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3905       self._LockInstancesNodes()
3906
3907   def _RunAllocator(self):
3908     """Compute a new secondary node using an IAllocator.
3909
3910     """
3911     ial = IAllocator(self,
3912                      mode=constants.IALLOCATOR_MODE_RELOC,
3913                      name=self.op.instance_name,
3914                      relocate_from=[self.sec_node])
3915
3916     ial.Run(self.op.iallocator)
3917
3918     if not ial.success:
3919       raise errors.OpPrereqError("Can't compute nodes using"
3920                                  " iallocator '%s': %s" % (self.op.iallocator,
3921                                                            ial.info))
3922     if len(ial.nodes) != ial.required_nodes:
3923       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3924                                  " of nodes (%s), required %s" %
3925                                  (len(ial.nodes), ial.required_nodes))
3926     self.op.remote_node = ial.nodes[0]
3927     self.LogInfo("Selected new secondary for the instance: %s",
3928                  self.op.remote_node)
3929
3930   def BuildHooksEnv(self):
3931     """Build hooks env.
3932
3933     This runs on the master, the primary and all the secondaries.
3934
3935     """
3936     env = {
3937       "MODE": self.op.mode,
3938       "NEW_SECONDARY": self.op.remote_node,
3939       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3940       }
3941     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3942     nl = [
3943       self.cfg.GetMasterNode(),
3944       self.instance.primary_node,
3945       ]
3946     if self.op.remote_node is not None:
3947       nl.append(self.op.remote_node)
3948     return env, nl, nl
3949
3950   def CheckPrereq(self):
3951     """Check prerequisites.
3952
3953     This checks that the instance is in the cluster.
3954
3955     """
3956     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3957     assert instance is not None, \
3958       "Cannot retrieve locked instance %s" % self.op.instance_name
3959     self.instance = instance
3960
3961     if instance.disk_template not in constants.DTS_NET_MIRROR:
3962       raise errors.OpPrereqError("Instance's disk layout is not"
3963                                  " network mirrored.")
3964
3965     if len(instance.secondary_nodes) != 1:
3966       raise errors.OpPrereqError("The instance has a strange layout,"
3967                                  " expected one secondary but found %d" %
3968                                  len(instance.secondary_nodes))
3969
3970     self.sec_node = instance.secondary_nodes[0]
3971
3972     ia_name = getattr(self.op, "iallocator", None)
3973     if ia_name is not None:
3974       self._RunAllocator()
3975
3976     remote_node = self.op.remote_node
3977     if remote_node is not None:
3978       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3979       assert self.remote_node_info is not None, \
3980         "Cannot retrieve locked node %s" % remote_node
3981     else:
3982       self.remote_node_info = None
3983     if remote_node == instance.primary_node:
3984       raise errors.OpPrereqError("The specified node is the primary node of"
3985                                  " the instance.")
3986     elif remote_node == self.sec_node:
3987       if self.op.mode == constants.REPLACE_DISK_SEC:
3988         # this is for DRBD8, where we can't execute the same mode of
3989         # replacement as for drbd7 (no different port allocated)
3990         raise errors.OpPrereqError("Same secondary given, cannot execute"
3991                                    " replacement")
3992     if instance.disk_template == constants.DT_DRBD8:
3993       if (self.op.mode == constants.REPLACE_DISK_ALL and
3994           remote_node is not None):
3995         # switch to replace secondary mode
3996         self.op.mode = constants.REPLACE_DISK_SEC
3997
3998       if self.op.mode == constants.REPLACE_DISK_ALL:
3999         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4000                                    " secondary disk replacement, not"
4001                                    " both at once")
4002       elif self.op.mode == constants.REPLACE_DISK_PRI:
4003         if remote_node is not None:
4004           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4005                                      " the secondary while doing a primary"
4006                                      " node disk replacement")
4007         self.tgt_node = instance.primary_node
4008         self.oth_node = instance.secondary_nodes[0]
4009       elif self.op.mode == constants.REPLACE_DISK_SEC:
4010         self.new_node = remote_node # this can be None, in which case
4011                                     # we don't change the secondary
4012         self.tgt_node = instance.secondary_nodes[0]
4013         self.oth_node = instance.primary_node
4014       else:
4015         raise errors.ProgrammerError("Unhandled disk replace mode")
4016
4017     for name in self.op.disks:
4018       if instance.FindDisk(name) is None:
4019         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4020                                    (name, instance.name))
4021
4022   def _ExecD8DiskOnly(self, feedback_fn):
4023     """Replace a disk on the primary or secondary for dbrd8.
4024
4025     The algorithm for replace is quite complicated:
4026       - for each disk to be replaced:
4027         - create new LVs on the target node with unique names
4028         - detach old LVs from the drbd device
4029         - rename old LVs to name_replaced.<time_t>
4030         - rename new LVs to old LVs
4031         - attach the new LVs (with the old names now) to the drbd device
4032       - wait for sync across all devices
4033       - for each modified disk:
4034         - remove old LVs (which have the name name_replaces.<time_t>)
4035
4036     Failures are not very well handled.
4037
4038     """
4039     steps_total = 6
4040     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4041     instance = self.instance
4042     iv_names = {}
4043     vgname = self.cfg.GetVGName()
4044     # start of work
4045     cfg = self.cfg
4046     tgt_node = self.tgt_node
4047     oth_node = self.oth_node
4048
4049     # Step: check device activation
4050     self.proc.LogStep(1, steps_total, "check device existence")
4051     info("checking volume groups")
4052     my_vg = cfg.GetVGName()
4053     results = self.rpc.call_vg_list([oth_node, tgt_node])
4054     if not results:
4055       raise errors.OpExecError("Can't list volume groups on the nodes")
4056     for node in oth_node, tgt_node:
4057       res = results.get(node, False)
4058       if not res or my_vg not in res:
4059         raise errors.OpExecError("Volume group '%s' not found on %s" %
4060                                  (my_vg, node))
4061     for dev in instance.disks:
4062       if not dev.iv_name in self.op.disks:
4063         continue
4064       for node in tgt_node, oth_node:
4065         info("checking %s on %s" % (dev.iv_name, node))
4066         cfg.SetDiskID(dev, node)
4067         if not self.rpc.call_blockdev_find(node, dev):
4068           raise errors.OpExecError("Can't find device %s on node %s" %
4069                                    (dev.iv_name, node))
4070
4071     # Step: check other node consistency
4072     self.proc.LogStep(2, steps_total, "check peer consistency")
4073     for dev in instance.disks:
4074       if not dev.iv_name in self.op.disks:
4075         continue
4076       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4077       if not _CheckDiskConsistency(self, dev, oth_node,
4078                                    oth_node==instance.primary_node):
4079         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4080                                  " to replace disks on this node (%s)" %
4081                                  (oth_node, tgt_node))
4082
4083     # Step: create new storage
4084     self.proc.LogStep(3, steps_total, "allocate new storage")
4085     for dev in instance.disks:
4086       if not dev.iv_name in self.op.disks:
4087         continue
4088       size = dev.size
4089       cfg.SetDiskID(dev, tgt_node)
4090       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4091       names = _GenerateUniqueNames(self, lv_names)
4092       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4093                              logical_id=(vgname, names[0]))
4094       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4095                              logical_id=(vgname, names[1]))
4096       new_lvs = [lv_data, lv_meta]
4097       old_lvs = dev.children
4098       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4099       info("creating new local storage on %s for %s" %
4100            (tgt_node, dev.iv_name))
4101       # since we *always* want to create this LV, we use the
4102       # _Create...OnPrimary (which forces the creation), even if we
4103       # are talking about the secondary node
4104       for new_lv in new_lvs:
4105         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4106                                         _GetInstanceInfoText(instance)):
4107           raise errors.OpExecError("Failed to create new LV named '%s' on"
4108                                    " node '%s'" %
4109                                    (new_lv.logical_id[1], tgt_node))
4110
4111     # Step: for each lv, detach+rename*2+attach
4112     self.proc.LogStep(4, steps_total, "change drbd configuration")
4113     for dev, old_lvs, new_lvs in iv_names.itervalues():
4114       info("detaching %s drbd from local storage" % dev.iv_name)
4115       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4116         raise errors.OpExecError("Can't detach drbd from local storage on node"
4117                                  " %s for device %s" % (tgt_node, dev.iv_name))
4118       #dev.children = []
4119       #cfg.Update(instance)
4120
4121       # ok, we created the new LVs, so now we know we have the needed
4122       # storage; as such, we proceed on the target node to rename
4123       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4124       # using the assumption that logical_id == physical_id (which in
4125       # turn is the unique_id on that node)
4126
4127       # FIXME(iustin): use a better name for the replaced LVs
4128       temp_suffix = int(time.time())
4129       ren_fn = lambda d, suff: (d.physical_id[0],
4130                                 d.physical_id[1] + "_replaced-%s" % suff)
4131       # build the rename list based on what LVs exist on the node
4132       rlist = []
4133       for to_ren in old_lvs:
4134         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4135         if find_res is not None: # device exists
4136           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4137
4138       info("renaming the old LVs on the target node")
4139       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4140         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4141       # now we rename the new LVs to the old LVs
4142       info("renaming the new LVs on the target node")
4143       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4144       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4145         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4146
4147       for old, new in zip(old_lvs, new_lvs):
4148         new.logical_id = old.logical_id
4149         cfg.SetDiskID(new, tgt_node)
4150
4151       for disk in old_lvs:
4152         disk.logical_id = ren_fn(disk, temp_suffix)
4153         cfg.SetDiskID(disk, tgt_node)
4154
4155       # now that the new lvs have the old name, we can add them to the device
4156       info("adding new mirror component on %s" % tgt_node)
4157       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4158         for new_lv in new_lvs:
4159           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4160             warning("Can't rollback device %s", hint="manually cleanup unused"
4161                     " logical volumes")
4162         raise errors.OpExecError("Can't add local storage to drbd")
4163
4164       dev.children = new_lvs
4165       cfg.Update(instance)
4166
4167     # Step: wait for sync
4168
4169     # this can fail as the old devices are degraded and _WaitForSync
4170     # does a combined result over all disks, so we don't check its
4171     # return value
4172     self.proc.LogStep(5, steps_total, "sync devices")
4173     _WaitForSync(self, instance, unlock=True)
4174
4175     # so check manually all the devices
4176     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4177       cfg.SetDiskID(dev, instance.primary_node)
4178       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4179       if is_degr:
4180         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4181
4182     # Step: remove old storage
4183     self.proc.LogStep(6, steps_total, "removing old storage")
4184     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4185       info("remove logical volumes for %s" % name)
4186       for lv in old_lvs:
4187         cfg.SetDiskID(lv, tgt_node)
4188         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4189           warning("Can't remove old LV", hint="manually remove unused LVs")
4190           continue
4191
4192   def _ExecD8Secondary(self, feedback_fn):
4193     """Replace the secondary node for drbd8.
4194
4195     The algorithm for replace is quite complicated:
4196       - for all disks of the instance:
4197         - create new LVs on the new node with same names
4198         - shutdown the drbd device on the old secondary
4199         - disconnect the drbd network on the primary
4200         - create the drbd device on the new secondary
4201         - network attach the drbd on the primary, using an artifice:
4202           the drbd code for Attach() will connect to the network if it
4203           finds a device which is connected to the good local disks but
4204           not network enabled
4205       - wait for sync across all devices
4206       - remove all disks from the old secondary
4207
4208     Failures are not very well handled.
4209
4210     """
4211     steps_total = 6
4212     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4213     instance = self.instance
4214     iv_names = {}
4215     vgname = self.cfg.GetVGName()
4216     # start of work
4217     cfg = self.cfg
4218     old_node = self.tgt_node
4219     new_node = self.new_node
4220     pri_node = instance.primary_node
4221
4222     # Step: check device activation
4223     self.proc.LogStep(1, steps_total, "check device existence")
4224     info("checking volume groups")
4225     my_vg = cfg.GetVGName()
4226     results = self.rpc.call_vg_list([pri_node, new_node])
4227     if not results:
4228       raise errors.OpExecError("Can't list volume groups on the nodes")
4229     for node in pri_node, new_node:
4230       res = results.get(node, False)
4231       if not res or my_vg not in res:
4232         raise errors.OpExecError("Volume group '%s' not found on %s" %
4233                                  (my_vg, node))
4234     for dev in instance.disks:
4235       if not dev.iv_name in self.op.disks:
4236         continue
4237       info("checking %s on %s" % (dev.iv_name, pri_node))
4238       cfg.SetDiskID(dev, pri_node)
4239       if not self.rpc.call_blockdev_find(pri_node, dev):
4240         raise errors.OpExecError("Can't find device %s on node %s" %
4241                                  (dev.iv_name, pri_node))
4242
4243     # Step: check other node consistency
4244     self.proc.LogStep(2, steps_total, "check peer consistency")
4245     for dev in instance.disks:
4246       if not dev.iv_name in self.op.disks:
4247         continue
4248       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4249       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4250         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4251                                  " unsafe to replace the secondary" %
4252                                  pri_node)
4253
4254     # Step: create new storage
4255     self.proc.LogStep(3, steps_total, "allocate new storage")
4256     for dev in instance.disks:
4257       size = dev.size
4258       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4259       # since we *always* want to create this LV, we use the
4260       # _Create...OnPrimary (which forces the creation), even if we
4261       # are talking about the secondary node
4262       for new_lv in dev.children:
4263         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4264                                         _GetInstanceInfoText(instance)):
4265           raise errors.OpExecError("Failed to create new LV named '%s' on"
4266                                    " node '%s'" %
4267                                    (new_lv.logical_id[1], new_node))
4268
4269
4270     # Step 4: dbrd minors and drbd setups changes
4271     # after this, we must manually remove the drbd minors on both the
4272     # error and the success paths
4273     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4274                                    instance.name)
4275     logging.debug("Allocated minors %s" % (minors,))
4276     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4277     for dev, new_minor in zip(instance.disks, minors):
4278       size = dev.size
4279       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4280       # create new devices on new_node
4281       if pri_node == dev.logical_id[0]:
4282         new_logical_id = (pri_node, new_node,
4283                           dev.logical_id[2], dev.logical_id[3], new_minor,
4284                           dev.logical_id[5])
4285       else:
4286         new_logical_id = (new_node, pri_node,
4287                           dev.logical_id[2], new_minor, dev.logical_id[4],
4288                           dev.logical_id[5])
4289       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4290       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4291                     new_logical_id)
4292       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4293                               logical_id=new_logical_id,
4294                               children=dev.children)
4295       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4296                                         new_drbd, False,
4297                                         _GetInstanceInfoText(instance)):
4298         self.cfg.ReleaseDRBDMinors(instance.name)
4299         raise errors.OpExecError("Failed to create new DRBD on"
4300                                  " node '%s'" % new_node)
4301
4302     for dev in instance.disks:
4303       # we have new devices, shutdown the drbd on the old secondary
4304       info("shutting down drbd for %s on old node" % dev.iv_name)
4305       cfg.SetDiskID(dev, old_node)
4306       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4307         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4308                 hint="Please cleanup this device manually as soon as possible")
4309
4310     info("detaching primary drbds from the network (=> standalone)")
4311     done = 0
4312     for dev in instance.disks:
4313       cfg.SetDiskID(dev, pri_node)
4314       # set the network part of the physical (unique in bdev terms) id
4315       # to None, meaning detach from network
4316       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4317       # and 'find' the device, which will 'fix' it to match the
4318       # standalone state
4319       if self.rpc.call_blockdev_find(pri_node, dev):
4320         done += 1
4321       else:
4322         warning("Failed to detach drbd %s from network, unusual case" %
4323                 dev.iv_name)
4324
4325     if not done:
4326       # no detaches succeeded (very unlikely)
4327       self.cfg.ReleaseDRBDMinors(instance.name)
4328       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4329
4330     # if we managed to detach at least one, we update all the disks of
4331     # the instance to point to the new secondary
4332     info("updating instance configuration")
4333     for dev, _, new_logical_id in iv_names.itervalues():
4334       dev.logical_id = new_logical_id
4335       cfg.SetDiskID(dev, pri_node)
4336     cfg.Update(instance)
4337     # we can remove now the temp minors as now the new values are
4338     # written to the config file (and therefore stable)
4339     self.cfg.ReleaseDRBDMinors(instance.name)
4340
4341     # and now perform the drbd attach
4342     info("attaching primary drbds to new secondary (standalone => connected)")
4343     failures = []
4344     for dev in instance.disks:
4345       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4346       # since the attach is smart, it's enough to 'find' the device,
4347       # it will automatically activate the network, if the physical_id
4348       # is correct
4349       cfg.SetDiskID(dev, pri_node)
4350       logging.debug("Disk to attach: %s", dev)
4351       if not self.rpc.call_blockdev_find(pri_node, dev):
4352         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4353                 "please do a gnt-instance info to see the status of disks")
4354
4355     # this can fail as the old devices are degraded and _WaitForSync
4356     # does a combined result over all disks, so we don't check its
4357     # return value
4358     self.proc.LogStep(5, steps_total, "sync devices")
4359     _WaitForSync(self, instance, unlock=True)
4360
4361     # so check manually all the devices
4362     for name, (dev, old_lvs, _) in iv_names.iteritems():
4363       cfg.SetDiskID(dev, pri_node)
4364       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4365       if is_degr:
4366         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4367
4368     self.proc.LogStep(6, steps_total, "removing old storage")
4369     for name, (dev, old_lvs, _) in iv_names.iteritems():
4370       info("remove logical volumes for %s" % name)
4371       for lv in old_lvs:
4372         cfg.SetDiskID(lv, old_node)
4373         if not self.rpc.call_blockdev_remove(old_node, lv):
4374           warning("Can't remove LV on old secondary",
4375                   hint="Cleanup stale volumes by hand")
4376
4377   def Exec(self, feedback_fn):
4378     """Execute disk replacement.
4379
4380     This dispatches the disk replacement to the appropriate handler.
4381
4382     """
4383     instance = self.instance
4384
4385     # Activate the instance disks if we're replacing them on a down instance
4386     if instance.status == "down":
4387       _StartInstanceDisks(self, instance, True)
4388
4389     if instance.disk_template == constants.DT_DRBD8:
4390       if self.op.remote_node is None:
4391         fn = self._ExecD8DiskOnly
4392       else:
4393         fn = self._ExecD8Secondary
4394     else:
4395       raise errors.ProgrammerError("Unhandled disk replacement case")
4396
4397     ret = fn(feedback_fn)
4398
4399     # Deactivate the instance disks if we're replacing them on a down instance
4400     if instance.status == "down":
4401       _SafeShutdownInstanceDisks(self, instance)
4402
4403     return ret
4404
4405
4406 class LUGrowDisk(LogicalUnit):
4407   """Grow a disk of an instance.
4408
4409   """
4410   HPATH = "disk-grow"
4411   HTYPE = constants.HTYPE_INSTANCE
4412   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4413   REQ_BGL = False
4414
4415   def ExpandNames(self):
4416     self._ExpandAndLockInstance()
4417     self.needed_locks[locking.LEVEL_NODE] = []
4418     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4419
4420   def DeclareLocks(self, level):
4421     if level == locking.LEVEL_NODE:
4422       self._LockInstancesNodes()
4423
4424   def BuildHooksEnv(self):
4425     """Build hooks env.
4426
4427     This runs on the master, the primary and all the secondaries.
4428
4429     """
4430     env = {
4431       "DISK": self.op.disk,
4432       "AMOUNT": self.op.amount,
4433       }
4434     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4435     nl = [
4436       self.cfg.GetMasterNode(),
4437       self.instance.primary_node,
4438       ]
4439     return env, nl, nl
4440
4441   def CheckPrereq(self):
4442     """Check prerequisites.
4443
4444     This checks that the instance is in the cluster.
4445
4446     """
4447     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4448     assert instance is not None, \
4449       "Cannot retrieve locked instance %s" % self.op.instance_name
4450
4451     self.instance = instance
4452
4453     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4454       raise errors.OpPrereqError("Instance's disk layout does not support"
4455                                  " growing.")
4456
4457     if instance.FindDisk(self.op.disk) is None:
4458       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4459                                  (self.op.disk, instance.name))
4460
4461     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4462     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4463                                        instance.hypervisor)
4464     for node in nodenames:
4465       info = nodeinfo.get(node, None)
4466       if not info:
4467         raise errors.OpPrereqError("Cannot get current information"
4468                                    " from node '%s'" % node)
4469       vg_free = info.get('vg_free', None)
4470       if not isinstance(vg_free, int):
4471         raise errors.OpPrereqError("Can't compute free disk space on"
4472                                    " node %s" % node)
4473       if self.op.amount > info['vg_free']:
4474         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4475                                    " %d MiB available, %d MiB required" %
4476                                    (node, info['vg_free'], self.op.amount))
4477
4478   def Exec(self, feedback_fn):
4479     """Execute disk grow.
4480
4481     """
4482     instance = self.instance
4483     disk = instance.FindDisk(self.op.disk)
4484     for node in (instance.secondary_nodes + (instance.primary_node,)):
4485       self.cfg.SetDiskID(disk, node)
4486       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4487       if (not result or not isinstance(result, (list, tuple)) or
4488           len(result) != 2):
4489         raise errors.OpExecError("grow request failed to node %s" % node)
4490       elif not result[0]:
4491         raise errors.OpExecError("grow request failed to node %s: %s" %
4492                                  (node, result[1]))
4493     disk.RecordGrow(self.op.amount)
4494     self.cfg.Update(instance)
4495     if self.op.wait_for_sync:
4496       disk_abort = not _WaitForSync(self, instance)
4497       if disk_abort:
4498         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4499                              " status.\nPlease check the instance.")
4500
4501
4502 class LUQueryInstanceData(NoHooksLU):
4503   """Query runtime instance data.
4504
4505   """
4506   _OP_REQP = ["instances", "static"]
4507   REQ_BGL = False
4508
4509   def ExpandNames(self):
4510     self.needed_locks = {}
4511     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4512
4513     if not isinstance(self.op.instances, list):
4514       raise errors.OpPrereqError("Invalid argument type 'instances'")
4515
4516     if self.op.instances:
4517       self.wanted_names = []
4518       for name in self.op.instances:
4519         full_name = self.cfg.ExpandInstanceName(name)
4520         if full_name is None:
4521           raise errors.OpPrereqError("Instance '%s' not known" %
4522                                      self.op.instance_name)
4523         self.wanted_names.append(full_name)
4524       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4525     else:
4526       self.wanted_names = None
4527       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4528
4529     self.needed_locks[locking.LEVEL_NODE] = []
4530     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4531
4532   def DeclareLocks(self, level):
4533     if level == locking.LEVEL_NODE:
4534       self._LockInstancesNodes()
4535
4536   def CheckPrereq(self):
4537     """Check prerequisites.
4538
4539     This only checks the optional instance list against the existing names.
4540
4541     """
4542     if self.wanted_names is None:
4543       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4544
4545     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4546                              in self.wanted_names]
4547     return
4548
4549   def _ComputeDiskStatus(self, instance, snode, dev):
4550     """Compute block device status.
4551
4552     """
4553     static = self.op.static
4554     if not static:
4555       self.cfg.SetDiskID(dev, instance.primary_node)
4556       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4557     else:
4558       dev_pstatus = None
4559
4560     if dev.dev_type in constants.LDS_DRBD:
4561       # we change the snode then (otherwise we use the one passed in)
4562       if dev.logical_id[0] == instance.primary_node:
4563         snode = dev.logical_id[1]
4564       else:
4565         snode = dev.logical_id[0]
4566
4567     if snode and not static:
4568       self.cfg.SetDiskID(dev, snode)
4569       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4570     else:
4571       dev_sstatus = None
4572
4573     if dev.children:
4574       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4575                       for child in dev.children]
4576     else:
4577       dev_children = []
4578
4579     data = {
4580       "iv_name": dev.iv_name,
4581       "dev_type": dev.dev_type,
4582       "logical_id": dev.logical_id,
4583       "physical_id": dev.physical_id,
4584       "pstatus": dev_pstatus,
4585       "sstatus": dev_sstatus,
4586       "children": dev_children,
4587       }
4588
4589     return data
4590
4591   def Exec(self, feedback_fn):
4592     """Gather and return data"""
4593     result = {}
4594
4595     cluster = self.cfg.GetClusterInfo()
4596
4597     for instance in self.wanted_instances:
4598       if not self.op.static:
4599         remote_info = self.rpc.call_instance_info(instance.primary_node,
4600                                                   instance.name,
4601                                                   instance.hypervisor)
4602         if remote_info and "state" in remote_info:
4603           remote_state = "up"
4604         else:
4605           remote_state = "down"
4606       else:
4607         remote_state = None
4608       if instance.status == "down":
4609         config_state = "down"
4610       else:
4611         config_state = "up"
4612
4613       disks = [self._ComputeDiskStatus(instance, None, device)
4614                for device in instance.disks]
4615
4616       idict = {
4617         "name": instance.name,
4618         "config_state": config_state,
4619         "run_state": remote_state,
4620         "pnode": instance.primary_node,
4621         "snodes": instance.secondary_nodes,
4622         "os": instance.os,
4623         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4624         "disks": disks,
4625         "hypervisor": instance.hypervisor,
4626         "network_port": instance.network_port,
4627         "hv_instance": instance.hvparams,
4628         "hv_actual": cluster.FillHV(instance),
4629         "be_instance": instance.beparams,
4630         "be_actual": cluster.FillBE(instance),
4631         }
4632
4633       result[instance.name] = idict
4634
4635     return result
4636
4637
4638 class LUSetInstanceParams(LogicalUnit):
4639   """Modifies an instances's parameters.
4640
4641   """
4642   HPATH = "instance-modify"
4643   HTYPE = constants.HTYPE_INSTANCE
4644   _OP_REQP = ["instance_name", "hvparams"]
4645   REQ_BGL = False
4646
4647   def ExpandNames(self):
4648     self._ExpandAndLockInstance()
4649     self.needed_locks[locking.LEVEL_NODE] = []
4650     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4651
4652
4653   def DeclareLocks(self, level):
4654     if level == locking.LEVEL_NODE:
4655       self._LockInstancesNodes()
4656
4657   def BuildHooksEnv(self):
4658     """Build hooks env.
4659
4660     This runs on the master, primary and secondaries.
4661
4662     """
4663     args = dict()
4664     if constants.BE_MEMORY in self.be_new:
4665       args['memory'] = self.be_new[constants.BE_MEMORY]
4666     if constants.BE_VCPUS in self.be_new:
4667       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4668     if self.do_ip or self.do_bridge or self.mac:
4669       if self.do_ip:
4670         ip = self.ip
4671       else:
4672         ip = self.instance.nics[0].ip
4673       if self.bridge:
4674         bridge = self.bridge
4675       else:
4676         bridge = self.instance.nics[0].bridge
4677       if self.mac:
4678         mac = self.mac
4679       else:
4680         mac = self.instance.nics[0].mac
4681       args['nics'] = [(ip, bridge, mac)]
4682     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4683     nl = [self.cfg.GetMasterNode(),
4684           self.instance.primary_node] + list(self.instance.secondary_nodes)
4685     return env, nl, nl
4686
4687   def CheckPrereq(self):
4688     """Check prerequisites.
4689
4690     This only checks the instance list against the existing names.
4691
4692     """
4693     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4694     # a separate CheckArguments function, if we implement one, so the operation
4695     # can be aborted without waiting for any lock, should it have an error...
4696     self.ip = getattr(self.op, "ip", None)
4697     self.mac = getattr(self.op, "mac", None)
4698     self.bridge = getattr(self.op, "bridge", None)
4699     self.kernel_path = getattr(self.op, "kernel_path", None)
4700     self.initrd_path = getattr(self.op, "initrd_path", None)
4701     self.force = getattr(self.op, "force", None)
4702     all_parms = [self.ip, self.bridge, self.mac]
4703     if (all_parms.count(None) == len(all_parms) and
4704         not self.op.hvparams and
4705         not self.op.beparams):
4706       raise errors.OpPrereqError("No changes submitted")
4707     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4708       val = self.op.beparams.get(item, None)
4709       if val is not None:
4710         try:
4711           val = int(val)
4712         except ValueError, err:
4713           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4714         self.op.beparams[item] = val
4715     if self.ip is not None:
4716       self.do_ip = True
4717       if self.ip.lower() == "none":
4718         self.ip = None
4719       else:
4720         if not utils.IsValidIP(self.ip):
4721           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4722     else:
4723       self.do_ip = False
4724     self.do_bridge = (self.bridge is not None)
4725     if self.mac is not None:
4726       if self.cfg.IsMacInUse(self.mac):
4727         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4728                                    self.mac)
4729       if not utils.IsValidMac(self.mac):
4730         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4731
4732     # checking the new params on the primary/secondary nodes
4733
4734     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4735     assert self.instance is not None, \
4736       "Cannot retrieve locked instance %s" % self.op.instance_name
4737     pnode = self.instance.primary_node
4738     nodelist = [pnode]
4739     nodelist.extend(instance.secondary_nodes)
4740
4741     # hvparams processing
4742     if self.op.hvparams:
4743       i_hvdict = copy.deepcopy(instance.hvparams)
4744       for key, val in self.op.hvparams.iteritems():
4745         if val is None:
4746           try:
4747             del i_hvdict[key]
4748           except KeyError:
4749             pass
4750         else:
4751           i_hvdict[key] = val
4752       cluster = self.cfg.GetClusterInfo()
4753       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4754                                 i_hvdict)
4755       # local check
4756       hypervisor.GetHypervisor(
4757         instance.hypervisor).CheckParameterSyntax(hv_new)
4758       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4759       self.hv_new = hv_new # the new actual values
4760       self.hv_inst = i_hvdict # the new dict (without defaults)
4761     else:
4762       self.hv_new = self.hv_inst = {}
4763
4764     # beparams processing
4765     if self.op.beparams:
4766       i_bedict = copy.deepcopy(instance.beparams)
4767       for key, val in self.op.beparams.iteritems():
4768         if val is None:
4769           try:
4770             del i_bedict[key]
4771           except KeyError:
4772             pass
4773         else:
4774           i_bedict[key] = val
4775       cluster = self.cfg.GetClusterInfo()
4776       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4777                                 i_bedict)
4778       self.be_new = be_new # the new actual values
4779       self.be_inst = i_bedict # the new dict (without defaults)
4780     else:
4781       self.hv_new = self.hv_inst = {}
4782
4783     self.warn = []
4784
4785     if constants.BE_MEMORY in self.op.beparams and not self.force:
4786       mem_check_list = [pnode]
4787       if be_new[constants.BE_AUTO_BALANCE]:
4788         # either we changed auto_balance to yes or it was from before
4789         mem_check_list.extend(instance.secondary_nodes)
4790       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4791                                                   instance.hypervisor)
4792       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4793                                          instance.hypervisor)
4794
4795       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4796         # Assume the primary node is unreachable and go ahead
4797         self.warn.append("Can't get info from primary node %s" % pnode)
4798       else:
4799         if instance_info:
4800           current_mem = instance_info['memory']
4801         else:
4802           # Assume instance not running
4803           # (there is a slight race condition here, but it's not very probable,
4804           # and we have no other way to check)
4805           current_mem = 0
4806         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4807                     nodeinfo[pnode]['memory_free'])
4808         if miss_mem > 0:
4809           raise errors.OpPrereqError("This change will prevent the instance"
4810                                      " from starting, due to %d MB of memory"
4811                                      " missing on its primary node" % miss_mem)
4812
4813       if be_new[constants.BE_AUTO_BALANCE]:
4814         for node in instance.secondary_nodes:
4815           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4816             self.warn.append("Can't get info from secondary node %s" % node)
4817           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4818             self.warn.append("Not enough memory to failover instance to"
4819                              " secondary node %s" % node)
4820
4821     return
4822
4823   def Exec(self, feedback_fn):
4824     """Modifies an instance.
4825
4826     All parameters take effect only at the next restart of the instance.
4827     """
4828     # Process here the warnings from CheckPrereq, as we don't have a
4829     # feedback_fn there.
4830     for warn in self.warn:
4831       feedback_fn("WARNING: %s" % warn)
4832
4833     result = []
4834     instance = self.instance
4835     if self.do_ip:
4836       instance.nics[0].ip = self.ip
4837       result.append(("ip", self.ip))
4838     if self.bridge:
4839       instance.nics[0].bridge = self.bridge
4840       result.append(("bridge", self.bridge))
4841     if self.mac:
4842       instance.nics[0].mac = self.mac
4843       result.append(("mac", self.mac))
4844     if self.op.hvparams:
4845       instance.hvparams = self.hv_new
4846       for key, val in self.op.hvparams.iteritems():
4847         result.append(("hv/%s" % key, val))
4848     if self.op.beparams:
4849       instance.beparams = self.be_inst
4850       for key, val in self.op.beparams.iteritems():
4851         result.append(("be/%s" % key, val))
4852
4853     self.cfg.Update(instance)
4854
4855     return result
4856
4857
4858 class LUQueryExports(NoHooksLU):
4859   """Query the exports list
4860
4861   """
4862   _OP_REQP = ['nodes']
4863   REQ_BGL = False
4864
4865   def ExpandNames(self):
4866     self.needed_locks = {}
4867     self.share_locks[locking.LEVEL_NODE] = 1
4868     if not self.op.nodes:
4869       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4870     else:
4871       self.needed_locks[locking.LEVEL_NODE] = \
4872         _GetWantedNodes(self, self.op.nodes)
4873
4874   def CheckPrereq(self):
4875     """Check prerequisites.
4876
4877     """
4878     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4879
4880   def Exec(self, feedback_fn):
4881     """Compute the list of all the exported system images.
4882
4883     Returns:
4884       a dictionary with the structure node->(export-list)
4885       where export-list is a list of the instances exported on
4886       that node.
4887
4888     """
4889     return self.rpc.call_export_list(self.nodes)
4890
4891
4892 class LUExportInstance(LogicalUnit):
4893   """Export an instance to an image in the cluster.
4894
4895   """
4896   HPATH = "instance-export"
4897   HTYPE = constants.HTYPE_INSTANCE
4898   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4899   REQ_BGL = False
4900
4901   def ExpandNames(self):
4902     self._ExpandAndLockInstance()
4903     # FIXME: lock only instance primary and destination node
4904     #
4905     # Sad but true, for now we have do lock all nodes, as we don't know where
4906     # the previous export might be, and and in this LU we search for it and
4907     # remove it from its current node. In the future we could fix this by:
4908     #  - making a tasklet to search (share-lock all), then create the new one,
4909     #    then one to remove, after
4910     #  - removing the removal operation altoghether
4911     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4912
4913   def DeclareLocks(self, level):
4914     """Last minute lock declaration."""
4915     # All nodes are locked anyway, so nothing to do here.
4916
4917   def BuildHooksEnv(self):
4918     """Build hooks env.
4919
4920     This will run on the master, primary node and target node.
4921
4922     """
4923     env = {
4924       "EXPORT_NODE": self.op.target_node,
4925       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4926       }
4927     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4928     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4929           self.op.target_node]
4930     return env, nl, nl
4931
4932   def CheckPrereq(self):
4933     """Check prerequisites.
4934
4935     This checks that the instance and node names are valid.
4936
4937     """
4938     instance_name = self.op.instance_name
4939     self.instance = self.cfg.GetInstanceInfo(instance_name)
4940     assert self.instance is not None, \
4941           "Cannot retrieve locked instance %s" % self.op.instance_name
4942
4943     self.dst_node = self.cfg.GetNodeInfo(
4944       self.cfg.ExpandNodeName(self.op.target_node))
4945
4946     assert self.dst_node is not None, \
4947           "Cannot retrieve locked node %s" % self.op.target_node
4948
4949     # instance disk type verification
4950     for disk in self.instance.disks:
4951       if disk.dev_type == constants.LD_FILE:
4952         raise errors.OpPrereqError("Export not supported for instances with"
4953                                    " file-based disks")
4954
4955   def Exec(self, feedback_fn):
4956     """Export an instance to an image in the cluster.
4957
4958     """
4959     instance = self.instance
4960     dst_node = self.dst_node
4961     src_node = instance.primary_node
4962     if self.op.shutdown:
4963       # shutdown the instance, but not the disks
4964       if not self.rpc.call_instance_shutdown(src_node, instance):
4965         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4966                                  (instance.name, src_node))
4967
4968     vgname = self.cfg.GetVGName()
4969
4970     snap_disks = []
4971
4972     try:
4973       for disk in instance.disks:
4974         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4975         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4976
4977         if not new_dev_name:
4978           self.LogWarning("Could not snapshot block device %s on node %s",
4979                           disk.logical_id[1], src_node)
4980           snap_disks.append(False)
4981         else:
4982           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4983                                  logical_id=(vgname, new_dev_name),
4984                                  physical_id=(vgname, new_dev_name),
4985                                  iv_name=disk.iv_name)
4986           snap_disks.append(new_dev)
4987
4988     finally:
4989       if self.op.shutdown and instance.status == "up":
4990         if not self.rpc.call_instance_start(src_node, instance, None):
4991           _ShutdownInstanceDisks(self, instance)
4992           raise errors.OpExecError("Could not start instance")
4993
4994     # TODO: check for size
4995
4996     cluster_name = self.cfg.GetClusterName()
4997     for idx, dev in enumerate(snap_disks):
4998       if dev:
4999         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5000                                              instance, cluster_name, idx):
5001           self.LogWarning("Could not export block device %s from node %s to"
5002                           " node %s", dev.logical_id[1], src_node,
5003                           dst_node.name)
5004         if not self.rpc.call_blockdev_remove(src_node, dev):
5005           self.LogWarning("Could not remove snapshot block device %s from node"
5006                           " %s", dev.logical_id[1], src_node)
5007
5008     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5009       self.LogWarning("Could not finalize export for instance %s on node %s",
5010                       instance.name, dst_node.name)
5011
5012     nodelist = self.cfg.GetNodeList()
5013     nodelist.remove(dst_node.name)
5014
5015     # on one-node clusters nodelist will be empty after the removal
5016     # if we proceed the backup would be removed because OpQueryExports
5017     # substitutes an empty list with the full cluster node list.
5018     if nodelist:
5019       exportlist = self.rpc.call_export_list(nodelist)
5020       for node in exportlist:
5021         if instance.name in exportlist[node]:
5022           if not self.rpc.call_export_remove(node, instance.name):
5023             self.LogWarning("Could not remove older export for instance %s"
5024                             " on node %s", instance.name, node)
5025
5026
5027 class LURemoveExport(NoHooksLU):
5028   """Remove exports related to the named instance.
5029
5030   """
5031   _OP_REQP = ["instance_name"]
5032   REQ_BGL = False
5033
5034   def ExpandNames(self):
5035     self.needed_locks = {}
5036     # We need all nodes to be locked in order for RemoveExport to work, but we
5037     # don't need to lock the instance itself, as nothing will happen to it (and
5038     # we can remove exports also for a removed instance)
5039     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5040
5041   def CheckPrereq(self):
5042     """Check prerequisites.
5043     """
5044     pass
5045
5046   def Exec(self, feedback_fn):
5047     """Remove any export.
5048
5049     """
5050     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5051     # If the instance was not found we'll try with the name that was passed in.
5052     # This will only work if it was an FQDN, though.
5053     fqdn_warn = False
5054     if not instance_name:
5055       fqdn_warn = True
5056       instance_name = self.op.instance_name
5057
5058     exportlist = self.rpc.call_export_list(self.acquired_locks[
5059       locking.LEVEL_NODE])
5060     found = False
5061     for node in exportlist:
5062       if instance_name in exportlist[node]:
5063         found = True
5064         if not self.rpc.call_export_remove(node, instance_name):
5065           logging.error("Could not remove export for instance %s"
5066                         " on node %s", instance_name, node)
5067
5068     if fqdn_warn and not found:
5069       feedback_fn("Export not found. If trying to remove an export belonging"
5070                   " to a deleted instance please use its Fully Qualified"
5071                   " Domain Name.")
5072
5073
5074 class TagsLU(NoHooksLU):
5075   """Generic tags LU.
5076
5077   This is an abstract class which is the parent of all the other tags LUs.
5078
5079   """
5080
5081   def ExpandNames(self):
5082     self.needed_locks = {}
5083     if self.op.kind == constants.TAG_NODE:
5084       name = self.cfg.ExpandNodeName(self.op.name)
5085       if name is None:
5086         raise errors.OpPrereqError("Invalid node name (%s)" %
5087                                    (self.op.name,))
5088       self.op.name = name
5089       self.needed_locks[locking.LEVEL_NODE] = name
5090     elif self.op.kind == constants.TAG_INSTANCE:
5091       name = self.cfg.ExpandInstanceName(self.op.name)
5092       if name is None:
5093         raise errors.OpPrereqError("Invalid instance name (%s)" %
5094                                    (self.op.name,))
5095       self.op.name = name
5096       self.needed_locks[locking.LEVEL_INSTANCE] = name
5097
5098   def CheckPrereq(self):
5099     """Check prerequisites.
5100
5101     """
5102     if self.op.kind == constants.TAG_CLUSTER:
5103       self.target = self.cfg.GetClusterInfo()
5104     elif self.op.kind == constants.TAG_NODE:
5105       self.target = self.cfg.GetNodeInfo(self.op.name)
5106     elif self.op.kind == constants.TAG_INSTANCE:
5107       self.target = self.cfg.GetInstanceInfo(self.op.name)
5108     else:
5109       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5110                                  str(self.op.kind))
5111
5112
5113 class LUGetTags(TagsLU):
5114   """Returns the tags of a given object.
5115
5116   """
5117   _OP_REQP = ["kind", "name"]
5118   REQ_BGL = False
5119
5120   def Exec(self, feedback_fn):
5121     """Returns the tag list.
5122
5123     """
5124     return list(self.target.GetTags())
5125
5126
5127 class LUSearchTags(NoHooksLU):
5128   """Searches the tags for a given pattern.
5129
5130   """
5131   _OP_REQP = ["pattern"]
5132   REQ_BGL = False
5133
5134   def ExpandNames(self):
5135     self.needed_locks = {}
5136
5137   def CheckPrereq(self):
5138     """Check prerequisites.
5139
5140     This checks the pattern passed for validity by compiling it.
5141
5142     """
5143     try:
5144       self.re = re.compile(self.op.pattern)
5145     except re.error, err:
5146       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5147                                  (self.op.pattern, err))
5148
5149   def Exec(self, feedback_fn):
5150     """Returns the tag list.
5151
5152     """
5153     cfg = self.cfg
5154     tgts = [("/cluster", cfg.GetClusterInfo())]
5155     ilist = cfg.GetAllInstancesInfo().values()
5156     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5157     nlist = cfg.GetAllNodesInfo().values()
5158     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5159     results = []
5160     for path, target in tgts:
5161       for tag in target.GetTags():
5162         if self.re.search(tag):
5163           results.append((path, tag))
5164     return results
5165
5166
5167 class LUAddTags(TagsLU):
5168   """Sets a tag on a given object.
5169
5170   """
5171   _OP_REQP = ["kind", "name", "tags"]
5172   REQ_BGL = False
5173
5174   def CheckPrereq(self):
5175     """Check prerequisites.
5176
5177     This checks the type and length of the tag name and value.
5178
5179     """
5180     TagsLU.CheckPrereq(self)
5181     for tag in self.op.tags:
5182       objects.TaggableObject.ValidateTag(tag)
5183
5184   def Exec(self, feedback_fn):
5185     """Sets the tag.
5186
5187     """
5188     try:
5189       for tag in self.op.tags:
5190         self.target.AddTag(tag)
5191     except errors.TagError, err:
5192       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5193     try:
5194       self.cfg.Update(self.target)
5195     except errors.ConfigurationError:
5196       raise errors.OpRetryError("There has been a modification to the"
5197                                 " config file and the operation has been"
5198                                 " aborted. Please retry.")
5199
5200
5201 class LUDelTags(TagsLU):
5202   """Delete a list of tags from a given object.
5203
5204   """
5205   _OP_REQP = ["kind", "name", "tags"]
5206   REQ_BGL = False
5207
5208   def CheckPrereq(self):
5209     """Check prerequisites.
5210
5211     This checks that we have the given tag.
5212
5213     """
5214     TagsLU.CheckPrereq(self)
5215     for tag in self.op.tags:
5216       objects.TaggableObject.ValidateTag(tag)
5217     del_tags = frozenset(self.op.tags)
5218     cur_tags = self.target.GetTags()
5219     if not del_tags <= cur_tags:
5220       diff_tags = del_tags - cur_tags
5221       diff_names = ["'%s'" % tag for tag in diff_tags]
5222       diff_names.sort()
5223       raise errors.OpPrereqError("Tag(s) %s not found" %
5224                                  (",".join(diff_names)))
5225
5226   def Exec(self, feedback_fn):
5227     """Remove the tag from the object.
5228
5229     """
5230     for tag in self.op.tags:
5231       self.target.RemoveTag(tag)
5232     try:
5233       self.cfg.Update(self.target)
5234     except errors.ConfigurationError:
5235       raise errors.OpRetryError("There has been a modification to the"
5236                                 " config file and the operation has been"
5237                                 " aborted. Please retry.")
5238
5239
5240 class LUTestDelay(NoHooksLU):
5241   """Sleep for a specified amount of time.
5242
5243   This LU sleeps on the master and/or nodes for a specified amount of
5244   time.
5245
5246   """
5247   _OP_REQP = ["duration", "on_master", "on_nodes"]
5248   REQ_BGL = False
5249
5250   def ExpandNames(self):
5251     """Expand names and set required locks.
5252
5253     This expands the node list, if any.
5254
5255     """
5256     self.needed_locks = {}
5257     if self.op.on_nodes:
5258       # _GetWantedNodes can be used here, but is not always appropriate to use
5259       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5260       # more information.
5261       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5262       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5263
5264   def CheckPrereq(self):
5265     """Check prerequisites.
5266
5267     """
5268
5269   def Exec(self, feedback_fn):
5270     """Do the actual sleep.
5271
5272     """
5273     if self.op.on_master:
5274       if not utils.TestDelay(self.op.duration):
5275         raise errors.OpExecError("Error during master delay test")
5276     if self.op.on_nodes:
5277       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5278       if not result:
5279         raise errors.OpExecError("Complete failure from rpc call")
5280       for node, node_result in result.items():
5281         if not node_result:
5282           raise errors.OpExecError("Failure during rpc call to node %s,"
5283                                    " result: %s" % (node, node_result))
5284
5285
5286 class IAllocator(object):
5287   """IAllocator framework.
5288
5289   An IAllocator instance has three sets of attributes:
5290     - cfg that is needed to query the cluster
5291     - input data (all members of the _KEYS class attribute are required)
5292     - four buffer attributes (in|out_data|text), that represent the
5293       input (to the external script) in text and data structure format,
5294       and the output from it, again in two formats
5295     - the result variables from the script (success, info, nodes) for
5296       easy usage
5297
5298   """
5299   _ALLO_KEYS = [
5300     "mem_size", "disks", "disk_template",
5301     "os", "tags", "nics", "vcpus",
5302     ]
5303   _RELO_KEYS = [
5304     "relocate_from",
5305     ]
5306
5307   def __init__(self, lu, mode, name, **kwargs):
5308     self.lu = lu
5309     # init buffer variables
5310     self.in_text = self.out_text = self.in_data = self.out_data = None
5311     # init all input fields so that pylint is happy
5312     self.mode = mode
5313     self.name = name
5314     self.mem_size = self.disks = self.disk_template = None
5315     self.os = self.tags = self.nics = self.vcpus = None
5316     self.relocate_from = None
5317     # computed fields
5318     self.required_nodes = None
5319     # init result fields
5320     self.success = self.info = self.nodes = None
5321     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5322       keyset = self._ALLO_KEYS
5323     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5324       keyset = self._RELO_KEYS
5325     else:
5326       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5327                                    " IAllocator" % self.mode)
5328     for key in kwargs:
5329       if key not in keyset:
5330         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5331                                      " IAllocator" % key)
5332       setattr(self, key, kwargs[key])
5333     for key in keyset:
5334       if key not in kwargs:
5335         raise errors.ProgrammerError("Missing input parameter '%s' to"
5336                                      " IAllocator" % key)
5337     self._BuildInputData()
5338
5339   def _ComputeClusterData(self):
5340     """Compute the generic allocator input data.
5341
5342     This is the data that is independent of the actual operation.
5343
5344     """
5345     cfg = self.lu.cfg
5346     cluster_info = cfg.GetClusterInfo()
5347     # cluster data
5348     data = {
5349       "version": 1,
5350       "cluster_name": cfg.GetClusterName(),
5351       "cluster_tags": list(cluster_info.GetTags()),
5352       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5353       # we don't have job IDs
5354       }
5355
5356     i_list = []
5357     cluster = self.cfg.GetClusterInfo()
5358     for iname in cfg.GetInstanceList():
5359       i_obj = cfg.GetInstanceInfo(iname)
5360       i_list.append((i_obj, cluster.FillBE(i_obj)))
5361
5362     # node data
5363     node_results = {}
5364     node_list = cfg.GetNodeList()
5365     # FIXME: here we have only one hypervisor information, but
5366     # instance can belong to different hypervisors
5367     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5368                                            cfg.GetHypervisorType())
5369     for nname in node_list:
5370       ninfo = cfg.GetNodeInfo(nname)
5371       if nname not in node_data or not isinstance(node_data[nname], dict):
5372         raise errors.OpExecError("Can't get data for node %s" % nname)
5373       remote_info = node_data[nname]
5374       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5375                    'vg_size', 'vg_free', 'cpu_total']:
5376         if attr not in remote_info:
5377           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5378                                    (nname, attr))
5379         try:
5380           remote_info[attr] = int(remote_info[attr])
5381         except ValueError, err:
5382           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5383                                    " %s" % (nname, attr, str(err)))
5384       # compute memory used by primary instances
5385       i_p_mem = i_p_up_mem = 0
5386       for iinfo, beinfo in i_list:
5387         if iinfo.primary_node == nname:
5388           i_p_mem += beinfo[constants.BE_MEMORY]
5389           if iinfo.status == "up":
5390             i_p_up_mem += beinfo[constants.BE_MEMORY]
5391
5392       # compute memory used by instances
5393       pnr = {
5394         "tags": list(ninfo.GetTags()),
5395         "total_memory": remote_info['memory_total'],
5396         "reserved_memory": remote_info['memory_dom0'],
5397         "free_memory": remote_info['memory_free'],
5398         "i_pri_memory": i_p_mem,
5399         "i_pri_up_memory": i_p_up_mem,
5400         "total_disk": remote_info['vg_size'],
5401         "free_disk": remote_info['vg_free'],
5402         "primary_ip": ninfo.primary_ip,
5403         "secondary_ip": ninfo.secondary_ip,
5404         "total_cpus": remote_info['cpu_total'],
5405         }
5406       node_results[nname] = pnr
5407     data["nodes"] = node_results
5408
5409     # instance data
5410     instance_data = {}
5411     for iinfo, beinfo in i_list:
5412       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5413                   for n in iinfo.nics]
5414       pir = {
5415         "tags": list(iinfo.GetTags()),
5416         "should_run": iinfo.status == "up",
5417         "vcpus": beinfo[constants.BE_VCPUS],
5418         "memory": beinfo[constants.BE_MEMORY],
5419         "os": iinfo.os,
5420         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5421         "nics": nic_data,
5422         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5423         "disk_template": iinfo.disk_template,
5424         "hypervisor": iinfo.hypervisor,
5425         }
5426       instance_data[iinfo.name] = pir
5427
5428     data["instances"] = instance_data
5429
5430     self.in_data = data
5431
5432   def _AddNewInstance(self):
5433     """Add new instance data to allocator structure.
5434
5435     This in combination with _AllocatorGetClusterData will create the
5436     correct structure needed as input for the allocator.
5437
5438     The checks for the completeness of the opcode must have already been
5439     done.
5440
5441     """
5442     data = self.in_data
5443     if len(self.disks) != 2:
5444       raise errors.OpExecError("Only two-disk configurations supported")
5445
5446     disk_space = _ComputeDiskSize(self.disk_template,
5447                                   self.disks[0]["size"], self.disks[1]["size"])
5448
5449     if self.disk_template in constants.DTS_NET_MIRROR:
5450       self.required_nodes = 2
5451     else:
5452       self.required_nodes = 1
5453     request = {
5454       "type": "allocate",
5455       "name": self.name,
5456       "disk_template": self.disk_template,
5457       "tags": self.tags,
5458       "os": self.os,
5459       "vcpus": self.vcpus,
5460       "memory": self.mem_size,
5461       "disks": self.disks,
5462       "disk_space_total": disk_space,
5463       "nics": self.nics,
5464       "required_nodes": self.required_nodes,
5465       }
5466     data["request"] = request
5467
5468   def _AddRelocateInstance(self):
5469     """Add relocate instance data to allocator structure.
5470
5471     This in combination with _IAllocatorGetClusterData will create the
5472     correct structure needed as input for the allocator.
5473
5474     The checks for the completeness of the opcode must have already been
5475     done.
5476
5477     """
5478     instance = self.lu.cfg.GetInstanceInfo(self.name)
5479     if instance is None:
5480       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5481                                    " IAllocator" % self.name)
5482
5483     if instance.disk_template not in constants.DTS_NET_MIRROR:
5484       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5485
5486     if len(instance.secondary_nodes) != 1:
5487       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5488
5489     self.required_nodes = 1
5490
5491     disk_space = _ComputeDiskSize(instance.disk_template,
5492                                   instance.disks[0].size,
5493                                   instance.disks[1].size)
5494
5495     request = {
5496       "type": "relocate",
5497       "name": self.name,
5498       "disk_space_total": disk_space,
5499       "required_nodes": self.required_nodes,
5500       "relocate_from": self.relocate_from,
5501       }
5502     self.in_data["request"] = request
5503
5504   def _BuildInputData(self):
5505     """Build input data structures.
5506
5507     """
5508     self._ComputeClusterData()
5509
5510     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5511       self._AddNewInstance()
5512     else:
5513       self._AddRelocateInstance()
5514
5515     self.in_text = serializer.Dump(self.in_data)
5516
5517   def Run(self, name, validate=True, call_fn=None):
5518     """Run an instance allocator and return the results.
5519
5520     """
5521     if call_fn is None:
5522       call_fn = self.lu.rpc.call_iallocator_runner
5523     data = self.in_text
5524
5525     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5526
5527     if not isinstance(result, (list, tuple)) or len(result) != 4:
5528       raise errors.OpExecError("Invalid result from master iallocator runner")
5529
5530     rcode, stdout, stderr, fail = result
5531
5532     if rcode == constants.IARUN_NOTFOUND:
5533       raise errors.OpExecError("Can't find allocator '%s'" % name)
5534     elif rcode == constants.IARUN_FAILURE:
5535       raise errors.OpExecError("Instance allocator call failed: %s,"
5536                                " output: %s" % (fail, stdout+stderr))
5537     self.out_text = stdout
5538     if validate:
5539       self._ValidateResult()
5540
5541   def _ValidateResult(self):
5542     """Process the allocator results.
5543
5544     This will process and if successful save the result in
5545     self.out_data and the other parameters.
5546
5547     """
5548     try:
5549       rdict = serializer.Load(self.out_text)
5550     except Exception, err:
5551       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5552
5553     if not isinstance(rdict, dict):
5554       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5555
5556     for key in "success", "info", "nodes":
5557       if key not in rdict:
5558         raise errors.OpExecError("Can't parse iallocator results:"
5559                                  " missing key '%s'" % key)
5560       setattr(self, key, rdict[key])
5561
5562     if not isinstance(rdict["nodes"], list):
5563       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5564                                " is not a list")
5565     self.out_data = rdict
5566
5567
5568 class LUTestAllocator(NoHooksLU):
5569   """Run allocator tests.
5570
5571   This LU runs the allocator tests
5572
5573   """
5574   _OP_REQP = ["direction", "mode", "name"]
5575
5576   def CheckPrereq(self):
5577     """Check prerequisites.
5578
5579     This checks the opcode parameters depending on the director and mode test.
5580
5581     """
5582     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5583       for attr in ["name", "mem_size", "disks", "disk_template",
5584                    "os", "tags", "nics", "vcpus"]:
5585         if not hasattr(self.op, attr):
5586           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5587                                      attr)
5588       iname = self.cfg.ExpandInstanceName(self.op.name)
5589       if iname is not None:
5590         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5591                                    iname)
5592       if not isinstance(self.op.nics, list):
5593         raise errors.OpPrereqError("Invalid parameter 'nics'")
5594       for row in self.op.nics:
5595         if (not isinstance(row, dict) or
5596             "mac" not in row or
5597             "ip" not in row or
5598             "bridge" not in row):
5599           raise errors.OpPrereqError("Invalid contents of the"
5600                                      " 'nics' parameter")
5601       if not isinstance(self.op.disks, list):
5602         raise errors.OpPrereqError("Invalid parameter 'disks'")
5603       if len(self.op.disks) != 2:
5604         raise errors.OpPrereqError("Only two-disk configurations supported")
5605       for row in self.op.disks:
5606         if (not isinstance(row, dict) or
5607             "size" not in row or
5608             not isinstance(row["size"], int) or
5609             "mode" not in row or
5610             row["mode"] not in ['r', 'w']):
5611           raise errors.OpPrereqError("Invalid contents of the"
5612                                      " 'disks' parameter")
5613     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5614       if not hasattr(self.op, "name"):
5615         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5616       fname = self.cfg.ExpandInstanceName(self.op.name)
5617       if fname is None:
5618         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5619                                    self.op.name)
5620       self.op.name = fname
5621       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5622     else:
5623       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5624                                  self.op.mode)
5625
5626     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5627       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5628         raise errors.OpPrereqError("Missing allocator name")
5629     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5630       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5631                                  self.op.direction)
5632
5633   def Exec(self, feedback_fn):
5634     """Run the allocator test.
5635
5636     """
5637     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5638       ial = IAllocator(self,
5639                        mode=self.op.mode,
5640                        name=self.op.name,
5641                        mem_size=self.op.mem_size,
5642                        disks=self.op.disks,
5643                        disk_template=self.op.disk_template,
5644                        os=self.op.os,
5645                        tags=self.op.tags,
5646                        nics=self.op.nics,
5647                        vcpus=self.op.vcpus,
5648                        )
5649     else:
5650       ial = IAllocator(self,
5651                        mode=self.op.mode,
5652                        name=self.op.name,
5653                        relocate_from=list(self.relocate_from),
5654                        )
5655
5656     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5657       result = ial.in_text
5658     else:
5659       ial.Run(self.op.allocator, validate=False)
5660       result = ial.out_text
5661     return result