Extend DRBD disks with shared secret attribute
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61
62   Note that all commands require root permissions.
63
64   """
65   HPATH = None
66   HTYPE = None
67   _OP_REQP = []
68   REQ_MASTER = True
69   REQ_WSSTORE = False
70   REQ_BGL = True
71
72   def __init__(self, processor, op, context, sstore):
73     """Constructor for LogicalUnit.
74
75     This needs to be overriden in derived classes in order to check op
76     validity.
77
78     """
79     self.proc = processor
80     self.op = op
81     self.cfg = context.cfg
82     self.sstore = sstore
83     self.context = context
84     # Dicts used to declare locking needs to mcpu
85     self.needed_locks = None
86     self.acquired_locks = {}
87     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88     self.add_locks = {}
89     self.remove_locks = {}
90     # Used to force good behavior when calling helper functions
91     self.recalculate_locks = {}
92     self.__ssh = None
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99
100     if not self.cfg.IsCluster():
101       raise errors.OpPrereqError("Cluster not initialized yet,"
102                                  " use 'gnt-cluster init' first.")
103     if self.REQ_MASTER:
104       master = sstore.GetMasterNode()
105       if master != utils.HostInfo().name:
106         raise errors.OpPrereqError("Commands must be run on the master"
107                                    " node %s" % master)
108
109   def __GetSSH(self):
110     """Returns the SshRunner object
111
112     """
113     if not self.__ssh:
114       self.__ssh = ssh.SshRunner(self.sstore)
115     return self.__ssh
116
117   ssh = property(fget=__GetSSH)
118
119   def ExpandNames(self):
120     """Expand names for this LU.
121
122     This method is called before starting to execute the opcode, and it should
123     update all the parameters of the opcode to their canonical form (e.g. a
124     short node name must be fully expanded after this method has successfully
125     completed). This way locking, hooks, logging, ecc. can work correctly.
126
127     LUs which implement this method must also populate the self.needed_locks
128     member, as a dict with lock levels as keys, and a list of needed lock names
129     as values. Rules:
130       - Use an empty dict if you don't need any lock
131       - If you don't need any lock at a particular level omit that level
132       - Don't put anything for the BGL level
133       - If you want all locks at a level use locking.ALL_SET as a value
134
135     If you need to share locks (rather than acquire them exclusively) at one
136     level you can modify self.share_locks, setting a true value (usually 1) for
137     that level. By default locks are not shared.
138
139     Examples:
140     # Acquire all nodes and one instance
141     self.needed_locks = {
142       locking.LEVEL_NODE: locking.ALL_SET,
143       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144     }
145     # Acquire just two nodes
146     self.needed_locks = {
147       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148     }
149     # Acquire no locks
150     self.needed_locks = {} # No, you can't leave it to the default value None
151
152     """
153     # The implementation of this method is mandatory only if the new LU is
154     # concurrent, so that old LUs don't need to be changed all at the same
155     # time.
156     if self.REQ_BGL:
157       self.needed_locks = {} # Exclusive LUs don't need locks.
158     else:
159       raise NotImplementedError
160
161   def DeclareLocks(self, level):
162     """Declare LU locking needs for a level
163
164     While most LUs can just declare their locking needs at ExpandNames time,
165     sometimes there's the need to calculate some locks after having acquired
166     the ones before. This function is called just before acquiring locks at a
167     particular level, but after acquiring the ones at lower levels, and permits
168     such calculations. It can be used to modify self.needed_locks, and by
169     default it does nothing.
170
171     This function is only called if you have something already set in
172     self.needed_locks for the level.
173
174     @param level: Locking level which is going to be locked
175     @type level: member of ganeti.locking.LEVELS
176
177     """
178
179   def CheckPrereq(self):
180     """Check prerequisites for this LU.
181
182     This method should check that the prerequisites for the execution
183     of this LU are fulfilled. It can do internode communication, but
184     it should be idempotent - no cluster or system changes are
185     allowed.
186
187     The method should raise errors.OpPrereqError in case something is
188     not fulfilled. Its return value is ignored.
189
190     This method should also update all the parameters of the opcode to
191     their canonical form if it hasn't been done by ExpandNames before.
192
193     """
194     raise NotImplementedError
195
196   def Exec(self, feedback_fn):
197     """Execute the LU.
198
199     This method should implement the actual work. It should raise
200     errors.OpExecError for failures that are somewhat dealt with in
201     code, or expected.
202
203     """
204     raise NotImplementedError
205
206   def BuildHooksEnv(self):
207     """Build hooks environment for this LU.
208
209     This method should return a three-node tuple consisting of: a dict
210     containing the environment that will be used for running the
211     specific hook for this LU, a list of node names on which the hook
212     should run before the execution, and a list of node names on which
213     the hook should run after the execution.
214
215     The keys of the dict must not have 'GANETI_' prefixed as this will
216     be handled in the hooks runner. Also note additional keys will be
217     added by the hooks runner. If the LU doesn't define any
218     environment, an empty dict (and not None) should be returned.
219
220     No nodes should be returned as an empty list (and not None).
221
222     Note that if the HPATH for a LU class is None, this function will
223     not be called.
224
225     """
226     raise NotImplementedError
227
228   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229     """Notify the LU about the results of its hooks.
230
231     This method is called every time a hooks phase is executed, and notifies
232     the Logical Unit about the hooks' result. The LU can then use it to alter
233     its result based on the hooks.  By default the method does nothing and the
234     previous result is passed back unchanged but any LU can define it if it
235     wants to use the local cluster hook-scripts somehow.
236
237     Args:
238       phase: the hooks phase that has just been run
239       hooks_results: the results of the multi-node hooks rpc call
240       feedback_fn: function to send feedback back to the caller
241       lu_result: the previous result this LU had, or None in the PRE phase.
242
243     """
244     return lu_result
245
246   def _ExpandAndLockInstance(self):
247     """Helper function to expand and lock an instance.
248
249     Many LUs that work on an instance take its name in self.op.instance_name
250     and need to expand it and then declare the expanded name for locking. This
251     function does it, and then updates self.op.instance_name to the expanded
252     name. It also initializes needed_locks as a dict, if this hasn't been done
253     before.
254
255     """
256     if self.needed_locks is None:
257       self.needed_locks = {}
258     else:
259       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260         "_ExpandAndLockInstance called with instance-level locks set"
261     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262     if expanded_name is None:
263       raise errors.OpPrereqError("Instance '%s' not known" %
264                                   self.op.instance_name)
265     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266     self.op.instance_name = expanded_name
267
268   def _LockInstancesNodes(self, primary_only=False):
269     """Helper function to declare instances' nodes for locking.
270
271     This function should be called after locking one or more instances to lock
272     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273     with all primary or secondary nodes for instances already locked and
274     present in self.needed_locks[locking.LEVEL_INSTANCE].
275
276     It should be called from DeclareLocks, and for safety only works if
277     self.recalculate_locks[locking.LEVEL_NODE] is set.
278
279     In the future it may grow parameters to just lock some instance's nodes, or
280     to just lock primaries or secondary nodes, if needed.
281
282     If should be called in DeclareLocks in a way similar to:
283
284     if level == locking.LEVEL_NODE:
285       self._LockInstancesNodes()
286
287     @type primary_only: boolean
288     @param primary_only: only lock primary nodes of locked instances
289
290     """
291     assert locking.LEVEL_NODE in self.recalculate_locks, \
292       "_LockInstancesNodes helper function called with no nodes to recalculate"
293
294     # TODO: check if we're really been called with the instance locks held
295
296     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297     # future we might want to have different behaviors depending on the value
298     # of self.recalculate_locks[locking.LEVEL_NODE]
299     wanted_nodes = []
300     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301       instance = self.context.cfg.GetInstanceInfo(instance_name)
302       wanted_nodes.append(instance.primary_node)
303       if not primary_only:
304         wanted_nodes.extend(instance.secondary_nodes)
305
306     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310
311     del self.recalculate_locks[locking.LEVEL_NODE]
312
313
314 class NoHooksLU(LogicalUnit):
315   """Simple LU which runs no hooks.
316
317   This LU is intended as a parent for other LogicalUnits which will
318   run no hooks, in order to reduce duplicate code.
319
320   """
321   HPATH = None
322   HTYPE = None
323
324
325 def _GetWantedNodes(lu, nodes):
326   """Returns list of checked and expanded node names.
327
328   Args:
329     nodes: List of nodes (strings) or None for all
330
331   """
332   if not isinstance(nodes, list):
333     raise errors.OpPrereqError("Invalid argument type 'nodes'")
334
335   if not nodes:
336     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337       " non-empty list of nodes whose name is to be expanded.")
338
339   wanted = []
340   for name in nodes:
341     node = lu.cfg.ExpandNodeName(name)
342     if node is None:
343       raise errors.OpPrereqError("No such node name '%s'" % name)
344     wanted.append(node)
345
346   return utils.NiceSort(wanted)
347
348
349 def _GetWantedInstances(lu, instances):
350   """Returns list of checked and expanded instance names.
351
352   Args:
353     instances: List of instances (strings) or None for all
354
355   """
356   if not isinstance(instances, list):
357     raise errors.OpPrereqError("Invalid argument type 'instances'")
358
359   if instances:
360     wanted = []
361
362     for name in instances:
363       instance = lu.cfg.ExpandInstanceName(name)
364       if instance is None:
365         raise errors.OpPrereqError("No such instance name '%s'" % name)
366       wanted.append(instance)
367
368   else:
369     wanted = lu.cfg.GetInstanceList()
370   return utils.NiceSort(wanted)
371
372
373 def _CheckOutputFields(static, dynamic, selected):
374   """Checks whether all selected fields are valid.
375
376   Args:
377     static: Static fields
378     dynamic: Dynamic fields
379
380   """
381   static_fields = frozenset(static)
382   dynamic_fields = frozenset(dynamic)
383
384   all_fields = static_fields | dynamic_fields
385
386   if not all_fields.issuperset(selected):
387     raise errors.OpPrereqError("Unknown output fields selected: %s"
388                                % ",".join(frozenset(selected).
389                                           difference(all_fields)))
390
391
392 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393                           memory, vcpus, nics):
394   """Builds instance related env variables for hooks from single variables.
395
396   Args:
397     secondary_nodes: List of secondary nodes as strings
398   """
399   env = {
400     "OP_TARGET": name,
401     "INSTANCE_NAME": name,
402     "INSTANCE_PRIMARY": primary_node,
403     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404     "INSTANCE_OS_TYPE": os_type,
405     "INSTANCE_STATUS": status,
406     "INSTANCE_MEMORY": memory,
407     "INSTANCE_VCPUS": vcpus,
408   }
409
410   if nics:
411     nic_count = len(nics)
412     for idx, (ip, bridge, mac) in enumerate(nics):
413       if ip is None:
414         ip = ""
415       env["INSTANCE_NIC%d_IP" % idx] = ip
416       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418   else:
419     nic_count = 0
420
421   env["INSTANCE_NIC_COUNT"] = nic_count
422
423   return env
424
425
426 def _BuildInstanceHookEnvByObject(instance, override=None):
427   """Builds instance related env variables for hooks from an object.
428
429   Args:
430     instance: objects.Instance object of instance
431     override: dict of values to override
432   """
433   args = {
434     'name': instance.name,
435     'primary_node': instance.primary_node,
436     'secondary_nodes': instance.secondary_nodes,
437     'os_type': instance.os,
438     'status': instance.os,
439     'memory': instance.memory,
440     'vcpus': instance.vcpus,
441     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442   }
443   if override:
444     args.update(override)
445   return _BuildInstanceHookEnv(**args)
446
447
448 def _CheckInstanceBridgesExist(instance):
449   """Check that the brigdes needed by an instance exist.
450
451   """
452   # check bridges existance
453   brlist = [nic.bridge for nic in instance.nics]
454   if not rpc.call_bridges_exist(instance.primary_node, brlist):
455     raise errors.OpPrereqError("one or more target bridges %s does not"
456                                " exist on destination node '%s'" %
457                                (brlist, instance.primary_node))
458
459
460 class LUDestroyCluster(NoHooksLU):
461   """Logical unit for destroying the cluster.
462
463   """
464   _OP_REQP = []
465
466   def CheckPrereq(self):
467     """Check prerequisites.
468
469     This checks whether the cluster is empty.
470
471     Any errors are signalled by raising errors.OpPrereqError.
472
473     """
474     master = self.sstore.GetMasterNode()
475
476     nodelist = self.cfg.GetNodeList()
477     if len(nodelist) != 1 or nodelist[0] != master:
478       raise errors.OpPrereqError("There are still %d node(s) in"
479                                  " this cluster." % (len(nodelist) - 1))
480     instancelist = self.cfg.GetInstanceList()
481     if instancelist:
482       raise errors.OpPrereqError("There are still %d instance(s) in"
483                                  " this cluster." % len(instancelist))
484
485   def Exec(self, feedback_fn):
486     """Destroys the cluster.
487
488     """
489     master = self.sstore.GetMasterNode()
490     if not rpc.call_node_stop_master(master, False):
491       raise errors.OpExecError("Could not disable the master role")
492     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493     utils.CreateBackup(priv_key)
494     utils.CreateBackup(pub_key)
495     return master
496
497
498 class LUVerifyCluster(LogicalUnit):
499   """Verifies the cluster status.
500
501   """
502   HPATH = "cluster-verify"
503   HTYPE = constants.HTYPE_CLUSTER
504   _OP_REQP = ["skip_checks"]
505   REQ_BGL = False
506
507   def ExpandNames(self):
508     self.needed_locks = {
509       locking.LEVEL_NODE: locking.ALL_SET,
510       locking.LEVEL_INSTANCE: locking.ALL_SET,
511     }
512     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513
514   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515                   remote_version, feedback_fn):
516     """Run multiple tests against a node.
517
518     Test list:
519       - compares ganeti version
520       - checks vg existance and size > 20G
521       - checks config file checksum
522       - checks ssh to other nodes
523
524     Args:
525       node: name of the node to check
526       file_list: required list of files
527       local_cksum: dictionary of local files and their checksums
528
529     """
530     # compares ganeti version
531     local_version = constants.PROTOCOL_VERSION
532     if not remote_version:
533       feedback_fn("  - ERROR: connection to %s failed" % (node))
534       return True
535
536     if local_version != remote_version:
537       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538                       (local_version, node, remote_version))
539       return True
540
541     # checks vg existance and size > 20G
542
543     bad = False
544     if not vglist:
545       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546                       (node,))
547       bad = True
548     else:
549       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550                                             constants.MIN_VG_SIZE)
551       if vgstatus:
552         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553         bad = True
554
555     # checks config file checksum
556     # checks ssh to any
557
558     if 'filelist' not in node_result:
559       bad = True
560       feedback_fn("  - ERROR: node hasn't returned file checksum data")
561     else:
562       remote_cksum = node_result['filelist']
563       for file_name in file_list:
564         if file_name not in remote_cksum:
565           bad = True
566           feedback_fn("  - ERROR: file '%s' missing" % file_name)
567         elif remote_cksum[file_name] != local_cksum[file_name]:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570
571     if 'nodelist' not in node_result:
572       bad = True
573       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574     else:
575       if node_result['nodelist']:
576         bad = True
577         for node in node_result['nodelist']:
578           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579                           (node, node_result['nodelist'][node]))
580     if 'node-net-test' not in node_result:
581       bad = True
582       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583     else:
584       if node_result['node-net-test']:
585         bad = True
586         nlist = utils.NiceSort(node_result['node-net-test'].keys())
587         for node in nlist:
588           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589                           (node, node_result['node-net-test'][node]))
590
591     hyp_result = node_result.get('hypervisor', None)
592     if hyp_result is not None:
593       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594     return bad
595
596   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597                       node_instance, feedback_fn):
598     """Verify an instance.
599
600     This function checks to see if the required block devices are
601     available on the instance's node.
602
603     """
604     bad = False
605
606     node_current = instanceconfig.primary_node
607
608     node_vol_should = {}
609     instanceconfig.MapLVsByNode(node_vol_should)
610
611     for node in node_vol_should:
612       for volume in node_vol_should[node]:
613         if node not in node_vol_is or volume not in node_vol_is[node]:
614           feedback_fn("  - ERROR: volume %s missing on node %s" %
615                           (volume, node))
616           bad = True
617
618     if not instanceconfig.status == 'down':
619       if (node_current not in node_instance or
620           not instance in node_instance[node_current]):
621         feedback_fn("  - ERROR: instance %s not running on node %s" %
622                         (instance, node_current))
623         bad = True
624
625     for node in node_instance:
626       if (not node == node_current):
627         if instance in node_instance[node]:
628           feedback_fn("  - ERROR: instance %s should not run on node %s" %
629                           (instance, node))
630           bad = True
631
632     return bad
633
634   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635     """Verify if there are any unknown volumes in the cluster.
636
637     The .os, .swap and backup volumes are ignored. All other volumes are
638     reported as unknown.
639
640     """
641     bad = False
642
643     for node in node_vol_is:
644       for volume in node_vol_is[node]:
645         if node not in node_vol_should or volume not in node_vol_should[node]:
646           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647                       (volume, node))
648           bad = True
649     return bad
650
651   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652     """Verify the list of running instances.
653
654     This checks what instances are running but unknown to the cluster.
655
656     """
657     bad = False
658     for node in node_instance:
659       for runninginstance in node_instance[node]:
660         if runninginstance not in instancelist:
661           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662                           (runninginstance, node))
663           bad = True
664     return bad
665
666   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667     """Verify N+1 Memory Resilience.
668
669     Check that if one single node dies we can still start all the instances it
670     was primary for.
671
672     """
673     bad = False
674
675     for node, nodeinfo in node_info.iteritems():
676       # This code checks that every node which is now listed as secondary has
677       # enough memory to host all instances it is supposed to should a single
678       # other node in the cluster fail.
679       # FIXME: not ready for failover to an arbitrary node
680       # FIXME: does not support file-backed instances
681       # WARNING: we currently take into account down instances as well as up
682       # ones, considering that even if they're down someone might want to start
683       # them even in the event of a node failure.
684       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685         needed_mem = 0
686         for instance in instances:
687           needed_mem += instance_cfg[instance].memory
688         if nodeinfo['mfree'] < needed_mem:
689           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690                       " failovers should node %s fail" % (node, prinode))
691           bad = True
692     return bad
693
694   def CheckPrereq(self):
695     """Check prerequisites.
696
697     Transform the list of checks we're going to skip into a set and check that
698     all its members are valid.
699
700     """
701     self.skip_set = frozenset(self.op.skip_checks)
702     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703       raise errors.OpPrereqError("Invalid checks to be skipped specified")
704
705   def BuildHooksEnv(self):
706     """Build hooks env.
707
708     Cluster-Verify hooks just rone in the post phase and their failure makes
709     the output be logged in the verify output and the verification to fail.
710
711     """
712     all_nodes = self.cfg.GetNodeList()
713     # TODO: populate the environment with useful information for verify hooks
714     env = {}
715     return env, [], all_nodes
716
717   def Exec(self, feedback_fn):
718     """Verify integrity of cluster, performing various test on nodes.
719
720     """
721     bad = False
722     feedback_fn("* Verifying global settings")
723     for msg in self.cfg.VerifyConfig():
724       feedback_fn("  - ERROR: %s" % msg)
725
726     vg_name = self.cfg.GetVGName()
727     nodelist = utils.NiceSort(self.cfg.GetNodeList())
728     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730     i_non_redundant = [] # Non redundant instances
731     node_volume = {}
732     node_instance = {}
733     node_info = {}
734     instance_cfg = {}
735
736     # FIXME: verify OS list
737     # do local checksums
738     file_names = list(self.sstore.GetFileList())
739     file_names.append(constants.SSL_CERT_FILE)
740     file_names.append(constants.CLUSTER_CONF_FILE)
741     local_checksums = utils.FingerprintFiles(file_names)
742
743     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745     all_instanceinfo = rpc.call_instance_list(nodelist)
746     all_vglist = rpc.call_vg_list(nodelist)
747     node_verify_param = {
748       'filelist': file_names,
749       'nodelist': nodelist,
750       'hypervisor': None,
751       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752                         for node in nodeinfo]
753       }
754     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755     all_rversion = rpc.call_version(nodelist)
756     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757
758     for node in nodelist:
759       feedback_fn("* Verifying node %s" % node)
760       result = self._VerifyNode(node, file_names, local_checksums,
761                                 all_vglist[node], all_nvinfo[node],
762                                 all_rversion[node], feedback_fn)
763       bad = bad or result
764
765       # node_volume
766       volumeinfo = all_volumeinfo[node]
767
768       if isinstance(volumeinfo, basestring):
769         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770                     (node, volumeinfo[-400:].encode('string_escape')))
771         bad = True
772         node_volume[node] = {}
773       elif not isinstance(volumeinfo, dict):
774         feedback_fn("  - ERROR: connection to %s failed" % (node,))
775         bad = True
776         continue
777       else:
778         node_volume[node] = volumeinfo
779
780       # node_instance
781       nodeinstance = all_instanceinfo[node]
782       if type(nodeinstance) != list:
783         feedback_fn("  - ERROR: connection to %s failed" % (node,))
784         bad = True
785         continue
786
787       node_instance[node] = nodeinstance
788
789       # node_info
790       nodeinfo = all_ninfo[node]
791       if not isinstance(nodeinfo, dict):
792         feedback_fn("  - ERROR: connection to %s failed" % (node,))
793         bad = True
794         continue
795
796       try:
797         node_info[node] = {
798           "mfree": int(nodeinfo['memory_free']),
799           "dfree": int(nodeinfo['vg_free']),
800           "pinst": [],
801           "sinst": [],
802           # dictionary holding all instances this node is secondary for,
803           # grouped by their primary node. Each key is a cluster node, and each
804           # value is a list of instances which have the key as primary and the
805           # current node as secondary.  this is handy to calculate N+1 memory
806           # availability if you can only failover from a primary to its
807           # secondary.
808           "sinst-by-pnode": {},
809         }
810       except ValueError:
811         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812         bad = True
813         continue
814
815     node_vol_should = {}
816
817     for instance in instancelist:
818       feedback_fn("* Verifying instance %s" % instance)
819       inst_config = self.cfg.GetInstanceInfo(instance)
820       result =  self._VerifyInstance(instance, inst_config, node_volume,
821                                      node_instance, feedback_fn)
822       bad = bad or result
823
824       inst_config.MapLVsByNode(node_vol_should)
825
826       instance_cfg[instance] = inst_config
827
828       pnode = inst_config.primary_node
829       if pnode in node_info:
830         node_info[pnode]['pinst'].append(instance)
831       else:
832         feedback_fn("  - ERROR: instance %s, connection to primary node"
833                     " %s failed" % (instance, pnode))
834         bad = True
835
836       # If the instance is non-redundant we cannot survive losing its primary
837       # node, so we are not N+1 compliant. On the other hand we have no disk
838       # templates with more than one secondary so that situation is not well
839       # supported either.
840       # FIXME: does not support file-backed instances
841       if len(inst_config.secondary_nodes) == 0:
842         i_non_redundant.append(instance)
843       elif len(inst_config.secondary_nodes) > 1:
844         feedback_fn("  - WARNING: multiple secondaries for instance %s"
845                     % instance)
846
847       for snode in inst_config.secondary_nodes:
848         if snode in node_info:
849           node_info[snode]['sinst'].append(instance)
850           if pnode not in node_info[snode]['sinst-by-pnode']:
851             node_info[snode]['sinst-by-pnode'][pnode] = []
852           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853         else:
854           feedback_fn("  - ERROR: instance %s, connection to secondary node"
855                       " %s failed" % (instance, snode))
856
857     feedback_fn("* Verifying orphan volumes")
858     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859                                        feedback_fn)
860     bad = bad or result
861
862     feedback_fn("* Verifying remaining instances")
863     result = self._VerifyOrphanInstances(instancelist, node_instance,
864                                          feedback_fn)
865     bad = bad or result
866
867     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868       feedback_fn("* Verifying N+1 Memory redundancy")
869       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870       bad = bad or result
871
872     feedback_fn("* Other Notes")
873     if i_non_redundant:
874       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875                   % len(i_non_redundant))
876
877     return not bad
878
879   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880     """Analize the post-hooks' result, handle it, and send some
881     nicely-formatted feedback back to the user.
882
883     Args:
884       phase: the hooks phase that has just been run
885       hooks_results: the results of the multi-node hooks rpc call
886       feedback_fn: function to send feedback back to the caller
887       lu_result: previous Exec result
888
889     """
890     # We only really run POST phase hooks, and are only interested in
891     # their results
892     if phase == constants.HOOKS_PHASE_POST:
893       # Used to change hooks' output to proper indentation
894       indent_re = re.compile('^', re.M)
895       feedback_fn("* Hooks Results")
896       if not hooks_results:
897         feedback_fn("  - ERROR: general communication failure")
898         lu_result = 1
899       else:
900         for node_name in hooks_results:
901           show_node_header = True
902           res = hooks_results[node_name]
903           if res is False or not isinstance(res, list):
904             feedback_fn("    Communication failure")
905             lu_result = 1
906             continue
907           for script, hkr, output in res:
908             if hkr == constants.HKR_FAIL:
909               # The node header is only shown once, if there are
910               # failing hooks on that node
911               if show_node_header:
912                 feedback_fn("  Node %s:" % node_name)
913                 show_node_header = False
914               feedback_fn("    ERROR: Script %s failed, output:" % script)
915               output = indent_re.sub('      ', output)
916               feedback_fn("%s" % output)
917               lu_result = 1
918
919       return lu_result
920
921
922 class LUVerifyDisks(NoHooksLU):
923   """Verifies the cluster disks status.
924
925   """
926   _OP_REQP = []
927   REQ_BGL = False
928
929   def ExpandNames(self):
930     self.needed_locks = {
931       locking.LEVEL_NODE: locking.ALL_SET,
932       locking.LEVEL_INSTANCE: locking.ALL_SET,
933     }
934     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     This has no prerequisites.
940
941     """
942     pass
943
944   def Exec(self, feedback_fn):
945     """Verify integrity of cluster disks.
946
947     """
948     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
949
950     vg_name = self.cfg.GetVGName()
951     nodes = utils.NiceSort(self.cfg.GetNodeList())
952     instances = [self.cfg.GetInstanceInfo(name)
953                  for name in self.cfg.GetInstanceList()]
954
955     nv_dict = {}
956     for inst in instances:
957       inst_lvs = {}
958       if (inst.status != "up" or
959           inst.disk_template not in constants.DTS_NET_MIRROR):
960         continue
961       inst.MapLVsByNode(inst_lvs)
962       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
963       for node, vol_list in inst_lvs.iteritems():
964         for vol in vol_list:
965           nv_dict[(node, vol)] = inst
966
967     if not nv_dict:
968       return result
969
970     node_lvs = rpc.call_volume_list(nodes, vg_name)
971
972     to_act = set()
973     for node in nodes:
974       # node_volume
975       lvs = node_lvs[node]
976
977       if isinstance(lvs, basestring):
978         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979         res_nlvm[node] = lvs
980       elif not isinstance(lvs, dict):
981         logger.Info("connection to node %s failed or invalid data returned" %
982                     (node,))
983         res_nodes.append(node)
984         continue
985
986       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987         inst = nv_dict.pop((node, lv_name), None)
988         if (not lv_online and inst is not None
989             and inst.name not in res_instances):
990           res_instances.append(inst.name)
991
992     # any leftover items in nv_dict are missing LVs, let's arrange the
993     # data better
994     for key, inst in nv_dict.iteritems():
995       if inst.name not in res_missing:
996         res_missing[inst.name] = []
997       res_missing[inst.name].append(key)
998
999     return result
1000
1001
1002 class LURenameCluster(LogicalUnit):
1003   """Rename the cluster.
1004
1005   """
1006   HPATH = "cluster-rename"
1007   HTYPE = constants.HTYPE_CLUSTER
1008   _OP_REQP = ["name"]
1009   REQ_WSSTORE = True
1010
1011   def BuildHooksEnv(self):
1012     """Build hooks env.
1013
1014     """
1015     env = {
1016       "OP_TARGET": self.sstore.GetClusterName(),
1017       "NEW_NAME": self.op.name,
1018       }
1019     mn = self.sstore.GetMasterNode()
1020     return env, [mn], [mn]
1021
1022   def CheckPrereq(self):
1023     """Verify that the passed name is a valid one.
1024
1025     """
1026     hostname = utils.HostInfo(self.op.name)
1027
1028     new_name = hostname.name
1029     self.ip = new_ip = hostname.ip
1030     old_name = self.sstore.GetClusterName()
1031     old_ip = self.sstore.GetMasterIP()
1032     if new_name == old_name and new_ip == old_ip:
1033       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1034                                  " cluster has changed")
1035     if new_ip != old_ip:
1036       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1037         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1038                                    " reachable on the network. Aborting." %
1039                                    new_ip)
1040
1041     self.op.name = new_name
1042
1043   def Exec(self, feedback_fn):
1044     """Rename the cluster.
1045
1046     """
1047     clustername = self.op.name
1048     ip = self.ip
1049     ss = self.sstore
1050
1051     # shutdown the master IP
1052     master = ss.GetMasterNode()
1053     if not rpc.call_node_stop_master(master, False):
1054       raise errors.OpExecError("Could not disable the master role")
1055
1056     try:
1057       # modify the sstore
1058       ss.SetKey(ss.SS_MASTER_IP, ip)
1059       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1060
1061       # Distribute updated ss config to all nodes
1062       myself = self.cfg.GetNodeInfo(master)
1063       dist_nodes = self.cfg.GetNodeList()
1064       if myself.name in dist_nodes:
1065         dist_nodes.remove(myself.name)
1066
1067       logger.Debug("Copying updated ssconf data to all nodes")
1068       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1069         fname = ss.KeyToFilename(keyname)
1070         result = rpc.call_upload_file(dist_nodes, fname)
1071         for to_node in dist_nodes:
1072           if not result[to_node]:
1073             logger.Error("copy of file %s to node %s failed" %
1074                          (fname, to_node))
1075     finally:
1076       if not rpc.call_node_start_master(master, False):
1077         logger.Error("Could not re-enable the master role on the master,"
1078                      " please restart manually.")
1079
1080
1081 def _RecursiveCheckIfLVMBased(disk):
1082   """Check if the given disk or its children are lvm-based.
1083
1084   Args:
1085     disk: ganeti.objects.Disk object
1086
1087   Returns:
1088     boolean indicating whether a LD_LV dev_type was found or not
1089
1090   """
1091   if disk.children:
1092     for chdisk in disk.children:
1093       if _RecursiveCheckIfLVMBased(chdisk):
1094         return True
1095   return disk.dev_type == constants.LD_LV
1096
1097
1098 class LUSetClusterParams(LogicalUnit):
1099   """Change the parameters of the cluster.
1100
1101   """
1102   HPATH = "cluster-modify"
1103   HTYPE = constants.HTYPE_CLUSTER
1104   _OP_REQP = []
1105   REQ_BGL = False
1106
1107   def ExpandNames(self):
1108     # FIXME: in the future maybe other cluster params won't require checking on
1109     # all nodes to be modified.
1110     self.needed_locks = {
1111       locking.LEVEL_NODE: locking.ALL_SET,
1112     }
1113     self.share_locks[locking.LEVEL_NODE] = 1
1114
1115   def BuildHooksEnv(self):
1116     """Build hooks env.
1117
1118     """
1119     env = {
1120       "OP_TARGET": self.sstore.GetClusterName(),
1121       "NEW_VG_NAME": self.op.vg_name,
1122       }
1123     mn = self.sstore.GetMasterNode()
1124     return env, [mn], [mn]
1125
1126   def CheckPrereq(self):
1127     """Check prerequisites.
1128
1129     This checks whether the given params don't conflict and
1130     if the given volume group is valid.
1131
1132     """
1133     # FIXME: This only works because there is only one parameter that can be
1134     # changed or removed.
1135     if not self.op.vg_name:
1136       instances = self.cfg.GetAllInstancesInfo().values()
1137       for inst in instances:
1138         for disk in inst.disks:
1139           if _RecursiveCheckIfLVMBased(disk):
1140             raise errors.OpPrereqError("Cannot disable lvm storage while"
1141                                        " lvm-based instances exist")
1142
1143     # if vg_name not None, checks given volume group on all nodes
1144     if self.op.vg_name:
1145       node_list = self.acquired_locks[locking.LEVEL_NODE]
1146       vglist = rpc.call_vg_list(node_list)
1147       for node in node_list:
1148         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1149                                               constants.MIN_VG_SIZE)
1150         if vgstatus:
1151           raise errors.OpPrereqError("Error on node '%s': %s" %
1152                                      (node, vgstatus))
1153
1154   def Exec(self, feedback_fn):
1155     """Change the parameters of the cluster.
1156
1157     """
1158     if self.op.vg_name != self.cfg.GetVGName():
1159       self.cfg.SetVGName(self.op.vg_name)
1160     else:
1161       feedback_fn("Cluster LVM configuration already in desired"
1162                   " state, not changing")
1163
1164
1165 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1166   """Sleep and poll for an instance's disk to sync.
1167
1168   """
1169   if not instance.disks:
1170     return True
1171
1172   if not oneshot:
1173     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1174
1175   node = instance.primary_node
1176
1177   for dev in instance.disks:
1178     cfgw.SetDiskID(dev, node)
1179
1180   retries = 0
1181   while True:
1182     max_time = 0
1183     done = True
1184     cumul_degraded = False
1185     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1186     if not rstats:
1187       proc.LogWarning("Can't get any data from node %s" % node)
1188       retries += 1
1189       if retries >= 10:
1190         raise errors.RemoteError("Can't contact node %s for mirror data,"
1191                                  " aborting." % node)
1192       time.sleep(6)
1193       continue
1194     retries = 0
1195     for i in range(len(rstats)):
1196       mstat = rstats[i]
1197       if mstat is None:
1198         proc.LogWarning("Can't compute data for node %s/%s" %
1199                         (node, instance.disks[i].iv_name))
1200         continue
1201       # we ignore the ldisk parameter
1202       perc_done, est_time, is_degraded, _ = mstat
1203       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1204       if perc_done is not None:
1205         done = False
1206         if est_time is not None:
1207           rem_time = "%d estimated seconds remaining" % est_time
1208           max_time = est_time
1209         else:
1210           rem_time = "no time estimate"
1211         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1212                      (instance.disks[i].iv_name, perc_done, rem_time))
1213     if done or oneshot:
1214       break
1215
1216     time.sleep(min(60, max_time))
1217
1218   if done:
1219     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1220   return not cumul_degraded
1221
1222
1223 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1224   """Check that mirrors are not degraded.
1225
1226   The ldisk parameter, if True, will change the test from the
1227   is_degraded attribute (which represents overall non-ok status for
1228   the device(s)) to the ldisk (representing the local storage status).
1229
1230   """
1231   cfgw.SetDiskID(dev, node)
1232   if ldisk:
1233     idx = 6
1234   else:
1235     idx = 5
1236
1237   result = True
1238   if on_primary or dev.AssembleOnSecondary():
1239     rstats = rpc.call_blockdev_find(node, dev)
1240     if not rstats:
1241       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1242       result = False
1243     else:
1244       result = result and (not rstats[idx])
1245   if dev.children:
1246     for child in dev.children:
1247       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1248
1249   return result
1250
1251
1252 class LUDiagnoseOS(NoHooksLU):
1253   """Logical unit for OS diagnose/query.
1254
1255   """
1256   _OP_REQP = ["output_fields", "names"]
1257   REQ_BGL = False
1258
1259   def ExpandNames(self):
1260     if self.op.names:
1261       raise errors.OpPrereqError("Selective OS query not supported")
1262
1263     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1264     _CheckOutputFields(static=[],
1265                        dynamic=self.dynamic_fields,
1266                        selected=self.op.output_fields)
1267
1268     # Lock all nodes, in shared mode
1269     self.needed_locks = {}
1270     self.share_locks[locking.LEVEL_NODE] = 1
1271     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1272
1273   def CheckPrereq(self):
1274     """Check prerequisites.
1275
1276     """
1277
1278   @staticmethod
1279   def _DiagnoseByOS(node_list, rlist):
1280     """Remaps a per-node return list into an a per-os per-node dictionary
1281
1282       Args:
1283         node_list: a list with the names of all nodes
1284         rlist: a map with node names as keys and OS objects as values
1285
1286       Returns:
1287         map: a map with osnames as keys and as value another map, with
1288              nodes as
1289              keys and list of OS objects as values
1290              e.g. {"debian-etch": {"node1": [<object>,...],
1291                                    "node2": [<object>,]}
1292                   }
1293
1294     """
1295     all_os = {}
1296     for node_name, nr in rlist.iteritems():
1297       if not nr:
1298         continue
1299       for os_obj in nr:
1300         if os_obj.name not in all_os:
1301           # build a list of nodes for this os containing empty lists
1302           # for each node in node_list
1303           all_os[os_obj.name] = {}
1304           for nname in node_list:
1305             all_os[os_obj.name][nname] = []
1306         all_os[os_obj.name][node_name].append(os_obj)
1307     return all_os
1308
1309   def Exec(self, feedback_fn):
1310     """Compute the list of OSes.
1311
1312     """
1313     node_list = self.acquired_locks[locking.LEVEL_NODE]
1314     node_data = rpc.call_os_diagnose(node_list)
1315     if node_data == False:
1316       raise errors.OpExecError("Can't gather the list of OSes")
1317     pol = self._DiagnoseByOS(node_list, node_data)
1318     output = []
1319     for os_name, os_data in pol.iteritems():
1320       row = []
1321       for field in self.op.output_fields:
1322         if field == "name":
1323           val = os_name
1324         elif field == "valid":
1325           val = utils.all([osl and osl[0] for osl in os_data.values()])
1326         elif field == "node_status":
1327           val = {}
1328           for node_name, nos_list in os_data.iteritems():
1329             val[node_name] = [(v.status, v.path) for v in nos_list]
1330         else:
1331           raise errors.ParameterError(field)
1332         row.append(val)
1333       output.append(row)
1334
1335     return output
1336
1337
1338 class LURemoveNode(LogicalUnit):
1339   """Logical unit for removing a node.
1340
1341   """
1342   HPATH = "node-remove"
1343   HTYPE = constants.HTYPE_NODE
1344   _OP_REQP = ["node_name"]
1345
1346   def BuildHooksEnv(self):
1347     """Build hooks env.
1348
1349     This doesn't run on the target node in the pre phase as a failed
1350     node would then be impossible to remove.
1351
1352     """
1353     env = {
1354       "OP_TARGET": self.op.node_name,
1355       "NODE_NAME": self.op.node_name,
1356       }
1357     all_nodes = self.cfg.GetNodeList()
1358     all_nodes.remove(self.op.node_name)
1359     return env, all_nodes, all_nodes
1360
1361   def CheckPrereq(self):
1362     """Check prerequisites.
1363
1364     This checks:
1365      - the node exists in the configuration
1366      - it does not have primary or secondary instances
1367      - it's not the master
1368
1369     Any errors are signalled by raising errors.OpPrereqError.
1370
1371     """
1372     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1373     if node is None:
1374       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1375
1376     instance_list = self.cfg.GetInstanceList()
1377
1378     masternode = self.sstore.GetMasterNode()
1379     if node.name == masternode:
1380       raise errors.OpPrereqError("Node is the master node,"
1381                                  " you need to failover first.")
1382
1383     for instance_name in instance_list:
1384       instance = self.cfg.GetInstanceInfo(instance_name)
1385       if node.name == instance.primary_node:
1386         raise errors.OpPrereqError("Instance %s still running on the node,"
1387                                    " please remove first." % instance_name)
1388       if node.name in instance.secondary_nodes:
1389         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1390                                    " please remove first." % instance_name)
1391     self.op.node_name = node.name
1392     self.node = node
1393
1394   def Exec(self, feedback_fn):
1395     """Removes the node from the cluster.
1396
1397     """
1398     node = self.node
1399     logger.Info("stopping the node daemon and removing configs from node %s" %
1400                 node.name)
1401
1402     self.context.RemoveNode(node.name)
1403
1404     rpc.call_node_leave_cluster(node.name)
1405
1406
1407 class LUQueryNodes(NoHooksLU):
1408   """Logical unit for querying nodes.
1409
1410   """
1411   _OP_REQP = ["output_fields", "names"]
1412   REQ_BGL = False
1413
1414   def ExpandNames(self):
1415     self.dynamic_fields = frozenset([
1416       "dtotal", "dfree",
1417       "mtotal", "mnode", "mfree",
1418       "bootid",
1419       "ctotal",
1420       ])
1421
1422     self.static_fields = frozenset([
1423       "name", "pinst_cnt", "sinst_cnt",
1424       "pinst_list", "sinst_list",
1425       "pip", "sip", "tags",
1426       "serial_no",
1427       ])
1428
1429     _CheckOutputFields(static=self.static_fields,
1430                        dynamic=self.dynamic_fields,
1431                        selected=self.op.output_fields)
1432
1433     self.needed_locks = {}
1434     self.share_locks[locking.LEVEL_NODE] = 1
1435
1436     if self.op.names:
1437       self.wanted = _GetWantedNodes(self, self.op.names)
1438     else:
1439       self.wanted = locking.ALL_SET
1440
1441     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1442     if self.do_locking:
1443       # if we don't request only static fields, we need to lock the nodes
1444       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1445
1446
1447   def CheckPrereq(self):
1448     """Check prerequisites.
1449
1450     """
1451     # The validation of the node list is done in the _GetWantedNodes,
1452     # if non empty, and if empty, there's no validation to do
1453     pass
1454
1455   def Exec(self, feedback_fn):
1456     """Computes the list of nodes and their attributes.
1457
1458     """
1459     all_info = self.cfg.GetAllNodesInfo()
1460     if self.do_locking:
1461       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1462     elif self.wanted != locking.ALL_SET:
1463       nodenames = self.wanted
1464       missing = set(nodenames).difference(all_info.keys())
1465       if missing:
1466         raise self.OpExecError(
1467           "Some nodes were removed before retrieving their data: %s" % missing)
1468     else:
1469       nodenames = all_info.keys()
1470     nodelist = [all_info[name] for name in nodenames]
1471
1472     # begin data gathering
1473
1474     if self.dynamic_fields.intersection(self.op.output_fields):
1475       live_data = {}
1476       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1477       for name in nodenames:
1478         nodeinfo = node_data.get(name, None)
1479         if nodeinfo:
1480           live_data[name] = {
1481             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1482             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1483             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1484             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1485             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1486             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1487             "bootid": nodeinfo['bootid'],
1488             }
1489         else:
1490           live_data[name] = {}
1491     else:
1492       live_data = dict.fromkeys(nodenames, {})
1493
1494     node_to_primary = dict([(name, set()) for name in nodenames])
1495     node_to_secondary = dict([(name, set()) for name in nodenames])
1496
1497     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1498                              "sinst_cnt", "sinst_list"))
1499     if inst_fields & frozenset(self.op.output_fields):
1500       instancelist = self.cfg.GetInstanceList()
1501
1502       for instance_name in instancelist:
1503         inst = self.cfg.GetInstanceInfo(instance_name)
1504         if inst.primary_node in node_to_primary:
1505           node_to_primary[inst.primary_node].add(inst.name)
1506         for secnode in inst.secondary_nodes:
1507           if secnode in node_to_secondary:
1508             node_to_secondary[secnode].add(inst.name)
1509
1510     # end data gathering
1511
1512     output = []
1513     for node in nodelist:
1514       node_output = []
1515       for field in self.op.output_fields:
1516         if field == "name":
1517           val = node.name
1518         elif field == "pinst_list":
1519           val = list(node_to_primary[node.name])
1520         elif field == "sinst_list":
1521           val = list(node_to_secondary[node.name])
1522         elif field == "pinst_cnt":
1523           val = len(node_to_primary[node.name])
1524         elif field == "sinst_cnt":
1525           val = len(node_to_secondary[node.name])
1526         elif field == "pip":
1527           val = node.primary_ip
1528         elif field == "sip":
1529           val = node.secondary_ip
1530         elif field == "tags":
1531           val = list(node.GetTags())
1532         elif field == "serial_no":
1533           val = node.serial_no
1534         elif field in self.dynamic_fields:
1535           val = live_data[node.name].get(field, None)
1536         else:
1537           raise errors.ParameterError(field)
1538         node_output.append(val)
1539       output.append(node_output)
1540
1541     return output
1542
1543
1544 class LUQueryNodeVolumes(NoHooksLU):
1545   """Logical unit for getting volumes on node(s).
1546
1547   """
1548   _OP_REQP = ["nodes", "output_fields"]
1549   REQ_BGL = False
1550
1551   def ExpandNames(self):
1552     _CheckOutputFields(static=["node"],
1553                        dynamic=["phys", "vg", "name", "size", "instance"],
1554                        selected=self.op.output_fields)
1555
1556     self.needed_locks = {}
1557     self.share_locks[locking.LEVEL_NODE] = 1
1558     if not self.op.nodes:
1559       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1560     else:
1561       self.needed_locks[locking.LEVEL_NODE] = \
1562         _GetWantedNodes(self, self.op.nodes)
1563
1564   def CheckPrereq(self):
1565     """Check prerequisites.
1566
1567     This checks that the fields required are valid output fields.
1568
1569     """
1570     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1571
1572   def Exec(self, feedback_fn):
1573     """Computes the list of nodes and their attributes.
1574
1575     """
1576     nodenames = self.nodes
1577     volumes = rpc.call_node_volumes(nodenames)
1578
1579     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1580              in self.cfg.GetInstanceList()]
1581
1582     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1583
1584     output = []
1585     for node in nodenames:
1586       if node not in volumes or not volumes[node]:
1587         continue
1588
1589       node_vols = volumes[node][:]
1590       node_vols.sort(key=lambda vol: vol['dev'])
1591
1592       for vol in node_vols:
1593         node_output = []
1594         for field in self.op.output_fields:
1595           if field == "node":
1596             val = node
1597           elif field == "phys":
1598             val = vol['dev']
1599           elif field == "vg":
1600             val = vol['vg']
1601           elif field == "name":
1602             val = vol['name']
1603           elif field == "size":
1604             val = int(float(vol['size']))
1605           elif field == "instance":
1606             for inst in ilist:
1607               if node not in lv_by_node[inst]:
1608                 continue
1609               if vol['name'] in lv_by_node[inst][node]:
1610                 val = inst.name
1611                 break
1612             else:
1613               val = '-'
1614           else:
1615             raise errors.ParameterError(field)
1616           node_output.append(str(val))
1617
1618         output.append(node_output)
1619
1620     return output
1621
1622
1623 class LUAddNode(LogicalUnit):
1624   """Logical unit for adding node to the cluster.
1625
1626   """
1627   HPATH = "node-add"
1628   HTYPE = constants.HTYPE_NODE
1629   _OP_REQP = ["node_name"]
1630
1631   def BuildHooksEnv(self):
1632     """Build hooks env.
1633
1634     This will run on all nodes before, and on all nodes + the new node after.
1635
1636     """
1637     env = {
1638       "OP_TARGET": self.op.node_name,
1639       "NODE_NAME": self.op.node_name,
1640       "NODE_PIP": self.op.primary_ip,
1641       "NODE_SIP": self.op.secondary_ip,
1642       }
1643     nodes_0 = self.cfg.GetNodeList()
1644     nodes_1 = nodes_0 + [self.op.node_name, ]
1645     return env, nodes_0, nodes_1
1646
1647   def CheckPrereq(self):
1648     """Check prerequisites.
1649
1650     This checks:
1651      - the new node is not already in the config
1652      - it is resolvable
1653      - its parameters (single/dual homed) matches the cluster
1654
1655     Any errors are signalled by raising errors.OpPrereqError.
1656
1657     """
1658     node_name = self.op.node_name
1659     cfg = self.cfg
1660
1661     dns_data = utils.HostInfo(node_name)
1662
1663     node = dns_data.name
1664     primary_ip = self.op.primary_ip = dns_data.ip
1665     secondary_ip = getattr(self.op, "secondary_ip", None)
1666     if secondary_ip is None:
1667       secondary_ip = primary_ip
1668     if not utils.IsValidIP(secondary_ip):
1669       raise errors.OpPrereqError("Invalid secondary IP given")
1670     self.op.secondary_ip = secondary_ip
1671
1672     node_list = cfg.GetNodeList()
1673     if not self.op.readd and node in node_list:
1674       raise errors.OpPrereqError("Node %s is already in the configuration" %
1675                                  node)
1676     elif self.op.readd and node not in node_list:
1677       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1678
1679     for existing_node_name in node_list:
1680       existing_node = cfg.GetNodeInfo(existing_node_name)
1681
1682       if self.op.readd and node == existing_node_name:
1683         if (existing_node.primary_ip != primary_ip or
1684             existing_node.secondary_ip != secondary_ip):
1685           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1686                                      " address configuration as before")
1687         continue
1688
1689       if (existing_node.primary_ip == primary_ip or
1690           existing_node.secondary_ip == primary_ip or
1691           existing_node.primary_ip == secondary_ip or
1692           existing_node.secondary_ip == secondary_ip):
1693         raise errors.OpPrereqError("New node ip address(es) conflict with"
1694                                    " existing node %s" % existing_node.name)
1695
1696     # check that the type of the node (single versus dual homed) is the
1697     # same as for the master
1698     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1699     master_singlehomed = myself.secondary_ip == myself.primary_ip
1700     newbie_singlehomed = secondary_ip == primary_ip
1701     if master_singlehomed != newbie_singlehomed:
1702       if master_singlehomed:
1703         raise errors.OpPrereqError("The master has no private ip but the"
1704                                    " new node has one")
1705       else:
1706         raise errors.OpPrereqError("The master has a private ip but the"
1707                                    " new node doesn't have one")
1708
1709     # checks reachablity
1710     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1711       raise errors.OpPrereqError("Node not reachable by ping")
1712
1713     if not newbie_singlehomed:
1714       # check reachability from my secondary ip to newbie's secondary ip
1715       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1716                            source=myself.secondary_ip):
1717         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1718                                    " based ping to noded port")
1719
1720     self.new_node = objects.Node(name=node,
1721                                  primary_ip=primary_ip,
1722                                  secondary_ip=secondary_ip)
1723
1724   def Exec(self, feedback_fn):
1725     """Adds the new node to the cluster.
1726
1727     """
1728     new_node = self.new_node
1729     node = new_node.name
1730
1731     # check connectivity
1732     result = rpc.call_version([node])[node]
1733     if result:
1734       if constants.PROTOCOL_VERSION == result:
1735         logger.Info("communication to node %s fine, sw version %s match" %
1736                     (node, result))
1737       else:
1738         raise errors.OpExecError("Version mismatch master version %s,"
1739                                  " node version %s" %
1740                                  (constants.PROTOCOL_VERSION, result))
1741     else:
1742       raise errors.OpExecError("Cannot get version from the new node")
1743
1744     # setup ssh on node
1745     logger.Info("copy ssh key to node %s" % node)
1746     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1747     keyarray = []
1748     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1749                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1750                 priv_key, pub_key]
1751
1752     for i in keyfiles:
1753       f = open(i, 'r')
1754       try:
1755         keyarray.append(f.read())
1756       finally:
1757         f.close()
1758
1759     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1760                                keyarray[3], keyarray[4], keyarray[5])
1761
1762     if not result:
1763       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1764
1765     # Add node to our /etc/hosts, and add key to known_hosts
1766     utils.AddHostToEtcHosts(new_node.name)
1767
1768     if new_node.secondary_ip != new_node.primary_ip:
1769       if not rpc.call_node_tcp_ping(new_node.name,
1770                                     constants.LOCALHOST_IP_ADDRESS,
1771                                     new_node.secondary_ip,
1772                                     constants.DEFAULT_NODED_PORT,
1773                                     10, False):
1774         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1775                                  " you gave (%s). Please fix and re-run this"
1776                                  " command." % new_node.secondary_ip)
1777
1778     node_verify_list = [self.sstore.GetMasterNode()]
1779     node_verify_param = {
1780       'nodelist': [node],
1781       # TODO: do a node-net-test as well?
1782     }
1783
1784     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1785     for verifier in node_verify_list:
1786       if not result[verifier]:
1787         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1788                                  " for remote verification" % verifier)
1789       if result[verifier]['nodelist']:
1790         for failed in result[verifier]['nodelist']:
1791           feedback_fn("ssh/hostname verification failed %s -> %s" %
1792                       (verifier, result[verifier]['nodelist'][failed]))
1793         raise errors.OpExecError("ssh/hostname verification failed.")
1794
1795     # Distribute updated /etc/hosts and known_hosts to all nodes,
1796     # including the node just added
1797     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1798     dist_nodes = self.cfg.GetNodeList()
1799     if not self.op.readd:
1800       dist_nodes.append(node)
1801     if myself.name in dist_nodes:
1802       dist_nodes.remove(myself.name)
1803
1804     logger.Debug("Copying hosts and known_hosts to all nodes")
1805     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1806       result = rpc.call_upload_file(dist_nodes, fname)
1807       for to_node in dist_nodes:
1808         if not result[to_node]:
1809           logger.Error("copy of file %s to node %s failed" %
1810                        (fname, to_node))
1811
1812     to_copy = self.sstore.GetFileList()
1813     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1814       to_copy.append(constants.VNC_PASSWORD_FILE)
1815     for fname in to_copy:
1816       result = rpc.call_upload_file([node], fname)
1817       if not result[node]:
1818         logger.Error("could not copy file %s to node %s" % (fname, node))
1819
1820     if self.op.readd:
1821       self.context.ReaddNode(new_node)
1822     else:
1823       self.context.AddNode(new_node)
1824
1825
1826 class LUQueryClusterInfo(NoHooksLU):
1827   """Query cluster configuration.
1828
1829   """
1830   _OP_REQP = []
1831   REQ_MASTER = False
1832   REQ_BGL = False
1833
1834   def ExpandNames(self):
1835     self.needed_locks = {}
1836
1837   def CheckPrereq(self):
1838     """No prerequsites needed for this LU.
1839
1840     """
1841     pass
1842
1843   def Exec(self, feedback_fn):
1844     """Return cluster config.
1845
1846     """
1847     result = {
1848       "name": self.sstore.GetClusterName(),
1849       "software_version": constants.RELEASE_VERSION,
1850       "protocol_version": constants.PROTOCOL_VERSION,
1851       "config_version": constants.CONFIG_VERSION,
1852       "os_api_version": constants.OS_API_VERSION,
1853       "export_version": constants.EXPORT_VERSION,
1854       "master": self.sstore.GetMasterNode(),
1855       "architecture": (platform.architecture()[0], platform.machine()),
1856       "hypervisor_type": self.sstore.GetHypervisorType(),
1857       }
1858
1859     return result
1860
1861
1862 class LUDumpClusterConfig(NoHooksLU):
1863   """Return a text-representation of the cluster-config.
1864
1865   """
1866   _OP_REQP = []
1867   REQ_BGL = False
1868
1869   def ExpandNames(self):
1870     self.needed_locks = {}
1871
1872   def CheckPrereq(self):
1873     """No prerequisites.
1874
1875     """
1876     pass
1877
1878   def Exec(self, feedback_fn):
1879     """Dump a representation of the cluster config to the standard output.
1880
1881     """
1882     return self.cfg.DumpConfig()
1883
1884
1885 class LUActivateInstanceDisks(NoHooksLU):
1886   """Bring up an instance's disks.
1887
1888   """
1889   _OP_REQP = ["instance_name"]
1890   REQ_BGL = False
1891
1892   def ExpandNames(self):
1893     self._ExpandAndLockInstance()
1894     self.needed_locks[locking.LEVEL_NODE] = []
1895     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1896
1897   def DeclareLocks(self, level):
1898     if level == locking.LEVEL_NODE:
1899       self._LockInstancesNodes()
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     This checks that the instance is in the cluster.
1905
1906     """
1907     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1908     assert self.instance is not None, \
1909       "Cannot retrieve locked instance %s" % self.op.instance_name
1910
1911   def Exec(self, feedback_fn):
1912     """Activate the disks.
1913
1914     """
1915     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1916     if not disks_ok:
1917       raise errors.OpExecError("Cannot activate block devices")
1918
1919     return disks_info
1920
1921
1922 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1923   """Prepare the block devices for an instance.
1924
1925   This sets up the block devices on all nodes.
1926
1927   Args:
1928     instance: a ganeti.objects.Instance object
1929     ignore_secondaries: if true, errors on secondary nodes won't result
1930                         in an error return from the function
1931
1932   Returns:
1933     false if the operation failed
1934     list of (host, instance_visible_name, node_visible_name) if the operation
1935          suceeded with the mapping from node devices to instance devices
1936   """
1937   device_info = []
1938   disks_ok = True
1939   iname = instance.name
1940   # With the two passes mechanism we try to reduce the window of
1941   # opportunity for the race condition of switching DRBD to primary
1942   # before handshaking occured, but we do not eliminate it
1943
1944   # The proper fix would be to wait (with some limits) until the
1945   # connection has been made and drbd transitions from WFConnection
1946   # into any other network-connected state (Connected, SyncTarget,
1947   # SyncSource, etc.)
1948
1949   # 1st pass, assemble on all nodes in secondary mode
1950   for inst_disk in instance.disks:
1951     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1952       cfg.SetDiskID(node_disk, node)
1953       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1954       if not result:
1955         logger.Error("could not prepare block device %s on node %s"
1956                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1957         if not ignore_secondaries:
1958           disks_ok = False
1959
1960   # FIXME: race condition on drbd migration to primary
1961
1962   # 2nd pass, do only the primary node
1963   for inst_disk in instance.disks:
1964     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1965       if node != instance.primary_node:
1966         continue
1967       cfg.SetDiskID(node_disk, node)
1968       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1969       if not result:
1970         logger.Error("could not prepare block device %s on node %s"
1971                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1972         disks_ok = False
1973     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1974
1975   # leave the disks configured for the primary node
1976   # this is a workaround that would be fixed better by
1977   # improving the logical/physical id handling
1978   for disk in instance.disks:
1979     cfg.SetDiskID(disk, instance.primary_node)
1980
1981   return disks_ok, device_info
1982
1983
1984 def _StartInstanceDisks(cfg, instance, force):
1985   """Start the disks of an instance.
1986
1987   """
1988   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1989                                            ignore_secondaries=force)
1990   if not disks_ok:
1991     _ShutdownInstanceDisks(instance, cfg)
1992     if force is not None and not force:
1993       logger.Error("If the message above refers to a secondary node,"
1994                    " you can retry the operation using '--force'.")
1995     raise errors.OpExecError("Disk consistency error")
1996
1997
1998 class LUDeactivateInstanceDisks(NoHooksLU):
1999   """Shutdown an instance's disks.
2000
2001   """
2002   _OP_REQP = ["instance_name"]
2003   REQ_BGL = False
2004
2005   def ExpandNames(self):
2006     self._ExpandAndLockInstance()
2007     self.needed_locks[locking.LEVEL_NODE] = []
2008     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2009
2010   def DeclareLocks(self, level):
2011     if level == locking.LEVEL_NODE:
2012       self._LockInstancesNodes()
2013
2014   def CheckPrereq(self):
2015     """Check prerequisites.
2016
2017     This checks that the instance is in the cluster.
2018
2019     """
2020     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2021     assert self.instance is not None, \
2022       "Cannot retrieve locked instance %s" % self.op.instance_name
2023
2024   def Exec(self, feedback_fn):
2025     """Deactivate the disks
2026
2027     """
2028     instance = self.instance
2029     _SafeShutdownInstanceDisks(instance, self.cfg)
2030
2031
2032 def _SafeShutdownInstanceDisks(instance, cfg):
2033   """Shutdown block devices of an instance.
2034
2035   This function checks if an instance is running, before calling
2036   _ShutdownInstanceDisks.
2037
2038   """
2039   ins_l = rpc.call_instance_list([instance.primary_node])
2040   ins_l = ins_l[instance.primary_node]
2041   if not type(ins_l) is list:
2042     raise errors.OpExecError("Can't contact node '%s'" %
2043                              instance.primary_node)
2044
2045   if instance.name in ins_l:
2046     raise errors.OpExecError("Instance is running, can't shutdown"
2047                              " block devices.")
2048
2049   _ShutdownInstanceDisks(instance, cfg)
2050
2051
2052 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2053   """Shutdown block devices of an instance.
2054
2055   This does the shutdown on all nodes of the instance.
2056
2057   If the ignore_primary is false, errors on the primary node are
2058   ignored.
2059
2060   """
2061   result = True
2062   for disk in instance.disks:
2063     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2064       cfg.SetDiskID(top_disk, node)
2065       if not rpc.call_blockdev_shutdown(node, top_disk):
2066         logger.Error("could not shutdown block device %s on node %s" %
2067                      (disk.iv_name, node))
2068         if not ignore_primary or node != instance.primary_node:
2069           result = False
2070   return result
2071
2072
2073 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2074   """Checks if a node has enough free memory.
2075
2076   This function check if a given node has the needed amount of free
2077   memory. In case the node has less memory or we cannot get the
2078   information from the node, this function raise an OpPrereqError
2079   exception.
2080
2081   Args:
2082     - cfg: a ConfigWriter instance
2083     - node: the node name
2084     - reason: string to use in the error message
2085     - requested: the amount of memory in MiB
2086
2087   """
2088   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2089   if not nodeinfo or not isinstance(nodeinfo, dict):
2090     raise errors.OpPrereqError("Could not contact node %s for resource"
2091                              " information" % (node,))
2092
2093   free_mem = nodeinfo[node].get('memory_free')
2094   if not isinstance(free_mem, int):
2095     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2096                              " was '%s'" % (node, free_mem))
2097   if requested > free_mem:
2098     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2099                              " needed %s MiB, available %s MiB" %
2100                              (node, reason, requested, free_mem))
2101
2102
2103 class LUStartupInstance(LogicalUnit):
2104   """Starts an instance.
2105
2106   """
2107   HPATH = "instance-start"
2108   HTYPE = constants.HTYPE_INSTANCE
2109   _OP_REQP = ["instance_name", "force"]
2110   REQ_BGL = False
2111
2112   def ExpandNames(self):
2113     self._ExpandAndLockInstance()
2114     self.needed_locks[locking.LEVEL_NODE] = []
2115     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116
2117   def DeclareLocks(self, level):
2118     if level == locking.LEVEL_NODE:
2119       self._LockInstancesNodes()
2120
2121   def BuildHooksEnv(self):
2122     """Build hooks env.
2123
2124     This runs on master, primary and secondary nodes of the instance.
2125
2126     """
2127     env = {
2128       "FORCE": self.op.force,
2129       }
2130     env.update(_BuildInstanceHookEnvByObject(self.instance))
2131     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2132           list(self.instance.secondary_nodes))
2133     return env, nl, nl
2134
2135   def CheckPrereq(self):
2136     """Check prerequisites.
2137
2138     This checks that the instance is in the cluster.
2139
2140     """
2141     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2142     assert self.instance is not None, \
2143       "Cannot retrieve locked instance %s" % self.op.instance_name
2144
2145     # check bridges existance
2146     _CheckInstanceBridgesExist(instance)
2147
2148     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2149                          "starting instance %s" % instance.name,
2150                          instance.memory)
2151
2152   def Exec(self, feedback_fn):
2153     """Start the instance.
2154
2155     """
2156     instance = self.instance
2157     force = self.op.force
2158     extra_args = getattr(self.op, "extra_args", "")
2159
2160     self.cfg.MarkInstanceUp(instance.name)
2161
2162     node_current = instance.primary_node
2163
2164     _StartInstanceDisks(self.cfg, instance, force)
2165
2166     if not rpc.call_instance_start(node_current, instance, extra_args):
2167       _ShutdownInstanceDisks(instance, self.cfg)
2168       raise errors.OpExecError("Could not start instance")
2169
2170
2171 class LURebootInstance(LogicalUnit):
2172   """Reboot an instance.
2173
2174   """
2175   HPATH = "instance-reboot"
2176   HTYPE = constants.HTYPE_INSTANCE
2177   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2178   REQ_BGL = False
2179
2180   def ExpandNames(self):
2181     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2182                                    constants.INSTANCE_REBOOT_HARD,
2183                                    constants.INSTANCE_REBOOT_FULL]:
2184       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2185                                   (constants.INSTANCE_REBOOT_SOFT,
2186                                    constants.INSTANCE_REBOOT_HARD,
2187                                    constants.INSTANCE_REBOOT_FULL))
2188     self._ExpandAndLockInstance()
2189     self.needed_locks[locking.LEVEL_NODE] = []
2190     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2191
2192   def DeclareLocks(self, level):
2193     if level == locking.LEVEL_NODE:
2194       primary_only = not constants.INSTANCE_REBOOT_FULL
2195       self._LockInstancesNodes(primary_only=primary_only)
2196
2197   def BuildHooksEnv(self):
2198     """Build hooks env.
2199
2200     This runs on master, primary and secondary nodes of the instance.
2201
2202     """
2203     env = {
2204       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2205       }
2206     env.update(_BuildInstanceHookEnvByObject(self.instance))
2207     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2208           list(self.instance.secondary_nodes))
2209     return env, nl, nl
2210
2211   def CheckPrereq(self):
2212     """Check prerequisites.
2213
2214     This checks that the instance is in the cluster.
2215
2216     """
2217     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2218     assert self.instance is not None, \
2219       "Cannot retrieve locked instance %s" % self.op.instance_name
2220
2221     # check bridges existance
2222     _CheckInstanceBridgesExist(instance)
2223
2224   def Exec(self, feedback_fn):
2225     """Reboot the instance.
2226
2227     """
2228     instance = self.instance
2229     ignore_secondaries = self.op.ignore_secondaries
2230     reboot_type = self.op.reboot_type
2231     extra_args = getattr(self.op, "extra_args", "")
2232
2233     node_current = instance.primary_node
2234
2235     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2236                        constants.INSTANCE_REBOOT_HARD]:
2237       if not rpc.call_instance_reboot(node_current, instance,
2238                                       reboot_type, extra_args):
2239         raise errors.OpExecError("Could not reboot instance")
2240     else:
2241       if not rpc.call_instance_shutdown(node_current, instance):
2242         raise errors.OpExecError("could not shutdown instance for full reboot")
2243       _ShutdownInstanceDisks(instance, self.cfg)
2244       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2245       if not rpc.call_instance_start(node_current, instance, extra_args):
2246         _ShutdownInstanceDisks(instance, self.cfg)
2247         raise errors.OpExecError("Could not start instance for full reboot")
2248
2249     self.cfg.MarkInstanceUp(instance.name)
2250
2251
2252 class LUShutdownInstance(LogicalUnit):
2253   """Shutdown an instance.
2254
2255   """
2256   HPATH = "instance-stop"
2257   HTYPE = constants.HTYPE_INSTANCE
2258   _OP_REQP = ["instance_name"]
2259   REQ_BGL = False
2260
2261   def ExpandNames(self):
2262     self._ExpandAndLockInstance()
2263     self.needed_locks[locking.LEVEL_NODE] = []
2264     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2265
2266   def DeclareLocks(self, level):
2267     if level == locking.LEVEL_NODE:
2268       self._LockInstancesNodes()
2269
2270   def BuildHooksEnv(self):
2271     """Build hooks env.
2272
2273     This runs on master, primary and secondary nodes of the instance.
2274
2275     """
2276     env = _BuildInstanceHookEnvByObject(self.instance)
2277     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2278           list(self.instance.secondary_nodes))
2279     return env, nl, nl
2280
2281   def CheckPrereq(self):
2282     """Check prerequisites.
2283
2284     This checks that the instance is in the cluster.
2285
2286     """
2287     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2288     assert self.instance is not None, \
2289       "Cannot retrieve locked instance %s" % self.op.instance_name
2290
2291   def Exec(self, feedback_fn):
2292     """Shutdown the instance.
2293
2294     """
2295     instance = self.instance
2296     node_current = instance.primary_node
2297     self.cfg.MarkInstanceDown(instance.name)
2298     if not rpc.call_instance_shutdown(node_current, instance):
2299       logger.Error("could not shutdown instance")
2300
2301     _ShutdownInstanceDisks(instance, self.cfg)
2302
2303
2304 class LUReinstallInstance(LogicalUnit):
2305   """Reinstall an instance.
2306
2307   """
2308   HPATH = "instance-reinstall"
2309   HTYPE = constants.HTYPE_INSTANCE
2310   _OP_REQP = ["instance_name"]
2311   REQ_BGL = False
2312
2313   def ExpandNames(self):
2314     self._ExpandAndLockInstance()
2315     self.needed_locks[locking.LEVEL_NODE] = []
2316     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2317
2318   def DeclareLocks(self, level):
2319     if level == locking.LEVEL_NODE:
2320       self._LockInstancesNodes()
2321
2322   def BuildHooksEnv(self):
2323     """Build hooks env.
2324
2325     This runs on master, primary and secondary nodes of the instance.
2326
2327     """
2328     env = _BuildInstanceHookEnvByObject(self.instance)
2329     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2330           list(self.instance.secondary_nodes))
2331     return env, nl, nl
2332
2333   def CheckPrereq(self):
2334     """Check prerequisites.
2335
2336     This checks that the instance is in the cluster and is not running.
2337
2338     """
2339     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2340     assert instance is not None, \
2341       "Cannot retrieve locked instance %s" % self.op.instance_name
2342
2343     if instance.disk_template == constants.DT_DISKLESS:
2344       raise errors.OpPrereqError("Instance '%s' has no disks" %
2345                                  self.op.instance_name)
2346     if instance.status != "down":
2347       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2348                                  self.op.instance_name)
2349     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2350     if remote_info:
2351       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2352                                  (self.op.instance_name,
2353                                   instance.primary_node))
2354
2355     self.op.os_type = getattr(self.op, "os_type", None)
2356     if self.op.os_type is not None:
2357       # OS verification
2358       pnode = self.cfg.GetNodeInfo(
2359         self.cfg.ExpandNodeName(instance.primary_node))
2360       if pnode is None:
2361         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2362                                    self.op.pnode)
2363       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2364       if not os_obj:
2365         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2366                                    " primary node"  % self.op.os_type)
2367
2368     self.instance = instance
2369
2370   def Exec(self, feedback_fn):
2371     """Reinstall the instance.
2372
2373     """
2374     inst = self.instance
2375
2376     if self.op.os_type is not None:
2377       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2378       inst.os = self.op.os_type
2379       self.cfg.Update(inst)
2380
2381     _StartInstanceDisks(self.cfg, inst, None)
2382     try:
2383       feedback_fn("Running the instance OS create scripts...")
2384       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2385         raise errors.OpExecError("Could not install OS for instance %s"
2386                                  " on node %s" %
2387                                  (inst.name, inst.primary_node))
2388     finally:
2389       _ShutdownInstanceDisks(inst, self.cfg)
2390
2391
2392 class LURenameInstance(LogicalUnit):
2393   """Rename an instance.
2394
2395   """
2396   HPATH = "instance-rename"
2397   HTYPE = constants.HTYPE_INSTANCE
2398   _OP_REQP = ["instance_name", "new_name"]
2399
2400   def BuildHooksEnv(self):
2401     """Build hooks env.
2402
2403     This runs on master, primary and secondary nodes of the instance.
2404
2405     """
2406     env = _BuildInstanceHookEnvByObject(self.instance)
2407     env["INSTANCE_NEW_NAME"] = self.op.new_name
2408     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2409           list(self.instance.secondary_nodes))
2410     return env, nl, nl
2411
2412   def CheckPrereq(self):
2413     """Check prerequisites.
2414
2415     This checks that the instance is in the cluster and is not running.
2416
2417     """
2418     instance = self.cfg.GetInstanceInfo(
2419       self.cfg.ExpandInstanceName(self.op.instance_name))
2420     if instance is None:
2421       raise errors.OpPrereqError("Instance '%s' not known" %
2422                                  self.op.instance_name)
2423     if instance.status != "down":
2424       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2425                                  self.op.instance_name)
2426     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2427     if remote_info:
2428       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2429                                  (self.op.instance_name,
2430                                   instance.primary_node))
2431     self.instance = instance
2432
2433     # new name verification
2434     name_info = utils.HostInfo(self.op.new_name)
2435
2436     self.op.new_name = new_name = name_info.name
2437     instance_list = self.cfg.GetInstanceList()
2438     if new_name in instance_list:
2439       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2440                                  new_name)
2441
2442     if not getattr(self.op, "ignore_ip", False):
2443       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2444         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2445                                    (name_info.ip, new_name))
2446
2447
2448   def Exec(self, feedback_fn):
2449     """Reinstall the instance.
2450
2451     """
2452     inst = self.instance
2453     old_name = inst.name
2454
2455     if inst.disk_template == constants.DT_FILE:
2456       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2457
2458     self.cfg.RenameInstance(inst.name, self.op.new_name)
2459     # Change the instance lock. This is definitely safe while we hold the BGL
2460     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2461     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2462
2463     # re-read the instance from the configuration after rename
2464     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2465
2466     if inst.disk_template == constants.DT_FILE:
2467       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2468       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2469                                                 old_file_storage_dir,
2470                                                 new_file_storage_dir)
2471
2472       if not result:
2473         raise errors.OpExecError("Could not connect to node '%s' to rename"
2474                                  " directory '%s' to '%s' (but the instance"
2475                                  " has been renamed in Ganeti)" % (
2476                                  inst.primary_node, old_file_storage_dir,
2477                                  new_file_storage_dir))
2478
2479       if not result[0]:
2480         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2481                                  " (but the instance has been renamed in"
2482                                  " Ganeti)" % (old_file_storage_dir,
2483                                                new_file_storage_dir))
2484
2485     _StartInstanceDisks(self.cfg, inst, None)
2486     try:
2487       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2488                                           "sda", "sdb"):
2489         msg = ("Could not run OS rename script for instance %s on node %s"
2490                " (but the instance has been renamed in Ganeti)" %
2491                (inst.name, inst.primary_node))
2492         logger.Error(msg)
2493     finally:
2494       _ShutdownInstanceDisks(inst, self.cfg)
2495
2496
2497 class LURemoveInstance(LogicalUnit):
2498   """Remove an instance.
2499
2500   """
2501   HPATH = "instance-remove"
2502   HTYPE = constants.HTYPE_INSTANCE
2503   _OP_REQP = ["instance_name", "ignore_failures"]
2504   REQ_BGL = False
2505
2506   def ExpandNames(self):
2507     self._ExpandAndLockInstance()
2508     self.needed_locks[locking.LEVEL_NODE] = []
2509     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2510
2511   def DeclareLocks(self, level):
2512     if level == locking.LEVEL_NODE:
2513       self._LockInstancesNodes()
2514
2515   def BuildHooksEnv(self):
2516     """Build hooks env.
2517
2518     This runs on master, primary and secondary nodes of the instance.
2519
2520     """
2521     env = _BuildInstanceHookEnvByObject(self.instance)
2522     nl = [self.sstore.GetMasterNode()]
2523     return env, nl, nl
2524
2525   def CheckPrereq(self):
2526     """Check prerequisites.
2527
2528     This checks that the instance is in the cluster.
2529
2530     """
2531     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2532     assert self.instance is not None, \
2533       "Cannot retrieve locked instance %s" % self.op.instance_name
2534
2535   def Exec(self, feedback_fn):
2536     """Remove the instance.
2537
2538     """
2539     instance = self.instance
2540     logger.Info("shutting down instance %s on node %s" %
2541                 (instance.name, instance.primary_node))
2542
2543     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2544       if self.op.ignore_failures:
2545         feedback_fn("Warning: can't shutdown instance")
2546       else:
2547         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2548                                  (instance.name, instance.primary_node))
2549
2550     logger.Info("removing block devices for instance %s" % instance.name)
2551
2552     if not _RemoveDisks(instance, self.cfg):
2553       if self.op.ignore_failures:
2554         feedback_fn("Warning: can't remove instance's disks")
2555       else:
2556         raise errors.OpExecError("Can't remove instance's disks")
2557
2558     logger.Info("removing instance %s out of cluster config" % instance.name)
2559
2560     self.cfg.RemoveInstance(instance.name)
2561     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2562
2563
2564 class LUQueryInstances(NoHooksLU):
2565   """Logical unit for querying instances.
2566
2567   """
2568   _OP_REQP = ["output_fields", "names"]
2569   REQ_BGL = False
2570
2571   def ExpandNames(self):
2572     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2573     self.static_fields = frozenset([
2574       "name", "os", "pnode", "snodes",
2575       "admin_state", "admin_ram",
2576       "disk_template", "ip", "mac", "bridge",
2577       "sda_size", "sdb_size", "vcpus", "tags",
2578       "network_port", "kernel_path", "initrd_path",
2579       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2580       "hvm_cdrom_image_path", "hvm_nic_type",
2581       "hvm_disk_type", "vnc_bind_address",
2582       "serial_no",
2583       ])
2584     _CheckOutputFields(static=self.static_fields,
2585                        dynamic=self.dynamic_fields,
2586                        selected=self.op.output_fields)
2587
2588     self.needed_locks = {}
2589     self.share_locks[locking.LEVEL_INSTANCE] = 1
2590     self.share_locks[locking.LEVEL_NODE] = 1
2591
2592     if self.op.names:
2593       self.wanted = _GetWantedInstances(self, self.op.names)
2594     else:
2595       self.wanted = locking.ALL_SET
2596
2597     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2598     if self.do_locking:
2599       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2600       self.needed_locks[locking.LEVEL_NODE] = []
2601       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2602
2603   def DeclareLocks(self, level):
2604     if level == locking.LEVEL_NODE and self.do_locking:
2605       self._LockInstancesNodes()
2606
2607   def CheckPrereq(self):
2608     """Check prerequisites.
2609
2610     """
2611     pass
2612
2613   def Exec(self, feedback_fn):
2614     """Computes the list of nodes and their attributes.
2615
2616     """
2617     all_info = self.cfg.GetAllInstancesInfo()
2618     if self.do_locking:
2619       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2620     elif self.wanted != locking.ALL_SET:
2621       instance_names = self.wanted
2622       missing = set(instance_names).difference(all_info.keys())
2623       if missing:
2624         raise self.OpExecError(
2625           "Some instances were removed before retrieving their data: %s"
2626           % missing)
2627     else:
2628       instance_names = all_info.keys()
2629     instance_list = [all_info[iname] for iname in instance_names]
2630
2631     # begin data gathering
2632
2633     nodes = frozenset([inst.primary_node for inst in instance_list])
2634
2635     bad_nodes = []
2636     if self.dynamic_fields.intersection(self.op.output_fields):
2637       live_data = {}
2638       node_data = rpc.call_all_instances_info(nodes)
2639       for name in nodes:
2640         result = node_data[name]
2641         if result:
2642           live_data.update(result)
2643         elif result == False:
2644           bad_nodes.append(name)
2645         # else no instance is alive
2646     else:
2647       live_data = dict([(name, {}) for name in instance_names])
2648
2649     # end data gathering
2650
2651     output = []
2652     for instance in instance_list:
2653       iout = []
2654       for field in self.op.output_fields:
2655         if field == "name":
2656           val = instance.name
2657         elif field == "os":
2658           val = instance.os
2659         elif field == "pnode":
2660           val = instance.primary_node
2661         elif field == "snodes":
2662           val = list(instance.secondary_nodes)
2663         elif field == "admin_state":
2664           val = (instance.status != "down")
2665         elif field == "oper_state":
2666           if instance.primary_node in bad_nodes:
2667             val = None
2668           else:
2669             val = bool(live_data.get(instance.name))
2670         elif field == "status":
2671           if instance.primary_node in bad_nodes:
2672             val = "ERROR_nodedown"
2673           else:
2674             running = bool(live_data.get(instance.name))
2675             if running:
2676               if instance.status != "down":
2677                 val = "running"
2678               else:
2679                 val = "ERROR_up"
2680             else:
2681               if instance.status != "down":
2682                 val = "ERROR_down"
2683               else:
2684                 val = "ADMIN_down"
2685         elif field == "admin_ram":
2686           val = instance.memory
2687         elif field == "oper_ram":
2688           if instance.primary_node in bad_nodes:
2689             val = None
2690           elif instance.name in live_data:
2691             val = live_data[instance.name].get("memory", "?")
2692           else:
2693             val = "-"
2694         elif field == "disk_template":
2695           val = instance.disk_template
2696         elif field == "ip":
2697           val = instance.nics[0].ip
2698         elif field == "bridge":
2699           val = instance.nics[0].bridge
2700         elif field == "mac":
2701           val = instance.nics[0].mac
2702         elif field == "sda_size" or field == "sdb_size":
2703           disk = instance.FindDisk(field[:3])
2704           if disk is None:
2705             val = None
2706           else:
2707             val = disk.size
2708         elif field == "vcpus":
2709           val = instance.vcpus
2710         elif field == "tags":
2711           val = list(instance.GetTags())
2712         elif field == "serial_no":
2713           val = instance.serial_no
2714         elif field in ("network_port", "kernel_path", "initrd_path",
2715                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2716                        "hvm_cdrom_image_path", "hvm_nic_type",
2717                        "hvm_disk_type", "vnc_bind_address"):
2718           val = getattr(instance, field, None)
2719           if val is not None:
2720             pass
2721           elif field in ("hvm_nic_type", "hvm_disk_type",
2722                          "kernel_path", "initrd_path"):
2723             val = "default"
2724           else:
2725             val = "-"
2726         else:
2727           raise errors.ParameterError(field)
2728         iout.append(val)
2729       output.append(iout)
2730
2731     return output
2732
2733
2734 class LUFailoverInstance(LogicalUnit):
2735   """Failover an instance.
2736
2737   """
2738   HPATH = "instance-failover"
2739   HTYPE = constants.HTYPE_INSTANCE
2740   _OP_REQP = ["instance_name", "ignore_consistency"]
2741   REQ_BGL = False
2742
2743   def ExpandNames(self):
2744     self._ExpandAndLockInstance()
2745     self.needed_locks[locking.LEVEL_NODE] = []
2746     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747
2748   def DeclareLocks(self, level):
2749     if level == locking.LEVEL_NODE:
2750       self._LockInstancesNodes()
2751
2752   def BuildHooksEnv(self):
2753     """Build hooks env.
2754
2755     This runs on master, primary and secondary nodes of the instance.
2756
2757     """
2758     env = {
2759       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2760       }
2761     env.update(_BuildInstanceHookEnvByObject(self.instance))
2762     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2763     return env, nl, nl
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775     if instance.disk_template not in constants.DTS_NET_MIRROR:
2776       raise errors.OpPrereqError("Instance's disk layout is not"
2777                                  " network mirrored, cannot failover.")
2778
2779     secondary_nodes = instance.secondary_nodes
2780     if not secondary_nodes:
2781       raise errors.ProgrammerError("no secondary node but using "
2782                                    "a mirrored disk template")
2783
2784     target_node = secondary_nodes[0]
2785     # check memory requirements on the secondary node
2786     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2787                          instance.name, instance.memory)
2788
2789     # check bridge existance
2790     brlist = [nic.bridge for nic in instance.nics]
2791     if not rpc.call_bridges_exist(target_node, brlist):
2792       raise errors.OpPrereqError("One or more target bridges %s does not"
2793                                  " exist on destination node '%s'" %
2794                                  (brlist, target_node))
2795
2796   def Exec(self, feedback_fn):
2797     """Failover an instance.
2798
2799     The failover is done by shutting it down on its present node and
2800     starting it on the secondary.
2801
2802     """
2803     instance = self.instance
2804
2805     source_node = instance.primary_node
2806     target_node = instance.secondary_nodes[0]
2807
2808     feedback_fn("* checking disk consistency between source and target")
2809     for dev in instance.disks:
2810       # for drbd, these are drbd over lvm
2811       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2812         if instance.status == "up" and not self.op.ignore_consistency:
2813           raise errors.OpExecError("Disk %s is degraded on target node,"
2814                                    " aborting failover." % dev.iv_name)
2815
2816     feedback_fn("* shutting down instance on source node")
2817     logger.Info("Shutting down instance %s on node %s" %
2818                 (instance.name, source_node))
2819
2820     if not rpc.call_instance_shutdown(source_node, instance):
2821       if self.op.ignore_consistency:
2822         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2823                      " anyway. Please make sure node %s is down"  %
2824                      (instance.name, source_node, source_node))
2825       else:
2826         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2827                                  (instance.name, source_node))
2828
2829     feedback_fn("* deactivating the instance's disks on source node")
2830     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2831       raise errors.OpExecError("Can't shut down the instance's disks.")
2832
2833     instance.primary_node = target_node
2834     # distribute new instance config to the other nodes
2835     self.cfg.Update(instance)
2836
2837     # Only start the instance if it's marked as up
2838     if instance.status == "up":
2839       feedback_fn("* activating the instance's disks on target node")
2840       logger.Info("Starting instance %s on node %s" %
2841                   (instance.name, target_node))
2842
2843       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2844                                                ignore_secondaries=True)
2845       if not disks_ok:
2846         _ShutdownInstanceDisks(instance, self.cfg)
2847         raise errors.OpExecError("Can't activate the instance's disks")
2848
2849       feedback_fn("* starting the instance on the target node")
2850       if not rpc.call_instance_start(target_node, instance, None):
2851         _ShutdownInstanceDisks(instance, self.cfg)
2852         raise errors.OpExecError("Could not start instance %s on node %s." %
2853                                  (instance.name, target_node))
2854
2855
2856 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2857   """Create a tree of block devices on the primary node.
2858
2859   This always creates all devices.
2860
2861   """
2862   if device.children:
2863     for child in device.children:
2864       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2865         return False
2866
2867   cfg.SetDiskID(device, node)
2868   new_id = rpc.call_blockdev_create(node, device, device.size,
2869                                     instance.name, True, info)
2870   if not new_id:
2871     return False
2872   if device.physical_id is None:
2873     device.physical_id = new_id
2874   return True
2875
2876
2877 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2878   """Create a tree of block devices on a secondary node.
2879
2880   If this device type has to be created on secondaries, create it and
2881   all its children.
2882
2883   If not, just recurse to children keeping the same 'force' value.
2884
2885   """
2886   if device.CreateOnSecondary():
2887     force = True
2888   if device.children:
2889     for child in device.children:
2890       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2891                                         child, force, info):
2892         return False
2893
2894   if not force:
2895     return True
2896   cfg.SetDiskID(device, node)
2897   new_id = rpc.call_blockdev_create(node, device, device.size,
2898                                     instance.name, False, info)
2899   if not new_id:
2900     return False
2901   if device.physical_id is None:
2902     device.physical_id = new_id
2903   return True
2904
2905
2906 def _GenerateUniqueNames(cfg, exts):
2907   """Generate a suitable LV name.
2908
2909   This will generate a logical volume name for the given instance.
2910
2911   """
2912   results = []
2913   for val in exts:
2914     new_id = cfg.GenerateUniqueID()
2915     results.append("%s%s" % (new_id, val))
2916   return results
2917
2918
2919 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2920                          p_minor, s_minor):
2921   """Generate a drbd8 device complete with its children.
2922
2923   """
2924   port = cfg.AllocatePort()
2925   vgname = cfg.GetVGName()
2926   shared_secret = cfg.GenerateDRBDSecret()
2927   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2928                           logical_id=(vgname, names[0]))
2929   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2930                           logical_id=(vgname, names[1]))
2931   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2932                           logical_id=(primary, secondary, port,
2933                                       p_minor, s_minor,
2934                                       shared_secret),
2935                           children=[dev_data, dev_meta],
2936                           iv_name=iv_name)
2937   return drbd_dev
2938
2939
2940 def _GenerateDiskTemplate(cfg, template_name,
2941                           instance_name, primary_node,
2942                           secondary_nodes, disk_sz, swap_sz,
2943                           file_storage_dir, file_driver):
2944   """Generate the entire disk layout for a given template type.
2945
2946   """
2947   #TODO: compute space requirements
2948
2949   vgname = cfg.GetVGName()
2950   if template_name == constants.DT_DISKLESS:
2951     disks = []
2952   elif template_name == constants.DT_PLAIN:
2953     if len(secondary_nodes) != 0:
2954       raise errors.ProgrammerError("Wrong template configuration")
2955
2956     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2957     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2958                            logical_id=(vgname, names[0]),
2959                            iv_name = "sda")
2960     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2961                            logical_id=(vgname, names[1]),
2962                            iv_name = "sdb")
2963     disks = [sda_dev, sdb_dev]
2964   elif template_name == constants.DT_DRBD8:
2965     if len(secondary_nodes) != 1:
2966       raise errors.ProgrammerError("Wrong template configuration")
2967     remote_node = secondary_nodes[0]
2968     (minor_pa, minor_pb,
2969      minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
2970       [primary_node, primary_node, remote_node, remote_node], instance_name)
2971
2972     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2973                                        ".sdb_data", ".sdb_meta"])
2974     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2975                                         disk_sz, names[0:2], "sda",
2976                                         minor_pa, minor_sa)
2977     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2978                                         swap_sz, names[2:4], "sdb",
2979                                         minor_pb, minor_sb)
2980     disks = [drbd_sda_dev, drbd_sdb_dev]
2981   elif template_name == constants.DT_FILE:
2982     if len(secondary_nodes) != 0:
2983       raise errors.ProgrammerError("Wrong template configuration")
2984
2985     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2986                                 iv_name="sda", logical_id=(file_driver,
2987                                 "%s/sda" % file_storage_dir))
2988     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2989                                 iv_name="sdb", logical_id=(file_driver,
2990                                 "%s/sdb" % file_storage_dir))
2991     disks = [file_sda_dev, file_sdb_dev]
2992   else:
2993     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2994   return disks
2995
2996
2997 def _GetInstanceInfoText(instance):
2998   """Compute that text that should be added to the disk's metadata.
2999
3000   """
3001   return "originstname+%s" % instance.name
3002
3003
3004 def _CreateDisks(cfg, instance):
3005   """Create all disks for an instance.
3006
3007   This abstracts away some work from AddInstance.
3008
3009   Args:
3010     instance: the instance object
3011
3012   Returns:
3013     True or False showing the success of the creation process
3014
3015   """
3016   info = _GetInstanceInfoText(instance)
3017
3018   if instance.disk_template == constants.DT_FILE:
3019     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3020     result = rpc.call_file_storage_dir_create(instance.primary_node,
3021                                               file_storage_dir)
3022
3023     if not result:
3024       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3025       return False
3026
3027     if not result[0]:
3028       logger.Error("failed to create directory '%s'" % file_storage_dir)
3029       return False
3030
3031   for device in instance.disks:
3032     logger.Info("creating volume %s for instance %s" %
3033                 (device.iv_name, instance.name))
3034     #HARDCODE
3035     for secondary_node in instance.secondary_nodes:
3036       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3037                                         device, False, info):
3038         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3039                      (device.iv_name, device, secondary_node))
3040         return False
3041     #HARDCODE
3042     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3043                                     instance, device, info):
3044       logger.Error("failed to create volume %s on primary!" %
3045                    device.iv_name)
3046       return False
3047
3048   return True
3049
3050
3051 def _RemoveDisks(instance, cfg):
3052   """Remove all disks for an instance.
3053
3054   This abstracts away some work from `AddInstance()` and
3055   `RemoveInstance()`. Note that in case some of the devices couldn't
3056   be removed, the removal will continue with the other ones (compare
3057   with `_CreateDisks()`).
3058
3059   Args:
3060     instance: the instance object
3061
3062   Returns:
3063     True or False showing the success of the removal proces
3064
3065   """
3066   logger.Info("removing block devices for instance %s" % instance.name)
3067
3068   result = True
3069   for device in instance.disks:
3070     for node, disk in device.ComputeNodeTree(instance.primary_node):
3071       cfg.SetDiskID(disk, node)
3072       if not rpc.call_blockdev_remove(node, disk):
3073         logger.Error("could not remove block device %s on node %s,"
3074                      " continuing anyway" %
3075                      (device.iv_name, node))
3076         result = False
3077
3078   if instance.disk_template == constants.DT_FILE:
3079     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3080     if not rpc.call_file_storage_dir_remove(instance.primary_node,
3081                                             file_storage_dir):
3082       logger.Error("could not remove directory '%s'" % file_storage_dir)
3083       result = False
3084
3085   return result
3086
3087
3088 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3089   """Compute disk size requirements in the volume group
3090
3091   This is currently hard-coded for the two-drive layout.
3092
3093   """
3094   # Required free disk space as a function of disk and swap space
3095   req_size_dict = {
3096     constants.DT_DISKLESS: None,
3097     constants.DT_PLAIN: disk_size + swap_size,
3098     # 256 MB are added for drbd metadata, 128MB for each drbd device
3099     constants.DT_DRBD8: disk_size + swap_size + 256,
3100     constants.DT_FILE: None,
3101   }
3102
3103   if disk_template not in req_size_dict:
3104     raise errors.ProgrammerError("Disk template '%s' size requirement"
3105                                  " is unknown" %  disk_template)
3106
3107   return req_size_dict[disk_template]
3108
3109
3110 class LUCreateInstance(LogicalUnit):
3111   """Create an instance.
3112
3113   """
3114   HPATH = "instance-add"
3115   HTYPE = constants.HTYPE_INSTANCE
3116   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3117               "disk_template", "swap_size", "mode", "start", "vcpus",
3118               "wait_for_sync", "ip_check", "mac"]
3119   REQ_BGL = False
3120
3121   def _ExpandNode(self, node):
3122     """Expands and checks one node name.
3123
3124     """
3125     node_full = self.cfg.ExpandNodeName(node)
3126     if node_full is None:
3127       raise errors.OpPrereqError("Unknown node %s" % node)
3128     return node_full
3129
3130   def ExpandNames(self):
3131     """ExpandNames for CreateInstance.
3132
3133     Figure out the right locks for instance creation.
3134
3135     """
3136     self.needed_locks = {}
3137
3138     # set optional parameters to none if they don't exist
3139     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3140                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3141                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3142                  "vnc_bind_address"]:
3143       if not hasattr(self.op, attr):
3144         setattr(self.op, attr, None)
3145
3146     # verify creation mode
3147     if self.op.mode not in (constants.INSTANCE_CREATE,
3148                             constants.INSTANCE_IMPORT):
3149       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3150                                  self.op.mode)
3151     # disk template and mirror node verification
3152     if self.op.disk_template not in constants.DISK_TEMPLATES:
3153       raise errors.OpPrereqError("Invalid disk template name")
3154
3155     #### instance parameters check
3156
3157     # instance name verification
3158     hostname1 = utils.HostInfo(self.op.instance_name)
3159     self.op.instance_name = instance_name = hostname1.name
3160
3161     # this is just a preventive check, but someone might still add this
3162     # instance in the meantime, and creation will fail at lock-add time
3163     if instance_name in self.cfg.GetInstanceList():
3164       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3165                                  instance_name)
3166
3167     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3168
3169     # ip validity checks
3170     ip = getattr(self.op, "ip", None)
3171     if ip is None or ip.lower() == "none":
3172       inst_ip = None
3173     elif ip.lower() == "auto":
3174       inst_ip = hostname1.ip
3175     else:
3176       if not utils.IsValidIP(ip):
3177         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3178                                    " like a valid IP" % ip)
3179       inst_ip = ip
3180     self.inst_ip = self.op.ip = inst_ip
3181     # used in CheckPrereq for ip ping check
3182     self.check_ip = hostname1.ip
3183
3184     # MAC address verification
3185     if self.op.mac != "auto":
3186       if not utils.IsValidMac(self.op.mac.lower()):
3187         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3188                                    self.op.mac)
3189
3190     # boot order verification
3191     if self.op.hvm_boot_order is not None:
3192       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3193         raise errors.OpPrereqError("invalid boot order specified,"
3194                                    " must be one or more of [acdn]")
3195     # file storage checks
3196     if (self.op.file_driver and
3197         not self.op.file_driver in constants.FILE_DRIVER):
3198       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3199                                  self.op.file_driver)
3200
3201     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3202       raise errors.OpPrereqError("File storage directory path not absolute")
3203
3204     ### Node/iallocator related checks
3205     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3206       raise errors.OpPrereqError("One and only one of iallocator and primary"
3207                                  " node must be given")
3208
3209     if self.op.iallocator:
3210       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3211     else:
3212       self.op.pnode = self._ExpandNode(self.op.pnode)
3213       nodelist = [self.op.pnode]
3214       if self.op.snode is not None:
3215         self.op.snode = self._ExpandNode(self.op.snode)
3216         nodelist.append(self.op.snode)
3217       self.needed_locks[locking.LEVEL_NODE] = nodelist
3218
3219     # in case of import lock the source node too
3220     if self.op.mode == constants.INSTANCE_IMPORT:
3221       src_node = getattr(self.op, "src_node", None)
3222       src_path = getattr(self.op, "src_path", None)
3223
3224       if src_node is None or src_path is None:
3225         raise errors.OpPrereqError("Importing an instance requires source"
3226                                    " node and path options")
3227
3228       if not os.path.isabs(src_path):
3229         raise errors.OpPrereqError("The source path must be absolute")
3230
3231       self.op.src_node = src_node = self._ExpandNode(src_node)
3232       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3233         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3234
3235     else: # INSTANCE_CREATE
3236       if getattr(self.op, "os_type", None) is None:
3237         raise errors.OpPrereqError("No guest OS specified")
3238
3239   def _RunAllocator(self):
3240     """Run the allocator based on input opcode.
3241
3242     """
3243     disks = [{"size": self.op.disk_size, "mode": "w"},
3244              {"size": self.op.swap_size, "mode": "w"}]
3245     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3246              "bridge": self.op.bridge}]
3247     ial = IAllocator(self.cfg, self.sstore,
3248                      mode=constants.IALLOCATOR_MODE_ALLOC,
3249                      name=self.op.instance_name,
3250                      disk_template=self.op.disk_template,
3251                      tags=[],
3252                      os=self.op.os_type,
3253                      vcpus=self.op.vcpus,
3254                      mem_size=self.op.mem_size,
3255                      disks=disks,
3256                      nics=nics,
3257                      )
3258
3259     ial.Run(self.op.iallocator)
3260
3261     if not ial.success:
3262       raise errors.OpPrereqError("Can't compute nodes using"
3263                                  " iallocator '%s': %s" % (self.op.iallocator,
3264                                                            ial.info))
3265     if len(ial.nodes) != ial.required_nodes:
3266       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3267                                  " of nodes (%s), required %s" %
3268                                  (self.op.iallocator, len(ial.nodes),
3269                                   ial.required_nodes))
3270     self.op.pnode = ial.nodes[0]
3271     logger.ToStdout("Selected nodes for the instance: %s" %
3272                     (", ".join(ial.nodes),))
3273     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3274                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3275     if ial.required_nodes == 2:
3276       self.op.snode = ial.nodes[1]
3277
3278   def BuildHooksEnv(self):
3279     """Build hooks env.
3280
3281     This runs on master, primary and secondary nodes of the instance.
3282
3283     """
3284     env = {
3285       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3286       "INSTANCE_DISK_SIZE": self.op.disk_size,
3287       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3288       "INSTANCE_ADD_MODE": self.op.mode,
3289       }
3290     if self.op.mode == constants.INSTANCE_IMPORT:
3291       env["INSTANCE_SRC_NODE"] = self.op.src_node
3292       env["INSTANCE_SRC_PATH"] = self.op.src_path
3293       env["INSTANCE_SRC_IMAGE"] = self.src_image
3294
3295     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3296       primary_node=self.op.pnode,
3297       secondary_nodes=self.secondaries,
3298       status=self.instance_status,
3299       os_type=self.op.os_type,
3300       memory=self.op.mem_size,
3301       vcpus=self.op.vcpus,
3302       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3303     ))
3304
3305     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3306           self.secondaries)
3307     return env, nl, nl
3308
3309
3310   def CheckPrereq(self):
3311     """Check prerequisites.
3312
3313     """
3314     if (not self.cfg.GetVGName() and
3315         self.op.disk_template not in constants.DTS_NOT_LVM):
3316       raise errors.OpPrereqError("Cluster does not support lvm-based"
3317                                  " instances")
3318
3319     if self.op.mode == constants.INSTANCE_IMPORT:
3320       src_node = self.op.src_node
3321       src_path = self.op.src_path
3322
3323       export_info = rpc.call_export_info(src_node, src_path)
3324
3325       if not export_info:
3326         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3327
3328       if not export_info.has_section(constants.INISECT_EXP):
3329         raise errors.ProgrammerError("Corrupted export config")
3330
3331       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3332       if (int(ei_version) != constants.EXPORT_VERSION):
3333         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3334                                    (ei_version, constants.EXPORT_VERSION))
3335
3336       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3337         raise errors.OpPrereqError("Can't import instance with more than"
3338                                    " one data disk")
3339
3340       # FIXME: are the old os-es, disk sizes, etc. useful?
3341       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3342       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3343                                                          'disk0_dump'))
3344       self.src_image = diskimage
3345
3346     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3347
3348     if self.op.start and not self.op.ip_check:
3349       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3350                                  " adding an instance in start mode")
3351
3352     if self.op.ip_check:
3353       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3354         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3355                                    (self.check_ip, instance_name))
3356
3357     # bridge verification
3358     bridge = getattr(self.op, "bridge", None)
3359     if bridge is None:
3360       self.op.bridge = self.cfg.GetDefBridge()
3361     else:
3362       self.op.bridge = bridge
3363
3364     #### allocator run
3365
3366     if self.op.iallocator is not None:
3367       self._RunAllocator()
3368
3369     #### node related checks
3370
3371     # check primary node
3372     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3373     assert self.pnode is not None, \
3374       "Cannot retrieve locked node %s" % self.op.pnode
3375     self.secondaries = []
3376
3377     # mirror node verification
3378     if self.op.disk_template in constants.DTS_NET_MIRROR:
3379       if self.op.snode is None:
3380         raise errors.OpPrereqError("The networked disk templates need"
3381                                    " a mirror node")
3382       if self.op.snode == pnode.name:
3383         raise errors.OpPrereqError("The secondary node cannot be"
3384                                    " the primary node.")
3385       self.secondaries.append(self.op.snode)
3386
3387     req_size = _ComputeDiskSize(self.op.disk_template,
3388                                 self.op.disk_size, self.op.swap_size)
3389
3390     # Check lv size requirements
3391     if req_size is not None:
3392       nodenames = [pnode.name] + self.secondaries
3393       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3394       for node in nodenames:
3395         info = nodeinfo.get(node, None)
3396         if not info:
3397           raise errors.OpPrereqError("Cannot get current information"
3398                                      " from node '%s'" % node)
3399         vg_free = info.get('vg_free', None)
3400         if not isinstance(vg_free, int):
3401           raise errors.OpPrereqError("Can't compute free disk space on"
3402                                      " node %s" % node)
3403         if req_size > info['vg_free']:
3404           raise errors.OpPrereqError("Not enough disk space on target node %s."
3405                                      " %d MB available, %d MB required" %
3406                                      (node, info['vg_free'], req_size))
3407
3408     # os verification
3409     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3410     if not os_obj:
3411       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3412                                  " primary node"  % self.op.os_type)
3413
3414     if self.op.kernel_path == constants.VALUE_NONE:
3415       raise errors.OpPrereqError("Can't set instance kernel to none")
3416
3417     # bridge check on primary node
3418     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3419       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3420                                  " destination node '%s'" %
3421                                  (self.op.bridge, pnode.name))
3422
3423     # memory check on primary node
3424     if self.op.start:
3425       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3426                            "creating instance %s" % self.op.instance_name,
3427                            self.op.mem_size)
3428
3429     # hvm_cdrom_image_path verification
3430     if self.op.hvm_cdrom_image_path is not None:
3431       # FIXME (als): shouldn't these checks happen on the destination node?
3432       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3433         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3434                                    " be an absolute path or None, not %s" %
3435                                    self.op.hvm_cdrom_image_path)
3436       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3437         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3438                                    " regular file or a symlink pointing to"
3439                                    " an existing regular file, not %s" %
3440                                    self.op.hvm_cdrom_image_path)
3441
3442     # vnc_bind_address verification
3443     if self.op.vnc_bind_address is not None:
3444       if not utils.IsValidIP(self.op.vnc_bind_address):
3445         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3446                                    " like a valid IP address" %
3447                                    self.op.vnc_bind_address)
3448
3449     # Xen HVM device type checks
3450     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3451       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3452         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3453                                    " hypervisor" % self.op.hvm_nic_type)
3454       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3455         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3456                                    " hypervisor" % self.op.hvm_disk_type)
3457
3458     if self.op.start:
3459       self.instance_status = 'up'
3460     else:
3461       self.instance_status = 'down'
3462
3463   def Exec(self, feedback_fn):
3464     """Create and add the instance to the cluster.
3465
3466     """
3467     instance = self.op.instance_name
3468     pnode_name = self.pnode.name
3469
3470     if self.op.mac == "auto":
3471       mac_address = self.cfg.GenerateMAC()
3472     else:
3473       mac_address = self.op.mac
3474
3475     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3476     if self.inst_ip is not None:
3477       nic.ip = self.inst_ip
3478
3479     ht_kind = self.sstore.GetHypervisorType()
3480     if ht_kind in constants.HTS_REQ_PORT:
3481       network_port = self.cfg.AllocatePort()
3482     else:
3483       network_port = None
3484
3485     if self.op.vnc_bind_address is None:
3486       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3487
3488     # this is needed because os.path.join does not accept None arguments
3489     if self.op.file_storage_dir is None:
3490       string_file_storage_dir = ""
3491     else:
3492       string_file_storage_dir = self.op.file_storage_dir
3493
3494     # build the full file storage dir path
3495     file_storage_dir = os.path.normpath(os.path.join(
3496                                         self.sstore.GetFileStorageDir(),
3497                                         string_file_storage_dir, instance))
3498
3499
3500     disks = _GenerateDiskTemplate(self.cfg,
3501                                   self.op.disk_template,
3502                                   instance, pnode_name,
3503                                   self.secondaries, self.op.disk_size,
3504                                   self.op.swap_size,
3505                                   file_storage_dir,
3506                                   self.op.file_driver)
3507
3508     iobj = objects.Instance(name=instance, os=self.op.os_type,
3509                             primary_node=pnode_name,
3510                             memory=self.op.mem_size,
3511                             vcpus=self.op.vcpus,
3512                             nics=[nic], disks=disks,
3513                             disk_template=self.op.disk_template,
3514                             status=self.instance_status,
3515                             network_port=network_port,
3516                             kernel_path=self.op.kernel_path,
3517                             initrd_path=self.op.initrd_path,
3518                             hvm_boot_order=self.op.hvm_boot_order,
3519                             hvm_acpi=self.op.hvm_acpi,
3520                             hvm_pae=self.op.hvm_pae,
3521                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3522                             vnc_bind_address=self.op.vnc_bind_address,
3523                             hvm_nic_type=self.op.hvm_nic_type,
3524                             hvm_disk_type=self.op.hvm_disk_type,
3525                             )
3526
3527     feedback_fn("* creating instance disks...")
3528     if not _CreateDisks(self.cfg, iobj):
3529       _RemoveDisks(iobj, self.cfg)
3530       self.cfg.ReleaseDRBDMinors(instance)
3531       raise errors.OpExecError("Device creation failed, reverting...")
3532
3533     feedback_fn("adding instance %s to cluster config" % instance)
3534
3535     self.cfg.AddInstance(iobj)
3536     # Declare that we don't want to remove the instance lock anymore, as we've
3537     # added the instance to the config
3538     del self.remove_locks[locking.LEVEL_INSTANCE]
3539     # Remove the temp. assignements for the instance's drbds
3540     self.cfg.ReleaseDRBDMinors(instance)
3541
3542     if self.op.wait_for_sync:
3543       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3544     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3545       # make sure the disks are not degraded (still sync-ing is ok)
3546       time.sleep(15)
3547       feedback_fn("* checking mirrors status")
3548       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3549     else:
3550       disk_abort = False
3551
3552     if disk_abort:
3553       _RemoveDisks(iobj, self.cfg)
3554       self.cfg.RemoveInstance(iobj.name)
3555       # Make sure the instance lock gets removed
3556       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3557       raise errors.OpExecError("There are some degraded disks for"
3558                                " this instance")
3559
3560     feedback_fn("creating os for instance %s on node %s" %
3561                 (instance, pnode_name))
3562
3563     if iobj.disk_template != constants.DT_DISKLESS:
3564       if self.op.mode == constants.INSTANCE_CREATE:
3565         feedback_fn("* running the instance OS create scripts...")
3566         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3567           raise errors.OpExecError("could not add os for instance %s"
3568                                    " on node %s" %
3569                                    (instance, pnode_name))
3570
3571       elif self.op.mode == constants.INSTANCE_IMPORT:
3572         feedback_fn("* running the instance OS import scripts...")
3573         src_node = self.op.src_node
3574         src_image = self.src_image
3575         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3576                                                 src_node, src_image):
3577           raise errors.OpExecError("Could not import os for instance"
3578                                    " %s on node %s" %
3579                                    (instance, pnode_name))
3580       else:
3581         # also checked in the prereq part
3582         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3583                                      % self.op.mode)
3584
3585     if self.op.start:
3586       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3587       feedback_fn("* starting instance...")
3588       if not rpc.call_instance_start(pnode_name, iobj, None):
3589         raise errors.OpExecError("Could not start instance")
3590
3591
3592 class LUConnectConsole(NoHooksLU):
3593   """Connect to an instance's console.
3594
3595   This is somewhat special in that it returns the command line that
3596   you need to run on the master node in order to connect to the
3597   console.
3598
3599   """
3600   _OP_REQP = ["instance_name"]
3601   REQ_BGL = False
3602
3603   def ExpandNames(self):
3604     self._ExpandAndLockInstance()
3605
3606   def CheckPrereq(self):
3607     """Check prerequisites.
3608
3609     This checks that the instance is in the cluster.
3610
3611     """
3612     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3613     assert self.instance is not None, \
3614       "Cannot retrieve locked instance %s" % self.op.instance_name
3615
3616   def Exec(self, feedback_fn):
3617     """Connect to the console of an instance
3618
3619     """
3620     instance = self.instance
3621     node = instance.primary_node
3622
3623     node_insts = rpc.call_instance_list([node])[node]
3624     if node_insts is False:
3625       raise errors.OpExecError("Can't connect to node %s." % node)
3626
3627     if instance.name not in node_insts:
3628       raise errors.OpExecError("Instance %s is not running." % instance.name)
3629
3630     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3631
3632     hyper = hypervisor.GetHypervisor()
3633     console_cmd = hyper.GetShellCommandForConsole(instance)
3634
3635     # build ssh cmdline
3636     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3637
3638
3639 class LUReplaceDisks(LogicalUnit):
3640   """Replace the disks of an instance.
3641
3642   """
3643   HPATH = "mirrors-replace"
3644   HTYPE = constants.HTYPE_INSTANCE
3645   _OP_REQP = ["instance_name", "mode", "disks"]
3646   REQ_BGL = False
3647
3648   def ExpandNames(self):
3649     self._ExpandAndLockInstance()
3650
3651     if not hasattr(self.op, "remote_node"):
3652       self.op.remote_node = None
3653
3654     ia_name = getattr(self.op, "iallocator", None)
3655     if ia_name is not None:
3656       if self.op.remote_node is not None:
3657         raise errors.OpPrereqError("Give either the iallocator or the new"
3658                                    " secondary, not both")
3659       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3660     elif self.op.remote_node is not None:
3661       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3662       if remote_node is None:
3663         raise errors.OpPrereqError("Node '%s' not known" %
3664                                    self.op.remote_node)
3665       self.op.remote_node = remote_node
3666       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3667       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3668     else:
3669       self.needed_locks[locking.LEVEL_NODE] = []
3670       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3671
3672   def DeclareLocks(self, level):
3673     # If we're not already locking all nodes in the set we have to declare the
3674     # instance's primary/secondary nodes.
3675     if (level == locking.LEVEL_NODE and
3676         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3677       self._LockInstancesNodes()
3678
3679   def _RunAllocator(self):
3680     """Compute a new secondary node using an IAllocator.
3681
3682     """
3683     ial = IAllocator(self.cfg, self.sstore,
3684                      mode=constants.IALLOCATOR_MODE_RELOC,
3685                      name=self.op.instance_name,
3686                      relocate_from=[self.sec_node])
3687
3688     ial.Run(self.op.iallocator)
3689
3690     if not ial.success:
3691       raise errors.OpPrereqError("Can't compute nodes using"
3692                                  " iallocator '%s': %s" % (self.op.iallocator,
3693                                                            ial.info))
3694     if len(ial.nodes) != ial.required_nodes:
3695       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3696                                  " of nodes (%s), required %s" %
3697                                  (len(ial.nodes), ial.required_nodes))
3698     self.op.remote_node = ial.nodes[0]
3699     logger.ToStdout("Selected new secondary for the instance: %s" %
3700                     self.op.remote_node)
3701
3702   def BuildHooksEnv(self):
3703     """Build hooks env.
3704
3705     This runs on the master, the primary and all the secondaries.
3706
3707     """
3708     env = {
3709       "MODE": self.op.mode,
3710       "NEW_SECONDARY": self.op.remote_node,
3711       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3712       }
3713     env.update(_BuildInstanceHookEnvByObject(self.instance))
3714     nl = [
3715       self.sstore.GetMasterNode(),
3716       self.instance.primary_node,
3717       ]
3718     if self.op.remote_node is not None:
3719       nl.append(self.op.remote_node)
3720     return env, nl, nl
3721
3722   def CheckPrereq(self):
3723     """Check prerequisites.
3724
3725     This checks that the instance is in the cluster.
3726
3727     """
3728     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3729     assert instance is not None, \
3730       "Cannot retrieve locked instance %s" % self.op.instance_name
3731     self.instance = instance
3732
3733     if instance.disk_template not in constants.DTS_NET_MIRROR:
3734       raise errors.OpPrereqError("Instance's disk layout is not"
3735                                  " network mirrored.")
3736
3737     if len(instance.secondary_nodes) != 1:
3738       raise errors.OpPrereqError("The instance has a strange layout,"
3739                                  " expected one secondary but found %d" %
3740                                  len(instance.secondary_nodes))
3741
3742     self.sec_node = instance.secondary_nodes[0]
3743
3744     ia_name = getattr(self.op, "iallocator", None)
3745     if ia_name is not None:
3746       self._RunAllocator()
3747
3748     remote_node = self.op.remote_node
3749     if remote_node is not None:
3750       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3751       assert self.remote_node_info is not None, \
3752         "Cannot retrieve locked node %s" % remote_node
3753     else:
3754       self.remote_node_info = None
3755     if remote_node == instance.primary_node:
3756       raise errors.OpPrereqError("The specified node is the primary node of"
3757                                  " the instance.")
3758     elif remote_node == self.sec_node:
3759       if self.op.mode == constants.REPLACE_DISK_SEC:
3760         # this is for DRBD8, where we can't execute the same mode of
3761         # replacement as for drbd7 (no different port allocated)
3762         raise errors.OpPrereqError("Same secondary given, cannot execute"
3763                                    " replacement")
3764     if instance.disk_template == constants.DT_DRBD8:
3765       if (self.op.mode == constants.REPLACE_DISK_ALL and
3766           remote_node is not None):
3767         # switch to replace secondary mode
3768         self.op.mode = constants.REPLACE_DISK_SEC
3769
3770       if self.op.mode == constants.REPLACE_DISK_ALL:
3771         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3772                                    " secondary disk replacement, not"
3773                                    " both at once")
3774       elif self.op.mode == constants.REPLACE_DISK_PRI:
3775         if remote_node is not None:
3776           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3777                                      " the secondary while doing a primary"
3778                                      " node disk replacement")
3779         self.tgt_node = instance.primary_node
3780         self.oth_node = instance.secondary_nodes[0]
3781       elif self.op.mode == constants.REPLACE_DISK_SEC:
3782         self.new_node = remote_node # this can be None, in which case
3783                                     # we don't change the secondary
3784         self.tgt_node = instance.secondary_nodes[0]
3785         self.oth_node = instance.primary_node
3786       else:
3787         raise errors.ProgrammerError("Unhandled disk replace mode")
3788
3789     for name in self.op.disks:
3790       if instance.FindDisk(name) is None:
3791         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3792                                    (name, instance.name))
3793
3794   def _ExecD8DiskOnly(self, feedback_fn):
3795     """Replace a disk on the primary or secondary for dbrd8.
3796
3797     The algorithm for replace is quite complicated:
3798       - for each disk to be replaced:
3799         - create new LVs on the target node with unique names
3800         - detach old LVs from the drbd device
3801         - rename old LVs to name_replaced.<time_t>
3802         - rename new LVs to old LVs
3803         - attach the new LVs (with the old names now) to the drbd device
3804       - wait for sync across all devices
3805       - for each modified disk:
3806         - remove old LVs (which have the name name_replaces.<time_t>)
3807
3808     Failures are not very well handled.
3809
3810     """
3811     steps_total = 6
3812     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3813     instance = self.instance
3814     iv_names = {}
3815     vgname = self.cfg.GetVGName()
3816     # start of work
3817     cfg = self.cfg
3818     tgt_node = self.tgt_node
3819     oth_node = self.oth_node
3820
3821     # Step: check device activation
3822     self.proc.LogStep(1, steps_total, "check device existence")
3823     info("checking volume groups")
3824     my_vg = cfg.GetVGName()
3825     results = rpc.call_vg_list([oth_node, tgt_node])
3826     if not results:
3827       raise errors.OpExecError("Can't list volume groups on the nodes")
3828     for node in oth_node, tgt_node:
3829       res = results.get(node, False)
3830       if not res or my_vg not in res:
3831         raise errors.OpExecError("Volume group '%s' not found on %s" %
3832                                  (my_vg, node))
3833     for dev in instance.disks:
3834       if not dev.iv_name in self.op.disks:
3835         continue
3836       for node in tgt_node, oth_node:
3837         info("checking %s on %s" % (dev.iv_name, node))
3838         cfg.SetDiskID(dev, node)
3839         if not rpc.call_blockdev_find(node, dev):
3840           raise errors.OpExecError("Can't find device %s on node %s" %
3841                                    (dev.iv_name, node))
3842
3843     # Step: check other node consistency
3844     self.proc.LogStep(2, steps_total, "check peer consistency")
3845     for dev in instance.disks:
3846       if not dev.iv_name in self.op.disks:
3847         continue
3848       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3849       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3850                                    oth_node==instance.primary_node):
3851         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3852                                  " to replace disks on this node (%s)" %
3853                                  (oth_node, tgt_node))
3854
3855     # Step: create new storage
3856     self.proc.LogStep(3, steps_total, "allocate new storage")
3857     for dev in instance.disks:
3858       if not dev.iv_name in self.op.disks:
3859         continue
3860       size = dev.size
3861       cfg.SetDiskID(dev, tgt_node)
3862       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3863       names = _GenerateUniqueNames(cfg, lv_names)
3864       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3865                              logical_id=(vgname, names[0]))
3866       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3867                              logical_id=(vgname, names[1]))
3868       new_lvs = [lv_data, lv_meta]
3869       old_lvs = dev.children
3870       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3871       info("creating new local storage on %s for %s" %
3872            (tgt_node, dev.iv_name))
3873       # since we *always* want to create this LV, we use the
3874       # _Create...OnPrimary (which forces the creation), even if we
3875       # are talking about the secondary node
3876       for new_lv in new_lvs:
3877         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3878                                         _GetInstanceInfoText(instance)):
3879           raise errors.OpExecError("Failed to create new LV named '%s' on"
3880                                    " node '%s'" %
3881                                    (new_lv.logical_id[1], tgt_node))
3882
3883     # Step: for each lv, detach+rename*2+attach
3884     self.proc.LogStep(4, steps_total, "change drbd configuration")
3885     for dev, old_lvs, new_lvs in iv_names.itervalues():
3886       info("detaching %s drbd from local storage" % dev.iv_name)
3887       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3888         raise errors.OpExecError("Can't detach drbd from local storage on node"
3889                                  " %s for device %s" % (tgt_node, dev.iv_name))
3890       #dev.children = []
3891       #cfg.Update(instance)
3892
3893       # ok, we created the new LVs, so now we know we have the needed
3894       # storage; as such, we proceed on the target node to rename
3895       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3896       # using the assumption that logical_id == physical_id (which in
3897       # turn is the unique_id on that node)
3898
3899       # FIXME(iustin): use a better name for the replaced LVs
3900       temp_suffix = int(time.time())
3901       ren_fn = lambda d, suff: (d.physical_id[0],
3902                                 d.physical_id[1] + "_replaced-%s" % suff)
3903       # build the rename list based on what LVs exist on the node
3904       rlist = []
3905       for to_ren in old_lvs:
3906         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3907         if find_res is not None: # device exists
3908           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3909
3910       info("renaming the old LVs on the target node")
3911       if not rpc.call_blockdev_rename(tgt_node, rlist):
3912         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3913       # now we rename the new LVs to the old LVs
3914       info("renaming the new LVs on the target node")
3915       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3916       if not rpc.call_blockdev_rename(tgt_node, rlist):
3917         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3918
3919       for old, new in zip(old_lvs, new_lvs):
3920         new.logical_id = old.logical_id
3921         cfg.SetDiskID(new, tgt_node)
3922
3923       for disk in old_lvs:
3924         disk.logical_id = ren_fn(disk, temp_suffix)
3925         cfg.SetDiskID(disk, tgt_node)
3926
3927       # now that the new lvs have the old name, we can add them to the device
3928       info("adding new mirror component on %s" % tgt_node)
3929       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3930         for new_lv in new_lvs:
3931           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3932             warning("Can't rollback device %s", hint="manually cleanup unused"
3933                     " logical volumes")
3934         raise errors.OpExecError("Can't add local storage to drbd")
3935
3936       dev.children = new_lvs
3937       cfg.Update(instance)
3938
3939     # Step: wait for sync
3940
3941     # this can fail as the old devices are degraded and _WaitForSync
3942     # does a combined result over all disks, so we don't check its
3943     # return value
3944     self.proc.LogStep(5, steps_total, "sync devices")
3945     _WaitForSync(cfg, instance, self.proc, unlock=True)
3946
3947     # so check manually all the devices
3948     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3949       cfg.SetDiskID(dev, instance.primary_node)
3950       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3951       if is_degr:
3952         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3953
3954     # Step: remove old storage
3955     self.proc.LogStep(6, steps_total, "removing old storage")
3956     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3957       info("remove logical volumes for %s" % name)
3958       for lv in old_lvs:
3959         cfg.SetDiskID(lv, tgt_node)
3960         if not rpc.call_blockdev_remove(tgt_node, lv):
3961           warning("Can't remove old LV", hint="manually remove unused LVs")
3962           continue
3963
3964   def _ExecD8Secondary(self, feedback_fn):
3965     """Replace the secondary node for drbd8.
3966
3967     The algorithm for replace is quite complicated:
3968       - for all disks of the instance:
3969         - create new LVs on the new node with same names
3970         - shutdown the drbd device on the old secondary
3971         - disconnect the drbd network on the primary
3972         - create the drbd device on the new secondary
3973         - network attach the drbd on the primary, using an artifice:
3974           the drbd code for Attach() will connect to the network if it
3975           finds a device which is connected to the good local disks but
3976           not network enabled
3977       - wait for sync across all devices
3978       - remove all disks from the old secondary
3979
3980     Failures are not very well handled.
3981
3982     """
3983     steps_total = 6
3984     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3985     instance = self.instance
3986     iv_names = {}
3987     vgname = self.cfg.GetVGName()
3988     # start of work
3989     cfg = self.cfg
3990     old_node = self.tgt_node
3991     new_node = self.new_node
3992     pri_node = instance.primary_node
3993
3994     # Step: check device activation
3995     self.proc.LogStep(1, steps_total, "check device existence")
3996     info("checking volume groups")
3997     my_vg = cfg.GetVGName()
3998     results = rpc.call_vg_list([pri_node, new_node])
3999     if not results:
4000       raise errors.OpExecError("Can't list volume groups on the nodes")
4001     for node in pri_node, new_node:
4002       res = results.get(node, False)
4003       if not res or my_vg not in res:
4004         raise errors.OpExecError("Volume group '%s' not found on %s" %
4005                                  (my_vg, node))
4006     for dev in instance.disks:
4007       if not dev.iv_name in self.op.disks:
4008         continue
4009       info("checking %s on %s" % (dev.iv_name, pri_node))
4010       cfg.SetDiskID(dev, pri_node)
4011       if not rpc.call_blockdev_find(pri_node, dev):
4012         raise errors.OpExecError("Can't find device %s on node %s" %
4013                                  (dev.iv_name, pri_node))
4014
4015     # Step: check other node consistency
4016     self.proc.LogStep(2, steps_total, "check peer consistency")
4017     for dev in instance.disks:
4018       if not dev.iv_name in self.op.disks:
4019         continue
4020       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4021       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4022         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4023                                  " unsafe to replace the secondary" %
4024                                  pri_node)
4025
4026     # Step: create new storage
4027     self.proc.LogStep(3, steps_total, "allocate new storage")
4028     for dev in instance.disks:
4029       size = dev.size
4030       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4031       # since we *always* want to create this LV, we use the
4032       # _Create...OnPrimary (which forces the creation), even if we
4033       # are talking about the secondary node
4034       for new_lv in dev.children:
4035         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4036                                         _GetInstanceInfoText(instance)):
4037           raise errors.OpExecError("Failed to create new LV named '%s' on"
4038                                    " node '%s'" %
4039                                    (new_lv.logical_id[1], new_node))
4040
4041
4042     # Step 4: dbrd minors and drbd setups changes
4043     # after this, we must manually remove the drbd minors on both the
4044     # error and the success paths
4045     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4046                                    instance.name)
4047     logging.debug("Allocated minors %s" % (minors,))
4048     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4049     for dev, new_minor in zip(instance.disks, minors):
4050       size = dev.size
4051       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4052       # create new devices on new_node
4053       if pri_node == dev.logical_id[0]:
4054         new_logical_id = (pri_node, new_node,
4055                           dev.logical_id[2], dev.logical_id[3], new_minor,
4056                           dev.logical_id[5])
4057       else:
4058         new_logical_id = (new_node, pri_node,
4059                           dev.logical_id[2], new_minor, dev.logical_id[4],
4060                           dev.logical_id[5])
4061       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4062       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4063                     new_logical_id)
4064       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4065                               logical_id=new_logical_id,
4066                               children=dev.children)
4067       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4068                                         new_drbd, False,
4069                                       _GetInstanceInfoText(instance)):
4070         self.cfg.ReleaseDRBDMinors(instance.name)
4071         raise errors.OpExecError("Failed to create new DRBD on"
4072                                  " node '%s'" % new_node)
4073
4074     for dev in instance.disks:
4075       # we have new devices, shutdown the drbd on the old secondary
4076       info("shutting down drbd for %s on old node" % dev.iv_name)
4077       cfg.SetDiskID(dev, old_node)
4078       if not rpc.call_blockdev_shutdown(old_node, dev):
4079         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4080                 hint="Please cleanup this device manually as soon as possible")
4081
4082     info("detaching primary drbds from the network (=> standalone)")
4083     done = 0
4084     for dev in instance.disks:
4085       cfg.SetDiskID(dev, pri_node)
4086       # set the network part of the physical (unique in bdev terms) id
4087       # to None, meaning detach from network
4088       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4089       # and 'find' the device, which will 'fix' it to match the
4090       # standalone state
4091       if rpc.call_blockdev_find(pri_node, dev):
4092         done += 1
4093       else:
4094         warning("Failed to detach drbd %s from network, unusual case" %
4095                 dev.iv_name)
4096
4097     if not done:
4098       # no detaches succeeded (very unlikely)
4099       self.cfg.ReleaseDRBDMinors(instance.name)
4100       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4101
4102     # if we managed to detach at least one, we update all the disks of
4103     # the instance to point to the new secondary
4104     info("updating instance configuration")
4105     for dev, _, new_logical_id in iv_names.itervalues():
4106       dev.logical_id = new_logical_id
4107       cfg.SetDiskID(dev, pri_node)
4108     cfg.Update(instance)
4109     # we can remove now the temp minors as now the new values are
4110     # written to the config file (and therefore stable)
4111     self.cfg.ReleaseDRBDMinors(instance.name)
4112
4113     # and now perform the drbd attach
4114     info("attaching primary drbds to new secondary (standalone => connected)")
4115     failures = []
4116     for dev in instance.disks:
4117       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4118       # since the attach is smart, it's enough to 'find' the device,
4119       # it will automatically activate the network, if the physical_id
4120       # is correct
4121       cfg.SetDiskID(dev, pri_node)
4122       logging.debug("Disk to attach: %s", dev)
4123       if not rpc.call_blockdev_find(pri_node, dev):
4124         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4125                 "please do a gnt-instance info to see the status of disks")
4126
4127     # this can fail as the old devices are degraded and _WaitForSync
4128     # does a combined result over all disks, so we don't check its
4129     # return value
4130     self.proc.LogStep(5, steps_total, "sync devices")
4131     _WaitForSync(cfg, instance, self.proc, unlock=True)
4132
4133     # so check manually all the devices
4134     for name, (dev, old_lvs, _) in iv_names.iteritems():
4135       cfg.SetDiskID(dev, pri_node)
4136       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4137       if is_degr:
4138         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4139
4140     self.proc.LogStep(6, steps_total, "removing old storage")
4141     for name, (dev, old_lvs, _) in iv_names.iteritems():
4142       info("remove logical volumes for %s" % name)
4143       for lv in old_lvs:
4144         cfg.SetDiskID(lv, old_node)
4145         if not rpc.call_blockdev_remove(old_node, lv):
4146           warning("Can't remove LV on old secondary",
4147                   hint="Cleanup stale volumes by hand")
4148
4149   def Exec(self, feedback_fn):
4150     """Execute disk replacement.
4151
4152     This dispatches the disk replacement to the appropriate handler.
4153
4154     """
4155     instance = self.instance
4156
4157     # Activate the instance disks if we're replacing them on a down instance
4158     if instance.status == "down":
4159       _StartInstanceDisks(self.cfg, instance, True)
4160
4161     if instance.disk_template == constants.DT_DRBD8:
4162       if self.op.remote_node is None:
4163         fn = self._ExecD8DiskOnly
4164       else:
4165         fn = self._ExecD8Secondary
4166     else:
4167       raise errors.ProgrammerError("Unhandled disk replacement case")
4168
4169     ret = fn(feedback_fn)
4170
4171     # Deactivate the instance disks if we're replacing them on a down instance
4172     if instance.status == "down":
4173       _SafeShutdownInstanceDisks(instance, self.cfg)
4174
4175     return ret
4176
4177
4178 class LUGrowDisk(LogicalUnit):
4179   """Grow a disk of an instance.
4180
4181   """
4182   HPATH = "disk-grow"
4183   HTYPE = constants.HTYPE_INSTANCE
4184   _OP_REQP = ["instance_name", "disk", "amount"]
4185   REQ_BGL = False
4186
4187   def ExpandNames(self):
4188     self._ExpandAndLockInstance()
4189     self.needed_locks[locking.LEVEL_NODE] = []
4190     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4191
4192   def DeclareLocks(self, level):
4193     if level == locking.LEVEL_NODE:
4194       self._LockInstancesNodes()
4195
4196   def BuildHooksEnv(self):
4197     """Build hooks env.
4198
4199     This runs on the master, the primary and all the secondaries.
4200
4201     """
4202     env = {
4203       "DISK": self.op.disk,
4204       "AMOUNT": self.op.amount,
4205       }
4206     env.update(_BuildInstanceHookEnvByObject(self.instance))
4207     nl = [
4208       self.sstore.GetMasterNode(),
4209       self.instance.primary_node,
4210       ]
4211     return env, nl, nl
4212
4213   def CheckPrereq(self):
4214     """Check prerequisites.
4215
4216     This checks that the instance is in the cluster.
4217
4218     """
4219     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4220     assert instance is not None, \
4221       "Cannot retrieve locked instance %s" % self.op.instance_name
4222
4223     self.instance = instance
4224
4225     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4226       raise errors.OpPrereqError("Instance's disk layout does not support"
4227                                  " growing.")
4228
4229     if instance.FindDisk(self.op.disk) is None:
4230       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4231                                  (self.op.disk, instance.name))
4232
4233     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4234     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4235     for node in nodenames:
4236       info = nodeinfo.get(node, None)
4237       if not info:
4238         raise errors.OpPrereqError("Cannot get current information"
4239                                    " from node '%s'" % node)
4240       vg_free = info.get('vg_free', None)
4241       if not isinstance(vg_free, int):
4242         raise errors.OpPrereqError("Can't compute free disk space on"
4243                                    " node %s" % node)
4244       if self.op.amount > info['vg_free']:
4245         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4246                                    " %d MiB available, %d MiB required" %
4247                                    (node, info['vg_free'], self.op.amount))
4248
4249   def Exec(self, feedback_fn):
4250     """Execute disk grow.
4251
4252     """
4253     instance = self.instance
4254     disk = instance.FindDisk(self.op.disk)
4255     for node in (instance.secondary_nodes + (instance.primary_node,)):
4256       self.cfg.SetDiskID(disk, node)
4257       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4258       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4259         raise errors.OpExecError("grow request failed to node %s" % node)
4260       elif not result[0]:
4261         raise errors.OpExecError("grow request failed to node %s: %s" %
4262                                  (node, result[1]))
4263     disk.RecordGrow(self.op.amount)
4264     self.cfg.Update(instance)
4265     return
4266
4267
4268 class LUQueryInstanceData(NoHooksLU):
4269   """Query runtime instance data.
4270
4271   """
4272   _OP_REQP = ["instances"]
4273   REQ_BGL = False
4274   def ExpandNames(self):
4275     self.needed_locks = {}
4276     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4277
4278     if not isinstance(self.op.instances, list):
4279       raise errors.OpPrereqError("Invalid argument type 'instances'")
4280
4281     if self.op.instances:
4282       self.wanted_names = []
4283       for name in self.op.instances:
4284         full_name = self.cfg.ExpandInstanceName(name)
4285         if full_name is None:
4286           raise errors.OpPrereqError("Instance '%s' not known" %
4287                                      self.op.instance_name)
4288         self.wanted_names.append(full_name)
4289       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4290     else:
4291       self.wanted_names = None
4292       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4293
4294     self.needed_locks[locking.LEVEL_NODE] = []
4295     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4296
4297   def DeclareLocks(self, level):
4298     if level == locking.LEVEL_NODE:
4299       self._LockInstancesNodes()
4300
4301   def CheckPrereq(self):
4302     """Check prerequisites.
4303
4304     This only checks the optional instance list against the existing names.
4305
4306     """
4307     if self.wanted_names is None:
4308       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4309
4310     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4311                              in self.wanted_names]
4312     return
4313
4314   def _ComputeDiskStatus(self, instance, snode, dev):
4315     """Compute block device status.
4316
4317     """
4318     self.cfg.SetDiskID(dev, instance.primary_node)
4319     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4320     if dev.dev_type in constants.LDS_DRBD:
4321       # we change the snode then (otherwise we use the one passed in)
4322       if dev.logical_id[0] == instance.primary_node:
4323         snode = dev.logical_id[1]
4324       else:
4325         snode = dev.logical_id[0]
4326
4327     if snode:
4328       self.cfg.SetDiskID(dev, snode)
4329       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4330     else:
4331       dev_sstatus = None
4332
4333     if dev.children:
4334       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4335                       for child in dev.children]
4336     else:
4337       dev_children = []
4338
4339     data = {
4340       "iv_name": dev.iv_name,
4341       "dev_type": dev.dev_type,
4342       "logical_id": dev.logical_id,
4343       "physical_id": dev.physical_id,
4344       "pstatus": dev_pstatus,
4345       "sstatus": dev_sstatus,
4346       "children": dev_children,
4347       }
4348
4349     return data
4350
4351   def Exec(self, feedback_fn):
4352     """Gather and return data"""
4353     result = {}
4354     for instance in self.wanted_instances:
4355       remote_info = rpc.call_instance_info(instance.primary_node,
4356                                                 instance.name)
4357       if remote_info and "state" in remote_info:
4358         remote_state = "up"
4359       else:
4360         remote_state = "down"
4361       if instance.status == "down":
4362         config_state = "down"
4363       else:
4364         config_state = "up"
4365
4366       disks = [self._ComputeDiskStatus(instance, None, device)
4367                for device in instance.disks]
4368
4369       idict = {
4370         "name": instance.name,
4371         "config_state": config_state,
4372         "run_state": remote_state,
4373         "pnode": instance.primary_node,
4374         "snodes": instance.secondary_nodes,
4375         "os": instance.os,
4376         "memory": instance.memory,
4377         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4378         "disks": disks,
4379         "vcpus": instance.vcpus,
4380         }
4381
4382       htkind = self.sstore.GetHypervisorType()
4383       if htkind == constants.HT_XEN_PVM30:
4384         idict["kernel_path"] = instance.kernel_path
4385         idict["initrd_path"] = instance.initrd_path
4386
4387       if htkind == constants.HT_XEN_HVM31:
4388         idict["hvm_boot_order"] = instance.hvm_boot_order
4389         idict["hvm_acpi"] = instance.hvm_acpi
4390         idict["hvm_pae"] = instance.hvm_pae
4391         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4392         idict["hvm_nic_type"] = instance.hvm_nic_type
4393         idict["hvm_disk_type"] = instance.hvm_disk_type
4394
4395       if htkind in constants.HTS_REQ_PORT:
4396         if instance.vnc_bind_address is None:
4397           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4398         else:
4399           vnc_bind_address = instance.vnc_bind_address
4400         if instance.network_port is None:
4401           vnc_console_port = None
4402         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4403           vnc_console_port = "%s:%s" % (instance.primary_node,
4404                                        instance.network_port)
4405         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4406           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4407                                                    instance.network_port,
4408                                                    instance.primary_node)
4409         else:
4410           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4411                                         instance.network_port)
4412         idict["vnc_console_port"] = vnc_console_port
4413         idict["vnc_bind_address"] = vnc_bind_address
4414         idict["network_port"] = instance.network_port
4415
4416       result[instance.name] = idict
4417
4418     return result
4419
4420
4421 class LUSetInstanceParams(LogicalUnit):
4422   """Modifies an instances's parameters.
4423
4424   """
4425   HPATH = "instance-modify"
4426   HTYPE = constants.HTYPE_INSTANCE
4427   _OP_REQP = ["instance_name"]
4428   REQ_BGL = False
4429
4430   def ExpandNames(self):
4431     self._ExpandAndLockInstance()
4432
4433   def BuildHooksEnv(self):
4434     """Build hooks env.
4435
4436     This runs on the master, primary and secondaries.
4437
4438     """
4439     args = dict()
4440     if self.mem:
4441       args['memory'] = self.mem
4442     if self.vcpus:
4443       args['vcpus'] = self.vcpus
4444     if self.do_ip or self.do_bridge or self.mac:
4445       if self.do_ip:
4446         ip = self.ip
4447       else:
4448         ip = self.instance.nics[0].ip
4449       if self.bridge:
4450         bridge = self.bridge
4451       else:
4452         bridge = self.instance.nics[0].bridge
4453       if self.mac:
4454         mac = self.mac
4455       else:
4456         mac = self.instance.nics[0].mac
4457       args['nics'] = [(ip, bridge, mac)]
4458     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4459     nl = [self.sstore.GetMasterNode(),
4460           self.instance.primary_node] + list(self.instance.secondary_nodes)
4461     return env, nl, nl
4462
4463   def CheckPrereq(self):
4464     """Check prerequisites.
4465
4466     This only checks the instance list against the existing names.
4467
4468     """
4469     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4470     # a separate CheckArguments function, if we implement one, so the operation
4471     # can be aborted without waiting for any lock, should it have an error...
4472     self.mem = getattr(self.op, "mem", None)
4473     self.vcpus = getattr(self.op, "vcpus", None)
4474     self.ip = getattr(self.op, "ip", None)
4475     self.mac = getattr(self.op, "mac", None)
4476     self.bridge = getattr(self.op, "bridge", None)
4477     self.kernel_path = getattr(self.op, "kernel_path", None)
4478     self.initrd_path = getattr(self.op, "initrd_path", None)
4479     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4480     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4481     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4482     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4483     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4484     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4485     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4486     self.force = getattr(self.op, "force", None)
4487     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4488                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4489                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4490                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4491     if all_parms.count(None) == len(all_parms):
4492       raise errors.OpPrereqError("No changes submitted")
4493     if self.mem is not None:
4494       try:
4495         self.mem = int(self.mem)
4496       except ValueError, err:
4497         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4498     if self.vcpus is not None:
4499       try:
4500         self.vcpus = int(self.vcpus)
4501       except ValueError, err:
4502         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4503     if self.ip is not None:
4504       self.do_ip = True
4505       if self.ip.lower() == "none":
4506         self.ip = None
4507       else:
4508         if not utils.IsValidIP(self.ip):
4509           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4510     else:
4511       self.do_ip = False
4512     self.do_bridge = (self.bridge is not None)
4513     if self.mac is not None:
4514       if self.cfg.IsMacInUse(self.mac):
4515         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4516                                    self.mac)
4517       if not utils.IsValidMac(self.mac):
4518         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4519
4520     if self.kernel_path is not None:
4521       self.do_kernel_path = True
4522       if self.kernel_path == constants.VALUE_NONE:
4523         raise errors.OpPrereqError("Can't set instance to no kernel")
4524
4525       if self.kernel_path != constants.VALUE_DEFAULT:
4526         if not os.path.isabs(self.kernel_path):
4527           raise errors.OpPrereqError("The kernel path must be an absolute"
4528                                     " filename")
4529     else:
4530       self.do_kernel_path = False
4531
4532     if self.initrd_path is not None:
4533       self.do_initrd_path = True
4534       if self.initrd_path not in (constants.VALUE_NONE,
4535                                   constants.VALUE_DEFAULT):
4536         if not os.path.isabs(self.initrd_path):
4537           raise errors.OpPrereqError("The initrd path must be an absolute"
4538                                     " filename")
4539     else:
4540       self.do_initrd_path = False
4541
4542     # boot order verification
4543     if self.hvm_boot_order is not None:
4544       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4545         if len(self.hvm_boot_order.strip("acdn")) != 0:
4546           raise errors.OpPrereqError("invalid boot order specified,"
4547                                      " must be one or more of [acdn]"
4548                                      " or 'default'")
4549
4550     # hvm_cdrom_image_path verification
4551     if self.op.hvm_cdrom_image_path is not None:
4552       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4553               self.op.hvm_cdrom_image_path.lower() == "none"):
4554         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4555                                    " be an absolute path or None, not %s" %
4556                                    self.op.hvm_cdrom_image_path)
4557       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4558               self.op.hvm_cdrom_image_path.lower() == "none"):
4559         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4560                                    " regular file or a symlink pointing to"
4561                                    " an existing regular file, not %s" %
4562                                    self.op.hvm_cdrom_image_path)
4563
4564     # vnc_bind_address verification
4565     if self.op.vnc_bind_address is not None:
4566       if not utils.IsValidIP(self.op.vnc_bind_address):
4567         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4568                                    " like a valid IP address" %
4569                                    self.op.vnc_bind_address)
4570
4571     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4572     assert self.instance is not None, \
4573       "Cannot retrieve locked instance %s" % self.op.instance_name
4574     self.warn = []
4575     if self.mem is not None and not self.force:
4576       pnode = self.instance.primary_node
4577       nodelist = [pnode]
4578       nodelist.extend(instance.secondary_nodes)
4579       instance_info = rpc.call_instance_info(pnode, instance.name)
4580       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4581
4582       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4583         # Assume the primary node is unreachable and go ahead
4584         self.warn.append("Can't get info from primary node %s" % pnode)
4585       else:
4586         if instance_info:
4587           current_mem = instance_info['memory']
4588         else:
4589           # Assume instance not running
4590           # (there is a slight race condition here, but it's not very probable,
4591           # and we have no other way to check)
4592           current_mem = 0
4593         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4594         if miss_mem > 0:
4595           raise errors.OpPrereqError("This change will prevent the instance"
4596                                      " from starting, due to %d MB of memory"
4597                                      " missing on its primary node" % miss_mem)
4598
4599       for node in instance.secondary_nodes:
4600         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4601           self.warn.append("Can't get info from secondary node %s" % node)
4602         elif self.mem > nodeinfo[node]['memory_free']:
4603           self.warn.append("Not enough memory to failover instance to secondary"
4604                            " node %s" % node)
4605
4606     # Xen HVM device type checks
4607     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4608       if self.op.hvm_nic_type is not None:
4609         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4610           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4611                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4612       if self.op.hvm_disk_type is not None:
4613         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4614           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4615                                      " HVM hypervisor" % self.op.hvm_disk_type)
4616
4617     return
4618
4619   def Exec(self, feedback_fn):
4620     """Modifies an instance.
4621
4622     All parameters take effect only at the next restart of the instance.
4623     """
4624     # Process here the warnings from CheckPrereq, as we don't have a
4625     # feedback_fn there.
4626     for warn in self.warn:
4627       feedback_fn("WARNING: %s" % warn)
4628
4629     result = []
4630     instance = self.instance
4631     if self.mem:
4632       instance.memory = self.mem
4633       result.append(("mem", self.mem))
4634     if self.vcpus:
4635       instance.vcpus = self.vcpus
4636       result.append(("vcpus",  self.vcpus))
4637     if self.do_ip:
4638       instance.nics[0].ip = self.ip
4639       result.append(("ip", self.ip))
4640     if self.bridge:
4641       instance.nics[0].bridge = self.bridge
4642       result.append(("bridge", self.bridge))
4643     if self.mac:
4644       instance.nics[0].mac = self.mac
4645       result.append(("mac", self.mac))
4646     if self.do_kernel_path:
4647       instance.kernel_path = self.kernel_path
4648       result.append(("kernel_path", self.kernel_path))
4649     if self.do_initrd_path:
4650       instance.initrd_path = self.initrd_path
4651       result.append(("initrd_path", self.initrd_path))
4652     if self.hvm_boot_order:
4653       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4654         instance.hvm_boot_order = None
4655       else:
4656         instance.hvm_boot_order = self.hvm_boot_order
4657       result.append(("hvm_boot_order", self.hvm_boot_order))
4658     if self.hvm_acpi is not None:
4659       instance.hvm_acpi = self.hvm_acpi
4660       result.append(("hvm_acpi", self.hvm_acpi))
4661     if self.hvm_pae is not None:
4662       instance.hvm_pae = self.hvm_pae
4663       result.append(("hvm_pae", self.hvm_pae))
4664     if self.hvm_nic_type is not None:
4665       instance.hvm_nic_type = self.hvm_nic_type
4666       result.append(("hvm_nic_type", self.hvm_nic_type))
4667     if self.hvm_disk_type is not None:
4668       instance.hvm_disk_type = self.hvm_disk_type
4669       result.append(("hvm_disk_type", self.hvm_disk_type))
4670     if self.hvm_cdrom_image_path:
4671       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4672         instance.hvm_cdrom_image_path = None
4673       else:
4674         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4675       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4676     if self.vnc_bind_address:
4677       instance.vnc_bind_address = self.vnc_bind_address
4678       result.append(("vnc_bind_address", self.vnc_bind_address))
4679
4680     self.cfg.Update(instance)
4681
4682     return result
4683
4684
4685 class LUQueryExports(NoHooksLU):
4686   """Query the exports list
4687
4688   """
4689   _OP_REQP = ['nodes']
4690   REQ_BGL = False
4691
4692   def ExpandNames(self):
4693     self.needed_locks = {}
4694     self.share_locks[locking.LEVEL_NODE] = 1
4695     if not self.op.nodes:
4696       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4697     else:
4698       self.needed_locks[locking.LEVEL_NODE] = \
4699         _GetWantedNodes(self, self.op.nodes)
4700
4701   def CheckPrereq(self):
4702     """Check prerequisites.
4703
4704     """
4705     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4706
4707   def Exec(self, feedback_fn):
4708     """Compute the list of all the exported system images.
4709
4710     Returns:
4711       a dictionary with the structure node->(export-list)
4712       where export-list is a list of the instances exported on
4713       that node.
4714
4715     """
4716     return rpc.call_export_list(self.nodes)
4717
4718
4719 class LUExportInstance(LogicalUnit):
4720   """Export an instance to an image in the cluster.
4721
4722   """
4723   HPATH = "instance-export"
4724   HTYPE = constants.HTYPE_INSTANCE
4725   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4726   REQ_BGL = False
4727
4728   def ExpandNames(self):
4729     self._ExpandAndLockInstance()
4730     # FIXME: lock only instance primary and destination node
4731     #
4732     # Sad but true, for now we have do lock all nodes, as we don't know where
4733     # the previous export might be, and and in this LU we search for it and
4734     # remove it from its current node. In the future we could fix this by:
4735     #  - making a tasklet to search (share-lock all), then create the new one,
4736     #    then one to remove, after
4737     #  - removing the removal operation altoghether
4738     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4739
4740   def DeclareLocks(self, level):
4741     """Last minute lock declaration."""
4742     # All nodes are locked anyway, so nothing to do here.
4743
4744   def BuildHooksEnv(self):
4745     """Build hooks env.
4746
4747     This will run on the master, primary node and target node.
4748
4749     """
4750     env = {
4751       "EXPORT_NODE": self.op.target_node,
4752       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4753       }
4754     env.update(_BuildInstanceHookEnvByObject(self.instance))
4755     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4756           self.op.target_node]
4757     return env, nl, nl
4758
4759   def CheckPrereq(self):
4760     """Check prerequisites.
4761
4762     This checks that the instance and node names are valid.
4763
4764     """
4765     instance_name = self.op.instance_name
4766     self.instance = self.cfg.GetInstanceInfo(instance_name)
4767     assert self.instance is not None, \
4768           "Cannot retrieve locked instance %s" % self.op.instance_name
4769
4770     self.dst_node = self.cfg.GetNodeInfo(
4771       self.cfg.ExpandNodeName(self.op.target_node))
4772
4773     assert self.dst_node is not None, \
4774           "Cannot retrieve locked node %s" % self.op.target_node
4775
4776     # instance disk type verification
4777     for disk in self.instance.disks:
4778       if disk.dev_type == constants.LD_FILE:
4779         raise errors.OpPrereqError("Export not supported for instances with"
4780                                    " file-based disks")
4781
4782   def Exec(self, feedback_fn):
4783     """Export an instance to an image in the cluster.
4784
4785     """
4786     instance = self.instance
4787     dst_node = self.dst_node
4788     src_node = instance.primary_node
4789     if self.op.shutdown:
4790       # shutdown the instance, but not the disks
4791       if not rpc.call_instance_shutdown(src_node, instance):
4792         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4793                                  (instance.name, src_node))
4794
4795     vgname = self.cfg.GetVGName()
4796
4797     snap_disks = []
4798
4799     try:
4800       for disk in instance.disks:
4801         if disk.iv_name == "sda":
4802           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4803           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4804
4805           if not new_dev_name:
4806             logger.Error("could not snapshot block device %s on node %s" %
4807                          (disk.logical_id[1], src_node))
4808           else:
4809             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4810                                       logical_id=(vgname, new_dev_name),
4811                                       physical_id=(vgname, new_dev_name),
4812                                       iv_name=disk.iv_name)
4813             snap_disks.append(new_dev)
4814
4815     finally:
4816       if self.op.shutdown and instance.status == "up":
4817         if not rpc.call_instance_start(src_node, instance, None):
4818           _ShutdownInstanceDisks(instance, self.cfg)
4819           raise errors.OpExecError("Could not start instance")
4820
4821     # TODO: check for size
4822
4823     for dev in snap_disks:
4824       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4825         logger.Error("could not export block device %s from node %s to node %s"
4826                      % (dev.logical_id[1], src_node, dst_node.name))
4827       if not rpc.call_blockdev_remove(src_node, dev):
4828         logger.Error("could not remove snapshot block device %s from node %s" %
4829                      (dev.logical_id[1], src_node))
4830
4831     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4832       logger.Error("could not finalize export for instance %s on node %s" %
4833                    (instance.name, dst_node.name))
4834
4835     nodelist = self.cfg.GetNodeList()
4836     nodelist.remove(dst_node.name)
4837
4838     # on one-node clusters nodelist will be empty after the removal
4839     # if we proceed the backup would be removed because OpQueryExports
4840     # substitutes an empty list with the full cluster node list.
4841     if nodelist:
4842       exportlist = rpc.call_export_list(nodelist)
4843       for node in exportlist:
4844         if instance.name in exportlist[node]:
4845           if not rpc.call_export_remove(node, instance.name):
4846             logger.Error("could not remove older export for instance %s"
4847                          " on node %s" % (instance.name, node))
4848
4849
4850 class LURemoveExport(NoHooksLU):
4851   """Remove exports related to the named instance.
4852
4853   """
4854   _OP_REQP = ["instance_name"]
4855   REQ_BGL = False
4856
4857   def ExpandNames(self):
4858     self.needed_locks = {}
4859     # We need all nodes to be locked in order for RemoveExport to work, but we
4860     # don't need to lock the instance itself, as nothing will happen to it (and
4861     # we can remove exports also for a removed instance)
4862     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4863
4864   def CheckPrereq(self):
4865     """Check prerequisites.
4866     """
4867     pass
4868
4869   def Exec(self, feedback_fn):
4870     """Remove any export.
4871
4872     """
4873     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4874     # If the instance was not found we'll try with the name that was passed in.
4875     # This will only work if it was an FQDN, though.
4876     fqdn_warn = False
4877     if not instance_name:
4878       fqdn_warn = True
4879       instance_name = self.op.instance_name
4880
4881     exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4882     found = False
4883     for node in exportlist:
4884       if instance_name in exportlist[node]:
4885         found = True
4886         if not rpc.call_export_remove(node, instance_name):
4887           logger.Error("could not remove export for instance %s"
4888                        " on node %s" % (instance_name, node))
4889
4890     if fqdn_warn and not found:
4891       feedback_fn("Export not found. If trying to remove an export belonging"
4892                   " to a deleted instance please use its Fully Qualified"
4893                   " Domain Name.")
4894
4895
4896 class TagsLU(NoHooksLU):
4897   """Generic tags LU.
4898
4899   This is an abstract class which is the parent of all the other tags LUs.
4900
4901   """
4902
4903   def ExpandNames(self):
4904     self.needed_locks = {}
4905     if self.op.kind == constants.TAG_NODE:
4906       name = self.cfg.ExpandNodeName(self.op.name)
4907       if name is None:
4908         raise errors.OpPrereqError("Invalid node name (%s)" %
4909                                    (self.op.name,))
4910       self.op.name = name
4911       self.needed_locks[locking.LEVEL_NODE] = name
4912     elif self.op.kind == constants.TAG_INSTANCE:
4913       name = self.cfg.ExpandInstanceName(self.op.name)
4914       if name is None:
4915         raise errors.OpPrereqError("Invalid instance name (%s)" %
4916                                    (self.op.name,))
4917       self.op.name = name
4918       self.needed_locks[locking.LEVEL_INSTANCE] = name
4919
4920   def CheckPrereq(self):
4921     """Check prerequisites.
4922
4923     """
4924     if self.op.kind == constants.TAG_CLUSTER:
4925       self.target = self.cfg.GetClusterInfo()
4926     elif self.op.kind == constants.TAG_NODE:
4927       self.target = self.cfg.GetNodeInfo(self.op.name)
4928     elif self.op.kind == constants.TAG_INSTANCE:
4929       self.target = self.cfg.GetInstanceInfo(self.op.name)
4930     else:
4931       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4932                                  str(self.op.kind))
4933
4934
4935 class LUGetTags(TagsLU):
4936   """Returns the tags of a given object.
4937
4938   """
4939   _OP_REQP = ["kind", "name"]
4940   REQ_BGL = False
4941
4942   def Exec(self, feedback_fn):
4943     """Returns the tag list.
4944
4945     """
4946     return list(self.target.GetTags())
4947
4948
4949 class LUSearchTags(NoHooksLU):
4950   """Searches the tags for a given pattern.
4951
4952   """
4953   _OP_REQP = ["pattern"]
4954   REQ_BGL = False
4955
4956   def ExpandNames(self):
4957     self.needed_locks = {}
4958
4959   def CheckPrereq(self):
4960     """Check prerequisites.
4961
4962     This checks the pattern passed for validity by compiling it.
4963
4964     """
4965     try:
4966       self.re = re.compile(self.op.pattern)
4967     except re.error, err:
4968       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4969                                  (self.op.pattern, err))
4970
4971   def Exec(self, feedback_fn):
4972     """Returns the tag list.
4973
4974     """
4975     cfg = self.cfg
4976     tgts = [("/cluster", cfg.GetClusterInfo())]
4977     ilist = cfg.GetAllInstancesInfo().values()
4978     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4979     nlist = cfg.GetAllNodesInfo().values()
4980     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4981     results = []
4982     for path, target in tgts:
4983       for tag in target.GetTags():
4984         if self.re.search(tag):
4985           results.append((path, tag))
4986     return results
4987
4988
4989 class LUAddTags(TagsLU):
4990   """Sets a tag on a given object.
4991
4992   """
4993   _OP_REQP = ["kind", "name", "tags"]
4994   REQ_BGL = False
4995
4996   def CheckPrereq(self):
4997     """Check prerequisites.
4998
4999     This checks the type and length of the tag name and value.
5000
5001     """
5002     TagsLU.CheckPrereq(self)
5003     for tag in self.op.tags:
5004       objects.TaggableObject.ValidateTag(tag)
5005
5006   def Exec(self, feedback_fn):
5007     """Sets the tag.
5008
5009     """
5010     try:
5011       for tag in self.op.tags:
5012         self.target.AddTag(tag)
5013     except errors.TagError, err:
5014       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5015     try:
5016       self.cfg.Update(self.target)
5017     except errors.ConfigurationError:
5018       raise errors.OpRetryError("There has been a modification to the"
5019                                 " config file and the operation has been"
5020                                 " aborted. Please retry.")
5021
5022
5023 class LUDelTags(TagsLU):
5024   """Delete a list of tags from a given object.
5025
5026   """
5027   _OP_REQP = ["kind", "name", "tags"]
5028   REQ_BGL = False
5029
5030   def CheckPrereq(self):
5031     """Check prerequisites.
5032
5033     This checks that we have the given tag.
5034
5035     """
5036     TagsLU.CheckPrereq(self)
5037     for tag in self.op.tags:
5038       objects.TaggableObject.ValidateTag(tag)
5039     del_tags = frozenset(self.op.tags)
5040     cur_tags = self.target.GetTags()
5041     if not del_tags <= cur_tags:
5042       diff_tags = del_tags - cur_tags
5043       diff_names = ["'%s'" % tag for tag in diff_tags]
5044       diff_names.sort()
5045       raise errors.OpPrereqError("Tag(s) %s not found" %
5046                                  (",".join(diff_names)))
5047
5048   def Exec(self, feedback_fn):
5049     """Remove the tag from the object.
5050
5051     """
5052     for tag in self.op.tags:
5053       self.target.RemoveTag(tag)
5054     try:
5055       self.cfg.Update(self.target)
5056     except errors.ConfigurationError:
5057       raise errors.OpRetryError("There has been a modification to the"
5058                                 " config file and the operation has been"
5059                                 " aborted. Please retry.")
5060
5061
5062 class LUTestDelay(NoHooksLU):
5063   """Sleep for a specified amount of time.
5064
5065   This LU sleeps on the master and/or nodes for a specified amount of
5066   time.
5067
5068   """
5069   _OP_REQP = ["duration", "on_master", "on_nodes"]
5070   REQ_BGL = False
5071
5072   def ExpandNames(self):
5073     """Expand names and set required locks.
5074
5075     This expands the node list, if any.
5076
5077     """
5078     self.needed_locks = {}
5079     if self.op.on_nodes:
5080       # _GetWantedNodes can be used here, but is not always appropriate to use
5081       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5082       # more information.
5083       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5084       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5085
5086   def CheckPrereq(self):
5087     """Check prerequisites.
5088
5089     """
5090
5091   def Exec(self, feedback_fn):
5092     """Do the actual sleep.
5093
5094     """
5095     if self.op.on_master:
5096       if not utils.TestDelay(self.op.duration):
5097         raise errors.OpExecError("Error during master delay test")
5098     if self.op.on_nodes:
5099       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5100       if not result:
5101         raise errors.OpExecError("Complete failure from rpc call")
5102       for node, node_result in result.items():
5103         if not node_result:
5104           raise errors.OpExecError("Failure during rpc call to node %s,"
5105                                    " result: %s" % (node, node_result))
5106
5107
5108 class IAllocator(object):
5109   """IAllocator framework.
5110
5111   An IAllocator instance has three sets of attributes:
5112     - cfg/sstore that are needed to query the cluster
5113     - input data (all members of the _KEYS class attribute are required)
5114     - four buffer attributes (in|out_data|text), that represent the
5115       input (to the external script) in text and data structure format,
5116       and the output from it, again in two formats
5117     - the result variables from the script (success, info, nodes) for
5118       easy usage
5119
5120   """
5121   _ALLO_KEYS = [
5122     "mem_size", "disks", "disk_template",
5123     "os", "tags", "nics", "vcpus",
5124     ]
5125   _RELO_KEYS = [
5126     "relocate_from",
5127     ]
5128
5129   def __init__(self, cfg, sstore, mode, name, **kwargs):
5130     self.cfg = cfg
5131     self.sstore = sstore
5132     # init buffer variables
5133     self.in_text = self.out_text = self.in_data = self.out_data = None
5134     # init all input fields so that pylint is happy
5135     self.mode = mode
5136     self.name = name
5137     self.mem_size = self.disks = self.disk_template = None
5138     self.os = self.tags = self.nics = self.vcpus = None
5139     self.relocate_from = None
5140     # computed fields
5141     self.required_nodes = None
5142     # init result fields
5143     self.success = self.info = self.nodes = None
5144     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5145       keyset = self._ALLO_KEYS
5146     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5147       keyset = self._RELO_KEYS
5148     else:
5149       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5150                                    " IAllocator" % self.mode)
5151     for key in kwargs:
5152       if key not in keyset:
5153         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5154                                      " IAllocator" % key)
5155       setattr(self, key, kwargs[key])
5156     for key in keyset:
5157       if key not in kwargs:
5158         raise errors.ProgrammerError("Missing input parameter '%s' to"
5159                                      " IAllocator" % key)
5160     self._BuildInputData()
5161
5162   def _ComputeClusterData(self):
5163     """Compute the generic allocator input data.
5164
5165     This is the data that is independent of the actual operation.
5166
5167     """
5168     cfg = self.cfg
5169     # cluster data
5170     data = {
5171       "version": 1,
5172       "cluster_name": self.sstore.GetClusterName(),
5173       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5174       "hypervisor_type": self.sstore.GetHypervisorType(),
5175       # we don't have job IDs
5176       }
5177
5178     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5179
5180     # node data
5181     node_results = {}
5182     node_list = cfg.GetNodeList()
5183     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5184     for nname in node_list:
5185       ninfo = cfg.GetNodeInfo(nname)
5186       if nname not in node_data or not isinstance(node_data[nname], dict):
5187         raise errors.OpExecError("Can't get data for node %s" % nname)
5188       remote_info = node_data[nname]
5189       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5190                    'vg_size', 'vg_free', 'cpu_total']:
5191         if attr not in remote_info:
5192           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5193                                    (nname, attr))
5194         try:
5195           remote_info[attr] = int(remote_info[attr])
5196         except ValueError, err:
5197           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5198                                    " %s" % (nname, attr, str(err)))
5199       # compute memory used by primary instances
5200       i_p_mem = i_p_up_mem = 0
5201       for iinfo in i_list:
5202         if iinfo.primary_node == nname:
5203           i_p_mem += iinfo.memory
5204           if iinfo.status == "up":
5205             i_p_up_mem += iinfo.memory
5206
5207       # compute memory used by instances
5208       pnr = {
5209         "tags": list(ninfo.GetTags()),
5210         "total_memory": remote_info['memory_total'],
5211         "reserved_memory": remote_info['memory_dom0'],
5212         "free_memory": remote_info['memory_free'],
5213         "i_pri_memory": i_p_mem,
5214         "i_pri_up_memory": i_p_up_mem,
5215         "total_disk": remote_info['vg_size'],
5216         "free_disk": remote_info['vg_free'],
5217         "primary_ip": ninfo.primary_ip,
5218         "secondary_ip": ninfo.secondary_ip,
5219         "total_cpus": remote_info['cpu_total'],
5220         }
5221       node_results[nname] = pnr
5222     data["nodes"] = node_results
5223
5224     # instance data
5225     instance_data = {}
5226     for iinfo in i_list:
5227       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5228                   for n in iinfo.nics]
5229       pir = {
5230         "tags": list(iinfo.GetTags()),
5231         "should_run": iinfo.status == "up",
5232         "vcpus": iinfo.vcpus,
5233         "memory": iinfo.memory,
5234         "os": iinfo.os,
5235         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5236         "nics": nic_data,
5237         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5238         "disk_template": iinfo.disk_template,
5239         }
5240       instance_data[iinfo.name] = pir
5241
5242     data["instances"] = instance_data
5243
5244     self.in_data = data
5245
5246   def _AddNewInstance(self):
5247     """Add new instance data to allocator structure.
5248
5249     This in combination with _AllocatorGetClusterData will create the
5250     correct structure needed as input for the allocator.
5251
5252     The checks for the completeness of the opcode must have already been
5253     done.
5254
5255     """
5256     data = self.in_data
5257     if len(self.disks) != 2:
5258       raise errors.OpExecError("Only two-disk configurations supported")
5259
5260     disk_space = _ComputeDiskSize(self.disk_template,
5261                                   self.disks[0]["size"], self.disks[1]["size"])
5262
5263     if self.disk_template in constants.DTS_NET_MIRROR:
5264       self.required_nodes = 2
5265     else:
5266       self.required_nodes = 1
5267     request = {
5268       "type": "allocate",
5269       "name": self.name,
5270       "disk_template": self.disk_template,
5271       "tags": self.tags,
5272       "os": self.os,
5273       "vcpus": self.vcpus,
5274       "memory": self.mem_size,
5275       "disks": self.disks,
5276       "disk_space_total": disk_space,
5277       "nics": self.nics,
5278       "required_nodes": self.required_nodes,
5279       }
5280     data["request"] = request
5281
5282   def _AddRelocateInstance(self):
5283     """Add relocate instance data to allocator structure.
5284
5285     This in combination with _IAllocatorGetClusterData will create the
5286     correct structure needed as input for the allocator.
5287
5288     The checks for the completeness of the opcode must have already been
5289     done.
5290
5291     """
5292     instance = self.cfg.GetInstanceInfo(self.name)
5293     if instance is None:
5294       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5295                                    " IAllocator" % self.name)
5296
5297     if instance.disk_template not in constants.DTS_NET_MIRROR:
5298       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5299
5300     if len(instance.secondary_nodes) != 1:
5301       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5302
5303     self.required_nodes = 1
5304
5305     disk_space = _ComputeDiskSize(instance.disk_template,
5306                                   instance.disks[0].size,
5307                                   instance.disks[1].size)
5308
5309     request = {
5310       "type": "relocate",
5311       "name": self.name,
5312       "disk_space_total": disk_space,
5313       "required_nodes": self.required_nodes,
5314       "relocate_from": self.relocate_from,
5315       }
5316     self.in_data["request"] = request
5317
5318   def _BuildInputData(self):
5319     """Build input data structures.
5320
5321     """
5322     self._ComputeClusterData()
5323
5324     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5325       self._AddNewInstance()
5326     else:
5327       self._AddRelocateInstance()
5328
5329     self.in_text = serializer.Dump(self.in_data)
5330
5331   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5332     """Run an instance allocator and return the results.
5333
5334     """
5335     data = self.in_text
5336
5337     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5338
5339     if not isinstance(result, (list, tuple)) or len(result) != 4:
5340       raise errors.OpExecError("Invalid result from master iallocator runner")
5341
5342     rcode, stdout, stderr, fail = result
5343
5344     if rcode == constants.IARUN_NOTFOUND:
5345       raise errors.OpExecError("Can't find allocator '%s'" % name)
5346     elif rcode == constants.IARUN_FAILURE:
5347       raise errors.OpExecError("Instance allocator call failed: %s,"
5348                                " output: %s" % (fail, stdout+stderr))
5349     self.out_text = stdout
5350     if validate:
5351       self._ValidateResult()
5352
5353   def _ValidateResult(self):
5354     """Process the allocator results.
5355
5356     This will process and if successful save the result in
5357     self.out_data and the other parameters.
5358
5359     """
5360     try:
5361       rdict = serializer.Load(self.out_text)
5362     except Exception, err:
5363       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5364
5365     if not isinstance(rdict, dict):
5366       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5367
5368     for key in "success", "info", "nodes":
5369       if key not in rdict:
5370         raise errors.OpExecError("Can't parse iallocator results:"
5371                                  " missing key '%s'" % key)
5372       setattr(self, key, rdict[key])
5373
5374     if not isinstance(rdict["nodes"], list):
5375       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5376                                " is not a list")
5377     self.out_data = rdict
5378
5379
5380 class LUTestAllocator(NoHooksLU):
5381   """Run allocator tests.
5382
5383   This LU runs the allocator tests
5384
5385   """
5386   _OP_REQP = ["direction", "mode", "name"]
5387
5388   def CheckPrereq(self):
5389     """Check prerequisites.
5390
5391     This checks the opcode parameters depending on the director and mode test.
5392
5393     """
5394     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5395       for attr in ["name", "mem_size", "disks", "disk_template",
5396                    "os", "tags", "nics", "vcpus"]:
5397         if not hasattr(self.op, attr):
5398           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5399                                      attr)
5400       iname = self.cfg.ExpandInstanceName(self.op.name)
5401       if iname is not None:
5402         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5403                                    iname)
5404       if not isinstance(self.op.nics, list):
5405         raise errors.OpPrereqError("Invalid parameter 'nics'")
5406       for row in self.op.nics:
5407         if (not isinstance(row, dict) or
5408             "mac" not in row or
5409             "ip" not in row or
5410             "bridge" not in row):
5411           raise errors.OpPrereqError("Invalid contents of the"
5412                                      " 'nics' parameter")
5413       if not isinstance(self.op.disks, list):
5414         raise errors.OpPrereqError("Invalid parameter 'disks'")
5415       if len(self.op.disks) != 2:
5416         raise errors.OpPrereqError("Only two-disk configurations supported")
5417       for row in self.op.disks:
5418         if (not isinstance(row, dict) or
5419             "size" not in row or
5420             not isinstance(row["size"], int) or
5421             "mode" not in row or
5422             row["mode"] not in ['r', 'w']):
5423           raise errors.OpPrereqError("Invalid contents of the"
5424                                      " 'disks' parameter")
5425     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5426       if not hasattr(self.op, "name"):
5427         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5428       fname = self.cfg.ExpandInstanceName(self.op.name)
5429       if fname is None:
5430         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5431                                    self.op.name)
5432       self.op.name = fname
5433       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5434     else:
5435       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5436                                  self.op.mode)
5437
5438     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5439       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5440         raise errors.OpPrereqError("Missing allocator name")
5441     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5442       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5443                                  self.op.direction)
5444
5445   def Exec(self, feedback_fn):
5446     """Run the allocator test.
5447
5448     """
5449     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5450       ial = IAllocator(self.cfg, self.sstore,
5451                        mode=self.op.mode,
5452                        name=self.op.name,
5453                        mem_size=self.op.mem_size,
5454                        disks=self.op.disks,
5455                        disk_template=self.op.disk_template,
5456                        os=self.op.os,
5457                        tags=self.op.tags,
5458                        nics=self.op.nics,
5459                        vcpus=self.op.vcpus,
5460                        )
5461     else:
5462       ial = IAllocator(self.cfg, self.sstore,
5463                        mode=self.op.mode,
5464                        name=self.op.name,
5465                        relocate_from=list(self.relocate_from),
5466                        )
5467
5468     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5469       result = ial.in_text
5470     else:
5471       ial.Run(self.op.allocator, validate=False)
5472       result = ial.out_text
5473     return result