RAPI: Export beparams as dict. The patch also enables LUQueryInstances to accept...
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_MASTER = True
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99
100     if not self.cfg.IsCluster():
101       raise errors.OpPrereqError("Cluster not initialized yet,"
102                                  " use 'gnt-cluster init' first.")
103     if self.REQ_MASTER:
104       master = self.cfg.GetMasterNode()
105       if master != utils.HostInfo().name:
106         raise errors.OpPrereqError("Commands must be run on the master"
107                                    " node %s" % master)
108
109   def __GetSSH(self):
110     """Returns the SshRunner object
111
112     """
113     if not self.__ssh:
114       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115     return self.__ssh
116
117   ssh = property(fget=__GetSSH)
118
119   def ExpandNames(self):
120     """Expand names for this LU.
121
122     This method is called before starting to execute the opcode, and it should
123     update all the parameters of the opcode to their canonical form (e.g. a
124     short node name must be fully expanded after this method has successfully
125     completed). This way locking, hooks, logging, ecc. can work correctly.
126
127     LUs which implement this method must also populate the self.needed_locks
128     member, as a dict with lock levels as keys, and a list of needed lock names
129     as values. Rules:
130       - Use an empty dict if you don't need any lock
131       - If you don't need any lock at a particular level omit that level
132       - Don't put anything for the BGL level
133       - If you want all locks at a level use locking.ALL_SET as a value
134
135     If you need to share locks (rather than acquire them exclusively) at one
136     level you can modify self.share_locks, setting a true value (usually 1) for
137     that level. By default locks are not shared.
138
139     Examples:
140     # Acquire all nodes and one instance
141     self.needed_locks = {
142       locking.LEVEL_NODE: locking.ALL_SET,
143       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144     }
145     # Acquire just two nodes
146     self.needed_locks = {
147       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148     }
149     # Acquire no locks
150     self.needed_locks = {} # No, you can't leave it to the default value None
151
152     """
153     # The implementation of this method is mandatory only if the new LU is
154     # concurrent, so that old LUs don't need to be changed all at the same
155     # time.
156     if self.REQ_BGL:
157       self.needed_locks = {} # Exclusive LUs don't need locks.
158     else:
159       raise NotImplementedError
160
161   def DeclareLocks(self, level):
162     """Declare LU locking needs for a level
163
164     While most LUs can just declare their locking needs at ExpandNames time,
165     sometimes there's the need to calculate some locks after having acquired
166     the ones before. This function is called just before acquiring locks at a
167     particular level, but after acquiring the ones at lower levels, and permits
168     such calculations. It can be used to modify self.needed_locks, and by
169     default it does nothing.
170
171     This function is only called if you have something already set in
172     self.needed_locks for the level.
173
174     @param level: Locking level which is going to be locked
175     @type level: member of ganeti.locking.LEVELS
176
177     """
178
179   def CheckPrereq(self):
180     """Check prerequisites for this LU.
181
182     This method should check that the prerequisites for the execution
183     of this LU are fulfilled. It can do internode communication, but
184     it should be idempotent - no cluster or system changes are
185     allowed.
186
187     The method should raise errors.OpPrereqError in case something is
188     not fulfilled. Its return value is ignored.
189
190     This method should also update all the parameters of the opcode to
191     their canonical form if it hasn't been done by ExpandNames before.
192
193     """
194     raise NotImplementedError
195
196   def Exec(self, feedback_fn):
197     """Execute the LU.
198
199     This method should implement the actual work. It should raise
200     errors.OpExecError for failures that are somewhat dealt with in
201     code, or expected.
202
203     """
204     raise NotImplementedError
205
206   def BuildHooksEnv(self):
207     """Build hooks environment for this LU.
208
209     This method should return a three-node tuple consisting of: a dict
210     containing the environment that will be used for running the
211     specific hook for this LU, a list of node names on which the hook
212     should run before the execution, and a list of node names on which
213     the hook should run after the execution.
214
215     The keys of the dict must not have 'GANETI_' prefixed as this will
216     be handled in the hooks runner. Also note additional keys will be
217     added by the hooks runner. If the LU doesn't define any
218     environment, an empty dict (and not None) should be returned.
219
220     No nodes should be returned as an empty list (and not None).
221
222     Note that if the HPATH for a LU class is None, this function will
223     not be called.
224
225     """
226     raise NotImplementedError
227
228   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229     """Notify the LU about the results of its hooks.
230
231     This method is called every time a hooks phase is executed, and notifies
232     the Logical Unit about the hooks' result. The LU can then use it to alter
233     its result based on the hooks.  By default the method does nothing and the
234     previous result is passed back unchanged but any LU can define it if it
235     wants to use the local cluster hook-scripts somehow.
236
237     Args:
238       phase: the hooks phase that has just been run
239       hooks_results: the results of the multi-node hooks rpc call
240       feedback_fn: function to send feedback back to the caller
241       lu_result: the previous result this LU had, or None in the PRE phase.
242
243     """
244     return lu_result
245
246   def _ExpandAndLockInstance(self):
247     """Helper function to expand and lock an instance.
248
249     Many LUs that work on an instance take its name in self.op.instance_name
250     and need to expand it and then declare the expanded name for locking. This
251     function does it, and then updates self.op.instance_name to the expanded
252     name. It also initializes needed_locks as a dict, if this hasn't been done
253     before.
254
255     """
256     if self.needed_locks is None:
257       self.needed_locks = {}
258     else:
259       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260         "_ExpandAndLockInstance called with instance-level locks set"
261     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262     if expanded_name is None:
263       raise errors.OpPrereqError("Instance '%s' not known" %
264                                   self.op.instance_name)
265     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266     self.op.instance_name = expanded_name
267
268   def _LockInstancesNodes(self, primary_only=False):
269     """Helper function to declare instances' nodes for locking.
270
271     This function should be called after locking one or more instances to lock
272     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273     with all primary or secondary nodes for instances already locked and
274     present in self.needed_locks[locking.LEVEL_INSTANCE].
275
276     It should be called from DeclareLocks, and for safety only works if
277     self.recalculate_locks[locking.LEVEL_NODE] is set.
278
279     In the future it may grow parameters to just lock some instance's nodes, or
280     to just lock primaries or secondary nodes, if needed.
281
282     If should be called in DeclareLocks in a way similar to:
283
284     if level == locking.LEVEL_NODE:
285       self._LockInstancesNodes()
286
287     @type primary_only: boolean
288     @param primary_only: only lock primary nodes of locked instances
289
290     """
291     assert locking.LEVEL_NODE in self.recalculate_locks, \
292       "_LockInstancesNodes helper function called with no nodes to recalculate"
293
294     # TODO: check if we're really been called with the instance locks held
295
296     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297     # future we might want to have different behaviors depending on the value
298     # of self.recalculate_locks[locking.LEVEL_NODE]
299     wanted_nodes = []
300     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301       instance = self.context.cfg.GetInstanceInfo(instance_name)
302       wanted_nodes.append(instance.primary_node)
303       if not primary_only:
304         wanted_nodes.extend(instance.secondary_nodes)
305
306     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310
311     del self.recalculate_locks[locking.LEVEL_NODE]
312
313
314 class NoHooksLU(LogicalUnit):
315   """Simple LU which runs no hooks.
316
317   This LU is intended as a parent for other LogicalUnits which will
318   run no hooks, in order to reduce duplicate code.
319
320   """
321   HPATH = None
322   HTYPE = None
323
324
325 def _GetWantedNodes(lu, nodes):
326   """Returns list of checked and expanded node names.
327
328   Args:
329     nodes: List of nodes (strings) or None for all
330
331   """
332   if not isinstance(nodes, list):
333     raise errors.OpPrereqError("Invalid argument type 'nodes'")
334
335   if not nodes:
336     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337       " non-empty list of nodes whose name is to be expanded.")
338
339   wanted = []
340   for name in nodes:
341     node = lu.cfg.ExpandNodeName(name)
342     if node is None:
343       raise errors.OpPrereqError("No such node name '%s'" % name)
344     wanted.append(node)
345
346   return utils.NiceSort(wanted)
347
348
349 def _GetWantedInstances(lu, instances):
350   """Returns list of checked and expanded instance names.
351
352   Args:
353     instances: List of instances (strings) or None for all
354
355   """
356   if not isinstance(instances, list):
357     raise errors.OpPrereqError("Invalid argument type 'instances'")
358
359   if instances:
360     wanted = []
361
362     for name in instances:
363       instance = lu.cfg.ExpandInstanceName(name)
364       if instance is None:
365         raise errors.OpPrereqError("No such instance name '%s'" % name)
366       wanted.append(instance)
367
368   else:
369     wanted = lu.cfg.GetInstanceList()
370   return utils.NiceSort(wanted)
371
372
373 def _CheckOutputFields(static, dynamic, selected):
374   """Checks whether all selected fields are valid.
375
376   Args:
377     static: Static fields
378     dynamic: Dynamic fields
379
380   """
381   static_fields = frozenset(static)
382   dynamic_fields = frozenset(dynamic)
383
384   all_fields = static_fields | dynamic_fields
385
386   if not all_fields.issuperset(selected):
387     raise errors.OpPrereqError("Unknown output fields selected: %s"
388                                % ",".join(frozenset(selected).
389                                           difference(all_fields)))
390
391
392 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393                           memory, vcpus, nics):
394   """Builds instance related env variables for hooks from single variables.
395
396   Args:
397     secondary_nodes: List of secondary nodes as strings
398   """
399   env = {
400     "OP_TARGET": name,
401     "INSTANCE_NAME": name,
402     "INSTANCE_PRIMARY": primary_node,
403     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404     "INSTANCE_OS_TYPE": os_type,
405     "INSTANCE_STATUS": status,
406     "INSTANCE_MEMORY": memory,
407     "INSTANCE_VCPUS": vcpus,
408   }
409
410   if nics:
411     nic_count = len(nics)
412     for idx, (ip, bridge, mac) in enumerate(nics):
413       if ip is None:
414         ip = ""
415       env["INSTANCE_NIC%d_IP" % idx] = ip
416       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418   else:
419     nic_count = 0
420
421   env["INSTANCE_NIC_COUNT"] = nic_count
422
423   return env
424
425
426 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
427   """Builds instance related env variables for hooks from an object.
428
429   Args:
430     instance: objects.Instance object of instance
431     override: dict of values to override
432   """
433   bep = lu.cfg.GetClusterInfo().FillBE(instance)
434   args = {
435     'name': instance.name,
436     'primary_node': instance.primary_node,
437     'secondary_nodes': instance.secondary_nodes,
438     'os_type': instance.os,
439     'status': instance.os,
440     'memory': bep[constants.BE_MEMORY],
441     'vcpus': bep[constants.BE_VCPUS],
442     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
443   }
444   if override:
445     args.update(override)
446   return _BuildInstanceHookEnv(**args)
447
448
449 def _CheckInstanceBridgesExist(lu, instance):
450   """Check that the brigdes needed by an instance exist.
451
452   """
453   # check bridges existance
454   brlist = [nic.bridge for nic in instance.nics]
455   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
456     raise errors.OpPrereqError("one or more target bridges %s does not"
457                                " exist on destination node '%s'" %
458                                (brlist, instance.primary_node))
459
460
461 class LUDestroyCluster(NoHooksLU):
462   """Logical unit for destroying the cluster.
463
464   """
465   _OP_REQP = []
466
467   def CheckPrereq(self):
468     """Check prerequisites.
469
470     This checks whether the cluster is empty.
471
472     Any errors are signalled by raising errors.OpPrereqError.
473
474     """
475     master = self.cfg.GetMasterNode()
476
477     nodelist = self.cfg.GetNodeList()
478     if len(nodelist) != 1 or nodelist[0] != master:
479       raise errors.OpPrereqError("There are still %d node(s) in"
480                                  " this cluster." % (len(nodelist) - 1))
481     instancelist = self.cfg.GetInstanceList()
482     if instancelist:
483       raise errors.OpPrereqError("There are still %d instance(s) in"
484                                  " this cluster." % len(instancelist))
485
486   def Exec(self, feedback_fn):
487     """Destroys the cluster.
488
489     """
490     master = self.cfg.GetMasterNode()
491     if not self.rpc.call_node_stop_master(master, False):
492       raise errors.OpExecError("Could not disable the master role")
493     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
494     utils.CreateBackup(priv_key)
495     utils.CreateBackup(pub_key)
496     return master
497
498
499 class LUVerifyCluster(LogicalUnit):
500   """Verifies the cluster status.
501
502   """
503   HPATH = "cluster-verify"
504   HTYPE = constants.HTYPE_CLUSTER
505   _OP_REQP = ["skip_checks"]
506   REQ_BGL = False
507
508   def ExpandNames(self):
509     self.needed_locks = {
510       locking.LEVEL_NODE: locking.ALL_SET,
511       locking.LEVEL_INSTANCE: locking.ALL_SET,
512     }
513     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
514
515   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
516                   remote_version, feedback_fn):
517     """Run multiple tests against a node.
518
519     Test list:
520       - compares ganeti version
521       - checks vg existance and size > 20G
522       - checks config file checksum
523       - checks ssh to other nodes
524
525     Args:
526       node: name of the node to check
527       file_list: required list of files
528       local_cksum: dictionary of local files and their checksums
529
530     """
531     # compares ganeti version
532     local_version = constants.PROTOCOL_VERSION
533     if not remote_version:
534       feedback_fn("  - ERROR: connection to %s failed" % (node))
535       return True
536
537     if local_version != remote_version:
538       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
539                       (local_version, node, remote_version))
540       return True
541
542     # checks vg existance and size > 20G
543
544     bad = False
545     if not vglist:
546       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
547                       (node,))
548       bad = True
549     else:
550       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
551                                             constants.MIN_VG_SIZE)
552       if vgstatus:
553         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
554         bad = True
555
556     if not node_result:
557       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
558       return True
559
560     # checks config file checksum
561     # checks ssh to any
562
563     if 'filelist' not in node_result:
564       bad = True
565       feedback_fn("  - ERROR: node hasn't returned file checksum data")
566     else:
567       remote_cksum = node_result['filelist']
568       for file_name in file_list:
569         if file_name not in remote_cksum:
570           bad = True
571           feedback_fn("  - ERROR: file '%s' missing" % file_name)
572         elif remote_cksum[file_name] != local_cksum[file_name]:
573           bad = True
574           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
575
576     if 'nodelist' not in node_result:
577       bad = True
578       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
579     else:
580       if node_result['nodelist']:
581         bad = True
582         for node in node_result['nodelist']:
583           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
584                           (node, node_result['nodelist'][node]))
585     if 'node-net-test' not in node_result:
586       bad = True
587       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
588     else:
589       if node_result['node-net-test']:
590         bad = True
591         nlist = utils.NiceSort(node_result['node-net-test'].keys())
592         for node in nlist:
593           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
594                           (node, node_result['node-net-test'][node]))
595
596     hyp_result = node_result.get('hypervisor', None)
597     if isinstance(hyp_result, dict):
598       for hv_name, hv_result in hyp_result.iteritems():
599         if hv_result is not None:
600           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
601                       (hv_name, hv_result))
602     return bad
603
604   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
605                       node_instance, feedback_fn):
606     """Verify an instance.
607
608     This function checks to see if the required block devices are
609     available on the instance's node.
610
611     """
612     bad = False
613
614     node_current = instanceconfig.primary_node
615
616     node_vol_should = {}
617     instanceconfig.MapLVsByNode(node_vol_should)
618
619     for node in node_vol_should:
620       for volume in node_vol_should[node]:
621         if node not in node_vol_is or volume not in node_vol_is[node]:
622           feedback_fn("  - ERROR: volume %s missing on node %s" %
623                           (volume, node))
624           bad = True
625
626     if not instanceconfig.status == 'down':
627       if (node_current not in node_instance or
628           not instance in node_instance[node_current]):
629         feedback_fn("  - ERROR: instance %s not running on node %s" %
630                         (instance, node_current))
631         bad = True
632
633     for node in node_instance:
634       if (not node == node_current):
635         if instance in node_instance[node]:
636           feedback_fn("  - ERROR: instance %s should not run on node %s" %
637                           (instance, node))
638           bad = True
639
640     return bad
641
642   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
643     """Verify if there are any unknown volumes in the cluster.
644
645     The .os, .swap and backup volumes are ignored. All other volumes are
646     reported as unknown.
647
648     """
649     bad = False
650
651     for node in node_vol_is:
652       for volume in node_vol_is[node]:
653         if node not in node_vol_should or volume not in node_vol_should[node]:
654           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
655                       (volume, node))
656           bad = True
657     return bad
658
659   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
660     """Verify the list of running instances.
661
662     This checks what instances are running but unknown to the cluster.
663
664     """
665     bad = False
666     for node in node_instance:
667       for runninginstance in node_instance[node]:
668         if runninginstance not in instancelist:
669           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
670                           (runninginstance, node))
671           bad = True
672     return bad
673
674   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
675     """Verify N+1 Memory Resilience.
676
677     Check that if one single node dies we can still start all the instances it
678     was primary for.
679
680     """
681     bad = False
682
683     for node, nodeinfo in node_info.iteritems():
684       # This code checks that every node which is now listed as secondary has
685       # enough memory to host all instances it is supposed to should a single
686       # other node in the cluster fail.
687       # FIXME: not ready for failover to an arbitrary node
688       # FIXME: does not support file-backed instances
689       # WARNING: we currently take into account down instances as well as up
690       # ones, considering that even if they're down someone might want to start
691       # them even in the event of a node failure.
692       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
693         needed_mem = 0
694         for instance in instances:
695           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
696           if bep[constants.BE_AUTO_BALANCE]:
697             needed_mem += bep[constants.BE_MEMORY]
698         if nodeinfo['mfree'] < needed_mem:
699           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
700                       " failovers should node %s fail" % (node, prinode))
701           bad = True
702     return bad
703
704   def CheckPrereq(self):
705     """Check prerequisites.
706
707     Transform the list of checks we're going to skip into a set and check that
708     all its members are valid.
709
710     """
711     self.skip_set = frozenset(self.op.skip_checks)
712     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
713       raise errors.OpPrereqError("Invalid checks to be skipped specified")
714
715   def BuildHooksEnv(self):
716     """Build hooks env.
717
718     Cluster-Verify hooks just rone in the post phase and their failure makes
719     the output be logged in the verify output and the verification to fail.
720
721     """
722     all_nodes = self.cfg.GetNodeList()
723     # TODO: populate the environment with useful information for verify hooks
724     env = {}
725     return env, [], all_nodes
726
727   def Exec(self, feedback_fn):
728     """Verify integrity of cluster, performing various test on nodes.
729
730     """
731     bad = False
732     feedback_fn("* Verifying global settings")
733     for msg in self.cfg.VerifyConfig():
734       feedback_fn("  - ERROR: %s" % msg)
735
736     vg_name = self.cfg.GetVGName()
737     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
738     nodelist = utils.NiceSort(self.cfg.GetNodeList())
739     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
740     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
741     i_non_redundant = [] # Non redundant instances
742     i_non_a_balanced = [] # Non auto-balanced instances
743     node_volume = {}
744     node_instance = {}
745     node_info = {}
746     instance_cfg = {}
747
748     # FIXME: verify OS list
749     # do local checksums
750     file_names = []
751     file_names.append(constants.SSL_CERT_FILE)
752     file_names.append(constants.CLUSTER_CONF_FILE)
753     local_checksums = utils.FingerprintFiles(file_names)
754
755     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
756     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
757     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
758     all_vglist = self.rpc.call_vg_list(nodelist)
759     node_verify_param = {
760       'filelist': file_names,
761       'nodelist': nodelist,
762       'hypervisor': hypervisors,
763       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
764                         for node in nodeinfo]
765       }
766     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
767                                            self.cfg.GetClusterName())
768     all_rversion = self.rpc.call_version(nodelist)
769     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
770                                         self.cfg.GetHypervisorType())
771
772     cluster = self.cfg.GetClusterInfo()
773     for node in nodelist:
774       feedback_fn("* Verifying node %s" % node)
775       result = self._VerifyNode(node, file_names, local_checksums,
776                                 all_vglist[node], all_nvinfo[node],
777                                 all_rversion[node], feedback_fn)
778       bad = bad or result
779
780       # node_volume
781       volumeinfo = all_volumeinfo[node]
782
783       if isinstance(volumeinfo, basestring):
784         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
785                     (node, volumeinfo[-400:].encode('string_escape')))
786         bad = True
787         node_volume[node] = {}
788       elif not isinstance(volumeinfo, dict):
789         feedback_fn("  - ERROR: connection to %s failed" % (node,))
790         bad = True
791         continue
792       else:
793         node_volume[node] = volumeinfo
794
795       # node_instance
796       nodeinstance = all_instanceinfo[node]
797       if type(nodeinstance) != list:
798         feedback_fn("  - ERROR: connection to %s failed" % (node,))
799         bad = True
800         continue
801
802       node_instance[node] = nodeinstance
803
804       # node_info
805       nodeinfo = all_ninfo[node]
806       if not isinstance(nodeinfo, dict):
807         feedback_fn("  - ERROR: connection to %s failed" % (node,))
808         bad = True
809         continue
810
811       try:
812         node_info[node] = {
813           "mfree": int(nodeinfo['memory_free']),
814           "dfree": int(nodeinfo['vg_free']),
815           "pinst": [],
816           "sinst": [],
817           # dictionary holding all instances this node is secondary for,
818           # grouped by their primary node. Each key is a cluster node, and each
819           # value is a list of instances which have the key as primary and the
820           # current node as secondary.  this is handy to calculate N+1 memory
821           # availability if you can only failover from a primary to its
822           # secondary.
823           "sinst-by-pnode": {},
824         }
825       except ValueError:
826         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
827         bad = True
828         continue
829
830     node_vol_should = {}
831
832     for instance in instancelist:
833       feedback_fn("* Verifying instance %s" % instance)
834       inst_config = self.cfg.GetInstanceInfo(instance)
835       result =  self._VerifyInstance(instance, inst_config, node_volume,
836                                      node_instance, feedback_fn)
837       bad = bad or result
838
839       inst_config.MapLVsByNode(node_vol_should)
840
841       instance_cfg[instance] = inst_config
842
843       pnode = inst_config.primary_node
844       if pnode in node_info:
845         node_info[pnode]['pinst'].append(instance)
846       else:
847         feedback_fn("  - ERROR: instance %s, connection to primary node"
848                     " %s failed" % (instance, pnode))
849         bad = True
850
851       # If the instance is non-redundant we cannot survive losing its primary
852       # node, so we are not N+1 compliant. On the other hand we have no disk
853       # templates with more than one secondary so that situation is not well
854       # supported either.
855       # FIXME: does not support file-backed instances
856       if len(inst_config.secondary_nodes) == 0:
857         i_non_redundant.append(instance)
858       elif len(inst_config.secondary_nodes) > 1:
859         feedback_fn("  - WARNING: multiple secondaries for instance %s"
860                     % instance)
861
862       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
863         i_non_a_balanced.append(instance)
864
865       for snode in inst_config.secondary_nodes:
866         if snode in node_info:
867           node_info[snode]['sinst'].append(instance)
868           if pnode not in node_info[snode]['sinst-by-pnode']:
869             node_info[snode]['sinst-by-pnode'][pnode] = []
870           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
871         else:
872           feedback_fn("  - ERROR: instance %s, connection to secondary node"
873                       " %s failed" % (instance, snode))
874
875     feedback_fn("* Verifying orphan volumes")
876     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
877                                        feedback_fn)
878     bad = bad or result
879
880     feedback_fn("* Verifying remaining instances")
881     result = self._VerifyOrphanInstances(instancelist, node_instance,
882                                          feedback_fn)
883     bad = bad or result
884
885     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
886       feedback_fn("* Verifying N+1 Memory redundancy")
887       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
888       bad = bad or result
889
890     feedback_fn("* Other Notes")
891     if i_non_redundant:
892       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
893                   % len(i_non_redundant))
894
895     if i_non_a_balanced:
896       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
897                   % len(i_non_a_balanced))
898
899     return not bad
900
901   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
902     """Analize the post-hooks' result, handle it, and send some
903     nicely-formatted feedback back to the user.
904
905     Args:
906       phase: the hooks phase that has just been run
907       hooks_results: the results of the multi-node hooks rpc call
908       feedback_fn: function to send feedback back to the caller
909       lu_result: previous Exec result
910
911     """
912     # We only really run POST phase hooks, and are only interested in
913     # their results
914     if phase == constants.HOOKS_PHASE_POST:
915       # Used to change hooks' output to proper indentation
916       indent_re = re.compile('^', re.M)
917       feedback_fn("* Hooks Results")
918       if not hooks_results:
919         feedback_fn("  - ERROR: general communication failure")
920         lu_result = 1
921       else:
922         for node_name in hooks_results:
923           show_node_header = True
924           res = hooks_results[node_name]
925           if res is False or not isinstance(res, list):
926             feedback_fn("    Communication failure")
927             lu_result = 1
928             continue
929           for script, hkr, output in res:
930             if hkr == constants.HKR_FAIL:
931               # The node header is only shown once, if there are
932               # failing hooks on that node
933               if show_node_header:
934                 feedback_fn("  Node %s:" % node_name)
935                 show_node_header = False
936               feedback_fn("    ERROR: Script %s failed, output:" % script)
937               output = indent_re.sub('      ', output)
938               feedback_fn("%s" % output)
939               lu_result = 1
940
941       return lu_result
942
943
944 class LUVerifyDisks(NoHooksLU):
945   """Verifies the cluster disks status.
946
947   """
948   _OP_REQP = []
949   REQ_BGL = False
950
951   def ExpandNames(self):
952     self.needed_locks = {
953       locking.LEVEL_NODE: locking.ALL_SET,
954       locking.LEVEL_INSTANCE: locking.ALL_SET,
955     }
956     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
957
958   def CheckPrereq(self):
959     """Check prerequisites.
960
961     This has no prerequisites.
962
963     """
964     pass
965
966   def Exec(self, feedback_fn):
967     """Verify integrity of cluster disks.
968
969     """
970     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
971
972     vg_name = self.cfg.GetVGName()
973     nodes = utils.NiceSort(self.cfg.GetNodeList())
974     instances = [self.cfg.GetInstanceInfo(name)
975                  for name in self.cfg.GetInstanceList()]
976
977     nv_dict = {}
978     for inst in instances:
979       inst_lvs = {}
980       if (inst.status != "up" or
981           inst.disk_template not in constants.DTS_NET_MIRROR):
982         continue
983       inst.MapLVsByNode(inst_lvs)
984       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
985       for node, vol_list in inst_lvs.iteritems():
986         for vol in vol_list:
987           nv_dict[(node, vol)] = inst
988
989     if not nv_dict:
990       return result
991
992     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
993
994     to_act = set()
995     for node in nodes:
996       # node_volume
997       lvs = node_lvs[node]
998
999       if isinstance(lvs, basestring):
1000         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1001         res_nlvm[node] = lvs
1002       elif not isinstance(lvs, dict):
1003         logging.warning("Connection to node %s failed or invalid data"
1004                         " returned", node)
1005         res_nodes.append(node)
1006         continue
1007
1008       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1009         inst = nv_dict.pop((node, lv_name), None)
1010         if (not lv_online and inst is not None
1011             and inst.name not in res_instances):
1012           res_instances.append(inst.name)
1013
1014     # any leftover items in nv_dict are missing LVs, let's arrange the
1015     # data better
1016     for key, inst in nv_dict.iteritems():
1017       if inst.name not in res_missing:
1018         res_missing[inst.name] = []
1019       res_missing[inst.name].append(key)
1020
1021     return result
1022
1023
1024 class LURenameCluster(LogicalUnit):
1025   """Rename the cluster.
1026
1027   """
1028   HPATH = "cluster-rename"
1029   HTYPE = constants.HTYPE_CLUSTER
1030   _OP_REQP = ["name"]
1031
1032   def BuildHooksEnv(self):
1033     """Build hooks env.
1034
1035     """
1036     env = {
1037       "OP_TARGET": self.cfg.GetClusterName(),
1038       "NEW_NAME": self.op.name,
1039       }
1040     mn = self.cfg.GetMasterNode()
1041     return env, [mn], [mn]
1042
1043   def CheckPrereq(self):
1044     """Verify that the passed name is a valid one.
1045
1046     """
1047     hostname = utils.HostInfo(self.op.name)
1048
1049     new_name = hostname.name
1050     self.ip = new_ip = hostname.ip
1051     old_name = self.cfg.GetClusterName()
1052     old_ip = self.cfg.GetMasterIP()
1053     if new_name == old_name and new_ip == old_ip:
1054       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1055                                  " cluster has changed")
1056     if new_ip != old_ip:
1057       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1058         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1059                                    " reachable on the network. Aborting." %
1060                                    new_ip)
1061
1062     self.op.name = new_name
1063
1064   def Exec(self, feedback_fn):
1065     """Rename the cluster.
1066
1067     """
1068     clustername = self.op.name
1069     ip = self.ip
1070
1071     # shutdown the master IP
1072     master = self.cfg.GetMasterNode()
1073     if not self.rpc.call_node_stop_master(master, False):
1074       raise errors.OpExecError("Could not disable the master role")
1075
1076     try:
1077       # modify the sstore
1078       # TODO: sstore
1079       ss.SetKey(ss.SS_MASTER_IP, ip)
1080       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1081
1082       # Distribute updated ss config to all nodes
1083       myself = self.cfg.GetNodeInfo(master)
1084       dist_nodes = self.cfg.GetNodeList()
1085       if myself.name in dist_nodes:
1086         dist_nodes.remove(myself.name)
1087
1088       logging.debug("Copying updated ssconf data to all nodes")
1089       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1090         fname = ss.KeyToFilename(keyname)
1091         result = self.rpc.call_upload_file(dist_nodes, fname)
1092         for to_node in dist_nodes:
1093           if not result[to_node]:
1094             self.LogWarning("Copy of file %s to node %s failed",
1095                             fname, to_node)
1096     finally:
1097       if not self.rpc.call_node_start_master(master, False):
1098         self.LogWarning("Could not re-enable the master role on"
1099                         " the master, please restart manually.")
1100
1101
1102 def _RecursiveCheckIfLVMBased(disk):
1103   """Check if the given disk or its children are lvm-based.
1104
1105   Args:
1106     disk: ganeti.objects.Disk object
1107
1108   Returns:
1109     boolean indicating whether a LD_LV dev_type was found or not
1110
1111   """
1112   if disk.children:
1113     for chdisk in disk.children:
1114       if _RecursiveCheckIfLVMBased(chdisk):
1115         return True
1116   return disk.dev_type == constants.LD_LV
1117
1118
1119 class LUSetClusterParams(LogicalUnit):
1120   """Change the parameters of the cluster.
1121
1122   """
1123   HPATH = "cluster-modify"
1124   HTYPE = constants.HTYPE_CLUSTER
1125   _OP_REQP = []
1126   REQ_BGL = False
1127
1128   def ExpandNames(self):
1129     # FIXME: in the future maybe other cluster params won't require checking on
1130     # all nodes to be modified.
1131     self.needed_locks = {
1132       locking.LEVEL_NODE: locking.ALL_SET,
1133     }
1134     self.share_locks[locking.LEVEL_NODE] = 1
1135
1136   def BuildHooksEnv(self):
1137     """Build hooks env.
1138
1139     """
1140     env = {
1141       "OP_TARGET": self.cfg.GetClusterName(),
1142       "NEW_VG_NAME": self.op.vg_name,
1143       }
1144     mn = self.cfg.GetMasterNode()
1145     return env, [mn], [mn]
1146
1147   def CheckPrereq(self):
1148     """Check prerequisites.
1149
1150     This checks whether the given params don't conflict and
1151     if the given volume group is valid.
1152
1153     """
1154     # FIXME: This only works because there is only one parameter that can be
1155     # changed or removed.
1156     if self.op.vg_name is not None and not self.op.vg_name:
1157       instances = self.cfg.GetAllInstancesInfo().values()
1158       for inst in instances:
1159         for disk in inst.disks:
1160           if _RecursiveCheckIfLVMBased(disk):
1161             raise errors.OpPrereqError("Cannot disable lvm storage while"
1162                                        " lvm-based instances exist")
1163
1164     node_list = self.acquired_locks[locking.LEVEL_NODE]
1165
1166     # if vg_name not None, checks given volume group on all nodes
1167     if self.op.vg_name:
1168       vglist = self.rpc.call_vg_list(node_list)
1169       for node in node_list:
1170         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1171                                               constants.MIN_VG_SIZE)
1172         if vgstatus:
1173           raise errors.OpPrereqError("Error on node '%s': %s" %
1174                                      (node, vgstatus))
1175
1176     self.cluster = cluster = self.cfg.GetClusterInfo()
1177     # beparams changes do not need validation (we can't validate?),
1178     # but we still process here
1179     if self.op.beparams:
1180       self.new_beparams = cluster.FillDict(
1181         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1182
1183     # hypervisor list/parameters
1184     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1185     if self.op.hvparams:
1186       if not isinstance(self.op.hvparams, dict):
1187         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1188       for hv_name, hv_dict in self.op.hvparams.items():
1189         if hv_name not in self.new_hvparams:
1190           self.new_hvparams[hv_name] = hv_dict
1191         else:
1192           self.new_hvparams[hv_name].update(hv_dict)
1193
1194     if self.op.enabled_hypervisors is not None:
1195       self.hv_list = self.op.enabled_hypervisors
1196     else:
1197       self.hv_list = cluster.enabled_hypervisors
1198
1199     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1200       # either the enabled list has changed, or the parameters have, validate
1201       for hv_name, hv_params in self.new_hvparams.items():
1202         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1203             (self.op.enabled_hypervisors and
1204              hv_name in self.op.enabled_hypervisors)):
1205           # either this is a new hypervisor, or its parameters have changed
1206           hv_class = hypervisor.GetHypervisor(hv_name)
1207           hv_class.CheckParameterSyntax(hv_params)
1208           _CheckHVParams(self, node_list, hv_name, hv_params)
1209
1210   def Exec(self, feedback_fn):
1211     """Change the parameters of the cluster.
1212
1213     """
1214     if self.op.vg_name is not None:
1215       if self.op.vg_name != self.cfg.GetVGName():
1216         self.cfg.SetVGName(self.op.vg_name)
1217       else:
1218         feedback_fn("Cluster LVM configuration already in desired"
1219                     " state, not changing")
1220     if self.op.hvparams:
1221       self.cluster.hvparams = self.new_hvparams
1222     if self.op.enabled_hypervisors is not None:
1223       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1224     if self.op.beparams:
1225       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1226     self.cfg.Update(self.cluster)
1227
1228
1229 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1230   """Sleep and poll for an instance's disk to sync.
1231
1232   """
1233   if not instance.disks:
1234     return True
1235
1236   if not oneshot:
1237     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1238
1239   node = instance.primary_node
1240
1241   for dev in instance.disks:
1242     lu.cfg.SetDiskID(dev, node)
1243
1244   retries = 0
1245   while True:
1246     max_time = 0
1247     done = True
1248     cumul_degraded = False
1249     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1250     if not rstats:
1251       lu.LogWarning("Can't get any data from node %s", node)
1252       retries += 1
1253       if retries >= 10:
1254         raise errors.RemoteError("Can't contact node %s for mirror data,"
1255                                  " aborting." % node)
1256       time.sleep(6)
1257       continue
1258     retries = 0
1259     for i in range(len(rstats)):
1260       mstat = rstats[i]
1261       if mstat is None:
1262         lu.LogWarning("Can't compute data for node %s/%s",
1263                            node, instance.disks[i].iv_name)
1264         continue
1265       # we ignore the ldisk parameter
1266       perc_done, est_time, is_degraded, _ = mstat
1267       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268       if perc_done is not None:
1269         done = False
1270         if est_time is not None:
1271           rem_time = "%d estimated seconds remaining" % est_time
1272           max_time = est_time
1273         else:
1274           rem_time = "no time estimate"
1275         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276                         (instance.disks[i].iv_name, perc_done, rem_time))
1277     if done or oneshot:
1278       break
1279
1280     time.sleep(min(60, max_time))
1281
1282   if done:
1283     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284   return not cumul_degraded
1285
1286
1287 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1288   """Check that mirrors are not degraded.
1289
1290   The ldisk parameter, if True, will change the test from the
1291   is_degraded attribute (which represents overall non-ok status for
1292   the device(s)) to the ldisk (representing the local storage status).
1293
1294   """
1295   lu.cfg.SetDiskID(dev, node)
1296   if ldisk:
1297     idx = 6
1298   else:
1299     idx = 5
1300
1301   result = True
1302   if on_primary or dev.AssembleOnSecondary():
1303     rstats = lu.rpc.call_blockdev_find(node, dev)
1304     if not rstats:
1305       logging.warning("Node %s: disk degraded, not found or node down", node)
1306       result = False
1307     else:
1308       result = result and (not rstats[idx])
1309   if dev.children:
1310     for child in dev.children:
1311       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1312
1313   return result
1314
1315
1316 class LUDiagnoseOS(NoHooksLU):
1317   """Logical unit for OS diagnose/query.
1318
1319   """
1320   _OP_REQP = ["output_fields", "names"]
1321   REQ_BGL = False
1322
1323   def ExpandNames(self):
1324     if self.op.names:
1325       raise errors.OpPrereqError("Selective OS query not supported")
1326
1327     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1328     _CheckOutputFields(static=[],
1329                        dynamic=self.dynamic_fields,
1330                        selected=self.op.output_fields)
1331
1332     # Lock all nodes, in shared mode
1333     self.needed_locks = {}
1334     self.share_locks[locking.LEVEL_NODE] = 1
1335     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     """
1341
1342   @staticmethod
1343   def _DiagnoseByOS(node_list, rlist):
1344     """Remaps a per-node return list into an a per-os per-node dictionary
1345
1346       Args:
1347         node_list: a list with the names of all nodes
1348         rlist: a map with node names as keys and OS objects as values
1349
1350       Returns:
1351         map: a map with osnames as keys and as value another map, with
1352              nodes as
1353              keys and list of OS objects as values
1354              e.g. {"debian-etch": {"node1": [<object>,...],
1355                                    "node2": [<object>,]}
1356                   }
1357
1358     """
1359     all_os = {}
1360     for node_name, nr in rlist.iteritems():
1361       if not nr:
1362         continue
1363       for os_obj in nr:
1364         if os_obj.name not in all_os:
1365           # build a list of nodes for this os containing empty lists
1366           # for each node in node_list
1367           all_os[os_obj.name] = {}
1368           for nname in node_list:
1369             all_os[os_obj.name][nname] = []
1370         all_os[os_obj.name][node_name].append(os_obj)
1371     return all_os
1372
1373   def Exec(self, feedback_fn):
1374     """Compute the list of OSes.
1375
1376     """
1377     node_list = self.acquired_locks[locking.LEVEL_NODE]
1378     node_data = self.rpc.call_os_diagnose(node_list)
1379     if node_data == False:
1380       raise errors.OpExecError("Can't gather the list of OSes")
1381     pol = self._DiagnoseByOS(node_list, node_data)
1382     output = []
1383     for os_name, os_data in pol.iteritems():
1384       row = []
1385       for field in self.op.output_fields:
1386         if field == "name":
1387           val = os_name
1388         elif field == "valid":
1389           val = utils.all([osl and osl[0] for osl in os_data.values()])
1390         elif field == "node_status":
1391           val = {}
1392           for node_name, nos_list in os_data.iteritems():
1393             val[node_name] = [(v.status, v.path) for v in nos_list]
1394         else:
1395           raise errors.ParameterError(field)
1396         row.append(val)
1397       output.append(row)
1398
1399     return output
1400
1401
1402 class LURemoveNode(LogicalUnit):
1403   """Logical unit for removing a node.
1404
1405   """
1406   HPATH = "node-remove"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This doesn't run on the target node in the pre phase as a failed
1414     node would then be impossible to remove.
1415
1416     """
1417     env = {
1418       "OP_TARGET": self.op.node_name,
1419       "NODE_NAME": self.op.node_name,
1420       }
1421     all_nodes = self.cfg.GetNodeList()
1422     all_nodes.remove(self.op.node_name)
1423     return env, all_nodes, all_nodes
1424
1425   def CheckPrereq(self):
1426     """Check prerequisites.
1427
1428     This checks:
1429      - the node exists in the configuration
1430      - it does not have primary or secondary instances
1431      - it's not the master
1432
1433     Any errors are signalled by raising errors.OpPrereqError.
1434
1435     """
1436     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1437     if node is None:
1438       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1439
1440     instance_list = self.cfg.GetInstanceList()
1441
1442     masternode = self.cfg.GetMasterNode()
1443     if node.name == masternode:
1444       raise errors.OpPrereqError("Node is the master node,"
1445                                  " you need to failover first.")
1446
1447     for instance_name in instance_list:
1448       instance = self.cfg.GetInstanceInfo(instance_name)
1449       if node.name == instance.primary_node:
1450         raise errors.OpPrereqError("Instance %s still running on the node,"
1451                                    " please remove first." % instance_name)
1452       if node.name in instance.secondary_nodes:
1453         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454                                    " please remove first." % instance_name)
1455     self.op.node_name = node.name
1456     self.node = node
1457
1458   def Exec(self, feedback_fn):
1459     """Removes the node from the cluster.
1460
1461     """
1462     node = self.node
1463     logging.info("Stopping the node daemon and removing configs from node %s",
1464                  node.name)
1465
1466     self.context.RemoveNode(node.name)
1467
1468     self.rpc.call_node_leave_cluster(node.name)
1469
1470
1471 class LUQueryNodes(NoHooksLU):
1472   """Logical unit for querying nodes.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476   REQ_BGL = False
1477
1478   def ExpandNames(self):
1479     self.dynamic_fields = frozenset([
1480       "dtotal", "dfree",
1481       "mtotal", "mnode", "mfree",
1482       "bootid",
1483       "ctotal",
1484       ])
1485
1486     self.static_fields = frozenset([
1487       "name", "pinst_cnt", "sinst_cnt",
1488       "pinst_list", "sinst_list",
1489       "pip", "sip", "tags",
1490       "serial_no",
1491       ])
1492
1493     _CheckOutputFields(static=self.static_fields,
1494                        dynamic=self.dynamic_fields,
1495                        selected=self.op.output_fields)
1496
1497     self.needed_locks = {}
1498     self.share_locks[locking.LEVEL_NODE] = 1
1499
1500     if self.op.names:
1501       self.wanted = _GetWantedNodes(self, self.op.names)
1502     else:
1503       self.wanted = locking.ALL_SET
1504
1505     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1506     if self.do_locking:
1507       # if we don't request only static fields, we need to lock the nodes
1508       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1509
1510
1511   def CheckPrereq(self):
1512     """Check prerequisites.
1513
1514     """
1515     # The validation of the node list is done in the _GetWantedNodes,
1516     # if non empty, and if empty, there's no validation to do
1517     pass
1518
1519   def Exec(self, feedback_fn):
1520     """Computes the list of nodes and their attributes.
1521
1522     """
1523     all_info = self.cfg.GetAllNodesInfo()
1524     if self.do_locking:
1525       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526     elif self.wanted != locking.ALL_SET:
1527       nodenames = self.wanted
1528       missing = set(nodenames).difference(all_info.keys())
1529       if missing:
1530         raise errors.OpExecError(
1531           "Some nodes were removed before retrieving their data: %s" % missing)
1532     else:
1533       nodenames = all_info.keys()
1534
1535     nodenames = utils.NiceSort(nodenames)
1536     nodelist = [all_info[name] for name in nodenames]
1537
1538     # begin data gathering
1539
1540     if self.dynamic_fields.intersection(self.op.output_fields):
1541       live_data = {}
1542       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543                                           self.cfg.GetHypervisorType())
1544       for name in nodenames:
1545         nodeinfo = node_data.get(name, None)
1546         if nodeinfo:
1547           live_data[name] = {
1548             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554             "bootid": nodeinfo['bootid'],
1555             }
1556         else:
1557           live_data[name] = {}
1558     else:
1559       live_data = dict.fromkeys(nodenames, {})
1560
1561     node_to_primary = dict([(name, set()) for name in nodenames])
1562     node_to_secondary = dict([(name, set()) for name in nodenames])
1563
1564     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565                              "sinst_cnt", "sinst_list"))
1566     if inst_fields & frozenset(self.op.output_fields):
1567       instancelist = self.cfg.GetInstanceList()
1568
1569       for instance_name in instancelist:
1570         inst = self.cfg.GetInstanceInfo(instance_name)
1571         if inst.primary_node in node_to_primary:
1572           node_to_primary[inst.primary_node].add(inst.name)
1573         for secnode in inst.secondary_nodes:
1574           if secnode in node_to_secondary:
1575             node_to_secondary[secnode].add(inst.name)
1576
1577     # end data gathering
1578
1579     output = []
1580     for node in nodelist:
1581       node_output = []
1582       for field in self.op.output_fields:
1583         if field == "name":
1584           val = node.name
1585         elif field == "pinst_list":
1586           val = list(node_to_primary[node.name])
1587         elif field == "sinst_list":
1588           val = list(node_to_secondary[node.name])
1589         elif field == "pinst_cnt":
1590           val = len(node_to_primary[node.name])
1591         elif field == "sinst_cnt":
1592           val = len(node_to_secondary[node.name])
1593         elif field == "pip":
1594           val = node.primary_ip
1595         elif field == "sip":
1596           val = node.secondary_ip
1597         elif field == "tags":
1598           val = list(node.GetTags())
1599         elif field == "serial_no":
1600           val = node.serial_no
1601         elif field in self.dynamic_fields:
1602           val = live_data[node.name].get(field, None)
1603         else:
1604           raise errors.ParameterError(field)
1605         node_output.append(val)
1606       output.append(node_output)
1607
1608     return output
1609
1610
1611 class LUQueryNodeVolumes(NoHooksLU):
1612   """Logical unit for getting volumes on node(s).
1613
1614   """
1615   _OP_REQP = ["nodes", "output_fields"]
1616   REQ_BGL = False
1617
1618   def ExpandNames(self):
1619     _CheckOutputFields(static=["node"],
1620                        dynamic=["phys", "vg", "name", "size", "instance"],
1621                        selected=self.op.output_fields)
1622
1623     self.needed_locks = {}
1624     self.share_locks[locking.LEVEL_NODE] = 1
1625     if not self.op.nodes:
1626       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1627     else:
1628       self.needed_locks[locking.LEVEL_NODE] = \
1629         _GetWantedNodes(self, self.op.nodes)
1630
1631   def CheckPrereq(self):
1632     """Check prerequisites.
1633
1634     This checks that the fields required are valid output fields.
1635
1636     """
1637     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1638
1639   def Exec(self, feedback_fn):
1640     """Computes the list of nodes and their attributes.
1641
1642     """
1643     nodenames = self.nodes
1644     volumes = self.rpc.call_node_volumes(nodenames)
1645
1646     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1647              in self.cfg.GetInstanceList()]
1648
1649     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1650
1651     output = []
1652     for node in nodenames:
1653       if node not in volumes or not volumes[node]:
1654         continue
1655
1656       node_vols = volumes[node][:]
1657       node_vols.sort(key=lambda vol: vol['dev'])
1658
1659       for vol in node_vols:
1660         node_output = []
1661         for field in self.op.output_fields:
1662           if field == "node":
1663             val = node
1664           elif field == "phys":
1665             val = vol['dev']
1666           elif field == "vg":
1667             val = vol['vg']
1668           elif field == "name":
1669             val = vol['name']
1670           elif field == "size":
1671             val = int(float(vol['size']))
1672           elif field == "instance":
1673             for inst in ilist:
1674               if node not in lv_by_node[inst]:
1675                 continue
1676               if vol['name'] in lv_by_node[inst][node]:
1677                 val = inst.name
1678                 break
1679             else:
1680               val = '-'
1681           else:
1682             raise errors.ParameterError(field)
1683           node_output.append(str(val))
1684
1685         output.append(node_output)
1686
1687     return output
1688
1689
1690 class LUAddNode(LogicalUnit):
1691   """Logical unit for adding node to the cluster.
1692
1693   """
1694   HPATH = "node-add"
1695   HTYPE = constants.HTYPE_NODE
1696   _OP_REQP = ["node_name"]
1697
1698   def BuildHooksEnv(self):
1699     """Build hooks env.
1700
1701     This will run on all nodes before, and on all nodes + the new node after.
1702
1703     """
1704     env = {
1705       "OP_TARGET": self.op.node_name,
1706       "NODE_NAME": self.op.node_name,
1707       "NODE_PIP": self.op.primary_ip,
1708       "NODE_SIP": self.op.secondary_ip,
1709       }
1710     nodes_0 = self.cfg.GetNodeList()
1711     nodes_1 = nodes_0 + [self.op.node_name, ]
1712     return env, nodes_0, nodes_1
1713
1714   def CheckPrereq(self):
1715     """Check prerequisites.
1716
1717     This checks:
1718      - the new node is not already in the config
1719      - it is resolvable
1720      - its parameters (single/dual homed) matches the cluster
1721
1722     Any errors are signalled by raising errors.OpPrereqError.
1723
1724     """
1725     node_name = self.op.node_name
1726     cfg = self.cfg
1727
1728     dns_data = utils.HostInfo(node_name)
1729
1730     node = dns_data.name
1731     primary_ip = self.op.primary_ip = dns_data.ip
1732     secondary_ip = getattr(self.op, "secondary_ip", None)
1733     if secondary_ip is None:
1734       secondary_ip = primary_ip
1735     if not utils.IsValidIP(secondary_ip):
1736       raise errors.OpPrereqError("Invalid secondary IP given")
1737     self.op.secondary_ip = secondary_ip
1738
1739     node_list = cfg.GetNodeList()
1740     if not self.op.readd and node in node_list:
1741       raise errors.OpPrereqError("Node %s is already in the configuration" %
1742                                  node)
1743     elif self.op.readd and node not in node_list:
1744       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1745
1746     for existing_node_name in node_list:
1747       existing_node = cfg.GetNodeInfo(existing_node_name)
1748
1749       if self.op.readd and node == existing_node_name:
1750         if (existing_node.primary_ip != primary_ip or
1751             existing_node.secondary_ip != secondary_ip):
1752           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1753                                      " address configuration as before")
1754         continue
1755
1756       if (existing_node.primary_ip == primary_ip or
1757           existing_node.secondary_ip == primary_ip or
1758           existing_node.primary_ip == secondary_ip or
1759           existing_node.secondary_ip == secondary_ip):
1760         raise errors.OpPrereqError("New node ip address(es) conflict with"
1761                                    " existing node %s" % existing_node.name)
1762
1763     # check that the type of the node (single versus dual homed) is the
1764     # same as for the master
1765     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1766     master_singlehomed = myself.secondary_ip == myself.primary_ip
1767     newbie_singlehomed = secondary_ip == primary_ip
1768     if master_singlehomed != newbie_singlehomed:
1769       if master_singlehomed:
1770         raise errors.OpPrereqError("The master has no private ip but the"
1771                                    " new node has one")
1772       else:
1773         raise errors.OpPrereqError("The master has a private ip but the"
1774                                    " new node doesn't have one")
1775
1776     # checks reachablity
1777     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1778       raise errors.OpPrereqError("Node not reachable by ping")
1779
1780     if not newbie_singlehomed:
1781       # check reachability from my secondary ip to newbie's secondary ip
1782       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1783                            source=myself.secondary_ip):
1784         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1785                                    " based ping to noded port")
1786
1787     self.new_node = objects.Node(name=node,
1788                                  primary_ip=primary_ip,
1789                                  secondary_ip=secondary_ip)
1790
1791   def Exec(self, feedback_fn):
1792     """Adds the new node to the cluster.
1793
1794     """
1795     new_node = self.new_node
1796     node = new_node.name
1797
1798     # check connectivity
1799     result = self.rpc.call_version([node])[node]
1800     if result:
1801       if constants.PROTOCOL_VERSION == result:
1802         logging.info("Communication to node %s fine, sw version %s match",
1803                      node, result)
1804       else:
1805         raise errors.OpExecError("Version mismatch master version %s,"
1806                                  " node version %s" %
1807                                  (constants.PROTOCOL_VERSION, result))
1808     else:
1809       raise errors.OpExecError("Cannot get version from the new node")
1810
1811     # setup ssh on node
1812     logging.info("Copy ssh key to node %s", node)
1813     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1814     keyarray = []
1815     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1816                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1817                 priv_key, pub_key]
1818
1819     for i in keyfiles:
1820       f = open(i, 'r')
1821       try:
1822         keyarray.append(f.read())
1823       finally:
1824         f.close()
1825
1826     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1827                                     keyarray[2],
1828                                     keyarray[3], keyarray[4], keyarray[5])
1829
1830     if not result:
1831       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1832
1833     # Add node to our /etc/hosts, and add key to known_hosts
1834     utils.AddHostToEtcHosts(new_node.name)
1835
1836     if new_node.secondary_ip != new_node.primary_ip:
1837       if not self.rpc.call_node_has_ip_address(new_node.name,
1838                                                new_node.secondary_ip):
1839         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1840                                  " you gave (%s). Please fix and re-run this"
1841                                  " command." % new_node.secondary_ip)
1842
1843     node_verify_list = [self.cfg.GetMasterNode()]
1844     node_verify_param = {
1845       'nodelist': [node],
1846       # TODO: do a node-net-test as well?
1847     }
1848
1849     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1850                                        self.cfg.GetClusterName())
1851     for verifier in node_verify_list:
1852       if not result[verifier]:
1853         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1854                                  " for remote verification" % verifier)
1855       if result[verifier]['nodelist']:
1856         for failed in result[verifier]['nodelist']:
1857           feedback_fn("ssh/hostname verification failed %s -> %s" %
1858                       (verifier, result[verifier]['nodelist'][failed]))
1859         raise errors.OpExecError("ssh/hostname verification failed.")
1860
1861     # Distribute updated /etc/hosts and known_hosts to all nodes,
1862     # including the node just added
1863     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1864     dist_nodes = self.cfg.GetNodeList()
1865     if not self.op.readd:
1866       dist_nodes.append(node)
1867     if myself.name in dist_nodes:
1868       dist_nodes.remove(myself.name)
1869
1870     logging.debug("Copying hosts and known_hosts to all nodes")
1871     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872       result = self.rpc.call_upload_file(dist_nodes, fname)
1873       for to_node in dist_nodes:
1874         if not result[to_node]:
1875           logging.error("Copy of file %s to node %s failed", fname, to_node)
1876
1877     to_copy = []
1878     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1879       to_copy.append(constants.VNC_PASSWORD_FILE)
1880     for fname in to_copy:
1881       result = self.rpc.call_upload_file([node], fname)
1882       if not result[node]:
1883         logging.error("Could not copy file %s to node %s", fname, node)
1884
1885     if self.op.readd:
1886       self.context.ReaddNode(new_node)
1887     else:
1888       self.context.AddNode(new_node)
1889
1890
1891 class LUQueryClusterInfo(NoHooksLU):
1892   """Query cluster configuration.
1893
1894   """
1895   _OP_REQP = []
1896   REQ_MASTER = False
1897   REQ_BGL = False
1898
1899   def ExpandNames(self):
1900     self.needed_locks = {}
1901
1902   def CheckPrereq(self):
1903     """No prerequsites needed for this LU.
1904
1905     """
1906     pass
1907
1908   def Exec(self, feedback_fn):
1909     """Return cluster config.
1910
1911     """
1912     cluster = self.cfg.GetClusterInfo()
1913     result = {
1914       "software_version": constants.RELEASE_VERSION,
1915       "protocol_version": constants.PROTOCOL_VERSION,
1916       "config_version": constants.CONFIG_VERSION,
1917       "os_api_version": constants.OS_API_VERSION,
1918       "export_version": constants.EXPORT_VERSION,
1919       "architecture": (platform.architecture()[0], platform.machine()),
1920       "name": cluster.cluster_name,
1921       "master": cluster.master_node,
1922       "default_hypervisor": cluster.default_hypervisor,
1923       "enabled_hypervisors": cluster.enabled_hypervisors,
1924       "hvparams": cluster.hvparams,
1925       "beparams": cluster.beparams,
1926       }
1927
1928     return result
1929
1930
1931 class LUQueryConfigValues(NoHooksLU):
1932   """Return configuration values.
1933
1934   """
1935   _OP_REQP = []
1936   REQ_BGL = False
1937
1938   def ExpandNames(self):
1939     self.needed_locks = {}
1940
1941     static_fields = ["cluster_name", "master_node", "drain_flag"]
1942     _CheckOutputFields(static=static_fields,
1943                        dynamic=[],
1944                        selected=self.op.output_fields)
1945
1946   def CheckPrereq(self):
1947     """No prerequisites.
1948
1949     """
1950     pass
1951
1952   def Exec(self, feedback_fn):
1953     """Dump a representation of the cluster config to the standard output.
1954
1955     """
1956     values = []
1957     for field in self.op.output_fields:
1958       if field == "cluster_name":
1959         entry = self.cfg.GetClusterName()
1960       elif field == "master_node":
1961         entry = self.cfg.GetMasterNode()
1962       elif field == "drain_flag":
1963         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1964       else:
1965         raise errors.ParameterError(field)
1966       values.append(entry)
1967     return values
1968
1969
1970 class LUActivateInstanceDisks(NoHooksLU):
1971   """Bring up an instance's disks.
1972
1973   """
1974   _OP_REQP = ["instance_name"]
1975   REQ_BGL = False
1976
1977   def ExpandNames(self):
1978     self._ExpandAndLockInstance()
1979     self.needed_locks[locking.LEVEL_NODE] = []
1980     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1981
1982   def DeclareLocks(self, level):
1983     if level == locking.LEVEL_NODE:
1984       self._LockInstancesNodes()
1985
1986   def CheckPrereq(self):
1987     """Check prerequisites.
1988
1989     This checks that the instance is in the cluster.
1990
1991     """
1992     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1993     assert self.instance is not None, \
1994       "Cannot retrieve locked instance %s" % self.op.instance_name
1995
1996   def Exec(self, feedback_fn):
1997     """Activate the disks.
1998
1999     """
2000     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2001     if not disks_ok:
2002       raise errors.OpExecError("Cannot activate block devices")
2003
2004     return disks_info
2005
2006
2007 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2008   """Prepare the block devices for an instance.
2009
2010   This sets up the block devices on all nodes.
2011
2012   Args:
2013     instance: a ganeti.objects.Instance object
2014     ignore_secondaries: if true, errors on secondary nodes won't result
2015                         in an error return from the function
2016
2017   Returns:
2018     false if the operation failed
2019     list of (host, instance_visible_name, node_visible_name) if the operation
2020          suceeded with the mapping from node devices to instance devices
2021   """
2022   device_info = []
2023   disks_ok = True
2024   iname = instance.name
2025   # With the two passes mechanism we try to reduce the window of
2026   # opportunity for the race condition of switching DRBD to primary
2027   # before handshaking occured, but we do not eliminate it
2028
2029   # The proper fix would be to wait (with some limits) until the
2030   # connection has been made and drbd transitions from WFConnection
2031   # into any other network-connected state (Connected, SyncTarget,
2032   # SyncSource, etc.)
2033
2034   # 1st pass, assemble on all nodes in secondary mode
2035   for inst_disk in instance.disks:
2036     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2037       lu.cfg.SetDiskID(node_disk, node)
2038       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2039       if not result:
2040         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2041                            " (is_primary=False, pass=1)",
2042                            inst_disk.iv_name, node)
2043         if not ignore_secondaries:
2044           disks_ok = False
2045
2046   # FIXME: race condition on drbd migration to primary
2047
2048   # 2nd pass, do only the primary node
2049   for inst_disk in instance.disks:
2050     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2051       if node != instance.primary_node:
2052         continue
2053       lu.cfg.SetDiskID(node_disk, node)
2054       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2055       if not result:
2056         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2057                            " (is_primary=True, pass=2)",
2058                            inst_disk.iv_name, node)
2059         disks_ok = False
2060     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2061
2062   # leave the disks configured for the primary node
2063   # this is a workaround that would be fixed better by
2064   # improving the logical/physical id handling
2065   for disk in instance.disks:
2066     lu.cfg.SetDiskID(disk, instance.primary_node)
2067
2068   return disks_ok, device_info
2069
2070
2071 def _StartInstanceDisks(lu, instance, force):
2072   """Start the disks of an instance.
2073
2074   """
2075   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2076                                            ignore_secondaries=force)
2077   if not disks_ok:
2078     _ShutdownInstanceDisks(lu, instance)
2079     if force is not None and not force:
2080       lu.proc.LogWarning("", hint="If the message above refers to a"
2081                          " secondary node,"
2082                          " you can retry the operation using '--force'.")
2083     raise errors.OpExecError("Disk consistency error")
2084
2085
2086 class LUDeactivateInstanceDisks(NoHooksLU):
2087   """Shutdown an instance's disks.
2088
2089   """
2090   _OP_REQP = ["instance_name"]
2091   REQ_BGL = False
2092
2093   def ExpandNames(self):
2094     self._ExpandAndLockInstance()
2095     self.needed_locks[locking.LEVEL_NODE] = []
2096     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2097
2098   def DeclareLocks(self, level):
2099     if level == locking.LEVEL_NODE:
2100       self._LockInstancesNodes()
2101
2102   def CheckPrereq(self):
2103     """Check prerequisites.
2104
2105     This checks that the instance is in the cluster.
2106
2107     """
2108     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109     assert self.instance is not None, \
2110       "Cannot retrieve locked instance %s" % self.op.instance_name
2111
2112   def Exec(self, feedback_fn):
2113     """Deactivate the disks
2114
2115     """
2116     instance = self.instance
2117     _SafeShutdownInstanceDisks(self, instance)
2118
2119
2120 def _SafeShutdownInstanceDisks(lu, instance):
2121   """Shutdown block devices of an instance.
2122
2123   This function checks if an instance is running, before calling
2124   _ShutdownInstanceDisks.
2125
2126   """
2127   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2128                                       [instance.hypervisor])
2129   ins_l = ins_l[instance.primary_node]
2130   if not type(ins_l) is list:
2131     raise errors.OpExecError("Can't contact node '%s'" %
2132                              instance.primary_node)
2133
2134   if instance.name in ins_l:
2135     raise errors.OpExecError("Instance is running, can't shutdown"
2136                              " block devices.")
2137
2138   _ShutdownInstanceDisks(lu, instance)
2139
2140
2141 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2142   """Shutdown block devices of an instance.
2143
2144   This does the shutdown on all nodes of the instance.
2145
2146   If the ignore_primary is false, errors on the primary node are
2147   ignored.
2148
2149   """
2150   result = True
2151   for disk in instance.disks:
2152     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2153       lu.cfg.SetDiskID(top_disk, node)
2154       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2155         logging.error("Could not shutdown block device %s on node %s",
2156                       disk.iv_name, node)
2157         if not ignore_primary or node != instance.primary_node:
2158           result = False
2159   return result
2160
2161
2162 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2163   """Checks if a node has enough free memory.
2164
2165   This function check if a given node has the needed amount of free
2166   memory. In case the node has less memory or we cannot get the
2167   information from the node, this function raise an OpPrereqError
2168   exception.
2169
2170   @type lu: C{LogicalUnit}
2171   @param lu: a logical unit from which we get configuration data
2172   @type node: C{str}
2173   @param node: the node to check
2174   @type reason: C{str}
2175   @param reason: string to use in the error message
2176   @type requested: C{int}
2177   @param requested: the amount of memory in MiB to check for
2178   @type hypervisor: C{str}
2179   @param hypervisor: the hypervisor to ask for memory stats
2180   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2181       we cannot check the node
2182
2183   """
2184   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2185   if not nodeinfo or not isinstance(nodeinfo, dict):
2186     raise errors.OpPrereqError("Could not contact node %s for resource"
2187                              " information" % (node,))
2188
2189   free_mem = nodeinfo[node].get('memory_free')
2190   if not isinstance(free_mem, int):
2191     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2192                              " was '%s'" % (node, free_mem))
2193   if requested > free_mem:
2194     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2195                              " needed %s MiB, available %s MiB" %
2196                              (node, reason, requested, free_mem))
2197
2198
2199 class LUStartupInstance(LogicalUnit):
2200   """Starts an instance.
2201
2202   """
2203   HPATH = "instance-start"
2204   HTYPE = constants.HTYPE_INSTANCE
2205   _OP_REQP = ["instance_name", "force"]
2206   REQ_BGL = False
2207
2208   def ExpandNames(self):
2209     self._ExpandAndLockInstance()
2210     self.needed_locks[locking.LEVEL_NODE] = []
2211     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2212
2213   def DeclareLocks(self, level):
2214     if level == locking.LEVEL_NODE:
2215       self._LockInstancesNodes()
2216
2217   def BuildHooksEnv(self):
2218     """Build hooks env.
2219
2220     This runs on master, primary and secondary nodes of the instance.
2221
2222     """
2223     env = {
2224       "FORCE": self.op.force,
2225       }
2226     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2227     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2228           list(self.instance.secondary_nodes))
2229     return env, nl, nl
2230
2231   def CheckPrereq(self):
2232     """Check prerequisites.
2233
2234     This checks that the instance is in the cluster.
2235
2236     """
2237     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2238     assert self.instance is not None, \
2239       "Cannot retrieve locked instance %s" % self.op.instance_name
2240
2241     bep = self.cfg.GetClusterInfo().FillBE(instance)
2242     # check bridges existance
2243     _CheckInstanceBridgesExist(self, instance)
2244
2245     _CheckNodeFreeMemory(self, instance.primary_node,
2246                          "starting instance %s" % instance.name,
2247                          bep[constants.BE_MEMORY], instance.hypervisor)
2248
2249   def Exec(self, feedback_fn):
2250     """Start the instance.
2251
2252     """
2253     instance = self.instance
2254     force = self.op.force
2255     extra_args = getattr(self.op, "extra_args", "")
2256
2257     self.cfg.MarkInstanceUp(instance.name)
2258
2259     node_current = instance.primary_node
2260
2261     _StartInstanceDisks(self, instance, force)
2262
2263     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2264       _ShutdownInstanceDisks(self, instance)
2265       raise errors.OpExecError("Could not start instance")
2266
2267
2268 class LURebootInstance(LogicalUnit):
2269   """Reboot an instance.
2270
2271   """
2272   HPATH = "instance-reboot"
2273   HTYPE = constants.HTYPE_INSTANCE
2274   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2275   REQ_BGL = False
2276
2277   def ExpandNames(self):
2278     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2279                                    constants.INSTANCE_REBOOT_HARD,
2280                                    constants.INSTANCE_REBOOT_FULL]:
2281       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2282                                   (constants.INSTANCE_REBOOT_SOFT,
2283                                    constants.INSTANCE_REBOOT_HARD,
2284                                    constants.INSTANCE_REBOOT_FULL))
2285     self._ExpandAndLockInstance()
2286     self.needed_locks[locking.LEVEL_NODE] = []
2287     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2288
2289   def DeclareLocks(self, level):
2290     if level == locking.LEVEL_NODE:
2291       primary_only = not constants.INSTANCE_REBOOT_FULL
2292       self._LockInstancesNodes(primary_only=primary_only)
2293
2294   def BuildHooksEnv(self):
2295     """Build hooks env.
2296
2297     This runs on master, primary and secondary nodes of the instance.
2298
2299     """
2300     env = {
2301       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2302       }
2303     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2304     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2305           list(self.instance.secondary_nodes))
2306     return env, nl, nl
2307
2308   def CheckPrereq(self):
2309     """Check prerequisites.
2310
2311     This checks that the instance is in the cluster.
2312
2313     """
2314     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2315     assert self.instance is not None, \
2316       "Cannot retrieve locked instance %s" % self.op.instance_name
2317
2318     # check bridges existance
2319     _CheckInstanceBridgesExist(self, instance)
2320
2321   def Exec(self, feedback_fn):
2322     """Reboot the instance.
2323
2324     """
2325     instance = self.instance
2326     ignore_secondaries = self.op.ignore_secondaries
2327     reboot_type = self.op.reboot_type
2328     extra_args = getattr(self.op, "extra_args", "")
2329
2330     node_current = instance.primary_node
2331
2332     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2333                        constants.INSTANCE_REBOOT_HARD]:
2334       if not self.rpc.call_instance_reboot(node_current, instance,
2335                                            reboot_type, extra_args):
2336         raise errors.OpExecError("Could not reboot instance")
2337     else:
2338       if not self.rpc.call_instance_shutdown(node_current, instance):
2339         raise errors.OpExecError("could not shutdown instance for full reboot")
2340       _ShutdownInstanceDisks(self, instance)
2341       _StartInstanceDisks(self, instance, ignore_secondaries)
2342       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2343         _ShutdownInstanceDisks(self, instance)
2344         raise errors.OpExecError("Could not start instance for full reboot")
2345
2346     self.cfg.MarkInstanceUp(instance.name)
2347
2348
2349 class LUShutdownInstance(LogicalUnit):
2350   """Shutdown an instance.
2351
2352   """
2353   HPATH = "instance-stop"
2354   HTYPE = constants.HTYPE_INSTANCE
2355   _OP_REQP = ["instance_name"]
2356   REQ_BGL = False
2357
2358   def ExpandNames(self):
2359     self._ExpandAndLockInstance()
2360     self.needed_locks[locking.LEVEL_NODE] = []
2361     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2362
2363   def DeclareLocks(self, level):
2364     if level == locking.LEVEL_NODE:
2365       self._LockInstancesNodes()
2366
2367   def BuildHooksEnv(self):
2368     """Build hooks env.
2369
2370     This runs on master, primary and secondary nodes of the instance.
2371
2372     """
2373     env = _BuildInstanceHookEnvByObject(self, self.instance)
2374     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2375           list(self.instance.secondary_nodes))
2376     return env, nl, nl
2377
2378   def CheckPrereq(self):
2379     """Check prerequisites.
2380
2381     This checks that the instance is in the cluster.
2382
2383     """
2384     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2385     assert self.instance is not None, \
2386       "Cannot retrieve locked instance %s" % self.op.instance_name
2387
2388   def Exec(self, feedback_fn):
2389     """Shutdown the instance.
2390
2391     """
2392     instance = self.instance
2393     node_current = instance.primary_node
2394     self.cfg.MarkInstanceDown(instance.name)
2395     if not self.rpc.call_instance_shutdown(node_current, instance):
2396       self.proc.LogWarning("Could not shutdown instance")
2397
2398     _ShutdownInstanceDisks(self, instance)
2399
2400
2401 class LUReinstallInstance(LogicalUnit):
2402   """Reinstall an instance.
2403
2404   """
2405   HPATH = "instance-reinstall"
2406   HTYPE = constants.HTYPE_INSTANCE
2407   _OP_REQP = ["instance_name"]
2408   REQ_BGL = False
2409
2410   def ExpandNames(self):
2411     self._ExpandAndLockInstance()
2412     self.needed_locks[locking.LEVEL_NODE] = []
2413     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2414
2415   def DeclareLocks(self, level):
2416     if level == locking.LEVEL_NODE:
2417       self._LockInstancesNodes()
2418
2419   def BuildHooksEnv(self):
2420     """Build hooks env.
2421
2422     This runs on master, primary and secondary nodes of the instance.
2423
2424     """
2425     env = _BuildInstanceHookEnvByObject(self, self.instance)
2426     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2427           list(self.instance.secondary_nodes))
2428     return env, nl, nl
2429
2430   def CheckPrereq(self):
2431     """Check prerequisites.
2432
2433     This checks that the instance is in the cluster and is not running.
2434
2435     """
2436     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2437     assert instance is not None, \
2438       "Cannot retrieve locked instance %s" % self.op.instance_name
2439
2440     if instance.disk_template == constants.DT_DISKLESS:
2441       raise errors.OpPrereqError("Instance '%s' has no disks" %
2442                                  self.op.instance_name)
2443     if instance.status != "down":
2444       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2445                                  self.op.instance_name)
2446     remote_info = self.rpc.call_instance_info(instance.primary_node,
2447                                               instance.name,
2448                                               instance.hypervisor)
2449     if remote_info:
2450       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2451                                  (self.op.instance_name,
2452                                   instance.primary_node))
2453
2454     self.op.os_type = getattr(self.op, "os_type", None)
2455     if self.op.os_type is not None:
2456       # OS verification
2457       pnode = self.cfg.GetNodeInfo(
2458         self.cfg.ExpandNodeName(instance.primary_node))
2459       if pnode is None:
2460         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2461                                    self.op.pnode)
2462       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2463       if not os_obj:
2464         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2465                                    " primary node"  % self.op.os_type)
2466
2467     self.instance = instance
2468
2469   def Exec(self, feedback_fn):
2470     """Reinstall the instance.
2471
2472     """
2473     inst = self.instance
2474
2475     if self.op.os_type is not None:
2476       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2477       inst.os = self.op.os_type
2478       self.cfg.Update(inst)
2479
2480     _StartInstanceDisks(self, inst, None)
2481     try:
2482       feedback_fn("Running the instance OS create scripts...")
2483       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2484                                            "sda", "sdb"):
2485         raise errors.OpExecError("Could not install OS for instance %s"
2486                                  " on node %s" %
2487                                  (inst.name, inst.primary_node))
2488     finally:
2489       _ShutdownInstanceDisks(self, inst)
2490
2491
2492 class LURenameInstance(LogicalUnit):
2493   """Rename an instance.
2494
2495   """
2496   HPATH = "instance-rename"
2497   HTYPE = constants.HTYPE_INSTANCE
2498   _OP_REQP = ["instance_name", "new_name"]
2499
2500   def BuildHooksEnv(self):
2501     """Build hooks env.
2502
2503     This runs on master, primary and secondary nodes of the instance.
2504
2505     """
2506     env = _BuildInstanceHookEnvByObject(self, self.instance)
2507     env["INSTANCE_NEW_NAME"] = self.op.new_name
2508     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509           list(self.instance.secondary_nodes))
2510     return env, nl, nl
2511
2512   def CheckPrereq(self):
2513     """Check prerequisites.
2514
2515     This checks that the instance is in the cluster and is not running.
2516
2517     """
2518     instance = self.cfg.GetInstanceInfo(
2519       self.cfg.ExpandInstanceName(self.op.instance_name))
2520     if instance is None:
2521       raise errors.OpPrereqError("Instance '%s' not known" %
2522                                  self.op.instance_name)
2523     if instance.status != "down":
2524       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2525                                  self.op.instance_name)
2526     remote_info = self.rpc.call_instance_info(instance.primary_node,
2527                                               instance.name,
2528                                               instance.hypervisor)
2529     if remote_info:
2530       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2531                                  (self.op.instance_name,
2532                                   instance.primary_node))
2533     self.instance = instance
2534
2535     # new name verification
2536     name_info = utils.HostInfo(self.op.new_name)
2537
2538     self.op.new_name = new_name = name_info.name
2539     instance_list = self.cfg.GetInstanceList()
2540     if new_name in instance_list:
2541       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2542                                  new_name)
2543
2544     if not getattr(self.op, "ignore_ip", False):
2545       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2546         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2547                                    (name_info.ip, new_name))
2548
2549
2550   def Exec(self, feedback_fn):
2551     """Reinstall the instance.
2552
2553     """
2554     inst = self.instance
2555     old_name = inst.name
2556
2557     if inst.disk_template == constants.DT_FILE:
2558       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2559
2560     self.cfg.RenameInstance(inst.name, self.op.new_name)
2561     # Change the instance lock. This is definitely safe while we hold the BGL
2562     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2563     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2564
2565     # re-read the instance from the configuration after rename
2566     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2567
2568     if inst.disk_template == constants.DT_FILE:
2569       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2570       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2571                                                      old_file_storage_dir,
2572                                                      new_file_storage_dir)
2573
2574       if not result:
2575         raise errors.OpExecError("Could not connect to node '%s' to rename"
2576                                  " directory '%s' to '%s' (but the instance"
2577                                  " has been renamed in Ganeti)" % (
2578                                  inst.primary_node, old_file_storage_dir,
2579                                  new_file_storage_dir))
2580
2581       if not result[0]:
2582         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2583                                  " (but the instance has been renamed in"
2584                                  " Ganeti)" % (old_file_storage_dir,
2585                                                new_file_storage_dir))
2586
2587     _StartInstanceDisks(self, inst, None)
2588     try:
2589       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2590                                                old_name):
2591         msg = ("Could not run OS rename script for instance %s on node %s"
2592                " (but the instance has been renamed in Ganeti)" %
2593                (inst.name, inst.primary_node))
2594         self.proc.LogWarning(msg)
2595     finally:
2596       _ShutdownInstanceDisks(self, inst)
2597
2598
2599 class LURemoveInstance(LogicalUnit):
2600   """Remove an instance.
2601
2602   """
2603   HPATH = "instance-remove"
2604   HTYPE = constants.HTYPE_INSTANCE
2605   _OP_REQP = ["instance_name", "ignore_failures"]
2606   REQ_BGL = False
2607
2608   def ExpandNames(self):
2609     self._ExpandAndLockInstance()
2610     self.needed_locks[locking.LEVEL_NODE] = []
2611     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2612
2613   def DeclareLocks(self, level):
2614     if level == locking.LEVEL_NODE:
2615       self._LockInstancesNodes()
2616
2617   def BuildHooksEnv(self):
2618     """Build hooks env.
2619
2620     This runs on master, primary and secondary nodes of the instance.
2621
2622     """
2623     env = _BuildInstanceHookEnvByObject(self, self.instance)
2624     nl = [self.cfg.GetMasterNode()]
2625     return env, nl, nl
2626
2627   def CheckPrereq(self):
2628     """Check prerequisites.
2629
2630     This checks that the instance is in the cluster.
2631
2632     """
2633     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2634     assert self.instance is not None, \
2635       "Cannot retrieve locked instance %s" % self.op.instance_name
2636
2637   def Exec(self, feedback_fn):
2638     """Remove the instance.
2639
2640     """
2641     instance = self.instance
2642     logging.info("Shutting down instance %s on node %s",
2643                  instance.name, instance.primary_node)
2644
2645     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2646       if self.op.ignore_failures:
2647         feedback_fn("Warning: can't shutdown instance")
2648       else:
2649         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2650                                  (instance.name, instance.primary_node))
2651
2652     logging.info("Removing block devices for instance %s", instance.name)
2653
2654     if not _RemoveDisks(self, instance):
2655       if self.op.ignore_failures:
2656         feedback_fn("Warning: can't remove instance's disks")
2657       else:
2658         raise errors.OpExecError("Can't remove instance's disks")
2659
2660     logging.info("Removing instance %s out of cluster config", instance.name)
2661
2662     self.cfg.RemoveInstance(instance.name)
2663     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2664
2665
2666 class LUQueryInstances(NoHooksLU):
2667   """Logical unit for querying instances.
2668
2669   """
2670   _OP_REQP = ["output_fields", "names"]
2671   REQ_BGL = False
2672
2673   def ExpandNames(self):
2674     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2675     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2676     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2677     self.static_fields = frozenset([
2678       "name", "os", "pnode", "snodes",
2679       "admin_state", "admin_ram",
2680       "disk_template", "ip", "mac", "bridge",
2681       "sda_size", "sdb_size", "vcpus", "tags",
2682       "network_port", "beparams",
2683       "serial_no", "hypervisor", "hvparams",
2684       ] + hvp + bep)
2685
2686     _CheckOutputFields(static=self.static_fields,
2687                        dynamic=self.dynamic_fields,
2688                        selected=self.op.output_fields)
2689
2690     self.needed_locks = {}
2691     self.share_locks[locking.LEVEL_INSTANCE] = 1
2692     self.share_locks[locking.LEVEL_NODE] = 1
2693
2694     if self.op.names:
2695       self.wanted = _GetWantedInstances(self, self.op.names)
2696     else:
2697       self.wanted = locking.ALL_SET
2698
2699     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2700     if self.do_locking:
2701       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2702       self.needed_locks[locking.LEVEL_NODE] = []
2703       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2704
2705   def DeclareLocks(self, level):
2706     if level == locking.LEVEL_NODE and self.do_locking:
2707       self._LockInstancesNodes()
2708
2709   def CheckPrereq(self):
2710     """Check prerequisites.
2711
2712     """
2713     pass
2714
2715   def Exec(self, feedback_fn):
2716     """Computes the list of nodes and their attributes.
2717
2718     """
2719     all_info = self.cfg.GetAllInstancesInfo()
2720     if self.do_locking:
2721       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2722     elif self.wanted != locking.ALL_SET:
2723       instance_names = self.wanted
2724       missing = set(instance_names).difference(all_info.keys())
2725       if missing:
2726         raise errors.OpExecError(
2727           "Some instances were removed before retrieving their data: %s"
2728           % missing)
2729     else:
2730       instance_names = all_info.keys()
2731
2732     instance_names = utils.NiceSort(instance_names)
2733     instance_list = [all_info[iname] for iname in instance_names]
2734
2735     # begin data gathering
2736
2737     nodes = frozenset([inst.primary_node for inst in instance_list])
2738     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2739
2740     bad_nodes = []
2741     if self.dynamic_fields.intersection(self.op.output_fields):
2742       live_data = {}
2743       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2744       for name in nodes:
2745         result = node_data[name]
2746         if result:
2747           live_data.update(result)
2748         elif result == False:
2749           bad_nodes.append(name)
2750         # else no instance is alive
2751     else:
2752       live_data = dict([(name, {}) for name in instance_names])
2753
2754     # end data gathering
2755
2756     HVPREFIX = "hv/"
2757     BEPREFIX = "be/"
2758     output = []
2759     for instance in instance_list:
2760       iout = []
2761       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2762       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2763       for field in self.op.output_fields:
2764         if field == "name":
2765           val = instance.name
2766         elif field == "os":
2767           val = instance.os
2768         elif field == "pnode":
2769           val = instance.primary_node
2770         elif field == "snodes":
2771           val = list(instance.secondary_nodes)
2772         elif field == "admin_state":
2773           val = (instance.status != "down")
2774         elif field == "oper_state":
2775           if instance.primary_node in bad_nodes:
2776             val = None
2777           else:
2778             val = bool(live_data.get(instance.name))
2779         elif field == "status":
2780           if instance.primary_node in bad_nodes:
2781             val = "ERROR_nodedown"
2782           else:
2783             running = bool(live_data.get(instance.name))
2784             if running:
2785               if instance.status != "down":
2786                 val = "running"
2787               else:
2788                 val = "ERROR_up"
2789             else:
2790               if instance.status != "down":
2791                 val = "ERROR_down"
2792               else:
2793                 val = "ADMIN_down"
2794         elif field == "oper_ram":
2795           if instance.primary_node in bad_nodes:
2796             val = None
2797           elif instance.name in live_data:
2798             val = live_data[instance.name].get("memory", "?")
2799           else:
2800             val = "-"
2801         elif field == "disk_template":
2802           val = instance.disk_template
2803         elif field == "ip":
2804           val = instance.nics[0].ip
2805         elif field == "bridge":
2806           val = instance.nics[0].bridge
2807         elif field == "mac":
2808           val = instance.nics[0].mac
2809         elif field == "sda_size" or field == "sdb_size":
2810           disk = instance.FindDisk(field[:3])
2811           if disk is None:
2812             val = None
2813           else:
2814             val = disk.size
2815         elif field == "tags":
2816           val = list(instance.GetTags())
2817         elif field == "serial_no":
2818           val = instance.serial_no
2819         elif field == "network_port":
2820           val = instance.network_port
2821         elif field == "hypervisor":
2822           val = instance.hypervisor
2823         elif field == "hvparams":
2824           val = i_hv
2825         elif (field.startswith(HVPREFIX) and
2826               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2827           val = i_hv.get(field[len(HVPREFIX):], None)
2828         elif field == "beparams":
2829           val = i_be
2830         elif (field.startswith(BEPREFIX) and
2831               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2832           val = i_be.get(field[len(BEPREFIX):], None)
2833         else:
2834           raise errors.ParameterError(field)
2835         iout.append(val)
2836       output.append(iout)
2837
2838     return output
2839
2840
2841 class LUFailoverInstance(LogicalUnit):
2842   """Failover an instance.
2843
2844   """
2845   HPATH = "instance-failover"
2846   HTYPE = constants.HTYPE_INSTANCE
2847   _OP_REQP = ["instance_name", "ignore_consistency"]
2848   REQ_BGL = False
2849
2850   def ExpandNames(self):
2851     self._ExpandAndLockInstance()
2852     self.needed_locks[locking.LEVEL_NODE] = []
2853     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2854
2855   def DeclareLocks(self, level):
2856     if level == locking.LEVEL_NODE:
2857       self._LockInstancesNodes()
2858
2859   def BuildHooksEnv(self):
2860     """Build hooks env.
2861
2862     This runs on master, primary and secondary nodes of the instance.
2863
2864     """
2865     env = {
2866       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2867       }
2868     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2869     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2870     return env, nl, nl
2871
2872   def CheckPrereq(self):
2873     """Check prerequisites.
2874
2875     This checks that the instance is in the cluster.
2876
2877     """
2878     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2879     assert self.instance is not None, \
2880       "Cannot retrieve locked instance %s" % self.op.instance_name
2881
2882     bep = self.cfg.GetClusterInfo().FillBE(instance)
2883     if instance.disk_template not in constants.DTS_NET_MIRROR:
2884       raise errors.OpPrereqError("Instance's disk layout is not"
2885                                  " network mirrored, cannot failover.")
2886
2887     secondary_nodes = instance.secondary_nodes
2888     if not secondary_nodes:
2889       raise errors.ProgrammerError("no secondary node but using "
2890                                    "a mirrored disk template")
2891
2892     target_node = secondary_nodes[0]
2893     # check memory requirements on the secondary node
2894     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2895                          instance.name, bep[constants.BE_MEMORY],
2896                          instance.hypervisor)
2897
2898     # check bridge existance
2899     brlist = [nic.bridge for nic in instance.nics]
2900     if not self.rpc.call_bridges_exist(target_node, brlist):
2901       raise errors.OpPrereqError("One or more target bridges %s does not"
2902                                  " exist on destination node '%s'" %
2903                                  (brlist, target_node))
2904
2905   def Exec(self, feedback_fn):
2906     """Failover an instance.
2907
2908     The failover is done by shutting it down on its present node and
2909     starting it on the secondary.
2910
2911     """
2912     instance = self.instance
2913
2914     source_node = instance.primary_node
2915     target_node = instance.secondary_nodes[0]
2916
2917     feedback_fn("* checking disk consistency between source and target")
2918     for dev in instance.disks:
2919       # for drbd, these are drbd over lvm
2920       if not _CheckDiskConsistency(self, dev, target_node, False):
2921         if instance.status == "up" and not self.op.ignore_consistency:
2922           raise errors.OpExecError("Disk %s is degraded on target node,"
2923                                    " aborting failover." % dev.iv_name)
2924
2925     feedback_fn("* shutting down instance on source node")
2926     logging.info("Shutting down instance %s on node %s",
2927                  instance.name, source_node)
2928
2929     if not self.rpc.call_instance_shutdown(source_node, instance):
2930       if self.op.ignore_consistency:
2931         self.proc.LogWarning("Could not shutdown instance %s on node %s."
2932                              " Proceeding"
2933                              " anyway. Please make sure node %s is down",
2934                              instance.name, source_node, source_node)
2935       else:
2936         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2937                                  (instance.name, source_node))
2938
2939     feedback_fn("* deactivating the instance's disks on source node")
2940     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2941       raise errors.OpExecError("Can't shut down the instance's disks.")
2942
2943     instance.primary_node = target_node
2944     # distribute new instance config to the other nodes
2945     self.cfg.Update(instance)
2946
2947     # Only start the instance if it's marked as up
2948     if instance.status == "up":
2949       feedback_fn("* activating the instance's disks on target node")
2950       logging.info("Starting instance %s on node %s",
2951                    instance.name, target_node)
2952
2953       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2954                                                ignore_secondaries=True)
2955       if not disks_ok:
2956         _ShutdownInstanceDisks(self, instance)
2957         raise errors.OpExecError("Can't activate the instance's disks")
2958
2959       feedback_fn("* starting the instance on the target node")
2960       if not self.rpc.call_instance_start(target_node, instance, None):
2961         _ShutdownInstanceDisks(self, instance)
2962         raise errors.OpExecError("Could not start instance %s on node %s." %
2963                                  (instance.name, target_node))
2964
2965
2966 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2967   """Create a tree of block devices on the primary node.
2968
2969   This always creates all devices.
2970
2971   """
2972   if device.children:
2973     for child in device.children:
2974       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2975         return False
2976
2977   lu.cfg.SetDiskID(device, node)
2978   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2979                                        instance.name, True, info)
2980   if not new_id:
2981     return False
2982   if device.physical_id is None:
2983     device.physical_id = new_id
2984   return True
2985
2986
2987 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2988   """Create a tree of block devices on a secondary node.
2989
2990   If this device type has to be created on secondaries, create it and
2991   all its children.
2992
2993   If not, just recurse to children keeping the same 'force' value.
2994
2995   """
2996   if device.CreateOnSecondary():
2997     force = True
2998   if device.children:
2999     for child in device.children:
3000       if not _CreateBlockDevOnSecondary(lu, node, instance,
3001                                         child, force, info):
3002         return False
3003
3004   if not force:
3005     return True
3006   lu.cfg.SetDiskID(device, node)
3007   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3008                                        instance.name, False, info)
3009   if not new_id:
3010     return False
3011   if device.physical_id is None:
3012     device.physical_id = new_id
3013   return True
3014
3015
3016 def _GenerateUniqueNames(lu, exts):
3017   """Generate a suitable LV name.
3018
3019   This will generate a logical volume name for the given instance.
3020
3021   """
3022   results = []
3023   for val in exts:
3024     new_id = lu.cfg.GenerateUniqueID()
3025     results.append("%s%s" % (new_id, val))
3026   return results
3027
3028
3029 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3030                          p_minor, s_minor):
3031   """Generate a drbd8 device complete with its children.
3032
3033   """
3034   port = lu.cfg.AllocatePort()
3035   vgname = lu.cfg.GetVGName()
3036   shared_secret = lu.cfg.GenerateDRBDSecret()
3037   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3038                           logical_id=(vgname, names[0]))
3039   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3040                           logical_id=(vgname, names[1]))
3041   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3042                           logical_id=(primary, secondary, port,
3043                                       p_minor, s_minor,
3044                                       shared_secret),
3045                           children=[dev_data, dev_meta],
3046                           iv_name=iv_name)
3047   return drbd_dev
3048
3049
3050 def _GenerateDiskTemplate(lu, template_name,
3051                           instance_name, primary_node,
3052                           secondary_nodes, disk_sz, swap_sz,
3053                           file_storage_dir, file_driver):
3054   """Generate the entire disk layout for a given template type.
3055
3056   """
3057   #TODO: compute space requirements
3058
3059   vgname = lu.cfg.GetVGName()
3060   if template_name == constants.DT_DISKLESS:
3061     disks = []
3062   elif template_name == constants.DT_PLAIN:
3063     if len(secondary_nodes) != 0:
3064       raise errors.ProgrammerError("Wrong template configuration")
3065
3066     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3067     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3068                            logical_id=(vgname, names[0]),
3069                            iv_name = "sda")
3070     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3071                            logical_id=(vgname, names[1]),
3072                            iv_name = "sdb")
3073     disks = [sda_dev, sdb_dev]
3074   elif template_name == constants.DT_DRBD8:
3075     if len(secondary_nodes) != 1:
3076       raise errors.ProgrammerError("Wrong template configuration")
3077     remote_node = secondary_nodes[0]
3078     (minor_pa, minor_pb,
3079      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3080       [primary_node, primary_node, remote_node, remote_node], instance_name)
3081
3082     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3083                                       ".sdb_data", ".sdb_meta"])
3084     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3085                                         disk_sz, names[0:2], "sda",
3086                                         minor_pa, minor_sa)
3087     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3088                                         swap_sz, names[2:4], "sdb",
3089                                         minor_pb, minor_sb)
3090     disks = [drbd_sda_dev, drbd_sdb_dev]
3091   elif template_name == constants.DT_FILE:
3092     if len(secondary_nodes) != 0:
3093       raise errors.ProgrammerError("Wrong template configuration")
3094
3095     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3096                                 iv_name="sda", logical_id=(file_driver,
3097                                 "%s/sda" % file_storage_dir))
3098     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3099                                 iv_name="sdb", logical_id=(file_driver,
3100                                 "%s/sdb" % file_storage_dir))
3101     disks = [file_sda_dev, file_sdb_dev]
3102   else:
3103     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3104   return disks
3105
3106
3107 def _GetInstanceInfoText(instance):
3108   """Compute that text that should be added to the disk's metadata.
3109
3110   """
3111   return "originstname+%s" % instance.name
3112
3113
3114 def _CreateDisks(lu, instance):
3115   """Create all disks for an instance.
3116
3117   This abstracts away some work from AddInstance.
3118
3119   Args:
3120     instance: the instance object
3121
3122   Returns:
3123     True or False showing the success of the creation process
3124
3125   """
3126   info = _GetInstanceInfoText(instance)
3127
3128   if instance.disk_template == constants.DT_FILE:
3129     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3130     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3131                                                  file_storage_dir)
3132
3133     if not result:
3134       logging.error("Could not connect to node '%s'", instance.primary_node)
3135       return False
3136
3137     if not result[0]:
3138       logging.error("Failed to create directory '%s'", file_storage_dir)
3139       return False
3140
3141   for device in instance.disks:
3142     logging.info("Creating volume %s for instance %s",
3143                  device.iv_name, instance.name)
3144     #HARDCODE
3145     for secondary_node in instance.secondary_nodes:
3146       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3147                                         device, False, info):
3148         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3149                       device.iv_name, device, secondary_node)
3150         return False
3151     #HARDCODE
3152     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3153                                     instance, device, info):
3154       logging.error("Failed to create volume %s on primary!", device.iv_name)
3155       return False
3156
3157   return True
3158
3159
3160 def _RemoveDisks(lu, instance):
3161   """Remove all disks for an instance.
3162
3163   This abstracts away some work from `AddInstance()` and
3164   `RemoveInstance()`. Note that in case some of the devices couldn't
3165   be removed, the removal will continue with the other ones (compare
3166   with `_CreateDisks()`).
3167
3168   Args:
3169     instance: the instance object
3170
3171   Returns:
3172     True or False showing the success of the removal proces
3173
3174   """
3175   logging.info("Removing block devices for instance %s", instance.name)
3176
3177   result = True
3178   for device in instance.disks:
3179     for node, disk in device.ComputeNodeTree(instance.primary_node):
3180       lu.cfg.SetDiskID(disk, node)
3181       if not lu.rpc.call_blockdev_remove(node, disk):
3182         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3183                            " continuing anyway", device.iv_name, node)
3184         result = False
3185
3186   if instance.disk_template == constants.DT_FILE:
3187     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3188     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3189                                                file_storage_dir):
3190       logging.error("Could not remove directory '%s'", file_storage_dir)
3191       result = False
3192
3193   return result
3194
3195
3196 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3197   """Compute disk size requirements in the volume group
3198
3199   This is currently hard-coded for the two-drive layout.
3200
3201   """
3202   # Required free disk space as a function of disk and swap space
3203   req_size_dict = {
3204     constants.DT_DISKLESS: None,
3205     constants.DT_PLAIN: disk_size + swap_size,
3206     # 256 MB are added for drbd metadata, 128MB for each drbd device
3207     constants.DT_DRBD8: disk_size + swap_size + 256,
3208     constants.DT_FILE: None,
3209   }
3210
3211   if disk_template not in req_size_dict:
3212     raise errors.ProgrammerError("Disk template '%s' size requirement"
3213                                  " is unknown" %  disk_template)
3214
3215   return req_size_dict[disk_template]
3216
3217
3218 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3219   """Hypervisor parameter validation.
3220
3221   This function abstract the hypervisor parameter validation to be
3222   used in both instance create and instance modify.
3223
3224   @type lu: L{LogicalUnit}
3225   @param lu: the logical unit for which we check
3226   @type nodenames: list
3227   @param nodenames: the list of nodes on which we should check
3228   @type hvname: string
3229   @param hvname: the name of the hypervisor we should use
3230   @type hvparams: dict
3231   @param hvparams: the parameters which we need to check
3232   @raise errors.OpPrereqError: if the parameters are not valid
3233
3234   """
3235   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3236                                                   hvname,
3237                                                   hvparams)
3238   for node in nodenames:
3239     info = hvinfo.get(node, None)
3240     if not info or not isinstance(info, (tuple, list)):
3241       raise errors.OpPrereqError("Cannot get current information"
3242                                  " from node '%s' (%s)" % (node, info))
3243     if not info[0]:
3244       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3245                                  " %s" % info[1])
3246
3247
3248 class LUCreateInstance(LogicalUnit):
3249   """Create an instance.
3250
3251   """
3252   HPATH = "instance-add"
3253   HTYPE = constants.HTYPE_INSTANCE
3254   _OP_REQP = ["instance_name", "disk_size",
3255               "disk_template", "swap_size", "mode", "start",
3256               "wait_for_sync", "ip_check", "mac",
3257               "hvparams", "beparams"]
3258   REQ_BGL = False
3259
3260   def _ExpandNode(self, node):
3261     """Expands and checks one node name.
3262
3263     """
3264     node_full = self.cfg.ExpandNodeName(node)
3265     if node_full is None:
3266       raise errors.OpPrereqError("Unknown node %s" % node)
3267     return node_full
3268
3269   def ExpandNames(self):
3270     """ExpandNames for CreateInstance.
3271
3272     Figure out the right locks for instance creation.
3273
3274     """
3275     self.needed_locks = {}
3276
3277     # set optional parameters to none if they don't exist
3278     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3279       if not hasattr(self.op, attr):
3280         setattr(self.op, attr, None)
3281
3282     # cheap checks, mostly valid constants given
3283
3284     # verify creation mode
3285     if self.op.mode not in (constants.INSTANCE_CREATE,
3286                             constants.INSTANCE_IMPORT):
3287       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3288                                  self.op.mode)
3289
3290     # disk template and mirror node verification
3291     if self.op.disk_template not in constants.DISK_TEMPLATES:
3292       raise errors.OpPrereqError("Invalid disk template name")
3293
3294     if self.op.hypervisor is None:
3295       self.op.hypervisor = self.cfg.GetHypervisorType()
3296
3297     cluster = self.cfg.GetClusterInfo()
3298     enabled_hvs = cluster.enabled_hypervisors
3299     if self.op.hypervisor not in enabled_hvs:
3300       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3301                                  " cluster (%s)" % (self.op.hypervisor,
3302                                   ",".join(enabled_hvs)))
3303
3304     # check hypervisor parameter syntax (locally)
3305
3306     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3307                                   self.op.hvparams)
3308     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3309     hv_type.CheckParameterSyntax(filled_hvp)
3310
3311     # fill and remember the beparams dict
3312     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3313                                     self.op.beparams)
3314
3315     #### instance parameters check
3316
3317     # instance name verification
3318     hostname1 = utils.HostInfo(self.op.instance_name)
3319     self.op.instance_name = instance_name = hostname1.name
3320
3321     # this is just a preventive check, but someone might still add this
3322     # instance in the meantime, and creation will fail at lock-add time
3323     if instance_name in self.cfg.GetInstanceList():
3324       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3325                                  instance_name)
3326
3327     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3328
3329     # ip validity checks
3330     ip = getattr(self.op, "ip", None)
3331     if ip is None or ip.lower() == "none":
3332       inst_ip = None
3333     elif ip.lower() == constants.VALUE_AUTO:
3334       inst_ip = hostname1.ip
3335     else:
3336       if not utils.IsValidIP(ip):
3337         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3338                                    " like a valid IP" % ip)
3339       inst_ip = ip
3340     self.inst_ip = self.op.ip = inst_ip
3341     # used in CheckPrereq for ip ping check
3342     self.check_ip = hostname1.ip
3343
3344     # MAC address verification
3345     if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3346       if not utils.IsValidMac(self.op.mac.lower()):
3347         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3348                                    self.op.mac)
3349
3350     # file storage checks
3351     if (self.op.file_driver and
3352         not self.op.file_driver in constants.FILE_DRIVER):
3353       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3354                                  self.op.file_driver)
3355
3356     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3357       raise errors.OpPrereqError("File storage directory path not absolute")
3358
3359     ### Node/iallocator related checks
3360     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3361       raise errors.OpPrereqError("One and only one of iallocator and primary"
3362                                  " node must be given")
3363
3364     if self.op.iallocator:
3365       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3366     else:
3367       self.op.pnode = self._ExpandNode(self.op.pnode)
3368       nodelist = [self.op.pnode]
3369       if self.op.snode is not None:
3370         self.op.snode = self._ExpandNode(self.op.snode)
3371         nodelist.append(self.op.snode)
3372       self.needed_locks[locking.LEVEL_NODE] = nodelist
3373
3374     # in case of import lock the source node too
3375     if self.op.mode == constants.INSTANCE_IMPORT:
3376       src_node = getattr(self.op, "src_node", None)
3377       src_path = getattr(self.op, "src_path", None)
3378
3379       if src_node is None or src_path is None:
3380         raise errors.OpPrereqError("Importing an instance requires source"
3381                                    " node and path options")
3382
3383       if not os.path.isabs(src_path):
3384         raise errors.OpPrereqError("The source path must be absolute")
3385
3386       self.op.src_node = src_node = self._ExpandNode(src_node)
3387       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3388         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3389
3390     else: # INSTANCE_CREATE
3391       if getattr(self.op, "os_type", None) is None:
3392         raise errors.OpPrereqError("No guest OS specified")
3393
3394   def _RunAllocator(self):
3395     """Run the allocator based on input opcode.
3396
3397     """
3398     disks = [{"size": self.op.disk_size, "mode": "w"},
3399              {"size": self.op.swap_size, "mode": "w"}]
3400     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3401              "bridge": self.op.bridge}]
3402     ial = IAllocator(self,
3403                      mode=constants.IALLOCATOR_MODE_ALLOC,
3404                      name=self.op.instance_name,
3405                      disk_template=self.op.disk_template,
3406                      tags=[],
3407                      os=self.op.os_type,
3408                      vcpus=self.be_full[constants.BE_VCPUS],
3409                      mem_size=self.be_full[constants.BE_MEMORY],
3410                      disks=disks,
3411                      nics=nics,
3412                      )
3413
3414     ial.Run(self.op.iallocator)
3415
3416     if not ial.success:
3417       raise errors.OpPrereqError("Can't compute nodes using"
3418                                  " iallocator '%s': %s" % (self.op.iallocator,
3419                                                            ial.info))
3420     if len(ial.nodes) != ial.required_nodes:
3421       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3422                                  " of nodes (%s), required %s" %
3423                                  (self.op.iallocator, len(ial.nodes),
3424                                   ial.required_nodes))
3425     self.op.pnode = ial.nodes[0]
3426     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3427                  self.op.instance_name, self.op.iallocator,
3428                  ", ".join(ial.nodes))
3429     if ial.required_nodes == 2:
3430       self.op.snode = ial.nodes[1]
3431
3432   def BuildHooksEnv(self):
3433     """Build hooks env.
3434
3435     This runs on master, primary and secondary nodes of the instance.
3436
3437     """
3438     env = {
3439       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3440       "INSTANCE_DISK_SIZE": self.op.disk_size,
3441       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3442       "INSTANCE_ADD_MODE": self.op.mode,
3443       }
3444     if self.op.mode == constants.INSTANCE_IMPORT:
3445       env["INSTANCE_SRC_NODE"] = self.op.src_node
3446       env["INSTANCE_SRC_PATH"] = self.op.src_path
3447       env["INSTANCE_SRC_IMAGES"] = self.src_images
3448
3449     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3450       primary_node=self.op.pnode,
3451       secondary_nodes=self.secondaries,
3452       status=self.instance_status,
3453       os_type=self.op.os_type,
3454       memory=self.be_full[constants.BE_MEMORY],
3455       vcpus=self.be_full[constants.BE_VCPUS],
3456       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3457     ))
3458
3459     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3460           self.secondaries)
3461     return env, nl, nl
3462
3463
3464   def CheckPrereq(self):
3465     """Check prerequisites.
3466
3467     """
3468     if (not self.cfg.GetVGName() and
3469         self.op.disk_template not in constants.DTS_NOT_LVM):
3470       raise errors.OpPrereqError("Cluster does not support lvm-based"
3471                                  " instances")
3472
3473
3474     if self.op.mode == constants.INSTANCE_IMPORT:
3475       src_node = self.op.src_node
3476       src_path = self.op.src_path
3477
3478       export_info = self.rpc.call_export_info(src_node, src_path)
3479
3480       if not export_info:
3481         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3482
3483       if not export_info.has_section(constants.INISECT_EXP):
3484         raise errors.ProgrammerError("Corrupted export config")
3485
3486       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3487       if (int(ei_version) != constants.EXPORT_VERSION):
3488         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3489                                    (ei_version, constants.EXPORT_VERSION))
3490
3491       # Check that the new instance doesn't have less disks than the export
3492       # TODO: substitute "2" with the actual number of disks requested
3493       instance_disks = 2
3494       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3495       if instance_disks < export_disks:
3496         raise errors.OpPrereqError("Not enough disks to import."
3497                                    " (instance: %d, export: %d)" %
3498                                    (2, export_disks))
3499
3500       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3501       disk_images = []
3502       for idx in range(export_disks):
3503         option = 'disk%d_dump' % idx
3504         if export_info.has_option(constants.INISECT_INS, option):
3505           # FIXME: are the old os-es, disk sizes, etc. useful?
3506           export_name = export_info.get(constants.INISECT_INS, option)
3507           image = os.path.join(src_path, export_name)
3508           disk_images.append(image)
3509         else:
3510           disk_images.append(False)
3511
3512       self.src_images = disk_images
3513
3514       if self.op.mac == constants.VALUE_AUTO:
3515         old_name = export_info.get(constants.INISECT_INS, 'name')
3516         if self.op.instance_name == old_name:
3517           # FIXME: adjust every nic, when we'll be able to create instances
3518           # with more than one
3519           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3520             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3521
3522     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3523
3524     if self.op.start and not self.op.ip_check:
3525       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3526                                  " adding an instance in start mode")
3527
3528     if self.op.ip_check:
3529       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3530         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3531                                    (self.check_ip, self.op.instance_name))
3532
3533     # bridge verification
3534     bridge = getattr(self.op, "bridge", None)
3535     if bridge is None:
3536       self.op.bridge = self.cfg.GetDefBridge()
3537     else:
3538       self.op.bridge = bridge
3539
3540     #### allocator run
3541
3542     if self.op.iallocator is not None:
3543       self._RunAllocator()
3544
3545     #### node related checks
3546
3547     # check primary node
3548     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3549     assert self.pnode is not None, \
3550       "Cannot retrieve locked node %s" % self.op.pnode
3551     self.secondaries = []
3552
3553     # mirror node verification
3554     if self.op.disk_template in constants.DTS_NET_MIRROR:
3555       if self.op.snode is None:
3556         raise errors.OpPrereqError("The networked disk templates need"
3557                                    " a mirror node")
3558       if self.op.snode == pnode.name:
3559         raise errors.OpPrereqError("The secondary node cannot be"
3560                                    " the primary node.")
3561       self.secondaries.append(self.op.snode)
3562
3563     nodenames = [pnode.name] + self.secondaries
3564
3565     req_size = _ComputeDiskSize(self.op.disk_template,
3566                                 self.op.disk_size, self.op.swap_size)
3567
3568     # Check lv size requirements
3569     if req_size is not None:
3570       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3571                                          self.op.hypervisor)
3572       for node in nodenames:
3573         info = nodeinfo.get(node, None)
3574         if not info:
3575           raise errors.OpPrereqError("Cannot get current information"
3576                                      " from node '%s'" % node)
3577         vg_free = info.get('vg_free', None)
3578         if not isinstance(vg_free, int):
3579           raise errors.OpPrereqError("Can't compute free disk space on"
3580                                      " node %s" % node)
3581         if req_size > info['vg_free']:
3582           raise errors.OpPrereqError("Not enough disk space on target node %s."
3583                                      " %d MB available, %d MB required" %
3584                                      (node, info['vg_free'], req_size))
3585
3586     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3587
3588     # os verification
3589     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3590     if not os_obj:
3591       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3592                                  " primary node"  % self.op.os_type)
3593
3594     # bridge check on primary node
3595     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3596       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3597                                  " destination node '%s'" %
3598                                  (self.op.bridge, pnode.name))
3599
3600     # memory check on primary node
3601     if self.op.start:
3602       _CheckNodeFreeMemory(self, self.pnode.name,
3603                            "creating instance %s" % self.op.instance_name,
3604                            self.be_full[constants.BE_MEMORY],
3605                            self.op.hypervisor)
3606
3607     if self.op.start:
3608       self.instance_status = 'up'
3609     else:
3610       self.instance_status = 'down'
3611
3612   def Exec(self, feedback_fn):
3613     """Create and add the instance to the cluster.
3614
3615     """
3616     instance = self.op.instance_name
3617     pnode_name = self.pnode.name
3618
3619     if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3620       mac_address = self.cfg.GenerateMAC()
3621     else:
3622       mac_address = self.op.mac
3623
3624     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3625     if self.inst_ip is not None:
3626       nic.ip = self.inst_ip
3627
3628     ht_kind = self.op.hypervisor
3629     if ht_kind in constants.HTS_REQ_PORT:
3630       network_port = self.cfg.AllocatePort()
3631     else:
3632       network_port = None
3633
3634     ##if self.op.vnc_bind_address is None:
3635     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3636
3637     # this is needed because os.path.join does not accept None arguments
3638     if self.op.file_storage_dir is None:
3639       string_file_storage_dir = ""
3640     else:
3641       string_file_storage_dir = self.op.file_storage_dir
3642
3643     # build the full file storage dir path
3644     file_storage_dir = os.path.normpath(os.path.join(
3645                                         self.cfg.GetFileStorageDir(),
3646                                         string_file_storage_dir, instance))
3647
3648
3649     disks = _GenerateDiskTemplate(self,
3650                                   self.op.disk_template,
3651                                   instance, pnode_name,
3652                                   self.secondaries, self.op.disk_size,
3653                                   self.op.swap_size,
3654                                   file_storage_dir,
3655                                   self.op.file_driver)
3656
3657     iobj = objects.Instance(name=instance, os=self.op.os_type,
3658                             primary_node=pnode_name,
3659                             nics=[nic], disks=disks,
3660                             disk_template=self.op.disk_template,
3661                             status=self.instance_status,
3662                             network_port=network_port,
3663                             beparams=self.op.beparams,
3664                             hvparams=self.op.hvparams,
3665                             hypervisor=self.op.hypervisor,
3666                             )
3667
3668     feedback_fn("* creating instance disks...")
3669     if not _CreateDisks(self, iobj):
3670       _RemoveDisks(self, iobj)
3671       self.cfg.ReleaseDRBDMinors(instance)
3672       raise errors.OpExecError("Device creation failed, reverting...")
3673
3674     feedback_fn("adding instance %s to cluster config" % instance)
3675
3676     self.cfg.AddInstance(iobj)
3677     # Declare that we don't want to remove the instance lock anymore, as we've
3678     # added the instance to the config
3679     del self.remove_locks[locking.LEVEL_INSTANCE]
3680     # Remove the temp. assignements for the instance's drbds
3681     self.cfg.ReleaseDRBDMinors(instance)
3682
3683     if self.op.wait_for_sync:
3684       disk_abort = not _WaitForSync(self, iobj)
3685     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3686       # make sure the disks are not degraded (still sync-ing is ok)
3687       time.sleep(15)
3688       feedback_fn("* checking mirrors status")
3689       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3690     else:
3691       disk_abort = False
3692
3693     if disk_abort:
3694       _RemoveDisks(self, iobj)
3695       self.cfg.RemoveInstance(iobj.name)
3696       # Make sure the instance lock gets removed
3697       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3698       raise errors.OpExecError("There are some degraded disks for"
3699                                " this instance")
3700
3701     feedback_fn("creating os for instance %s on node %s" %
3702                 (instance, pnode_name))
3703
3704     if iobj.disk_template != constants.DT_DISKLESS:
3705       if self.op.mode == constants.INSTANCE_CREATE:
3706         feedback_fn("* running the instance OS create scripts...")
3707         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3708           raise errors.OpExecError("could not add os for instance %s"
3709                                    " on node %s" %
3710                                    (instance, pnode_name))
3711
3712       elif self.op.mode == constants.INSTANCE_IMPORT:
3713         feedback_fn("* running the instance OS import scripts...")
3714         src_node = self.op.src_node
3715         src_images = self.src_images
3716         cluster_name = self.cfg.GetClusterName()
3717         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3718                                                          src_node, src_images,
3719                                                          cluster_name)
3720         for idx, result in enumerate(import_result):
3721           if not result:
3722             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3723                             " on node %s" % (src_images[idx], instance, idx,
3724                                              pnode_name))
3725       else:
3726         # also checked in the prereq part
3727         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3728                                      % self.op.mode)
3729
3730     if self.op.start:
3731       logging.info("Starting instance %s on node %s", instance, pnode_name)
3732       feedback_fn("* starting instance...")
3733       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3734         raise errors.OpExecError("Could not start instance")
3735
3736
3737 class LUConnectConsole(NoHooksLU):
3738   """Connect to an instance's console.
3739
3740   This is somewhat special in that it returns the command line that
3741   you need to run on the master node in order to connect to the
3742   console.
3743
3744   """
3745   _OP_REQP = ["instance_name"]
3746   REQ_BGL = False
3747
3748   def ExpandNames(self):
3749     self._ExpandAndLockInstance()
3750
3751   def CheckPrereq(self):
3752     """Check prerequisites.
3753
3754     This checks that the instance is in the cluster.
3755
3756     """
3757     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3758     assert self.instance is not None, \
3759       "Cannot retrieve locked instance %s" % self.op.instance_name
3760
3761   def Exec(self, feedback_fn):
3762     """Connect to the console of an instance
3763
3764     """
3765     instance = self.instance
3766     node = instance.primary_node
3767
3768     node_insts = self.rpc.call_instance_list([node],
3769                                              [instance.hypervisor])[node]
3770     if node_insts is False:
3771       raise errors.OpExecError("Can't connect to node %s." % node)
3772
3773     if instance.name not in node_insts:
3774       raise errors.OpExecError("Instance %s is not running." % instance.name)
3775
3776     logging.debug("Connecting to console of %s on %s", instance.name, node)
3777
3778     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3779     console_cmd = hyper.GetShellCommandForConsole(instance)
3780
3781     # build ssh cmdline
3782     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3783
3784
3785 class LUReplaceDisks(LogicalUnit):
3786   """Replace the disks of an instance.
3787
3788   """
3789   HPATH = "mirrors-replace"
3790   HTYPE = constants.HTYPE_INSTANCE
3791   _OP_REQP = ["instance_name", "mode", "disks"]
3792   REQ_BGL = False
3793
3794   def ExpandNames(self):
3795     self._ExpandAndLockInstance()
3796
3797     if not hasattr(self.op, "remote_node"):
3798       self.op.remote_node = None
3799
3800     ia_name = getattr(self.op, "iallocator", None)
3801     if ia_name is not None:
3802       if self.op.remote_node is not None:
3803         raise errors.OpPrereqError("Give either the iallocator or the new"
3804                                    " secondary, not both")
3805       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3806     elif self.op.remote_node is not None:
3807       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3808       if remote_node is None:
3809         raise errors.OpPrereqError("Node '%s' not known" %
3810                                    self.op.remote_node)
3811       self.op.remote_node = remote_node
3812       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3813       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3814     else:
3815       self.needed_locks[locking.LEVEL_NODE] = []
3816       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3817
3818   def DeclareLocks(self, level):
3819     # If we're not already locking all nodes in the set we have to declare the
3820     # instance's primary/secondary nodes.
3821     if (level == locking.LEVEL_NODE and
3822         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3823       self._LockInstancesNodes()
3824
3825   def _RunAllocator(self):
3826     """Compute a new secondary node using an IAllocator.
3827
3828     """
3829     ial = IAllocator(self,
3830                      mode=constants.IALLOCATOR_MODE_RELOC,
3831                      name=self.op.instance_name,
3832                      relocate_from=[self.sec_node])
3833
3834     ial.Run(self.op.iallocator)
3835
3836     if not ial.success:
3837       raise errors.OpPrereqError("Can't compute nodes using"
3838                                  " iallocator '%s': %s" % (self.op.iallocator,
3839                                                            ial.info))
3840     if len(ial.nodes) != ial.required_nodes:
3841       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3842                                  " of nodes (%s), required %s" %
3843                                  (len(ial.nodes), ial.required_nodes))
3844     self.op.remote_node = ial.nodes[0]
3845     self.LogInfo("Selected new secondary for the instance: %s",
3846                  self.op.remote_node)
3847
3848   def BuildHooksEnv(self):
3849     """Build hooks env.
3850
3851     This runs on the master, the primary and all the secondaries.
3852
3853     """
3854     env = {
3855       "MODE": self.op.mode,
3856       "NEW_SECONDARY": self.op.remote_node,
3857       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3858       }
3859     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3860     nl = [
3861       self.cfg.GetMasterNode(),
3862       self.instance.primary_node,
3863       ]
3864     if self.op.remote_node is not None:
3865       nl.append(self.op.remote_node)
3866     return env, nl, nl
3867
3868   def CheckPrereq(self):
3869     """Check prerequisites.
3870
3871     This checks that the instance is in the cluster.
3872
3873     """
3874     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3875     assert instance is not None, \
3876       "Cannot retrieve locked instance %s" % self.op.instance_name
3877     self.instance = instance
3878
3879     if instance.disk_template not in constants.DTS_NET_MIRROR:
3880       raise errors.OpPrereqError("Instance's disk layout is not"
3881                                  " network mirrored.")
3882
3883     if len(instance.secondary_nodes) != 1:
3884       raise errors.OpPrereqError("The instance has a strange layout,"
3885                                  " expected one secondary but found %d" %
3886                                  len(instance.secondary_nodes))
3887
3888     self.sec_node = instance.secondary_nodes[0]
3889
3890     ia_name = getattr(self.op, "iallocator", None)
3891     if ia_name is not None:
3892       self._RunAllocator()
3893
3894     remote_node = self.op.remote_node
3895     if remote_node is not None:
3896       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3897       assert self.remote_node_info is not None, \
3898         "Cannot retrieve locked node %s" % remote_node
3899     else:
3900       self.remote_node_info = None
3901     if remote_node == instance.primary_node:
3902       raise errors.OpPrereqError("The specified node is the primary node of"
3903                                  " the instance.")
3904     elif remote_node == self.sec_node:
3905       if self.op.mode == constants.REPLACE_DISK_SEC:
3906         # this is for DRBD8, where we can't execute the same mode of
3907         # replacement as for drbd7 (no different port allocated)
3908         raise errors.OpPrereqError("Same secondary given, cannot execute"
3909                                    " replacement")
3910     if instance.disk_template == constants.DT_DRBD8:
3911       if (self.op.mode == constants.REPLACE_DISK_ALL and
3912           remote_node is not None):
3913         # switch to replace secondary mode
3914         self.op.mode = constants.REPLACE_DISK_SEC
3915
3916       if self.op.mode == constants.REPLACE_DISK_ALL:
3917         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3918                                    " secondary disk replacement, not"
3919                                    " both at once")
3920       elif self.op.mode == constants.REPLACE_DISK_PRI:
3921         if remote_node is not None:
3922           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3923                                      " the secondary while doing a primary"
3924                                      " node disk replacement")
3925         self.tgt_node = instance.primary_node
3926         self.oth_node = instance.secondary_nodes[0]
3927       elif self.op.mode == constants.REPLACE_DISK_SEC:
3928         self.new_node = remote_node # this can be None, in which case
3929                                     # we don't change the secondary
3930         self.tgt_node = instance.secondary_nodes[0]
3931         self.oth_node = instance.primary_node
3932       else:
3933         raise errors.ProgrammerError("Unhandled disk replace mode")
3934
3935     for name in self.op.disks:
3936       if instance.FindDisk(name) is None:
3937         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3938                                    (name, instance.name))
3939
3940   def _ExecD8DiskOnly(self, feedback_fn):
3941     """Replace a disk on the primary or secondary for dbrd8.
3942
3943     The algorithm for replace is quite complicated:
3944       - for each disk to be replaced:
3945         - create new LVs on the target node with unique names
3946         - detach old LVs from the drbd device
3947         - rename old LVs to name_replaced.<time_t>
3948         - rename new LVs to old LVs
3949         - attach the new LVs (with the old names now) to the drbd device
3950       - wait for sync across all devices
3951       - for each modified disk:
3952         - remove old LVs (which have the name name_replaces.<time_t>)
3953
3954     Failures are not very well handled.
3955
3956     """
3957     steps_total = 6
3958     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3959     instance = self.instance
3960     iv_names = {}
3961     vgname = self.cfg.GetVGName()
3962     # start of work
3963     cfg = self.cfg
3964     tgt_node = self.tgt_node
3965     oth_node = self.oth_node
3966
3967     # Step: check device activation
3968     self.proc.LogStep(1, steps_total, "check device existence")
3969     info("checking volume groups")
3970     my_vg = cfg.GetVGName()
3971     results = self.rpc.call_vg_list([oth_node, tgt_node])
3972     if not results:
3973       raise errors.OpExecError("Can't list volume groups on the nodes")
3974     for node in oth_node, tgt_node:
3975       res = results.get(node, False)
3976       if not res or my_vg not in res:
3977         raise errors.OpExecError("Volume group '%s' not found on %s" %
3978                                  (my_vg, node))
3979     for dev in instance.disks:
3980       if not dev.iv_name in self.op.disks:
3981         continue
3982       for node in tgt_node, oth_node:
3983         info("checking %s on %s" % (dev.iv_name, node))
3984         cfg.SetDiskID(dev, node)
3985         if not self.rpc.call_blockdev_find(node, dev):
3986           raise errors.OpExecError("Can't find device %s on node %s" %
3987                                    (dev.iv_name, node))
3988
3989     # Step: check other node consistency
3990     self.proc.LogStep(2, steps_total, "check peer consistency")
3991     for dev in instance.disks:
3992       if not dev.iv_name in self.op.disks:
3993         continue
3994       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3995       if not _CheckDiskConsistency(self, dev, oth_node,
3996                                    oth_node==instance.primary_node):
3997         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3998                                  " to replace disks on this node (%s)" %
3999                                  (oth_node, tgt_node))
4000
4001     # Step: create new storage
4002     self.proc.LogStep(3, steps_total, "allocate new storage")
4003     for dev in instance.disks:
4004       if not dev.iv_name in self.op.disks:
4005         continue
4006       size = dev.size
4007       cfg.SetDiskID(dev, tgt_node)
4008       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4009       names = _GenerateUniqueNames(self, lv_names)
4010       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4011                              logical_id=(vgname, names[0]))
4012       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4013                              logical_id=(vgname, names[1]))
4014       new_lvs = [lv_data, lv_meta]
4015       old_lvs = dev.children
4016       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4017       info("creating new local storage on %s for %s" %
4018            (tgt_node, dev.iv_name))
4019       # since we *always* want to create this LV, we use the
4020       # _Create...OnPrimary (which forces the creation), even if we
4021       # are talking about the secondary node
4022       for new_lv in new_lvs:
4023         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4024                                         _GetInstanceInfoText(instance)):
4025           raise errors.OpExecError("Failed to create new LV named '%s' on"
4026                                    " node '%s'" %
4027                                    (new_lv.logical_id[1], tgt_node))
4028
4029     # Step: for each lv, detach+rename*2+attach
4030     self.proc.LogStep(4, steps_total, "change drbd configuration")
4031     for dev, old_lvs, new_lvs in iv_names.itervalues():
4032       info("detaching %s drbd from local storage" % dev.iv_name)
4033       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4034         raise errors.OpExecError("Can't detach drbd from local storage on node"
4035                                  " %s for device %s" % (tgt_node, dev.iv_name))
4036       #dev.children = []
4037       #cfg.Update(instance)
4038
4039       # ok, we created the new LVs, so now we know we have the needed
4040       # storage; as such, we proceed on the target node to rename
4041       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4042       # using the assumption that logical_id == physical_id (which in
4043       # turn is the unique_id on that node)
4044
4045       # FIXME(iustin): use a better name for the replaced LVs
4046       temp_suffix = int(time.time())
4047       ren_fn = lambda d, suff: (d.physical_id[0],
4048                                 d.physical_id[1] + "_replaced-%s" % suff)
4049       # build the rename list based on what LVs exist on the node
4050       rlist = []
4051       for to_ren in old_lvs:
4052         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4053         if find_res is not None: # device exists
4054           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4055
4056       info("renaming the old LVs on the target node")
4057       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4058         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4059       # now we rename the new LVs to the old LVs
4060       info("renaming the new LVs on the target node")
4061       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4062       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4063         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4064
4065       for old, new in zip(old_lvs, new_lvs):
4066         new.logical_id = old.logical_id
4067         cfg.SetDiskID(new, tgt_node)
4068
4069       for disk in old_lvs:
4070         disk.logical_id = ren_fn(disk, temp_suffix)
4071         cfg.SetDiskID(disk, tgt_node)
4072
4073       # now that the new lvs have the old name, we can add them to the device
4074       info("adding new mirror component on %s" % tgt_node)
4075       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4076         for new_lv in new_lvs:
4077           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4078             warning("Can't rollback device %s", hint="manually cleanup unused"
4079                     " logical volumes")
4080         raise errors.OpExecError("Can't add local storage to drbd")
4081
4082       dev.children = new_lvs
4083       cfg.Update(instance)
4084
4085     # Step: wait for sync
4086
4087     # this can fail as the old devices are degraded and _WaitForSync
4088     # does a combined result over all disks, so we don't check its
4089     # return value
4090     self.proc.LogStep(5, steps_total, "sync devices")
4091     _WaitForSync(self, instance, unlock=True)
4092
4093     # so check manually all the devices
4094     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4095       cfg.SetDiskID(dev, instance.primary_node)
4096       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4097       if is_degr:
4098         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4099
4100     # Step: remove old storage
4101     self.proc.LogStep(6, steps_total, "removing old storage")
4102     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4103       info("remove logical volumes for %s" % name)
4104       for lv in old_lvs:
4105         cfg.SetDiskID(lv, tgt_node)
4106         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4107           warning("Can't remove old LV", hint="manually remove unused LVs")
4108           continue
4109
4110   def _ExecD8Secondary(self, feedback_fn):
4111     """Replace the secondary node for drbd8.
4112
4113     The algorithm for replace is quite complicated:
4114       - for all disks of the instance:
4115         - create new LVs on the new node with same names
4116         - shutdown the drbd device on the old secondary
4117         - disconnect the drbd network on the primary
4118         - create the drbd device on the new secondary
4119         - network attach the drbd on the primary, using an artifice:
4120           the drbd code for Attach() will connect to the network if it
4121           finds a device which is connected to the good local disks but
4122           not network enabled
4123       - wait for sync across all devices
4124       - remove all disks from the old secondary
4125
4126     Failures are not very well handled.
4127
4128     """
4129     steps_total = 6
4130     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4131     instance = self.instance
4132     iv_names = {}
4133     vgname = self.cfg.GetVGName()
4134     # start of work
4135     cfg = self.cfg
4136     old_node = self.tgt_node
4137     new_node = self.new_node
4138     pri_node = instance.primary_node
4139
4140     # Step: check device activation
4141     self.proc.LogStep(1, steps_total, "check device existence")
4142     info("checking volume groups")
4143     my_vg = cfg.GetVGName()
4144     results = self.rpc.call_vg_list([pri_node, new_node])
4145     if not results:
4146       raise errors.OpExecError("Can't list volume groups on the nodes")
4147     for node in pri_node, new_node:
4148       res = results.get(node, False)
4149       if not res or my_vg not in res:
4150         raise errors.OpExecError("Volume group '%s' not found on %s" %
4151                                  (my_vg, node))
4152     for dev in instance.disks:
4153       if not dev.iv_name in self.op.disks:
4154         continue
4155       info("checking %s on %s" % (dev.iv_name, pri_node))
4156       cfg.SetDiskID(dev, pri_node)
4157       if not self.rpc.call_blockdev_find(pri_node, dev):
4158         raise errors.OpExecError("Can't find device %s on node %s" %
4159                                  (dev.iv_name, pri_node))
4160
4161     # Step: check other node consistency
4162     self.proc.LogStep(2, steps_total, "check peer consistency")
4163     for dev in instance.disks:
4164       if not dev.iv_name in self.op.disks:
4165         continue
4166       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4167       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4168         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4169                                  " unsafe to replace the secondary" %
4170                                  pri_node)
4171
4172     # Step: create new storage
4173     self.proc.LogStep(3, steps_total, "allocate new storage")
4174     for dev in instance.disks:
4175       size = dev.size
4176       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4177       # since we *always* want to create this LV, we use the
4178       # _Create...OnPrimary (which forces the creation), even if we
4179       # are talking about the secondary node
4180       for new_lv in dev.children:
4181         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4182                                         _GetInstanceInfoText(instance)):
4183           raise errors.OpExecError("Failed to create new LV named '%s' on"
4184                                    " node '%s'" %
4185                                    (new_lv.logical_id[1], new_node))
4186
4187
4188     # Step 4: dbrd minors and drbd setups changes
4189     # after this, we must manually remove the drbd minors on both the
4190     # error and the success paths
4191     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4192                                    instance.name)
4193     logging.debug("Allocated minors %s" % (minors,))
4194     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4195     for dev, new_minor in zip(instance.disks, minors):
4196       size = dev.size
4197       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4198       # create new devices on new_node
4199       if pri_node == dev.logical_id[0]:
4200         new_logical_id = (pri_node, new_node,
4201                           dev.logical_id[2], dev.logical_id[3], new_minor,
4202                           dev.logical_id[5])
4203       else:
4204         new_logical_id = (new_node, pri_node,
4205                           dev.logical_id[2], new_minor, dev.logical_id[4],
4206                           dev.logical_id[5])
4207       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4208       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4209                     new_logical_id)
4210       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4211                               logical_id=new_logical_id,
4212                               children=dev.children)
4213       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4214                                         new_drbd, False,
4215                                         _GetInstanceInfoText(instance)):
4216         self.cfg.ReleaseDRBDMinors(instance.name)
4217         raise errors.OpExecError("Failed to create new DRBD on"
4218                                  " node '%s'" % new_node)
4219
4220     for dev in instance.disks:
4221       # we have new devices, shutdown the drbd on the old secondary
4222       info("shutting down drbd for %s on old node" % dev.iv_name)
4223       cfg.SetDiskID(dev, old_node)
4224       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4225         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4226                 hint="Please cleanup this device manually as soon as possible")
4227
4228     info("detaching primary drbds from the network (=> standalone)")
4229     done = 0
4230     for dev in instance.disks:
4231       cfg.SetDiskID(dev, pri_node)
4232       # set the network part of the physical (unique in bdev terms) id
4233       # to None, meaning detach from network
4234       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4235       # and 'find' the device, which will 'fix' it to match the
4236       # standalone state
4237       if self.rpc.call_blockdev_find(pri_node, dev):
4238         done += 1
4239       else:
4240         warning("Failed to detach drbd %s from network, unusual case" %
4241                 dev.iv_name)
4242
4243     if not done:
4244       # no detaches succeeded (very unlikely)
4245       self.cfg.ReleaseDRBDMinors(instance.name)
4246       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4247
4248     # if we managed to detach at least one, we update all the disks of
4249     # the instance to point to the new secondary
4250     info("updating instance configuration")
4251     for dev, _, new_logical_id in iv_names.itervalues():
4252       dev.logical_id = new_logical_id
4253       cfg.SetDiskID(dev, pri_node)
4254     cfg.Update(instance)
4255     # we can remove now the temp minors as now the new values are
4256     # written to the config file (and therefore stable)
4257     self.cfg.ReleaseDRBDMinors(instance.name)
4258
4259     # and now perform the drbd attach
4260     info("attaching primary drbds to new secondary (standalone => connected)")
4261     failures = []
4262     for dev in instance.disks:
4263       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4264       # since the attach is smart, it's enough to 'find' the device,
4265       # it will automatically activate the network, if the physical_id
4266       # is correct
4267       cfg.SetDiskID(dev, pri_node)
4268       logging.debug("Disk to attach: %s", dev)
4269       if not self.rpc.call_blockdev_find(pri_node, dev):
4270         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4271                 "please do a gnt-instance info to see the status of disks")
4272
4273     # this can fail as the old devices are degraded and _WaitForSync
4274     # does a combined result over all disks, so we don't check its
4275     # return value
4276     self.proc.LogStep(5, steps_total, "sync devices")
4277     _WaitForSync(self, instance, unlock=True)
4278
4279     # so check manually all the devices
4280     for name, (dev, old_lvs, _) in iv_names.iteritems():
4281       cfg.SetDiskID(dev, pri_node)
4282       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4283       if is_degr:
4284         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4285
4286     self.proc.LogStep(6, steps_total, "removing old storage")
4287     for name, (dev, old_lvs, _) in iv_names.iteritems():
4288       info("remove logical volumes for %s" % name)
4289       for lv in old_lvs:
4290         cfg.SetDiskID(lv, old_node)
4291         if not self.rpc.call_blockdev_remove(old_node, lv):
4292           warning("Can't remove LV on old secondary",
4293                   hint="Cleanup stale volumes by hand")
4294
4295   def Exec(self, feedback_fn):
4296     """Execute disk replacement.
4297
4298     This dispatches the disk replacement to the appropriate handler.
4299
4300     """
4301     instance = self.instance
4302
4303     # Activate the instance disks if we're replacing them on a down instance
4304     if instance.status == "down":
4305       _StartInstanceDisks(self, instance, True)
4306
4307     if instance.disk_template == constants.DT_DRBD8:
4308       if self.op.remote_node is None:
4309         fn = self._ExecD8DiskOnly
4310       else:
4311         fn = self._ExecD8Secondary
4312     else:
4313       raise errors.ProgrammerError("Unhandled disk replacement case")
4314
4315     ret = fn(feedback_fn)
4316
4317     # Deactivate the instance disks if we're replacing them on a down instance
4318     if instance.status == "down":
4319       _SafeShutdownInstanceDisks(self, instance)
4320
4321     return ret
4322
4323
4324 class LUGrowDisk(LogicalUnit):
4325   """Grow a disk of an instance.
4326
4327   """
4328   HPATH = "disk-grow"
4329   HTYPE = constants.HTYPE_INSTANCE
4330   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4331   REQ_BGL = False
4332
4333   def ExpandNames(self):
4334     self._ExpandAndLockInstance()
4335     self.needed_locks[locking.LEVEL_NODE] = []
4336     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4337
4338   def DeclareLocks(self, level):
4339     if level == locking.LEVEL_NODE:
4340       self._LockInstancesNodes()
4341
4342   def BuildHooksEnv(self):
4343     """Build hooks env.
4344
4345     This runs on the master, the primary and all the secondaries.
4346
4347     """
4348     env = {
4349       "DISK": self.op.disk,
4350       "AMOUNT": self.op.amount,
4351       }
4352     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4353     nl = [
4354       self.cfg.GetMasterNode(),
4355       self.instance.primary_node,
4356       ]
4357     return env, nl, nl
4358
4359   def CheckPrereq(self):
4360     """Check prerequisites.
4361
4362     This checks that the instance is in the cluster.
4363
4364     """
4365     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4366     assert instance is not None, \
4367       "Cannot retrieve locked instance %s" % self.op.instance_name
4368
4369     self.instance = instance
4370
4371     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4372       raise errors.OpPrereqError("Instance's disk layout does not support"
4373                                  " growing.")
4374
4375     if instance.FindDisk(self.op.disk) is None:
4376       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4377                                  (self.op.disk, instance.name))
4378
4379     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4380     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4381                                        instance.hypervisor)
4382     for node in nodenames:
4383       info = nodeinfo.get(node, None)
4384       if not info:
4385         raise errors.OpPrereqError("Cannot get current information"
4386                                    " from node '%s'" % node)
4387       vg_free = info.get('vg_free', None)
4388       if not isinstance(vg_free, int):
4389         raise errors.OpPrereqError("Can't compute free disk space on"
4390                                    " node %s" % node)
4391       if self.op.amount > info['vg_free']:
4392         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4393                                    " %d MiB available, %d MiB required" %
4394                                    (node, info['vg_free'], self.op.amount))
4395
4396   def Exec(self, feedback_fn):
4397     """Execute disk grow.
4398
4399     """
4400     instance = self.instance
4401     disk = instance.FindDisk(self.op.disk)
4402     for node in (instance.secondary_nodes + (instance.primary_node,)):
4403       self.cfg.SetDiskID(disk, node)
4404       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4405       if (not result or not isinstance(result, (list, tuple)) or
4406           len(result) != 2):
4407         raise errors.OpExecError("grow request failed to node %s" % node)
4408       elif not result[0]:
4409         raise errors.OpExecError("grow request failed to node %s: %s" %
4410                                  (node, result[1]))
4411     disk.RecordGrow(self.op.amount)
4412     self.cfg.Update(instance)
4413     if self.op.wait_for_sync:
4414       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4415       if disk_abort:
4416         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4417                              " status.\nPlease check the instance.")
4418
4419
4420 class LUQueryInstanceData(NoHooksLU):
4421   """Query runtime instance data.
4422
4423   """
4424   _OP_REQP = ["instances", "static"]
4425   REQ_BGL = False
4426
4427   def ExpandNames(self):
4428     self.needed_locks = {}
4429     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4430
4431     if not isinstance(self.op.instances, list):
4432       raise errors.OpPrereqError("Invalid argument type 'instances'")
4433
4434     if self.op.instances:
4435       self.wanted_names = []
4436       for name in self.op.instances:
4437         full_name = self.cfg.ExpandInstanceName(name)
4438         if full_name is None:
4439           raise errors.OpPrereqError("Instance '%s' not known" %
4440                                      self.op.instance_name)
4441         self.wanted_names.append(full_name)
4442       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4443     else:
4444       self.wanted_names = None
4445       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4446
4447     self.needed_locks[locking.LEVEL_NODE] = []
4448     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4449
4450   def DeclareLocks(self, level):
4451     if level == locking.LEVEL_NODE:
4452       self._LockInstancesNodes()
4453
4454   def CheckPrereq(self):
4455     """Check prerequisites.
4456
4457     This only checks the optional instance list against the existing names.
4458
4459     """
4460     if self.wanted_names is None:
4461       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4462
4463     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4464                              in self.wanted_names]
4465     return
4466
4467   def _ComputeDiskStatus(self, instance, snode, dev):
4468     """Compute block device status.
4469
4470     """
4471     static = self.op.static
4472     if not static:
4473       self.cfg.SetDiskID(dev, instance.primary_node)
4474       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4475     else:
4476       dev_pstatus = None
4477
4478     if dev.dev_type in constants.LDS_DRBD:
4479       # we change the snode then (otherwise we use the one passed in)
4480       if dev.logical_id[0] == instance.primary_node:
4481         snode = dev.logical_id[1]
4482       else:
4483         snode = dev.logical_id[0]
4484
4485     if snode and not static:
4486       self.cfg.SetDiskID(dev, snode)
4487       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4488     else:
4489       dev_sstatus = None
4490
4491     if dev.children:
4492       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4493                       for child in dev.children]
4494     else:
4495       dev_children = []
4496
4497     data = {
4498       "iv_name": dev.iv_name,
4499       "dev_type": dev.dev_type,
4500       "logical_id": dev.logical_id,
4501       "physical_id": dev.physical_id,
4502       "pstatus": dev_pstatus,
4503       "sstatus": dev_sstatus,
4504       "children": dev_children,
4505       }
4506
4507     return data
4508
4509   def Exec(self, feedback_fn):
4510     """Gather and return data"""
4511     result = {}
4512
4513     cluster = self.cfg.GetClusterInfo()
4514
4515     for instance in self.wanted_instances:
4516       if not self.op.static:
4517         remote_info = self.rpc.call_instance_info(instance.primary_node,
4518                                                   instance.name,
4519                                                   instance.hypervisor)
4520         if remote_info and "state" in remote_info:
4521           remote_state = "up"
4522         else:
4523           remote_state = "down"
4524       else:
4525         remote_state = None
4526       if instance.status == "down":
4527         config_state = "down"
4528       else:
4529         config_state = "up"
4530
4531       disks = [self._ComputeDiskStatus(instance, None, device)
4532                for device in instance.disks]
4533
4534       idict = {
4535         "name": instance.name,
4536         "config_state": config_state,
4537         "run_state": remote_state,
4538         "pnode": instance.primary_node,
4539         "snodes": instance.secondary_nodes,
4540         "os": instance.os,
4541         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4542         "disks": disks,
4543         "hypervisor": instance.hypervisor,
4544         "network_port": instance.network_port,
4545         "hv_instance": instance.hvparams,
4546         "hv_actual": cluster.FillHV(instance),
4547         "be_instance": instance.beparams,
4548         "be_actual": cluster.FillBE(instance),
4549         }
4550
4551       result[instance.name] = idict
4552
4553     return result
4554
4555
4556 class LUSetInstanceParams(LogicalUnit):
4557   """Modifies an instances's parameters.
4558
4559   """
4560   HPATH = "instance-modify"
4561   HTYPE = constants.HTYPE_INSTANCE
4562   _OP_REQP = ["instance_name", "hvparams"]
4563   REQ_BGL = False
4564
4565   def ExpandNames(self):
4566     self._ExpandAndLockInstance()
4567     self.needed_locks[locking.LEVEL_NODE] = []
4568     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4569
4570
4571   def DeclareLocks(self, level):
4572     if level == locking.LEVEL_NODE:
4573       self._LockInstancesNodes()
4574
4575   def BuildHooksEnv(self):
4576     """Build hooks env.
4577
4578     This runs on the master, primary and secondaries.
4579
4580     """
4581     args = dict()
4582     if constants.BE_MEMORY in self.be_new:
4583       args['memory'] = self.be_new[constants.BE_MEMORY]
4584     if constants.BE_VCPUS in self.be_new:
4585       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4586     if self.do_ip or self.do_bridge or self.mac:
4587       if self.do_ip:
4588         ip = self.ip
4589       else:
4590         ip = self.instance.nics[0].ip
4591       if self.bridge:
4592         bridge = self.bridge
4593       else:
4594         bridge = self.instance.nics[0].bridge
4595       if self.mac:
4596         mac = self.mac
4597       else:
4598         mac = self.instance.nics[0].mac
4599       args['nics'] = [(ip, bridge, mac)]
4600     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4601     nl = [self.cfg.GetMasterNode(),
4602           self.instance.primary_node] + list(self.instance.secondary_nodes)
4603     return env, nl, nl
4604
4605   def CheckPrereq(self):
4606     """Check prerequisites.
4607
4608     This only checks the instance list against the existing names.
4609
4610     """
4611     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4612     # a separate CheckArguments function, if we implement one, so the operation
4613     # can be aborted without waiting for any lock, should it have an error...
4614     self.ip = getattr(self.op, "ip", None)
4615     self.mac = getattr(self.op, "mac", None)
4616     self.bridge = getattr(self.op, "bridge", None)
4617     self.kernel_path = getattr(self.op, "kernel_path", None)
4618     self.initrd_path = getattr(self.op, "initrd_path", None)
4619     self.force = getattr(self.op, "force", None)
4620     all_parms = [self.ip, self.bridge, self.mac]
4621     if (all_parms.count(None) == len(all_parms) and
4622         not self.op.hvparams and
4623         not self.op.beparams):
4624       raise errors.OpPrereqError("No changes submitted")
4625     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4626       val = self.op.beparams.get(item, None)
4627       if val is not None:
4628         try:
4629           val = int(val)
4630         except ValueError, err:
4631           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4632         self.op.beparams[item] = val
4633     if self.ip is not None:
4634       self.do_ip = True
4635       if self.ip.lower() == "none":
4636         self.ip = None
4637       else:
4638         if not utils.IsValidIP(self.ip):
4639           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4640     else:
4641       self.do_ip = False
4642     self.do_bridge = (self.bridge is not None)
4643     if self.mac is not None:
4644       if self.cfg.IsMacInUse(self.mac):
4645         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4646                                    self.mac)
4647       if not utils.IsValidMac(self.mac):
4648         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4649
4650     # checking the new params on the primary/secondary nodes
4651
4652     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4653     assert self.instance is not None, \
4654       "Cannot retrieve locked instance %s" % self.op.instance_name
4655     pnode = self.instance.primary_node
4656     nodelist = [pnode]
4657     nodelist.extend(instance.secondary_nodes)
4658
4659     # hvparams processing
4660     if self.op.hvparams:
4661       i_hvdict = copy.deepcopy(instance.hvparams)
4662       for key, val in self.op.hvparams.iteritems():
4663         if val is None:
4664           try:
4665             del i_hvdict[key]
4666           except KeyError:
4667             pass
4668         else:
4669           i_hvdict[key] = val
4670       cluster = self.cfg.GetClusterInfo()
4671       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4672                                 i_hvdict)
4673       # local check
4674       hypervisor.GetHypervisor(
4675         instance.hypervisor).CheckParameterSyntax(hv_new)
4676       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4677       self.hv_new = hv_new # the new actual values
4678       self.hv_inst = i_hvdict # the new dict (without defaults)
4679     else:
4680       self.hv_new = self.hv_inst = {}
4681
4682     # beparams processing
4683     if self.op.beparams:
4684       i_bedict = copy.deepcopy(instance.beparams)
4685       for key, val in self.op.beparams.iteritems():
4686         if val is None:
4687           try:
4688             del i_bedict[key]
4689           except KeyError:
4690             pass
4691         else:
4692           i_bedict[key] = val
4693       cluster = self.cfg.GetClusterInfo()
4694       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4695                                 i_bedict)
4696       self.be_new = be_new # the new actual values
4697       self.be_inst = i_bedict # the new dict (without defaults)
4698     else:
4699       self.hv_new = self.hv_inst = {}
4700
4701     self.warn = []
4702
4703     if constants.BE_MEMORY in self.op.beparams and not self.force:
4704       mem_check_list = [pnode]
4705       if be_new[constants.BE_AUTO_BALANCE]:
4706         # either we changed auto_balance to yes or it was from before
4707         mem_check_list.extend(instance.secondary_nodes)
4708       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4709                                                   instance.hypervisor)
4710       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4711                                          instance.hypervisor)
4712
4713       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4714         # Assume the primary node is unreachable and go ahead
4715         self.warn.append("Can't get info from primary node %s" % pnode)
4716       else:
4717         if instance_info:
4718           current_mem = instance_info['memory']
4719         else:
4720           # Assume instance not running
4721           # (there is a slight race condition here, but it's not very probable,
4722           # and we have no other way to check)
4723           current_mem = 0
4724         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4725                     nodeinfo[pnode]['memory_free'])
4726         if miss_mem > 0:
4727           raise errors.OpPrereqError("This change will prevent the instance"
4728                                      " from starting, due to %d MB of memory"
4729                                      " missing on its primary node" % miss_mem)
4730
4731       if be_new[constants.BE_AUTO_BALANCE]:
4732         for node in instance.secondary_nodes:
4733           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4734             self.warn.append("Can't get info from secondary node %s" % node)
4735           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4736             self.warn.append("Not enough memory to failover instance to"
4737                              " secondary node %s" % node)
4738
4739     return
4740
4741   def Exec(self, feedback_fn):
4742     """Modifies an instance.
4743
4744     All parameters take effect only at the next restart of the instance.
4745     """
4746     # Process here the warnings from CheckPrereq, as we don't have a
4747     # feedback_fn there.
4748     for warn in self.warn:
4749       feedback_fn("WARNING: %s" % warn)
4750
4751     result = []
4752     instance = self.instance
4753     if self.do_ip:
4754       instance.nics[0].ip = self.ip
4755       result.append(("ip", self.ip))
4756     if self.bridge:
4757       instance.nics[0].bridge = self.bridge
4758       result.append(("bridge", self.bridge))
4759     if self.mac:
4760       instance.nics[0].mac = self.mac
4761       result.append(("mac", self.mac))
4762     if self.op.hvparams:
4763       instance.hvparams = self.hv_new
4764       for key, val in self.op.hvparams.iteritems():
4765         result.append(("hv/%s" % key, val))
4766     if self.op.beparams:
4767       instance.beparams = self.be_inst
4768       for key, val in self.op.beparams.iteritems():
4769         result.append(("be/%s" % key, val))
4770
4771     self.cfg.Update(instance)
4772
4773     return result
4774
4775
4776 class LUQueryExports(NoHooksLU):
4777   """Query the exports list
4778
4779   """
4780   _OP_REQP = ['nodes']
4781   REQ_BGL = False
4782
4783   def ExpandNames(self):
4784     self.needed_locks = {}
4785     self.share_locks[locking.LEVEL_NODE] = 1
4786     if not self.op.nodes:
4787       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4788     else:
4789       self.needed_locks[locking.LEVEL_NODE] = \
4790         _GetWantedNodes(self, self.op.nodes)
4791
4792   def CheckPrereq(self):
4793     """Check prerequisites.
4794
4795     """
4796     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4797
4798   def Exec(self, feedback_fn):
4799     """Compute the list of all the exported system images.
4800
4801     Returns:
4802       a dictionary with the structure node->(export-list)
4803       where export-list is a list of the instances exported on
4804       that node.
4805
4806     """
4807     return self.rpc.call_export_list(self.nodes)
4808
4809
4810 class LUExportInstance(LogicalUnit):
4811   """Export an instance to an image in the cluster.
4812
4813   """
4814   HPATH = "instance-export"
4815   HTYPE = constants.HTYPE_INSTANCE
4816   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4817   REQ_BGL = False
4818
4819   def ExpandNames(self):
4820     self._ExpandAndLockInstance()
4821     # FIXME: lock only instance primary and destination node
4822     #
4823     # Sad but true, for now we have do lock all nodes, as we don't know where
4824     # the previous export might be, and and in this LU we search for it and
4825     # remove it from its current node. In the future we could fix this by:
4826     #  - making a tasklet to search (share-lock all), then create the new one,
4827     #    then one to remove, after
4828     #  - removing the removal operation altoghether
4829     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4830
4831   def DeclareLocks(self, level):
4832     """Last minute lock declaration."""
4833     # All nodes are locked anyway, so nothing to do here.
4834
4835   def BuildHooksEnv(self):
4836     """Build hooks env.
4837
4838     This will run on the master, primary node and target node.
4839
4840     """
4841     env = {
4842       "EXPORT_NODE": self.op.target_node,
4843       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4844       }
4845     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4846     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4847           self.op.target_node]
4848     return env, nl, nl
4849
4850   def CheckPrereq(self):
4851     """Check prerequisites.
4852
4853     This checks that the instance and node names are valid.
4854
4855     """
4856     instance_name = self.op.instance_name
4857     self.instance = self.cfg.GetInstanceInfo(instance_name)
4858     assert self.instance is not None, \
4859           "Cannot retrieve locked instance %s" % self.op.instance_name
4860
4861     self.dst_node = self.cfg.GetNodeInfo(
4862       self.cfg.ExpandNodeName(self.op.target_node))
4863
4864     assert self.dst_node is not None, \
4865           "Cannot retrieve locked node %s" % self.op.target_node
4866
4867     # instance disk type verification
4868     for disk in self.instance.disks:
4869       if disk.dev_type == constants.LD_FILE:
4870         raise errors.OpPrereqError("Export not supported for instances with"
4871                                    " file-based disks")
4872
4873   def Exec(self, feedback_fn):
4874     """Export an instance to an image in the cluster.
4875
4876     """
4877     instance = self.instance
4878     dst_node = self.dst_node
4879     src_node = instance.primary_node
4880     if self.op.shutdown:
4881       # shutdown the instance, but not the disks
4882       if not self.rpc.call_instance_shutdown(src_node, instance):
4883         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4884                                  (instance.name, src_node))
4885
4886     vgname = self.cfg.GetVGName()
4887
4888     snap_disks = []
4889
4890     try:
4891       for disk in instance.disks:
4892         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4893         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4894
4895         if not new_dev_name:
4896           self.LogWarning("Could not snapshot block device %s on node %s",
4897                           disk.logical_id[1], src_node)
4898           snap_disks.append(False)
4899         else:
4900           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4901                                  logical_id=(vgname, new_dev_name),
4902                                  physical_id=(vgname, new_dev_name),
4903                                  iv_name=disk.iv_name)
4904           snap_disks.append(new_dev)
4905
4906     finally:
4907       if self.op.shutdown and instance.status == "up":
4908         if not self.rpc.call_instance_start(src_node, instance, None):
4909           _ShutdownInstanceDisks(self, instance)
4910           raise errors.OpExecError("Could not start instance")
4911
4912     # TODO: check for size
4913
4914     cluster_name = self.cfg.GetClusterName()
4915     for dev in snap_disks:
4916       if dev:
4917         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4918                                              instance, cluster_name):
4919           self.LogWarning("Could not export block device %s from node %s to"
4920                           " node %s", dev.logical_id[1], src_node,
4921                           dst_node.name)
4922         if not self.rpc.call_blockdev_remove(src_node, dev):
4923           self.LogWarning("Could not remove snapshot block device %s from node"
4924                           " %s", dev.logical_id[1], src_node)
4925
4926     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4927       self.LogWarning("Could not finalize export for instance %s on node %s",
4928                       instance.name, dst_node.name)
4929
4930     nodelist = self.cfg.GetNodeList()
4931     nodelist.remove(dst_node.name)
4932
4933     # on one-node clusters nodelist will be empty after the removal
4934     # if we proceed the backup would be removed because OpQueryExports
4935     # substitutes an empty list with the full cluster node list.
4936     if nodelist:
4937       exportlist = self.rpc.call_export_list(nodelist)
4938       for node in exportlist:
4939         if instance.name in exportlist[node]:
4940           if not self.rpc.call_export_remove(node, instance.name):
4941             self.LogWarning("Could not remove older export for instance %s"
4942                             " on node %s", instance.name, node)
4943
4944
4945 class LURemoveExport(NoHooksLU):
4946   """Remove exports related to the named instance.
4947
4948   """
4949   _OP_REQP = ["instance_name"]
4950   REQ_BGL = False
4951
4952   def ExpandNames(self):
4953     self.needed_locks = {}
4954     # We need all nodes to be locked in order for RemoveExport to work, but we
4955     # don't need to lock the instance itself, as nothing will happen to it (and
4956     # we can remove exports also for a removed instance)
4957     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4958
4959   def CheckPrereq(self):
4960     """Check prerequisites.
4961     """
4962     pass
4963
4964   def Exec(self, feedback_fn):
4965     """Remove any export.
4966
4967     """
4968     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4969     # If the instance was not found we'll try with the name that was passed in.
4970     # This will only work if it was an FQDN, though.
4971     fqdn_warn = False
4972     if not instance_name:
4973       fqdn_warn = True
4974       instance_name = self.op.instance_name
4975
4976     exportlist = self.rpc.call_export_list(self.acquired_locks[
4977       locking.LEVEL_NODE])
4978     found = False
4979     for node in exportlist:
4980       if instance_name in exportlist[node]:
4981         found = True
4982         if not self.rpc.call_export_remove(node, instance_name):
4983           logging.error("Could not remove export for instance %s"
4984                         " on node %s", instance_name, node)
4985
4986     if fqdn_warn and not found:
4987       feedback_fn("Export not found. If trying to remove an export belonging"
4988                   " to a deleted instance please use its Fully Qualified"
4989                   " Domain Name.")
4990
4991
4992 class TagsLU(NoHooksLU):
4993   """Generic tags LU.
4994
4995   This is an abstract class which is the parent of all the other tags LUs.
4996
4997   """
4998
4999   def ExpandNames(self):
5000     self.needed_locks = {}
5001     if self.op.kind == constants.TAG_NODE:
5002       name = self.cfg.ExpandNodeName(self.op.name)
5003       if name is None:
5004         raise errors.OpPrereqError("Invalid node name (%s)" %
5005                                    (self.op.name,))
5006       self.op.name = name
5007       self.needed_locks[locking.LEVEL_NODE] = name
5008     elif self.op.kind == constants.TAG_INSTANCE:
5009       name = self.cfg.ExpandInstanceName(self.op.name)
5010       if name is None:
5011         raise errors.OpPrereqError("Invalid instance name (%s)" %
5012                                    (self.op.name,))
5013       self.op.name = name
5014       self.needed_locks[locking.LEVEL_INSTANCE] = name
5015
5016   def CheckPrereq(self):
5017     """Check prerequisites.
5018
5019     """
5020     if self.op.kind == constants.TAG_CLUSTER:
5021       self.target = self.cfg.GetClusterInfo()
5022     elif self.op.kind == constants.TAG_NODE:
5023       self.target = self.cfg.GetNodeInfo(self.op.name)
5024     elif self.op.kind == constants.TAG_INSTANCE:
5025       self.target = self.cfg.GetInstanceInfo(self.op.name)
5026     else:
5027       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5028                                  str(self.op.kind))
5029
5030
5031 class LUGetTags(TagsLU):
5032   """Returns the tags of a given object.
5033
5034   """
5035   _OP_REQP = ["kind", "name"]
5036   REQ_BGL = False
5037
5038   def Exec(self, feedback_fn):
5039     """Returns the tag list.
5040
5041     """
5042     return list(self.target.GetTags())
5043
5044
5045 class LUSearchTags(NoHooksLU):
5046   """Searches the tags for a given pattern.
5047
5048   """
5049   _OP_REQP = ["pattern"]
5050   REQ_BGL = False
5051
5052   def ExpandNames(self):
5053     self.needed_locks = {}
5054
5055   def CheckPrereq(self):
5056     """Check prerequisites.
5057
5058     This checks the pattern passed for validity by compiling it.
5059
5060     """
5061     try:
5062       self.re = re.compile(self.op.pattern)
5063     except re.error, err:
5064       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5065                                  (self.op.pattern, err))
5066
5067   def Exec(self, feedback_fn):
5068     """Returns the tag list.
5069
5070     """
5071     cfg = self.cfg
5072     tgts = [("/cluster", cfg.GetClusterInfo())]
5073     ilist = cfg.GetAllInstancesInfo().values()
5074     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5075     nlist = cfg.GetAllNodesInfo().values()
5076     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5077     results = []
5078     for path, target in tgts:
5079       for tag in target.GetTags():
5080         if self.re.search(tag):
5081           results.append((path, tag))
5082     return results
5083
5084
5085 class LUAddTags(TagsLU):
5086   """Sets a tag on a given object.
5087
5088   """
5089   _OP_REQP = ["kind", "name", "tags"]
5090   REQ_BGL = False
5091
5092   def CheckPrereq(self):
5093     """Check prerequisites.
5094
5095     This checks the type and length of the tag name and value.
5096
5097     """
5098     TagsLU.CheckPrereq(self)
5099     for tag in self.op.tags:
5100       objects.TaggableObject.ValidateTag(tag)
5101
5102   def Exec(self, feedback_fn):
5103     """Sets the tag.
5104
5105     """
5106     try:
5107       for tag in self.op.tags:
5108         self.target.AddTag(tag)
5109     except errors.TagError, err:
5110       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5111     try:
5112       self.cfg.Update(self.target)
5113     except errors.ConfigurationError:
5114       raise errors.OpRetryError("There has been a modification to the"
5115                                 " config file and the operation has been"
5116                                 " aborted. Please retry.")
5117
5118
5119 class LUDelTags(TagsLU):
5120   """Delete a list of tags from a given object.
5121
5122   """
5123   _OP_REQP = ["kind", "name", "tags"]
5124   REQ_BGL = False
5125
5126   def CheckPrereq(self):
5127     """Check prerequisites.
5128
5129     This checks that we have the given tag.
5130
5131     """
5132     TagsLU.CheckPrereq(self)
5133     for tag in self.op.tags:
5134       objects.TaggableObject.ValidateTag(tag)
5135     del_tags = frozenset(self.op.tags)
5136     cur_tags = self.target.GetTags()
5137     if not del_tags <= cur_tags:
5138       diff_tags = del_tags - cur_tags
5139       diff_names = ["'%s'" % tag for tag in diff_tags]
5140       diff_names.sort()
5141       raise errors.OpPrereqError("Tag(s) %s not found" %
5142                                  (",".join(diff_names)))
5143
5144   def Exec(self, feedback_fn):
5145     """Remove the tag from the object.
5146
5147     """
5148     for tag in self.op.tags:
5149       self.target.RemoveTag(tag)
5150     try:
5151       self.cfg.Update(self.target)
5152     except errors.ConfigurationError:
5153       raise errors.OpRetryError("There has been a modification to the"
5154                                 " config file and the operation has been"
5155                                 " aborted. Please retry.")
5156
5157
5158 class LUTestDelay(NoHooksLU):
5159   """Sleep for a specified amount of time.
5160
5161   This LU sleeps on the master and/or nodes for a specified amount of
5162   time.
5163
5164   """
5165   _OP_REQP = ["duration", "on_master", "on_nodes"]
5166   REQ_BGL = False
5167
5168   def ExpandNames(self):
5169     """Expand names and set required locks.
5170
5171     This expands the node list, if any.
5172
5173     """
5174     self.needed_locks = {}
5175     if self.op.on_nodes:
5176       # _GetWantedNodes can be used here, but is not always appropriate to use
5177       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5178       # more information.
5179       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5180       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5181
5182   def CheckPrereq(self):
5183     """Check prerequisites.
5184
5185     """
5186
5187   def Exec(self, feedback_fn):
5188     """Do the actual sleep.
5189
5190     """
5191     if self.op.on_master:
5192       if not utils.TestDelay(self.op.duration):
5193         raise errors.OpExecError("Error during master delay test")
5194     if self.op.on_nodes:
5195       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5196       if not result:
5197         raise errors.OpExecError("Complete failure from rpc call")
5198       for node, node_result in result.items():
5199         if not node_result:
5200           raise errors.OpExecError("Failure during rpc call to node %s,"
5201                                    " result: %s" % (node, node_result))
5202
5203
5204 class IAllocator(object):
5205   """IAllocator framework.
5206
5207   An IAllocator instance has three sets of attributes:
5208     - cfg that is needed to query the cluster
5209     - input data (all members of the _KEYS class attribute are required)
5210     - four buffer attributes (in|out_data|text), that represent the
5211       input (to the external script) in text and data structure format,
5212       and the output from it, again in two formats
5213     - the result variables from the script (success, info, nodes) for
5214       easy usage
5215
5216   """
5217   _ALLO_KEYS = [
5218     "mem_size", "disks", "disk_template",
5219     "os", "tags", "nics", "vcpus",
5220     ]
5221   _RELO_KEYS = [
5222     "relocate_from",
5223     ]
5224
5225   def __init__(self, lu, mode, name, **kwargs):
5226     self.lu = lu
5227     # init buffer variables
5228     self.in_text = self.out_text = self.in_data = self.out_data = None
5229     # init all input fields so that pylint is happy
5230     self.mode = mode
5231     self.name = name
5232     self.mem_size = self.disks = self.disk_template = None
5233     self.os = self.tags = self.nics = self.vcpus = None
5234     self.relocate_from = None
5235     # computed fields
5236     self.required_nodes = None
5237     # init result fields
5238     self.success = self.info = self.nodes = None
5239     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5240       keyset = self._ALLO_KEYS
5241     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5242       keyset = self._RELO_KEYS
5243     else:
5244       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5245                                    " IAllocator" % self.mode)
5246     for key in kwargs:
5247       if key not in keyset:
5248         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5249                                      " IAllocator" % key)
5250       setattr(self, key, kwargs[key])
5251     for key in keyset:
5252       if key not in kwargs:
5253         raise errors.ProgrammerError("Missing input parameter '%s' to"
5254                                      " IAllocator" % key)
5255     self._BuildInputData()
5256
5257   def _ComputeClusterData(self):
5258     """Compute the generic allocator input data.
5259
5260     This is the data that is independent of the actual operation.
5261
5262     """
5263     cfg = self.lu.cfg
5264     cluster_info = cfg.GetClusterInfo()
5265     # cluster data
5266     data = {
5267       "version": 1,
5268       "cluster_name": cfg.GetClusterName(),
5269       "cluster_tags": list(cluster_info.GetTags()),
5270       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5271       # we don't have job IDs
5272       }
5273
5274     i_list = []
5275     cluster = self.cfg.GetClusterInfo()
5276     for iname in cfg.GetInstanceList():
5277       i_obj = cfg.GetInstanceInfo(iname)
5278       i_list.append((i_obj, cluster.FillBE(i_obj)))
5279
5280     # node data
5281     node_results = {}
5282     node_list = cfg.GetNodeList()
5283     # FIXME: here we have only one hypervisor information, but
5284     # instance can belong to different hypervisors
5285     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5286                                            cfg.GetHypervisorType())
5287     for nname in node_list:
5288       ninfo = cfg.GetNodeInfo(nname)
5289       if nname not in node_data or not isinstance(node_data[nname], dict):
5290         raise errors.OpExecError("Can't get data for node %s" % nname)
5291       remote_info = node_data[nname]
5292       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5293                    'vg_size', 'vg_free', 'cpu_total']:
5294         if attr not in remote_info:
5295           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5296                                    (nname, attr))
5297         try:
5298           remote_info[attr] = int(remote_info[attr])
5299         except ValueError, err:
5300           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5301                                    " %s" % (nname, attr, str(err)))
5302       # compute memory used by primary instances
5303       i_p_mem = i_p_up_mem = 0
5304       for iinfo, beinfo in i_list:
5305         if iinfo.primary_node == nname:
5306           i_p_mem += beinfo[constants.BE_MEMORY]
5307           if iinfo.status == "up":
5308             i_p_up_mem += beinfo[constants.BE_MEMORY]
5309
5310       # compute memory used by instances
5311       pnr = {
5312         "tags": list(ninfo.GetTags()),
5313         "total_memory": remote_info['memory_total'],
5314         "reserved_memory": remote_info['memory_dom0'],
5315         "free_memory": remote_info['memory_free'],
5316         "i_pri_memory": i_p_mem,
5317         "i_pri_up_memory": i_p_up_mem,
5318         "total_disk": remote_info['vg_size'],
5319         "free_disk": remote_info['vg_free'],
5320         "primary_ip": ninfo.primary_ip,
5321         "secondary_ip": ninfo.secondary_ip,
5322         "total_cpus": remote_info['cpu_total'],
5323         }
5324       node_results[nname] = pnr
5325     data["nodes"] = node_results
5326
5327     # instance data
5328     instance_data = {}
5329     for iinfo, beinfo in i_list:
5330       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5331                   for n in iinfo.nics]
5332       pir = {
5333         "tags": list(iinfo.GetTags()),
5334         "should_run": iinfo.status == "up",
5335         "vcpus": beinfo[constants.BE_VCPUS],
5336         "memory": beinfo[constants.BE_MEMORY],
5337         "os": iinfo.os,
5338         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5339         "nics": nic_data,
5340         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5341         "disk_template": iinfo.disk_template,
5342         "hypervisor": iinfo.hypervisor,
5343         }
5344       instance_data[iinfo.name] = pir
5345
5346     data["instances"] = instance_data
5347
5348     self.in_data = data
5349
5350   def _AddNewInstance(self):
5351     """Add new instance data to allocator structure.
5352
5353     This in combination with _AllocatorGetClusterData will create the
5354     correct structure needed as input for the allocator.
5355
5356     The checks for the completeness of the opcode must have already been
5357     done.
5358
5359     """
5360     data = self.in_data
5361     if len(self.disks) != 2:
5362       raise errors.OpExecError("Only two-disk configurations supported")
5363
5364     disk_space = _ComputeDiskSize(self.disk_template,
5365                                   self.disks[0]["size"], self.disks[1]["size"])
5366
5367     if self.disk_template in constants.DTS_NET_MIRROR:
5368       self.required_nodes = 2
5369     else:
5370       self.required_nodes = 1
5371     request = {
5372       "type": "allocate",
5373       "name": self.name,
5374       "disk_template": self.disk_template,
5375       "tags": self.tags,
5376       "os": self.os,
5377       "vcpus": self.vcpus,
5378       "memory": self.mem_size,
5379       "disks": self.disks,
5380       "disk_space_total": disk_space,
5381       "nics": self.nics,
5382       "required_nodes": self.required_nodes,
5383       }
5384     data["request"] = request
5385
5386   def _AddRelocateInstance(self):
5387     """Add relocate instance data to allocator structure.
5388
5389     This in combination with _IAllocatorGetClusterData will create the
5390     correct structure needed as input for the allocator.
5391
5392     The checks for the completeness of the opcode must have already been
5393     done.
5394
5395     """
5396     instance = self.lu.cfg.GetInstanceInfo(self.name)
5397     if instance is None:
5398       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5399                                    " IAllocator" % self.name)
5400
5401     if instance.disk_template not in constants.DTS_NET_MIRROR:
5402       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5403
5404     if len(instance.secondary_nodes) != 1:
5405       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5406
5407     self.required_nodes = 1
5408
5409     disk_space = _ComputeDiskSize(instance.disk_template,
5410                                   instance.disks[0].size,
5411                                   instance.disks[1].size)
5412
5413     request = {
5414       "type": "relocate",
5415       "name": self.name,
5416       "disk_space_total": disk_space,
5417       "required_nodes": self.required_nodes,
5418       "relocate_from": self.relocate_from,
5419       }
5420     self.in_data["request"] = request
5421
5422   def _BuildInputData(self):
5423     """Build input data structures.
5424
5425     """
5426     self._ComputeClusterData()
5427
5428     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5429       self._AddNewInstance()
5430     else:
5431       self._AddRelocateInstance()
5432
5433     self.in_text = serializer.Dump(self.in_data)
5434
5435   def Run(self, name, validate=True, call_fn=None):
5436     """Run an instance allocator and return the results.
5437
5438     """
5439     if call_fn is None:
5440       call_fn = self.lu.rpc.call_iallocator_runner
5441     data = self.in_text
5442
5443     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5444
5445     if not isinstance(result, (list, tuple)) or len(result) != 4:
5446       raise errors.OpExecError("Invalid result from master iallocator runner")
5447
5448     rcode, stdout, stderr, fail = result
5449
5450     if rcode == constants.IARUN_NOTFOUND:
5451       raise errors.OpExecError("Can't find allocator '%s'" % name)
5452     elif rcode == constants.IARUN_FAILURE:
5453       raise errors.OpExecError("Instance allocator call failed: %s,"
5454                                " output: %s" % (fail, stdout+stderr))
5455     self.out_text = stdout
5456     if validate:
5457       self._ValidateResult()
5458
5459   def _ValidateResult(self):
5460     """Process the allocator results.
5461
5462     This will process and if successful save the result in
5463     self.out_data and the other parameters.
5464
5465     """
5466     try:
5467       rdict = serializer.Load(self.out_text)
5468     except Exception, err:
5469       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5470
5471     if not isinstance(rdict, dict):
5472       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5473
5474     for key in "success", "info", "nodes":
5475       if key not in rdict:
5476         raise errors.OpExecError("Can't parse iallocator results:"
5477                                  " missing key '%s'" % key)
5478       setattr(self, key, rdict[key])
5479
5480     if not isinstance(rdict["nodes"], list):
5481       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5482                                " is not a list")
5483     self.out_data = rdict
5484
5485
5486 class LUTestAllocator(NoHooksLU):
5487   """Run allocator tests.
5488
5489   This LU runs the allocator tests
5490
5491   """
5492   _OP_REQP = ["direction", "mode", "name"]
5493
5494   def CheckPrereq(self):
5495     """Check prerequisites.
5496
5497     This checks the opcode parameters depending on the director and mode test.
5498
5499     """
5500     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5501       for attr in ["name", "mem_size", "disks", "disk_template",
5502                    "os", "tags", "nics", "vcpus"]:
5503         if not hasattr(self.op, attr):
5504           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5505                                      attr)
5506       iname = self.cfg.ExpandInstanceName(self.op.name)
5507       if iname is not None:
5508         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5509                                    iname)
5510       if not isinstance(self.op.nics, list):
5511         raise errors.OpPrereqError("Invalid parameter 'nics'")
5512       for row in self.op.nics:
5513         if (not isinstance(row, dict) or
5514             "mac" not in row or
5515             "ip" not in row or
5516             "bridge" not in row):
5517           raise errors.OpPrereqError("Invalid contents of the"
5518                                      " 'nics' parameter")
5519       if not isinstance(self.op.disks, list):
5520         raise errors.OpPrereqError("Invalid parameter 'disks'")
5521       if len(self.op.disks) != 2:
5522         raise errors.OpPrereqError("Only two-disk configurations supported")
5523       for row in self.op.disks:
5524         if (not isinstance(row, dict) or
5525             "size" not in row or
5526             not isinstance(row["size"], int) or
5527             "mode" not in row or
5528             row["mode"] not in ['r', 'w']):
5529           raise errors.OpPrereqError("Invalid contents of the"
5530                                      " 'disks' parameter")
5531     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5532       if not hasattr(self.op, "name"):
5533         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5534       fname = self.cfg.ExpandInstanceName(self.op.name)
5535       if fname is None:
5536         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5537                                    self.op.name)
5538       self.op.name = fname
5539       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5540     else:
5541       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5542                                  self.op.mode)
5543
5544     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5545       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5546         raise errors.OpPrereqError("Missing allocator name")
5547     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5548       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5549                                  self.op.direction)
5550
5551   def Exec(self, feedback_fn):
5552     """Run the allocator test.
5553
5554     """
5555     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5556       ial = IAllocator(self,
5557                        mode=self.op.mode,
5558                        name=self.op.name,
5559                        mem_size=self.op.mem_size,
5560                        disks=self.op.disks,
5561                        disk_template=self.op.disk_template,
5562                        os=self.op.os,
5563                        tags=self.op.tags,
5564                        nics=self.op.nics,
5565                        vcpus=self.op.vcpus,
5566                        )
5567     else:
5568       ial = IAllocator(self,
5569                        mode=self.op.mode,
5570                        name=self.op.name,
5571                        relocate_from=list(self.relocate_from),
5572                        )
5573
5574     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5575       result = ial.in_text
5576     else:
5577       ial.Run(self.op.allocator, validate=False)
5578       result = ial.out_text
5579     return result