Allow listing of the serial_no via gnt-* list
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61
62   Note that all commands require root permissions.
63
64   """
65   HPATH = None
66   HTYPE = None
67   _OP_REQP = []
68   REQ_MASTER = True
69   REQ_WSSTORE = False
70   REQ_BGL = True
71
72   def __init__(self, processor, op, context, sstore):
73     """Constructor for LogicalUnit.
74
75     This needs to be overriden in derived classes in order to check op
76     validity.
77
78     """
79     self.proc = processor
80     self.op = op
81     self.cfg = context.cfg
82     self.sstore = sstore
83     self.context = context
84     # Dicts used to declare locking needs to mcpu
85     self.needed_locks = None
86     self.acquired_locks = {}
87     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88     self.add_locks = {}
89     self.remove_locks = {}
90     # Used to force good behavior when calling helper functions
91     self.recalculate_locks = {}
92     self.__ssh = None
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99
100     if not self.cfg.IsCluster():
101       raise errors.OpPrereqError("Cluster not initialized yet,"
102                                  " use 'gnt-cluster init' first.")
103     if self.REQ_MASTER:
104       master = sstore.GetMasterNode()
105       if master != utils.HostInfo().name:
106         raise errors.OpPrereqError("Commands must be run on the master"
107                                    " node %s" % master)
108
109   def __GetSSH(self):
110     """Returns the SshRunner object
111
112     """
113     if not self.__ssh:
114       self.__ssh = ssh.SshRunner(self.sstore)
115     return self.__ssh
116
117   ssh = property(fget=__GetSSH)
118
119   def ExpandNames(self):
120     """Expand names for this LU.
121
122     This method is called before starting to execute the opcode, and it should
123     update all the parameters of the opcode to their canonical form (e.g. a
124     short node name must be fully expanded after this method has successfully
125     completed). This way locking, hooks, logging, ecc. can work correctly.
126
127     LUs which implement this method must also populate the self.needed_locks
128     member, as a dict with lock levels as keys, and a list of needed lock names
129     as values. Rules:
130       - Use an empty dict if you don't need any lock
131       - If you don't need any lock at a particular level omit that level
132       - Don't put anything for the BGL level
133       - If you want all locks at a level use locking.ALL_SET as a value
134
135     If you need to share locks (rather than acquire them exclusively) at one
136     level you can modify self.share_locks, setting a true value (usually 1) for
137     that level. By default locks are not shared.
138
139     Examples:
140     # Acquire all nodes and one instance
141     self.needed_locks = {
142       locking.LEVEL_NODE: locking.ALL_SET,
143       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144     }
145     # Acquire just two nodes
146     self.needed_locks = {
147       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148     }
149     # Acquire no locks
150     self.needed_locks = {} # No, you can't leave it to the default value None
151
152     """
153     # The implementation of this method is mandatory only if the new LU is
154     # concurrent, so that old LUs don't need to be changed all at the same
155     # time.
156     if self.REQ_BGL:
157       self.needed_locks = {} # Exclusive LUs don't need locks.
158     else:
159       raise NotImplementedError
160
161   def DeclareLocks(self, level):
162     """Declare LU locking needs for a level
163
164     While most LUs can just declare their locking needs at ExpandNames time,
165     sometimes there's the need to calculate some locks after having acquired
166     the ones before. This function is called just before acquiring locks at a
167     particular level, but after acquiring the ones at lower levels, and permits
168     such calculations. It can be used to modify self.needed_locks, and by
169     default it does nothing.
170
171     This function is only called if you have something already set in
172     self.needed_locks for the level.
173
174     @param level: Locking level which is going to be locked
175     @type level: member of ganeti.locking.LEVELS
176
177     """
178
179   def CheckPrereq(self):
180     """Check prerequisites for this LU.
181
182     This method should check that the prerequisites for the execution
183     of this LU are fulfilled. It can do internode communication, but
184     it should be idempotent - no cluster or system changes are
185     allowed.
186
187     The method should raise errors.OpPrereqError in case something is
188     not fulfilled. Its return value is ignored.
189
190     This method should also update all the parameters of the opcode to
191     their canonical form if it hasn't been done by ExpandNames before.
192
193     """
194     raise NotImplementedError
195
196   def Exec(self, feedback_fn):
197     """Execute the LU.
198
199     This method should implement the actual work. It should raise
200     errors.OpExecError for failures that are somewhat dealt with in
201     code, or expected.
202
203     """
204     raise NotImplementedError
205
206   def BuildHooksEnv(self):
207     """Build hooks environment for this LU.
208
209     This method should return a three-node tuple consisting of: a dict
210     containing the environment that will be used for running the
211     specific hook for this LU, a list of node names on which the hook
212     should run before the execution, and a list of node names on which
213     the hook should run after the execution.
214
215     The keys of the dict must not have 'GANETI_' prefixed as this will
216     be handled in the hooks runner. Also note additional keys will be
217     added by the hooks runner. If the LU doesn't define any
218     environment, an empty dict (and not None) should be returned.
219
220     No nodes should be returned as an empty list (and not None).
221
222     Note that if the HPATH for a LU class is None, this function will
223     not be called.
224
225     """
226     raise NotImplementedError
227
228   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229     """Notify the LU about the results of its hooks.
230
231     This method is called every time a hooks phase is executed, and notifies
232     the Logical Unit about the hooks' result. The LU can then use it to alter
233     its result based on the hooks.  By default the method does nothing and the
234     previous result is passed back unchanged but any LU can define it if it
235     wants to use the local cluster hook-scripts somehow.
236
237     Args:
238       phase: the hooks phase that has just been run
239       hooks_results: the results of the multi-node hooks rpc call
240       feedback_fn: function to send feedback back to the caller
241       lu_result: the previous result this LU had, or None in the PRE phase.
242
243     """
244     return lu_result
245
246   def _ExpandAndLockInstance(self):
247     """Helper function to expand and lock an instance.
248
249     Many LUs that work on an instance take its name in self.op.instance_name
250     and need to expand it and then declare the expanded name for locking. This
251     function does it, and then updates self.op.instance_name to the expanded
252     name. It also initializes needed_locks as a dict, if this hasn't been done
253     before.
254
255     """
256     if self.needed_locks is None:
257       self.needed_locks = {}
258     else:
259       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260         "_ExpandAndLockInstance called with instance-level locks set"
261     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262     if expanded_name is None:
263       raise errors.OpPrereqError("Instance '%s' not known" %
264                                   self.op.instance_name)
265     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266     self.op.instance_name = expanded_name
267
268   def _LockInstancesNodes(self, primary_only=False):
269     """Helper function to declare instances' nodes for locking.
270
271     This function should be called after locking one or more instances to lock
272     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273     with all primary or secondary nodes for instances already locked and
274     present in self.needed_locks[locking.LEVEL_INSTANCE].
275
276     It should be called from DeclareLocks, and for safety only works if
277     self.recalculate_locks[locking.LEVEL_NODE] is set.
278
279     In the future it may grow parameters to just lock some instance's nodes, or
280     to just lock primaries or secondary nodes, if needed.
281
282     If should be called in DeclareLocks in a way similar to:
283
284     if level == locking.LEVEL_NODE:
285       self._LockInstancesNodes()
286
287     @type primary_only: boolean
288     @param primary_only: only lock primary nodes of locked instances
289
290     """
291     assert locking.LEVEL_NODE in self.recalculate_locks, \
292       "_LockInstancesNodes helper function called with no nodes to recalculate"
293
294     # TODO: check if we're really been called with the instance locks held
295
296     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297     # future we might want to have different behaviors depending on the value
298     # of self.recalculate_locks[locking.LEVEL_NODE]
299     wanted_nodes = []
300     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301       instance = self.context.cfg.GetInstanceInfo(instance_name)
302       wanted_nodes.append(instance.primary_node)
303       if not primary_only:
304         wanted_nodes.extend(instance.secondary_nodes)
305
306     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310
311     del self.recalculate_locks[locking.LEVEL_NODE]
312
313
314 class NoHooksLU(LogicalUnit):
315   """Simple LU which runs no hooks.
316
317   This LU is intended as a parent for other LogicalUnits which will
318   run no hooks, in order to reduce duplicate code.
319
320   """
321   HPATH = None
322   HTYPE = None
323
324
325 def _GetWantedNodes(lu, nodes):
326   """Returns list of checked and expanded node names.
327
328   Args:
329     nodes: List of nodes (strings) or None for all
330
331   """
332   if not isinstance(nodes, list):
333     raise errors.OpPrereqError("Invalid argument type 'nodes'")
334
335   if not nodes:
336     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337       " non-empty list of nodes whose name is to be expanded.")
338
339   wanted = []
340   for name in nodes:
341     node = lu.cfg.ExpandNodeName(name)
342     if node is None:
343       raise errors.OpPrereqError("No such node name '%s'" % name)
344     wanted.append(node)
345
346   return utils.NiceSort(wanted)
347
348
349 def _GetWantedInstances(lu, instances):
350   """Returns list of checked and expanded instance names.
351
352   Args:
353     instances: List of instances (strings) or None for all
354
355   """
356   if not isinstance(instances, list):
357     raise errors.OpPrereqError("Invalid argument type 'instances'")
358
359   if instances:
360     wanted = []
361
362     for name in instances:
363       instance = lu.cfg.ExpandInstanceName(name)
364       if instance is None:
365         raise errors.OpPrereqError("No such instance name '%s'" % name)
366       wanted.append(instance)
367
368   else:
369     wanted = lu.cfg.GetInstanceList()
370   return utils.NiceSort(wanted)
371
372
373 def _CheckOutputFields(static, dynamic, selected):
374   """Checks whether all selected fields are valid.
375
376   Args:
377     static: Static fields
378     dynamic: Dynamic fields
379
380   """
381   static_fields = frozenset(static)
382   dynamic_fields = frozenset(dynamic)
383
384   all_fields = static_fields | dynamic_fields
385
386   if not all_fields.issuperset(selected):
387     raise errors.OpPrereqError("Unknown output fields selected: %s"
388                                % ",".join(frozenset(selected).
389                                           difference(all_fields)))
390
391
392 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393                           memory, vcpus, nics):
394   """Builds instance related env variables for hooks from single variables.
395
396   Args:
397     secondary_nodes: List of secondary nodes as strings
398   """
399   env = {
400     "OP_TARGET": name,
401     "INSTANCE_NAME": name,
402     "INSTANCE_PRIMARY": primary_node,
403     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404     "INSTANCE_OS_TYPE": os_type,
405     "INSTANCE_STATUS": status,
406     "INSTANCE_MEMORY": memory,
407     "INSTANCE_VCPUS": vcpus,
408   }
409
410   if nics:
411     nic_count = len(nics)
412     for idx, (ip, bridge, mac) in enumerate(nics):
413       if ip is None:
414         ip = ""
415       env["INSTANCE_NIC%d_IP" % idx] = ip
416       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418   else:
419     nic_count = 0
420
421   env["INSTANCE_NIC_COUNT"] = nic_count
422
423   return env
424
425
426 def _BuildInstanceHookEnvByObject(instance, override=None):
427   """Builds instance related env variables for hooks from an object.
428
429   Args:
430     instance: objects.Instance object of instance
431     override: dict of values to override
432   """
433   args = {
434     'name': instance.name,
435     'primary_node': instance.primary_node,
436     'secondary_nodes': instance.secondary_nodes,
437     'os_type': instance.os,
438     'status': instance.os,
439     'memory': instance.memory,
440     'vcpus': instance.vcpus,
441     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442   }
443   if override:
444     args.update(override)
445   return _BuildInstanceHookEnv(**args)
446
447
448 def _CheckInstanceBridgesExist(instance):
449   """Check that the brigdes needed by an instance exist.
450
451   """
452   # check bridges existance
453   brlist = [nic.bridge for nic in instance.nics]
454   if not rpc.call_bridges_exist(instance.primary_node, brlist):
455     raise errors.OpPrereqError("one or more target bridges %s does not"
456                                " exist on destination node '%s'" %
457                                (brlist, instance.primary_node))
458
459
460 class LUDestroyCluster(NoHooksLU):
461   """Logical unit for destroying the cluster.
462
463   """
464   _OP_REQP = []
465
466   def CheckPrereq(self):
467     """Check prerequisites.
468
469     This checks whether the cluster is empty.
470
471     Any errors are signalled by raising errors.OpPrereqError.
472
473     """
474     master = self.sstore.GetMasterNode()
475
476     nodelist = self.cfg.GetNodeList()
477     if len(nodelist) != 1 or nodelist[0] != master:
478       raise errors.OpPrereqError("There are still %d node(s) in"
479                                  " this cluster." % (len(nodelist) - 1))
480     instancelist = self.cfg.GetInstanceList()
481     if instancelist:
482       raise errors.OpPrereqError("There are still %d instance(s) in"
483                                  " this cluster." % len(instancelist))
484
485   def Exec(self, feedback_fn):
486     """Destroys the cluster.
487
488     """
489     master = self.sstore.GetMasterNode()
490     if not rpc.call_node_stop_master(master, False):
491       raise errors.OpExecError("Could not disable the master role")
492     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493     utils.CreateBackup(priv_key)
494     utils.CreateBackup(pub_key)
495     return master
496
497
498 class LUVerifyCluster(LogicalUnit):
499   """Verifies the cluster status.
500
501   """
502   HPATH = "cluster-verify"
503   HTYPE = constants.HTYPE_CLUSTER
504   _OP_REQP = ["skip_checks"]
505   REQ_BGL = False
506
507   def ExpandNames(self):
508     self.needed_locks = {
509       locking.LEVEL_NODE: locking.ALL_SET,
510       locking.LEVEL_INSTANCE: locking.ALL_SET,
511     }
512     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513
514   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515                   remote_version, feedback_fn):
516     """Run multiple tests against a node.
517
518     Test list:
519       - compares ganeti version
520       - checks vg existance and size > 20G
521       - checks config file checksum
522       - checks ssh to other nodes
523
524     Args:
525       node: name of the node to check
526       file_list: required list of files
527       local_cksum: dictionary of local files and their checksums
528
529     """
530     # compares ganeti version
531     local_version = constants.PROTOCOL_VERSION
532     if not remote_version:
533       feedback_fn("  - ERROR: connection to %s failed" % (node))
534       return True
535
536     if local_version != remote_version:
537       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538                       (local_version, node, remote_version))
539       return True
540
541     # checks vg existance and size > 20G
542
543     bad = False
544     if not vglist:
545       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546                       (node,))
547       bad = True
548     else:
549       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550                                             constants.MIN_VG_SIZE)
551       if vgstatus:
552         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553         bad = True
554
555     # checks config file checksum
556     # checks ssh to any
557
558     if 'filelist' not in node_result:
559       bad = True
560       feedback_fn("  - ERROR: node hasn't returned file checksum data")
561     else:
562       remote_cksum = node_result['filelist']
563       for file_name in file_list:
564         if file_name not in remote_cksum:
565           bad = True
566           feedback_fn("  - ERROR: file '%s' missing" % file_name)
567         elif remote_cksum[file_name] != local_cksum[file_name]:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570
571     if 'nodelist' not in node_result:
572       bad = True
573       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574     else:
575       if node_result['nodelist']:
576         bad = True
577         for node in node_result['nodelist']:
578           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579                           (node, node_result['nodelist'][node]))
580     if 'node-net-test' not in node_result:
581       bad = True
582       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583     else:
584       if node_result['node-net-test']:
585         bad = True
586         nlist = utils.NiceSort(node_result['node-net-test'].keys())
587         for node in nlist:
588           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589                           (node, node_result['node-net-test'][node]))
590
591     hyp_result = node_result.get('hypervisor', None)
592     if hyp_result is not None:
593       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594     return bad
595
596   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597                       node_instance, feedback_fn):
598     """Verify an instance.
599
600     This function checks to see if the required block devices are
601     available on the instance's node.
602
603     """
604     bad = False
605
606     node_current = instanceconfig.primary_node
607
608     node_vol_should = {}
609     instanceconfig.MapLVsByNode(node_vol_should)
610
611     for node in node_vol_should:
612       for volume in node_vol_should[node]:
613         if node not in node_vol_is or volume not in node_vol_is[node]:
614           feedback_fn("  - ERROR: volume %s missing on node %s" %
615                           (volume, node))
616           bad = True
617
618     if not instanceconfig.status == 'down':
619       if (node_current not in node_instance or
620           not instance in node_instance[node_current]):
621         feedback_fn("  - ERROR: instance %s not running on node %s" %
622                         (instance, node_current))
623         bad = True
624
625     for node in node_instance:
626       if (not node == node_current):
627         if instance in node_instance[node]:
628           feedback_fn("  - ERROR: instance %s should not run on node %s" %
629                           (instance, node))
630           bad = True
631
632     return bad
633
634   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635     """Verify if there are any unknown volumes in the cluster.
636
637     The .os, .swap and backup volumes are ignored. All other volumes are
638     reported as unknown.
639
640     """
641     bad = False
642
643     for node in node_vol_is:
644       for volume in node_vol_is[node]:
645         if node not in node_vol_should or volume not in node_vol_should[node]:
646           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647                       (volume, node))
648           bad = True
649     return bad
650
651   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652     """Verify the list of running instances.
653
654     This checks what instances are running but unknown to the cluster.
655
656     """
657     bad = False
658     for node in node_instance:
659       for runninginstance in node_instance[node]:
660         if runninginstance not in instancelist:
661           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662                           (runninginstance, node))
663           bad = True
664     return bad
665
666   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667     """Verify N+1 Memory Resilience.
668
669     Check that if one single node dies we can still start all the instances it
670     was primary for.
671
672     """
673     bad = False
674
675     for node, nodeinfo in node_info.iteritems():
676       # This code checks that every node which is now listed as secondary has
677       # enough memory to host all instances it is supposed to should a single
678       # other node in the cluster fail.
679       # FIXME: not ready for failover to an arbitrary node
680       # FIXME: does not support file-backed instances
681       # WARNING: we currently take into account down instances as well as up
682       # ones, considering that even if they're down someone might want to start
683       # them even in the event of a node failure.
684       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685         needed_mem = 0
686         for instance in instances:
687           needed_mem += instance_cfg[instance].memory
688         if nodeinfo['mfree'] < needed_mem:
689           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690                       " failovers should node %s fail" % (node, prinode))
691           bad = True
692     return bad
693
694   def CheckPrereq(self):
695     """Check prerequisites.
696
697     Transform the list of checks we're going to skip into a set and check that
698     all its members are valid.
699
700     """
701     self.skip_set = frozenset(self.op.skip_checks)
702     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703       raise errors.OpPrereqError("Invalid checks to be skipped specified")
704
705   def BuildHooksEnv(self):
706     """Build hooks env.
707
708     Cluster-Verify hooks just rone in the post phase and their failure makes
709     the output be logged in the verify output and the verification to fail.
710
711     """
712     all_nodes = self.cfg.GetNodeList()
713     # TODO: populate the environment with useful information for verify hooks
714     env = {}
715     return env, [], all_nodes
716
717   def Exec(self, feedback_fn):
718     """Verify integrity of cluster, performing various test on nodes.
719
720     """
721     bad = False
722     feedback_fn("* Verifying global settings")
723     for msg in self.cfg.VerifyConfig():
724       feedback_fn("  - ERROR: %s" % msg)
725
726     vg_name = self.cfg.GetVGName()
727     nodelist = utils.NiceSort(self.cfg.GetNodeList())
728     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730     i_non_redundant = [] # Non redundant instances
731     node_volume = {}
732     node_instance = {}
733     node_info = {}
734     instance_cfg = {}
735
736     # FIXME: verify OS list
737     # do local checksums
738     file_names = list(self.sstore.GetFileList())
739     file_names.append(constants.SSL_CERT_FILE)
740     file_names.append(constants.CLUSTER_CONF_FILE)
741     local_checksums = utils.FingerprintFiles(file_names)
742
743     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745     all_instanceinfo = rpc.call_instance_list(nodelist)
746     all_vglist = rpc.call_vg_list(nodelist)
747     node_verify_param = {
748       'filelist': file_names,
749       'nodelist': nodelist,
750       'hypervisor': None,
751       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752                         for node in nodeinfo]
753       }
754     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755     all_rversion = rpc.call_version(nodelist)
756     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757
758     for node in nodelist:
759       feedback_fn("* Verifying node %s" % node)
760       result = self._VerifyNode(node, file_names, local_checksums,
761                                 all_vglist[node], all_nvinfo[node],
762                                 all_rversion[node], feedback_fn)
763       bad = bad or result
764
765       # node_volume
766       volumeinfo = all_volumeinfo[node]
767
768       if isinstance(volumeinfo, basestring):
769         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770                     (node, volumeinfo[-400:].encode('string_escape')))
771         bad = True
772         node_volume[node] = {}
773       elif not isinstance(volumeinfo, dict):
774         feedback_fn("  - ERROR: connection to %s failed" % (node,))
775         bad = True
776         continue
777       else:
778         node_volume[node] = volumeinfo
779
780       # node_instance
781       nodeinstance = all_instanceinfo[node]
782       if type(nodeinstance) != list:
783         feedback_fn("  - ERROR: connection to %s failed" % (node,))
784         bad = True
785         continue
786
787       node_instance[node] = nodeinstance
788
789       # node_info
790       nodeinfo = all_ninfo[node]
791       if not isinstance(nodeinfo, dict):
792         feedback_fn("  - ERROR: connection to %s failed" % (node,))
793         bad = True
794         continue
795
796       try:
797         node_info[node] = {
798           "mfree": int(nodeinfo['memory_free']),
799           "dfree": int(nodeinfo['vg_free']),
800           "pinst": [],
801           "sinst": [],
802           # dictionary holding all instances this node is secondary for,
803           # grouped by their primary node. Each key is a cluster node, and each
804           # value is a list of instances which have the key as primary and the
805           # current node as secondary.  this is handy to calculate N+1 memory
806           # availability if you can only failover from a primary to its
807           # secondary.
808           "sinst-by-pnode": {},
809         }
810       except ValueError:
811         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812         bad = True
813         continue
814
815     node_vol_should = {}
816
817     for instance in instancelist:
818       feedback_fn("* Verifying instance %s" % instance)
819       inst_config = self.cfg.GetInstanceInfo(instance)
820       result =  self._VerifyInstance(instance, inst_config, node_volume,
821                                      node_instance, feedback_fn)
822       bad = bad or result
823
824       inst_config.MapLVsByNode(node_vol_should)
825
826       instance_cfg[instance] = inst_config
827
828       pnode = inst_config.primary_node
829       if pnode in node_info:
830         node_info[pnode]['pinst'].append(instance)
831       else:
832         feedback_fn("  - ERROR: instance %s, connection to primary node"
833                     " %s failed" % (instance, pnode))
834         bad = True
835
836       # If the instance is non-redundant we cannot survive losing its primary
837       # node, so we are not N+1 compliant. On the other hand we have no disk
838       # templates with more than one secondary so that situation is not well
839       # supported either.
840       # FIXME: does not support file-backed instances
841       if len(inst_config.secondary_nodes) == 0:
842         i_non_redundant.append(instance)
843       elif len(inst_config.secondary_nodes) > 1:
844         feedback_fn("  - WARNING: multiple secondaries for instance %s"
845                     % instance)
846
847       for snode in inst_config.secondary_nodes:
848         if snode in node_info:
849           node_info[snode]['sinst'].append(instance)
850           if pnode not in node_info[snode]['sinst-by-pnode']:
851             node_info[snode]['sinst-by-pnode'][pnode] = []
852           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853         else:
854           feedback_fn("  - ERROR: instance %s, connection to secondary node"
855                       " %s failed" % (instance, snode))
856
857     feedback_fn("* Verifying orphan volumes")
858     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859                                        feedback_fn)
860     bad = bad or result
861
862     feedback_fn("* Verifying remaining instances")
863     result = self._VerifyOrphanInstances(instancelist, node_instance,
864                                          feedback_fn)
865     bad = bad or result
866
867     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868       feedback_fn("* Verifying N+1 Memory redundancy")
869       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870       bad = bad or result
871
872     feedback_fn("* Other Notes")
873     if i_non_redundant:
874       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875                   % len(i_non_redundant))
876
877     return not bad
878
879   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880     """Analize the post-hooks' result, handle it, and send some
881     nicely-formatted feedback back to the user.
882
883     Args:
884       phase: the hooks phase that has just been run
885       hooks_results: the results of the multi-node hooks rpc call
886       feedback_fn: function to send feedback back to the caller
887       lu_result: previous Exec result
888
889     """
890     # We only really run POST phase hooks, and are only interested in
891     # their results
892     if phase == constants.HOOKS_PHASE_POST:
893       # Used to change hooks' output to proper indentation
894       indent_re = re.compile('^', re.M)
895       feedback_fn("* Hooks Results")
896       if not hooks_results:
897         feedback_fn("  - ERROR: general communication failure")
898         lu_result = 1
899       else:
900         for node_name in hooks_results:
901           show_node_header = True
902           res = hooks_results[node_name]
903           if res is False or not isinstance(res, list):
904             feedback_fn("    Communication failure")
905             lu_result = 1
906             continue
907           for script, hkr, output in res:
908             if hkr == constants.HKR_FAIL:
909               # The node header is only shown once, if there are
910               # failing hooks on that node
911               if show_node_header:
912                 feedback_fn("  Node %s:" % node_name)
913                 show_node_header = False
914               feedback_fn("    ERROR: Script %s failed, output:" % script)
915               output = indent_re.sub('      ', output)
916               feedback_fn("%s" % output)
917               lu_result = 1
918
919       return lu_result
920
921
922 class LUVerifyDisks(NoHooksLU):
923   """Verifies the cluster disks status.
924
925   """
926   _OP_REQP = []
927   REQ_BGL = False
928
929   def ExpandNames(self):
930     self.needed_locks = {
931       locking.LEVEL_NODE: locking.ALL_SET,
932       locking.LEVEL_INSTANCE: locking.ALL_SET,
933     }
934     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     This has no prerequisites.
940
941     """
942     pass
943
944   def Exec(self, feedback_fn):
945     """Verify integrity of cluster disks.
946
947     """
948     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
949
950     vg_name = self.cfg.GetVGName()
951     nodes = utils.NiceSort(self.cfg.GetNodeList())
952     instances = [self.cfg.GetInstanceInfo(name)
953                  for name in self.cfg.GetInstanceList()]
954
955     nv_dict = {}
956     for inst in instances:
957       inst_lvs = {}
958       if (inst.status != "up" or
959           inst.disk_template not in constants.DTS_NET_MIRROR):
960         continue
961       inst.MapLVsByNode(inst_lvs)
962       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
963       for node, vol_list in inst_lvs.iteritems():
964         for vol in vol_list:
965           nv_dict[(node, vol)] = inst
966
967     if not nv_dict:
968       return result
969
970     node_lvs = rpc.call_volume_list(nodes, vg_name)
971
972     to_act = set()
973     for node in nodes:
974       # node_volume
975       lvs = node_lvs[node]
976
977       if isinstance(lvs, basestring):
978         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979         res_nlvm[node] = lvs
980       elif not isinstance(lvs, dict):
981         logger.Info("connection to node %s failed or invalid data returned" %
982                     (node,))
983         res_nodes.append(node)
984         continue
985
986       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987         inst = nv_dict.pop((node, lv_name), None)
988         if (not lv_online and inst is not None
989             and inst.name not in res_instances):
990           res_instances.append(inst.name)
991
992     # any leftover items in nv_dict are missing LVs, let's arrange the
993     # data better
994     for key, inst in nv_dict.iteritems():
995       if inst.name not in res_missing:
996         res_missing[inst.name] = []
997       res_missing[inst.name].append(key)
998
999     return result
1000
1001
1002 class LURenameCluster(LogicalUnit):
1003   """Rename the cluster.
1004
1005   """
1006   HPATH = "cluster-rename"
1007   HTYPE = constants.HTYPE_CLUSTER
1008   _OP_REQP = ["name"]
1009   REQ_WSSTORE = True
1010
1011   def BuildHooksEnv(self):
1012     """Build hooks env.
1013
1014     """
1015     env = {
1016       "OP_TARGET": self.sstore.GetClusterName(),
1017       "NEW_NAME": self.op.name,
1018       }
1019     mn = self.sstore.GetMasterNode()
1020     return env, [mn], [mn]
1021
1022   def CheckPrereq(self):
1023     """Verify that the passed name is a valid one.
1024
1025     """
1026     hostname = utils.HostInfo(self.op.name)
1027
1028     new_name = hostname.name
1029     self.ip = new_ip = hostname.ip
1030     old_name = self.sstore.GetClusterName()
1031     old_ip = self.sstore.GetMasterIP()
1032     if new_name == old_name and new_ip == old_ip:
1033       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1034                                  " cluster has changed")
1035     if new_ip != old_ip:
1036       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1037         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1038                                    " reachable on the network. Aborting." %
1039                                    new_ip)
1040
1041     self.op.name = new_name
1042
1043   def Exec(self, feedback_fn):
1044     """Rename the cluster.
1045
1046     """
1047     clustername = self.op.name
1048     ip = self.ip
1049     ss = self.sstore
1050
1051     # shutdown the master IP
1052     master = ss.GetMasterNode()
1053     if not rpc.call_node_stop_master(master, False):
1054       raise errors.OpExecError("Could not disable the master role")
1055
1056     try:
1057       # modify the sstore
1058       ss.SetKey(ss.SS_MASTER_IP, ip)
1059       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1060
1061       # Distribute updated ss config to all nodes
1062       myself = self.cfg.GetNodeInfo(master)
1063       dist_nodes = self.cfg.GetNodeList()
1064       if myself.name in dist_nodes:
1065         dist_nodes.remove(myself.name)
1066
1067       logger.Debug("Copying updated ssconf data to all nodes")
1068       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1069         fname = ss.KeyToFilename(keyname)
1070         result = rpc.call_upload_file(dist_nodes, fname)
1071         for to_node in dist_nodes:
1072           if not result[to_node]:
1073             logger.Error("copy of file %s to node %s failed" %
1074                          (fname, to_node))
1075     finally:
1076       if not rpc.call_node_start_master(master, False):
1077         logger.Error("Could not re-enable the master role on the master,"
1078                      " please restart manually.")
1079
1080
1081 def _RecursiveCheckIfLVMBased(disk):
1082   """Check if the given disk or its children are lvm-based.
1083
1084   Args:
1085     disk: ganeti.objects.Disk object
1086
1087   Returns:
1088     boolean indicating whether a LD_LV dev_type was found or not
1089
1090   """
1091   if disk.children:
1092     for chdisk in disk.children:
1093       if _RecursiveCheckIfLVMBased(chdisk):
1094         return True
1095   return disk.dev_type == constants.LD_LV
1096
1097
1098 class LUSetClusterParams(LogicalUnit):
1099   """Change the parameters of the cluster.
1100
1101   """
1102   HPATH = "cluster-modify"
1103   HTYPE = constants.HTYPE_CLUSTER
1104   _OP_REQP = []
1105   REQ_BGL = False
1106
1107   def ExpandNames(self):
1108     # FIXME: in the future maybe other cluster params won't require checking on
1109     # all nodes to be modified.
1110     self.needed_locks = {
1111       locking.LEVEL_NODE: locking.ALL_SET,
1112     }
1113     self.share_locks[locking.LEVEL_NODE] = 1
1114
1115   def BuildHooksEnv(self):
1116     """Build hooks env.
1117
1118     """
1119     env = {
1120       "OP_TARGET": self.sstore.GetClusterName(),
1121       "NEW_VG_NAME": self.op.vg_name,
1122       }
1123     mn = self.sstore.GetMasterNode()
1124     return env, [mn], [mn]
1125
1126   def CheckPrereq(self):
1127     """Check prerequisites.
1128
1129     This checks whether the given params don't conflict and
1130     if the given volume group is valid.
1131
1132     """
1133     # FIXME: This only works because there is only one parameter that can be
1134     # changed or removed.
1135     if not self.op.vg_name:
1136       instances = self.cfg.GetAllInstancesInfo().values()
1137       for inst in instances:
1138         for disk in inst.disks:
1139           if _RecursiveCheckIfLVMBased(disk):
1140             raise errors.OpPrereqError("Cannot disable lvm storage while"
1141                                        " lvm-based instances exist")
1142
1143     # if vg_name not None, checks given volume group on all nodes
1144     if self.op.vg_name:
1145       node_list = self.acquired_locks[locking.LEVEL_NODE]
1146       vglist = rpc.call_vg_list(node_list)
1147       for node in node_list:
1148         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1149                                               constants.MIN_VG_SIZE)
1150         if vgstatus:
1151           raise errors.OpPrereqError("Error on node '%s': %s" %
1152                                      (node, vgstatus))
1153
1154   def Exec(self, feedback_fn):
1155     """Change the parameters of the cluster.
1156
1157     """
1158     if self.op.vg_name != self.cfg.GetVGName():
1159       self.cfg.SetVGName(self.op.vg_name)
1160     else:
1161       feedback_fn("Cluster LVM configuration already in desired"
1162                   " state, not changing")
1163
1164
1165 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1166   """Sleep and poll for an instance's disk to sync.
1167
1168   """
1169   if not instance.disks:
1170     return True
1171
1172   if not oneshot:
1173     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1174
1175   node = instance.primary_node
1176
1177   for dev in instance.disks:
1178     cfgw.SetDiskID(dev, node)
1179
1180   retries = 0
1181   while True:
1182     max_time = 0
1183     done = True
1184     cumul_degraded = False
1185     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1186     if not rstats:
1187       proc.LogWarning("Can't get any data from node %s" % node)
1188       retries += 1
1189       if retries >= 10:
1190         raise errors.RemoteError("Can't contact node %s for mirror data,"
1191                                  " aborting." % node)
1192       time.sleep(6)
1193       continue
1194     retries = 0
1195     for i in range(len(rstats)):
1196       mstat = rstats[i]
1197       if mstat is None:
1198         proc.LogWarning("Can't compute data for node %s/%s" %
1199                         (node, instance.disks[i].iv_name))
1200         continue
1201       # we ignore the ldisk parameter
1202       perc_done, est_time, is_degraded, _ = mstat
1203       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1204       if perc_done is not None:
1205         done = False
1206         if est_time is not None:
1207           rem_time = "%d estimated seconds remaining" % est_time
1208           max_time = est_time
1209         else:
1210           rem_time = "no time estimate"
1211         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1212                      (instance.disks[i].iv_name, perc_done, rem_time))
1213     if done or oneshot:
1214       break
1215
1216     time.sleep(min(60, max_time))
1217
1218   if done:
1219     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1220   return not cumul_degraded
1221
1222
1223 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1224   """Check that mirrors are not degraded.
1225
1226   The ldisk parameter, if True, will change the test from the
1227   is_degraded attribute (which represents overall non-ok status for
1228   the device(s)) to the ldisk (representing the local storage status).
1229
1230   """
1231   cfgw.SetDiskID(dev, node)
1232   if ldisk:
1233     idx = 6
1234   else:
1235     idx = 5
1236
1237   result = True
1238   if on_primary or dev.AssembleOnSecondary():
1239     rstats = rpc.call_blockdev_find(node, dev)
1240     if not rstats:
1241       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1242       result = False
1243     else:
1244       result = result and (not rstats[idx])
1245   if dev.children:
1246     for child in dev.children:
1247       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1248
1249   return result
1250
1251
1252 class LUDiagnoseOS(NoHooksLU):
1253   """Logical unit for OS diagnose/query.
1254
1255   """
1256   _OP_REQP = ["output_fields", "names"]
1257   REQ_BGL = False
1258
1259   def ExpandNames(self):
1260     if self.op.names:
1261       raise errors.OpPrereqError("Selective OS query not supported")
1262
1263     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1264     _CheckOutputFields(static=[],
1265                        dynamic=self.dynamic_fields,
1266                        selected=self.op.output_fields)
1267
1268     # Lock all nodes, in shared mode
1269     self.needed_locks = {}
1270     self.share_locks[locking.LEVEL_NODE] = 1
1271     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1272
1273   def CheckPrereq(self):
1274     """Check prerequisites.
1275
1276     """
1277
1278   @staticmethod
1279   def _DiagnoseByOS(node_list, rlist):
1280     """Remaps a per-node return list into an a per-os per-node dictionary
1281
1282       Args:
1283         node_list: a list with the names of all nodes
1284         rlist: a map with node names as keys and OS objects as values
1285
1286       Returns:
1287         map: a map with osnames as keys and as value another map, with
1288              nodes as
1289              keys and list of OS objects as values
1290              e.g. {"debian-etch": {"node1": [<object>,...],
1291                                    "node2": [<object>,]}
1292                   }
1293
1294     """
1295     all_os = {}
1296     for node_name, nr in rlist.iteritems():
1297       if not nr:
1298         continue
1299       for os_obj in nr:
1300         if os_obj.name not in all_os:
1301           # build a list of nodes for this os containing empty lists
1302           # for each node in node_list
1303           all_os[os_obj.name] = {}
1304           for nname in node_list:
1305             all_os[os_obj.name][nname] = []
1306         all_os[os_obj.name][node_name].append(os_obj)
1307     return all_os
1308
1309   def Exec(self, feedback_fn):
1310     """Compute the list of OSes.
1311
1312     """
1313     node_list = self.acquired_locks[locking.LEVEL_NODE]
1314     node_data = rpc.call_os_diagnose(node_list)
1315     if node_data == False:
1316       raise errors.OpExecError("Can't gather the list of OSes")
1317     pol = self._DiagnoseByOS(node_list, node_data)
1318     output = []
1319     for os_name, os_data in pol.iteritems():
1320       row = []
1321       for field in self.op.output_fields:
1322         if field == "name":
1323           val = os_name
1324         elif field == "valid":
1325           val = utils.all([osl and osl[0] for osl in os_data.values()])
1326         elif field == "node_status":
1327           val = {}
1328           for node_name, nos_list in os_data.iteritems():
1329             val[node_name] = [(v.status, v.path) for v in nos_list]
1330         else:
1331           raise errors.ParameterError(field)
1332         row.append(val)
1333       output.append(row)
1334
1335     return output
1336
1337
1338 class LURemoveNode(LogicalUnit):
1339   """Logical unit for removing a node.
1340
1341   """
1342   HPATH = "node-remove"
1343   HTYPE = constants.HTYPE_NODE
1344   _OP_REQP = ["node_name"]
1345
1346   def BuildHooksEnv(self):
1347     """Build hooks env.
1348
1349     This doesn't run on the target node in the pre phase as a failed
1350     node would then be impossible to remove.
1351
1352     """
1353     env = {
1354       "OP_TARGET": self.op.node_name,
1355       "NODE_NAME": self.op.node_name,
1356       }
1357     all_nodes = self.cfg.GetNodeList()
1358     all_nodes.remove(self.op.node_name)
1359     return env, all_nodes, all_nodes
1360
1361   def CheckPrereq(self):
1362     """Check prerequisites.
1363
1364     This checks:
1365      - the node exists in the configuration
1366      - it does not have primary or secondary instances
1367      - it's not the master
1368
1369     Any errors are signalled by raising errors.OpPrereqError.
1370
1371     """
1372     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1373     if node is None:
1374       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1375
1376     instance_list = self.cfg.GetInstanceList()
1377
1378     masternode = self.sstore.GetMasterNode()
1379     if node.name == masternode:
1380       raise errors.OpPrereqError("Node is the master node,"
1381                                  " you need to failover first.")
1382
1383     for instance_name in instance_list:
1384       instance = self.cfg.GetInstanceInfo(instance_name)
1385       if node.name == instance.primary_node:
1386         raise errors.OpPrereqError("Instance %s still running on the node,"
1387                                    " please remove first." % instance_name)
1388       if node.name in instance.secondary_nodes:
1389         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1390                                    " please remove first." % instance_name)
1391     self.op.node_name = node.name
1392     self.node = node
1393
1394   def Exec(self, feedback_fn):
1395     """Removes the node from the cluster.
1396
1397     """
1398     node = self.node
1399     logger.Info("stopping the node daemon and removing configs from node %s" %
1400                 node.name)
1401
1402     self.context.RemoveNode(node.name)
1403
1404     rpc.call_node_leave_cluster(node.name)
1405
1406
1407 class LUQueryNodes(NoHooksLU):
1408   """Logical unit for querying nodes.
1409
1410   """
1411   _OP_REQP = ["output_fields", "names"]
1412   REQ_BGL = False
1413
1414   def ExpandNames(self):
1415     self.dynamic_fields = frozenset([
1416       "dtotal", "dfree",
1417       "mtotal", "mnode", "mfree",
1418       "bootid",
1419       "ctotal",
1420       ])
1421
1422     self.static_fields = frozenset([
1423       "name", "pinst_cnt", "sinst_cnt",
1424       "pinst_list", "sinst_list",
1425       "pip", "sip", "tags",
1426       "serial_no",
1427       ])
1428
1429     _CheckOutputFields(static=self.static_fields,
1430                        dynamic=self.dynamic_fields,
1431                        selected=self.op.output_fields)
1432
1433     self.needed_locks = {}
1434     self.share_locks[locking.LEVEL_NODE] = 1
1435
1436     if self.op.names:
1437       self.wanted = _GetWantedNodes(self, self.op.names)
1438     else:
1439       self.wanted = locking.ALL_SET
1440
1441     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1442     if self.do_locking:
1443       # if we don't request only static fields, we need to lock the nodes
1444       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1445
1446
1447   def CheckPrereq(self):
1448     """Check prerequisites.
1449
1450     """
1451     # The validation of the node list is done in the _GetWantedNodes,
1452     # if non empty, and if empty, there's no validation to do
1453     pass
1454
1455   def Exec(self, feedback_fn):
1456     """Computes the list of nodes and their attributes.
1457
1458     """
1459     all_info = self.cfg.GetAllNodesInfo()
1460     if self.do_locking:
1461       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1462     elif self.wanted != locking.ALL_SET:
1463       nodenames = self.wanted
1464       missing = set(nodenames).difference(all_info.keys())
1465       if missing:
1466         raise self.OpExecError(
1467           "Some nodes were removed before retrieving their data: %s" % missing)
1468     else:
1469       nodenames = all_info.keys()
1470     nodelist = [all_info[name] for name in nodenames]
1471
1472     # begin data gathering
1473
1474     if self.dynamic_fields.intersection(self.op.output_fields):
1475       live_data = {}
1476       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1477       for name in nodenames:
1478         nodeinfo = node_data.get(name, None)
1479         if nodeinfo:
1480           live_data[name] = {
1481             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1482             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1483             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1484             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1485             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1486             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1487             "bootid": nodeinfo['bootid'],
1488             }
1489         else:
1490           live_data[name] = {}
1491     else:
1492       live_data = dict.fromkeys(nodenames, {})
1493
1494     node_to_primary = dict([(name, set()) for name in nodenames])
1495     node_to_secondary = dict([(name, set()) for name in nodenames])
1496
1497     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1498                              "sinst_cnt", "sinst_list"))
1499     if inst_fields & frozenset(self.op.output_fields):
1500       instancelist = self.cfg.GetInstanceList()
1501
1502       for instance_name in instancelist:
1503         inst = self.cfg.GetInstanceInfo(instance_name)
1504         if inst.primary_node in node_to_primary:
1505           node_to_primary[inst.primary_node].add(inst.name)
1506         for secnode in inst.secondary_nodes:
1507           if secnode in node_to_secondary:
1508             node_to_secondary[secnode].add(inst.name)
1509
1510     # end data gathering
1511
1512     output = []
1513     for node in nodelist:
1514       node_output = []
1515       for field in self.op.output_fields:
1516         if field == "name":
1517           val = node.name
1518         elif field == "pinst_list":
1519           val = list(node_to_primary[node.name])
1520         elif field == "sinst_list":
1521           val = list(node_to_secondary[node.name])
1522         elif field == "pinst_cnt":
1523           val = len(node_to_primary[node.name])
1524         elif field == "sinst_cnt":
1525           val = len(node_to_secondary[node.name])
1526         elif field == "pip":
1527           val = node.primary_ip
1528         elif field == "sip":
1529           val = node.secondary_ip
1530         elif field == "tags":
1531           val = list(node.GetTags())
1532         elif field == "serial_no":
1533           val = node.serial_no
1534         elif field in self.dynamic_fields:
1535           val = live_data[node.name].get(field, None)
1536         else:
1537           raise errors.ParameterError(field)
1538         node_output.append(val)
1539       output.append(node_output)
1540
1541     return output
1542
1543
1544 class LUQueryNodeVolumes(NoHooksLU):
1545   """Logical unit for getting volumes on node(s).
1546
1547   """
1548   _OP_REQP = ["nodes", "output_fields"]
1549   REQ_BGL = False
1550
1551   def ExpandNames(self):
1552     _CheckOutputFields(static=["node"],
1553                        dynamic=["phys", "vg", "name", "size", "instance"],
1554                        selected=self.op.output_fields)
1555
1556     self.needed_locks = {}
1557     self.share_locks[locking.LEVEL_NODE] = 1
1558     if not self.op.nodes:
1559       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1560     else:
1561       self.needed_locks[locking.LEVEL_NODE] = \
1562         _GetWantedNodes(self, self.op.nodes)
1563
1564   def CheckPrereq(self):
1565     """Check prerequisites.
1566
1567     This checks that the fields required are valid output fields.
1568
1569     """
1570     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1571
1572   def Exec(self, feedback_fn):
1573     """Computes the list of nodes and their attributes.
1574
1575     """
1576     nodenames = self.nodes
1577     volumes = rpc.call_node_volumes(nodenames)
1578
1579     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1580              in self.cfg.GetInstanceList()]
1581
1582     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1583
1584     output = []
1585     for node in nodenames:
1586       if node not in volumes or not volumes[node]:
1587         continue
1588
1589       node_vols = volumes[node][:]
1590       node_vols.sort(key=lambda vol: vol['dev'])
1591
1592       for vol in node_vols:
1593         node_output = []
1594         for field in self.op.output_fields:
1595           if field == "node":
1596             val = node
1597           elif field == "phys":
1598             val = vol['dev']
1599           elif field == "vg":
1600             val = vol['vg']
1601           elif field == "name":
1602             val = vol['name']
1603           elif field == "size":
1604             val = int(float(vol['size']))
1605           elif field == "instance":
1606             for inst in ilist:
1607               if node not in lv_by_node[inst]:
1608                 continue
1609               if vol['name'] in lv_by_node[inst][node]:
1610                 val = inst.name
1611                 break
1612             else:
1613               val = '-'
1614           else:
1615             raise errors.ParameterError(field)
1616           node_output.append(str(val))
1617
1618         output.append(node_output)
1619
1620     return output
1621
1622
1623 class LUAddNode(LogicalUnit):
1624   """Logical unit for adding node to the cluster.
1625
1626   """
1627   HPATH = "node-add"
1628   HTYPE = constants.HTYPE_NODE
1629   _OP_REQP = ["node_name"]
1630
1631   def BuildHooksEnv(self):
1632     """Build hooks env.
1633
1634     This will run on all nodes before, and on all nodes + the new node after.
1635
1636     """
1637     env = {
1638       "OP_TARGET": self.op.node_name,
1639       "NODE_NAME": self.op.node_name,
1640       "NODE_PIP": self.op.primary_ip,
1641       "NODE_SIP": self.op.secondary_ip,
1642       }
1643     nodes_0 = self.cfg.GetNodeList()
1644     nodes_1 = nodes_0 + [self.op.node_name, ]
1645     return env, nodes_0, nodes_1
1646
1647   def CheckPrereq(self):
1648     """Check prerequisites.
1649
1650     This checks:
1651      - the new node is not already in the config
1652      - it is resolvable
1653      - its parameters (single/dual homed) matches the cluster
1654
1655     Any errors are signalled by raising errors.OpPrereqError.
1656
1657     """
1658     node_name = self.op.node_name
1659     cfg = self.cfg
1660
1661     dns_data = utils.HostInfo(node_name)
1662
1663     node = dns_data.name
1664     primary_ip = self.op.primary_ip = dns_data.ip
1665     secondary_ip = getattr(self.op, "secondary_ip", None)
1666     if secondary_ip is None:
1667       secondary_ip = primary_ip
1668     if not utils.IsValidIP(secondary_ip):
1669       raise errors.OpPrereqError("Invalid secondary IP given")
1670     self.op.secondary_ip = secondary_ip
1671
1672     node_list = cfg.GetNodeList()
1673     if not self.op.readd and node in node_list:
1674       raise errors.OpPrereqError("Node %s is already in the configuration" %
1675                                  node)
1676     elif self.op.readd and node not in node_list:
1677       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1678
1679     for existing_node_name in node_list:
1680       existing_node = cfg.GetNodeInfo(existing_node_name)
1681
1682       if self.op.readd and node == existing_node_name:
1683         if (existing_node.primary_ip != primary_ip or
1684             existing_node.secondary_ip != secondary_ip):
1685           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1686                                      " address configuration as before")
1687         continue
1688
1689       if (existing_node.primary_ip == primary_ip or
1690           existing_node.secondary_ip == primary_ip or
1691           existing_node.primary_ip == secondary_ip or
1692           existing_node.secondary_ip == secondary_ip):
1693         raise errors.OpPrereqError("New node ip address(es) conflict with"
1694                                    " existing node %s" % existing_node.name)
1695
1696     # check that the type of the node (single versus dual homed) is the
1697     # same as for the master
1698     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1699     master_singlehomed = myself.secondary_ip == myself.primary_ip
1700     newbie_singlehomed = secondary_ip == primary_ip
1701     if master_singlehomed != newbie_singlehomed:
1702       if master_singlehomed:
1703         raise errors.OpPrereqError("The master has no private ip but the"
1704                                    " new node has one")
1705       else:
1706         raise errors.OpPrereqError("The master has a private ip but the"
1707                                    " new node doesn't have one")
1708
1709     # checks reachablity
1710     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1711       raise errors.OpPrereqError("Node not reachable by ping")
1712
1713     if not newbie_singlehomed:
1714       # check reachability from my secondary ip to newbie's secondary ip
1715       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1716                            source=myself.secondary_ip):
1717         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1718                                    " based ping to noded port")
1719
1720     self.new_node = objects.Node(name=node,
1721                                  primary_ip=primary_ip,
1722                                  secondary_ip=secondary_ip)
1723
1724   def Exec(self, feedback_fn):
1725     """Adds the new node to the cluster.
1726
1727     """
1728     new_node = self.new_node
1729     node = new_node.name
1730
1731     # check connectivity
1732     result = rpc.call_version([node])[node]
1733     if result:
1734       if constants.PROTOCOL_VERSION == result:
1735         logger.Info("communication to node %s fine, sw version %s match" %
1736                     (node, result))
1737       else:
1738         raise errors.OpExecError("Version mismatch master version %s,"
1739                                  " node version %s" %
1740                                  (constants.PROTOCOL_VERSION, result))
1741     else:
1742       raise errors.OpExecError("Cannot get version from the new node")
1743
1744     # setup ssh on node
1745     logger.Info("copy ssh key to node %s" % node)
1746     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1747     keyarray = []
1748     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1749                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1750                 priv_key, pub_key]
1751
1752     for i in keyfiles:
1753       f = open(i, 'r')
1754       try:
1755         keyarray.append(f.read())
1756       finally:
1757         f.close()
1758
1759     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1760                                keyarray[3], keyarray[4], keyarray[5])
1761
1762     if not result:
1763       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1764
1765     # Add node to our /etc/hosts, and add key to known_hosts
1766     utils.AddHostToEtcHosts(new_node.name)
1767
1768     if new_node.secondary_ip != new_node.primary_ip:
1769       if not rpc.call_node_tcp_ping(new_node.name,
1770                                     constants.LOCALHOST_IP_ADDRESS,
1771                                     new_node.secondary_ip,
1772                                     constants.DEFAULT_NODED_PORT,
1773                                     10, False):
1774         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1775                                  " you gave (%s). Please fix and re-run this"
1776                                  " command." % new_node.secondary_ip)
1777
1778     node_verify_list = [self.sstore.GetMasterNode()]
1779     node_verify_param = {
1780       'nodelist': [node],
1781       # TODO: do a node-net-test as well?
1782     }
1783
1784     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1785     for verifier in node_verify_list:
1786       if not result[verifier]:
1787         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1788                                  " for remote verification" % verifier)
1789       if result[verifier]['nodelist']:
1790         for failed in result[verifier]['nodelist']:
1791           feedback_fn("ssh/hostname verification failed %s -> %s" %
1792                       (verifier, result[verifier]['nodelist'][failed]))
1793         raise errors.OpExecError("ssh/hostname verification failed.")
1794
1795     # Distribute updated /etc/hosts and known_hosts to all nodes,
1796     # including the node just added
1797     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1798     dist_nodes = self.cfg.GetNodeList()
1799     if not self.op.readd:
1800       dist_nodes.append(node)
1801     if myself.name in dist_nodes:
1802       dist_nodes.remove(myself.name)
1803
1804     logger.Debug("Copying hosts and known_hosts to all nodes")
1805     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1806       result = rpc.call_upload_file(dist_nodes, fname)
1807       for to_node in dist_nodes:
1808         if not result[to_node]:
1809           logger.Error("copy of file %s to node %s failed" %
1810                        (fname, to_node))
1811
1812     to_copy = self.sstore.GetFileList()
1813     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1814       to_copy.append(constants.VNC_PASSWORD_FILE)
1815     for fname in to_copy:
1816       result = rpc.call_upload_file([node], fname)
1817       if not result[node]:
1818         logger.Error("could not copy file %s to node %s" % (fname, node))
1819
1820     if self.op.readd:
1821       self.context.ReaddNode(new_node)
1822     else:
1823       self.context.AddNode(new_node)
1824
1825
1826 class LUQueryClusterInfo(NoHooksLU):
1827   """Query cluster configuration.
1828
1829   """
1830   _OP_REQP = []
1831   REQ_MASTER = False
1832   REQ_BGL = False
1833
1834   def ExpandNames(self):
1835     self.needed_locks = {}
1836
1837   def CheckPrereq(self):
1838     """No prerequsites needed for this LU.
1839
1840     """
1841     pass
1842
1843   def Exec(self, feedback_fn):
1844     """Return cluster config.
1845
1846     """
1847     result = {
1848       "name": self.sstore.GetClusterName(),
1849       "software_version": constants.RELEASE_VERSION,
1850       "protocol_version": constants.PROTOCOL_VERSION,
1851       "config_version": constants.CONFIG_VERSION,
1852       "os_api_version": constants.OS_API_VERSION,
1853       "export_version": constants.EXPORT_VERSION,
1854       "master": self.sstore.GetMasterNode(),
1855       "architecture": (platform.architecture()[0], platform.machine()),
1856       "hypervisor_type": self.sstore.GetHypervisorType(),
1857       }
1858
1859     return result
1860
1861
1862 class LUDumpClusterConfig(NoHooksLU):
1863   """Return a text-representation of the cluster-config.
1864
1865   """
1866   _OP_REQP = []
1867   REQ_BGL = False
1868
1869   def ExpandNames(self):
1870     self.needed_locks = {}
1871
1872   def CheckPrereq(self):
1873     """No prerequisites.
1874
1875     """
1876     pass
1877
1878   def Exec(self, feedback_fn):
1879     """Dump a representation of the cluster config to the standard output.
1880
1881     """
1882     return self.cfg.DumpConfig()
1883
1884
1885 class LUActivateInstanceDisks(NoHooksLU):
1886   """Bring up an instance's disks.
1887
1888   """
1889   _OP_REQP = ["instance_name"]
1890   REQ_BGL = False
1891
1892   def ExpandNames(self):
1893     self._ExpandAndLockInstance()
1894     self.needed_locks[locking.LEVEL_NODE] = []
1895     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1896
1897   def DeclareLocks(self, level):
1898     if level == locking.LEVEL_NODE:
1899       self._LockInstancesNodes()
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     This checks that the instance is in the cluster.
1905
1906     """
1907     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1908     assert self.instance is not None, \
1909       "Cannot retrieve locked instance %s" % self.op.instance_name
1910
1911   def Exec(self, feedback_fn):
1912     """Activate the disks.
1913
1914     """
1915     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1916     if not disks_ok:
1917       raise errors.OpExecError("Cannot activate block devices")
1918
1919     return disks_info
1920
1921
1922 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1923   """Prepare the block devices for an instance.
1924
1925   This sets up the block devices on all nodes.
1926
1927   Args:
1928     instance: a ganeti.objects.Instance object
1929     ignore_secondaries: if true, errors on secondary nodes won't result
1930                         in an error return from the function
1931
1932   Returns:
1933     false if the operation failed
1934     list of (host, instance_visible_name, node_visible_name) if the operation
1935          suceeded with the mapping from node devices to instance devices
1936   """
1937   device_info = []
1938   disks_ok = True
1939   iname = instance.name
1940   # With the two passes mechanism we try to reduce the window of
1941   # opportunity for the race condition of switching DRBD to primary
1942   # before handshaking occured, but we do not eliminate it
1943
1944   # The proper fix would be to wait (with some limits) until the
1945   # connection has been made and drbd transitions from WFConnection
1946   # into any other network-connected state (Connected, SyncTarget,
1947   # SyncSource, etc.)
1948
1949   # 1st pass, assemble on all nodes in secondary mode
1950   for inst_disk in instance.disks:
1951     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1952       cfg.SetDiskID(node_disk, node)
1953       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1954       if not result:
1955         logger.Error("could not prepare block device %s on node %s"
1956                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1957         if not ignore_secondaries:
1958           disks_ok = False
1959
1960   # FIXME: race condition on drbd migration to primary
1961
1962   # 2nd pass, do only the primary node
1963   for inst_disk in instance.disks:
1964     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1965       if node != instance.primary_node:
1966         continue
1967       cfg.SetDiskID(node_disk, node)
1968       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1969       if not result:
1970         logger.Error("could not prepare block device %s on node %s"
1971                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1972         disks_ok = False
1973     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1974
1975   # leave the disks configured for the primary node
1976   # this is a workaround that would be fixed better by
1977   # improving the logical/physical id handling
1978   for disk in instance.disks:
1979     cfg.SetDiskID(disk, instance.primary_node)
1980
1981   return disks_ok, device_info
1982
1983
1984 def _StartInstanceDisks(cfg, instance, force):
1985   """Start the disks of an instance.
1986
1987   """
1988   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1989                                            ignore_secondaries=force)
1990   if not disks_ok:
1991     _ShutdownInstanceDisks(instance, cfg)
1992     if force is not None and not force:
1993       logger.Error("If the message above refers to a secondary node,"
1994                    " you can retry the operation using '--force'.")
1995     raise errors.OpExecError("Disk consistency error")
1996
1997
1998 class LUDeactivateInstanceDisks(NoHooksLU):
1999   """Shutdown an instance's disks.
2000
2001   """
2002   _OP_REQP = ["instance_name"]
2003   REQ_BGL = False
2004
2005   def ExpandNames(self):
2006     self._ExpandAndLockInstance()
2007     self.needed_locks[locking.LEVEL_NODE] = []
2008     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2009
2010   def DeclareLocks(self, level):
2011     if level == locking.LEVEL_NODE:
2012       self._LockInstancesNodes()
2013
2014   def CheckPrereq(self):
2015     """Check prerequisites.
2016
2017     This checks that the instance is in the cluster.
2018
2019     """
2020     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2021     assert self.instance is not None, \
2022       "Cannot retrieve locked instance %s" % self.op.instance_name
2023
2024   def Exec(self, feedback_fn):
2025     """Deactivate the disks
2026
2027     """
2028     instance = self.instance
2029     _SafeShutdownInstanceDisks(instance, self.cfg)
2030
2031
2032 def _SafeShutdownInstanceDisks(instance, cfg):
2033   """Shutdown block devices of an instance.
2034
2035   This function checks if an instance is running, before calling
2036   _ShutdownInstanceDisks.
2037
2038   """
2039   ins_l = rpc.call_instance_list([instance.primary_node])
2040   ins_l = ins_l[instance.primary_node]
2041   if not type(ins_l) is list:
2042     raise errors.OpExecError("Can't contact node '%s'" %
2043                              instance.primary_node)
2044
2045   if instance.name in ins_l:
2046     raise errors.OpExecError("Instance is running, can't shutdown"
2047                              " block devices.")
2048
2049   _ShutdownInstanceDisks(instance, cfg)
2050
2051
2052 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2053   """Shutdown block devices of an instance.
2054
2055   This does the shutdown on all nodes of the instance.
2056
2057   If the ignore_primary is false, errors on the primary node are
2058   ignored.
2059
2060   """
2061   result = True
2062   for disk in instance.disks:
2063     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2064       cfg.SetDiskID(top_disk, node)
2065       if not rpc.call_blockdev_shutdown(node, top_disk):
2066         logger.Error("could not shutdown block device %s on node %s" %
2067                      (disk.iv_name, node))
2068         if not ignore_primary or node != instance.primary_node:
2069           result = False
2070   return result
2071
2072
2073 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2074   """Checks if a node has enough free memory.
2075
2076   This function check if a given node has the needed amount of free
2077   memory. In case the node has less memory or we cannot get the
2078   information from the node, this function raise an OpPrereqError
2079   exception.
2080
2081   Args:
2082     - cfg: a ConfigWriter instance
2083     - node: the node name
2084     - reason: string to use in the error message
2085     - requested: the amount of memory in MiB
2086
2087   """
2088   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2089   if not nodeinfo or not isinstance(nodeinfo, dict):
2090     raise errors.OpPrereqError("Could not contact node %s for resource"
2091                              " information" % (node,))
2092
2093   free_mem = nodeinfo[node].get('memory_free')
2094   if not isinstance(free_mem, int):
2095     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2096                              " was '%s'" % (node, free_mem))
2097   if requested > free_mem:
2098     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2099                              " needed %s MiB, available %s MiB" %
2100                              (node, reason, requested, free_mem))
2101
2102
2103 class LUStartupInstance(LogicalUnit):
2104   """Starts an instance.
2105
2106   """
2107   HPATH = "instance-start"
2108   HTYPE = constants.HTYPE_INSTANCE
2109   _OP_REQP = ["instance_name", "force"]
2110   REQ_BGL = False
2111
2112   def ExpandNames(self):
2113     self._ExpandAndLockInstance()
2114     self.needed_locks[locking.LEVEL_NODE] = []
2115     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116
2117   def DeclareLocks(self, level):
2118     if level == locking.LEVEL_NODE:
2119       self._LockInstancesNodes()
2120
2121   def BuildHooksEnv(self):
2122     """Build hooks env.
2123
2124     This runs on master, primary and secondary nodes of the instance.
2125
2126     """
2127     env = {
2128       "FORCE": self.op.force,
2129       }
2130     env.update(_BuildInstanceHookEnvByObject(self.instance))
2131     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2132           list(self.instance.secondary_nodes))
2133     return env, nl, nl
2134
2135   def CheckPrereq(self):
2136     """Check prerequisites.
2137
2138     This checks that the instance is in the cluster.
2139
2140     """
2141     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2142     assert self.instance is not None, \
2143       "Cannot retrieve locked instance %s" % self.op.instance_name
2144
2145     # check bridges existance
2146     _CheckInstanceBridgesExist(instance)
2147
2148     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2149                          "starting instance %s" % instance.name,
2150                          instance.memory)
2151
2152   def Exec(self, feedback_fn):
2153     """Start the instance.
2154
2155     """
2156     instance = self.instance
2157     force = self.op.force
2158     extra_args = getattr(self.op, "extra_args", "")
2159
2160     self.cfg.MarkInstanceUp(instance.name)
2161
2162     node_current = instance.primary_node
2163
2164     _StartInstanceDisks(self.cfg, instance, force)
2165
2166     if not rpc.call_instance_start(node_current, instance, extra_args):
2167       _ShutdownInstanceDisks(instance, self.cfg)
2168       raise errors.OpExecError("Could not start instance")
2169
2170
2171 class LURebootInstance(LogicalUnit):
2172   """Reboot an instance.
2173
2174   """
2175   HPATH = "instance-reboot"
2176   HTYPE = constants.HTYPE_INSTANCE
2177   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2178   REQ_BGL = False
2179
2180   def ExpandNames(self):
2181     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2182                                    constants.INSTANCE_REBOOT_HARD,
2183                                    constants.INSTANCE_REBOOT_FULL]:
2184       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2185                                   (constants.INSTANCE_REBOOT_SOFT,
2186                                    constants.INSTANCE_REBOOT_HARD,
2187                                    constants.INSTANCE_REBOOT_FULL))
2188     self._ExpandAndLockInstance()
2189     self.needed_locks[locking.LEVEL_NODE] = []
2190     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2191
2192   def DeclareLocks(self, level):
2193     if level == locking.LEVEL_NODE:
2194       primary_only = not constants.INSTANCE_REBOOT_FULL
2195       self._LockInstancesNodes(primary_only=primary_only)
2196
2197   def BuildHooksEnv(self):
2198     """Build hooks env.
2199
2200     This runs on master, primary and secondary nodes of the instance.
2201
2202     """
2203     env = {
2204       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2205       }
2206     env.update(_BuildInstanceHookEnvByObject(self.instance))
2207     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2208           list(self.instance.secondary_nodes))
2209     return env, nl, nl
2210
2211   def CheckPrereq(self):
2212     """Check prerequisites.
2213
2214     This checks that the instance is in the cluster.
2215
2216     """
2217     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2218     assert self.instance is not None, \
2219       "Cannot retrieve locked instance %s" % self.op.instance_name
2220
2221     # check bridges existance
2222     _CheckInstanceBridgesExist(instance)
2223
2224   def Exec(self, feedback_fn):
2225     """Reboot the instance.
2226
2227     """
2228     instance = self.instance
2229     ignore_secondaries = self.op.ignore_secondaries
2230     reboot_type = self.op.reboot_type
2231     extra_args = getattr(self.op, "extra_args", "")
2232
2233     node_current = instance.primary_node
2234
2235     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2236                        constants.INSTANCE_REBOOT_HARD]:
2237       if not rpc.call_instance_reboot(node_current, instance,
2238                                       reboot_type, extra_args):
2239         raise errors.OpExecError("Could not reboot instance")
2240     else:
2241       if not rpc.call_instance_shutdown(node_current, instance):
2242         raise errors.OpExecError("could not shutdown instance for full reboot")
2243       _ShutdownInstanceDisks(instance, self.cfg)
2244       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2245       if not rpc.call_instance_start(node_current, instance, extra_args):
2246         _ShutdownInstanceDisks(instance, self.cfg)
2247         raise errors.OpExecError("Could not start instance for full reboot")
2248
2249     self.cfg.MarkInstanceUp(instance.name)
2250
2251
2252 class LUShutdownInstance(LogicalUnit):
2253   """Shutdown an instance.
2254
2255   """
2256   HPATH = "instance-stop"
2257   HTYPE = constants.HTYPE_INSTANCE
2258   _OP_REQP = ["instance_name"]
2259   REQ_BGL = False
2260
2261   def ExpandNames(self):
2262     self._ExpandAndLockInstance()
2263     self.needed_locks[locking.LEVEL_NODE] = []
2264     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2265
2266   def DeclareLocks(self, level):
2267     if level == locking.LEVEL_NODE:
2268       self._LockInstancesNodes()
2269
2270   def BuildHooksEnv(self):
2271     """Build hooks env.
2272
2273     This runs on master, primary and secondary nodes of the instance.
2274
2275     """
2276     env = _BuildInstanceHookEnvByObject(self.instance)
2277     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2278           list(self.instance.secondary_nodes))
2279     return env, nl, nl
2280
2281   def CheckPrereq(self):
2282     """Check prerequisites.
2283
2284     This checks that the instance is in the cluster.
2285
2286     """
2287     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2288     assert self.instance is not None, \
2289       "Cannot retrieve locked instance %s" % self.op.instance_name
2290
2291   def Exec(self, feedback_fn):
2292     """Shutdown the instance.
2293
2294     """
2295     instance = self.instance
2296     node_current = instance.primary_node
2297     self.cfg.MarkInstanceDown(instance.name)
2298     if not rpc.call_instance_shutdown(node_current, instance):
2299       logger.Error("could not shutdown instance")
2300
2301     _ShutdownInstanceDisks(instance, self.cfg)
2302
2303
2304 class LUReinstallInstance(LogicalUnit):
2305   """Reinstall an instance.
2306
2307   """
2308   HPATH = "instance-reinstall"
2309   HTYPE = constants.HTYPE_INSTANCE
2310   _OP_REQP = ["instance_name"]
2311   REQ_BGL = False
2312
2313   def ExpandNames(self):
2314     self._ExpandAndLockInstance()
2315     self.needed_locks[locking.LEVEL_NODE] = []
2316     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2317
2318   def DeclareLocks(self, level):
2319     if level == locking.LEVEL_NODE:
2320       self._LockInstancesNodes()
2321
2322   def BuildHooksEnv(self):
2323     """Build hooks env.
2324
2325     This runs on master, primary and secondary nodes of the instance.
2326
2327     """
2328     env = _BuildInstanceHookEnvByObject(self.instance)
2329     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2330           list(self.instance.secondary_nodes))
2331     return env, nl, nl
2332
2333   def CheckPrereq(self):
2334     """Check prerequisites.
2335
2336     This checks that the instance is in the cluster and is not running.
2337
2338     """
2339     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2340     assert instance is not None, \
2341       "Cannot retrieve locked instance %s" % self.op.instance_name
2342
2343     if instance.disk_template == constants.DT_DISKLESS:
2344       raise errors.OpPrereqError("Instance '%s' has no disks" %
2345                                  self.op.instance_name)
2346     if instance.status != "down":
2347       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2348                                  self.op.instance_name)
2349     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2350     if remote_info:
2351       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2352                                  (self.op.instance_name,
2353                                   instance.primary_node))
2354
2355     self.op.os_type = getattr(self.op, "os_type", None)
2356     if self.op.os_type is not None:
2357       # OS verification
2358       pnode = self.cfg.GetNodeInfo(
2359         self.cfg.ExpandNodeName(instance.primary_node))
2360       if pnode is None:
2361         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2362                                    self.op.pnode)
2363       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2364       if not os_obj:
2365         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2366                                    " primary node"  % self.op.os_type)
2367
2368     self.instance = instance
2369
2370   def Exec(self, feedback_fn):
2371     """Reinstall the instance.
2372
2373     """
2374     inst = self.instance
2375
2376     if self.op.os_type is not None:
2377       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2378       inst.os = self.op.os_type
2379       self.cfg.Update(inst)
2380
2381     _StartInstanceDisks(self.cfg, inst, None)
2382     try:
2383       feedback_fn("Running the instance OS create scripts...")
2384       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2385         raise errors.OpExecError("Could not install OS for instance %s"
2386                                  " on node %s" %
2387                                  (inst.name, inst.primary_node))
2388     finally:
2389       _ShutdownInstanceDisks(inst, self.cfg)
2390
2391
2392 class LURenameInstance(LogicalUnit):
2393   """Rename an instance.
2394
2395   """
2396   HPATH = "instance-rename"
2397   HTYPE = constants.HTYPE_INSTANCE
2398   _OP_REQP = ["instance_name", "new_name"]
2399
2400   def BuildHooksEnv(self):
2401     """Build hooks env.
2402
2403     This runs on master, primary and secondary nodes of the instance.
2404
2405     """
2406     env = _BuildInstanceHookEnvByObject(self.instance)
2407     env["INSTANCE_NEW_NAME"] = self.op.new_name
2408     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2409           list(self.instance.secondary_nodes))
2410     return env, nl, nl
2411
2412   def CheckPrereq(self):
2413     """Check prerequisites.
2414
2415     This checks that the instance is in the cluster and is not running.
2416
2417     """
2418     instance = self.cfg.GetInstanceInfo(
2419       self.cfg.ExpandInstanceName(self.op.instance_name))
2420     if instance is None:
2421       raise errors.OpPrereqError("Instance '%s' not known" %
2422                                  self.op.instance_name)
2423     if instance.status != "down":
2424       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2425                                  self.op.instance_name)
2426     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2427     if remote_info:
2428       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2429                                  (self.op.instance_name,
2430                                   instance.primary_node))
2431     self.instance = instance
2432
2433     # new name verification
2434     name_info = utils.HostInfo(self.op.new_name)
2435
2436     self.op.new_name = new_name = name_info.name
2437     instance_list = self.cfg.GetInstanceList()
2438     if new_name in instance_list:
2439       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2440                                  new_name)
2441
2442     if not getattr(self.op, "ignore_ip", False):
2443       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2444         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2445                                    (name_info.ip, new_name))
2446
2447
2448   def Exec(self, feedback_fn):
2449     """Reinstall the instance.
2450
2451     """
2452     inst = self.instance
2453     old_name = inst.name
2454
2455     if inst.disk_template == constants.DT_FILE:
2456       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2457
2458     self.cfg.RenameInstance(inst.name, self.op.new_name)
2459     # Change the instance lock. This is definitely safe while we hold the BGL
2460     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2461     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2462
2463     # re-read the instance from the configuration after rename
2464     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2465
2466     if inst.disk_template == constants.DT_FILE:
2467       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2468       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2469                                                 old_file_storage_dir,
2470                                                 new_file_storage_dir)
2471
2472       if not result:
2473         raise errors.OpExecError("Could not connect to node '%s' to rename"
2474                                  " directory '%s' to '%s' (but the instance"
2475                                  " has been renamed in Ganeti)" % (
2476                                  inst.primary_node, old_file_storage_dir,
2477                                  new_file_storage_dir))
2478
2479       if not result[0]:
2480         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2481                                  " (but the instance has been renamed in"
2482                                  " Ganeti)" % (old_file_storage_dir,
2483                                                new_file_storage_dir))
2484
2485     _StartInstanceDisks(self.cfg, inst, None)
2486     try:
2487       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2488                                           "sda", "sdb"):
2489         msg = ("Could not run OS rename script for instance %s on node %s"
2490                " (but the instance has been renamed in Ganeti)" %
2491                (inst.name, inst.primary_node))
2492         logger.Error(msg)
2493     finally:
2494       _ShutdownInstanceDisks(inst, self.cfg)
2495
2496
2497 class LURemoveInstance(LogicalUnit):
2498   """Remove an instance.
2499
2500   """
2501   HPATH = "instance-remove"
2502   HTYPE = constants.HTYPE_INSTANCE
2503   _OP_REQP = ["instance_name", "ignore_failures"]
2504   REQ_BGL = False
2505
2506   def ExpandNames(self):
2507     self._ExpandAndLockInstance()
2508     self.needed_locks[locking.LEVEL_NODE] = []
2509     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2510
2511   def DeclareLocks(self, level):
2512     if level == locking.LEVEL_NODE:
2513       self._LockInstancesNodes()
2514
2515   def BuildHooksEnv(self):
2516     """Build hooks env.
2517
2518     This runs on master, primary and secondary nodes of the instance.
2519
2520     """
2521     env = _BuildInstanceHookEnvByObject(self.instance)
2522     nl = [self.sstore.GetMasterNode()]
2523     return env, nl, nl
2524
2525   def CheckPrereq(self):
2526     """Check prerequisites.
2527
2528     This checks that the instance is in the cluster.
2529
2530     """
2531     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2532     assert self.instance is not None, \
2533       "Cannot retrieve locked instance %s" % self.op.instance_name
2534
2535   def Exec(self, feedback_fn):
2536     """Remove the instance.
2537
2538     """
2539     instance = self.instance
2540     logger.Info("shutting down instance %s on node %s" %
2541                 (instance.name, instance.primary_node))
2542
2543     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2544       if self.op.ignore_failures:
2545         feedback_fn("Warning: can't shutdown instance")
2546       else:
2547         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2548                                  (instance.name, instance.primary_node))
2549
2550     logger.Info("removing block devices for instance %s" % instance.name)
2551
2552     if not _RemoveDisks(instance, self.cfg):
2553       if self.op.ignore_failures:
2554         feedback_fn("Warning: can't remove instance's disks")
2555       else:
2556         raise errors.OpExecError("Can't remove instance's disks")
2557
2558     logger.Info("removing instance %s out of cluster config" % instance.name)
2559
2560     self.cfg.RemoveInstance(instance.name)
2561     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2562
2563
2564 class LUQueryInstances(NoHooksLU):
2565   """Logical unit for querying instances.
2566
2567   """
2568   _OP_REQP = ["output_fields", "names"]
2569   REQ_BGL = False
2570
2571   def ExpandNames(self):
2572     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2573     self.static_fields = frozenset([
2574       "name", "os", "pnode", "snodes",
2575       "admin_state", "admin_ram",
2576       "disk_template", "ip", "mac", "bridge",
2577       "sda_size", "sdb_size", "vcpus", "tags",
2578       "network_port", "kernel_path", "initrd_path",
2579       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2580       "hvm_cdrom_image_path", "hvm_nic_type",
2581       "hvm_disk_type", "vnc_bind_address",
2582       "serial_no",
2583       ])
2584     _CheckOutputFields(static=self.static_fields,
2585                        dynamic=self.dynamic_fields,
2586                        selected=self.op.output_fields)
2587
2588     self.needed_locks = {}
2589     self.share_locks[locking.LEVEL_INSTANCE] = 1
2590     self.share_locks[locking.LEVEL_NODE] = 1
2591
2592     if self.op.names:
2593       self.wanted = _GetWantedInstances(self, self.op.names)
2594     else:
2595       self.wanted = locking.ALL_SET
2596
2597     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2598     if self.do_locking:
2599       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2600       self.needed_locks[locking.LEVEL_NODE] = []
2601       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2602
2603   def DeclareLocks(self, level):
2604     if level == locking.LEVEL_NODE and self.do_locking:
2605       self._LockInstancesNodes()
2606
2607   def CheckPrereq(self):
2608     """Check prerequisites.
2609
2610     """
2611     pass
2612
2613   def Exec(self, feedback_fn):
2614     """Computes the list of nodes and their attributes.
2615
2616     """
2617     all_info = self.cfg.GetAllInstancesInfo()
2618     if self.do_locking:
2619       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2620     elif self.wanted != locking.ALL_SET:
2621       instance_names = self.wanted
2622       missing = set(instance_names).difference(all_info.keys())
2623       if missing:
2624         raise self.OpExecError(
2625           "Some instances were removed before retrieving their data: %s"
2626           % missing)
2627     else:
2628       instance_names = all_info.keys()
2629     instance_list = [all_info[iname] for iname in instance_names]
2630
2631     # begin data gathering
2632
2633     nodes = frozenset([inst.primary_node for inst in instance_list])
2634
2635     bad_nodes = []
2636     if self.dynamic_fields.intersection(self.op.output_fields):
2637       live_data = {}
2638       node_data = rpc.call_all_instances_info(nodes)
2639       for name in nodes:
2640         result = node_data[name]
2641         if result:
2642           live_data.update(result)
2643         elif result == False:
2644           bad_nodes.append(name)
2645         # else no instance is alive
2646     else:
2647       live_data = dict([(name, {}) for name in instance_names])
2648
2649     # end data gathering
2650
2651     output = []
2652     for instance in instance_list:
2653       iout = []
2654       for field in self.op.output_fields:
2655         if field == "name":
2656           val = instance.name
2657         elif field == "os":
2658           val = instance.os
2659         elif field == "pnode":
2660           val = instance.primary_node
2661         elif field == "snodes":
2662           val = list(instance.secondary_nodes)
2663         elif field == "admin_state":
2664           val = (instance.status != "down")
2665         elif field == "oper_state":
2666           if instance.primary_node in bad_nodes:
2667             val = None
2668           else:
2669             val = bool(live_data.get(instance.name))
2670         elif field == "status":
2671           if instance.primary_node in bad_nodes:
2672             val = "ERROR_nodedown"
2673           else:
2674             running = bool(live_data.get(instance.name))
2675             if running:
2676               if instance.status != "down":
2677                 val = "running"
2678               else:
2679                 val = "ERROR_up"
2680             else:
2681               if instance.status != "down":
2682                 val = "ERROR_down"
2683               else:
2684                 val = "ADMIN_down"
2685         elif field == "admin_ram":
2686           val = instance.memory
2687         elif field == "oper_ram":
2688           if instance.primary_node in bad_nodes:
2689             val = None
2690           elif instance.name in live_data:
2691             val = live_data[instance.name].get("memory", "?")
2692           else:
2693             val = "-"
2694         elif field == "disk_template":
2695           val = instance.disk_template
2696         elif field == "ip":
2697           val = instance.nics[0].ip
2698         elif field == "bridge":
2699           val = instance.nics[0].bridge
2700         elif field == "mac":
2701           val = instance.nics[0].mac
2702         elif field == "sda_size" or field == "sdb_size":
2703           disk = instance.FindDisk(field[:3])
2704           if disk is None:
2705             val = None
2706           else:
2707             val = disk.size
2708         elif field == "vcpus":
2709           val = instance.vcpus
2710         elif field == "tags":
2711           val = list(instance.GetTags())
2712         elif field == "serial_no":
2713           val = instance.serial_no
2714         elif field in ("network_port", "kernel_path", "initrd_path",
2715                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2716                        "hvm_cdrom_image_path", "hvm_nic_type",
2717                        "hvm_disk_type", "vnc_bind_address"):
2718           val = getattr(instance, field, None)
2719           if val is not None:
2720             pass
2721           elif field in ("hvm_nic_type", "hvm_disk_type",
2722                          "kernel_path", "initrd_path"):
2723             val = "default"
2724           else:
2725             val = "-"
2726         else:
2727           raise errors.ParameterError(field)
2728         iout.append(val)
2729       output.append(iout)
2730
2731     return output
2732
2733
2734 class LUFailoverInstance(LogicalUnit):
2735   """Failover an instance.
2736
2737   """
2738   HPATH = "instance-failover"
2739   HTYPE = constants.HTYPE_INSTANCE
2740   _OP_REQP = ["instance_name", "ignore_consistency"]
2741   REQ_BGL = False
2742
2743   def ExpandNames(self):
2744     self._ExpandAndLockInstance()
2745     self.needed_locks[locking.LEVEL_NODE] = []
2746     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747
2748   def DeclareLocks(self, level):
2749     if level == locking.LEVEL_NODE:
2750       self._LockInstancesNodes()
2751
2752   def BuildHooksEnv(self):
2753     """Build hooks env.
2754
2755     This runs on master, primary and secondary nodes of the instance.
2756
2757     """
2758     env = {
2759       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2760       }
2761     env.update(_BuildInstanceHookEnvByObject(self.instance))
2762     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2763     return env, nl, nl
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775     if instance.disk_template not in constants.DTS_NET_MIRROR:
2776       raise errors.OpPrereqError("Instance's disk layout is not"
2777                                  " network mirrored, cannot failover.")
2778
2779     secondary_nodes = instance.secondary_nodes
2780     if not secondary_nodes:
2781       raise errors.ProgrammerError("no secondary node but using "
2782                                    "a mirrored disk template")
2783
2784     target_node = secondary_nodes[0]
2785     # check memory requirements on the secondary node
2786     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2787                          instance.name, instance.memory)
2788
2789     # check bridge existance
2790     brlist = [nic.bridge for nic in instance.nics]
2791     if not rpc.call_bridges_exist(target_node, brlist):
2792       raise errors.OpPrereqError("One or more target bridges %s does not"
2793                                  " exist on destination node '%s'" %
2794                                  (brlist, target_node))
2795
2796   def Exec(self, feedback_fn):
2797     """Failover an instance.
2798
2799     The failover is done by shutting it down on its present node and
2800     starting it on the secondary.
2801
2802     """
2803     instance = self.instance
2804
2805     source_node = instance.primary_node
2806     target_node = instance.secondary_nodes[0]
2807
2808     feedback_fn("* checking disk consistency between source and target")
2809     for dev in instance.disks:
2810       # for drbd, these are drbd over lvm
2811       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2812         if instance.status == "up" and not self.op.ignore_consistency:
2813           raise errors.OpExecError("Disk %s is degraded on target node,"
2814                                    " aborting failover." % dev.iv_name)
2815
2816     feedback_fn("* shutting down instance on source node")
2817     logger.Info("Shutting down instance %s on node %s" %
2818                 (instance.name, source_node))
2819
2820     if not rpc.call_instance_shutdown(source_node, instance):
2821       if self.op.ignore_consistency:
2822         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2823                      " anyway. Please make sure node %s is down"  %
2824                      (instance.name, source_node, source_node))
2825       else:
2826         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2827                                  (instance.name, source_node))
2828
2829     feedback_fn("* deactivating the instance's disks on source node")
2830     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2831       raise errors.OpExecError("Can't shut down the instance's disks.")
2832
2833     instance.primary_node = target_node
2834     # distribute new instance config to the other nodes
2835     self.cfg.Update(instance)
2836
2837     # Only start the instance if it's marked as up
2838     if instance.status == "up":
2839       feedback_fn("* activating the instance's disks on target node")
2840       logger.Info("Starting instance %s on node %s" %
2841                   (instance.name, target_node))
2842
2843       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2844                                                ignore_secondaries=True)
2845       if not disks_ok:
2846         _ShutdownInstanceDisks(instance, self.cfg)
2847         raise errors.OpExecError("Can't activate the instance's disks")
2848
2849       feedback_fn("* starting the instance on the target node")
2850       if not rpc.call_instance_start(target_node, instance, None):
2851         _ShutdownInstanceDisks(instance, self.cfg)
2852         raise errors.OpExecError("Could not start instance %s on node %s." %
2853                                  (instance.name, target_node))
2854
2855
2856 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2857   """Create a tree of block devices on the primary node.
2858
2859   This always creates all devices.
2860
2861   """
2862   if device.children:
2863     for child in device.children:
2864       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2865         return False
2866
2867   cfg.SetDiskID(device, node)
2868   new_id = rpc.call_blockdev_create(node, device, device.size,
2869                                     instance.name, True, info)
2870   if not new_id:
2871     return False
2872   if device.physical_id is None:
2873     device.physical_id = new_id
2874   return True
2875
2876
2877 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2878   """Create a tree of block devices on a secondary node.
2879
2880   If this device type has to be created on secondaries, create it and
2881   all its children.
2882
2883   If not, just recurse to children keeping the same 'force' value.
2884
2885   """
2886   if device.CreateOnSecondary():
2887     force = True
2888   if device.children:
2889     for child in device.children:
2890       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2891                                         child, force, info):
2892         return False
2893
2894   if not force:
2895     return True
2896   cfg.SetDiskID(device, node)
2897   new_id = rpc.call_blockdev_create(node, device, device.size,
2898                                     instance.name, False, info)
2899   if not new_id:
2900     return False
2901   if device.physical_id is None:
2902     device.physical_id = new_id
2903   return True
2904
2905
2906 def _GenerateUniqueNames(cfg, exts):
2907   """Generate a suitable LV name.
2908
2909   This will generate a logical volume name for the given instance.
2910
2911   """
2912   results = []
2913   for val in exts:
2914     new_id = cfg.GenerateUniqueID()
2915     results.append("%s%s" % (new_id, val))
2916   return results
2917
2918
2919 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2920                          p_minor, s_minor):
2921   """Generate a drbd8 device complete with its children.
2922
2923   """
2924   port = cfg.AllocatePort()
2925   vgname = cfg.GetVGName()
2926   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2927                           logical_id=(vgname, names[0]))
2928   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2929                           logical_id=(vgname, names[1]))
2930   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2931                           logical_id=(primary, secondary, port,
2932                                       p_minor, s_minor),
2933                           children=[dev_data, dev_meta],
2934                           iv_name=iv_name)
2935   return drbd_dev
2936
2937
2938 def _GenerateDiskTemplate(cfg, template_name,
2939                           instance_name, primary_node,
2940                           secondary_nodes, disk_sz, swap_sz,
2941                           file_storage_dir, file_driver):
2942   """Generate the entire disk layout for a given template type.
2943
2944   """
2945   #TODO: compute space requirements
2946
2947   vgname = cfg.GetVGName()
2948   if template_name == constants.DT_DISKLESS:
2949     disks = []
2950   elif template_name == constants.DT_PLAIN:
2951     if len(secondary_nodes) != 0:
2952       raise errors.ProgrammerError("Wrong template configuration")
2953
2954     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2955     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2956                            logical_id=(vgname, names[0]),
2957                            iv_name = "sda")
2958     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2959                            logical_id=(vgname, names[1]),
2960                            iv_name = "sdb")
2961     disks = [sda_dev, sdb_dev]
2962   elif template_name == constants.DT_DRBD8:
2963     if len(secondary_nodes) != 1:
2964       raise errors.ProgrammerError("Wrong template configuration")
2965     remote_node = secondary_nodes[0]
2966     (minor_pa, minor_pb,
2967      minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
2968       [primary_node, primary_node, remote_node, remote_node], instance_name)
2969
2970     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2971                                        ".sdb_data", ".sdb_meta"])
2972     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2973                                         disk_sz, names[0:2], "sda",
2974                                         minor_pa, minor_sa)
2975     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2976                                         swap_sz, names[2:4], "sdb",
2977                                         minor_pb, minor_sb)
2978     disks = [drbd_sda_dev, drbd_sdb_dev]
2979   elif template_name == constants.DT_FILE:
2980     if len(secondary_nodes) != 0:
2981       raise errors.ProgrammerError("Wrong template configuration")
2982
2983     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2984                                 iv_name="sda", logical_id=(file_driver,
2985                                 "%s/sda" % file_storage_dir))
2986     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2987                                 iv_name="sdb", logical_id=(file_driver,
2988                                 "%s/sdb" % file_storage_dir))
2989     disks = [file_sda_dev, file_sdb_dev]
2990   else:
2991     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2992   return disks
2993
2994
2995 def _GetInstanceInfoText(instance):
2996   """Compute that text that should be added to the disk's metadata.
2997
2998   """
2999   return "originstname+%s" % instance.name
3000
3001
3002 def _CreateDisks(cfg, instance):
3003   """Create all disks for an instance.
3004
3005   This abstracts away some work from AddInstance.
3006
3007   Args:
3008     instance: the instance object
3009
3010   Returns:
3011     True or False showing the success of the creation process
3012
3013   """
3014   info = _GetInstanceInfoText(instance)
3015
3016   if instance.disk_template == constants.DT_FILE:
3017     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3018     result = rpc.call_file_storage_dir_create(instance.primary_node,
3019                                               file_storage_dir)
3020
3021     if not result:
3022       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3023       return False
3024
3025     if not result[0]:
3026       logger.Error("failed to create directory '%s'" % file_storage_dir)
3027       return False
3028
3029   for device in instance.disks:
3030     logger.Info("creating volume %s for instance %s" %
3031                 (device.iv_name, instance.name))
3032     #HARDCODE
3033     for secondary_node in instance.secondary_nodes:
3034       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3035                                         device, False, info):
3036         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3037                      (device.iv_name, device, secondary_node))
3038         return False
3039     #HARDCODE
3040     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3041                                     instance, device, info):
3042       logger.Error("failed to create volume %s on primary!" %
3043                    device.iv_name)
3044       return False
3045
3046   return True
3047
3048
3049 def _RemoveDisks(instance, cfg):
3050   """Remove all disks for an instance.
3051
3052   This abstracts away some work from `AddInstance()` and
3053   `RemoveInstance()`. Note that in case some of the devices couldn't
3054   be removed, the removal will continue with the other ones (compare
3055   with `_CreateDisks()`).
3056
3057   Args:
3058     instance: the instance object
3059
3060   Returns:
3061     True or False showing the success of the removal proces
3062
3063   """
3064   logger.Info("removing block devices for instance %s" % instance.name)
3065
3066   result = True
3067   for device in instance.disks:
3068     for node, disk in device.ComputeNodeTree(instance.primary_node):
3069       cfg.SetDiskID(disk, node)
3070       if not rpc.call_blockdev_remove(node, disk):
3071         logger.Error("could not remove block device %s on node %s,"
3072                      " continuing anyway" %
3073                      (device.iv_name, node))
3074         result = False
3075
3076   if instance.disk_template == constants.DT_FILE:
3077     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3078     if not rpc.call_file_storage_dir_remove(instance.primary_node,
3079                                             file_storage_dir):
3080       logger.Error("could not remove directory '%s'" % file_storage_dir)
3081       result = False
3082
3083   return result
3084
3085
3086 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3087   """Compute disk size requirements in the volume group
3088
3089   This is currently hard-coded for the two-drive layout.
3090
3091   """
3092   # Required free disk space as a function of disk and swap space
3093   req_size_dict = {
3094     constants.DT_DISKLESS: None,
3095     constants.DT_PLAIN: disk_size + swap_size,
3096     # 256 MB are added for drbd metadata, 128MB for each drbd device
3097     constants.DT_DRBD8: disk_size + swap_size + 256,
3098     constants.DT_FILE: None,
3099   }
3100
3101   if disk_template not in req_size_dict:
3102     raise errors.ProgrammerError("Disk template '%s' size requirement"
3103                                  " is unknown" %  disk_template)
3104
3105   return req_size_dict[disk_template]
3106
3107
3108 class LUCreateInstance(LogicalUnit):
3109   """Create an instance.
3110
3111   """
3112   HPATH = "instance-add"
3113   HTYPE = constants.HTYPE_INSTANCE
3114   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3115               "disk_template", "swap_size", "mode", "start", "vcpus",
3116               "wait_for_sync", "ip_check", "mac"]
3117   REQ_BGL = False
3118
3119   def _ExpandNode(self, node):
3120     """Expands and checks one node name.
3121
3122     """
3123     node_full = self.cfg.ExpandNodeName(node)
3124     if node_full is None:
3125       raise errors.OpPrereqError("Unknown node %s" % node)
3126     return node_full
3127
3128   def ExpandNames(self):
3129     """ExpandNames for CreateInstance.
3130
3131     Figure out the right locks for instance creation.
3132
3133     """
3134     self.needed_locks = {}
3135
3136     # set optional parameters to none if they don't exist
3137     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3138                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3139                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3140                  "vnc_bind_address"]:
3141       if not hasattr(self.op, attr):
3142         setattr(self.op, attr, None)
3143
3144     # verify creation mode
3145     if self.op.mode not in (constants.INSTANCE_CREATE,
3146                             constants.INSTANCE_IMPORT):
3147       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3148                                  self.op.mode)
3149     # disk template and mirror node verification
3150     if self.op.disk_template not in constants.DISK_TEMPLATES:
3151       raise errors.OpPrereqError("Invalid disk template name")
3152
3153     #### instance parameters check
3154
3155     # instance name verification
3156     hostname1 = utils.HostInfo(self.op.instance_name)
3157     self.op.instance_name = instance_name = hostname1.name
3158
3159     # this is just a preventive check, but someone might still add this
3160     # instance in the meantime, and creation will fail at lock-add time
3161     if instance_name in self.cfg.GetInstanceList():
3162       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3163                                  instance_name)
3164
3165     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3166
3167     # ip validity checks
3168     ip = getattr(self.op, "ip", None)
3169     if ip is None or ip.lower() == "none":
3170       inst_ip = None
3171     elif ip.lower() == "auto":
3172       inst_ip = hostname1.ip
3173     else:
3174       if not utils.IsValidIP(ip):
3175         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3176                                    " like a valid IP" % ip)
3177       inst_ip = ip
3178     self.inst_ip = self.op.ip = inst_ip
3179     # used in CheckPrereq for ip ping check
3180     self.check_ip = hostname1.ip
3181
3182     # MAC address verification
3183     if self.op.mac != "auto":
3184       if not utils.IsValidMac(self.op.mac.lower()):
3185         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3186                                    self.op.mac)
3187
3188     # boot order verification
3189     if self.op.hvm_boot_order is not None:
3190       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3191         raise errors.OpPrereqError("invalid boot order specified,"
3192                                    " must be one or more of [acdn]")
3193     # file storage checks
3194     if (self.op.file_driver and
3195         not self.op.file_driver in constants.FILE_DRIVER):
3196       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3197                                  self.op.file_driver)
3198
3199     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3200       raise errors.OpPrereqError("File storage directory path not absolute")
3201
3202     ### Node/iallocator related checks
3203     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3204       raise errors.OpPrereqError("One and only one of iallocator and primary"
3205                                  " node must be given")
3206
3207     if self.op.iallocator:
3208       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3209     else:
3210       self.op.pnode = self._ExpandNode(self.op.pnode)
3211       nodelist = [self.op.pnode]
3212       if self.op.snode is not None:
3213         self.op.snode = self._ExpandNode(self.op.snode)
3214         nodelist.append(self.op.snode)
3215       self.needed_locks[locking.LEVEL_NODE] = nodelist
3216
3217     # in case of import lock the source node too
3218     if self.op.mode == constants.INSTANCE_IMPORT:
3219       src_node = getattr(self.op, "src_node", None)
3220       src_path = getattr(self.op, "src_path", None)
3221
3222       if src_node is None or src_path is None:
3223         raise errors.OpPrereqError("Importing an instance requires source"
3224                                    " node and path options")
3225
3226       if not os.path.isabs(src_path):
3227         raise errors.OpPrereqError("The source path must be absolute")
3228
3229       self.op.src_node = src_node = self._ExpandNode(src_node)
3230       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3231         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3232
3233     else: # INSTANCE_CREATE
3234       if getattr(self.op, "os_type", None) is None:
3235         raise errors.OpPrereqError("No guest OS specified")
3236
3237   def _RunAllocator(self):
3238     """Run the allocator based on input opcode.
3239
3240     """
3241     disks = [{"size": self.op.disk_size, "mode": "w"},
3242              {"size": self.op.swap_size, "mode": "w"}]
3243     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3244              "bridge": self.op.bridge}]
3245     ial = IAllocator(self.cfg, self.sstore,
3246                      mode=constants.IALLOCATOR_MODE_ALLOC,
3247                      name=self.op.instance_name,
3248                      disk_template=self.op.disk_template,
3249                      tags=[],
3250                      os=self.op.os_type,
3251                      vcpus=self.op.vcpus,
3252                      mem_size=self.op.mem_size,
3253                      disks=disks,
3254                      nics=nics,
3255                      )
3256
3257     ial.Run(self.op.iallocator)
3258
3259     if not ial.success:
3260       raise errors.OpPrereqError("Can't compute nodes using"
3261                                  " iallocator '%s': %s" % (self.op.iallocator,
3262                                                            ial.info))
3263     if len(ial.nodes) != ial.required_nodes:
3264       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3265                                  " of nodes (%s), required %s" %
3266                                  (self.op.iallocator, len(ial.nodes),
3267                                   ial.required_nodes))
3268     self.op.pnode = ial.nodes[0]
3269     logger.ToStdout("Selected nodes for the instance: %s" %
3270                     (", ".join(ial.nodes),))
3271     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3272                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3273     if ial.required_nodes == 2:
3274       self.op.snode = ial.nodes[1]
3275
3276   def BuildHooksEnv(self):
3277     """Build hooks env.
3278
3279     This runs on master, primary and secondary nodes of the instance.
3280
3281     """
3282     env = {
3283       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3284       "INSTANCE_DISK_SIZE": self.op.disk_size,
3285       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3286       "INSTANCE_ADD_MODE": self.op.mode,
3287       }
3288     if self.op.mode == constants.INSTANCE_IMPORT:
3289       env["INSTANCE_SRC_NODE"] = self.op.src_node
3290       env["INSTANCE_SRC_PATH"] = self.op.src_path
3291       env["INSTANCE_SRC_IMAGE"] = self.src_image
3292
3293     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3294       primary_node=self.op.pnode,
3295       secondary_nodes=self.secondaries,
3296       status=self.instance_status,
3297       os_type=self.op.os_type,
3298       memory=self.op.mem_size,
3299       vcpus=self.op.vcpus,
3300       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3301     ))
3302
3303     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3304           self.secondaries)
3305     return env, nl, nl
3306
3307
3308   def CheckPrereq(self):
3309     """Check prerequisites.
3310
3311     """
3312     if (not self.cfg.GetVGName() and
3313         self.op.disk_template not in constants.DTS_NOT_LVM):
3314       raise errors.OpPrereqError("Cluster does not support lvm-based"
3315                                  " instances")
3316
3317     if self.op.mode == constants.INSTANCE_IMPORT:
3318       src_node = self.op.src_node
3319       src_path = self.op.src_path
3320
3321       export_info = rpc.call_export_info(src_node, src_path)
3322
3323       if not export_info:
3324         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3325
3326       if not export_info.has_section(constants.INISECT_EXP):
3327         raise errors.ProgrammerError("Corrupted export config")
3328
3329       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3330       if (int(ei_version) != constants.EXPORT_VERSION):
3331         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3332                                    (ei_version, constants.EXPORT_VERSION))
3333
3334       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3335         raise errors.OpPrereqError("Can't import instance with more than"
3336                                    " one data disk")
3337
3338       # FIXME: are the old os-es, disk sizes, etc. useful?
3339       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3340       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3341                                                          'disk0_dump'))
3342       self.src_image = diskimage
3343
3344     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3345
3346     if self.op.start and not self.op.ip_check:
3347       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3348                                  " adding an instance in start mode")
3349
3350     if self.op.ip_check:
3351       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3352         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3353                                    (self.check_ip, instance_name))
3354
3355     # bridge verification
3356     bridge = getattr(self.op, "bridge", None)
3357     if bridge is None:
3358       self.op.bridge = self.cfg.GetDefBridge()
3359     else:
3360       self.op.bridge = bridge
3361
3362     #### allocator run
3363
3364     if self.op.iallocator is not None:
3365       self._RunAllocator()
3366
3367     #### node related checks
3368
3369     # check primary node
3370     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3371     assert self.pnode is not None, \
3372       "Cannot retrieve locked node %s" % self.op.pnode
3373     self.secondaries = []
3374
3375     # mirror node verification
3376     if self.op.disk_template in constants.DTS_NET_MIRROR:
3377       if self.op.snode is None:
3378         raise errors.OpPrereqError("The networked disk templates need"
3379                                    " a mirror node")
3380       if self.op.snode == pnode.name:
3381         raise errors.OpPrereqError("The secondary node cannot be"
3382                                    " the primary node.")
3383       self.secondaries.append(self.op.snode)
3384
3385     req_size = _ComputeDiskSize(self.op.disk_template,
3386                                 self.op.disk_size, self.op.swap_size)
3387
3388     # Check lv size requirements
3389     if req_size is not None:
3390       nodenames = [pnode.name] + self.secondaries
3391       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3392       for node in nodenames:
3393         info = nodeinfo.get(node, None)
3394         if not info:
3395           raise errors.OpPrereqError("Cannot get current information"
3396                                      " from node '%s'" % node)
3397         vg_free = info.get('vg_free', None)
3398         if not isinstance(vg_free, int):
3399           raise errors.OpPrereqError("Can't compute free disk space on"
3400                                      " node %s" % node)
3401         if req_size > info['vg_free']:
3402           raise errors.OpPrereqError("Not enough disk space on target node %s."
3403                                      " %d MB available, %d MB required" %
3404                                      (node, info['vg_free'], req_size))
3405
3406     # os verification
3407     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3408     if not os_obj:
3409       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3410                                  " primary node"  % self.op.os_type)
3411
3412     if self.op.kernel_path == constants.VALUE_NONE:
3413       raise errors.OpPrereqError("Can't set instance kernel to none")
3414
3415     # bridge check on primary node
3416     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3417       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3418                                  " destination node '%s'" %
3419                                  (self.op.bridge, pnode.name))
3420
3421     # memory check on primary node
3422     if self.op.start:
3423       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3424                            "creating instance %s" % self.op.instance_name,
3425                            self.op.mem_size)
3426
3427     # hvm_cdrom_image_path verification
3428     if self.op.hvm_cdrom_image_path is not None:
3429       # FIXME (als): shouldn't these checks happen on the destination node?
3430       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3431         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3432                                    " be an absolute path or None, not %s" %
3433                                    self.op.hvm_cdrom_image_path)
3434       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3435         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3436                                    " regular file or a symlink pointing to"
3437                                    " an existing regular file, not %s" %
3438                                    self.op.hvm_cdrom_image_path)
3439
3440     # vnc_bind_address verification
3441     if self.op.vnc_bind_address is not None:
3442       if not utils.IsValidIP(self.op.vnc_bind_address):
3443         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3444                                    " like a valid IP address" %
3445                                    self.op.vnc_bind_address)
3446
3447     # Xen HVM device type checks
3448     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3449       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3450         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3451                                    " hypervisor" % self.op.hvm_nic_type)
3452       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3453         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3454                                    " hypervisor" % self.op.hvm_disk_type)
3455
3456     if self.op.start:
3457       self.instance_status = 'up'
3458     else:
3459       self.instance_status = 'down'
3460
3461   def Exec(self, feedback_fn):
3462     """Create and add the instance to the cluster.
3463
3464     """
3465     instance = self.op.instance_name
3466     pnode_name = self.pnode.name
3467
3468     if self.op.mac == "auto":
3469       mac_address = self.cfg.GenerateMAC()
3470     else:
3471       mac_address = self.op.mac
3472
3473     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3474     if self.inst_ip is not None:
3475       nic.ip = self.inst_ip
3476
3477     ht_kind = self.sstore.GetHypervisorType()
3478     if ht_kind in constants.HTS_REQ_PORT:
3479       network_port = self.cfg.AllocatePort()
3480     else:
3481       network_port = None
3482
3483     if self.op.vnc_bind_address is None:
3484       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3485
3486     # this is needed because os.path.join does not accept None arguments
3487     if self.op.file_storage_dir is None:
3488       string_file_storage_dir = ""
3489     else:
3490       string_file_storage_dir = self.op.file_storage_dir
3491
3492     # build the full file storage dir path
3493     file_storage_dir = os.path.normpath(os.path.join(
3494                                         self.sstore.GetFileStorageDir(),
3495                                         string_file_storage_dir, instance))
3496
3497
3498     disks = _GenerateDiskTemplate(self.cfg,
3499                                   self.op.disk_template,
3500                                   instance, pnode_name,
3501                                   self.secondaries, self.op.disk_size,
3502                                   self.op.swap_size,
3503                                   file_storage_dir,
3504                                   self.op.file_driver)
3505
3506     iobj = objects.Instance(name=instance, os=self.op.os_type,
3507                             primary_node=pnode_name,
3508                             memory=self.op.mem_size,
3509                             vcpus=self.op.vcpus,
3510                             nics=[nic], disks=disks,
3511                             disk_template=self.op.disk_template,
3512                             status=self.instance_status,
3513                             network_port=network_port,
3514                             kernel_path=self.op.kernel_path,
3515                             initrd_path=self.op.initrd_path,
3516                             hvm_boot_order=self.op.hvm_boot_order,
3517                             hvm_acpi=self.op.hvm_acpi,
3518                             hvm_pae=self.op.hvm_pae,
3519                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3520                             vnc_bind_address=self.op.vnc_bind_address,
3521                             hvm_nic_type=self.op.hvm_nic_type,
3522                             hvm_disk_type=self.op.hvm_disk_type,
3523                             )
3524
3525     feedback_fn("* creating instance disks...")
3526     if not _CreateDisks(self.cfg, iobj):
3527       _RemoveDisks(iobj, self.cfg)
3528       self.cfg.ReleaseDRBDMinors(instance)
3529       raise errors.OpExecError("Device creation failed, reverting...")
3530
3531     feedback_fn("adding instance %s to cluster config" % instance)
3532
3533     self.cfg.AddInstance(iobj)
3534     # Declare that we don't want to remove the instance lock anymore, as we've
3535     # added the instance to the config
3536     del self.remove_locks[locking.LEVEL_INSTANCE]
3537     # Remove the temp. assignements for the instance's drbds
3538     self.cfg.ReleaseDRBDMinors(instance)
3539
3540     if self.op.wait_for_sync:
3541       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3542     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3543       # make sure the disks are not degraded (still sync-ing is ok)
3544       time.sleep(15)
3545       feedback_fn("* checking mirrors status")
3546       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3547     else:
3548       disk_abort = False
3549
3550     if disk_abort:
3551       _RemoveDisks(iobj, self.cfg)
3552       self.cfg.RemoveInstance(iobj.name)
3553       # Make sure the instance lock gets removed
3554       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3555       raise errors.OpExecError("There are some degraded disks for"
3556                                " this instance")
3557
3558     feedback_fn("creating os for instance %s on node %s" %
3559                 (instance, pnode_name))
3560
3561     if iobj.disk_template != constants.DT_DISKLESS:
3562       if self.op.mode == constants.INSTANCE_CREATE:
3563         feedback_fn("* running the instance OS create scripts...")
3564         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3565           raise errors.OpExecError("could not add os for instance %s"
3566                                    " on node %s" %
3567                                    (instance, pnode_name))
3568
3569       elif self.op.mode == constants.INSTANCE_IMPORT:
3570         feedback_fn("* running the instance OS import scripts...")
3571         src_node = self.op.src_node
3572         src_image = self.src_image
3573         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3574                                                 src_node, src_image):
3575           raise errors.OpExecError("Could not import os for instance"
3576                                    " %s on node %s" %
3577                                    (instance, pnode_name))
3578       else:
3579         # also checked in the prereq part
3580         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3581                                      % self.op.mode)
3582
3583     if self.op.start:
3584       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3585       feedback_fn("* starting instance...")
3586       if not rpc.call_instance_start(pnode_name, iobj, None):
3587         raise errors.OpExecError("Could not start instance")
3588
3589
3590 class LUConnectConsole(NoHooksLU):
3591   """Connect to an instance's console.
3592
3593   This is somewhat special in that it returns the command line that
3594   you need to run on the master node in order to connect to the
3595   console.
3596
3597   """
3598   _OP_REQP = ["instance_name"]
3599   REQ_BGL = False
3600
3601   def ExpandNames(self):
3602     self._ExpandAndLockInstance()
3603
3604   def CheckPrereq(self):
3605     """Check prerequisites.
3606
3607     This checks that the instance is in the cluster.
3608
3609     """
3610     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3611     assert self.instance is not None, \
3612       "Cannot retrieve locked instance %s" % self.op.instance_name
3613
3614   def Exec(self, feedback_fn):
3615     """Connect to the console of an instance
3616
3617     """
3618     instance = self.instance
3619     node = instance.primary_node
3620
3621     node_insts = rpc.call_instance_list([node])[node]
3622     if node_insts is False:
3623       raise errors.OpExecError("Can't connect to node %s." % node)
3624
3625     if instance.name not in node_insts:
3626       raise errors.OpExecError("Instance %s is not running." % instance.name)
3627
3628     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3629
3630     hyper = hypervisor.GetHypervisor()
3631     console_cmd = hyper.GetShellCommandForConsole(instance)
3632
3633     # build ssh cmdline
3634     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3635
3636
3637 class LUReplaceDisks(LogicalUnit):
3638   """Replace the disks of an instance.
3639
3640   """
3641   HPATH = "mirrors-replace"
3642   HTYPE = constants.HTYPE_INSTANCE
3643   _OP_REQP = ["instance_name", "mode", "disks"]
3644   REQ_BGL = False
3645
3646   def ExpandNames(self):
3647     self._ExpandAndLockInstance()
3648
3649     if not hasattr(self.op, "remote_node"):
3650       self.op.remote_node = None
3651
3652     ia_name = getattr(self.op, "iallocator", None)
3653     if ia_name is not None:
3654       if self.op.remote_node is not None:
3655         raise errors.OpPrereqError("Give either the iallocator or the new"
3656                                    " secondary, not both")
3657       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3658     elif self.op.remote_node is not None:
3659       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3660       if remote_node is None:
3661         raise errors.OpPrereqError("Node '%s' not known" %
3662                                    self.op.remote_node)
3663       self.op.remote_node = remote_node
3664       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3665       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3666     else:
3667       self.needed_locks[locking.LEVEL_NODE] = []
3668       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3669
3670   def DeclareLocks(self, level):
3671     # If we're not already locking all nodes in the set we have to declare the
3672     # instance's primary/secondary nodes.
3673     if (level == locking.LEVEL_NODE and
3674         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3675       self._LockInstancesNodes()
3676
3677   def _RunAllocator(self):
3678     """Compute a new secondary node using an IAllocator.
3679
3680     """
3681     ial = IAllocator(self.cfg, self.sstore,
3682                      mode=constants.IALLOCATOR_MODE_RELOC,
3683                      name=self.op.instance_name,
3684                      relocate_from=[self.sec_node])
3685
3686     ial.Run(self.op.iallocator)
3687
3688     if not ial.success:
3689       raise errors.OpPrereqError("Can't compute nodes using"
3690                                  " iallocator '%s': %s" % (self.op.iallocator,
3691                                                            ial.info))
3692     if len(ial.nodes) != ial.required_nodes:
3693       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3694                                  " of nodes (%s), required %s" %
3695                                  (len(ial.nodes), ial.required_nodes))
3696     self.op.remote_node = ial.nodes[0]
3697     logger.ToStdout("Selected new secondary for the instance: %s" %
3698                     self.op.remote_node)
3699
3700   def BuildHooksEnv(self):
3701     """Build hooks env.
3702
3703     This runs on the master, the primary and all the secondaries.
3704
3705     """
3706     env = {
3707       "MODE": self.op.mode,
3708       "NEW_SECONDARY": self.op.remote_node,
3709       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3710       }
3711     env.update(_BuildInstanceHookEnvByObject(self.instance))
3712     nl = [
3713       self.sstore.GetMasterNode(),
3714       self.instance.primary_node,
3715       ]
3716     if self.op.remote_node is not None:
3717       nl.append(self.op.remote_node)
3718     return env, nl, nl
3719
3720   def CheckPrereq(self):
3721     """Check prerequisites.
3722
3723     This checks that the instance is in the cluster.
3724
3725     """
3726     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3727     assert instance is not None, \
3728       "Cannot retrieve locked instance %s" % self.op.instance_name
3729     self.instance = instance
3730
3731     if instance.disk_template not in constants.DTS_NET_MIRROR:
3732       raise errors.OpPrereqError("Instance's disk layout is not"
3733                                  " network mirrored.")
3734
3735     if len(instance.secondary_nodes) != 1:
3736       raise errors.OpPrereqError("The instance has a strange layout,"
3737                                  " expected one secondary but found %d" %
3738                                  len(instance.secondary_nodes))
3739
3740     self.sec_node = instance.secondary_nodes[0]
3741
3742     ia_name = getattr(self.op, "iallocator", None)
3743     if ia_name is not None:
3744       self._RunAllocator()
3745
3746     remote_node = self.op.remote_node
3747     if remote_node is not None:
3748       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3749       assert self.remote_node_info is not None, \
3750         "Cannot retrieve locked node %s" % remote_node
3751     else:
3752       self.remote_node_info = None
3753     if remote_node == instance.primary_node:
3754       raise errors.OpPrereqError("The specified node is the primary node of"
3755                                  " the instance.")
3756     elif remote_node == self.sec_node:
3757       if self.op.mode == constants.REPLACE_DISK_SEC:
3758         # this is for DRBD8, where we can't execute the same mode of
3759         # replacement as for drbd7 (no different port allocated)
3760         raise errors.OpPrereqError("Same secondary given, cannot execute"
3761                                    " replacement")
3762     if instance.disk_template == constants.DT_DRBD8:
3763       if (self.op.mode == constants.REPLACE_DISK_ALL and
3764           remote_node is not None):
3765         # switch to replace secondary mode
3766         self.op.mode = constants.REPLACE_DISK_SEC
3767
3768       if self.op.mode == constants.REPLACE_DISK_ALL:
3769         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3770                                    " secondary disk replacement, not"
3771                                    " both at once")
3772       elif self.op.mode == constants.REPLACE_DISK_PRI:
3773         if remote_node is not None:
3774           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3775                                      " the secondary while doing a primary"
3776                                      " node disk replacement")
3777         self.tgt_node = instance.primary_node
3778         self.oth_node = instance.secondary_nodes[0]
3779       elif self.op.mode == constants.REPLACE_DISK_SEC:
3780         self.new_node = remote_node # this can be None, in which case
3781                                     # we don't change the secondary
3782         self.tgt_node = instance.secondary_nodes[0]
3783         self.oth_node = instance.primary_node
3784       else:
3785         raise errors.ProgrammerError("Unhandled disk replace mode")
3786
3787     for name in self.op.disks:
3788       if instance.FindDisk(name) is None:
3789         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3790                                    (name, instance.name))
3791
3792   def _ExecD8DiskOnly(self, feedback_fn):
3793     """Replace a disk on the primary or secondary for dbrd8.
3794
3795     The algorithm for replace is quite complicated:
3796       - for each disk to be replaced:
3797         - create new LVs on the target node with unique names
3798         - detach old LVs from the drbd device
3799         - rename old LVs to name_replaced.<time_t>
3800         - rename new LVs to old LVs
3801         - attach the new LVs (with the old names now) to the drbd device
3802       - wait for sync across all devices
3803       - for each modified disk:
3804         - remove old LVs (which have the name name_replaces.<time_t>)
3805
3806     Failures are not very well handled.
3807
3808     """
3809     steps_total = 6
3810     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3811     instance = self.instance
3812     iv_names = {}
3813     vgname = self.cfg.GetVGName()
3814     # start of work
3815     cfg = self.cfg
3816     tgt_node = self.tgt_node
3817     oth_node = self.oth_node
3818
3819     # Step: check device activation
3820     self.proc.LogStep(1, steps_total, "check device existence")
3821     info("checking volume groups")
3822     my_vg = cfg.GetVGName()
3823     results = rpc.call_vg_list([oth_node, tgt_node])
3824     if not results:
3825       raise errors.OpExecError("Can't list volume groups on the nodes")
3826     for node in oth_node, tgt_node:
3827       res = results.get(node, False)
3828       if not res or my_vg not in res:
3829         raise errors.OpExecError("Volume group '%s' not found on %s" %
3830                                  (my_vg, node))
3831     for dev in instance.disks:
3832       if not dev.iv_name in self.op.disks:
3833         continue
3834       for node in tgt_node, oth_node:
3835         info("checking %s on %s" % (dev.iv_name, node))
3836         cfg.SetDiskID(dev, node)
3837         if not rpc.call_blockdev_find(node, dev):
3838           raise errors.OpExecError("Can't find device %s on node %s" %
3839                                    (dev.iv_name, node))
3840
3841     # Step: check other node consistency
3842     self.proc.LogStep(2, steps_total, "check peer consistency")
3843     for dev in instance.disks:
3844       if not dev.iv_name in self.op.disks:
3845         continue
3846       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3847       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3848                                    oth_node==instance.primary_node):
3849         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3850                                  " to replace disks on this node (%s)" %
3851                                  (oth_node, tgt_node))
3852
3853     # Step: create new storage
3854     self.proc.LogStep(3, steps_total, "allocate new storage")
3855     for dev in instance.disks:
3856       if not dev.iv_name in self.op.disks:
3857         continue
3858       size = dev.size
3859       cfg.SetDiskID(dev, tgt_node)
3860       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3861       names = _GenerateUniqueNames(cfg, lv_names)
3862       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3863                              logical_id=(vgname, names[0]))
3864       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3865                              logical_id=(vgname, names[1]))
3866       new_lvs = [lv_data, lv_meta]
3867       old_lvs = dev.children
3868       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3869       info("creating new local storage on %s for %s" %
3870            (tgt_node, dev.iv_name))
3871       # since we *always* want to create this LV, we use the
3872       # _Create...OnPrimary (which forces the creation), even if we
3873       # are talking about the secondary node
3874       for new_lv in new_lvs:
3875         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3876                                         _GetInstanceInfoText(instance)):
3877           raise errors.OpExecError("Failed to create new LV named '%s' on"
3878                                    " node '%s'" %
3879                                    (new_lv.logical_id[1], tgt_node))
3880
3881     # Step: for each lv, detach+rename*2+attach
3882     self.proc.LogStep(4, steps_total, "change drbd configuration")
3883     for dev, old_lvs, new_lvs in iv_names.itervalues():
3884       info("detaching %s drbd from local storage" % dev.iv_name)
3885       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3886         raise errors.OpExecError("Can't detach drbd from local storage on node"
3887                                  " %s for device %s" % (tgt_node, dev.iv_name))
3888       #dev.children = []
3889       #cfg.Update(instance)
3890
3891       # ok, we created the new LVs, so now we know we have the needed
3892       # storage; as such, we proceed on the target node to rename
3893       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3894       # using the assumption that logical_id == physical_id (which in
3895       # turn is the unique_id on that node)
3896
3897       # FIXME(iustin): use a better name for the replaced LVs
3898       temp_suffix = int(time.time())
3899       ren_fn = lambda d, suff: (d.physical_id[0],
3900                                 d.physical_id[1] + "_replaced-%s" % suff)
3901       # build the rename list based on what LVs exist on the node
3902       rlist = []
3903       for to_ren in old_lvs:
3904         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3905         if find_res is not None: # device exists
3906           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3907
3908       info("renaming the old LVs on the target node")
3909       if not rpc.call_blockdev_rename(tgt_node, rlist):
3910         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3911       # now we rename the new LVs to the old LVs
3912       info("renaming the new LVs on the target node")
3913       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3914       if not rpc.call_blockdev_rename(tgt_node, rlist):
3915         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3916
3917       for old, new in zip(old_lvs, new_lvs):
3918         new.logical_id = old.logical_id
3919         cfg.SetDiskID(new, tgt_node)
3920
3921       for disk in old_lvs:
3922         disk.logical_id = ren_fn(disk, temp_suffix)
3923         cfg.SetDiskID(disk, tgt_node)
3924
3925       # now that the new lvs have the old name, we can add them to the device
3926       info("adding new mirror component on %s" % tgt_node)
3927       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3928         for new_lv in new_lvs:
3929           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3930             warning("Can't rollback device %s", hint="manually cleanup unused"
3931                     " logical volumes")
3932         raise errors.OpExecError("Can't add local storage to drbd")
3933
3934       dev.children = new_lvs
3935       cfg.Update(instance)
3936
3937     # Step: wait for sync
3938
3939     # this can fail as the old devices are degraded and _WaitForSync
3940     # does a combined result over all disks, so we don't check its
3941     # return value
3942     self.proc.LogStep(5, steps_total, "sync devices")
3943     _WaitForSync(cfg, instance, self.proc, unlock=True)
3944
3945     # so check manually all the devices
3946     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3947       cfg.SetDiskID(dev, instance.primary_node)
3948       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3949       if is_degr:
3950         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3951
3952     # Step: remove old storage
3953     self.proc.LogStep(6, steps_total, "removing old storage")
3954     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3955       info("remove logical volumes for %s" % name)
3956       for lv in old_lvs:
3957         cfg.SetDiskID(lv, tgt_node)
3958         if not rpc.call_blockdev_remove(tgt_node, lv):
3959           warning("Can't remove old LV", hint="manually remove unused LVs")
3960           continue
3961
3962   def _ExecD8Secondary(self, feedback_fn):
3963     """Replace the secondary node for drbd8.
3964
3965     The algorithm for replace is quite complicated:
3966       - for all disks of the instance:
3967         - create new LVs on the new node with same names
3968         - shutdown the drbd device on the old secondary
3969         - disconnect the drbd network on the primary
3970         - create the drbd device on the new secondary
3971         - network attach the drbd on the primary, using an artifice:
3972           the drbd code for Attach() will connect to the network if it
3973           finds a device which is connected to the good local disks but
3974           not network enabled
3975       - wait for sync across all devices
3976       - remove all disks from the old secondary
3977
3978     Failures are not very well handled.
3979
3980     """
3981     steps_total = 6
3982     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3983     instance = self.instance
3984     iv_names = {}
3985     vgname = self.cfg.GetVGName()
3986     # start of work
3987     cfg = self.cfg
3988     old_node = self.tgt_node
3989     new_node = self.new_node
3990     pri_node = instance.primary_node
3991
3992     # Step: check device activation
3993     self.proc.LogStep(1, steps_total, "check device existence")
3994     info("checking volume groups")
3995     my_vg = cfg.GetVGName()
3996     results = rpc.call_vg_list([pri_node, new_node])
3997     if not results:
3998       raise errors.OpExecError("Can't list volume groups on the nodes")
3999     for node in pri_node, new_node:
4000       res = results.get(node, False)
4001       if not res or my_vg not in res:
4002         raise errors.OpExecError("Volume group '%s' not found on %s" %
4003                                  (my_vg, node))
4004     for dev in instance.disks:
4005       if not dev.iv_name in self.op.disks:
4006         continue
4007       info("checking %s on %s" % (dev.iv_name, pri_node))
4008       cfg.SetDiskID(dev, pri_node)
4009       if not rpc.call_blockdev_find(pri_node, dev):
4010         raise errors.OpExecError("Can't find device %s on node %s" %
4011                                  (dev.iv_name, pri_node))
4012
4013     # Step: check other node consistency
4014     self.proc.LogStep(2, steps_total, "check peer consistency")
4015     for dev in instance.disks:
4016       if not dev.iv_name in self.op.disks:
4017         continue
4018       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4019       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4020         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4021                                  " unsafe to replace the secondary" %
4022                                  pri_node)
4023
4024     # Step: create new storage
4025     self.proc.LogStep(3, steps_total, "allocate new storage")
4026     for dev in instance.disks:
4027       size = dev.size
4028       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4029       # since we *always* want to create this LV, we use the
4030       # _Create...OnPrimary (which forces the creation), even if we
4031       # are talking about the secondary node
4032       for new_lv in dev.children:
4033         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4034                                         _GetInstanceInfoText(instance)):
4035           raise errors.OpExecError("Failed to create new LV named '%s' on"
4036                                    " node '%s'" %
4037                                    (new_lv.logical_id[1], new_node))
4038
4039
4040     # Step 4: dbrd minors and drbd setups changes
4041     # after this, we must manually remove the drbd minors on both the
4042     # error and the success paths
4043     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4044                                    instance.name)
4045     logging.debug("Allocated minors %s" % (minors,))
4046     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4047     for dev, new_minor in zip(instance.disks, minors):
4048       size = dev.size
4049       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4050       # create new devices on new_node
4051       if pri_node == dev.logical_id[0]:
4052         new_logical_id = (pri_node, new_node,
4053                           dev.logical_id[2], dev.logical_id[3], new_minor)
4054       else:
4055         new_logical_id = (new_node, pri_node,
4056                           dev.logical_id[2], new_minor, dev.logical_id[4])
4057       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4058       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4059                     new_logical_id)
4060       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4061                               logical_id=new_logical_id,
4062                               children=dev.children)
4063       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4064                                         new_drbd, False,
4065                                       _GetInstanceInfoText(instance)):
4066         self.cfg.ReleaseDRBDMinors(instance.name)
4067         raise errors.OpExecError("Failed to create new DRBD on"
4068                                  " node '%s'" % new_node)
4069
4070     for dev in instance.disks:
4071       # we have new devices, shutdown the drbd on the old secondary
4072       info("shutting down drbd for %s on old node" % dev.iv_name)
4073       cfg.SetDiskID(dev, old_node)
4074       if not rpc.call_blockdev_shutdown(old_node, dev):
4075         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4076                 hint="Please cleanup this device manually as soon as possible")
4077
4078     info("detaching primary drbds from the network (=> standalone)")
4079     done = 0
4080     for dev in instance.disks:
4081       cfg.SetDiskID(dev, pri_node)
4082       # set the physical (unique in bdev terms) id to None, meaning
4083       # detach from network
4084       dev.physical_id = (None, None, None, None, dev.physical_id[4])
4085       # and 'find' the device, which will 'fix' it to match the
4086       # standalone state
4087       if rpc.call_blockdev_find(pri_node, dev):
4088         done += 1
4089       else:
4090         warning("Failed to detach drbd %s from network, unusual case" %
4091                 dev.iv_name)
4092
4093     if not done:
4094       # no detaches succeeded (very unlikely)
4095       self.cfg.ReleaseDRBDMinors(instance.name)
4096       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4097
4098     # if we managed to detach at least one, we update all the disks of
4099     # the instance to point to the new secondary
4100     info("updating instance configuration")
4101     for dev, _, new_logical_id in iv_names.itervalues():
4102       dev.logical_id = new_logical_id
4103       cfg.SetDiskID(dev, pri_node)
4104     cfg.Update(instance)
4105     # we can remove now the temp minors as now the new values are
4106     # written to the config file (and therefore stable)
4107     self.cfg.ReleaseDRBDMinors(instance.name)
4108
4109     # and now perform the drbd attach
4110     info("attaching primary drbds to new secondary (standalone => connected)")
4111     failures = []
4112     for dev in instance.disks:
4113       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4114       # since the attach is smart, it's enough to 'find' the device,
4115       # it will automatically activate the network, if the physical_id
4116       # is correct
4117       cfg.SetDiskID(dev, pri_node)
4118       logging.debug("Disk to attach: %s", dev)
4119       if not rpc.call_blockdev_find(pri_node, dev):
4120         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4121                 "please do a gnt-instance info to see the status of disks")
4122
4123     # this can fail as the old devices are degraded and _WaitForSync
4124     # does a combined result over all disks, so we don't check its
4125     # return value
4126     self.proc.LogStep(5, steps_total, "sync devices")
4127     _WaitForSync(cfg, instance, self.proc, unlock=True)
4128
4129     # so check manually all the devices
4130     for name, (dev, old_lvs, _) in iv_names.iteritems():
4131       cfg.SetDiskID(dev, pri_node)
4132       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4133       if is_degr:
4134         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4135
4136     self.proc.LogStep(6, steps_total, "removing old storage")
4137     for name, (dev, old_lvs, _) in iv_names.iteritems():
4138       info("remove logical volumes for %s" % name)
4139       for lv in old_lvs:
4140         cfg.SetDiskID(lv, old_node)
4141         if not rpc.call_blockdev_remove(old_node, lv):
4142           warning("Can't remove LV on old secondary",
4143                   hint="Cleanup stale volumes by hand")
4144
4145   def Exec(self, feedback_fn):
4146     """Execute disk replacement.
4147
4148     This dispatches the disk replacement to the appropriate handler.
4149
4150     """
4151     instance = self.instance
4152
4153     # Activate the instance disks if we're replacing them on a down instance
4154     if instance.status == "down":
4155       _StartInstanceDisks(self.cfg, instance, True)
4156
4157     if instance.disk_template == constants.DT_DRBD8:
4158       if self.op.remote_node is None:
4159         fn = self._ExecD8DiskOnly
4160       else:
4161         fn = self._ExecD8Secondary
4162     else:
4163       raise errors.ProgrammerError("Unhandled disk replacement case")
4164
4165     ret = fn(feedback_fn)
4166
4167     # Deactivate the instance disks if we're replacing them on a down instance
4168     if instance.status == "down":
4169       _SafeShutdownInstanceDisks(instance, self.cfg)
4170
4171     return ret
4172
4173
4174 class LUGrowDisk(LogicalUnit):
4175   """Grow a disk of an instance.
4176
4177   """
4178   HPATH = "disk-grow"
4179   HTYPE = constants.HTYPE_INSTANCE
4180   _OP_REQP = ["instance_name", "disk", "amount"]
4181   REQ_BGL = False
4182
4183   def ExpandNames(self):
4184     self._ExpandAndLockInstance()
4185     self.needed_locks[locking.LEVEL_NODE] = []
4186     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4187
4188   def DeclareLocks(self, level):
4189     if level == locking.LEVEL_NODE:
4190       self._LockInstancesNodes()
4191
4192   def BuildHooksEnv(self):
4193     """Build hooks env.
4194
4195     This runs on the master, the primary and all the secondaries.
4196
4197     """
4198     env = {
4199       "DISK": self.op.disk,
4200       "AMOUNT": self.op.amount,
4201       }
4202     env.update(_BuildInstanceHookEnvByObject(self.instance))
4203     nl = [
4204       self.sstore.GetMasterNode(),
4205       self.instance.primary_node,
4206       ]
4207     return env, nl, nl
4208
4209   def CheckPrereq(self):
4210     """Check prerequisites.
4211
4212     This checks that the instance is in the cluster.
4213
4214     """
4215     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4216     assert instance is not None, \
4217       "Cannot retrieve locked instance %s" % self.op.instance_name
4218
4219     self.instance = instance
4220
4221     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4222       raise errors.OpPrereqError("Instance's disk layout does not support"
4223                                  " growing.")
4224
4225     if instance.FindDisk(self.op.disk) is None:
4226       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4227                                  (self.op.disk, instance.name))
4228
4229     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4230     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4231     for node in nodenames:
4232       info = nodeinfo.get(node, None)
4233       if not info:
4234         raise errors.OpPrereqError("Cannot get current information"
4235                                    " from node '%s'" % node)
4236       vg_free = info.get('vg_free', None)
4237       if not isinstance(vg_free, int):
4238         raise errors.OpPrereqError("Can't compute free disk space on"
4239                                    " node %s" % node)
4240       if self.op.amount > info['vg_free']:
4241         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4242                                    " %d MiB available, %d MiB required" %
4243                                    (node, info['vg_free'], self.op.amount))
4244
4245   def Exec(self, feedback_fn):
4246     """Execute disk grow.
4247
4248     """
4249     instance = self.instance
4250     disk = instance.FindDisk(self.op.disk)
4251     for node in (instance.secondary_nodes + (instance.primary_node,)):
4252       self.cfg.SetDiskID(disk, node)
4253       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4254       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4255         raise errors.OpExecError("grow request failed to node %s" % node)
4256       elif not result[0]:
4257         raise errors.OpExecError("grow request failed to node %s: %s" %
4258                                  (node, result[1]))
4259     disk.RecordGrow(self.op.amount)
4260     self.cfg.Update(instance)
4261     return
4262
4263
4264 class LUQueryInstanceData(NoHooksLU):
4265   """Query runtime instance data.
4266
4267   """
4268   _OP_REQP = ["instances"]
4269   REQ_BGL = False
4270   def ExpandNames(self):
4271     self.needed_locks = {}
4272     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4273
4274     if not isinstance(self.op.instances, list):
4275       raise errors.OpPrereqError("Invalid argument type 'instances'")
4276
4277     if self.op.instances:
4278       self.wanted_names = []
4279       for name in self.op.instances:
4280         full_name = self.cfg.ExpandInstanceName(name)
4281         if full_name is None:
4282           raise errors.OpPrereqError("Instance '%s' not known" %
4283                                      self.op.instance_name)
4284         self.wanted_names.append(full_name)
4285       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4286     else:
4287       self.wanted_names = None
4288       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4289
4290     self.needed_locks[locking.LEVEL_NODE] = []
4291     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4292
4293   def DeclareLocks(self, level):
4294     if level == locking.LEVEL_NODE:
4295       self._LockInstancesNodes()
4296
4297   def CheckPrereq(self):
4298     """Check prerequisites.
4299
4300     This only checks the optional instance list against the existing names.
4301
4302     """
4303     if self.wanted_names is None:
4304       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4305
4306     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4307                              in self.wanted_names]
4308     return
4309
4310   def _ComputeDiskStatus(self, instance, snode, dev):
4311     """Compute block device status.
4312
4313     """
4314     self.cfg.SetDiskID(dev, instance.primary_node)
4315     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4316     if dev.dev_type in constants.LDS_DRBD:
4317       # we change the snode then (otherwise we use the one passed in)
4318       if dev.logical_id[0] == instance.primary_node:
4319         snode = dev.logical_id[1]
4320       else:
4321         snode = dev.logical_id[0]
4322
4323     if snode:
4324       self.cfg.SetDiskID(dev, snode)
4325       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4326     else:
4327       dev_sstatus = None
4328
4329     if dev.children:
4330       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4331                       for child in dev.children]
4332     else:
4333       dev_children = []
4334
4335     data = {
4336       "iv_name": dev.iv_name,
4337       "dev_type": dev.dev_type,
4338       "logical_id": dev.logical_id,
4339       "physical_id": dev.physical_id,
4340       "pstatus": dev_pstatus,
4341       "sstatus": dev_sstatus,
4342       "children": dev_children,
4343       }
4344
4345     return data
4346
4347   def Exec(self, feedback_fn):
4348     """Gather and return data"""
4349     result = {}
4350     for instance in self.wanted_instances:
4351       remote_info = rpc.call_instance_info(instance.primary_node,
4352                                                 instance.name)
4353       if remote_info and "state" in remote_info:
4354         remote_state = "up"
4355       else:
4356         remote_state = "down"
4357       if instance.status == "down":
4358         config_state = "down"
4359       else:
4360         config_state = "up"
4361
4362       disks = [self._ComputeDiskStatus(instance, None, device)
4363                for device in instance.disks]
4364
4365       idict = {
4366         "name": instance.name,
4367         "config_state": config_state,
4368         "run_state": remote_state,
4369         "pnode": instance.primary_node,
4370         "snodes": instance.secondary_nodes,
4371         "os": instance.os,
4372         "memory": instance.memory,
4373         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4374         "disks": disks,
4375         "vcpus": instance.vcpus,
4376         }
4377
4378       htkind = self.sstore.GetHypervisorType()
4379       if htkind == constants.HT_XEN_PVM30:
4380         idict["kernel_path"] = instance.kernel_path
4381         idict["initrd_path"] = instance.initrd_path
4382
4383       if htkind == constants.HT_XEN_HVM31:
4384         idict["hvm_boot_order"] = instance.hvm_boot_order
4385         idict["hvm_acpi"] = instance.hvm_acpi
4386         idict["hvm_pae"] = instance.hvm_pae
4387         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4388         idict["hvm_nic_type"] = instance.hvm_nic_type
4389         idict["hvm_disk_type"] = instance.hvm_disk_type
4390
4391       if htkind in constants.HTS_REQ_PORT:
4392         if instance.vnc_bind_address is None:
4393           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4394         else:
4395           vnc_bind_address = instance.vnc_bind_address
4396         if instance.network_port is None:
4397           vnc_console_port = None
4398         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4399           vnc_console_port = "%s:%s" % (instance.primary_node,
4400                                        instance.network_port)
4401         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4402           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4403                                                    instance.network_port,
4404                                                    instance.primary_node)
4405         else:
4406           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4407                                         instance.network_port)
4408         idict["vnc_console_port"] = vnc_console_port
4409         idict["vnc_bind_address"] = vnc_bind_address
4410         idict["network_port"] = instance.network_port
4411
4412       result[instance.name] = idict
4413
4414     return result
4415
4416
4417 class LUSetInstanceParams(LogicalUnit):
4418   """Modifies an instances's parameters.
4419
4420   """
4421   HPATH = "instance-modify"
4422   HTYPE = constants.HTYPE_INSTANCE
4423   _OP_REQP = ["instance_name"]
4424   REQ_BGL = False
4425
4426   def ExpandNames(self):
4427     self._ExpandAndLockInstance()
4428
4429   def BuildHooksEnv(self):
4430     """Build hooks env.
4431
4432     This runs on the master, primary and secondaries.
4433
4434     """
4435     args = dict()
4436     if self.mem:
4437       args['memory'] = self.mem
4438     if self.vcpus:
4439       args['vcpus'] = self.vcpus
4440     if self.do_ip or self.do_bridge or self.mac:
4441       if self.do_ip:
4442         ip = self.ip
4443       else:
4444         ip = self.instance.nics[0].ip
4445       if self.bridge:
4446         bridge = self.bridge
4447       else:
4448         bridge = self.instance.nics[0].bridge
4449       if self.mac:
4450         mac = self.mac
4451       else:
4452         mac = self.instance.nics[0].mac
4453       args['nics'] = [(ip, bridge, mac)]
4454     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4455     nl = [self.sstore.GetMasterNode(),
4456           self.instance.primary_node] + list(self.instance.secondary_nodes)
4457     return env, nl, nl
4458
4459   def CheckPrereq(self):
4460     """Check prerequisites.
4461
4462     This only checks the instance list against the existing names.
4463
4464     """
4465     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4466     # a separate CheckArguments function, if we implement one, so the operation
4467     # can be aborted without waiting for any lock, should it have an error...
4468     self.mem = getattr(self.op, "mem", None)
4469     self.vcpus = getattr(self.op, "vcpus", None)
4470     self.ip = getattr(self.op, "ip", None)
4471     self.mac = getattr(self.op, "mac", None)
4472     self.bridge = getattr(self.op, "bridge", None)
4473     self.kernel_path = getattr(self.op, "kernel_path", None)
4474     self.initrd_path = getattr(self.op, "initrd_path", None)
4475     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4476     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4477     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4478     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4479     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4480     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4481     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4482     self.force = getattr(self.op, "force", None)
4483     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4484                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4485                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4486                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4487     if all_parms.count(None) == len(all_parms):
4488       raise errors.OpPrereqError("No changes submitted")
4489     if self.mem is not None:
4490       try:
4491         self.mem = int(self.mem)
4492       except ValueError, err:
4493         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4494     if self.vcpus is not None:
4495       try:
4496         self.vcpus = int(self.vcpus)
4497       except ValueError, err:
4498         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4499     if self.ip is not None:
4500       self.do_ip = True
4501       if self.ip.lower() == "none":
4502         self.ip = None
4503       else:
4504         if not utils.IsValidIP(self.ip):
4505           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4506     else:
4507       self.do_ip = False
4508     self.do_bridge = (self.bridge is not None)
4509     if self.mac is not None:
4510       if self.cfg.IsMacInUse(self.mac):
4511         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4512                                    self.mac)
4513       if not utils.IsValidMac(self.mac):
4514         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4515
4516     if self.kernel_path is not None:
4517       self.do_kernel_path = True
4518       if self.kernel_path == constants.VALUE_NONE:
4519         raise errors.OpPrereqError("Can't set instance to no kernel")
4520
4521       if self.kernel_path != constants.VALUE_DEFAULT:
4522         if not os.path.isabs(self.kernel_path):
4523           raise errors.OpPrereqError("The kernel path must be an absolute"
4524                                     " filename")
4525     else:
4526       self.do_kernel_path = False
4527
4528     if self.initrd_path is not None:
4529       self.do_initrd_path = True
4530       if self.initrd_path not in (constants.VALUE_NONE,
4531                                   constants.VALUE_DEFAULT):
4532         if not os.path.isabs(self.initrd_path):
4533           raise errors.OpPrereqError("The initrd path must be an absolute"
4534                                     " filename")
4535     else:
4536       self.do_initrd_path = False
4537
4538     # boot order verification
4539     if self.hvm_boot_order is not None:
4540       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4541         if len(self.hvm_boot_order.strip("acdn")) != 0:
4542           raise errors.OpPrereqError("invalid boot order specified,"
4543                                      " must be one or more of [acdn]"
4544                                      " or 'default'")
4545
4546     # hvm_cdrom_image_path verification
4547     if self.op.hvm_cdrom_image_path is not None:
4548       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4549               self.op.hvm_cdrom_image_path.lower() == "none"):
4550         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4551                                    " be an absolute path or None, not %s" %
4552                                    self.op.hvm_cdrom_image_path)
4553       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4554               self.op.hvm_cdrom_image_path.lower() == "none"):
4555         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4556                                    " regular file or a symlink pointing to"
4557                                    " an existing regular file, not %s" %
4558                                    self.op.hvm_cdrom_image_path)
4559
4560     # vnc_bind_address verification
4561     if self.op.vnc_bind_address is not None:
4562       if not utils.IsValidIP(self.op.vnc_bind_address):
4563         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4564                                    " like a valid IP address" %
4565                                    self.op.vnc_bind_address)
4566
4567     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4568     assert self.instance is not None, \
4569       "Cannot retrieve locked instance %s" % self.op.instance_name
4570     self.warn = []
4571     if self.mem is not None and not self.force:
4572       pnode = self.instance.primary_node
4573       nodelist = [pnode]
4574       nodelist.extend(instance.secondary_nodes)
4575       instance_info = rpc.call_instance_info(pnode, instance.name)
4576       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4577
4578       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4579         # Assume the primary node is unreachable and go ahead
4580         self.warn.append("Can't get info from primary node %s" % pnode)
4581       else:
4582         if instance_info:
4583           current_mem = instance_info['memory']
4584         else:
4585           # Assume instance not running
4586           # (there is a slight race condition here, but it's not very probable,
4587           # and we have no other way to check)
4588           current_mem = 0
4589         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4590         if miss_mem > 0:
4591           raise errors.OpPrereqError("This change will prevent the instance"
4592                                      " from starting, due to %d MB of memory"
4593                                      " missing on its primary node" % miss_mem)
4594
4595       for node in instance.secondary_nodes:
4596         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4597           self.warn.append("Can't get info from secondary node %s" % node)
4598         elif self.mem > nodeinfo[node]['memory_free']:
4599           self.warn.append("Not enough memory to failover instance to secondary"
4600                            " node %s" % node)
4601
4602     # Xen HVM device type checks
4603     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4604       if self.op.hvm_nic_type is not None:
4605         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4606           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4607                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4608       if self.op.hvm_disk_type is not None:
4609         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4610           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4611                                      " HVM hypervisor" % self.op.hvm_disk_type)
4612
4613     return
4614
4615   def Exec(self, feedback_fn):
4616     """Modifies an instance.
4617
4618     All parameters take effect only at the next restart of the instance.
4619     """
4620     # Process here the warnings from CheckPrereq, as we don't have a
4621     # feedback_fn there.
4622     for warn in self.warn:
4623       feedback_fn("WARNING: %s" % warn)
4624
4625     result = []
4626     instance = self.instance
4627     if self.mem:
4628       instance.memory = self.mem
4629       result.append(("mem", self.mem))
4630     if self.vcpus:
4631       instance.vcpus = self.vcpus
4632       result.append(("vcpus",  self.vcpus))
4633     if self.do_ip:
4634       instance.nics[0].ip = self.ip
4635       result.append(("ip", self.ip))
4636     if self.bridge:
4637       instance.nics[0].bridge = self.bridge
4638       result.append(("bridge", self.bridge))
4639     if self.mac:
4640       instance.nics[0].mac = self.mac
4641       result.append(("mac", self.mac))
4642     if self.do_kernel_path:
4643       instance.kernel_path = self.kernel_path
4644       result.append(("kernel_path", self.kernel_path))
4645     if self.do_initrd_path:
4646       instance.initrd_path = self.initrd_path
4647       result.append(("initrd_path", self.initrd_path))
4648     if self.hvm_boot_order:
4649       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4650         instance.hvm_boot_order = None
4651       else:
4652         instance.hvm_boot_order = self.hvm_boot_order
4653       result.append(("hvm_boot_order", self.hvm_boot_order))
4654     if self.hvm_acpi is not None:
4655       instance.hvm_acpi = self.hvm_acpi
4656       result.append(("hvm_acpi", self.hvm_acpi))
4657     if self.hvm_pae is not None:
4658       instance.hvm_pae = self.hvm_pae
4659       result.append(("hvm_pae", self.hvm_pae))
4660     if self.hvm_nic_type is not None:
4661       instance.hvm_nic_type = self.hvm_nic_type
4662       result.append(("hvm_nic_type", self.hvm_nic_type))
4663     if self.hvm_disk_type is not None:
4664       instance.hvm_disk_type = self.hvm_disk_type
4665       result.append(("hvm_disk_type", self.hvm_disk_type))
4666     if self.hvm_cdrom_image_path:
4667       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4668         instance.hvm_cdrom_image_path = None
4669       else:
4670         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4671       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4672     if self.vnc_bind_address:
4673       instance.vnc_bind_address = self.vnc_bind_address
4674       result.append(("vnc_bind_address", self.vnc_bind_address))
4675
4676     self.cfg.Update(instance)
4677
4678     return result
4679
4680
4681 class LUQueryExports(NoHooksLU):
4682   """Query the exports list
4683
4684   """
4685   _OP_REQP = ['nodes']
4686   REQ_BGL = False
4687
4688   def ExpandNames(self):
4689     self.needed_locks = {}
4690     self.share_locks[locking.LEVEL_NODE] = 1
4691     if not self.op.nodes:
4692       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4693     else:
4694       self.needed_locks[locking.LEVEL_NODE] = \
4695         _GetWantedNodes(self, self.op.nodes)
4696
4697   def CheckPrereq(self):
4698     """Check prerequisites.
4699
4700     """
4701     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4702
4703   def Exec(self, feedback_fn):
4704     """Compute the list of all the exported system images.
4705
4706     Returns:
4707       a dictionary with the structure node->(export-list)
4708       where export-list is a list of the instances exported on
4709       that node.
4710
4711     """
4712     return rpc.call_export_list(self.nodes)
4713
4714
4715 class LUExportInstance(LogicalUnit):
4716   """Export an instance to an image in the cluster.
4717
4718   """
4719   HPATH = "instance-export"
4720   HTYPE = constants.HTYPE_INSTANCE
4721   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4722   REQ_BGL = False
4723
4724   def ExpandNames(self):
4725     self._ExpandAndLockInstance()
4726     # FIXME: lock only instance primary and destination node
4727     #
4728     # Sad but true, for now we have do lock all nodes, as we don't know where
4729     # the previous export might be, and and in this LU we search for it and
4730     # remove it from its current node. In the future we could fix this by:
4731     #  - making a tasklet to search (share-lock all), then create the new one,
4732     #    then one to remove, after
4733     #  - removing the removal operation altoghether
4734     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4735
4736   def DeclareLocks(self, level):
4737     """Last minute lock declaration."""
4738     # All nodes are locked anyway, so nothing to do here.
4739
4740   def BuildHooksEnv(self):
4741     """Build hooks env.
4742
4743     This will run on the master, primary node and target node.
4744
4745     """
4746     env = {
4747       "EXPORT_NODE": self.op.target_node,
4748       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4749       }
4750     env.update(_BuildInstanceHookEnvByObject(self.instance))
4751     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4752           self.op.target_node]
4753     return env, nl, nl
4754
4755   def CheckPrereq(self):
4756     """Check prerequisites.
4757
4758     This checks that the instance and node names are valid.
4759
4760     """
4761     instance_name = self.op.instance_name
4762     self.instance = self.cfg.GetInstanceInfo(instance_name)
4763     assert self.instance is not None, \
4764           "Cannot retrieve locked instance %s" % self.op.instance_name
4765
4766     self.dst_node = self.cfg.GetNodeInfo(
4767       self.cfg.ExpandNodeName(self.op.target_node))
4768
4769     assert self.dst_node is not None, \
4770           "Cannot retrieve locked node %s" % self.op.target_node
4771
4772     # instance disk type verification
4773     for disk in self.instance.disks:
4774       if disk.dev_type == constants.LD_FILE:
4775         raise errors.OpPrereqError("Export not supported for instances with"
4776                                    " file-based disks")
4777
4778   def Exec(self, feedback_fn):
4779     """Export an instance to an image in the cluster.
4780
4781     """
4782     instance = self.instance
4783     dst_node = self.dst_node
4784     src_node = instance.primary_node
4785     if self.op.shutdown:
4786       # shutdown the instance, but not the disks
4787       if not rpc.call_instance_shutdown(src_node, instance):
4788         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4789                                  (instance.name, src_node))
4790
4791     vgname = self.cfg.GetVGName()
4792
4793     snap_disks = []
4794
4795     try:
4796       for disk in instance.disks:
4797         if disk.iv_name == "sda":
4798           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4799           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4800
4801           if not new_dev_name:
4802             logger.Error("could not snapshot block device %s on node %s" %
4803                          (disk.logical_id[1], src_node))
4804           else:
4805             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4806                                       logical_id=(vgname, new_dev_name),
4807                                       physical_id=(vgname, new_dev_name),
4808                                       iv_name=disk.iv_name)
4809             snap_disks.append(new_dev)
4810
4811     finally:
4812       if self.op.shutdown and instance.status == "up":
4813         if not rpc.call_instance_start(src_node, instance, None):
4814           _ShutdownInstanceDisks(instance, self.cfg)
4815           raise errors.OpExecError("Could not start instance")
4816
4817     # TODO: check for size
4818
4819     for dev in snap_disks:
4820       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4821         logger.Error("could not export block device %s from node %s to node %s"
4822                      % (dev.logical_id[1], src_node, dst_node.name))
4823       if not rpc.call_blockdev_remove(src_node, dev):
4824         logger.Error("could not remove snapshot block device %s from node %s" %
4825                      (dev.logical_id[1], src_node))
4826
4827     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4828       logger.Error("could not finalize export for instance %s on node %s" %
4829                    (instance.name, dst_node.name))
4830
4831     nodelist = self.cfg.GetNodeList()
4832     nodelist.remove(dst_node.name)
4833
4834     # on one-node clusters nodelist will be empty after the removal
4835     # if we proceed the backup would be removed because OpQueryExports
4836     # substitutes an empty list with the full cluster node list.
4837     if nodelist:
4838       exportlist = rpc.call_export_list(nodelist)
4839       for node in exportlist:
4840         if instance.name in exportlist[node]:
4841           if not rpc.call_export_remove(node, instance.name):
4842             logger.Error("could not remove older export for instance %s"
4843                          " on node %s" % (instance.name, node))
4844
4845
4846 class LURemoveExport(NoHooksLU):
4847   """Remove exports related to the named instance.
4848
4849   """
4850   _OP_REQP = ["instance_name"]
4851   REQ_BGL = False
4852
4853   def ExpandNames(self):
4854     self.needed_locks = {}
4855     # We need all nodes to be locked in order for RemoveExport to work, but we
4856     # don't need to lock the instance itself, as nothing will happen to it (and
4857     # we can remove exports also for a removed instance)
4858     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4859
4860   def CheckPrereq(self):
4861     """Check prerequisites.
4862     """
4863     pass
4864
4865   def Exec(self, feedback_fn):
4866     """Remove any export.
4867
4868     """
4869     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4870     # If the instance was not found we'll try with the name that was passed in.
4871     # This will only work if it was an FQDN, though.
4872     fqdn_warn = False
4873     if not instance_name:
4874       fqdn_warn = True
4875       instance_name = self.op.instance_name
4876
4877     exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4878     found = False
4879     for node in exportlist:
4880       if instance_name in exportlist[node]:
4881         found = True
4882         if not rpc.call_export_remove(node, instance_name):
4883           logger.Error("could not remove export for instance %s"
4884                        " on node %s" % (instance_name, node))
4885
4886     if fqdn_warn and not found:
4887       feedback_fn("Export not found. If trying to remove an export belonging"
4888                   " to a deleted instance please use its Fully Qualified"
4889                   " Domain Name.")
4890
4891
4892 class TagsLU(NoHooksLU):
4893   """Generic tags LU.
4894
4895   This is an abstract class which is the parent of all the other tags LUs.
4896
4897   """
4898
4899   def ExpandNames(self):
4900     self.needed_locks = {}
4901     if self.op.kind == constants.TAG_NODE:
4902       name = self.cfg.ExpandNodeName(self.op.name)
4903       if name is None:
4904         raise errors.OpPrereqError("Invalid node name (%s)" %
4905                                    (self.op.name,))
4906       self.op.name = name
4907       self.needed_locks[locking.LEVEL_NODE] = name
4908     elif self.op.kind == constants.TAG_INSTANCE:
4909       name = self.cfg.ExpandInstanceName(self.op.name)
4910       if name is None:
4911         raise errors.OpPrereqError("Invalid instance name (%s)" %
4912                                    (self.op.name,))
4913       self.op.name = name
4914       self.needed_locks[locking.LEVEL_INSTANCE] = name
4915
4916   def CheckPrereq(self):
4917     """Check prerequisites.
4918
4919     """
4920     if self.op.kind == constants.TAG_CLUSTER:
4921       self.target = self.cfg.GetClusterInfo()
4922     elif self.op.kind == constants.TAG_NODE:
4923       self.target = self.cfg.GetNodeInfo(self.op.name)
4924     elif self.op.kind == constants.TAG_INSTANCE:
4925       self.target = self.cfg.GetInstanceInfo(self.op.name)
4926     else:
4927       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4928                                  str(self.op.kind))
4929
4930
4931 class LUGetTags(TagsLU):
4932   """Returns the tags of a given object.
4933
4934   """
4935   _OP_REQP = ["kind", "name"]
4936   REQ_BGL = False
4937
4938   def Exec(self, feedback_fn):
4939     """Returns the tag list.
4940
4941     """
4942     return list(self.target.GetTags())
4943
4944
4945 class LUSearchTags(NoHooksLU):
4946   """Searches the tags for a given pattern.
4947
4948   """
4949   _OP_REQP = ["pattern"]
4950   REQ_BGL = False
4951
4952   def ExpandNames(self):
4953     self.needed_locks = {}
4954
4955   def CheckPrereq(self):
4956     """Check prerequisites.
4957
4958     This checks the pattern passed for validity by compiling it.
4959
4960     """
4961     try:
4962       self.re = re.compile(self.op.pattern)
4963     except re.error, err:
4964       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4965                                  (self.op.pattern, err))
4966
4967   def Exec(self, feedback_fn):
4968     """Returns the tag list.
4969
4970     """
4971     cfg = self.cfg
4972     tgts = [("/cluster", cfg.GetClusterInfo())]
4973     ilist = cfg.GetAllInstancesInfo().values()
4974     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4975     nlist = cfg.GetAllNodesInfo().values()
4976     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4977     results = []
4978     for path, target in tgts:
4979       for tag in target.GetTags():
4980         if self.re.search(tag):
4981           results.append((path, tag))
4982     return results
4983
4984
4985 class LUAddTags(TagsLU):
4986   """Sets a tag on a given object.
4987
4988   """
4989   _OP_REQP = ["kind", "name", "tags"]
4990   REQ_BGL = False
4991
4992   def CheckPrereq(self):
4993     """Check prerequisites.
4994
4995     This checks the type and length of the tag name and value.
4996
4997     """
4998     TagsLU.CheckPrereq(self)
4999     for tag in self.op.tags:
5000       objects.TaggableObject.ValidateTag(tag)
5001
5002   def Exec(self, feedback_fn):
5003     """Sets the tag.
5004
5005     """
5006     try:
5007       for tag in self.op.tags:
5008         self.target.AddTag(tag)
5009     except errors.TagError, err:
5010       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5011     try:
5012       self.cfg.Update(self.target)
5013     except errors.ConfigurationError:
5014       raise errors.OpRetryError("There has been a modification to the"
5015                                 " config file and the operation has been"
5016                                 " aborted. Please retry.")
5017
5018
5019 class LUDelTags(TagsLU):
5020   """Delete a list of tags from a given object.
5021
5022   """
5023   _OP_REQP = ["kind", "name", "tags"]
5024   REQ_BGL = False
5025
5026   def CheckPrereq(self):
5027     """Check prerequisites.
5028
5029     This checks that we have the given tag.
5030
5031     """
5032     TagsLU.CheckPrereq(self)
5033     for tag in self.op.tags:
5034       objects.TaggableObject.ValidateTag(tag)
5035     del_tags = frozenset(self.op.tags)
5036     cur_tags = self.target.GetTags()
5037     if not del_tags <= cur_tags:
5038       diff_tags = del_tags - cur_tags
5039       diff_names = ["'%s'" % tag for tag in diff_tags]
5040       diff_names.sort()
5041       raise errors.OpPrereqError("Tag(s) %s not found" %
5042                                  (",".join(diff_names)))
5043
5044   def Exec(self, feedback_fn):
5045     """Remove the tag from the object.
5046
5047     """
5048     for tag in self.op.tags:
5049       self.target.RemoveTag(tag)
5050     try:
5051       self.cfg.Update(self.target)
5052     except errors.ConfigurationError:
5053       raise errors.OpRetryError("There has been a modification to the"
5054                                 " config file and the operation has been"
5055                                 " aborted. Please retry.")
5056
5057
5058 class LUTestDelay(NoHooksLU):
5059   """Sleep for a specified amount of time.
5060
5061   This LU sleeps on the master and/or nodes for a specified amount of
5062   time.
5063
5064   """
5065   _OP_REQP = ["duration", "on_master", "on_nodes"]
5066   REQ_BGL = False
5067
5068   def ExpandNames(self):
5069     """Expand names and set required locks.
5070
5071     This expands the node list, if any.
5072
5073     """
5074     self.needed_locks = {}
5075     if self.op.on_nodes:
5076       # _GetWantedNodes can be used here, but is not always appropriate to use
5077       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5078       # more information.
5079       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5080       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5081
5082   def CheckPrereq(self):
5083     """Check prerequisites.
5084
5085     """
5086
5087   def Exec(self, feedback_fn):
5088     """Do the actual sleep.
5089
5090     """
5091     if self.op.on_master:
5092       if not utils.TestDelay(self.op.duration):
5093         raise errors.OpExecError("Error during master delay test")
5094     if self.op.on_nodes:
5095       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5096       if not result:
5097         raise errors.OpExecError("Complete failure from rpc call")
5098       for node, node_result in result.items():
5099         if not node_result:
5100           raise errors.OpExecError("Failure during rpc call to node %s,"
5101                                    " result: %s" % (node, node_result))
5102
5103
5104 class IAllocator(object):
5105   """IAllocator framework.
5106
5107   An IAllocator instance has three sets of attributes:
5108     - cfg/sstore that are needed to query the cluster
5109     - input data (all members of the _KEYS class attribute are required)
5110     - four buffer attributes (in|out_data|text), that represent the
5111       input (to the external script) in text and data structure format,
5112       and the output from it, again in two formats
5113     - the result variables from the script (success, info, nodes) for
5114       easy usage
5115
5116   """
5117   _ALLO_KEYS = [
5118     "mem_size", "disks", "disk_template",
5119     "os", "tags", "nics", "vcpus",
5120     ]
5121   _RELO_KEYS = [
5122     "relocate_from",
5123     ]
5124
5125   def __init__(self, cfg, sstore, mode, name, **kwargs):
5126     self.cfg = cfg
5127     self.sstore = sstore
5128     # init buffer variables
5129     self.in_text = self.out_text = self.in_data = self.out_data = None
5130     # init all input fields so that pylint is happy
5131     self.mode = mode
5132     self.name = name
5133     self.mem_size = self.disks = self.disk_template = None
5134     self.os = self.tags = self.nics = self.vcpus = None
5135     self.relocate_from = None
5136     # computed fields
5137     self.required_nodes = None
5138     # init result fields
5139     self.success = self.info = self.nodes = None
5140     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5141       keyset = self._ALLO_KEYS
5142     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5143       keyset = self._RELO_KEYS
5144     else:
5145       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5146                                    " IAllocator" % self.mode)
5147     for key in kwargs:
5148       if key not in keyset:
5149         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5150                                      " IAllocator" % key)
5151       setattr(self, key, kwargs[key])
5152     for key in keyset:
5153       if key not in kwargs:
5154         raise errors.ProgrammerError("Missing input parameter '%s' to"
5155                                      " IAllocator" % key)
5156     self._BuildInputData()
5157
5158   def _ComputeClusterData(self):
5159     """Compute the generic allocator input data.
5160
5161     This is the data that is independent of the actual operation.
5162
5163     """
5164     cfg = self.cfg
5165     # cluster data
5166     data = {
5167       "version": 1,
5168       "cluster_name": self.sstore.GetClusterName(),
5169       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5170       "hypervisor_type": self.sstore.GetHypervisorType(),
5171       # we don't have job IDs
5172       }
5173
5174     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5175
5176     # node data
5177     node_results = {}
5178     node_list = cfg.GetNodeList()
5179     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5180     for nname in node_list:
5181       ninfo = cfg.GetNodeInfo(nname)
5182       if nname not in node_data or not isinstance(node_data[nname], dict):
5183         raise errors.OpExecError("Can't get data for node %s" % nname)
5184       remote_info = node_data[nname]
5185       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5186                    'vg_size', 'vg_free', 'cpu_total']:
5187         if attr not in remote_info:
5188           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5189                                    (nname, attr))
5190         try:
5191           remote_info[attr] = int(remote_info[attr])
5192         except ValueError, err:
5193           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5194                                    " %s" % (nname, attr, str(err)))
5195       # compute memory used by primary instances
5196       i_p_mem = i_p_up_mem = 0
5197       for iinfo in i_list:
5198         if iinfo.primary_node == nname:
5199           i_p_mem += iinfo.memory
5200           if iinfo.status == "up":
5201             i_p_up_mem += iinfo.memory
5202
5203       # compute memory used by instances
5204       pnr = {
5205         "tags": list(ninfo.GetTags()),
5206         "total_memory": remote_info['memory_total'],
5207         "reserved_memory": remote_info['memory_dom0'],
5208         "free_memory": remote_info['memory_free'],
5209         "i_pri_memory": i_p_mem,
5210         "i_pri_up_memory": i_p_up_mem,
5211         "total_disk": remote_info['vg_size'],
5212         "free_disk": remote_info['vg_free'],
5213         "primary_ip": ninfo.primary_ip,
5214         "secondary_ip": ninfo.secondary_ip,
5215         "total_cpus": remote_info['cpu_total'],
5216         }
5217       node_results[nname] = pnr
5218     data["nodes"] = node_results
5219
5220     # instance data
5221     instance_data = {}
5222     for iinfo in i_list:
5223       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5224                   for n in iinfo.nics]
5225       pir = {
5226         "tags": list(iinfo.GetTags()),
5227         "should_run": iinfo.status == "up",
5228         "vcpus": iinfo.vcpus,
5229         "memory": iinfo.memory,
5230         "os": iinfo.os,
5231         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5232         "nics": nic_data,
5233         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5234         "disk_template": iinfo.disk_template,
5235         }
5236       instance_data[iinfo.name] = pir
5237
5238     data["instances"] = instance_data
5239
5240     self.in_data = data
5241
5242   def _AddNewInstance(self):
5243     """Add new instance data to allocator structure.
5244
5245     This in combination with _AllocatorGetClusterData will create the
5246     correct structure needed as input for the allocator.
5247
5248     The checks for the completeness of the opcode must have already been
5249     done.
5250
5251     """
5252     data = self.in_data
5253     if len(self.disks) != 2:
5254       raise errors.OpExecError("Only two-disk configurations supported")
5255
5256     disk_space = _ComputeDiskSize(self.disk_template,
5257                                   self.disks[0]["size"], self.disks[1]["size"])
5258
5259     if self.disk_template in constants.DTS_NET_MIRROR:
5260       self.required_nodes = 2
5261     else:
5262       self.required_nodes = 1
5263     request = {
5264       "type": "allocate",
5265       "name": self.name,
5266       "disk_template": self.disk_template,
5267       "tags": self.tags,
5268       "os": self.os,
5269       "vcpus": self.vcpus,
5270       "memory": self.mem_size,
5271       "disks": self.disks,
5272       "disk_space_total": disk_space,
5273       "nics": self.nics,
5274       "required_nodes": self.required_nodes,
5275       }
5276     data["request"] = request
5277
5278   def _AddRelocateInstance(self):
5279     """Add relocate instance data to allocator structure.
5280
5281     This in combination with _IAllocatorGetClusterData will create the
5282     correct structure needed as input for the allocator.
5283
5284     The checks for the completeness of the opcode must have already been
5285     done.
5286
5287     """
5288     instance = self.cfg.GetInstanceInfo(self.name)
5289     if instance is None:
5290       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5291                                    " IAllocator" % self.name)
5292
5293     if instance.disk_template not in constants.DTS_NET_MIRROR:
5294       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5295
5296     if len(instance.secondary_nodes) != 1:
5297       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5298
5299     self.required_nodes = 1
5300
5301     disk_space = _ComputeDiskSize(instance.disk_template,
5302                                   instance.disks[0].size,
5303                                   instance.disks[1].size)
5304
5305     request = {
5306       "type": "relocate",
5307       "name": self.name,
5308       "disk_space_total": disk_space,
5309       "required_nodes": self.required_nodes,
5310       "relocate_from": self.relocate_from,
5311       }
5312     self.in_data["request"] = request
5313
5314   def _BuildInputData(self):
5315     """Build input data structures.
5316
5317     """
5318     self._ComputeClusterData()
5319
5320     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5321       self._AddNewInstance()
5322     else:
5323       self._AddRelocateInstance()
5324
5325     self.in_text = serializer.Dump(self.in_data)
5326
5327   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5328     """Run an instance allocator and return the results.
5329
5330     """
5331     data = self.in_text
5332
5333     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5334
5335     if not isinstance(result, (list, tuple)) or len(result) != 4:
5336       raise errors.OpExecError("Invalid result from master iallocator runner")
5337
5338     rcode, stdout, stderr, fail = result
5339
5340     if rcode == constants.IARUN_NOTFOUND:
5341       raise errors.OpExecError("Can't find allocator '%s'" % name)
5342     elif rcode == constants.IARUN_FAILURE:
5343       raise errors.OpExecError("Instance allocator call failed: %s,"
5344                                " output: %s" % (fail, stdout+stderr))
5345     self.out_text = stdout
5346     if validate:
5347       self._ValidateResult()
5348
5349   def _ValidateResult(self):
5350     """Process the allocator results.
5351
5352     This will process and if successful save the result in
5353     self.out_data and the other parameters.
5354
5355     """
5356     try:
5357       rdict = serializer.Load(self.out_text)
5358     except Exception, err:
5359       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5360
5361     if not isinstance(rdict, dict):
5362       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5363
5364     for key in "success", "info", "nodes":
5365       if key not in rdict:
5366         raise errors.OpExecError("Can't parse iallocator results:"
5367                                  " missing key '%s'" % key)
5368       setattr(self, key, rdict[key])
5369
5370     if not isinstance(rdict["nodes"], list):
5371       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5372                                " is not a list")
5373     self.out_data = rdict
5374
5375
5376 class LUTestAllocator(NoHooksLU):
5377   """Run allocator tests.
5378
5379   This LU runs the allocator tests
5380
5381   """
5382   _OP_REQP = ["direction", "mode", "name"]
5383
5384   def CheckPrereq(self):
5385     """Check prerequisites.
5386
5387     This checks the opcode parameters depending on the director and mode test.
5388
5389     """
5390     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5391       for attr in ["name", "mem_size", "disks", "disk_template",
5392                    "os", "tags", "nics", "vcpus"]:
5393         if not hasattr(self.op, attr):
5394           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5395                                      attr)
5396       iname = self.cfg.ExpandInstanceName(self.op.name)
5397       if iname is not None:
5398         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5399                                    iname)
5400       if not isinstance(self.op.nics, list):
5401         raise errors.OpPrereqError("Invalid parameter 'nics'")
5402       for row in self.op.nics:
5403         if (not isinstance(row, dict) or
5404             "mac" not in row or
5405             "ip" not in row or
5406             "bridge" not in row):
5407           raise errors.OpPrereqError("Invalid contents of the"
5408                                      " 'nics' parameter")
5409       if not isinstance(self.op.disks, list):
5410         raise errors.OpPrereqError("Invalid parameter 'disks'")
5411       if len(self.op.disks) != 2:
5412         raise errors.OpPrereqError("Only two-disk configurations supported")
5413       for row in self.op.disks:
5414         if (not isinstance(row, dict) or
5415             "size" not in row or
5416             not isinstance(row["size"], int) or
5417             "mode" not in row or
5418             row["mode"] not in ['r', 'w']):
5419           raise errors.OpPrereqError("Invalid contents of the"
5420                                      " 'disks' parameter")
5421     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5422       if not hasattr(self.op, "name"):
5423         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5424       fname = self.cfg.ExpandInstanceName(self.op.name)
5425       if fname is None:
5426         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5427                                    self.op.name)
5428       self.op.name = fname
5429       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5430     else:
5431       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5432                                  self.op.mode)
5433
5434     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5435       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5436         raise errors.OpPrereqError("Missing allocator name")
5437     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5438       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5439                                  self.op.direction)
5440
5441   def Exec(self, feedback_fn):
5442     """Run the allocator test.
5443
5444     """
5445     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5446       ial = IAllocator(self.cfg, self.sstore,
5447                        mode=self.op.mode,
5448                        name=self.op.name,
5449                        mem_size=self.op.mem_size,
5450                        disks=self.op.disks,
5451                        disk_template=self.op.disk_template,
5452                        os=self.op.os,
5453                        tags=self.op.tags,
5454                        nics=self.op.nics,
5455                        vcpus=self.op.vcpus,
5456                        )
5457     else:
5458       ial = IAllocator(self.cfg, self.sstore,
5459                        mode=self.op.mode,
5460                        name=self.op.name,
5461                        relocate_from=list(self.relocate_from),
5462                        )
5463
5464     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5465       result = ial.in_text
5466     else:
5467       ial.Run(self.op.allocator, validate=False)
5468       result = ial.out_text
5469     return result