7ffdc8996331470543482af1e095395c4e4eef88
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61
62   Note that all commands require root permissions.
63
64   """
65   HPATH = None
66   HTYPE = None
67   _OP_REQP = []
68   REQ_MASTER = True
69   REQ_WSSTORE = False
70   REQ_BGL = True
71
72   def __init__(self, processor, op, context, sstore):
73     """Constructor for LogicalUnit.
74
75     This needs to be overriden in derived classes in order to check op
76     validity.
77
78     """
79     self.proc = processor
80     self.op = op
81     self.cfg = context.cfg
82     self.sstore = sstore
83     self.context = context
84     # Dicts used to declare locking needs to mcpu
85     self.needed_locks = None
86     self.acquired_locks = {}
87     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88     self.add_locks = {}
89     self.remove_locks = {}
90     # Used to force good behavior when calling helper functions
91     self.recalculate_locks = {}
92     self.__ssh = None
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99
100     if not self.cfg.IsCluster():
101       raise errors.OpPrereqError("Cluster not initialized yet,"
102                                  " use 'gnt-cluster init' first.")
103     if self.REQ_MASTER:
104       master = self.cfg.GetMasterNode()
105       if master != utils.HostInfo().name:
106         raise errors.OpPrereqError("Commands must be run on the master"
107                                    " node %s" % master)
108
109   def __GetSSH(self):
110     """Returns the SshRunner object
111
112     """
113     if not self.__ssh:
114       self.__ssh = ssh.SshRunner(self.cfg)
115     return self.__ssh
116
117   ssh = property(fget=__GetSSH)
118
119   def ExpandNames(self):
120     """Expand names for this LU.
121
122     This method is called before starting to execute the opcode, and it should
123     update all the parameters of the opcode to their canonical form (e.g. a
124     short node name must be fully expanded after this method has successfully
125     completed). This way locking, hooks, logging, ecc. can work correctly.
126
127     LUs which implement this method must also populate the self.needed_locks
128     member, as a dict with lock levels as keys, and a list of needed lock names
129     as values. Rules:
130       - Use an empty dict if you don't need any lock
131       - If you don't need any lock at a particular level omit that level
132       - Don't put anything for the BGL level
133       - If you want all locks at a level use locking.ALL_SET as a value
134
135     If you need to share locks (rather than acquire them exclusively) at one
136     level you can modify self.share_locks, setting a true value (usually 1) for
137     that level. By default locks are not shared.
138
139     Examples:
140     # Acquire all nodes and one instance
141     self.needed_locks = {
142       locking.LEVEL_NODE: locking.ALL_SET,
143       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144     }
145     # Acquire just two nodes
146     self.needed_locks = {
147       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148     }
149     # Acquire no locks
150     self.needed_locks = {} # No, you can't leave it to the default value None
151
152     """
153     # The implementation of this method is mandatory only if the new LU is
154     # concurrent, so that old LUs don't need to be changed all at the same
155     # time.
156     if self.REQ_BGL:
157       self.needed_locks = {} # Exclusive LUs don't need locks.
158     else:
159       raise NotImplementedError
160
161   def DeclareLocks(self, level):
162     """Declare LU locking needs for a level
163
164     While most LUs can just declare their locking needs at ExpandNames time,
165     sometimes there's the need to calculate some locks after having acquired
166     the ones before. This function is called just before acquiring locks at a
167     particular level, but after acquiring the ones at lower levels, and permits
168     such calculations. It can be used to modify self.needed_locks, and by
169     default it does nothing.
170
171     This function is only called if you have something already set in
172     self.needed_locks for the level.
173
174     @param level: Locking level which is going to be locked
175     @type level: member of ganeti.locking.LEVELS
176
177     """
178
179   def CheckPrereq(self):
180     """Check prerequisites for this LU.
181
182     This method should check that the prerequisites for the execution
183     of this LU are fulfilled. It can do internode communication, but
184     it should be idempotent - no cluster or system changes are
185     allowed.
186
187     The method should raise errors.OpPrereqError in case something is
188     not fulfilled. Its return value is ignored.
189
190     This method should also update all the parameters of the opcode to
191     their canonical form if it hasn't been done by ExpandNames before.
192
193     """
194     raise NotImplementedError
195
196   def Exec(self, feedback_fn):
197     """Execute the LU.
198
199     This method should implement the actual work. It should raise
200     errors.OpExecError for failures that are somewhat dealt with in
201     code, or expected.
202
203     """
204     raise NotImplementedError
205
206   def BuildHooksEnv(self):
207     """Build hooks environment for this LU.
208
209     This method should return a three-node tuple consisting of: a dict
210     containing the environment that will be used for running the
211     specific hook for this LU, a list of node names on which the hook
212     should run before the execution, and a list of node names on which
213     the hook should run after the execution.
214
215     The keys of the dict must not have 'GANETI_' prefixed as this will
216     be handled in the hooks runner. Also note additional keys will be
217     added by the hooks runner. If the LU doesn't define any
218     environment, an empty dict (and not None) should be returned.
219
220     No nodes should be returned as an empty list (and not None).
221
222     Note that if the HPATH for a LU class is None, this function will
223     not be called.
224
225     """
226     raise NotImplementedError
227
228   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229     """Notify the LU about the results of its hooks.
230
231     This method is called every time a hooks phase is executed, and notifies
232     the Logical Unit about the hooks' result. The LU can then use it to alter
233     its result based on the hooks.  By default the method does nothing and the
234     previous result is passed back unchanged but any LU can define it if it
235     wants to use the local cluster hook-scripts somehow.
236
237     Args:
238       phase: the hooks phase that has just been run
239       hooks_results: the results of the multi-node hooks rpc call
240       feedback_fn: function to send feedback back to the caller
241       lu_result: the previous result this LU had, or None in the PRE phase.
242
243     """
244     return lu_result
245
246   def _ExpandAndLockInstance(self):
247     """Helper function to expand and lock an instance.
248
249     Many LUs that work on an instance take its name in self.op.instance_name
250     and need to expand it and then declare the expanded name for locking. This
251     function does it, and then updates self.op.instance_name to the expanded
252     name. It also initializes needed_locks as a dict, if this hasn't been done
253     before.
254
255     """
256     if self.needed_locks is None:
257       self.needed_locks = {}
258     else:
259       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260         "_ExpandAndLockInstance called with instance-level locks set"
261     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262     if expanded_name is None:
263       raise errors.OpPrereqError("Instance '%s' not known" %
264                                   self.op.instance_name)
265     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266     self.op.instance_name = expanded_name
267
268   def _LockInstancesNodes(self, primary_only=False):
269     """Helper function to declare instances' nodes for locking.
270
271     This function should be called after locking one or more instances to lock
272     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273     with all primary or secondary nodes for instances already locked and
274     present in self.needed_locks[locking.LEVEL_INSTANCE].
275
276     It should be called from DeclareLocks, and for safety only works if
277     self.recalculate_locks[locking.LEVEL_NODE] is set.
278
279     In the future it may grow parameters to just lock some instance's nodes, or
280     to just lock primaries or secondary nodes, if needed.
281
282     If should be called in DeclareLocks in a way similar to:
283
284     if level == locking.LEVEL_NODE:
285       self._LockInstancesNodes()
286
287     @type primary_only: boolean
288     @param primary_only: only lock primary nodes of locked instances
289
290     """
291     assert locking.LEVEL_NODE in self.recalculate_locks, \
292       "_LockInstancesNodes helper function called with no nodes to recalculate"
293
294     # TODO: check if we're really been called with the instance locks held
295
296     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297     # future we might want to have different behaviors depending on the value
298     # of self.recalculate_locks[locking.LEVEL_NODE]
299     wanted_nodes = []
300     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301       instance = self.context.cfg.GetInstanceInfo(instance_name)
302       wanted_nodes.append(instance.primary_node)
303       if not primary_only:
304         wanted_nodes.extend(instance.secondary_nodes)
305
306     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310
311     del self.recalculate_locks[locking.LEVEL_NODE]
312
313
314 class NoHooksLU(LogicalUnit):
315   """Simple LU which runs no hooks.
316
317   This LU is intended as a parent for other LogicalUnits which will
318   run no hooks, in order to reduce duplicate code.
319
320   """
321   HPATH = None
322   HTYPE = None
323
324
325 def _GetWantedNodes(lu, nodes):
326   """Returns list of checked and expanded node names.
327
328   Args:
329     nodes: List of nodes (strings) or None for all
330
331   """
332   if not isinstance(nodes, list):
333     raise errors.OpPrereqError("Invalid argument type 'nodes'")
334
335   if not nodes:
336     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337       " non-empty list of nodes whose name is to be expanded.")
338
339   wanted = []
340   for name in nodes:
341     node = lu.cfg.ExpandNodeName(name)
342     if node is None:
343       raise errors.OpPrereqError("No such node name '%s'" % name)
344     wanted.append(node)
345
346   return utils.NiceSort(wanted)
347
348
349 def _GetWantedInstances(lu, instances):
350   """Returns list of checked and expanded instance names.
351
352   Args:
353     instances: List of instances (strings) or None for all
354
355   """
356   if not isinstance(instances, list):
357     raise errors.OpPrereqError("Invalid argument type 'instances'")
358
359   if instances:
360     wanted = []
361
362     for name in instances:
363       instance = lu.cfg.ExpandInstanceName(name)
364       if instance is None:
365         raise errors.OpPrereqError("No such instance name '%s'" % name)
366       wanted.append(instance)
367
368   else:
369     wanted = lu.cfg.GetInstanceList()
370   return utils.NiceSort(wanted)
371
372
373 def _CheckOutputFields(static, dynamic, selected):
374   """Checks whether all selected fields are valid.
375
376   Args:
377     static: Static fields
378     dynamic: Dynamic fields
379
380   """
381   static_fields = frozenset(static)
382   dynamic_fields = frozenset(dynamic)
383
384   all_fields = static_fields | dynamic_fields
385
386   if not all_fields.issuperset(selected):
387     raise errors.OpPrereqError("Unknown output fields selected: %s"
388                                % ",".join(frozenset(selected).
389                                           difference(all_fields)))
390
391
392 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393                           memory, vcpus, nics):
394   """Builds instance related env variables for hooks from single variables.
395
396   Args:
397     secondary_nodes: List of secondary nodes as strings
398   """
399   env = {
400     "OP_TARGET": name,
401     "INSTANCE_NAME": name,
402     "INSTANCE_PRIMARY": primary_node,
403     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404     "INSTANCE_OS_TYPE": os_type,
405     "INSTANCE_STATUS": status,
406     "INSTANCE_MEMORY": memory,
407     "INSTANCE_VCPUS": vcpus,
408   }
409
410   if nics:
411     nic_count = len(nics)
412     for idx, (ip, bridge, mac) in enumerate(nics):
413       if ip is None:
414         ip = ""
415       env["INSTANCE_NIC%d_IP" % idx] = ip
416       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418   else:
419     nic_count = 0
420
421   env["INSTANCE_NIC_COUNT"] = nic_count
422
423   return env
424
425
426 def _BuildInstanceHookEnvByObject(instance, override=None):
427   """Builds instance related env variables for hooks from an object.
428
429   Args:
430     instance: objects.Instance object of instance
431     override: dict of values to override
432   """
433   args = {
434     'name': instance.name,
435     'primary_node': instance.primary_node,
436     'secondary_nodes': instance.secondary_nodes,
437     'os_type': instance.os,
438     'status': instance.os,
439     'memory': instance.memory,
440     'vcpus': instance.vcpus,
441     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442   }
443   if override:
444     args.update(override)
445   return _BuildInstanceHookEnv(**args)
446
447
448 def _CheckInstanceBridgesExist(instance):
449   """Check that the brigdes needed by an instance exist.
450
451   """
452   # check bridges existance
453   brlist = [nic.bridge for nic in instance.nics]
454   if not rpc.call_bridges_exist(instance.primary_node, brlist):
455     raise errors.OpPrereqError("one or more target bridges %s does not"
456                                " exist on destination node '%s'" %
457                                (brlist, instance.primary_node))
458
459
460 class LUDestroyCluster(NoHooksLU):
461   """Logical unit for destroying the cluster.
462
463   """
464   _OP_REQP = []
465
466   def CheckPrereq(self):
467     """Check prerequisites.
468
469     This checks whether the cluster is empty.
470
471     Any errors are signalled by raising errors.OpPrereqError.
472
473     """
474     master = self.cfg.GetMasterNode()
475
476     nodelist = self.cfg.GetNodeList()
477     if len(nodelist) != 1 or nodelist[0] != master:
478       raise errors.OpPrereqError("There are still %d node(s) in"
479                                  " this cluster." % (len(nodelist) - 1))
480     instancelist = self.cfg.GetInstanceList()
481     if instancelist:
482       raise errors.OpPrereqError("There are still %d instance(s) in"
483                                  " this cluster." % len(instancelist))
484
485   def Exec(self, feedback_fn):
486     """Destroys the cluster.
487
488     """
489     master = self.cfg.GetMasterNode()
490     if not rpc.call_node_stop_master(master, False):
491       raise errors.OpExecError("Could not disable the master role")
492     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493     utils.CreateBackup(priv_key)
494     utils.CreateBackup(pub_key)
495     return master
496
497
498 class LUVerifyCluster(LogicalUnit):
499   """Verifies the cluster status.
500
501   """
502   HPATH = "cluster-verify"
503   HTYPE = constants.HTYPE_CLUSTER
504   _OP_REQP = ["skip_checks"]
505   REQ_BGL = False
506
507   def ExpandNames(self):
508     self.needed_locks = {
509       locking.LEVEL_NODE: locking.ALL_SET,
510       locking.LEVEL_INSTANCE: locking.ALL_SET,
511     }
512     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513
514   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515                   remote_version, feedback_fn):
516     """Run multiple tests against a node.
517
518     Test list:
519       - compares ganeti version
520       - checks vg existance and size > 20G
521       - checks config file checksum
522       - checks ssh to other nodes
523
524     Args:
525       node: name of the node to check
526       file_list: required list of files
527       local_cksum: dictionary of local files and their checksums
528
529     """
530     # compares ganeti version
531     local_version = constants.PROTOCOL_VERSION
532     if not remote_version:
533       feedback_fn("  - ERROR: connection to %s failed" % (node))
534       return True
535
536     if local_version != remote_version:
537       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538                       (local_version, node, remote_version))
539       return True
540
541     # checks vg existance and size > 20G
542
543     bad = False
544     if not vglist:
545       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546                       (node,))
547       bad = True
548     else:
549       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550                                             constants.MIN_VG_SIZE)
551       if vgstatus:
552         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553         bad = True
554
555     # checks config file checksum
556     # checks ssh to any
557
558     if 'filelist' not in node_result:
559       bad = True
560       feedback_fn("  - ERROR: node hasn't returned file checksum data")
561     else:
562       remote_cksum = node_result['filelist']
563       for file_name in file_list:
564         if file_name not in remote_cksum:
565           bad = True
566           feedback_fn("  - ERROR: file '%s' missing" % file_name)
567         elif remote_cksum[file_name] != local_cksum[file_name]:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570
571     if 'nodelist' not in node_result:
572       bad = True
573       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574     else:
575       if node_result['nodelist']:
576         bad = True
577         for node in node_result['nodelist']:
578           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579                           (node, node_result['nodelist'][node]))
580     if 'node-net-test' not in node_result:
581       bad = True
582       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583     else:
584       if node_result['node-net-test']:
585         bad = True
586         nlist = utils.NiceSort(node_result['node-net-test'].keys())
587         for node in nlist:
588           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589                           (node, node_result['node-net-test'][node]))
590
591     hyp_result = node_result.get('hypervisor', None)
592     if hyp_result is not None:
593       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594     return bad
595
596   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597                       node_instance, feedback_fn):
598     """Verify an instance.
599
600     This function checks to see if the required block devices are
601     available on the instance's node.
602
603     """
604     bad = False
605
606     node_current = instanceconfig.primary_node
607
608     node_vol_should = {}
609     instanceconfig.MapLVsByNode(node_vol_should)
610
611     for node in node_vol_should:
612       for volume in node_vol_should[node]:
613         if node not in node_vol_is or volume not in node_vol_is[node]:
614           feedback_fn("  - ERROR: volume %s missing on node %s" %
615                           (volume, node))
616           bad = True
617
618     if not instanceconfig.status == 'down':
619       if (node_current not in node_instance or
620           not instance in node_instance[node_current]):
621         feedback_fn("  - ERROR: instance %s not running on node %s" %
622                         (instance, node_current))
623         bad = True
624
625     for node in node_instance:
626       if (not node == node_current):
627         if instance in node_instance[node]:
628           feedback_fn("  - ERROR: instance %s should not run on node %s" %
629                           (instance, node))
630           bad = True
631
632     return bad
633
634   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635     """Verify if there are any unknown volumes in the cluster.
636
637     The .os, .swap and backup volumes are ignored. All other volumes are
638     reported as unknown.
639
640     """
641     bad = False
642
643     for node in node_vol_is:
644       for volume in node_vol_is[node]:
645         if node not in node_vol_should or volume not in node_vol_should[node]:
646           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647                       (volume, node))
648           bad = True
649     return bad
650
651   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652     """Verify the list of running instances.
653
654     This checks what instances are running but unknown to the cluster.
655
656     """
657     bad = False
658     for node in node_instance:
659       for runninginstance in node_instance[node]:
660         if runninginstance not in instancelist:
661           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662                           (runninginstance, node))
663           bad = True
664     return bad
665
666   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667     """Verify N+1 Memory Resilience.
668
669     Check that if one single node dies we can still start all the instances it
670     was primary for.
671
672     """
673     bad = False
674
675     for node, nodeinfo in node_info.iteritems():
676       # This code checks that every node which is now listed as secondary has
677       # enough memory to host all instances it is supposed to should a single
678       # other node in the cluster fail.
679       # FIXME: not ready for failover to an arbitrary node
680       # FIXME: does not support file-backed instances
681       # WARNING: we currently take into account down instances as well as up
682       # ones, considering that even if they're down someone might want to start
683       # them even in the event of a node failure.
684       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685         needed_mem = 0
686         for instance in instances:
687           needed_mem += instance_cfg[instance].memory
688         if nodeinfo['mfree'] < needed_mem:
689           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690                       " failovers should node %s fail" % (node, prinode))
691           bad = True
692     return bad
693
694   def CheckPrereq(self):
695     """Check prerequisites.
696
697     Transform the list of checks we're going to skip into a set and check that
698     all its members are valid.
699
700     """
701     self.skip_set = frozenset(self.op.skip_checks)
702     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703       raise errors.OpPrereqError("Invalid checks to be skipped specified")
704
705   def BuildHooksEnv(self):
706     """Build hooks env.
707
708     Cluster-Verify hooks just rone in the post phase and their failure makes
709     the output be logged in the verify output and the verification to fail.
710
711     """
712     all_nodes = self.cfg.GetNodeList()
713     # TODO: populate the environment with useful information for verify hooks
714     env = {}
715     return env, [], all_nodes
716
717   def Exec(self, feedback_fn):
718     """Verify integrity of cluster, performing various test on nodes.
719
720     """
721     bad = False
722     feedback_fn("* Verifying global settings")
723     for msg in self.cfg.VerifyConfig():
724       feedback_fn("  - ERROR: %s" % msg)
725
726     vg_name = self.cfg.GetVGName()
727     nodelist = utils.NiceSort(self.cfg.GetNodeList())
728     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730     i_non_redundant = [] # Non redundant instances
731     node_volume = {}
732     node_instance = {}
733     node_info = {}
734     instance_cfg = {}
735
736     # FIXME: verify OS list
737     # do local checksums
738     file_names = []
739     file_names.append(constants.SSL_CERT_FILE)
740     file_names.append(constants.CLUSTER_CONF_FILE)
741     local_checksums = utils.FingerprintFiles(file_names)
742
743     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745     all_instanceinfo = rpc.call_instance_list(nodelist)
746     all_vglist = rpc.call_vg_list(nodelist)
747     node_verify_param = {
748       'filelist': file_names,
749       'nodelist': nodelist,
750       'hypervisor': None,
751       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752                         for node in nodeinfo]
753       }
754     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755     all_rversion = rpc.call_version(nodelist)
756     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757
758     for node in nodelist:
759       feedback_fn("* Verifying node %s" % node)
760       result = self._VerifyNode(node, file_names, local_checksums,
761                                 all_vglist[node], all_nvinfo[node],
762                                 all_rversion[node], feedback_fn)
763       bad = bad or result
764
765       # node_volume
766       volumeinfo = all_volumeinfo[node]
767
768       if isinstance(volumeinfo, basestring):
769         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770                     (node, volumeinfo[-400:].encode('string_escape')))
771         bad = True
772         node_volume[node] = {}
773       elif not isinstance(volumeinfo, dict):
774         feedback_fn("  - ERROR: connection to %s failed" % (node,))
775         bad = True
776         continue
777       else:
778         node_volume[node] = volumeinfo
779
780       # node_instance
781       nodeinstance = all_instanceinfo[node]
782       if type(nodeinstance) != list:
783         feedback_fn("  - ERROR: connection to %s failed" % (node,))
784         bad = True
785         continue
786
787       node_instance[node] = nodeinstance
788
789       # node_info
790       nodeinfo = all_ninfo[node]
791       if not isinstance(nodeinfo, dict):
792         feedback_fn("  - ERROR: connection to %s failed" % (node,))
793         bad = True
794         continue
795
796       try:
797         node_info[node] = {
798           "mfree": int(nodeinfo['memory_free']),
799           "dfree": int(nodeinfo['vg_free']),
800           "pinst": [],
801           "sinst": [],
802           # dictionary holding all instances this node is secondary for,
803           # grouped by their primary node. Each key is a cluster node, and each
804           # value is a list of instances which have the key as primary and the
805           # current node as secondary.  this is handy to calculate N+1 memory
806           # availability if you can only failover from a primary to its
807           # secondary.
808           "sinst-by-pnode": {},
809         }
810       except ValueError:
811         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812         bad = True
813         continue
814
815     node_vol_should = {}
816
817     for instance in instancelist:
818       feedback_fn("* Verifying instance %s" % instance)
819       inst_config = self.cfg.GetInstanceInfo(instance)
820       result =  self._VerifyInstance(instance, inst_config, node_volume,
821                                      node_instance, feedback_fn)
822       bad = bad or result
823
824       inst_config.MapLVsByNode(node_vol_should)
825
826       instance_cfg[instance] = inst_config
827
828       pnode = inst_config.primary_node
829       if pnode in node_info:
830         node_info[pnode]['pinst'].append(instance)
831       else:
832         feedback_fn("  - ERROR: instance %s, connection to primary node"
833                     " %s failed" % (instance, pnode))
834         bad = True
835
836       # If the instance is non-redundant we cannot survive losing its primary
837       # node, so we are not N+1 compliant. On the other hand we have no disk
838       # templates with more than one secondary so that situation is not well
839       # supported either.
840       # FIXME: does not support file-backed instances
841       if len(inst_config.secondary_nodes) == 0:
842         i_non_redundant.append(instance)
843       elif len(inst_config.secondary_nodes) > 1:
844         feedback_fn("  - WARNING: multiple secondaries for instance %s"
845                     % instance)
846
847       for snode in inst_config.secondary_nodes:
848         if snode in node_info:
849           node_info[snode]['sinst'].append(instance)
850           if pnode not in node_info[snode]['sinst-by-pnode']:
851             node_info[snode]['sinst-by-pnode'][pnode] = []
852           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853         else:
854           feedback_fn("  - ERROR: instance %s, connection to secondary node"
855                       " %s failed" % (instance, snode))
856
857     feedback_fn("* Verifying orphan volumes")
858     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859                                        feedback_fn)
860     bad = bad or result
861
862     feedback_fn("* Verifying remaining instances")
863     result = self._VerifyOrphanInstances(instancelist, node_instance,
864                                          feedback_fn)
865     bad = bad or result
866
867     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868       feedback_fn("* Verifying N+1 Memory redundancy")
869       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870       bad = bad or result
871
872     feedback_fn("* Other Notes")
873     if i_non_redundant:
874       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875                   % len(i_non_redundant))
876
877     return not bad
878
879   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880     """Analize the post-hooks' result, handle it, and send some
881     nicely-formatted feedback back to the user.
882
883     Args:
884       phase: the hooks phase that has just been run
885       hooks_results: the results of the multi-node hooks rpc call
886       feedback_fn: function to send feedback back to the caller
887       lu_result: previous Exec result
888
889     """
890     # We only really run POST phase hooks, and are only interested in
891     # their results
892     if phase == constants.HOOKS_PHASE_POST:
893       # Used to change hooks' output to proper indentation
894       indent_re = re.compile('^', re.M)
895       feedback_fn("* Hooks Results")
896       if not hooks_results:
897         feedback_fn("  - ERROR: general communication failure")
898         lu_result = 1
899       else:
900         for node_name in hooks_results:
901           show_node_header = True
902           res = hooks_results[node_name]
903           if res is False or not isinstance(res, list):
904             feedback_fn("    Communication failure")
905             lu_result = 1
906             continue
907           for script, hkr, output in res:
908             if hkr == constants.HKR_FAIL:
909               # The node header is only shown once, if there are
910               # failing hooks on that node
911               if show_node_header:
912                 feedback_fn("  Node %s:" % node_name)
913                 show_node_header = False
914               feedback_fn("    ERROR: Script %s failed, output:" % script)
915               output = indent_re.sub('      ', output)
916               feedback_fn("%s" % output)
917               lu_result = 1
918
919       return lu_result
920
921
922 class LUVerifyDisks(NoHooksLU):
923   """Verifies the cluster disks status.
924
925   """
926   _OP_REQP = []
927   REQ_BGL = False
928
929   def ExpandNames(self):
930     self.needed_locks = {
931       locking.LEVEL_NODE: locking.ALL_SET,
932       locking.LEVEL_INSTANCE: locking.ALL_SET,
933     }
934     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     This has no prerequisites.
940
941     """
942     pass
943
944   def Exec(self, feedback_fn):
945     """Verify integrity of cluster disks.
946
947     """
948     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
949
950     vg_name = self.cfg.GetVGName()
951     nodes = utils.NiceSort(self.cfg.GetNodeList())
952     instances = [self.cfg.GetInstanceInfo(name)
953                  for name in self.cfg.GetInstanceList()]
954
955     nv_dict = {}
956     for inst in instances:
957       inst_lvs = {}
958       if (inst.status != "up" or
959           inst.disk_template not in constants.DTS_NET_MIRROR):
960         continue
961       inst.MapLVsByNode(inst_lvs)
962       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
963       for node, vol_list in inst_lvs.iteritems():
964         for vol in vol_list:
965           nv_dict[(node, vol)] = inst
966
967     if not nv_dict:
968       return result
969
970     node_lvs = rpc.call_volume_list(nodes, vg_name)
971
972     to_act = set()
973     for node in nodes:
974       # node_volume
975       lvs = node_lvs[node]
976
977       if isinstance(lvs, basestring):
978         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979         res_nlvm[node] = lvs
980       elif not isinstance(lvs, dict):
981         logger.Info("connection to node %s failed or invalid data returned" %
982                     (node,))
983         res_nodes.append(node)
984         continue
985
986       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987         inst = nv_dict.pop((node, lv_name), None)
988         if (not lv_online and inst is not None
989             and inst.name not in res_instances):
990           res_instances.append(inst.name)
991
992     # any leftover items in nv_dict are missing LVs, let's arrange the
993     # data better
994     for key, inst in nv_dict.iteritems():
995       if inst.name not in res_missing:
996         res_missing[inst.name] = []
997       res_missing[inst.name].append(key)
998
999     return result
1000
1001
1002 class LURenameCluster(LogicalUnit):
1003   """Rename the cluster.
1004
1005   """
1006   HPATH = "cluster-rename"
1007   HTYPE = constants.HTYPE_CLUSTER
1008   _OP_REQP = ["name"]
1009
1010   def BuildHooksEnv(self):
1011     """Build hooks env.
1012
1013     """
1014     env = {
1015       "OP_TARGET": self.cfg.GetClusterName(),
1016       "NEW_NAME": self.op.name,
1017       }
1018     mn = self.cfg.GetMasterNode()
1019     return env, [mn], [mn]
1020
1021   def CheckPrereq(self):
1022     """Verify that the passed name is a valid one.
1023
1024     """
1025     hostname = utils.HostInfo(self.op.name)
1026
1027     new_name = hostname.name
1028     self.ip = new_ip = hostname.ip
1029     old_name = self.cfg.GetClusterName()
1030     old_ip = self.cfg.GetMasterIP()
1031     if new_name == old_name and new_ip == old_ip:
1032       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1033                                  " cluster has changed")
1034     if new_ip != old_ip:
1035       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1036         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1037                                    " reachable on the network. Aborting." %
1038                                    new_ip)
1039
1040     self.op.name = new_name
1041
1042   def Exec(self, feedback_fn):
1043     """Rename the cluster.
1044
1045     """
1046     clustername = self.op.name
1047     ip = self.ip
1048
1049     # shutdown the master IP
1050     master = self.cfg.GetMasterNode()
1051     if not rpc.call_node_stop_master(master, False):
1052       raise errors.OpExecError("Could not disable the master role")
1053
1054     try:
1055       # modify the sstore
1056       # TODO: sstore
1057       ss.SetKey(ss.SS_MASTER_IP, ip)
1058       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1059
1060       # Distribute updated ss config to all nodes
1061       myself = self.cfg.GetNodeInfo(master)
1062       dist_nodes = self.cfg.GetNodeList()
1063       if myself.name in dist_nodes:
1064         dist_nodes.remove(myself.name)
1065
1066       logger.Debug("Copying updated ssconf data to all nodes")
1067       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1068         fname = ss.KeyToFilename(keyname)
1069         result = rpc.call_upload_file(dist_nodes, fname)
1070         for to_node in dist_nodes:
1071           if not result[to_node]:
1072             logger.Error("copy of file %s to node %s failed" %
1073                          (fname, to_node))
1074     finally:
1075       if not rpc.call_node_start_master(master, False):
1076         logger.Error("Could not re-enable the master role on the master,"
1077                      " please restart manually.")
1078
1079
1080 def _RecursiveCheckIfLVMBased(disk):
1081   """Check if the given disk or its children are lvm-based.
1082
1083   Args:
1084     disk: ganeti.objects.Disk object
1085
1086   Returns:
1087     boolean indicating whether a LD_LV dev_type was found or not
1088
1089   """
1090   if disk.children:
1091     for chdisk in disk.children:
1092       if _RecursiveCheckIfLVMBased(chdisk):
1093         return True
1094   return disk.dev_type == constants.LD_LV
1095
1096
1097 class LUSetClusterParams(LogicalUnit):
1098   """Change the parameters of the cluster.
1099
1100   """
1101   HPATH = "cluster-modify"
1102   HTYPE = constants.HTYPE_CLUSTER
1103   _OP_REQP = []
1104   REQ_BGL = False
1105
1106   def ExpandNames(self):
1107     # FIXME: in the future maybe other cluster params won't require checking on
1108     # all nodes to be modified.
1109     self.needed_locks = {
1110       locking.LEVEL_NODE: locking.ALL_SET,
1111     }
1112     self.share_locks[locking.LEVEL_NODE] = 1
1113
1114   def BuildHooksEnv(self):
1115     """Build hooks env.
1116
1117     """
1118     env = {
1119       "OP_TARGET": self.cfg.GetClusterName(),
1120       "NEW_VG_NAME": self.op.vg_name,
1121       }
1122     mn = self.cfg.GetMasterNode()
1123     return env, [mn], [mn]
1124
1125   def CheckPrereq(self):
1126     """Check prerequisites.
1127
1128     This checks whether the given params don't conflict and
1129     if the given volume group is valid.
1130
1131     """
1132     # FIXME: This only works because there is only one parameter that can be
1133     # changed or removed.
1134     if not self.op.vg_name:
1135       instances = self.cfg.GetAllInstancesInfo().values()
1136       for inst in instances:
1137         for disk in inst.disks:
1138           if _RecursiveCheckIfLVMBased(disk):
1139             raise errors.OpPrereqError("Cannot disable lvm storage while"
1140                                        " lvm-based instances exist")
1141
1142     # if vg_name not None, checks given volume group on all nodes
1143     if self.op.vg_name:
1144       node_list = self.acquired_locks[locking.LEVEL_NODE]
1145       vglist = rpc.call_vg_list(node_list)
1146       for node in node_list:
1147         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1148                                               constants.MIN_VG_SIZE)
1149         if vgstatus:
1150           raise errors.OpPrereqError("Error on node '%s': %s" %
1151                                      (node, vgstatus))
1152
1153   def Exec(self, feedback_fn):
1154     """Change the parameters of the cluster.
1155
1156     """
1157     if self.op.vg_name != self.cfg.GetVGName():
1158       self.cfg.SetVGName(self.op.vg_name)
1159     else:
1160       feedback_fn("Cluster LVM configuration already in desired"
1161                   " state, not changing")
1162
1163
1164 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1165   """Sleep and poll for an instance's disk to sync.
1166
1167   """
1168   if not instance.disks:
1169     return True
1170
1171   if not oneshot:
1172     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1173
1174   node = instance.primary_node
1175
1176   for dev in instance.disks:
1177     cfgw.SetDiskID(dev, node)
1178
1179   retries = 0
1180   while True:
1181     max_time = 0
1182     done = True
1183     cumul_degraded = False
1184     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1185     if not rstats:
1186       proc.LogWarning("Can't get any data from node %s" % node)
1187       retries += 1
1188       if retries >= 10:
1189         raise errors.RemoteError("Can't contact node %s for mirror data,"
1190                                  " aborting." % node)
1191       time.sleep(6)
1192       continue
1193     retries = 0
1194     for i in range(len(rstats)):
1195       mstat = rstats[i]
1196       if mstat is None:
1197         proc.LogWarning("Can't compute data for node %s/%s" %
1198                         (node, instance.disks[i].iv_name))
1199         continue
1200       # we ignore the ldisk parameter
1201       perc_done, est_time, is_degraded, _ = mstat
1202       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1203       if perc_done is not None:
1204         done = False
1205         if est_time is not None:
1206           rem_time = "%d estimated seconds remaining" % est_time
1207           max_time = est_time
1208         else:
1209           rem_time = "no time estimate"
1210         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1211                      (instance.disks[i].iv_name, perc_done, rem_time))
1212     if done or oneshot:
1213       break
1214
1215     time.sleep(min(60, max_time))
1216
1217   if done:
1218     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1219   return not cumul_degraded
1220
1221
1222 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1223   """Check that mirrors are not degraded.
1224
1225   The ldisk parameter, if True, will change the test from the
1226   is_degraded attribute (which represents overall non-ok status for
1227   the device(s)) to the ldisk (representing the local storage status).
1228
1229   """
1230   cfgw.SetDiskID(dev, node)
1231   if ldisk:
1232     idx = 6
1233   else:
1234     idx = 5
1235
1236   result = True
1237   if on_primary or dev.AssembleOnSecondary():
1238     rstats = rpc.call_blockdev_find(node, dev)
1239     if not rstats:
1240       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1241       result = False
1242     else:
1243       result = result and (not rstats[idx])
1244   if dev.children:
1245     for child in dev.children:
1246       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1247
1248   return result
1249
1250
1251 class LUDiagnoseOS(NoHooksLU):
1252   """Logical unit for OS diagnose/query.
1253
1254   """
1255   _OP_REQP = ["output_fields", "names"]
1256   REQ_BGL = False
1257
1258   def ExpandNames(self):
1259     if self.op.names:
1260       raise errors.OpPrereqError("Selective OS query not supported")
1261
1262     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1263     _CheckOutputFields(static=[],
1264                        dynamic=self.dynamic_fields,
1265                        selected=self.op.output_fields)
1266
1267     # Lock all nodes, in shared mode
1268     self.needed_locks = {}
1269     self.share_locks[locking.LEVEL_NODE] = 1
1270     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1271
1272   def CheckPrereq(self):
1273     """Check prerequisites.
1274
1275     """
1276
1277   @staticmethod
1278   def _DiagnoseByOS(node_list, rlist):
1279     """Remaps a per-node return list into an a per-os per-node dictionary
1280
1281       Args:
1282         node_list: a list with the names of all nodes
1283         rlist: a map with node names as keys and OS objects as values
1284
1285       Returns:
1286         map: a map with osnames as keys and as value another map, with
1287              nodes as
1288              keys and list of OS objects as values
1289              e.g. {"debian-etch": {"node1": [<object>,...],
1290                                    "node2": [<object>,]}
1291                   }
1292
1293     """
1294     all_os = {}
1295     for node_name, nr in rlist.iteritems():
1296       if not nr:
1297         continue
1298       for os_obj in nr:
1299         if os_obj.name not in all_os:
1300           # build a list of nodes for this os containing empty lists
1301           # for each node in node_list
1302           all_os[os_obj.name] = {}
1303           for nname in node_list:
1304             all_os[os_obj.name][nname] = []
1305         all_os[os_obj.name][node_name].append(os_obj)
1306     return all_os
1307
1308   def Exec(self, feedback_fn):
1309     """Compute the list of OSes.
1310
1311     """
1312     node_list = self.acquired_locks[locking.LEVEL_NODE]
1313     node_data = rpc.call_os_diagnose(node_list)
1314     if node_data == False:
1315       raise errors.OpExecError("Can't gather the list of OSes")
1316     pol = self._DiagnoseByOS(node_list, node_data)
1317     output = []
1318     for os_name, os_data in pol.iteritems():
1319       row = []
1320       for field in self.op.output_fields:
1321         if field == "name":
1322           val = os_name
1323         elif field == "valid":
1324           val = utils.all([osl and osl[0] for osl in os_data.values()])
1325         elif field == "node_status":
1326           val = {}
1327           for node_name, nos_list in os_data.iteritems():
1328             val[node_name] = [(v.status, v.path) for v in nos_list]
1329         else:
1330           raise errors.ParameterError(field)
1331         row.append(val)
1332       output.append(row)
1333
1334     return output
1335
1336
1337 class LURemoveNode(LogicalUnit):
1338   """Logical unit for removing a node.
1339
1340   """
1341   HPATH = "node-remove"
1342   HTYPE = constants.HTYPE_NODE
1343   _OP_REQP = ["node_name"]
1344
1345   def BuildHooksEnv(self):
1346     """Build hooks env.
1347
1348     This doesn't run on the target node in the pre phase as a failed
1349     node would then be impossible to remove.
1350
1351     """
1352     env = {
1353       "OP_TARGET": self.op.node_name,
1354       "NODE_NAME": self.op.node_name,
1355       }
1356     all_nodes = self.cfg.GetNodeList()
1357     all_nodes.remove(self.op.node_name)
1358     return env, all_nodes, all_nodes
1359
1360   def CheckPrereq(self):
1361     """Check prerequisites.
1362
1363     This checks:
1364      - the node exists in the configuration
1365      - it does not have primary or secondary instances
1366      - it's not the master
1367
1368     Any errors are signalled by raising errors.OpPrereqError.
1369
1370     """
1371     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1372     if node is None:
1373       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1374
1375     instance_list = self.cfg.GetInstanceList()
1376
1377     masternode = self.cfg.GetMasterNode()
1378     if node.name == masternode:
1379       raise errors.OpPrereqError("Node is the master node,"
1380                                  " you need to failover first.")
1381
1382     for instance_name in instance_list:
1383       instance = self.cfg.GetInstanceInfo(instance_name)
1384       if node.name == instance.primary_node:
1385         raise errors.OpPrereqError("Instance %s still running on the node,"
1386                                    " please remove first." % instance_name)
1387       if node.name in instance.secondary_nodes:
1388         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1389                                    " please remove first." % instance_name)
1390     self.op.node_name = node.name
1391     self.node = node
1392
1393   def Exec(self, feedback_fn):
1394     """Removes the node from the cluster.
1395
1396     """
1397     node = self.node
1398     logger.Info("stopping the node daemon and removing configs from node %s" %
1399                 node.name)
1400
1401     self.context.RemoveNode(node.name)
1402
1403     rpc.call_node_leave_cluster(node.name)
1404
1405
1406 class LUQueryNodes(NoHooksLU):
1407   """Logical unit for querying nodes.
1408
1409   """
1410   _OP_REQP = ["output_fields", "names"]
1411   REQ_BGL = False
1412
1413   def ExpandNames(self):
1414     self.dynamic_fields = frozenset([
1415       "dtotal", "dfree",
1416       "mtotal", "mnode", "mfree",
1417       "bootid",
1418       "ctotal",
1419       ])
1420
1421     self.static_fields = frozenset([
1422       "name", "pinst_cnt", "sinst_cnt",
1423       "pinst_list", "sinst_list",
1424       "pip", "sip", "tags",
1425       "serial_no",
1426       ])
1427
1428     _CheckOutputFields(static=self.static_fields,
1429                        dynamic=self.dynamic_fields,
1430                        selected=self.op.output_fields)
1431
1432     self.needed_locks = {}
1433     self.share_locks[locking.LEVEL_NODE] = 1
1434
1435     if self.op.names:
1436       self.wanted = _GetWantedNodes(self, self.op.names)
1437     else:
1438       self.wanted = locking.ALL_SET
1439
1440     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1441     if self.do_locking:
1442       # if we don't request only static fields, we need to lock the nodes
1443       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1444
1445
1446   def CheckPrereq(self):
1447     """Check prerequisites.
1448
1449     """
1450     # The validation of the node list is done in the _GetWantedNodes,
1451     # if non empty, and if empty, there's no validation to do
1452     pass
1453
1454   def Exec(self, feedback_fn):
1455     """Computes the list of nodes and their attributes.
1456
1457     """
1458     all_info = self.cfg.GetAllNodesInfo()
1459     if self.do_locking:
1460       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1461     elif self.wanted != locking.ALL_SET:
1462       nodenames = self.wanted
1463       missing = set(nodenames).difference(all_info.keys())
1464       if missing:
1465         raise self.OpExecError(
1466           "Some nodes were removed before retrieving their data: %s" % missing)
1467     else:
1468       nodenames = all_info.keys()
1469     nodelist = [all_info[name] for name in nodenames]
1470
1471     # begin data gathering
1472
1473     if self.dynamic_fields.intersection(self.op.output_fields):
1474       live_data = {}
1475       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1476       for name in nodenames:
1477         nodeinfo = node_data.get(name, None)
1478         if nodeinfo:
1479           live_data[name] = {
1480             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1481             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1482             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1483             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1484             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1485             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1486             "bootid": nodeinfo['bootid'],
1487             }
1488         else:
1489           live_data[name] = {}
1490     else:
1491       live_data = dict.fromkeys(nodenames, {})
1492
1493     node_to_primary = dict([(name, set()) for name in nodenames])
1494     node_to_secondary = dict([(name, set()) for name in nodenames])
1495
1496     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1497                              "sinst_cnt", "sinst_list"))
1498     if inst_fields & frozenset(self.op.output_fields):
1499       instancelist = self.cfg.GetInstanceList()
1500
1501       for instance_name in instancelist:
1502         inst = self.cfg.GetInstanceInfo(instance_name)
1503         if inst.primary_node in node_to_primary:
1504           node_to_primary[inst.primary_node].add(inst.name)
1505         for secnode in inst.secondary_nodes:
1506           if secnode in node_to_secondary:
1507             node_to_secondary[secnode].add(inst.name)
1508
1509     # end data gathering
1510
1511     output = []
1512     for node in nodelist:
1513       node_output = []
1514       for field in self.op.output_fields:
1515         if field == "name":
1516           val = node.name
1517         elif field == "pinst_list":
1518           val = list(node_to_primary[node.name])
1519         elif field == "sinst_list":
1520           val = list(node_to_secondary[node.name])
1521         elif field == "pinst_cnt":
1522           val = len(node_to_primary[node.name])
1523         elif field == "sinst_cnt":
1524           val = len(node_to_secondary[node.name])
1525         elif field == "pip":
1526           val = node.primary_ip
1527         elif field == "sip":
1528           val = node.secondary_ip
1529         elif field == "tags":
1530           val = list(node.GetTags())
1531         elif field == "serial_no":
1532           val = node.serial_no
1533         elif field in self.dynamic_fields:
1534           val = live_data[node.name].get(field, None)
1535         else:
1536           raise errors.ParameterError(field)
1537         node_output.append(val)
1538       output.append(node_output)
1539
1540     return output
1541
1542
1543 class LUQueryNodeVolumes(NoHooksLU):
1544   """Logical unit for getting volumes on node(s).
1545
1546   """
1547   _OP_REQP = ["nodes", "output_fields"]
1548   REQ_BGL = False
1549
1550   def ExpandNames(self):
1551     _CheckOutputFields(static=["node"],
1552                        dynamic=["phys", "vg", "name", "size", "instance"],
1553                        selected=self.op.output_fields)
1554
1555     self.needed_locks = {}
1556     self.share_locks[locking.LEVEL_NODE] = 1
1557     if not self.op.nodes:
1558       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1559     else:
1560       self.needed_locks[locking.LEVEL_NODE] = \
1561         _GetWantedNodes(self, self.op.nodes)
1562
1563   def CheckPrereq(self):
1564     """Check prerequisites.
1565
1566     This checks that the fields required are valid output fields.
1567
1568     """
1569     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1570
1571   def Exec(self, feedback_fn):
1572     """Computes the list of nodes and their attributes.
1573
1574     """
1575     nodenames = self.nodes
1576     volumes = rpc.call_node_volumes(nodenames)
1577
1578     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1579              in self.cfg.GetInstanceList()]
1580
1581     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1582
1583     output = []
1584     for node in nodenames:
1585       if node not in volumes or not volumes[node]:
1586         continue
1587
1588       node_vols = volumes[node][:]
1589       node_vols.sort(key=lambda vol: vol['dev'])
1590
1591       for vol in node_vols:
1592         node_output = []
1593         for field in self.op.output_fields:
1594           if field == "node":
1595             val = node
1596           elif field == "phys":
1597             val = vol['dev']
1598           elif field == "vg":
1599             val = vol['vg']
1600           elif field == "name":
1601             val = vol['name']
1602           elif field == "size":
1603             val = int(float(vol['size']))
1604           elif field == "instance":
1605             for inst in ilist:
1606               if node not in lv_by_node[inst]:
1607                 continue
1608               if vol['name'] in lv_by_node[inst][node]:
1609                 val = inst.name
1610                 break
1611             else:
1612               val = '-'
1613           else:
1614             raise errors.ParameterError(field)
1615           node_output.append(str(val))
1616
1617         output.append(node_output)
1618
1619     return output
1620
1621
1622 class LUAddNode(LogicalUnit):
1623   """Logical unit for adding node to the cluster.
1624
1625   """
1626   HPATH = "node-add"
1627   HTYPE = constants.HTYPE_NODE
1628   _OP_REQP = ["node_name"]
1629
1630   def BuildHooksEnv(self):
1631     """Build hooks env.
1632
1633     This will run on all nodes before, and on all nodes + the new node after.
1634
1635     """
1636     env = {
1637       "OP_TARGET": self.op.node_name,
1638       "NODE_NAME": self.op.node_name,
1639       "NODE_PIP": self.op.primary_ip,
1640       "NODE_SIP": self.op.secondary_ip,
1641       }
1642     nodes_0 = self.cfg.GetNodeList()
1643     nodes_1 = nodes_0 + [self.op.node_name, ]
1644     return env, nodes_0, nodes_1
1645
1646   def CheckPrereq(self):
1647     """Check prerequisites.
1648
1649     This checks:
1650      - the new node is not already in the config
1651      - it is resolvable
1652      - its parameters (single/dual homed) matches the cluster
1653
1654     Any errors are signalled by raising errors.OpPrereqError.
1655
1656     """
1657     node_name = self.op.node_name
1658     cfg = self.cfg
1659
1660     dns_data = utils.HostInfo(node_name)
1661
1662     node = dns_data.name
1663     primary_ip = self.op.primary_ip = dns_data.ip
1664     secondary_ip = getattr(self.op, "secondary_ip", None)
1665     if secondary_ip is None:
1666       secondary_ip = primary_ip
1667     if not utils.IsValidIP(secondary_ip):
1668       raise errors.OpPrereqError("Invalid secondary IP given")
1669     self.op.secondary_ip = secondary_ip
1670
1671     node_list = cfg.GetNodeList()
1672     if not self.op.readd and node in node_list:
1673       raise errors.OpPrereqError("Node %s is already in the configuration" %
1674                                  node)
1675     elif self.op.readd and node not in node_list:
1676       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1677
1678     for existing_node_name in node_list:
1679       existing_node = cfg.GetNodeInfo(existing_node_name)
1680
1681       if self.op.readd and node == existing_node_name:
1682         if (existing_node.primary_ip != primary_ip or
1683             existing_node.secondary_ip != secondary_ip):
1684           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1685                                      " address configuration as before")
1686         continue
1687
1688       if (existing_node.primary_ip == primary_ip or
1689           existing_node.secondary_ip == primary_ip or
1690           existing_node.primary_ip == secondary_ip or
1691           existing_node.secondary_ip == secondary_ip):
1692         raise errors.OpPrereqError("New node ip address(es) conflict with"
1693                                    " existing node %s" % existing_node.name)
1694
1695     # check that the type of the node (single versus dual homed) is the
1696     # same as for the master
1697     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1698     master_singlehomed = myself.secondary_ip == myself.primary_ip
1699     newbie_singlehomed = secondary_ip == primary_ip
1700     if master_singlehomed != newbie_singlehomed:
1701       if master_singlehomed:
1702         raise errors.OpPrereqError("The master has no private ip but the"
1703                                    " new node has one")
1704       else:
1705         raise errors.OpPrereqError("The master has a private ip but the"
1706                                    " new node doesn't have one")
1707
1708     # checks reachablity
1709     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1710       raise errors.OpPrereqError("Node not reachable by ping")
1711
1712     if not newbie_singlehomed:
1713       # check reachability from my secondary ip to newbie's secondary ip
1714       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1715                            source=myself.secondary_ip):
1716         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1717                                    " based ping to noded port")
1718
1719     self.new_node = objects.Node(name=node,
1720                                  primary_ip=primary_ip,
1721                                  secondary_ip=secondary_ip)
1722
1723   def Exec(self, feedback_fn):
1724     """Adds the new node to the cluster.
1725
1726     """
1727     new_node = self.new_node
1728     node = new_node.name
1729
1730     # check connectivity
1731     result = rpc.call_version([node])[node]
1732     if result:
1733       if constants.PROTOCOL_VERSION == result:
1734         logger.Info("communication to node %s fine, sw version %s match" %
1735                     (node, result))
1736       else:
1737         raise errors.OpExecError("Version mismatch master version %s,"
1738                                  " node version %s" %
1739                                  (constants.PROTOCOL_VERSION, result))
1740     else:
1741       raise errors.OpExecError("Cannot get version from the new node")
1742
1743     # setup ssh on node
1744     logger.Info("copy ssh key to node %s" % node)
1745     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1746     keyarray = []
1747     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1748                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1749                 priv_key, pub_key]
1750
1751     for i in keyfiles:
1752       f = open(i, 'r')
1753       try:
1754         keyarray.append(f.read())
1755       finally:
1756         f.close()
1757
1758     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1759                                keyarray[3], keyarray[4], keyarray[5])
1760
1761     if not result:
1762       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1763
1764     # Add node to our /etc/hosts, and add key to known_hosts
1765     utils.AddHostToEtcHosts(new_node.name)
1766
1767     if new_node.secondary_ip != new_node.primary_ip:
1768       if not rpc.call_node_tcp_ping(new_node.name,
1769                                     constants.LOCALHOST_IP_ADDRESS,
1770                                     new_node.secondary_ip,
1771                                     constants.DEFAULT_NODED_PORT,
1772                                     10, False):
1773         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1774                                  " you gave (%s). Please fix and re-run this"
1775                                  " command." % new_node.secondary_ip)
1776
1777     node_verify_list = [self.cfg.GetMasterNode()]
1778     node_verify_param = {
1779       'nodelist': [node],
1780       # TODO: do a node-net-test as well?
1781     }
1782
1783     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1784     for verifier in node_verify_list:
1785       if not result[verifier]:
1786         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1787                                  " for remote verification" % verifier)
1788       if result[verifier]['nodelist']:
1789         for failed in result[verifier]['nodelist']:
1790           feedback_fn("ssh/hostname verification failed %s -> %s" %
1791                       (verifier, result[verifier]['nodelist'][failed]))
1792         raise errors.OpExecError("ssh/hostname verification failed.")
1793
1794     # Distribute updated /etc/hosts and known_hosts to all nodes,
1795     # including the node just added
1796     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1797     dist_nodes = self.cfg.GetNodeList()
1798     if not self.op.readd:
1799       dist_nodes.append(node)
1800     if myself.name in dist_nodes:
1801       dist_nodes.remove(myself.name)
1802
1803     logger.Debug("Copying hosts and known_hosts to all nodes")
1804     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1805       result = rpc.call_upload_file(dist_nodes, fname)
1806       for to_node in dist_nodes:
1807         if not result[to_node]:
1808           logger.Error("copy of file %s to node %s failed" %
1809                        (fname, to_node))
1810
1811     to_copy = []
1812     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
1813       to_copy.append(constants.VNC_PASSWORD_FILE)
1814     for fname in to_copy:
1815       result = rpc.call_upload_file([node], fname)
1816       if not result[node]:
1817         logger.Error("could not copy file %s to node %s" % (fname, node))
1818
1819     if self.op.readd:
1820       self.context.ReaddNode(new_node)
1821     else:
1822       self.context.AddNode(new_node)
1823
1824
1825 class LUQueryClusterInfo(NoHooksLU):
1826   """Query cluster configuration.
1827
1828   """
1829   _OP_REQP = []
1830   REQ_MASTER = False
1831   REQ_BGL = False
1832
1833   def ExpandNames(self):
1834     self.needed_locks = {}
1835
1836   def CheckPrereq(self):
1837     """No prerequsites needed for this LU.
1838
1839     """
1840     pass
1841
1842   def Exec(self, feedback_fn):
1843     """Return cluster config.
1844
1845     """
1846     result = {
1847       "name": self.cfg.GetClusterName(),
1848       "software_version": constants.RELEASE_VERSION,
1849       "protocol_version": constants.PROTOCOL_VERSION,
1850       "config_version": constants.CONFIG_VERSION,
1851       "os_api_version": constants.OS_API_VERSION,
1852       "export_version": constants.EXPORT_VERSION,
1853       "master": self.cfg.GetMasterNode(),
1854       "architecture": (platform.architecture()[0], platform.machine()),
1855       "hypervisor_type": self.cfg.GetHypervisorType(),
1856       }
1857
1858     return result
1859
1860
1861 class LUQueryConfigValues(NoHooksLU):
1862   """Return configuration values.
1863
1864   """
1865   _OP_REQP = []
1866   REQ_BGL = False
1867
1868   def ExpandNames(self):
1869     self.needed_locks = {}
1870
1871     static_fields = ["cluster_name", "master_node"]
1872     _CheckOutputFields(static=static_fields,
1873                        dynamic=[],
1874                        selected=self.op.output_fields)
1875
1876   def CheckPrereq(self):
1877     """No prerequisites.
1878
1879     """
1880     pass
1881
1882   def Exec(self, feedback_fn):
1883     """Dump a representation of the cluster config to the standard output.
1884
1885     """
1886     values = []
1887     for field in self.op.output_fields:
1888       if field == "cluster_name":
1889         values.append(self.cfg.GetClusterName())
1890       elif field == "master_node":
1891         values.append(self.cfg.GetMasterNode())
1892       else:
1893         raise errors.ParameterError(field)
1894     return values
1895
1896
1897 class LUActivateInstanceDisks(NoHooksLU):
1898   """Bring up an instance's disks.
1899
1900   """
1901   _OP_REQP = ["instance_name"]
1902   REQ_BGL = False
1903
1904   def ExpandNames(self):
1905     self._ExpandAndLockInstance()
1906     self.needed_locks[locking.LEVEL_NODE] = []
1907     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1908
1909   def DeclareLocks(self, level):
1910     if level == locking.LEVEL_NODE:
1911       self._LockInstancesNodes()
1912
1913   def CheckPrereq(self):
1914     """Check prerequisites.
1915
1916     This checks that the instance is in the cluster.
1917
1918     """
1919     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1920     assert self.instance is not None, \
1921       "Cannot retrieve locked instance %s" % self.op.instance_name
1922
1923   def Exec(self, feedback_fn):
1924     """Activate the disks.
1925
1926     """
1927     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1928     if not disks_ok:
1929       raise errors.OpExecError("Cannot activate block devices")
1930
1931     return disks_info
1932
1933
1934 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1935   """Prepare the block devices for an instance.
1936
1937   This sets up the block devices on all nodes.
1938
1939   Args:
1940     instance: a ganeti.objects.Instance object
1941     ignore_secondaries: if true, errors on secondary nodes won't result
1942                         in an error return from the function
1943
1944   Returns:
1945     false if the operation failed
1946     list of (host, instance_visible_name, node_visible_name) if the operation
1947          suceeded with the mapping from node devices to instance devices
1948   """
1949   device_info = []
1950   disks_ok = True
1951   iname = instance.name
1952   # With the two passes mechanism we try to reduce the window of
1953   # opportunity for the race condition of switching DRBD to primary
1954   # before handshaking occured, but we do not eliminate it
1955
1956   # The proper fix would be to wait (with some limits) until the
1957   # connection has been made and drbd transitions from WFConnection
1958   # into any other network-connected state (Connected, SyncTarget,
1959   # SyncSource, etc.)
1960
1961   # 1st pass, assemble on all nodes in secondary mode
1962   for inst_disk in instance.disks:
1963     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1964       cfg.SetDiskID(node_disk, node)
1965       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1966       if not result:
1967         logger.Error("could not prepare block device %s on node %s"
1968                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1969         if not ignore_secondaries:
1970           disks_ok = False
1971
1972   # FIXME: race condition on drbd migration to primary
1973
1974   # 2nd pass, do only the primary node
1975   for inst_disk in instance.disks:
1976     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1977       if node != instance.primary_node:
1978         continue
1979       cfg.SetDiskID(node_disk, node)
1980       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1981       if not result:
1982         logger.Error("could not prepare block device %s on node %s"
1983                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1984         disks_ok = False
1985     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1986
1987   # leave the disks configured for the primary node
1988   # this is a workaround that would be fixed better by
1989   # improving the logical/physical id handling
1990   for disk in instance.disks:
1991     cfg.SetDiskID(disk, instance.primary_node)
1992
1993   return disks_ok, device_info
1994
1995
1996 def _StartInstanceDisks(cfg, instance, force):
1997   """Start the disks of an instance.
1998
1999   """
2000   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2001                                            ignore_secondaries=force)
2002   if not disks_ok:
2003     _ShutdownInstanceDisks(instance, cfg)
2004     if force is not None and not force:
2005       logger.Error("If the message above refers to a secondary node,"
2006                    " you can retry the operation using '--force'.")
2007     raise errors.OpExecError("Disk consistency error")
2008
2009
2010 class LUDeactivateInstanceDisks(NoHooksLU):
2011   """Shutdown an instance's disks.
2012
2013   """
2014   _OP_REQP = ["instance_name"]
2015   REQ_BGL = False
2016
2017   def ExpandNames(self):
2018     self._ExpandAndLockInstance()
2019     self.needed_locks[locking.LEVEL_NODE] = []
2020     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2021
2022   def DeclareLocks(self, level):
2023     if level == locking.LEVEL_NODE:
2024       self._LockInstancesNodes()
2025
2026   def CheckPrereq(self):
2027     """Check prerequisites.
2028
2029     This checks that the instance is in the cluster.
2030
2031     """
2032     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2033     assert self.instance is not None, \
2034       "Cannot retrieve locked instance %s" % self.op.instance_name
2035
2036   def Exec(self, feedback_fn):
2037     """Deactivate the disks
2038
2039     """
2040     instance = self.instance
2041     _SafeShutdownInstanceDisks(instance, self.cfg)
2042
2043
2044 def _SafeShutdownInstanceDisks(instance, cfg):
2045   """Shutdown block devices of an instance.
2046
2047   This function checks if an instance is running, before calling
2048   _ShutdownInstanceDisks.
2049
2050   """
2051   ins_l = rpc.call_instance_list([instance.primary_node])
2052   ins_l = ins_l[instance.primary_node]
2053   if not type(ins_l) is list:
2054     raise errors.OpExecError("Can't contact node '%s'" %
2055                              instance.primary_node)
2056
2057   if instance.name in ins_l:
2058     raise errors.OpExecError("Instance is running, can't shutdown"
2059                              " block devices.")
2060
2061   _ShutdownInstanceDisks(instance, cfg)
2062
2063
2064 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2065   """Shutdown block devices of an instance.
2066
2067   This does the shutdown on all nodes of the instance.
2068
2069   If the ignore_primary is false, errors on the primary node are
2070   ignored.
2071
2072   """
2073   result = True
2074   for disk in instance.disks:
2075     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2076       cfg.SetDiskID(top_disk, node)
2077       if not rpc.call_blockdev_shutdown(node, top_disk):
2078         logger.Error("could not shutdown block device %s on node %s" %
2079                      (disk.iv_name, node))
2080         if not ignore_primary or node != instance.primary_node:
2081           result = False
2082   return result
2083
2084
2085 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2086   """Checks if a node has enough free memory.
2087
2088   This function check if a given node has the needed amount of free
2089   memory. In case the node has less memory or we cannot get the
2090   information from the node, this function raise an OpPrereqError
2091   exception.
2092
2093   Args:
2094     - cfg: a ConfigWriter instance
2095     - node: the node name
2096     - reason: string to use in the error message
2097     - requested: the amount of memory in MiB
2098
2099   """
2100   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2101   if not nodeinfo or not isinstance(nodeinfo, dict):
2102     raise errors.OpPrereqError("Could not contact node %s for resource"
2103                              " information" % (node,))
2104
2105   free_mem = nodeinfo[node].get('memory_free')
2106   if not isinstance(free_mem, int):
2107     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2108                              " was '%s'" % (node, free_mem))
2109   if requested > free_mem:
2110     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2111                              " needed %s MiB, available %s MiB" %
2112                              (node, reason, requested, free_mem))
2113
2114
2115 class LUStartupInstance(LogicalUnit):
2116   """Starts an instance.
2117
2118   """
2119   HPATH = "instance-start"
2120   HTYPE = constants.HTYPE_INSTANCE
2121   _OP_REQP = ["instance_name", "force"]
2122   REQ_BGL = False
2123
2124   def ExpandNames(self):
2125     self._ExpandAndLockInstance()
2126     self.needed_locks[locking.LEVEL_NODE] = []
2127     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2128
2129   def DeclareLocks(self, level):
2130     if level == locking.LEVEL_NODE:
2131       self._LockInstancesNodes()
2132
2133   def BuildHooksEnv(self):
2134     """Build hooks env.
2135
2136     This runs on master, primary and secondary nodes of the instance.
2137
2138     """
2139     env = {
2140       "FORCE": self.op.force,
2141       }
2142     env.update(_BuildInstanceHookEnvByObject(self.instance))
2143     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2144           list(self.instance.secondary_nodes))
2145     return env, nl, nl
2146
2147   def CheckPrereq(self):
2148     """Check prerequisites.
2149
2150     This checks that the instance is in the cluster.
2151
2152     """
2153     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2154     assert self.instance is not None, \
2155       "Cannot retrieve locked instance %s" % self.op.instance_name
2156
2157     # check bridges existance
2158     _CheckInstanceBridgesExist(instance)
2159
2160     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2161                          "starting instance %s" % instance.name,
2162                          instance.memory)
2163
2164   def Exec(self, feedback_fn):
2165     """Start the instance.
2166
2167     """
2168     instance = self.instance
2169     force = self.op.force
2170     extra_args = getattr(self.op, "extra_args", "")
2171
2172     self.cfg.MarkInstanceUp(instance.name)
2173
2174     node_current = instance.primary_node
2175
2176     _StartInstanceDisks(self.cfg, instance, force)
2177
2178     if not rpc.call_instance_start(node_current, instance, extra_args):
2179       _ShutdownInstanceDisks(instance, self.cfg)
2180       raise errors.OpExecError("Could not start instance")
2181
2182
2183 class LURebootInstance(LogicalUnit):
2184   """Reboot an instance.
2185
2186   """
2187   HPATH = "instance-reboot"
2188   HTYPE = constants.HTYPE_INSTANCE
2189   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2190   REQ_BGL = False
2191
2192   def ExpandNames(self):
2193     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2194                                    constants.INSTANCE_REBOOT_HARD,
2195                                    constants.INSTANCE_REBOOT_FULL]:
2196       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2197                                   (constants.INSTANCE_REBOOT_SOFT,
2198                                    constants.INSTANCE_REBOOT_HARD,
2199                                    constants.INSTANCE_REBOOT_FULL))
2200     self._ExpandAndLockInstance()
2201     self.needed_locks[locking.LEVEL_NODE] = []
2202     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2203
2204   def DeclareLocks(self, level):
2205     if level == locking.LEVEL_NODE:
2206       primary_only = not constants.INSTANCE_REBOOT_FULL
2207       self._LockInstancesNodes(primary_only=primary_only)
2208
2209   def BuildHooksEnv(self):
2210     """Build hooks env.
2211
2212     This runs on master, primary and secondary nodes of the instance.
2213
2214     """
2215     env = {
2216       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2217       }
2218     env.update(_BuildInstanceHookEnvByObject(self.instance))
2219     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2220           list(self.instance.secondary_nodes))
2221     return env, nl, nl
2222
2223   def CheckPrereq(self):
2224     """Check prerequisites.
2225
2226     This checks that the instance is in the cluster.
2227
2228     """
2229     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2230     assert self.instance is not None, \
2231       "Cannot retrieve locked instance %s" % self.op.instance_name
2232
2233     # check bridges existance
2234     _CheckInstanceBridgesExist(instance)
2235
2236   def Exec(self, feedback_fn):
2237     """Reboot the instance.
2238
2239     """
2240     instance = self.instance
2241     ignore_secondaries = self.op.ignore_secondaries
2242     reboot_type = self.op.reboot_type
2243     extra_args = getattr(self.op, "extra_args", "")
2244
2245     node_current = instance.primary_node
2246
2247     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2248                        constants.INSTANCE_REBOOT_HARD]:
2249       if not rpc.call_instance_reboot(node_current, instance,
2250                                       reboot_type, extra_args):
2251         raise errors.OpExecError("Could not reboot instance")
2252     else:
2253       if not rpc.call_instance_shutdown(node_current, instance):
2254         raise errors.OpExecError("could not shutdown instance for full reboot")
2255       _ShutdownInstanceDisks(instance, self.cfg)
2256       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2257       if not rpc.call_instance_start(node_current, instance, extra_args):
2258         _ShutdownInstanceDisks(instance, self.cfg)
2259         raise errors.OpExecError("Could not start instance for full reboot")
2260
2261     self.cfg.MarkInstanceUp(instance.name)
2262
2263
2264 class LUShutdownInstance(LogicalUnit):
2265   """Shutdown an instance.
2266
2267   """
2268   HPATH = "instance-stop"
2269   HTYPE = constants.HTYPE_INSTANCE
2270   _OP_REQP = ["instance_name"]
2271   REQ_BGL = False
2272
2273   def ExpandNames(self):
2274     self._ExpandAndLockInstance()
2275     self.needed_locks[locking.LEVEL_NODE] = []
2276     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2277
2278   def DeclareLocks(self, level):
2279     if level == locking.LEVEL_NODE:
2280       self._LockInstancesNodes()
2281
2282   def BuildHooksEnv(self):
2283     """Build hooks env.
2284
2285     This runs on master, primary and secondary nodes of the instance.
2286
2287     """
2288     env = _BuildInstanceHookEnvByObject(self.instance)
2289     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2290           list(self.instance.secondary_nodes))
2291     return env, nl, nl
2292
2293   def CheckPrereq(self):
2294     """Check prerequisites.
2295
2296     This checks that the instance is in the cluster.
2297
2298     """
2299     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2300     assert self.instance is not None, \
2301       "Cannot retrieve locked instance %s" % self.op.instance_name
2302
2303   def Exec(self, feedback_fn):
2304     """Shutdown the instance.
2305
2306     """
2307     instance = self.instance
2308     node_current = instance.primary_node
2309     self.cfg.MarkInstanceDown(instance.name)
2310     if not rpc.call_instance_shutdown(node_current, instance):
2311       logger.Error("could not shutdown instance")
2312
2313     _ShutdownInstanceDisks(instance, self.cfg)
2314
2315
2316 class LUReinstallInstance(LogicalUnit):
2317   """Reinstall an instance.
2318
2319   """
2320   HPATH = "instance-reinstall"
2321   HTYPE = constants.HTYPE_INSTANCE
2322   _OP_REQP = ["instance_name"]
2323   REQ_BGL = False
2324
2325   def ExpandNames(self):
2326     self._ExpandAndLockInstance()
2327     self.needed_locks[locking.LEVEL_NODE] = []
2328     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2329
2330   def DeclareLocks(self, level):
2331     if level == locking.LEVEL_NODE:
2332       self._LockInstancesNodes()
2333
2334   def BuildHooksEnv(self):
2335     """Build hooks env.
2336
2337     This runs on master, primary and secondary nodes of the instance.
2338
2339     """
2340     env = _BuildInstanceHookEnvByObject(self.instance)
2341     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2342           list(self.instance.secondary_nodes))
2343     return env, nl, nl
2344
2345   def CheckPrereq(self):
2346     """Check prerequisites.
2347
2348     This checks that the instance is in the cluster and is not running.
2349
2350     """
2351     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2352     assert instance is not None, \
2353       "Cannot retrieve locked instance %s" % self.op.instance_name
2354
2355     if instance.disk_template == constants.DT_DISKLESS:
2356       raise errors.OpPrereqError("Instance '%s' has no disks" %
2357                                  self.op.instance_name)
2358     if instance.status != "down":
2359       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2360                                  self.op.instance_name)
2361     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2362     if remote_info:
2363       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2364                                  (self.op.instance_name,
2365                                   instance.primary_node))
2366
2367     self.op.os_type = getattr(self.op, "os_type", None)
2368     if self.op.os_type is not None:
2369       # OS verification
2370       pnode = self.cfg.GetNodeInfo(
2371         self.cfg.ExpandNodeName(instance.primary_node))
2372       if pnode is None:
2373         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2374                                    self.op.pnode)
2375       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2376       if not os_obj:
2377         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2378                                    " primary node"  % self.op.os_type)
2379
2380     self.instance = instance
2381
2382   def Exec(self, feedback_fn):
2383     """Reinstall the instance.
2384
2385     """
2386     inst = self.instance
2387
2388     if self.op.os_type is not None:
2389       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2390       inst.os = self.op.os_type
2391       self.cfg.Update(inst)
2392
2393     _StartInstanceDisks(self.cfg, inst, None)
2394     try:
2395       feedback_fn("Running the instance OS create scripts...")
2396       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2397         raise errors.OpExecError("Could not install OS for instance %s"
2398                                  " on node %s" %
2399                                  (inst.name, inst.primary_node))
2400     finally:
2401       _ShutdownInstanceDisks(inst, self.cfg)
2402
2403
2404 class LURenameInstance(LogicalUnit):
2405   """Rename an instance.
2406
2407   """
2408   HPATH = "instance-rename"
2409   HTYPE = constants.HTYPE_INSTANCE
2410   _OP_REQP = ["instance_name", "new_name"]
2411
2412   def BuildHooksEnv(self):
2413     """Build hooks env.
2414
2415     This runs on master, primary and secondary nodes of the instance.
2416
2417     """
2418     env = _BuildInstanceHookEnvByObject(self.instance)
2419     env["INSTANCE_NEW_NAME"] = self.op.new_name
2420     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2421           list(self.instance.secondary_nodes))
2422     return env, nl, nl
2423
2424   def CheckPrereq(self):
2425     """Check prerequisites.
2426
2427     This checks that the instance is in the cluster and is not running.
2428
2429     """
2430     instance = self.cfg.GetInstanceInfo(
2431       self.cfg.ExpandInstanceName(self.op.instance_name))
2432     if instance is None:
2433       raise errors.OpPrereqError("Instance '%s' not known" %
2434                                  self.op.instance_name)
2435     if instance.status != "down":
2436       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2437                                  self.op.instance_name)
2438     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2439     if remote_info:
2440       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2441                                  (self.op.instance_name,
2442                                   instance.primary_node))
2443     self.instance = instance
2444
2445     # new name verification
2446     name_info = utils.HostInfo(self.op.new_name)
2447
2448     self.op.new_name = new_name = name_info.name
2449     instance_list = self.cfg.GetInstanceList()
2450     if new_name in instance_list:
2451       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2452                                  new_name)
2453
2454     if not getattr(self.op, "ignore_ip", False):
2455       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2456         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2457                                    (name_info.ip, new_name))
2458
2459
2460   def Exec(self, feedback_fn):
2461     """Reinstall the instance.
2462
2463     """
2464     inst = self.instance
2465     old_name = inst.name
2466
2467     if inst.disk_template == constants.DT_FILE:
2468       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2469
2470     self.cfg.RenameInstance(inst.name, self.op.new_name)
2471     # Change the instance lock. This is definitely safe while we hold the BGL
2472     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2473     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2474
2475     # re-read the instance from the configuration after rename
2476     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2477
2478     if inst.disk_template == constants.DT_FILE:
2479       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2480       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2481                                                 old_file_storage_dir,
2482                                                 new_file_storage_dir)
2483
2484       if not result:
2485         raise errors.OpExecError("Could not connect to node '%s' to rename"
2486                                  " directory '%s' to '%s' (but the instance"
2487                                  " has been renamed in Ganeti)" % (
2488                                  inst.primary_node, old_file_storage_dir,
2489                                  new_file_storage_dir))
2490
2491       if not result[0]:
2492         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2493                                  " (but the instance has been renamed in"
2494                                  " Ganeti)" % (old_file_storage_dir,
2495                                                new_file_storage_dir))
2496
2497     _StartInstanceDisks(self.cfg, inst, None)
2498     try:
2499       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2500                                           "sda", "sdb"):
2501         msg = ("Could not run OS rename script for instance %s on node %s"
2502                " (but the instance has been renamed in Ganeti)" %
2503                (inst.name, inst.primary_node))
2504         logger.Error(msg)
2505     finally:
2506       _ShutdownInstanceDisks(inst, self.cfg)
2507
2508
2509 class LURemoveInstance(LogicalUnit):
2510   """Remove an instance.
2511
2512   """
2513   HPATH = "instance-remove"
2514   HTYPE = constants.HTYPE_INSTANCE
2515   _OP_REQP = ["instance_name", "ignore_failures"]
2516   REQ_BGL = False
2517
2518   def ExpandNames(self):
2519     self._ExpandAndLockInstance()
2520     self.needed_locks[locking.LEVEL_NODE] = []
2521     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2522
2523   def DeclareLocks(self, level):
2524     if level == locking.LEVEL_NODE:
2525       self._LockInstancesNodes()
2526
2527   def BuildHooksEnv(self):
2528     """Build hooks env.
2529
2530     This runs on master, primary and secondary nodes of the instance.
2531
2532     """
2533     env = _BuildInstanceHookEnvByObject(self.instance)
2534     nl = [self.cfg.GetMasterNode()]
2535     return env, nl, nl
2536
2537   def CheckPrereq(self):
2538     """Check prerequisites.
2539
2540     This checks that the instance is in the cluster.
2541
2542     """
2543     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2544     assert self.instance is not None, \
2545       "Cannot retrieve locked instance %s" % self.op.instance_name
2546
2547   def Exec(self, feedback_fn):
2548     """Remove the instance.
2549
2550     """
2551     instance = self.instance
2552     logger.Info("shutting down instance %s on node %s" %
2553                 (instance.name, instance.primary_node))
2554
2555     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2556       if self.op.ignore_failures:
2557         feedback_fn("Warning: can't shutdown instance")
2558       else:
2559         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2560                                  (instance.name, instance.primary_node))
2561
2562     logger.Info("removing block devices for instance %s" % instance.name)
2563
2564     if not _RemoveDisks(instance, self.cfg):
2565       if self.op.ignore_failures:
2566         feedback_fn("Warning: can't remove instance's disks")
2567       else:
2568         raise errors.OpExecError("Can't remove instance's disks")
2569
2570     logger.Info("removing instance %s out of cluster config" % instance.name)
2571
2572     self.cfg.RemoveInstance(instance.name)
2573     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2574
2575
2576 class LUQueryInstances(NoHooksLU):
2577   """Logical unit for querying instances.
2578
2579   """
2580   _OP_REQP = ["output_fields", "names"]
2581   REQ_BGL = False
2582
2583   def ExpandNames(self):
2584     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2585     self.static_fields = frozenset([
2586       "name", "os", "pnode", "snodes",
2587       "admin_state", "admin_ram",
2588       "disk_template", "ip", "mac", "bridge",
2589       "sda_size", "sdb_size", "vcpus", "tags",
2590       "network_port", "kernel_path", "initrd_path",
2591       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2592       "hvm_cdrom_image_path", "hvm_nic_type",
2593       "hvm_disk_type", "vnc_bind_address",
2594       "serial_no",
2595       ])
2596     _CheckOutputFields(static=self.static_fields,
2597                        dynamic=self.dynamic_fields,
2598                        selected=self.op.output_fields)
2599
2600     self.needed_locks = {}
2601     self.share_locks[locking.LEVEL_INSTANCE] = 1
2602     self.share_locks[locking.LEVEL_NODE] = 1
2603
2604     if self.op.names:
2605       self.wanted = _GetWantedInstances(self, self.op.names)
2606     else:
2607       self.wanted = locking.ALL_SET
2608
2609     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2610     if self.do_locking:
2611       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2612       self.needed_locks[locking.LEVEL_NODE] = []
2613       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2614
2615   def DeclareLocks(self, level):
2616     if level == locking.LEVEL_NODE and self.do_locking:
2617       self._LockInstancesNodes()
2618
2619   def CheckPrereq(self):
2620     """Check prerequisites.
2621
2622     """
2623     pass
2624
2625   def Exec(self, feedback_fn):
2626     """Computes the list of nodes and their attributes.
2627
2628     """
2629     all_info = self.cfg.GetAllInstancesInfo()
2630     if self.do_locking:
2631       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2632     elif self.wanted != locking.ALL_SET:
2633       instance_names = self.wanted
2634       missing = set(instance_names).difference(all_info.keys())
2635       if missing:
2636         raise self.OpExecError(
2637           "Some instances were removed before retrieving their data: %s"
2638           % missing)
2639     else:
2640       instance_names = all_info.keys()
2641     instance_list = [all_info[iname] for iname in instance_names]
2642
2643     # begin data gathering
2644
2645     nodes = frozenset([inst.primary_node for inst in instance_list])
2646
2647     bad_nodes = []
2648     if self.dynamic_fields.intersection(self.op.output_fields):
2649       live_data = {}
2650       node_data = rpc.call_all_instances_info(nodes)
2651       for name in nodes:
2652         result = node_data[name]
2653         if result:
2654           live_data.update(result)
2655         elif result == False:
2656           bad_nodes.append(name)
2657         # else no instance is alive
2658     else:
2659       live_data = dict([(name, {}) for name in instance_names])
2660
2661     # end data gathering
2662
2663     output = []
2664     for instance in instance_list:
2665       iout = []
2666       for field in self.op.output_fields:
2667         if field == "name":
2668           val = instance.name
2669         elif field == "os":
2670           val = instance.os
2671         elif field == "pnode":
2672           val = instance.primary_node
2673         elif field == "snodes":
2674           val = list(instance.secondary_nodes)
2675         elif field == "admin_state":
2676           val = (instance.status != "down")
2677         elif field == "oper_state":
2678           if instance.primary_node in bad_nodes:
2679             val = None
2680           else:
2681             val = bool(live_data.get(instance.name))
2682         elif field == "status":
2683           if instance.primary_node in bad_nodes:
2684             val = "ERROR_nodedown"
2685           else:
2686             running = bool(live_data.get(instance.name))
2687             if running:
2688               if instance.status != "down":
2689                 val = "running"
2690               else:
2691                 val = "ERROR_up"
2692             else:
2693               if instance.status != "down":
2694                 val = "ERROR_down"
2695               else:
2696                 val = "ADMIN_down"
2697         elif field == "admin_ram":
2698           val = instance.memory
2699         elif field == "oper_ram":
2700           if instance.primary_node in bad_nodes:
2701             val = None
2702           elif instance.name in live_data:
2703             val = live_data[instance.name].get("memory", "?")
2704           else:
2705             val = "-"
2706         elif field == "disk_template":
2707           val = instance.disk_template
2708         elif field == "ip":
2709           val = instance.nics[0].ip
2710         elif field == "bridge":
2711           val = instance.nics[0].bridge
2712         elif field == "mac":
2713           val = instance.nics[0].mac
2714         elif field == "sda_size" or field == "sdb_size":
2715           disk = instance.FindDisk(field[:3])
2716           if disk is None:
2717             val = None
2718           else:
2719             val = disk.size
2720         elif field == "vcpus":
2721           val = instance.vcpus
2722         elif field == "tags":
2723           val = list(instance.GetTags())
2724         elif field == "serial_no":
2725           val = instance.serial_no
2726         elif field in ("network_port", "kernel_path", "initrd_path",
2727                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2728                        "hvm_cdrom_image_path", "hvm_nic_type",
2729                        "hvm_disk_type", "vnc_bind_address"):
2730           val = getattr(instance, field, None)
2731           if val is not None:
2732             pass
2733           elif field in ("hvm_nic_type", "hvm_disk_type",
2734                          "kernel_path", "initrd_path"):
2735             val = "default"
2736           else:
2737             val = "-"
2738         else:
2739           raise errors.ParameterError(field)
2740         iout.append(val)
2741       output.append(iout)
2742
2743     return output
2744
2745
2746 class LUFailoverInstance(LogicalUnit):
2747   """Failover an instance.
2748
2749   """
2750   HPATH = "instance-failover"
2751   HTYPE = constants.HTYPE_INSTANCE
2752   _OP_REQP = ["instance_name", "ignore_consistency"]
2753   REQ_BGL = False
2754
2755   def ExpandNames(self):
2756     self._ExpandAndLockInstance()
2757     self.needed_locks[locking.LEVEL_NODE] = []
2758     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2759
2760   def DeclareLocks(self, level):
2761     if level == locking.LEVEL_NODE:
2762       self._LockInstancesNodes()
2763
2764   def BuildHooksEnv(self):
2765     """Build hooks env.
2766
2767     This runs on master, primary and secondary nodes of the instance.
2768
2769     """
2770     env = {
2771       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2772       }
2773     env.update(_BuildInstanceHookEnvByObject(self.instance))
2774     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2775     return env, nl, nl
2776
2777   def CheckPrereq(self):
2778     """Check prerequisites.
2779
2780     This checks that the instance is in the cluster.
2781
2782     """
2783     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2784     assert self.instance is not None, \
2785       "Cannot retrieve locked instance %s" % self.op.instance_name
2786
2787     if instance.disk_template not in constants.DTS_NET_MIRROR:
2788       raise errors.OpPrereqError("Instance's disk layout is not"
2789                                  " network mirrored, cannot failover.")
2790
2791     secondary_nodes = instance.secondary_nodes
2792     if not secondary_nodes:
2793       raise errors.ProgrammerError("no secondary node but using "
2794                                    "a mirrored disk template")
2795
2796     target_node = secondary_nodes[0]
2797     # check memory requirements on the secondary node
2798     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2799                          instance.name, instance.memory)
2800
2801     # check bridge existance
2802     brlist = [nic.bridge for nic in instance.nics]
2803     if not rpc.call_bridges_exist(target_node, brlist):
2804       raise errors.OpPrereqError("One or more target bridges %s does not"
2805                                  " exist on destination node '%s'" %
2806                                  (brlist, target_node))
2807
2808   def Exec(self, feedback_fn):
2809     """Failover an instance.
2810
2811     The failover is done by shutting it down on its present node and
2812     starting it on the secondary.
2813
2814     """
2815     instance = self.instance
2816
2817     source_node = instance.primary_node
2818     target_node = instance.secondary_nodes[0]
2819
2820     feedback_fn("* checking disk consistency between source and target")
2821     for dev in instance.disks:
2822       # for drbd, these are drbd over lvm
2823       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2824         if instance.status == "up" and not self.op.ignore_consistency:
2825           raise errors.OpExecError("Disk %s is degraded on target node,"
2826                                    " aborting failover." % dev.iv_name)
2827
2828     feedback_fn("* shutting down instance on source node")
2829     logger.Info("Shutting down instance %s on node %s" %
2830                 (instance.name, source_node))
2831
2832     if not rpc.call_instance_shutdown(source_node, instance):
2833       if self.op.ignore_consistency:
2834         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2835                      " anyway. Please make sure node %s is down"  %
2836                      (instance.name, source_node, source_node))
2837       else:
2838         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2839                                  (instance.name, source_node))
2840
2841     feedback_fn("* deactivating the instance's disks on source node")
2842     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2843       raise errors.OpExecError("Can't shut down the instance's disks.")
2844
2845     instance.primary_node = target_node
2846     # distribute new instance config to the other nodes
2847     self.cfg.Update(instance)
2848
2849     # Only start the instance if it's marked as up
2850     if instance.status == "up":
2851       feedback_fn("* activating the instance's disks on target node")
2852       logger.Info("Starting instance %s on node %s" %
2853                   (instance.name, target_node))
2854
2855       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2856                                                ignore_secondaries=True)
2857       if not disks_ok:
2858         _ShutdownInstanceDisks(instance, self.cfg)
2859         raise errors.OpExecError("Can't activate the instance's disks")
2860
2861       feedback_fn("* starting the instance on the target node")
2862       if not rpc.call_instance_start(target_node, instance, None):
2863         _ShutdownInstanceDisks(instance, self.cfg)
2864         raise errors.OpExecError("Could not start instance %s on node %s." %
2865                                  (instance.name, target_node))
2866
2867
2868 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2869   """Create a tree of block devices on the primary node.
2870
2871   This always creates all devices.
2872
2873   """
2874   if device.children:
2875     for child in device.children:
2876       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2877         return False
2878
2879   cfg.SetDiskID(device, node)
2880   new_id = rpc.call_blockdev_create(node, device, device.size,
2881                                     instance.name, True, info)
2882   if not new_id:
2883     return False
2884   if device.physical_id is None:
2885     device.physical_id = new_id
2886   return True
2887
2888
2889 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2890   """Create a tree of block devices on a secondary node.
2891
2892   If this device type has to be created on secondaries, create it and
2893   all its children.
2894
2895   If not, just recurse to children keeping the same 'force' value.
2896
2897   """
2898   if device.CreateOnSecondary():
2899     force = True
2900   if device.children:
2901     for child in device.children:
2902       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2903                                         child, force, info):
2904         return False
2905
2906   if not force:
2907     return True
2908   cfg.SetDiskID(device, node)
2909   new_id = rpc.call_blockdev_create(node, device, device.size,
2910                                     instance.name, False, info)
2911   if not new_id:
2912     return False
2913   if device.physical_id is None:
2914     device.physical_id = new_id
2915   return True
2916
2917
2918 def _GenerateUniqueNames(cfg, exts):
2919   """Generate a suitable LV name.
2920
2921   This will generate a logical volume name for the given instance.
2922
2923   """
2924   results = []
2925   for val in exts:
2926     new_id = cfg.GenerateUniqueID()
2927     results.append("%s%s" % (new_id, val))
2928   return results
2929
2930
2931 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2932                          p_minor, s_minor):
2933   """Generate a drbd8 device complete with its children.
2934
2935   """
2936   port = cfg.AllocatePort()
2937   vgname = cfg.GetVGName()
2938   shared_secret = cfg.GenerateDRBDSecret()
2939   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2940                           logical_id=(vgname, names[0]))
2941   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2942                           logical_id=(vgname, names[1]))
2943   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2944                           logical_id=(primary, secondary, port,
2945                                       p_minor, s_minor,
2946                                       shared_secret),
2947                           children=[dev_data, dev_meta],
2948                           iv_name=iv_name)
2949   return drbd_dev
2950
2951
2952 def _GenerateDiskTemplate(cfg, template_name,
2953                           instance_name, primary_node,
2954                           secondary_nodes, disk_sz, swap_sz,
2955                           file_storage_dir, file_driver):
2956   """Generate the entire disk layout for a given template type.
2957
2958   """
2959   #TODO: compute space requirements
2960
2961   vgname = cfg.GetVGName()
2962   if template_name == constants.DT_DISKLESS:
2963     disks = []
2964   elif template_name == constants.DT_PLAIN:
2965     if len(secondary_nodes) != 0:
2966       raise errors.ProgrammerError("Wrong template configuration")
2967
2968     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2969     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2970                            logical_id=(vgname, names[0]),
2971                            iv_name = "sda")
2972     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2973                            logical_id=(vgname, names[1]),
2974                            iv_name = "sdb")
2975     disks = [sda_dev, sdb_dev]
2976   elif template_name == constants.DT_DRBD8:
2977     if len(secondary_nodes) != 1:
2978       raise errors.ProgrammerError("Wrong template configuration")
2979     remote_node = secondary_nodes[0]
2980     (minor_pa, minor_pb,
2981      minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
2982       [primary_node, primary_node, remote_node, remote_node], instance_name)
2983
2984     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2985                                        ".sdb_data", ".sdb_meta"])
2986     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2987                                         disk_sz, names[0:2], "sda",
2988                                         minor_pa, minor_sa)
2989     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2990                                         swap_sz, names[2:4], "sdb",
2991                                         minor_pb, minor_sb)
2992     disks = [drbd_sda_dev, drbd_sdb_dev]
2993   elif template_name == constants.DT_FILE:
2994     if len(secondary_nodes) != 0:
2995       raise errors.ProgrammerError("Wrong template configuration")
2996
2997     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2998                                 iv_name="sda", logical_id=(file_driver,
2999                                 "%s/sda" % file_storage_dir))
3000     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3001                                 iv_name="sdb", logical_id=(file_driver,
3002                                 "%s/sdb" % file_storage_dir))
3003     disks = [file_sda_dev, file_sdb_dev]
3004   else:
3005     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3006   return disks
3007
3008
3009 def _GetInstanceInfoText(instance):
3010   """Compute that text that should be added to the disk's metadata.
3011
3012   """
3013   return "originstname+%s" % instance.name
3014
3015
3016 def _CreateDisks(cfg, instance):
3017   """Create all disks for an instance.
3018
3019   This abstracts away some work from AddInstance.
3020
3021   Args:
3022     instance: the instance object
3023
3024   Returns:
3025     True or False showing the success of the creation process
3026
3027   """
3028   info = _GetInstanceInfoText(instance)
3029
3030   if instance.disk_template == constants.DT_FILE:
3031     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3032     result = rpc.call_file_storage_dir_create(instance.primary_node,
3033                                               file_storage_dir)
3034
3035     if not result:
3036       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3037       return False
3038
3039     if not result[0]:
3040       logger.Error("failed to create directory '%s'" % file_storage_dir)
3041       return False
3042
3043   for device in instance.disks:
3044     logger.Info("creating volume %s for instance %s" %
3045                 (device.iv_name, instance.name))
3046     #HARDCODE
3047     for secondary_node in instance.secondary_nodes:
3048       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3049                                         device, False, info):
3050         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3051                      (device.iv_name, device, secondary_node))
3052         return False
3053     #HARDCODE
3054     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3055                                     instance, device, info):
3056       logger.Error("failed to create volume %s on primary!" %
3057                    device.iv_name)
3058       return False
3059
3060   return True
3061
3062
3063 def _RemoveDisks(instance, cfg):
3064   """Remove all disks for an instance.
3065
3066   This abstracts away some work from `AddInstance()` and
3067   `RemoveInstance()`. Note that in case some of the devices couldn't
3068   be removed, the removal will continue with the other ones (compare
3069   with `_CreateDisks()`).
3070
3071   Args:
3072     instance: the instance object
3073
3074   Returns:
3075     True or False showing the success of the removal proces
3076
3077   """
3078   logger.Info("removing block devices for instance %s" % instance.name)
3079
3080   result = True
3081   for device in instance.disks:
3082     for node, disk in device.ComputeNodeTree(instance.primary_node):
3083       cfg.SetDiskID(disk, node)
3084       if not rpc.call_blockdev_remove(node, disk):
3085         logger.Error("could not remove block device %s on node %s,"
3086                      " continuing anyway" %
3087                      (device.iv_name, node))
3088         result = False
3089
3090   if instance.disk_template == constants.DT_FILE:
3091     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3092     if not rpc.call_file_storage_dir_remove(instance.primary_node,
3093                                             file_storage_dir):
3094       logger.Error("could not remove directory '%s'" % file_storage_dir)
3095       result = False
3096
3097   return result
3098
3099
3100 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3101   """Compute disk size requirements in the volume group
3102
3103   This is currently hard-coded for the two-drive layout.
3104
3105   """
3106   # Required free disk space as a function of disk and swap space
3107   req_size_dict = {
3108     constants.DT_DISKLESS: None,
3109     constants.DT_PLAIN: disk_size + swap_size,
3110     # 256 MB are added for drbd metadata, 128MB for each drbd device
3111     constants.DT_DRBD8: disk_size + swap_size + 256,
3112     constants.DT_FILE: None,
3113   }
3114
3115   if disk_template not in req_size_dict:
3116     raise errors.ProgrammerError("Disk template '%s' size requirement"
3117                                  " is unknown" %  disk_template)
3118
3119   return req_size_dict[disk_template]
3120
3121
3122 class LUCreateInstance(LogicalUnit):
3123   """Create an instance.
3124
3125   """
3126   HPATH = "instance-add"
3127   HTYPE = constants.HTYPE_INSTANCE
3128   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3129               "disk_template", "swap_size", "mode", "start", "vcpus",
3130               "wait_for_sync", "ip_check", "mac"]
3131   REQ_BGL = False
3132
3133   def _ExpandNode(self, node):
3134     """Expands and checks one node name.
3135
3136     """
3137     node_full = self.cfg.ExpandNodeName(node)
3138     if node_full is None:
3139       raise errors.OpPrereqError("Unknown node %s" % node)
3140     return node_full
3141
3142   def ExpandNames(self):
3143     """ExpandNames for CreateInstance.
3144
3145     Figure out the right locks for instance creation.
3146
3147     """
3148     self.needed_locks = {}
3149
3150     # set optional parameters to none if they don't exist
3151     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3152                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3153                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3154                  "vnc_bind_address"]:
3155       if not hasattr(self.op, attr):
3156         setattr(self.op, attr, None)
3157
3158     # verify creation mode
3159     if self.op.mode not in (constants.INSTANCE_CREATE,
3160                             constants.INSTANCE_IMPORT):
3161       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3162                                  self.op.mode)
3163     # disk template and mirror node verification
3164     if self.op.disk_template not in constants.DISK_TEMPLATES:
3165       raise errors.OpPrereqError("Invalid disk template name")
3166
3167     #### instance parameters check
3168
3169     # instance name verification
3170     hostname1 = utils.HostInfo(self.op.instance_name)
3171     self.op.instance_name = instance_name = hostname1.name
3172
3173     # this is just a preventive check, but someone might still add this
3174     # instance in the meantime, and creation will fail at lock-add time
3175     if instance_name in self.cfg.GetInstanceList():
3176       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3177                                  instance_name)
3178
3179     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3180
3181     # ip validity checks
3182     ip = getattr(self.op, "ip", None)
3183     if ip is None or ip.lower() == "none":
3184       inst_ip = None
3185     elif ip.lower() == "auto":
3186       inst_ip = hostname1.ip
3187     else:
3188       if not utils.IsValidIP(ip):
3189         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3190                                    " like a valid IP" % ip)
3191       inst_ip = ip
3192     self.inst_ip = self.op.ip = inst_ip
3193     # used in CheckPrereq for ip ping check
3194     self.check_ip = hostname1.ip
3195
3196     # MAC address verification
3197     if self.op.mac != "auto":
3198       if not utils.IsValidMac(self.op.mac.lower()):
3199         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3200                                    self.op.mac)
3201
3202     # boot order verification
3203     if self.op.hvm_boot_order is not None:
3204       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3205         raise errors.OpPrereqError("invalid boot order specified,"
3206                                    " must be one or more of [acdn]")
3207     # file storage checks
3208     if (self.op.file_driver and
3209         not self.op.file_driver in constants.FILE_DRIVER):
3210       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3211                                  self.op.file_driver)
3212
3213     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3214       raise errors.OpPrereqError("File storage directory path not absolute")
3215
3216     ### Node/iallocator related checks
3217     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3218       raise errors.OpPrereqError("One and only one of iallocator and primary"
3219                                  " node must be given")
3220
3221     if self.op.iallocator:
3222       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3223     else:
3224       self.op.pnode = self._ExpandNode(self.op.pnode)
3225       nodelist = [self.op.pnode]
3226       if self.op.snode is not None:
3227         self.op.snode = self._ExpandNode(self.op.snode)
3228         nodelist.append(self.op.snode)
3229       self.needed_locks[locking.LEVEL_NODE] = nodelist
3230
3231     # in case of import lock the source node too
3232     if self.op.mode == constants.INSTANCE_IMPORT:
3233       src_node = getattr(self.op, "src_node", None)
3234       src_path = getattr(self.op, "src_path", None)
3235
3236       if src_node is None or src_path is None:
3237         raise errors.OpPrereqError("Importing an instance requires source"
3238                                    " node and path options")
3239
3240       if not os.path.isabs(src_path):
3241         raise errors.OpPrereqError("The source path must be absolute")
3242
3243       self.op.src_node = src_node = self._ExpandNode(src_node)
3244       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3245         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3246
3247     else: # INSTANCE_CREATE
3248       if getattr(self.op, "os_type", None) is None:
3249         raise errors.OpPrereqError("No guest OS specified")
3250
3251   def _RunAllocator(self):
3252     """Run the allocator based on input opcode.
3253
3254     """
3255     disks = [{"size": self.op.disk_size, "mode": "w"},
3256              {"size": self.op.swap_size, "mode": "w"}]
3257     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3258              "bridge": self.op.bridge}]
3259     ial = IAllocator(self.cfg,
3260                      mode=constants.IALLOCATOR_MODE_ALLOC,
3261                      name=self.op.instance_name,
3262                      disk_template=self.op.disk_template,
3263                      tags=[],
3264                      os=self.op.os_type,
3265                      vcpus=self.op.vcpus,
3266                      mem_size=self.op.mem_size,
3267                      disks=disks,
3268                      nics=nics,
3269                      )
3270
3271     ial.Run(self.op.iallocator)
3272
3273     if not ial.success:
3274       raise errors.OpPrereqError("Can't compute nodes using"
3275                                  " iallocator '%s': %s" % (self.op.iallocator,
3276                                                            ial.info))
3277     if len(ial.nodes) != ial.required_nodes:
3278       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3279                                  " of nodes (%s), required %s" %
3280                                  (self.op.iallocator, len(ial.nodes),
3281                                   ial.required_nodes))
3282     self.op.pnode = ial.nodes[0]
3283     logger.ToStdout("Selected nodes for the instance: %s" %
3284                     (", ".join(ial.nodes),))
3285     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3286                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3287     if ial.required_nodes == 2:
3288       self.op.snode = ial.nodes[1]
3289
3290   def BuildHooksEnv(self):
3291     """Build hooks env.
3292
3293     This runs on master, primary and secondary nodes of the instance.
3294
3295     """
3296     env = {
3297       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3298       "INSTANCE_DISK_SIZE": self.op.disk_size,
3299       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3300       "INSTANCE_ADD_MODE": self.op.mode,
3301       }
3302     if self.op.mode == constants.INSTANCE_IMPORT:
3303       env["INSTANCE_SRC_NODE"] = self.op.src_node
3304       env["INSTANCE_SRC_PATH"] = self.op.src_path
3305       env["INSTANCE_SRC_IMAGE"] = self.src_image
3306
3307     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3308       primary_node=self.op.pnode,
3309       secondary_nodes=self.secondaries,
3310       status=self.instance_status,
3311       os_type=self.op.os_type,
3312       memory=self.op.mem_size,
3313       vcpus=self.op.vcpus,
3314       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3315     ))
3316
3317     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3318           self.secondaries)
3319     return env, nl, nl
3320
3321
3322   def CheckPrereq(self):
3323     """Check prerequisites.
3324
3325     """
3326     if (not self.cfg.GetVGName() and
3327         self.op.disk_template not in constants.DTS_NOT_LVM):
3328       raise errors.OpPrereqError("Cluster does not support lvm-based"
3329                                  " instances")
3330
3331     if self.op.mode == constants.INSTANCE_IMPORT:
3332       src_node = self.op.src_node
3333       src_path = self.op.src_path
3334
3335       export_info = rpc.call_export_info(src_node, src_path)
3336
3337       if not export_info:
3338         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3339
3340       if not export_info.has_section(constants.INISECT_EXP):
3341         raise errors.ProgrammerError("Corrupted export config")
3342
3343       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3344       if (int(ei_version) != constants.EXPORT_VERSION):
3345         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3346                                    (ei_version, constants.EXPORT_VERSION))
3347
3348       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3349         raise errors.OpPrereqError("Can't import instance with more than"
3350                                    " one data disk")
3351
3352       # FIXME: are the old os-es, disk sizes, etc. useful?
3353       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3354       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3355                                                          'disk0_dump'))
3356       self.src_image = diskimage
3357
3358     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3359
3360     if self.op.start and not self.op.ip_check:
3361       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3362                                  " adding an instance in start mode")
3363
3364     if self.op.ip_check:
3365       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3366         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3367                                    (self.check_ip, instance_name))
3368
3369     # bridge verification
3370     bridge = getattr(self.op, "bridge", None)
3371     if bridge is None:
3372       self.op.bridge = self.cfg.GetDefBridge()
3373     else:
3374       self.op.bridge = bridge
3375
3376     #### allocator run
3377
3378     if self.op.iallocator is not None:
3379       self._RunAllocator()
3380
3381     #### node related checks
3382
3383     # check primary node
3384     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3385     assert self.pnode is not None, \
3386       "Cannot retrieve locked node %s" % self.op.pnode
3387     self.secondaries = []
3388
3389     # mirror node verification
3390     if self.op.disk_template in constants.DTS_NET_MIRROR:
3391       if self.op.snode is None:
3392         raise errors.OpPrereqError("The networked disk templates need"
3393                                    " a mirror node")
3394       if self.op.snode == pnode.name:
3395         raise errors.OpPrereqError("The secondary node cannot be"
3396                                    " the primary node.")
3397       self.secondaries.append(self.op.snode)
3398
3399     req_size = _ComputeDiskSize(self.op.disk_template,
3400                                 self.op.disk_size, self.op.swap_size)
3401
3402     # Check lv size requirements
3403     if req_size is not None:
3404       nodenames = [pnode.name] + self.secondaries
3405       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3406       for node in nodenames:
3407         info = nodeinfo.get(node, None)
3408         if not info:
3409           raise errors.OpPrereqError("Cannot get current information"
3410                                      " from node '%s'" % node)
3411         vg_free = info.get('vg_free', None)
3412         if not isinstance(vg_free, int):
3413           raise errors.OpPrereqError("Can't compute free disk space on"
3414                                      " node %s" % node)
3415         if req_size > info['vg_free']:
3416           raise errors.OpPrereqError("Not enough disk space on target node %s."
3417                                      " %d MB available, %d MB required" %
3418                                      (node, info['vg_free'], req_size))
3419
3420     # os verification
3421     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3422     if not os_obj:
3423       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3424                                  " primary node"  % self.op.os_type)
3425
3426     if self.op.kernel_path == constants.VALUE_NONE:
3427       raise errors.OpPrereqError("Can't set instance kernel to none")
3428
3429     # bridge check on primary node
3430     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3431       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3432                                  " destination node '%s'" %
3433                                  (self.op.bridge, pnode.name))
3434
3435     # memory check on primary node
3436     if self.op.start:
3437       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3438                            "creating instance %s" % self.op.instance_name,
3439                            self.op.mem_size)
3440
3441     # hvm_cdrom_image_path verification
3442     if self.op.hvm_cdrom_image_path is not None:
3443       # FIXME (als): shouldn't these checks happen on the destination node?
3444       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3445         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3446                                    " be an absolute path or None, not %s" %
3447                                    self.op.hvm_cdrom_image_path)
3448       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3449         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3450                                    " regular file or a symlink pointing to"
3451                                    " an existing regular file, not %s" %
3452                                    self.op.hvm_cdrom_image_path)
3453
3454     # vnc_bind_address verification
3455     if self.op.vnc_bind_address is not None:
3456       if not utils.IsValidIP(self.op.vnc_bind_address):
3457         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3458                                    " like a valid IP address" %
3459                                    self.op.vnc_bind_address)
3460
3461     # Xen HVM device type checks
3462     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
3463       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3464         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3465                                    " hypervisor" % self.op.hvm_nic_type)
3466       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3467         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3468                                    " hypervisor" % self.op.hvm_disk_type)
3469
3470     if self.op.start:
3471       self.instance_status = 'up'
3472     else:
3473       self.instance_status = 'down'
3474
3475   def Exec(self, feedback_fn):
3476     """Create and add the instance to the cluster.
3477
3478     """
3479     instance = self.op.instance_name
3480     pnode_name = self.pnode.name
3481
3482     if self.op.mac == "auto":
3483       mac_address = self.cfg.GenerateMAC()
3484     else:
3485       mac_address = self.op.mac
3486
3487     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3488     if self.inst_ip is not None:
3489       nic.ip = self.inst_ip
3490
3491     ht_kind = self.cfg.GetHypervisorType()
3492     if ht_kind in constants.HTS_REQ_PORT:
3493       network_port = self.cfg.AllocatePort()
3494     else:
3495       network_port = None
3496
3497     if self.op.vnc_bind_address is None:
3498       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3499
3500     # this is needed because os.path.join does not accept None arguments
3501     if self.op.file_storage_dir is None:
3502       string_file_storage_dir = ""
3503     else:
3504       string_file_storage_dir = self.op.file_storage_dir
3505
3506     # build the full file storage dir path
3507     file_storage_dir = os.path.normpath(os.path.join(
3508                                         self.cfg.GetFileStorageDir(),
3509                                         string_file_storage_dir, instance))
3510
3511
3512     disks = _GenerateDiskTemplate(self.cfg,
3513                                   self.op.disk_template,
3514                                   instance, pnode_name,
3515                                   self.secondaries, self.op.disk_size,
3516                                   self.op.swap_size,
3517                                   file_storage_dir,
3518                                   self.op.file_driver)
3519
3520     iobj = objects.Instance(name=instance, os=self.op.os_type,
3521                             primary_node=pnode_name,
3522                             memory=self.op.mem_size,
3523                             vcpus=self.op.vcpus,
3524                             nics=[nic], disks=disks,
3525                             disk_template=self.op.disk_template,
3526                             status=self.instance_status,
3527                             network_port=network_port,
3528                             kernel_path=self.op.kernel_path,
3529                             initrd_path=self.op.initrd_path,
3530                             hvm_boot_order=self.op.hvm_boot_order,
3531                             hvm_acpi=self.op.hvm_acpi,
3532                             hvm_pae=self.op.hvm_pae,
3533                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3534                             vnc_bind_address=self.op.vnc_bind_address,
3535                             hvm_nic_type=self.op.hvm_nic_type,
3536                             hvm_disk_type=self.op.hvm_disk_type,
3537                             )
3538
3539     feedback_fn("* creating instance disks...")
3540     if not _CreateDisks(self.cfg, iobj):
3541       _RemoveDisks(iobj, self.cfg)
3542       self.cfg.ReleaseDRBDMinors(instance)
3543       raise errors.OpExecError("Device creation failed, reverting...")
3544
3545     feedback_fn("adding instance %s to cluster config" % instance)
3546
3547     self.cfg.AddInstance(iobj)
3548     # Declare that we don't want to remove the instance lock anymore, as we've
3549     # added the instance to the config
3550     del self.remove_locks[locking.LEVEL_INSTANCE]
3551     # Remove the temp. assignements for the instance's drbds
3552     self.cfg.ReleaseDRBDMinors(instance)
3553
3554     if self.op.wait_for_sync:
3555       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3556     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3557       # make sure the disks are not degraded (still sync-ing is ok)
3558       time.sleep(15)
3559       feedback_fn("* checking mirrors status")
3560       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3561     else:
3562       disk_abort = False
3563
3564     if disk_abort:
3565       _RemoveDisks(iobj, self.cfg)
3566       self.cfg.RemoveInstance(iobj.name)
3567       # Make sure the instance lock gets removed
3568       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3569       raise errors.OpExecError("There are some degraded disks for"
3570                                " this instance")
3571
3572     feedback_fn("creating os for instance %s on node %s" %
3573                 (instance, pnode_name))
3574
3575     if iobj.disk_template != constants.DT_DISKLESS:
3576       if self.op.mode == constants.INSTANCE_CREATE:
3577         feedback_fn("* running the instance OS create scripts...")
3578         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3579           raise errors.OpExecError("could not add os for instance %s"
3580                                    " on node %s" %
3581                                    (instance, pnode_name))
3582
3583       elif self.op.mode == constants.INSTANCE_IMPORT:
3584         feedback_fn("* running the instance OS import scripts...")
3585         src_node = self.op.src_node
3586         src_image = self.src_image
3587         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3588                                                 src_node, src_image):
3589           raise errors.OpExecError("Could not import os for instance"
3590                                    " %s on node %s" %
3591                                    (instance, pnode_name))
3592       else:
3593         # also checked in the prereq part
3594         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3595                                      % self.op.mode)
3596
3597     if self.op.start:
3598       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3599       feedback_fn("* starting instance...")
3600       if not rpc.call_instance_start(pnode_name, iobj, None):
3601         raise errors.OpExecError("Could not start instance")
3602
3603
3604 class LUConnectConsole(NoHooksLU):
3605   """Connect to an instance's console.
3606
3607   This is somewhat special in that it returns the command line that
3608   you need to run on the master node in order to connect to the
3609   console.
3610
3611   """
3612   _OP_REQP = ["instance_name"]
3613   REQ_BGL = False
3614
3615   def ExpandNames(self):
3616     self._ExpandAndLockInstance()
3617
3618   def CheckPrereq(self):
3619     """Check prerequisites.
3620
3621     This checks that the instance is in the cluster.
3622
3623     """
3624     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3625     assert self.instance is not None, \
3626       "Cannot retrieve locked instance %s" % self.op.instance_name
3627
3628   def Exec(self, feedback_fn):
3629     """Connect to the console of an instance
3630
3631     """
3632     instance = self.instance
3633     node = instance.primary_node
3634
3635     node_insts = rpc.call_instance_list([node])[node]
3636     if node_insts is False:
3637       raise errors.OpExecError("Can't connect to node %s." % node)
3638
3639     if instance.name not in node_insts:
3640       raise errors.OpExecError("Instance %s is not running." % instance.name)
3641
3642     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3643
3644     hyper = hypervisor.GetHypervisor(self.cfg)
3645     console_cmd = hyper.GetShellCommandForConsole(instance)
3646
3647     # build ssh cmdline
3648     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3649
3650
3651 class LUReplaceDisks(LogicalUnit):
3652   """Replace the disks of an instance.
3653
3654   """
3655   HPATH = "mirrors-replace"
3656   HTYPE = constants.HTYPE_INSTANCE
3657   _OP_REQP = ["instance_name", "mode", "disks"]
3658   REQ_BGL = False
3659
3660   def ExpandNames(self):
3661     self._ExpandAndLockInstance()
3662
3663     if not hasattr(self.op, "remote_node"):
3664       self.op.remote_node = None
3665
3666     ia_name = getattr(self.op, "iallocator", None)
3667     if ia_name is not None:
3668       if self.op.remote_node is not None:
3669         raise errors.OpPrereqError("Give either the iallocator or the new"
3670                                    " secondary, not both")
3671       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3672     elif self.op.remote_node is not None:
3673       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3674       if remote_node is None:
3675         raise errors.OpPrereqError("Node '%s' not known" %
3676                                    self.op.remote_node)
3677       self.op.remote_node = remote_node
3678       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3679       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3680     else:
3681       self.needed_locks[locking.LEVEL_NODE] = []
3682       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3683
3684   def DeclareLocks(self, level):
3685     # If we're not already locking all nodes in the set we have to declare the
3686     # instance's primary/secondary nodes.
3687     if (level == locking.LEVEL_NODE and
3688         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3689       self._LockInstancesNodes()
3690
3691   def _RunAllocator(self):
3692     """Compute a new secondary node using an IAllocator.
3693
3694     """
3695     ial = IAllocator(self.cfg,
3696                      mode=constants.IALLOCATOR_MODE_RELOC,
3697                      name=self.op.instance_name,
3698                      relocate_from=[self.sec_node])
3699
3700     ial.Run(self.op.iallocator)
3701
3702     if not ial.success:
3703       raise errors.OpPrereqError("Can't compute nodes using"
3704                                  " iallocator '%s': %s" % (self.op.iallocator,
3705                                                            ial.info))
3706     if len(ial.nodes) != ial.required_nodes:
3707       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3708                                  " of nodes (%s), required %s" %
3709                                  (len(ial.nodes), ial.required_nodes))
3710     self.op.remote_node = ial.nodes[0]
3711     logger.ToStdout("Selected new secondary for the instance: %s" %
3712                     self.op.remote_node)
3713
3714   def BuildHooksEnv(self):
3715     """Build hooks env.
3716
3717     This runs on the master, the primary and all the secondaries.
3718
3719     """
3720     env = {
3721       "MODE": self.op.mode,
3722       "NEW_SECONDARY": self.op.remote_node,
3723       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3724       }
3725     env.update(_BuildInstanceHookEnvByObject(self.instance))
3726     nl = [
3727       self.cfg.GetMasterNode(),
3728       self.instance.primary_node,
3729       ]
3730     if self.op.remote_node is not None:
3731       nl.append(self.op.remote_node)
3732     return env, nl, nl
3733
3734   def CheckPrereq(self):
3735     """Check prerequisites.
3736
3737     This checks that the instance is in the cluster.
3738
3739     """
3740     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3741     assert instance is not None, \
3742       "Cannot retrieve locked instance %s" % self.op.instance_name
3743     self.instance = instance
3744
3745     if instance.disk_template not in constants.DTS_NET_MIRROR:
3746       raise errors.OpPrereqError("Instance's disk layout is not"
3747                                  " network mirrored.")
3748
3749     if len(instance.secondary_nodes) != 1:
3750       raise errors.OpPrereqError("The instance has a strange layout,"
3751                                  " expected one secondary but found %d" %
3752                                  len(instance.secondary_nodes))
3753
3754     self.sec_node = instance.secondary_nodes[0]
3755
3756     ia_name = getattr(self.op, "iallocator", None)
3757     if ia_name is not None:
3758       self._RunAllocator()
3759
3760     remote_node = self.op.remote_node
3761     if remote_node is not None:
3762       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3763       assert self.remote_node_info is not None, \
3764         "Cannot retrieve locked node %s" % remote_node
3765     else:
3766       self.remote_node_info = None
3767     if remote_node == instance.primary_node:
3768       raise errors.OpPrereqError("The specified node is the primary node of"
3769                                  " the instance.")
3770     elif remote_node == self.sec_node:
3771       if self.op.mode == constants.REPLACE_DISK_SEC:
3772         # this is for DRBD8, where we can't execute the same mode of
3773         # replacement as for drbd7 (no different port allocated)
3774         raise errors.OpPrereqError("Same secondary given, cannot execute"
3775                                    " replacement")
3776     if instance.disk_template == constants.DT_DRBD8:
3777       if (self.op.mode == constants.REPLACE_DISK_ALL and
3778           remote_node is not None):
3779         # switch to replace secondary mode
3780         self.op.mode = constants.REPLACE_DISK_SEC
3781
3782       if self.op.mode == constants.REPLACE_DISK_ALL:
3783         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3784                                    " secondary disk replacement, not"
3785                                    " both at once")
3786       elif self.op.mode == constants.REPLACE_DISK_PRI:
3787         if remote_node is not None:
3788           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3789                                      " the secondary while doing a primary"
3790                                      " node disk replacement")
3791         self.tgt_node = instance.primary_node
3792         self.oth_node = instance.secondary_nodes[0]
3793       elif self.op.mode == constants.REPLACE_DISK_SEC:
3794         self.new_node = remote_node # this can be None, in which case
3795                                     # we don't change the secondary
3796         self.tgt_node = instance.secondary_nodes[0]
3797         self.oth_node = instance.primary_node
3798       else:
3799         raise errors.ProgrammerError("Unhandled disk replace mode")
3800
3801     for name in self.op.disks:
3802       if instance.FindDisk(name) is None:
3803         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3804                                    (name, instance.name))
3805
3806   def _ExecD8DiskOnly(self, feedback_fn):
3807     """Replace a disk on the primary or secondary for dbrd8.
3808
3809     The algorithm for replace is quite complicated:
3810       - for each disk to be replaced:
3811         - create new LVs on the target node with unique names
3812         - detach old LVs from the drbd device
3813         - rename old LVs to name_replaced.<time_t>
3814         - rename new LVs to old LVs
3815         - attach the new LVs (with the old names now) to the drbd device
3816       - wait for sync across all devices
3817       - for each modified disk:
3818         - remove old LVs (which have the name name_replaces.<time_t>)
3819
3820     Failures are not very well handled.
3821
3822     """
3823     steps_total = 6
3824     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3825     instance = self.instance
3826     iv_names = {}
3827     vgname = self.cfg.GetVGName()
3828     # start of work
3829     cfg = self.cfg
3830     tgt_node = self.tgt_node
3831     oth_node = self.oth_node
3832
3833     # Step: check device activation
3834     self.proc.LogStep(1, steps_total, "check device existence")
3835     info("checking volume groups")
3836     my_vg = cfg.GetVGName()
3837     results = rpc.call_vg_list([oth_node, tgt_node])
3838     if not results:
3839       raise errors.OpExecError("Can't list volume groups on the nodes")
3840     for node in oth_node, tgt_node:
3841       res = results.get(node, False)
3842       if not res or my_vg not in res:
3843         raise errors.OpExecError("Volume group '%s' not found on %s" %
3844                                  (my_vg, node))
3845     for dev in instance.disks:
3846       if not dev.iv_name in self.op.disks:
3847         continue
3848       for node in tgt_node, oth_node:
3849         info("checking %s on %s" % (dev.iv_name, node))
3850         cfg.SetDiskID(dev, node)
3851         if not rpc.call_blockdev_find(node, dev):
3852           raise errors.OpExecError("Can't find device %s on node %s" %
3853                                    (dev.iv_name, node))
3854
3855     # Step: check other node consistency
3856     self.proc.LogStep(2, steps_total, "check peer consistency")
3857     for dev in instance.disks:
3858       if not dev.iv_name in self.op.disks:
3859         continue
3860       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3861       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3862                                    oth_node==instance.primary_node):
3863         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3864                                  " to replace disks on this node (%s)" %
3865                                  (oth_node, tgt_node))
3866
3867     # Step: create new storage
3868     self.proc.LogStep(3, steps_total, "allocate new storage")
3869     for dev in instance.disks:
3870       if not dev.iv_name in self.op.disks:
3871         continue
3872       size = dev.size
3873       cfg.SetDiskID(dev, tgt_node)
3874       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3875       names = _GenerateUniqueNames(cfg, lv_names)
3876       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3877                              logical_id=(vgname, names[0]))
3878       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3879                              logical_id=(vgname, names[1]))
3880       new_lvs = [lv_data, lv_meta]
3881       old_lvs = dev.children
3882       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3883       info("creating new local storage on %s for %s" %
3884            (tgt_node, dev.iv_name))
3885       # since we *always* want to create this LV, we use the
3886       # _Create...OnPrimary (which forces the creation), even if we
3887       # are talking about the secondary node
3888       for new_lv in new_lvs:
3889         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3890                                         _GetInstanceInfoText(instance)):
3891           raise errors.OpExecError("Failed to create new LV named '%s' on"
3892                                    " node '%s'" %
3893                                    (new_lv.logical_id[1], tgt_node))
3894
3895     # Step: for each lv, detach+rename*2+attach
3896     self.proc.LogStep(4, steps_total, "change drbd configuration")
3897     for dev, old_lvs, new_lvs in iv_names.itervalues():
3898       info("detaching %s drbd from local storage" % dev.iv_name)
3899       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3900         raise errors.OpExecError("Can't detach drbd from local storage on node"
3901                                  " %s for device %s" % (tgt_node, dev.iv_name))
3902       #dev.children = []
3903       #cfg.Update(instance)
3904
3905       # ok, we created the new LVs, so now we know we have the needed
3906       # storage; as such, we proceed on the target node to rename
3907       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3908       # using the assumption that logical_id == physical_id (which in
3909       # turn is the unique_id on that node)
3910
3911       # FIXME(iustin): use a better name for the replaced LVs
3912       temp_suffix = int(time.time())
3913       ren_fn = lambda d, suff: (d.physical_id[0],
3914                                 d.physical_id[1] + "_replaced-%s" % suff)
3915       # build the rename list based on what LVs exist on the node
3916       rlist = []
3917       for to_ren in old_lvs:
3918         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3919         if find_res is not None: # device exists
3920           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3921
3922       info("renaming the old LVs on the target node")
3923       if not rpc.call_blockdev_rename(tgt_node, rlist):
3924         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3925       # now we rename the new LVs to the old LVs
3926       info("renaming the new LVs on the target node")
3927       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3928       if not rpc.call_blockdev_rename(tgt_node, rlist):
3929         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3930
3931       for old, new in zip(old_lvs, new_lvs):
3932         new.logical_id = old.logical_id
3933         cfg.SetDiskID(new, tgt_node)
3934
3935       for disk in old_lvs:
3936         disk.logical_id = ren_fn(disk, temp_suffix)
3937         cfg.SetDiskID(disk, tgt_node)
3938
3939       # now that the new lvs have the old name, we can add them to the device
3940       info("adding new mirror component on %s" % tgt_node)
3941       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3942         for new_lv in new_lvs:
3943           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3944             warning("Can't rollback device %s", hint="manually cleanup unused"
3945                     " logical volumes")
3946         raise errors.OpExecError("Can't add local storage to drbd")
3947
3948       dev.children = new_lvs
3949       cfg.Update(instance)
3950
3951     # Step: wait for sync
3952
3953     # this can fail as the old devices are degraded and _WaitForSync
3954     # does a combined result over all disks, so we don't check its
3955     # return value
3956     self.proc.LogStep(5, steps_total, "sync devices")
3957     _WaitForSync(cfg, instance, self.proc, unlock=True)
3958
3959     # so check manually all the devices
3960     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3961       cfg.SetDiskID(dev, instance.primary_node)
3962       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3963       if is_degr:
3964         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3965
3966     # Step: remove old storage
3967     self.proc.LogStep(6, steps_total, "removing old storage")
3968     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3969       info("remove logical volumes for %s" % name)
3970       for lv in old_lvs:
3971         cfg.SetDiskID(lv, tgt_node)
3972         if not rpc.call_blockdev_remove(tgt_node, lv):
3973           warning("Can't remove old LV", hint="manually remove unused LVs")
3974           continue
3975
3976   def _ExecD8Secondary(self, feedback_fn):
3977     """Replace the secondary node for drbd8.
3978
3979     The algorithm for replace is quite complicated:
3980       - for all disks of the instance:
3981         - create new LVs on the new node with same names
3982         - shutdown the drbd device on the old secondary
3983         - disconnect the drbd network on the primary
3984         - create the drbd device on the new secondary
3985         - network attach the drbd on the primary, using an artifice:
3986           the drbd code for Attach() will connect to the network if it
3987           finds a device which is connected to the good local disks but
3988           not network enabled
3989       - wait for sync across all devices
3990       - remove all disks from the old secondary
3991
3992     Failures are not very well handled.
3993
3994     """
3995     steps_total = 6
3996     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3997     instance = self.instance
3998     iv_names = {}
3999     vgname = self.cfg.GetVGName()
4000     # start of work
4001     cfg = self.cfg
4002     old_node = self.tgt_node
4003     new_node = self.new_node
4004     pri_node = instance.primary_node
4005
4006     # Step: check device activation
4007     self.proc.LogStep(1, steps_total, "check device existence")
4008     info("checking volume groups")
4009     my_vg = cfg.GetVGName()
4010     results = rpc.call_vg_list([pri_node, new_node])
4011     if not results:
4012       raise errors.OpExecError("Can't list volume groups on the nodes")
4013     for node in pri_node, new_node:
4014       res = results.get(node, False)
4015       if not res or my_vg not in res:
4016         raise errors.OpExecError("Volume group '%s' not found on %s" %
4017                                  (my_vg, node))
4018     for dev in instance.disks:
4019       if not dev.iv_name in self.op.disks:
4020         continue
4021       info("checking %s on %s" % (dev.iv_name, pri_node))
4022       cfg.SetDiskID(dev, pri_node)
4023       if not rpc.call_blockdev_find(pri_node, dev):
4024         raise errors.OpExecError("Can't find device %s on node %s" %
4025                                  (dev.iv_name, pri_node))
4026
4027     # Step: check other node consistency
4028     self.proc.LogStep(2, steps_total, "check peer consistency")
4029     for dev in instance.disks:
4030       if not dev.iv_name in self.op.disks:
4031         continue
4032       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4033       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4034         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4035                                  " unsafe to replace the secondary" %
4036                                  pri_node)
4037
4038     # Step: create new storage
4039     self.proc.LogStep(3, steps_total, "allocate new storage")
4040     for dev in instance.disks:
4041       size = dev.size
4042       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4043       # since we *always* want to create this LV, we use the
4044       # _Create...OnPrimary (which forces the creation), even if we
4045       # are talking about the secondary node
4046       for new_lv in dev.children:
4047         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4048                                         _GetInstanceInfoText(instance)):
4049           raise errors.OpExecError("Failed to create new LV named '%s' on"
4050                                    " node '%s'" %
4051                                    (new_lv.logical_id[1], new_node))
4052
4053
4054     # Step 4: dbrd minors and drbd setups changes
4055     # after this, we must manually remove the drbd minors on both the
4056     # error and the success paths
4057     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4058                                    instance.name)
4059     logging.debug("Allocated minors %s" % (minors,))
4060     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4061     for dev, new_minor in zip(instance.disks, minors):
4062       size = dev.size
4063       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4064       # create new devices on new_node
4065       if pri_node == dev.logical_id[0]:
4066         new_logical_id = (pri_node, new_node,
4067                           dev.logical_id[2], dev.logical_id[3], new_minor,
4068                           dev.logical_id[5])
4069       else:
4070         new_logical_id = (new_node, pri_node,
4071                           dev.logical_id[2], new_minor, dev.logical_id[4],
4072                           dev.logical_id[5])
4073       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4074       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4075                     new_logical_id)
4076       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4077                               logical_id=new_logical_id,
4078                               children=dev.children)
4079       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4080                                         new_drbd, False,
4081                                       _GetInstanceInfoText(instance)):
4082         self.cfg.ReleaseDRBDMinors(instance.name)
4083         raise errors.OpExecError("Failed to create new DRBD on"
4084                                  " node '%s'" % new_node)
4085
4086     for dev in instance.disks:
4087       # we have new devices, shutdown the drbd on the old secondary
4088       info("shutting down drbd for %s on old node" % dev.iv_name)
4089       cfg.SetDiskID(dev, old_node)
4090       if not rpc.call_blockdev_shutdown(old_node, dev):
4091         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4092                 hint="Please cleanup this device manually as soon as possible")
4093
4094     info("detaching primary drbds from the network (=> standalone)")
4095     done = 0
4096     for dev in instance.disks:
4097       cfg.SetDiskID(dev, pri_node)
4098       # set the network part of the physical (unique in bdev terms) id
4099       # to None, meaning detach from network
4100       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4101       # and 'find' the device, which will 'fix' it to match the
4102       # standalone state
4103       if rpc.call_blockdev_find(pri_node, dev):
4104         done += 1
4105       else:
4106         warning("Failed to detach drbd %s from network, unusual case" %
4107                 dev.iv_name)
4108
4109     if not done:
4110       # no detaches succeeded (very unlikely)
4111       self.cfg.ReleaseDRBDMinors(instance.name)
4112       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4113
4114     # if we managed to detach at least one, we update all the disks of
4115     # the instance to point to the new secondary
4116     info("updating instance configuration")
4117     for dev, _, new_logical_id in iv_names.itervalues():
4118       dev.logical_id = new_logical_id
4119       cfg.SetDiskID(dev, pri_node)
4120     cfg.Update(instance)
4121     # we can remove now the temp minors as now the new values are
4122     # written to the config file (and therefore stable)
4123     self.cfg.ReleaseDRBDMinors(instance.name)
4124
4125     # and now perform the drbd attach
4126     info("attaching primary drbds to new secondary (standalone => connected)")
4127     failures = []
4128     for dev in instance.disks:
4129       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4130       # since the attach is smart, it's enough to 'find' the device,
4131       # it will automatically activate the network, if the physical_id
4132       # is correct
4133       cfg.SetDiskID(dev, pri_node)
4134       logging.debug("Disk to attach: %s", dev)
4135       if not rpc.call_blockdev_find(pri_node, dev):
4136         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4137                 "please do a gnt-instance info to see the status of disks")
4138
4139     # this can fail as the old devices are degraded and _WaitForSync
4140     # does a combined result over all disks, so we don't check its
4141     # return value
4142     self.proc.LogStep(5, steps_total, "sync devices")
4143     _WaitForSync(cfg, instance, self.proc, unlock=True)
4144
4145     # so check manually all the devices
4146     for name, (dev, old_lvs, _) in iv_names.iteritems():
4147       cfg.SetDiskID(dev, pri_node)
4148       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4149       if is_degr:
4150         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4151
4152     self.proc.LogStep(6, steps_total, "removing old storage")
4153     for name, (dev, old_lvs, _) in iv_names.iteritems():
4154       info("remove logical volumes for %s" % name)
4155       for lv in old_lvs:
4156         cfg.SetDiskID(lv, old_node)
4157         if not rpc.call_blockdev_remove(old_node, lv):
4158           warning("Can't remove LV on old secondary",
4159                   hint="Cleanup stale volumes by hand")
4160
4161   def Exec(self, feedback_fn):
4162     """Execute disk replacement.
4163
4164     This dispatches the disk replacement to the appropriate handler.
4165
4166     """
4167     instance = self.instance
4168
4169     # Activate the instance disks if we're replacing them on a down instance
4170     if instance.status == "down":
4171       _StartInstanceDisks(self.cfg, instance, True)
4172
4173     if instance.disk_template == constants.DT_DRBD8:
4174       if self.op.remote_node is None:
4175         fn = self._ExecD8DiskOnly
4176       else:
4177         fn = self._ExecD8Secondary
4178     else:
4179       raise errors.ProgrammerError("Unhandled disk replacement case")
4180
4181     ret = fn(feedback_fn)
4182
4183     # Deactivate the instance disks if we're replacing them on a down instance
4184     if instance.status == "down":
4185       _SafeShutdownInstanceDisks(instance, self.cfg)
4186
4187     return ret
4188
4189
4190 class LUGrowDisk(LogicalUnit):
4191   """Grow a disk of an instance.
4192
4193   """
4194   HPATH = "disk-grow"
4195   HTYPE = constants.HTYPE_INSTANCE
4196   _OP_REQP = ["instance_name", "disk", "amount"]
4197   REQ_BGL = False
4198
4199   def ExpandNames(self):
4200     self._ExpandAndLockInstance()
4201     self.needed_locks[locking.LEVEL_NODE] = []
4202     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4203
4204   def DeclareLocks(self, level):
4205     if level == locking.LEVEL_NODE:
4206       self._LockInstancesNodes()
4207
4208   def BuildHooksEnv(self):
4209     """Build hooks env.
4210
4211     This runs on the master, the primary and all the secondaries.
4212
4213     """
4214     env = {
4215       "DISK": self.op.disk,
4216       "AMOUNT": self.op.amount,
4217       }
4218     env.update(_BuildInstanceHookEnvByObject(self.instance))
4219     nl = [
4220       self.cfg.GetMasterNode(),
4221       self.instance.primary_node,
4222       ]
4223     return env, nl, nl
4224
4225   def CheckPrereq(self):
4226     """Check prerequisites.
4227
4228     This checks that the instance is in the cluster.
4229
4230     """
4231     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4232     assert instance is not None, \
4233       "Cannot retrieve locked instance %s" % self.op.instance_name
4234
4235     self.instance = instance
4236
4237     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4238       raise errors.OpPrereqError("Instance's disk layout does not support"
4239                                  " growing.")
4240
4241     if instance.FindDisk(self.op.disk) is None:
4242       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4243                                  (self.op.disk, instance.name))
4244
4245     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4246     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4247     for node in nodenames:
4248       info = nodeinfo.get(node, None)
4249       if not info:
4250         raise errors.OpPrereqError("Cannot get current information"
4251                                    " from node '%s'" % node)
4252       vg_free = info.get('vg_free', None)
4253       if not isinstance(vg_free, int):
4254         raise errors.OpPrereqError("Can't compute free disk space on"
4255                                    " node %s" % node)
4256       if self.op.amount > info['vg_free']:
4257         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4258                                    " %d MiB available, %d MiB required" %
4259                                    (node, info['vg_free'], self.op.amount))
4260
4261   def Exec(self, feedback_fn):
4262     """Execute disk grow.
4263
4264     """
4265     instance = self.instance
4266     disk = instance.FindDisk(self.op.disk)
4267     for node in (instance.secondary_nodes + (instance.primary_node,)):
4268       self.cfg.SetDiskID(disk, node)
4269       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4270       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4271         raise errors.OpExecError("grow request failed to node %s" % node)
4272       elif not result[0]:
4273         raise errors.OpExecError("grow request failed to node %s: %s" %
4274                                  (node, result[1]))
4275     disk.RecordGrow(self.op.amount)
4276     self.cfg.Update(instance)
4277     return
4278
4279
4280 class LUQueryInstanceData(NoHooksLU):
4281   """Query runtime instance data.
4282
4283   """
4284   _OP_REQP = ["instances"]
4285   REQ_BGL = False
4286
4287   def ExpandNames(self):
4288     self.needed_locks = {}
4289     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4290
4291     if not isinstance(self.op.instances, list):
4292       raise errors.OpPrereqError("Invalid argument type 'instances'")
4293
4294     if self.op.instances:
4295       self.wanted_names = []
4296       for name in self.op.instances:
4297         full_name = self.cfg.ExpandInstanceName(name)
4298         if full_name is None:
4299           raise errors.OpPrereqError("Instance '%s' not known" %
4300                                      self.op.instance_name)
4301         self.wanted_names.append(full_name)
4302       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4303     else:
4304       self.wanted_names = None
4305       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4306
4307     self.needed_locks[locking.LEVEL_NODE] = []
4308     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4309
4310   def DeclareLocks(self, level):
4311     if level == locking.LEVEL_NODE:
4312       self._LockInstancesNodes()
4313
4314   def CheckPrereq(self):
4315     """Check prerequisites.
4316
4317     This only checks the optional instance list against the existing names.
4318
4319     """
4320     if self.wanted_names is None:
4321       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4322
4323     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4324                              in self.wanted_names]
4325     return
4326
4327   def _ComputeDiskStatus(self, instance, snode, dev):
4328     """Compute block device status.
4329
4330     """
4331     self.cfg.SetDiskID(dev, instance.primary_node)
4332     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4333     if dev.dev_type in constants.LDS_DRBD:
4334       # we change the snode then (otherwise we use the one passed in)
4335       if dev.logical_id[0] == instance.primary_node:
4336         snode = dev.logical_id[1]
4337       else:
4338         snode = dev.logical_id[0]
4339
4340     if snode:
4341       self.cfg.SetDiskID(dev, snode)
4342       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4343     else:
4344       dev_sstatus = None
4345
4346     if dev.children:
4347       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4348                       for child in dev.children]
4349     else:
4350       dev_children = []
4351
4352     data = {
4353       "iv_name": dev.iv_name,
4354       "dev_type": dev.dev_type,
4355       "logical_id": dev.logical_id,
4356       "physical_id": dev.physical_id,
4357       "pstatus": dev_pstatus,
4358       "sstatus": dev_sstatus,
4359       "children": dev_children,
4360       }
4361
4362     return data
4363
4364   def Exec(self, feedback_fn):
4365     """Gather and return data"""
4366     result = {}
4367     for instance in self.wanted_instances:
4368       remote_info = rpc.call_instance_info(instance.primary_node,
4369                                                 instance.name)
4370       if remote_info and "state" in remote_info:
4371         remote_state = "up"
4372       else:
4373         remote_state = "down"
4374       if instance.status == "down":
4375         config_state = "down"
4376       else:
4377         config_state = "up"
4378
4379       disks = [self._ComputeDiskStatus(instance, None, device)
4380                for device in instance.disks]
4381
4382       idict = {
4383         "name": instance.name,
4384         "config_state": config_state,
4385         "run_state": remote_state,
4386         "pnode": instance.primary_node,
4387         "snodes": instance.secondary_nodes,
4388         "os": instance.os,
4389         "memory": instance.memory,
4390         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4391         "disks": disks,
4392         "vcpus": instance.vcpus,
4393         }
4394
4395       htkind = self.cfg.GetHypervisorType()
4396       if htkind == constants.HT_XEN_PVM30:
4397         idict["kernel_path"] = instance.kernel_path
4398         idict["initrd_path"] = instance.initrd_path
4399
4400       if htkind == constants.HT_XEN_HVM31:
4401         idict["hvm_boot_order"] = instance.hvm_boot_order
4402         idict["hvm_acpi"] = instance.hvm_acpi
4403         idict["hvm_pae"] = instance.hvm_pae
4404         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4405         idict["hvm_nic_type"] = instance.hvm_nic_type
4406         idict["hvm_disk_type"] = instance.hvm_disk_type
4407
4408       if htkind in constants.HTS_REQ_PORT:
4409         if instance.vnc_bind_address is None:
4410           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4411         else:
4412           vnc_bind_address = instance.vnc_bind_address
4413         if instance.network_port is None:
4414           vnc_console_port = None
4415         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4416           vnc_console_port = "%s:%s" % (instance.primary_node,
4417                                        instance.network_port)
4418         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4419           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4420                                                    instance.network_port,
4421                                                    instance.primary_node)
4422         else:
4423           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4424                                         instance.network_port)
4425         idict["vnc_console_port"] = vnc_console_port
4426         idict["vnc_bind_address"] = vnc_bind_address
4427         idict["network_port"] = instance.network_port
4428
4429       result[instance.name] = idict
4430
4431     return result
4432
4433
4434 class LUSetInstanceParams(LogicalUnit):
4435   """Modifies an instances's parameters.
4436
4437   """
4438   HPATH = "instance-modify"
4439   HTYPE = constants.HTYPE_INSTANCE
4440   _OP_REQP = ["instance_name"]
4441   REQ_BGL = False
4442
4443   def ExpandNames(self):
4444     self._ExpandAndLockInstance()
4445
4446   def BuildHooksEnv(self):
4447     """Build hooks env.
4448
4449     This runs on the master, primary and secondaries.
4450
4451     """
4452     args = dict()
4453     if self.mem:
4454       args['memory'] = self.mem
4455     if self.vcpus:
4456       args['vcpus'] = self.vcpus
4457     if self.do_ip or self.do_bridge or self.mac:
4458       if self.do_ip:
4459         ip = self.ip
4460       else:
4461         ip = self.instance.nics[0].ip
4462       if self.bridge:
4463         bridge = self.bridge
4464       else:
4465         bridge = self.instance.nics[0].bridge
4466       if self.mac:
4467         mac = self.mac
4468       else:
4469         mac = self.instance.nics[0].mac
4470       args['nics'] = [(ip, bridge, mac)]
4471     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4472     nl = [self.cfg.GetMasterNode(),
4473           self.instance.primary_node] + list(self.instance.secondary_nodes)
4474     return env, nl, nl
4475
4476   def CheckPrereq(self):
4477     """Check prerequisites.
4478
4479     This only checks the instance list against the existing names.
4480
4481     """
4482     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4483     # a separate CheckArguments function, if we implement one, so the operation
4484     # can be aborted without waiting for any lock, should it have an error...
4485     self.mem = getattr(self.op, "mem", None)
4486     self.vcpus = getattr(self.op, "vcpus", None)
4487     self.ip = getattr(self.op, "ip", None)
4488     self.mac = getattr(self.op, "mac", None)
4489     self.bridge = getattr(self.op, "bridge", None)
4490     self.kernel_path = getattr(self.op, "kernel_path", None)
4491     self.initrd_path = getattr(self.op, "initrd_path", None)
4492     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4493     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4494     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4495     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4496     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4497     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4498     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4499     self.force = getattr(self.op, "force", None)
4500     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4501                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4502                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4503                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4504     if all_parms.count(None) == len(all_parms):
4505       raise errors.OpPrereqError("No changes submitted")
4506     if self.mem is not None:
4507       try:
4508         self.mem = int(self.mem)
4509       except ValueError, err:
4510         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4511     if self.vcpus is not None:
4512       try:
4513         self.vcpus = int(self.vcpus)
4514       except ValueError, err:
4515         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4516     if self.ip is not None:
4517       self.do_ip = True
4518       if self.ip.lower() == "none":
4519         self.ip = None
4520       else:
4521         if not utils.IsValidIP(self.ip):
4522           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4523     else:
4524       self.do_ip = False
4525     self.do_bridge = (self.bridge is not None)
4526     if self.mac is not None:
4527       if self.cfg.IsMacInUse(self.mac):
4528         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4529                                    self.mac)
4530       if not utils.IsValidMac(self.mac):
4531         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4532
4533     if self.kernel_path is not None:
4534       self.do_kernel_path = True
4535       if self.kernel_path == constants.VALUE_NONE:
4536         raise errors.OpPrereqError("Can't set instance to no kernel")
4537
4538       if self.kernel_path != constants.VALUE_DEFAULT:
4539         if not os.path.isabs(self.kernel_path):
4540           raise errors.OpPrereqError("The kernel path must be an absolute"
4541                                     " filename")
4542     else:
4543       self.do_kernel_path = False
4544
4545     if self.initrd_path is not None:
4546       self.do_initrd_path = True
4547       if self.initrd_path not in (constants.VALUE_NONE,
4548                                   constants.VALUE_DEFAULT):
4549         if not os.path.isabs(self.initrd_path):
4550           raise errors.OpPrereqError("The initrd path must be an absolute"
4551                                     " filename")
4552     else:
4553       self.do_initrd_path = False
4554
4555     # boot order verification
4556     if self.hvm_boot_order is not None:
4557       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4558         if len(self.hvm_boot_order.strip("acdn")) != 0:
4559           raise errors.OpPrereqError("invalid boot order specified,"
4560                                      " must be one or more of [acdn]"
4561                                      " or 'default'")
4562
4563     # hvm_cdrom_image_path verification
4564     if self.op.hvm_cdrom_image_path is not None:
4565       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4566               self.op.hvm_cdrom_image_path.lower() == "none"):
4567         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4568                                    " be an absolute path or None, not %s" %
4569                                    self.op.hvm_cdrom_image_path)
4570       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4571               self.op.hvm_cdrom_image_path.lower() == "none"):
4572         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4573                                    " regular file or a symlink pointing to"
4574                                    " an existing regular file, not %s" %
4575                                    self.op.hvm_cdrom_image_path)
4576
4577     # vnc_bind_address verification
4578     if self.op.vnc_bind_address is not None:
4579       if not utils.IsValidIP(self.op.vnc_bind_address):
4580         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4581                                    " like a valid IP address" %
4582                                    self.op.vnc_bind_address)
4583
4584     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4585     assert self.instance is not None, \
4586       "Cannot retrieve locked instance %s" % self.op.instance_name
4587     self.warn = []
4588     if self.mem is not None and not self.force:
4589       pnode = self.instance.primary_node
4590       nodelist = [pnode]
4591       nodelist.extend(instance.secondary_nodes)
4592       instance_info = rpc.call_instance_info(pnode, instance.name)
4593       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4594
4595       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4596         # Assume the primary node is unreachable and go ahead
4597         self.warn.append("Can't get info from primary node %s" % pnode)
4598       else:
4599         if instance_info:
4600           current_mem = instance_info['memory']
4601         else:
4602           # Assume instance not running
4603           # (there is a slight race condition here, but it's not very probable,
4604           # and we have no other way to check)
4605           current_mem = 0
4606         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4607         if miss_mem > 0:
4608           raise errors.OpPrereqError("This change will prevent the instance"
4609                                      " from starting, due to %d MB of memory"
4610                                      " missing on its primary node" % miss_mem)
4611
4612       for node in instance.secondary_nodes:
4613         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4614           self.warn.append("Can't get info from secondary node %s" % node)
4615         elif self.mem > nodeinfo[node]['memory_free']:
4616           self.warn.append("Not enough memory to failover instance to secondary"
4617                            " node %s" % node)
4618
4619     # Xen HVM device type checks
4620     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
4621       if self.op.hvm_nic_type is not None:
4622         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4623           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4624                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4625       if self.op.hvm_disk_type is not None:
4626         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4627           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4628                                      " HVM hypervisor" % self.op.hvm_disk_type)
4629
4630     return
4631
4632   def Exec(self, feedback_fn):
4633     """Modifies an instance.
4634
4635     All parameters take effect only at the next restart of the instance.
4636     """
4637     # Process here the warnings from CheckPrereq, as we don't have a
4638     # feedback_fn there.
4639     for warn in self.warn:
4640       feedback_fn("WARNING: %s" % warn)
4641
4642     result = []
4643     instance = self.instance
4644     if self.mem:
4645       instance.memory = self.mem
4646       result.append(("mem", self.mem))
4647     if self.vcpus:
4648       instance.vcpus = self.vcpus
4649       result.append(("vcpus",  self.vcpus))
4650     if self.do_ip:
4651       instance.nics[0].ip = self.ip
4652       result.append(("ip", self.ip))
4653     if self.bridge:
4654       instance.nics[0].bridge = self.bridge
4655       result.append(("bridge", self.bridge))
4656     if self.mac:
4657       instance.nics[0].mac = self.mac
4658       result.append(("mac", self.mac))
4659     if self.do_kernel_path:
4660       instance.kernel_path = self.kernel_path
4661       result.append(("kernel_path", self.kernel_path))
4662     if self.do_initrd_path:
4663       instance.initrd_path = self.initrd_path
4664       result.append(("initrd_path", self.initrd_path))
4665     if self.hvm_boot_order:
4666       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4667         instance.hvm_boot_order = None
4668       else:
4669         instance.hvm_boot_order = self.hvm_boot_order
4670       result.append(("hvm_boot_order", self.hvm_boot_order))
4671     if self.hvm_acpi is not None:
4672       instance.hvm_acpi = self.hvm_acpi
4673       result.append(("hvm_acpi", self.hvm_acpi))
4674     if self.hvm_pae is not None:
4675       instance.hvm_pae = self.hvm_pae
4676       result.append(("hvm_pae", self.hvm_pae))
4677     if self.hvm_nic_type is not None:
4678       instance.hvm_nic_type = self.hvm_nic_type
4679       result.append(("hvm_nic_type", self.hvm_nic_type))
4680     if self.hvm_disk_type is not None:
4681       instance.hvm_disk_type = self.hvm_disk_type
4682       result.append(("hvm_disk_type", self.hvm_disk_type))
4683     if self.hvm_cdrom_image_path:
4684       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4685         instance.hvm_cdrom_image_path = None
4686       else:
4687         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4688       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4689     if self.vnc_bind_address:
4690       instance.vnc_bind_address = self.vnc_bind_address
4691       result.append(("vnc_bind_address", self.vnc_bind_address))
4692
4693     self.cfg.Update(instance)
4694
4695     return result
4696
4697
4698 class LUQueryExports(NoHooksLU):
4699   """Query the exports list
4700
4701   """
4702   _OP_REQP = ['nodes']
4703   REQ_BGL = False
4704
4705   def ExpandNames(self):
4706     self.needed_locks = {}
4707     self.share_locks[locking.LEVEL_NODE] = 1
4708     if not self.op.nodes:
4709       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4710     else:
4711       self.needed_locks[locking.LEVEL_NODE] = \
4712         _GetWantedNodes(self, self.op.nodes)
4713
4714   def CheckPrereq(self):
4715     """Check prerequisites.
4716
4717     """
4718     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4719
4720   def Exec(self, feedback_fn):
4721     """Compute the list of all the exported system images.
4722
4723     Returns:
4724       a dictionary with the structure node->(export-list)
4725       where export-list is a list of the instances exported on
4726       that node.
4727
4728     """
4729     return rpc.call_export_list(self.nodes)
4730
4731
4732 class LUExportInstance(LogicalUnit):
4733   """Export an instance to an image in the cluster.
4734
4735   """
4736   HPATH = "instance-export"
4737   HTYPE = constants.HTYPE_INSTANCE
4738   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4739   REQ_BGL = False
4740
4741   def ExpandNames(self):
4742     self._ExpandAndLockInstance()
4743     # FIXME: lock only instance primary and destination node
4744     #
4745     # Sad but true, for now we have do lock all nodes, as we don't know where
4746     # the previous export might be, and and in this LU we search for it and
4747     # remove it from its current node. In the future we could fix this by:
4748     #  - making a tasklet to search (share-lock all), then create the new one,
4749     #    then one to remove, after
4750     #  - removing the removal operation altoghether
4751     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4752
4753   def DeclareLocks(self, level):
4754     """Last minute lock declaration."""
4755     # All nodes are locked anyway, so nothing to do here.
4756
4757   def BuildHooksEnv(self):
4758     """Build hooks env.
4759
4760     This will run on the master, primary node and target node.
4761
4762     """
4763     env = {
4764       "EXPORT_NODE": self.op.target_node,
4765       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4766       }
4767     env.update(_BuildInstanceHookEnvByObject(self.instance))
4768     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4769           self.op.target_node]
4770     return env, nl, nl
4771
4772   def CheckPrereq(self):
4773     """Check prerequisites.
4774
4775     This checks that the instance and node names are valid.
4776
4777     """
4778     instance_name = self.op.instance_name
4779     self.instance = self.cfg.GetInstanceInfo(instance_name)
4780     assert self.instance is not None, \
4781           "Cannot retrieve locked instance %s" % self.op.instance_name
4782
4783     self.dst_node = self.cfg.GetNodeInfo(
4784       self.cfg.ExpandNodeName(self.op.target_node))
4785
4786     assert self.dst_node is not None, \
4787           "Cannot retrieve locked node %s" % self.op.target_node
4788
4789     # instance disk type verification
4790     for disk in self.instance.disks:
4791       if disk.dev_type == constants.LD_FILE:
4792         raise errors.OpPrereqError("Export not supported for instances with"
4793                                    " file-based disks")
4794
4795   def Exec(self, feedback_fn):
4796     """Export an instance to an image in the cluster.
4797
4798     """
4799     instance = self.instance
4800     dst_node = self.dst_node
4801     src_node = instance.primary_node
4802     if self.op.shutdown:
4803       # shutdown the instance, but not the disks
4804       if not rpc.call_instance_shutdown(src_node, instance):
4805         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4806                                  (instance.name, src_node))
4807
4808     vgname = self.cfg.GetVGName()
4809
4810     snap_disks = []
4811
4812     try:
4813       for disk in instance.disks:
4814         if disk.iv_name == "sda":
4815           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4816           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4817
4818           if not new_dev_name:
4819             logger.Error("could not snapshot block device %s on node %s" %
4820                          (disk.logical_id[1], src_node))
4821           else:
4822             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4823                                       logical_id=(vgname, new_dev_name),
4824                                       physical_id=(vgname, new_dev_name),
4825                                       iv_name=disk.iv_name)
4826             snap_disks.append(new_dev)
4827
4828     finally:
4829       if self.op.shutdown and instance.status == "up":
4830         if not rpc.call_instance_start(src_node, instance, None):
4831           _ShutdownInstanceDisks(instance, self.cfg)
4832           raise errors.OpExecError("Could not start instance")
4833
4834     # TODO: check for size
4835
4836     for dev in snap_disks:
4837       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4838         logger.Error("could not export block device %s from node %s to node %s"
4839                      % (dev.logical_id[1], src_node, dst_node.name))
4840       if not rpc.call_blockdev_remove(src_node, dev):
4841         logger.Error("could not remove snapshot block device %s from node %s" %
4842                      (dev.logical_id[1], src_node))
4843
4844     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4845       logger.Error("could not finalize export for instance %s on node %s" %
4846                    (instance.name, dst_node.name))
4847
4848     nodelist = self.cfg.GetNodeList()
4849     nodelist.remove(dst_node.name)
4850
4851     # on one-node clusters nodelist will be empty after the removal
4852     # if we proceed the backup would be removed because OpQueryExports
4853     # substitutes an empty list with the full cluster node list.
4854     if nodelist:
4855       exportlist = rpc.call_export_list(nodelist)
4856       for node in exportlist:
4857         if instance.name in exportlist[node]:
4858           if not rpc.call_export_remove(node, instance.name):
4859             logger.Error("could not remove older export for instance %s"
4860                          " on node %s" % (instance.name, node))
4861
4862
4863 class LURemoveExport(NoHooksLU):
4864   """Remove exports related to the named instance.
4865
4866   """
4867   _OP_REQP = ["instance_name"]
4868   REQ_BGL = False
4869
4870   def ExpandNames(self):
4871     self.needed_locks = {}
4872     # We need all nodes to be locked in order for RemoveExport to work, but we
4873     # don't need to lock the instance itself, as nothing will happen to it (and
4874     # we can remove exports also for a removed instance)
4875     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4876
4877   def CheckPrereq(self):
4878     """Check prerequisites.
4879     """
4880     pass
4881
4882   def Exec(self, feedback_fn):
4883     """Remove any export.
4884
4885     """
4886     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4887     # If the instance was not found we'll try with the name that was passed in.
4888     # This will only work if it was an FQDN, though.
4889     fqdn_warn = False
4890     if not instance_name:
4891       fqdn_warn = True
4892       instance_name = self.op.instance_name
4893
4894     exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4895     found = False
4896     for node in exportlist:
4897       if instance_name in exportlist[node]:
4898         found = True
4899         if not rpc.call_export_remove(node, instance_name):
4900           logger.Error("could not remove export for instance %s"
4901                        " on node %s" % (instance_name, node))
4902
4903     if fqdn_warn and not found:
4904       feedback_fn("Export not found. If trying to remove an export belonging"
4905                   " to a deleted instance please use its Fully Qualified"
4906                   " Domain Name.")
4907
4908
4909 class TagsLU(NoHooksLU):
4910   """Generic tags LU.
4911
4912   This is an abstract class which is the parent of all the other tags LUs.
4913
4914   """
4915
4916   def ExpandNames(self):
4917     self.needed_locks = {}
4918     if self.op.kind == constants.TAG_NODE:
4919       name = self.cfg.ExpandNodeName(self.op.name)
4920       if name is None:
4921         raise errors.OpPrereqError("Invalid node name (%s)" %
4922                                    (self.op.name,))
4923       self.op.name = name
4924       self.needed_locks[locking.LEVEL_NODE] = name
4925     elif self.op.kind == constants.TAG_INSTANCE:
4926       name = self.cfg.ExpandInstanceName(self.op.name)
4927       if name is None:
4928         raise errors.OpPrereqError("Invalid instance name (%s)" %
4929                                    (self.op.name,))
4930       self.op.name = name
4931       self.needed_locks[locking.LEVEL_INSTANCE] = name
4932
4933   def CheckPrereq(self):
4934     """Check prerequisites.
4935
4936     """
4937     if self.op.kind == constants.TAG_CLUSTER:
4938       self.target = self.cfg.GetClusterInfo()
4939     elif self.op.kind == constants.TAG_NODE:
4940       self.target = self.cfg.GetNodeInfo(self.op.name)
4941     elif self.op.kind == constants.TAG_INSTANCE:
4942       self.target = self.cfg.GetInstanceInfo(self.op.name)
4943     else:
4944       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4945                                  str(self.op.kind))
4946
4947
4948 class LUGetTags(TagsLU):
4949   """Returns the tags of a given object.
4950
4951   """
4952   _OP_REQP = ["kind", "name"]
4953   REQ_BGL = False
4954
4955   def Exec(self, feedback_fn):
4956     """Returns the tag list.
4957
4958     """
4959     return list(self.target.GetTags())
4960
4961
4962 class LUSearchTags(NoHooksLU):
4963   """Searches the tags for a given pattern.
4964
4965   """
4966   _OP_REQP = ["pattern"]
4967   REQ_BGL = False
4968
4969   def ExpandNames(self):
4970     self.needed_locks = {}
4971
4972   def CheckPrereq(self):
4973     """Check prerequisites.
4974
4975     This checks the pattern passed for validity by compiling it.
4976
4977     """
4978     try:
4979       self.re = re.compile(self.op.pattern)
4980     except re.error, err:
4981       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4982                                  (self.op.pattern, err))
4983
4984   def Exec(self, feedback_fn):
4985     """Returns the tag list.
4986
4987     """
4988     cfg = self.cfg
4989     tgts = [("/cluster", cfg.GetClusterInfo())]
4990     ilist = cfg.GetAllInstancesInfo().values()
4991     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4992     nlist = cfg.GetAllNodesInfo().values()
4993     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4994     results = []
4995     for path, target in tgts:
4996       for tag in target.GetTags():
4997         if self.re.search(tag):
4998           results.append((path, tag))
4999     return results
5000
5001
5002 class LUAddTags(TagsLU):
5003   """Sets a tag on a given object.
5004
5005   """
5006   _OP_REQP = ["kind", "name", "tags"]
5007   REQ_BGL = False
5008
5009   def CheckPrereq(self):
5010     """Check prerequisites.
5011
5012     This checks the type and length of the tag name and value.
5013
5014     """
5015     TagsLU.CheckPrereq(self)
5016     for tag in self.op.tags:
5017       objects.TaggableObject.ValidateTag(tag)
5018
5019   def Exec(self, feedback_fn):
5020     """Sets the tag.
5021
5022     """
5023     try:
5024       for tag in self.op.tags:
5025         self.target.AddTag(tag)
5026     except errors.TagError, err:
5027       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5028     try:
5029       self.cfg.Update(self.target)
5030     except errors.ConfigurationError:
5031       raise errors.OpRetryError("There has been a modification to the"
5032                                 " config file and the operation has been"
5033                                 " aborted. Please retry.")
5034
5035
5036 class LUDelTags(TagsLU):
5037   """Delete a list of tags from a given object.
5038
5039   """
5040   _OP_REQP = ["kind", "name", "tags"]
5041   REQ_BGL = False
5042
5043   def CheckPrereq(self):
5044     """Check prerequisites.
5045
5046     This checks that we have the given tag.
5047
5048     """
5049     TagsLU.CheckPrereq(self)
5050     for tag in self.op.tags:
5051       objects.TaggableObject.ValidateTag(tag)
5052     del_tags = frozenset(self.op.tags)
5053     cur_tags = self.target.GetTags()
5054     if not del_tags <= cur_tags:
5055       diff_tags = del_tags - cur_tags
5056       diff_names = ["'%s'" % tag for tag in diff_tags]
5057       diff_names.sort()
5058       raise errors.OpPrereqError("Tag(s) %s not found" %
5059                                  (",".join(diff_names)))
5060
5061   def Exec(self, feedback_fn):
5062     """Remove the tag from the object.
5063
5064     """
5065     for tag in self.op.tags:
5066       self.target.RemoveTag(tag)
5067     try:
5068       self.cfg.Update(self.target)
5069     except errors.ConfigurationError:
5070       raise errors.OpRetryError("There has been a modification to the"
5071                                 " config file and the operation has been"
5072                                 " aborted. Please retry.")
5073
5074
5075 class LUTestDelay(NoHooksLU):
5076   """Sleep for a specified amount of time.
5077
5078   This LU sleeps on the master and/or nodes for a specified amount of
5079   time.
5080
5081   """
5082   _OP_REQP = ["duration", "on_master", "on_nodes"]
5083   REQ_BGL = False
5084
5085   def ExpandNames(self):
5086     """Expand names and set required locks.
5087
5088     This expands the node list, if any.
5089
5090     """
5091     self.needed_locks = {}
5092     if self.op.on_nodes:
5093       # _GetWantedNodes can be used here, but is not always appropriate to use
5094       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5095       # more information.
5096       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5097       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5098
5099   def CheckPrereq(self):
5100     """Check prerequisites.
5101
5102     """
5103
5104   def Exec(self, feedback_fn):
5105     """Do the actual sleep.
5106
5107     """
5108     if self.op.on_master:
5109       if not utils.TestDelay(self.op.duration):
5110         raise errors.OpExecError("Error during master delay test")
5111     if self.op.on_nodes:
5112       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5113       if not result:
5114         raise errors.OpExecError("Complete failure from rpc call")
5115       for node, node_result in result.items():
5116         if not node_result:
5117           raise errors.OpExecError("Failure during rpc call to node %s,"
5118                                    " result: %s" % (node, node_result))
5119
5120
5121 class IAllocator(object):
5122   """IAllocator framework.
5123
5124   An IAllocator instance has three sets of attributes:
5125     - cfg that is needed to query the cluster
5126     - input data (all members of the _KEYS class attribute are required)
5127     - four buffer attributes (in|out_data|text), that represent the
5128       input (to the external script) in text and data structure format,
5129       and the output from it, again in two formats
5130     - the result variables from the script (success, info, nodes) for
5131       easy usage
5132
5133   """
5134   _ALLO_KEYS = [
5135     "mem_size", "disks", "disk_template",
5136     "os", "tags", "nics", "vcpus",
5137     ]
5138   _RELO_KEYS = [
5139     "relocate_from",
5140     ]
5141
5142   def __init__(self, cfg, mode, name, **kwargs):
5143     self.cfg = cfg
5144     # init buffer variables
5145     self.in_text = self.out_text = self.in_data = self.out_data = None
5146     # init all input fields so that pylint is happy
5147     self.mode = mode
5148     self.name = name
5149     self.mem_size = self.disks = self.disk_template = None
5150     self.os = self.tags = self.nics = self.vcpus = None
5151     self.relocate_from = None
5152     # computed fields
5153     self.required_nodes = None
5154     # init result fields
5155     self.success = self.info = self.nodes = None
5156     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5157       keyset = self._ALLO_KEYS
5158     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5159       keyset = self._RELO_KEYS
5160     else:
5161       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5162                                    " IAllocator" % self.mode)
5163     for key in kwargs:
5164       if key not in keyset:
5165         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5166                                      " IAllocator" % key)
5167       setattr(self, key, kwargs[key])
5168     for key in keyset:
5169       if key not in kwargs:
5170         raise errors.ProgrammerError("Missing input parameter '%s' to"
5171                                      " IAllocator" % key)
5172     self._BuildInputData()
5173
5174   def _ComputeClusterData(self):
5175     """Compute the generic allocator input data.
5176
5177     This is the data that is independent of the actual operation.
5178
5179     """
5180     cfg = self.cfg
5181     # cluster data
5182     data = {
5183       "version": 1,
5184       "cluster_name": self.cfg.GetClusterName(),
5185       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5186       "hypervisor_type": self.cfg.GetHypervisorType(),
5187       # we don't have job IDs
5188       }
5189
5190     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5191
5192     # node data
5193     node_results = {}
5194     node_list = cfg.GetNodeList()
5195     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5196     for nname in node_list:
5197       ninfo = cfg.GetNodeInfo(nname)
5198       if nname not in node_data or not isinstance(node_data[nname], dict):
5199         raise errors.OpExecError("Can't get data for node %s" % nname)
5200       remote_info = node_data[nname]
5201       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5202                    'vg_size', 'vg_free', 'cpu_total']:
5203         if attr not in remote_info:
5204           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5205                                    (nname, attr))
5206         try:
5207           remote_info[attr] = int(remote_info[attr])
5208         except ValueError, err:
5209           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5210                                    " %s" % (nname, attr, str(err)))
5211       # compute memory used by primary instances
5212       i_p_mem = i_p_up_mem = 0
5213       for iinfo in i_list:
5214         if iinfo.primary_node == nname:
5215           i_p_mem += iinfo.memory
5216           if iinfo.status == "up":
5217             i_p_up_mem += iinfo.memory
5218
5219       # compute memory used by instances
5220       pnr = {
5221         "tags": list(ninfo.GetTags()),
5222         "total_memory": remote_info['memory_total'],
5223         "reserved_memory": remote_info['memory_dom0'],
5224         "free_memory": remote_info['memory_free'],
5225         "i_pri_memory": i_p_mem,
5226         "i_pri_up_memory": i_p_up_mem,
5227         "total_disk": remote_info['vg_size'],
5228         "free_disk": remote_info['vg_free'],
5229         "primary_ip": ninfo.primary_ip,
5230         "secondary_ip": ninfo.secondary_ip,
5231         "total_cpus": remote_info['cpu_total'],
5232         }
5233       node_results[nname] = pnr
5234     data["nodes"] = node_results
5235
5236     # instance data
5237     instance_data = {}
5238     for iinfo in i_list:
5239       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5240                   for n in iinfo.nics]
5241       pir = {
5242         "tags": list(iinfo.GetTags()),
5243         "should_run": iinfo.status == "up",
5244         "vcpus": iinfo.vcpus,
5245         "memory": iinfo.memory,
5246         "os": iinfo.os,
5247         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5248         "nics": nic_data,
5249         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5250         "disk_template": iinfo.disk_template,
5251         }
5252       instance_data[iinfo.name] = pir
5253
5254     data["instances"] = instance_data
5255
5256     self.in_data = data
5257
5258   def _AddNewInstance(self):
5259     """Add new instance data to allocator structure.
5260
5261     This in combination with _AllocatorGetClusterData will create the
5262     correct structure needed as input for the allocator.
5263
5264     The checks for the completeness of the opcode must have already been
5265     done.
5266
5267     """
5268     data = self.in_data
5269     if len(self.disks) != 2:
5270       raise errors.OpExecError("Only two-disk configurations supported")
5271
5272     disk_space = _ComputeDiskSize(self.disk_template,
5273                                   self.disks[0]["size"], self.disks[1]["size"])
5274
5275     if self.disk_template in constants.DTS_NET_MIRROR:
5276       self.required_nodes = 2
5277     else:
5278       self.required_nodes = 1
5279     request = {
5280       "type": "allocate",
5281       "name": self.name,
5282       "disk_template": self.disk_template,
5283       "tags": self.tags,
5284       "os": self.os,
5285       "vcpus": self.vcpus,
5286       "memory": self.mem_size,
5287       "disks": self.disks,
5288       "disk_space_total": disk_space,
5289       "nics": self.nics,
5290       "required_nodes": self.required_nodes,
5291       }
5292     data["request"] = request
5293
5294   def _AddRelocateInstance(self):
5295     """Add relocate instance data to allocator structure.
5296
5297     This in combination with _IAllocatorGetClusterData will create the
5298     correct structure needed as input for the allocator.
5299
5300     The checks for the completeness of the opcode must have already been
5301     done.
5302
5303     """
5304     instance = self.cfg.GetInstanceInfo(self.name)
5305     if instance is None:
5306       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5307                                    " IAllocator" % self.name)
5308
5309     if instance.disk_template not in constants.DTS_NET_MIRROR:
5310       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5311
5312     if len(instance.secondary_nodes) != 1:
5313       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5314
5315     self.required_nodes = 1
5316
5317     disk_space = _ComputeDiskSize(instance.disk_template,
5318                                   instance.disks[0].size,
5319                                   instance.disks[1].size)
5320
5321     request = {
5322       "type": "relocate",
5323       "name": self.name,
5324       "disk_space_total": disk_space,
5325       "required_nodes": self.required_nodes,
5326       "relocate_from": self.relocate_from,
5327       }
5328     self.in_data["request"] = request
5329
5330   def _BuildInputData(self):
5331     """Build input data structures.
5332
5333     """
5334     self._ComputeClusterData()
5335
5336     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5337       self._AddNewInstance()
5338     else:
5339       self._AddRelocateInstance()
5340
5341     self.in_text = serializer.Dump(self.in_data)
5342
5343   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5344     """Run an instance allocator and return the results.
5345
5346     """
5347     data = self.in_text
5348
5349     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
5350
5351     if not isinstance(result, (list, tuple)) or len(result) != 4:
5352       raise errors.OpExecError("Invalid result from master iallocator runner")
5353
5354     rcode, stdout, stderr, fail = result
5355
5356     if rcode == constants.IARUN_NOTFOUND:
5357       raise errors.OpExecError("Can't find allocator '%s'" % name)
5358     elif rcode == constants.IARUN_FAILURE:
5359       raise errors.OpExecError("Instance allocator call failed: %s,"
5360                                " output: %s" % (fail, stdout+stderr))
5361     self.out_text = stdout
5362     if validate:
5363       self._ValidateResult()
5364
5365   def _ValidateResult(self):
5366     """Process the allocator results.
5367
5368     This will process and if successful save the result in
5369     self.out_data and the other parameters.
5370
5371     """
5372     try:
5373       rdict = serializer.Load(self.out_text)
5374     except Exception, err:
5375       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5376
5377     if not isinstance(rdict, dict):
5378       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5379
5380     for key in "success", "info", "nodes":
5381       if key not in rdict:
5382         raise errors.OpExecError("Can't parse iallocator results:"
5383                                  " missing key '%s'" % key)
5384       setattr(self, key, rdict[key])
5385
5386     if not isinstance(rdict["nodes"], list):
5387       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5388                                " is not a list")
5389     self.out_data = rdict
5390
5391
5392 class LUTestAllocator(NoHooksLU):
5393   """Run allocator tests.
5394
5395   This LU runs the allocator tests
5396
5397   """
5398   _OP_REQP = ["direction", "mode", "name"]
5399
5400   def CheckPrereq(self):
5401     """Check prerequisites.
5402
5403     This checks the opcode parameters depending on the director and mode test.
5404
5405     """
5406     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5407       for attr in ["name", "mem_size", "disks", "disk_template",
5408                    "os", "tags", "nics", "vcpus"]:
5409         if not hasattr(self.op, attr):
5410           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5411                                      attr)
5412       iname = self.cfg.ExpandInstanceName(self.op.name)
5413       if iname is not None:
5414         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5415                                    iname)
5416       if not isinstance(self.op.nics, list):
5417         raise errors.OpPrereqError("Invalid parameter 'nics'")
5418       for row in self.op.nics:
5419         if (not isinstance(row, dict) or
5420             "mac" not in row or
5421             "ip" not in row or
5422             "bridge" not in row):
5423           raise errors.OpPrereqError("Invalid contents of the"
5424                                      " 'nics' parameter")
5425       if not isinstance(self.op.disks, list):
5426         raise errors.OpPrereqError("Invalid parameter 'disks'")
5427       if len(self.op.disks) != 2:
5428         raise errors.OpPrereqError("Only two-disk configurations supported")
5429       for row in self.op.disks:
5430         if (not isinstance(row, dict) or
5431             "size" not in row or
5432             not isinstance(row["size"], int) or
5433             "mode" not in row or
5434             row["mode"] not in ['r', 'w']):
5435           raise errors.OpPrereqError("Invalid contents of the"
5436                                      " 'disks' parameter")
5437     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5438       if not hasattr(self.op, "name"):
5439         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5440       fname = self.cfg.ExpandInstanceName(self.op.name)
5441       if fname is None:
5442         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5443                                    self.op.name)
5444       self.op.name = fname
5445       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5446     else:
5447       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5448                                  self.op.mode)
5449
5450     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5451       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5452         raise errors.OpPrereqError("Missing allocator name")
5453     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5454       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5455                                  self.op.direction)
5456
5457   def Exec(self, feedback_fn):
5458     """Run the allocator test.
5459
5460     """
5461     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5462       ial = IAllocator(self.cfg,
5463                        mode=self.op.mode,
5464                        name=self.op.name,
5465                        mem_size=self.op.mem_size,
5466                        disks=self.op.disks,
5467                        disk_template=self.op.disk_template,
5468                        os=self.op.os,
5469                        tags=self.op.tags,
5470                        nics=self.op.nics,
5471                        vcpus=self.op.vcpus,
5472                        )
5473     else:
5474       ial = IAllocator(self.cfg,
5475                        mode=self.op.mode,
5476                        name=self.op.name,
5477                        relocate_from=list(self.relocate_from),
5478                        )
5479
5480     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5481       result = ial.in_text
5482     else:
5483       ial.Run(self.op.allocator, validate=False)
5484       result = ial.out_text
5485     return result