Parallelize LU{A,Dea}ctivateInstanceDisks
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_WSSTORE: the LU needs a writable SimpleStore
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_WSSTORE = False
69   REQ_BGL = True
70
71   def __init__(self, processor, op, context, sstore):
72     """Constructor for LogicalUnit.
73
74     This needs to be overriden in derived classes in order to check op
75     validity.
76
77     """
78     self.proc = processor
79     self.op = op
80     self.cfg = context.cfg
81     self.sstore = sstore
82     self.context = context
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95
96     if not self.cfg.IsCluster():
97       raise errors.OpPrereqError("Cluster not initialized yet,"
98                                  " use 'gnt-cluster init' first.")
99     if self.REQ_MASTER:
100       master = sstore.GetMasterNode()
101       if master != utils.HostInfo().name:
102         raise errors.OpPrereqError("Commands must be run on the master"
103                                    " node %s" % master)
104
105   def __GetSSH(self):
106     """Returns the SshRunner object
107
108     """
109     if not self.__ssh:
110       self.__ssh = ssh.SshRunner(self.sstore)
111     return self.__ssh
112
113   ssh = property(fget=__GetSSH)
114
115   def ExpandNames(self):
116     """Expand names for this LU.
117
118     This method is called before starting to execute the opcode, and it should
119     update all the parameters of the opcode to their canonical form (e.g. a
120     short node name must be fully expanded after this method has successfully
121     completed). This way locking, hooks, logging, ecc. can work correctly.
122
123     LUs which implement this method must also populate the self.needed_locks
124     member, as a dict with lock levels as keys, and a list of needed lock names
125     as values. Rules:
126       - Use an empty dict if you don't need any lock
127       - If you don't need any lock at a particular level omit that level
128       - Don't put anything for the BGL level
129       - If you want all locks at a level use locking.ALL_SET as a value
130
131     If you need to share locks (rather than acquire them exclusively) at one
132     level you can modify self.share_locks, setting a true value (usually 1) for
133     that level. By default locks are not shared.
134
135     Examples:
136     # Acquire all nodes and one instance
137     self.needed_locks = {
138       locking.LEVEL_NODE: locking.ALL_SET,
139       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
140     }
141     # Acquire just two nodes
142     self.needed_locks = {
143       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144     }
145     # Acquire no locks
146     self.needed_locks = {} # No, you can't leave it to the default value None
147
148     """
149     # The implementation of this method is mandatory only if the new LU is
150     # concurrent, so that old LUs don't need to be changed all at the same
151     # time.
152     if self.REQ_BGL:
153       self.needed_locks = {} # Exclusive LUs don't need locks.
154     else:
155       raise NotImplementedError
156
157   def DeclareLocks(self, level):
158     """Declare LU locking needs for a level
159
160     While most LUs can just declare their locking needs at ExpandNames time,
161     sometimes there's the need to calculate some locks after having acquired
162     the ones before. This function is called just before acquiring locks at a
163     particular level, but after acquiring the ones at lower levels, and permits
164     such calculations. It can be used to modify self.needed_locks, and by
165     default it does nothing.
166
167     This function is only called if you have something already set in
168     self.needed_locks for the level.
169
170     @param level: Locking level which is going to be locked
171     @type level: member of ganeti.locking.LEVELS
172
173     """
174
175   def CheckPrereq(self):
176     """Check prerequisites for this LU.
177
178     This method should check that the prerequisites for the execution
179     of this LU are fulfilled. It can do internode communication, but
180     it should be idempotent - no cluster or system changes are
181     allowed.
182
183     The method should raise errors.OpPrereqError in case something is
184     not fulfilled. Its return value is ignored.
185
186     This method should also update all the parameters of the opcode to
187     their canonical form if it hasn't been done by ExpandNames before.
188
189     """
190     raise NotImplementedError
191
192   def Exec(self, feedback_fn):
193     """Execute the LU.
194
195     This method should implement the actual work. It should raise
196     errors.OpExecError for failures that are somewhat dealt with in
197     code, or expected.
198
199     """
200     raise NotImplementedError
201
202   def BuildHooksEnv(self):
203     """Build hooks environment for this LU.
204
205     This method should return a three-node tuple consisting of: a dict
206     containing the environment that will be used for running the
207     specific hook for this LU, a list of node names on which the hook
208     should run before the execution, and a list of node names on which
209     the hook should run after the execution.
210
211     The keys of the dict must not have 'GANETI_' prefixed as this will
212     be handled in the hooks runner. Also note additional keys will be
213     added by the hooks runner. If the LU doesn't define any
214     environment, an empty dict (and not None) should be returned.
215
216     No nodes should be returned as an empty list (and not None).
217
218     Note that if the HPATH for a LU class is None, this function will
219     not be called.
220
221     """
222     raise NotImplementedError
223
224   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225     """Notify the LU about the results of its hooks.
226
227     This method is called every time a hooks phase is executed, and notifies
228     the Logical Unit about the hooks' result. The LU can then use it to alter
229     its result based on the hooks.  By default the method does nothing and the
230     previous result is passed back unchanged but any LU can define it if it
231     wants to use the local cluster hook-scripts somehow.
232
233     Args:
234       phase: the hooks phase that has just been run
235       hooks_results: the results of the multi-node hooks rpc call
236       feedback_fn: function to send feedback back to the caller
237       lu_result: the previous result this LU had, or None in the PRE phase.
238
239     """
240     return lu_result
241
242   def _ExpandAndLockInstance(self):
243     """Helper function to expand and lock an instance.
244
245     Many LUs that work on an instance take its name in self.op.instance_name
246     and need to expand it and then declare the expanded name for locking. This
247     function does it, and then updates self.op.instance_name to the expanded
248     name. It also initializes needed_locks as a dict, if this hasn't been done
249     before.
250
251     """
252     if self.needed_locks is None:
253       self.needed_locks = {}
254     else:
255       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256         "_ExpandAndLockInstance called with instance-level locks set"
257     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258     if expanded_name is None:
259       raise errors.OpPrereqError("Instance '%s' not known" %
260                                   self.op.instance_name)
261     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262     self.op.instance_name = expanded_name
263
264   def _LockInstancesNodes(self, primary_only=False):
265     """Helper function to declare instances' nodes for locking.
266
267     This function should be called after locking one or more instances to lock
268     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269     with all primary or secondary nodes for instances already locked and
270     present in self.needed_locks[locking.LEVEL_INSTANCE].
271
272     It should be called from DeclareLocks, and for safety only works if
273     self.recalculate_locks[locking.LEVEL_NODE] is set.
274
275     In the future it may grow parameters to just lock some instance's nodes, or
276     to just lock primaries or secondary nodes, if needed.
277
278     If should be called in DeclareLocks in a way similar to:
279
280     if level == locking.LEVEL_NODE:
281       self._LockInstancesNodes()
282
283     @type primary_only: boolean
284     @param primary_only: only lock primary nodes of locked instances
285
286     """
287     assert locking.LEVEL_NODE in self.recalculate_locks, \
288       "_LockInstancesNodes helper function called with no nodes to recalculate"
289
290     # TODO: check if we're really been called with the instance locks held
291
292     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293     # future we might want to have different behaviors depending on the value
294     # of self.recalculate_locks[locking.LEVEL_NODE]
295     wanted_nodes = []
296     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297       instance = self.context.cfg.GetInstanceInfo(instance_name)
298       wanted_nodes.append(instance.primary_node)
299       if not primary_only:
300         wanted_nodes.extend(instance.secondary_nodes)
301     self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
302
303     del self.recalculate_locks[locking.LEVEL_NODE]
304
305
306 class NoHooksLU(LogicalUnit):
307   """Simple LU which runs no hooks.
308
309   This LU is intended as a parent for other LogicalUnits which will
310   run no hooks, in order to reduce duplicate code.
311
312   """
313   HPATH = None
314   HTYPE = None
315
316
317 def _GetWantedNodes(lu, nodes):
318   """Returns list of checked and expanded node names.
319
320   Args:
321     nodes: List of nodes (strings) or None for all
322
323   """
324   if not isinstance(nodes, list):
325     raise errors.OpPrereqError("Invalid argument type 'nodes'")
326
327   if not nodes:
328     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
329       " non-empty list of nodes whose name is to be expanded.")
330
331   wanted = []
332   for name in nodes:
333     node = lu.cfg.ExpandNodeName(name)
334     if node is None:
335       raise errors.OpPrereqError("No such node name '%s'" % name)
336     wanted.append(node)
337
338   return utils.NiceSort(wanted)
339
340
341 def _GetWantedInstances(lu, instances):
342   """Returns list of checked and expanded instance names.
343
344   Args:
345     instances: List of instances (strings) or None for all
346
347   """
348   if not isinstance(instances, list):
349     raise errors.OpPrereqError("Invalid argument type 'instances'")
350
351   if instances:
352     wanted = []
353
354     for name in instances:
355       instance = lu.cfg.ExpandInstanceName(name)
356       if instance is None:
357         raise errors.OpPrereqError("No such instance name '%s'" % name)
358       wanted.append(instance)
359
360   else:
361     wanted = lu.cfg.GetInstanceList()
362   return utils.NiceSort(wanted)
363
364
365 def _CheckOutputFields(static, dynamic, selected):
366   """Checks whether all selected fields are valid.
367
368   Args:
369     static: Static fields
370     dynamic: Dynamic fields
371
372   """
373   static_fields = frozenset(static)
374   dynamic_fields = frozenset(dynamic)
375
376   all_fields = static_fields | dynamic_fields
377
378   if not all_fields.issuperset(selected):
379     raise errors.OpPrereqError("Unknown output fields selected: %s"
380                                % ",".join(frozenset(selected).
381                                           difference(all_fields)))
382
383
384 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
385                           memory, vcpus, nics):
386   """Builds instance related env variables for hooks from single variables.
387
388   Args:
389     secondary_nodes: List of secondary nodes as strings
390   """
391   env = {
392     "OP_TARGET": name,
393     "INSTANCE_NAME": name,
394     "INSTANCE_PRIMARY": primary_node,
395     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
396     "INSTANCE_OS_TYPE": os_type,
397     "INSTANCE_STATUS": status,
398     "INSTANCE_MEMORY": memory,
399     "INSTANCE_VCPUS": vcpus,
400   }
401
402   if nics:
403     nic_count = len(nics)
404     for idx, (ip, bridge, mac) in enumerate(nics):
405       if ip is None:
406         ip = ""
407       env["INSTANCE_NIC%d_IP" % idx] = ip
408       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
409       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
410   else:
411     nic_count = 0
412
413   env["INSTANCE_NIC_COUNT"] = nic_count
414
415   return env
416
417
418 def _BuildInstanceHookEnvByObject(instance, override=None):
419   """Builds instance related env variables for hooks from an object.
420
421   Args:
422     instance: objects.Instance object of instance
423     override: dict of values to override
424   """
425   args = {
426     'name': instance.name,
427     'primary_node': instance.primary_node,
428     'secondary_nodes': instance.secondary_nodes,
429     'os_type': instance.os,
430     'status': instance.os,
431     'memory': instance.memory,
432     'vcpus': instance.vcpus,
433     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
434   }
435   if override:
436     args.update(override)
437   return _BuildInstanceHookEnv(**args)
438
439
440 def _CheckInstanceBridgesExist(instance):
441   """Check that the brigdes needed by an instance exist.
442
443   """
444   # check bridges existance
445   brlist = [nic.bridge for nic in instance.nics]
446   if not rpc.call_bridges_exist(instance.primary_node, brlist):
447     raise errors.OpPrereqError("one or more target bridges %s does not"
448                                " exist on destination node '%s'" %
449                                (brlist, instance.primary_node))
450
451
452 class LUDestroyCluster(NoHooksLU):
453   """Logical unit for destroying the cluster.
454
455   """
456   _OP_REQP = []
457
458   def CheckPrereq(self):
459     """Check prerequisites.
460
461     This checks whether the cluster is empty.
462
463     Any errors are signalled by raising errors.OpPrereqError.
464
465     """
466     master = self.sstore.GetMasterNode()
467
468     nodelist = self.cfg.GetNodeList()
469     if len(nodelist) != 1 or nodelist[0] != master:
470       raise errors.OpPrereqError("There are still %d node(s) in"
471                                  " this cluster." % (len(nodelist) - 1))
472     instancelist = self.cfg.GetInstanceList()
473     if instancelist:
474       raise errors.OpPrereqError("There are still %d instance(s) in"
475                                  " this cluster." % len(instancelist))
476
477   def Exec(self, feedback_fn):
478     """Destroys the cluster.
479
480     """
481     master = self.sstore.GetMasterNode()
482     if not rpc.call_node_stop_master(master, False):
483       raise errors.OpExecError("Could not disable the master role")
484     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
485     utils.CreateBackup(priv_key)
486     utils.CreateBackup(pub_key)
487     return master
488
489
490 class LUVerifyCluster(LogicalUnit):
491   """Verifies the cluster status.
492
493   """
494   HPATH = "cluster-verify"
495   HTYPE = constants.HTYPE_CLUSTER
496   _OP_REQP = ["skip_checks"]
497
498   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
499                   remote_version, feedback_fn):
500     """Run multiple tests against a node.
501
502     Test list:
503       - compares ganeti version
504       - checks vg existance and size > 20G
505       - checks config file checksum
506       - checks ssh to other nodes
507
508     Args:
509       node: name of the node to check
510       file_list: required list of files
511       local_cksum: dictionary of local files and their checksums
512
513     """
514     # compares ganeti version
515     local_version = constants.PROTOCOL_VERSION
516     if not remote_version:
517       feedback_fn("  - ERROR: connection to %s failed" % (node))
518       return True
519
520     if local_version != remote_version:
521       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
522                       (local_version, node, remote_version))
523       return True
524
525     # checks vg existance and size > 20G
526
527     bad = False
528     if not vglist:
529       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
530                       (node,))
531       bad = True
532     else:
533       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
534                                             constants.MIN_VG_SIZE)
535       if vgstatus:
536         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
537         bad = True
538
539     # checks config file checksum
540     # checks ssh to any
541
542     if 'filelist' not in node_result:
543       bad = True
544       feedback_fn("  - ERROR: node hasn't returned file checksum data")
545     else:
546       remote_cksum = node_result['filelist']
547       for file_name in file_list:
548         if file_name not in remote_cksum:
549           bad = True
550           feedback_fn("  - ERROR: file '%s' missing" % file_name)
551         elif remote_cksum[file_name] != local_cksum[file_name]:
552           bad = True
553           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
554
555     if 'nodelist' not in node_result:
556       bad = True
557       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
558     else:
559       if node_result['nodelist']:
560         bad = True
561         for node in node_result['nodelist']:
562           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
563                           (node, node_result['nodelist'][node]))
564     if 'node-net-test' not in node_result:
565       bad = True
566       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
567     else:
568       if node_result['node-net-test']:
569         bad = True
570         nlist = utils.NiceSort(node_result['node-net-test'].keys())
571         for node in nlist:
572           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
573                           (node, node_result['node-net-test'][node]))
574
575     hyp_result = node_result.get('hypervisor', None)
576     if hyp_result is not None:
577       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
578     return bad
579
580   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
581                       node_instance, feedback_fn):
582     """Verify an instance.
583
584     This function checks to see if the required block devices are
585     available on the instance's node.
586
587     """
588     bad = False
589
590     node_current = instanceconfig.primary_node
591
592     node_vol_should = {}
593     instanceconfig.MapLVsByNode(node_vol_should)
594
595     for node in node_vol_should:
596       for volume in node_vol_should[node]:
597         if node not in node_vol_is or volume not in node_vol_is[node]:
598           feedback_fn("  - ERROR: volume %s missing on node %s" %
599                           (volume, node))
600           bad = True
601
602     if not instanceconfig.status == 'down':
603       if (node_current not in node_instance or
604           not instance in node_instance[node_current]):
605         feedback_fn("  - ERROR: instance %s not running on node %s" %
606                         (instance, node_current))
607         bad = True
608
609     for node in node_instance:
610       if (not node == node_current):
611         if instance in node_instance[node]:
612           feedback_fn("  - ERROR: instance %s should not run on node %s" %
613                           (instance, node))
614           bad = True
615
616     return bad
617
618   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
619     """Verify if there are any unknown volumes in the cluster.
620
621     The .os, .swap and backup volumes are ignored. All other volumes are
622     reported as unknown.
623
624     """
625     bad = False
626
627     for node in node_vol_is:
628       for volume in node_vol_is[node]:
629         if node not in node_vol_should or volume not in node_vol_should[node]:
630           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
631                       (volume, node))
632           bad = True
633     return bad
634
635   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
636     """Verify the list of running instances.
637
638     This checks what instances are running but unknown to the cluster.
639
640     """
641     bad = False
642     for node in node_instance:
643       for runninginstance in node_instance[node]:
644         if runninginstance not in instancelist:
645           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
646                           (runninginstance, node))
647           bad = True
648     return bad
649
650   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
651     """Verify N+1 Memory Resilience.
652
653     Check that if one single node dies we can still start all the instances it
654     was primary for.
655
656     """
657     bad = False
658
659     for node, nodeinfo in node_info.iteritems():
660       # This code checks that every node which is now listed as secondary has
661       # enough memory to host all instances it is supposed to should a single
662       # other node in the cluster fail.
663       # FIXME: not ready for failover to an arbitrary node
664       # FIXME: does not support file-backed instances
665       # WARNING: we currently take into account down instances as well as up
666       # ones, considering that even if they're down someone might want to start
667       # them even in the event of a node failure.
668       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
669         needed_mem = 0
670         for instance in instances:
671           needed_mem += instance_cfg[instance].memory
672         if nodeinfo['mfree'] < needed_mem:
673           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
674                       " failovers should node %s fail" % (node, prinode))
675           bad = True
676     return bad
677
678   def CheckPrereq(self):
679     """Check prerequisites.
680
681     Transform the list of checks we're going to skip into a set and check that
682     all its members are valid.
683
684     """
685     self.skip_set = frozenset(self.op.skip_checks)
686     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
687       raise errors.OpPrereqError("Invalid checks to be skipped specified")
688
689   def BuildHooksEnv(self):
690     """Build hooks env.
691
692     Cluster-Verify hooks just rone in the post phase and their failure makes
693     the output be logged in the verify output and the verification to fail.
694
695     """
696     all_nodes = self.cfg.GetNodeList()
697     # TODO: populate the environment with useful information for verify hooks
698     env = {}
699     return env, [], all_nodes
700
701   def Exec(self, feedback_fn):
702     """Verify integrity of cluster, performing various test on nodes.
703
704     """
705     bad = False
706     feedback_fn("* Verifying global settings")
707     for msg in self.cfg.VerifyConfig():
708       feedback_fn("  - ERROR: %s" % msg)
709
710     vg_name = self.cfg.GetVGName()
711     nodelist = utils.NiceSort(self.cfg.GetNodeList())
712     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
713     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
714     i_non_redundant = [] # Non redundant instances
715     node_volume = {}
716     node_instance = {}
717     node_info = {}
718     instance_cfg = {}
719
720     # FIXME: verify OS list
721     # do local checksums
722     file_names = list(self.sstore.GetFileList())
723     file_names.append(constants.SSL_CERT_FILE)
724     file_names.append(constants.CLUSTER_CONF_FILE)
725     local_checksums = utils.FingerprintFiles(file_names)
726
727     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
728     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
729     all_instanceinfo = rpc.call_instance_list(nodelist)
730     all_vglist = rpc.call_vg_list(nodelist)
731     node_verify_param = {
732       'filelist': file_names,
733       'nodelist': nodelist,
734       'hypervisor': None,
735       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
736                         for node in nodeinfo]
737       }
738     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
739     all_rversion = rpc.call_version(nodelist)
740     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
741
742     for node in nodelist:
743       feedback_fn("* Verifying node %s" % node)
744       result = self._VerifyNode(node, file_names, local_checksums,
745                                 all_vglist[node], all_nvinfo[node],
746                                 all_rversion[node], feedback_fn)
747       bad = bad or result
748
749       # node_volume
750       volumeinfo = all_volumeinfo[node]
751
752       if isinstance(volumeinfo, basestring):
753         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
754                     (node, volumeinfo[-400:].encode('string_escape')))
755         bad = True
756         node_volume[node] = {}
757       elif not isinstance(volumeinfo, dict):
758         feedback_fn("  - ERROR: connection to %s failed" % (node,))
759         bad = True
760         continue
761       else:
762         node_volume[node] = volumeinfo
763
764       # node_instance
765       nodeinstance = all_instanceinfo[node]
766       if type(nodeinstance) != list:
767         feedback_fn("  - ERROR: connection to %s failed" % (node,))
768         bad = True
769         continue
770
771       node_instance[node] = nodeinstance
772
773       # node_info
774       nodeinfo = all_ninfo[node]
775       if not isinstance(nodeinfo, dict):
776         feedback_fn("  - ERROR: connection to %s failed" % (node,))
777         bad = True
778         continue
779
780       try:
781         node_info[node] = {
782           "mfree": int(nodeinfo['memory_free']),
783           "dfree": int(nodeinfo['vg_free']),
784           "pinst": [],
785           "sinst": [],
786           # dictionary holding all instances this node is secondary for,
787           # grouped by their primary node. Each key is a cluster node, and each
788           # value is a list of instances which have the key as primary and the
789           # current node as secondary.  this is handy to calculate N+1 memory
790           # availability if you can only failover from a primary to its
791           # secondary.
792           "sinst-by-pnode": {},
793         }
794       except ValueError:
795         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
796         bad = True
797         continue
798
799     node_vol_should = {}
800
801     for instance in instancelist:
802       feedback_fn("* Verifying instance %s" % instance)
803       inst_config = self.cfg.GetInstanceInfo(instance)
804       result =  self._VerifyInstance(instance, inst_config, node_volume,
805                                      node_instance, feedback_fn)
806       bad = bad or result
807
808       inst_config.MapLVsByNode(node_vol_should)
809
810       instance_cfg[instance] = inst_config
811
812       pnode = inst_config.primary_node
813       if pnode in node_info:
814         node_info[pnode]['pinst'].append(instance)
815       else:
816         feedback_fn("  - ERROR: instance %s, connection to primary node"
817                     " %s failed" % (instance, pnode))
818         bad = True
819
820       # If the instance is non-redundant we cannot survive losing its primary
821       # node, so we are not N+1 compliant. On the other hand we have no disk
822       # templates with more than one secondary so that situation is not well
823       # supported either.
824       # FIXME: does not support file-backed instances
825       if len(inst_config.secondary_nodes) == 0:
826         i_non_redundant.append(instance)
827       elif len(inst_config.secondary_nodes) > 1:
828         feedback_fn("  - WARNING: multiple secondaries for instance %s"
829                     % instance)
830
831       for snode in inst_config.secondary_nodes:
832         if snode in node_info:
833           node_info[snode]['sinst'].append(instance)
834           if pnode not in node_info[snode]['sinst-by-pnode']:
835             node_info[snode]['sinst-by-pnode'][pnode] = []
836           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
837         else:
838           feedback_fn("  - ERROR: instance %s, connection to secondary node"
839                       " %s failed" % (instance, snode))
840
841     feedback_fn("* Verifying orphan volumes")
842     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
843                                        feedback_fn)
844     bad = bad or result
845
846     feedback_fn("* Verifying remaining instances")
847     result = self._VerifyOrphanInstances(instancelist, node_instance,
848                                          feedback_fn)
849     bad = bad or result
850
851     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
852       feedback_fn("* Verifying N+1 Memory redundancy")
853       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
854       bad = bad or result
855
856     feedback_fn("* Other Notes")
857     if i_non_redundant:
858       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
859                   % len(i_non_redundant))
860
861     return not bad
862
863   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
864     """Analize the post-hooks' result, handle it, and send some
865     nicely-formatted feedback back to the user.
866
867     Args:
868       phase: the hooks phase that has just been run
869       hooks_results: the results of the multi-node hooks rpc call
870       feedback_fn: function to send feedback back to the caller
871       lu_result: previous Exec result
872
873     """
874     # We only really run POST phase hooks, and are only interested in
875     # their results
876     if phase == constants.HOOKS_PHASE_POST:
877       # Used to change hooks' output to proper indentation
878       indent_re = re.compile('^', re.M)
879       feedback_fn("* Hooks Results")
880       if not hooks_results:
881         feedback_fn("  - ERROR: general communication failure")
882         lu_result = 1
883       else:
884         for node_name in hooks_results:
885           show_node_header = True
886           res = hooks_results[node_name]
887           if res is False or not isinstance(res, list):
888             feedback_fn("    Communication failure")
889             lu_result = 1
890             continue
891           for script, hkr, output in res:
892             if hkr == constants.HKR_FAIL:
893               # The node header is only shown once, if there are
894               # failing hooks on that node
895               if show_node_header:
896                 feedback_fn("  Node %s:" % node_name)
897                 show_node_header = False
898               feedback_fn("    ERROR: Script %s failed, output:" % script)
899               output = indent_re.sub('      ', output)
900               feedback_fn("%s" % output)
901               lu_result = 1
902
903       return lu_result
904
905
906 class LUVerifyDisks(NoHooksLU):
907   """Verifies the cluster disks status.
908
909   """
910   _OP_REQP = []
911
912   def CheckPrereq(self):
913     """Check prerequisites.
914
915     This has no prerequisites.
916
917     """
918     pass
919
920   def Exec(self, feedback_fn):
921     """Verify integrity of cluster disks.
922
923     """
924     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
925
926     vg_name = self.cfg.GetVGName()
927     nodes = utils.NiceSort(self.cfg.GetNodeList())
928     instances = [self.cfg.GetInstanceInfo(name)
929                  for name in self.cfg.GetInstanceList()]
930
931     nv_dict = {}
932     for inst in instances:
933       inst_lvs = {}
934       if (inst.status != "up" or
935           inst.disk_template not in constants.DTS_NET_MIRROR):
936         continue
937       inst.MapLVsByNode(inst_lvs)
938       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939       for node, vol_list in inst_lvs.iteritems():
940         for vol in vol_list:
941           nv_dict[(node, vol)] = inst
942
943     if not nv_dict:
944       return result
945
946     node_lvs = rpc.call_volume_list(nodes, vg_name)
947
948     to_act = set()
949     for node in nodes:
950       # node_volume
951       lvs = node_lvs[node]
952
953       if isinstance(lvs, basestring):
954         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
955         res_nlvm[node] = lvs
956       elif not isinstance(lvs, dict):
957         logger.Info("connection to node %s failed or invalid data returned" %
958                     (node,))
959         res_nodes.append(node)
960         continue
961
962       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963         inst = nv_dict.pop((node, lv_name), None)
964         if (not lv_online and inst is not None
965             and inst.name not in res_instances):
966           res_instances.append(inst.name)
967
968     # any leftover items in nv_dict are missing LVs, let's arrange the
969     # data better
970     for key, inst in nv_dict.iteritems():
971       if inst.name not in res_missing:
972         res_missing[inst.name] = []
973       res_missing[inst.name].append(key)
974
975     return result
976
977
978 class LURenameCluster(LogicalUnit):
979   """Rename the cluster.
980
981   """
982   HPATH = "cluster-rename"
983   HTYPE = constants.HTYPE_CLUSTER
984   _OP_REQP = ["name"]
985   REQ_WSSTORE = True
986
987   def BuildHooksEnv(self):
988     """Build hooks env.
989
990     """
991     env = {
992       "OP_TARGET": self.sstore.GetClusterName(),
993       "NEW_NAME": self.op.name,
994       }
995     mn = self.sstore.GetMasterNode()
996     return env, [mn], [mn]
997
998   def CheckPrereq(self):
999     """Verify that the passed name is a valid one.
1000
1001     """
1002     hostname = utils.HostInfo(self.op.name)
1003
1004     new_name = hostname.name
1005     self.ip = new_ip = hostname.ip
1006     old_name = self.sstore.GetClusterName()
1007     old_ip = self.sstore.GetMasterIP()
1008     if new_name == old_name and new_ip == old_ip:
1009       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1010                                  " cluster has changed")
1011     if new_ip != old_ip:
1012       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1013         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014                                    " reachable on the network. Aborting." %
1015                                    new_ip)
1016
1017     self.op.name = new_name
1018
1019   def Exec(self, feedback_fn):
1020     """Rename the cluster.
1021
1022     """
1023     clustername = self.op.name
1024     ip = self.ip
1025     ss = self.sstore
1026
1027     # shutdown the master IP
1028     master = ss.GetMasterNode()
1029     if not rpc.call_node_stop_master(master, False):
1030       raise errors.OpExecError("Could not disable the master role")
1031
1032     try:
1033       # modify the sstore
1034       ss.SetKey(ss.SS_MASTER_IP, ip)
1035       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1036
1037       # Distribute updated ss config to all nodes
1038       myself = self.cfg.GetNodeInfo(master)
1039       dist_nodes = self.cfg.GetNodeList()
1040       if myself.name in dist_nodes:
1041         dist_nodes.remove(myself.name)
1042
1043       logger.Debug("Copying updated ssconf data to all nodes")
1044       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045         fname = ss.KeyToFilename(keyname)
1046         result = rpc.call_upload_file(dist_nodes, fname)
1047         for to_node in dist_nodes:
1048           if not result[to_node]:
1049             logger.Error("copy of file %s to node %s failed" %
1050                          (fname, to_node))
1051     finally:
1052       if not rpc.call_node_start_master(master, False):
1053         logger.Error("Could not re-enable the master role on the master,"
1054                      " please restart manually.")
1055
1056
1057 def _RecursiveCheckIfLVMBased(disk):
1058   """Check if the given disk or its children are lvm-based.
1059
1060   Args:
1061     disk: ganeti.objects.Disk object
1062
1063   Returns:
1064     boolean indicating whether a LD_LV dev_type was found or not
1065
1066   """
1067   if disk.children:
1068     for chdisk in disk.children:
1069       if _RecursiveCheckIfLVMBased(chdisk):
1070         return True
1071   return disk.dev_type == constants.LD_LV
1072
1073
1074 class LUSetClusterParams(LogicalUnit):
1075   """Change the parameters of the cluster.
1076
1077   """
1078   HPATH = "cluster-modify"
1079   HTYPE = constants.HTYPE_CLUSTER
1080   _OP_REQP = []
1081
1082   def BuildHooksEnv(self):
1083     """Build hooks env.
1084
1085     """
1086     env = {
1087       "OP_TARGET": self.sstore.GetClusterName(),
1088       "NEW_VG_NAME": self.op.vg_name,
1089       }
1090     mn = self.sstore.GetMasterNode()
1091     return env, [mn], [mn]
1092
1093   def CheckPrereq(self):
1094     """Check prerequisites.
1095
1096     This checks whether the given params don't conflict and
1097     if the given volume group is valid.
1098
1099     """
1100     if not self.op.vg_name:
1101       instances = [self.cfg.GetInstanceInfo(name)
1102                    for name in self.cfg.GetInstanceList()]
1103       for inst in instances:
1104         for disk in inst.disks:
1105           if _RecursiveCheckIfLVMBased(disk):
1106             raise errors.OpPrereqError("Cannot disable lvm storage while"
1107                                        " lvm-based instances exist")
1108
1109     # if vg_name not None, checks given volume group on all nodes
1110     if self.op.vg_name:
1111       node_list = self.cfg.GetNodeList()
1112       vglist = rpc.call_vg_list(node_list)
1113       for node in node_list:
1114         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1115                                               constants.MIN_VG_SIZE)
1116         if vgstatus:
1117           raise errors.OpPrereqError("Error on node '%s': %s" %
1118                                      (node, vgstatus))
1119
1120   def Exec(self, feedback_fn):
1121     """Change the parameters of the cluster.
1122
1123     """
1124     if self.op.vg_name != self.cfg.GetVGName():
1125       self.cfg.SetVGName(self.op.vg_name)
1126     else:
1127       feedback_fn("Cluster LVM configuration already in desired"
1128                   " state, not changing")
1129
1130
1131 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1132   """Sleep and poll for an instance's disk to sync.
1133
1134   """
1135   if not instance.disks:
1136     return True
1137
1138   if not oneshot:
1139     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1140
1141   node = instance.primary_node
1142
1143   for dev in instance.disks:
1144     cfgw.SetDiskID(dev, node)
1145
1146   retries = 0
1147   while True:
1148     max_time = 0
1149     done = True
1150     cumul_degraded = False
1151     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1152     if not rstats:
1153       proc.LogWarning("Can't get any data from node %s" % node)
1154       retries += 1
1155       if retries >= 10:
1156         raise errors.RemoteError("Can't contact node %s for mirror data,"
1157                                  " aborting." % node)
1158       time.sleep(6)
1159       continue
1160     retries = 0
1161     for i in range(len(rstats)):
1162       mstat = rstats[i]
1163       if mstat is None:
1164         proc.LogWarning("Can't compute data for node %s/%s" %
1165                         (node, instance.disks[i].iv_name))
1166         continue
1167       # we ignore the ldisk parameter
1168       perc_done, est_time, is_degraded, _ = mstat
1169       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1170       if perc_done is not None:
1171         done = False
1172         if est_time is not None:
1173           rem_time = "%d estimated seconds remaining" % est_time
1174           max_time = est_time
1175         else:
1176           rem_time = "no time estimate"
1177         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1178                      (instance.disks[i].iv_name, perc_done, rem_time))
1179     if done or oneshot:
1180       break
1181
1182     time.sleep(min(60, max_time))
1183
1184   if done:
1185     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1186   return not cumul_degraded
1187
1188
1189 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1190   """Check that mirrors are not degraded.
1191
1192   The ldisk parameter, if True, will change the test from the
1193   is_degraded attribute (which represents overall non-ok status for
1194   the device(s)) to the ldisk (representing the local storage status).
1195
1196   """
1197   cfgw.SetDiskID(dev, node)
1198   if ldisk:
1199     idx = 6
1200   else:
1201     idx = 5
1202
1203   result = True
1204   if on_primary or dev.AssembleOnSecondary():
1205     rstats = rpc.call_blockdev_find(node, dev)
1206     if not rstats:
1207       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1208       result = False
1209     else:
1210       result = result and (not rstats[idx])
1211   if dev.children:
1212     for child in dev.children:
1213       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1214
1215   return result
1216
1217
1218 class LUDiagnoseOS(NoHooksLU):
1219   """Logical unit for OS diagnose/query.
1220
1221   """
1222   _OP_REQP = ["output_fields", "names"]
1223   REQ_BGL = False
1224
1225   def ExpandNames(self):
1226     if self.op.names:
1227       raise errors.OpPrereqError("Selective OS query not supported")
1228
1229     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1230     _CheckOutputFields(static=[],
1231                        dynamic=self.dynamic_fields,
1232                        selected=self.op.output_fields)
1233
1234     # Lock all nodes, in shared mode
1235     self.needed_locks = {}
1236     self.share_locks[locking.LEVEL_NODE] = 1
1237     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1238
1239   def CheckPrereq(self):
1240     """Check prerequisites.
1241
1242     """
1243
1244   @staticmethod
1245   def _DiagnoseByOS(node_list, rlist):
1246     """Remaps a per-node return list into an a per-os per-node dictionary
1247
1248       Args:
1249         node_list: a list with the names of all nodes
1250         rlist: a map with node names as keys and OS objects as values
1251
1252       Returns:
1253         map: a map with osnames as keys and as value another map, with
1254              nodes as
1255              keys and list of OS objects as values
1256              e.g. {"debian-etch": {"node1": [<object>,...],
1257                                    "node2": [<object>,]}
1258                   }
1259
1260     """
1261     all_os = {}
1262     for node_name, nr in rlist.iteritems():
1263       if not nr:
1264         continue
1265       for os_obj in nr:
1266         if os_obj.name not in all_os:
1267           # build a list of nodes for this os containing empty lists
1268           # for each node in node_list
1269           all_os[os_obj.name] = {}
1270           for nname in node_list:
1271             all_os[os_obj.name][nname] = []
1272         all_os[os_obj.name][node_name].append(os_obj)
1273     return all_os
1274
1275   def Exec(self, feedback_fn):
1276     """Compute the list of OSes.
1277
1278     """
1279     node_list = self.acquired_locks[locking.LEVEL_NODE]
1280     node_data = rpc.call_os_diagnose(node_list)
1281     if node_data == False:
1282       raise errors.OpExecError("Can't gather the list of OSes")
1283     pol = self._DiagnoseByOS(node_list, node_data)
1284     output = []
1285     for os_name, os_data in pol.iteritems():
1286       row = []
1287       for field in self.op.output_fields:
1288         if field == "name":
1289           val = os_name
1290         elif field == "valid":
1291           val = utils.all([osl and osl[0] for osl in os_data.values()])
1292         elif field == "node_status":
1293           val = {}
1294           for node_name, nos_list in os_data.iteritems():
1295             val[node_name] = [(v.status, v.path) for v in nos_list]
1296         else:
1297           raise errors.ParameterError(field)
1298         row.append(val)
1299       output.append(row)
1300
1301     return output
1302
1303
1304 class LURemoveNode(LogicalUnit):
1305   """Logical unit for removing a node.
1306
1307   """
1308   HPATH = "node-remove"
1309   HTYPE = constants.HTYPE_NODE
1310   _OP_REQP = ["node_name"]
1311
1312   def BuildHooksEnv(self):
1313     """Build hooks env.
1314
1315     This doesn't run on the target node in the pre phase as a failed
1316     node would then be impossible to remove.
1317
1318     """
1319     env = {
1320       "OP_TARGET": self.op.node_name,
1321       "NODE_NAME": self.op.node_name,
1322       }
1323     all_nodes = self.cfg.GetNodeList()
1324     all_nodes.remove(self.op.node_name)
1325     return env, all_nodes, all_nodes
1326
1327   def CheckPrereq(self):
1328     """Check prerequisites.
1329
1330     This checks:
1331      - the node exists in the configuration
1332      - it does not have primary or secondary instances
1333      - it's not the master
1334
1335     Any errors are signalled by raising errors.OpPrereqError.
1336
1337     """
1338     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1339     if node is None:
1340       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1341
1342     instance_list = self.cfg.GetInstanceList()
1343
1344     masternode = self.sstore.GetMasterNode()
1345     if node.name == masternode:
1346       raise errors.OpPrereqError("Node is the master node,"
1347                                  " you need to failover first.")
1348
1349     for instance_name in instance_list:
1350       instance = self.cfg.GetInstanceInfo(instance_name)
1351       if node.name == instance.primary_node:
1352         raise errors.OpPrereqError("Instance %s still running on the node,"
1353                                    " please remove first." % instance_name)
1354       if node.name in instance.secondary_nodes:
1355         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1356                                    " please remove first." % instance_name)
1357     self.op.node_name = node.name
1358     self.node = node
1359
1360   def Exec(self, feedback_fn):
1361     """Removes the node from the cluster.
1362
1363     """
1364     node = self.node
1365     logger.Info("stopping the node daemon and removing configs from node %s" %
1366                 node.name)
1367
1368     self.context.RemoveNode(node.name)
1369
1370     rpc.call_node_leave_cluster(node.name)
1371
1372
1373 class LUQueryNodes(NoHooksLU):
1374   """Logical unit for querying nodes.
1375
1376   """
1377   _OP_REQP = ["output_fields", "names"]
1378   REQ_BGL = False
1379
1380   def ExpandNames(self):
1381     self.dynamic_fields = frozenset([
1382       "dtotal", "dfree",
1383       "mtotal", "mnode", "mfree",
1384       "bootid",
1385       "ctotal",
1386       ])
1387
1388     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1389                                "pinst_list", "sinst_list",
1390                                "pip", "sip", "tags"],
1391                        dynamic=self.dynamic_fields,
1392                        selected=self.op.output_fields)
1393
1394     self.needed_locks = {}
1395     self.share_locks[locking.LEVEL_NODE] = 1
1396     # TODO: we could lock nodes only if the user asked for dynamic fields. For
1397     # that we need atomic ways to get info for a group of nodes from the
1398     # config, though.
1399     if not self.op.names:
1400       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1401     else:
1402       self.needed_locks[locking.LEVEL_NODE] = \
1403         _GetWantedNodes(self, self.op.names)
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     """
1409     # This of course is valid only if we locked the nodes
1410     self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1411
1412   def Exec(self, feedback_fn):
1413     """Computes the list of nodes and their attributes.
1414
1415     """
1416     nodenames = self.wanted
1417     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1418
1419     # begin data gathering
1420
1421     if self.dynamic_fields.intersection(self.op.output_fields):
1422       live_data = {}
1423       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1424       for name in nodenames:
1425         nodeinfo = node_data.get(name, None)
1426         if nodeinfo:
1427           live_data[name] = {
1428             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1429             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1430             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1431             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1432             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1433             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1434             "bootid": nodeinfo['bootid'],
1435             }
1436         else:
1437           live_data[name] = {}
1438     else:
1439       live_data = dict.fromkeys(nodenames, {})
1440
1441     node_to_primary = dict([(name, set()) for name in nodenames])
1442     node_to_secondary = dict([(name, set()) for name in nodenames])
1443
1444     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445                              "sinst_cnt", "sinst_list"))
1446     if inst_fields & frozenset(self.op.output_fields):
1447       instancelist = self.cfg.GetInstanceList()
1448
1449       for instance_name in instancelist:
1450         inst = self.cfg.GetInstanceInfo(instance_name)
1451         if inst.primary_node in node_to_primary:
1452           node_to_primary[inst.primary_node].add(inst.name)
1453         for secnode in inst.secondary_nodes:
1454           if secnode in node_to_secondary:
1455             node_to_secondary[secnode].add(inst.name)
1456
1457     # end data gathering
1458
1459     output = []
1460     for node in nodelist:
1461       node_output = []
1462       for field in self.op.output_fields:
1463         if field == "name":
1464           val = node.name
1465         elif field == "pinst_list":
1466           val = list(node_to_primary[node.name])
1467         elif field == "sinst_list":
1468           val = list(node_to_secondary[node.name])
1469         elif field == "pinst_cnt":
1470           val = len(node_to_primary[node.name])
1471         elif field == "sinst_cnt":
1472           val = len(node_to_secondary[node.name])
1473         elif field == "pip":
1474           val = node.primary_ip
1475         elif field == "sip":
1476           val = node.secondary_ip
1477         elif field == "tags":
1478           val = list(node.GetTags())
1479         elif field in self.dynamic_fields:
1480           val = live_data[node.name].get(field, None)
1481         else:
1482           raise errors.ParameterError(field)
1483         node_output.append(val)
1484       output.append(node_output)
1485
1486     return output
1487
1488
1489 class LUQueryNodeVolumes(NoHooksLU):
1490   """Logical unit for getting volumes on node(s).
1491
1492   """
1493   _OP_REQP = ["nodes", "output_fields"]
1494   REQ_BGL = False
1495
1496   def ExpandNames(self):
1497     _CheckOutputFields(static=["node"],
1498                        dynamic=["phys", "vg", "name", "size", "instance"],
1499                        selected=self.op.output_fields)
1500
1501     self.needed_locks = {}
1502     self.share_locks[locking.LEVEL_NODE] = 1
1503     if not self.op.nodes:
1504       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1505     else:
1506       self.needed_locks[locking.LEVEL_NODE] = \
1507         _GetWantedNodes(self, self.op.nodes)
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     This checks that the fields required are valid output fields.
1513
1514     """
1515     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1516
1517   def Exec(self, feedback_fn):
1518     """Computes the list of nodes and their attributes.
1519
1520     """
1521     nodenames = self.nodes
1522     volumes = rpc.call_node_volumes(nodenames)
1523
1524     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1525              in self.cfg.GetInstanceList()]
1526
1527     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1528
1529     output = []
1530     for node in nodenames:
1531       if node not in volumes or not volumes[node]:
1532         continue
1533
1534       node_vols = volumes[node][:]
1535       node_vols.sort(key=lambda vol: vol['dev'])
1536
1537       for vol in node_vols:
1538         node_output = []
1539         for field in self.op.output_fields:
1540           if field == "node":
1541             val = node
1542           elif field == "phys":
1543             val = vol['dev']
1544           elif field == "vg":
1545             val = vol['vg']
1546           elif field == "name":
1547             val = vol['name']
1548           elif field == "size":
1549             val = int(float(vol['size']))
1550           elif field == "instance":
1551             for inst in ilist:
1552               if node not in lv_by_node[inst]:
1553                 continue
1554               if vol['name'] in lv_by_node[inst][node]:
1555                 val = inst.name
1556                 break
1557             else:
1558               val = '-'
1559           else:
1560             raise errors.ParameterError(field)
1561           node_output.append(str(val))
1562
1563         output.append(node_output)
1564
1565     return output
1566
1567
1568 class LUAddNode(LogicalUnit):
1569   """Logical unit for adding node to the cluster.
1570
1571   """
1572   HPATH = "node-add"
1573   HTYPE = constants.HTYPE_NODE
1574   _OP_REQP = ["node_name"]
1575
1576   def BuildHooksEnv(self):
1577     """Build hooks env.
1578
1579     This will run on all nodes before, and on all nodes + the new node after.
1580
1581     """
1582     env = {
1583       "OP_TARGET": self.op.node_name,
1584       "NODE_NAME": self.op.node_name,
1585       "NODE_PIP": self.op.primary_ip,
1586       "NODE_SIP": self.op.secondary_ip,
1587       }
1588     nodes_0 = self.cfg.GetNodeList()
1589     nodes_1 = nodes_0 + [self.op.node_name, ]
1590     return env, nodes_0, nodes_1
1591
1592   def CheckPrereq(self):
1593     """Check prerequisites.
1594
1595     This checks:
1596      - the new node is not already in the config
1597      - it is resolvable
1598      - its parameters (single/dual homed) matches the cluster
1599
1600     Any errors are signalled by raising errors.OpPrereqError.
1601
1602     """
1603     node_name = self.op.node_name
1604     cfg = self.cfg
1605
1606     dns_data = utils.HostInfo(node_name)
1607
1608     node = dns_data.name
1609     primary_ip = self.op.primary_ip = dns_data.ip
1610     secondary_ip = getattr(self.op, "secondary_ip", None)
1611     if secondary_ip is None:
1612       secondary_ip = primary_ip
1613     if not utils.IsValidIP(secondary_ip):
1614       raise errors.OpPrereqError("Invalid secondary IP given")
1615     self.op.secondary_ip = secondary_ip
1616
1617     node_list = cfg.GetNodeList()
1618     if not self.op.readd and node in node_list:
1619       raise errors.OpPrereqError("Node %s is already in the configuration" %
1620                                  node)
1621     elif self.op.readd and node not in node_list:
1622       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1623
1624     for existing_node_name in node_list:
1625       existing_node = cfg.GetNodeInfo(existing_node_name)
1626
1627       if self.op.readd and node == existing_node_name:
1628         if (existing_node.primary_ip != primary_ip or
1629             existing_node.secondary_ip != secondary_ip):
1630           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1631                                      " address configuration as before")
1632         continue
1633
1634       if (existing_node.primary_ip == primary_ip or
1635           existing_node.secondary_ip == primary_ip or
1636           existing_node.primary_ip == secondary_ip or
1637           existing_node.secondary_ip == secondary_ip):
1638         raise errors.OpPrereqError("New node ip address(es) conflict with"
1639                                    " existing node %s" % existing_node.name)
1640
1641     # check that the type of the node (single versus dual homed) is the
1642     # same as for the master
1643     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1644     master_singlehomed = myself.secondary_ip == myself.primary_ip
1645     newbie_singlehomed = secondary_ip == primary_ip
1646     if master_singlehomed != newbie_singlehomed:
1647       if master_singlehomed:
1648         raise errors.OpPrereqError("The master has no private ip but the"
1649                                    " new node has one")
1650       else:
1651         raise errors.OpPrereqError("The master has a private ip but the"
1652                                    " new node doesn't have one")
1653
1654     # checks reachablity
1655     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1656       raise errors.OpPrereqError("Node not reachable by ping")
1657
1658     if not newbie_singlehomed:
1659       # check reachability from my secondary ip to newbie's secondary ip
1660       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1661                            source=myself.secondary_ip):
1662         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1663                                    " based ping to noded port")
1664
1665     self.new_node = objects.Node(name=node,
1666                                  primary_ip=primary_ip,
1667                                  secondary_ip=secondary_ip)
1668
1669   def Exec(self, feedback_fn):
1670     """Adds the new node to the cluster.
1671
1672     """
1673     new_node = self.new_node
1674     node = new_node.name
1675
1676     # check connectivity
1677     result = rpc.call_version([node])[node]
1678     if result:
1679       if constants.PROTOCOL_VERSION == result:
1680         logger.Info("communication to node %s fine, sw version %s match" %
1681                     (node, result))
1682       else:
1683         raise errors.OpExecError("Version mismatch master version %s,"
1684                                  " node version %s" %
1685                                  (constants.PROTOCOL_VERSION, result))
1686     else:
1687       raise errors.OpExecError("Cannot get version from the new node")
1688
1689     # setup ssh on node
1690     logger.Info("copy ssh key to node %s" % node)
1691     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1692     keyarray = []
1693     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1694                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1695                 priv_key, pub_key]
1696
1697     for i in keyfiles:
1698       f = open(i, 'r')
1699       try:
1700         keyarray.append(f.read())
1701       finally:
1702         f.close()
1703
1704     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1705                                keyarray[3], keyarray[4], keyarray[5])
1706
1707     if not result:
1708       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1709
1710     # Add node to our /etc/hosts, and add key to known_hosts
1711     utils.AddHostToEtcHosts(new_node.name)
1712
1713     if new_node.secondary_ip != new_node.primary_ip:
1714       if not rpc.call_node_tcp_ping(new_node.name,
1715                                     constants.LOCALHOST_IP_ADDRESS,
1716                                     new_node.secondary_ip,
1717                                     constants.DEFAULT_NODED_PORT,
1718                                     10, False):
1719         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1720                                  " you gave (%s). Please fix and re-run this"
1721                                  " command." % new_node.secondary_ip)
1722
1723     node_verify_list = [self.sstore.GetMasterNode()]
1724     node_verify_param = {
1725       'nodelist': [node],
1726       # TODO: do a node-net-test as well?
1727     }
1728
1729     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1730     for verifier in node_verify_list:
1731       if not result[verifier]:
1732         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1733                                  " for remote verification" % verifier)
1734       if result[verifier]['nodelist']:
1735         for failed in result[verifier]['nodelist']:
1736           feedback_fn("ssh/hostname verification failed %s -> %s" %
1737                       (verifier, result[verifier]['nodelist'][failed]))
1738         raise errors.OpExecError("ssh/hostname verification failed.")
1739
1740     # Distribute updated /etc/hosts and known_hosts to all nodes,
1741     # including the node just added
1742     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1743     dist_nodes = self.cfg.GetNodeList()
1744     if not self.op.readd:
1745       dist_nodes.append(node)
1746     if myself.name in dist_nodes:
1747       dist_nodes.remove(myself.name)
1748
1749     logger.Debug("Copying hosts and known_hosts to all nodes")
1750     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1751       result = rpc.call_upload_file(dist_nodes, fname)
1752       for to_node in dist_nodes:
1753         if not result[to_node]:
1754           logger.Error("copy of file %s to node %s failed" %
1755                        (fname, to_node))
1756
1757     to_copy = self.sstore.GetFileList()
1758     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1759       to_copy.append(constants.VNC_PASSWORD_FILE)
1760     for fname in to_copy:
1761       result = rpc.call_upload_file([node], fname)
1762       if not result[node]:
1763         logger.Error("could not copy file %s to node %s" % (fname, node))
1764
1765     if self.op.readd:
1766       self.context.ReaddNode(new_node)
1767     else:
1768       self.context.AddNode(new_node)
1769
1770
1771 class LUQueryClusterInfo(NoHooksLU):
1772   """Query cluster configuration.
1773
1774   """
1775   _OP_REQP = []
1776   REQ_MASTER = False
1777   REQ_BGL = False
1778
1779   def ExpandNames(self):
1780     self.needed_locks = {}
1781
1782   def CheckPrereq(self):
1783     """No prerequsites needed for this LU.
1784
1785     """
1786     pass
1787
1788   def Exec(self, feedback_fn):
1789     """Return cluster config.
1790
1791     """
1792     result = {
1793       "name": self.sstore.GetClusterName(),
1794       "software_version": constants.RELEASE_VERSION,
1795       "protocol_version": constants.PROTOCOL_VERSION,
1796       "config_version": constants.CONFIG_VERSION,
1797       "os_api_version": constants.OS_API_VERSION,
1798       "export_version": constants.EXPORT_VERSION,
1799       "master": self.sstore.GetMasterNode(),
1800       "architecture": (platform.architecture()[0], platform.machine()),
1801       "hypervisor_type": self.sstore.GetHypervisorType(),
1802       }
1803
1804     return result
1805
1806
1807 class LUDumpClusterConfig(NoHooksLU):
1808   """Return a text-representation of the cluster-config.
1809
1810   """
1811   _OP_REQP = []
1812   REQ_BGL = False
1813
1814   def ExpandNames(self):
1815     self.needed_locks = {}
1816
1817   def CheckPrereq(self):
1818     """No prerequisites.
1819
1820     """
1821     pass
1822
1823   def Exec(self, feedback_fn):
1824     """Dump a representation of the cluster config to the standard output.
1825
1826     """
1827     return self.cfg.DumpConfig()
1828
1829
1830 class LUActivateInstanceDisks(NoHooksLU):
1831   """Bring up an instance's disks.
1832
1833   """
1834   _OP_REQP = ["instance_name"]
1835   REQ_BGL = False
1836
1837   def ExpandNames(self):
1838     self._ExpandAndLockInstance()
1839     self.needed_locks[locking.LEVEL_NODE] = []
1840     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1841
1842   def DeclareLocks(self, level):
1843     if level == locking.LEVEL_NODE:
1844       self._LockInstancesNodes()
1845
1846   def CheckPrereq(self):
1847     """Check prerequisites.
1848
1849     This checks that the instance is in the cluster.
1850
1851     """
1852     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1853     assert self.instance is not None, \
1854       "Cannot retrieve locked instance %s" % self.op.instance_name
1855
1856   def Exec(self, feedback_fn):
1857     """Activate the disks.
1858
1859     """
1860     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1861     if not disks_ok:
1862       raise errors.OpExecError("Cannot activate block devices")
1863
1864     return disks_info
1865
1866
1867 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1868   """Prepare the block devices for an instance.
1869
1870   This sets up the block devices on all nodes.
1871
1872   Args:
1873     instance: a ganeti.objects.Instance object
1874     ignore_secondaries: if true, errors on secondary nodes won't result
1875                         in an error return from the function
1876
1877   Returns:
1878     false if the operation failed
1879     list of (host, instance_visible_name, node_visible_name) if the operation
1880          suceeded with the mapping from node devices to instance devices
1881   """
1882   device_info = []
1883   disks_ok = True
1884   iname = instance.name
1885   # With the two passes mechanism we try to reduce the window of
1886   # opportunity for the race condition of switching DRBD to primary
1887   # before handshaking occured, but we do not eliminate it
1888
1889   # The proper fix would be to wait (with some limits) until the
1890   # connection has been made and drbd transitions from WFConnection
1891   # into any other network-connected state (Connected, SyncTarget,
1892   # SyncSource, etc.)
1893
1894   # 1st pass, assemble on all nodes in secondary mode
1895   for inst_disk in instance.disks:
1896     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1897       cfg.SetDiskID(node_disk, node)
1898       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1899       if not result:
1900         logger.Error("could not prepare block device %s on node %s"
1901                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1902         if not ignore_secondaries:
1903           disks_ok = False
1904
1905   # FIXME: race condition on drbd migration to primary
1906
1907   # 2nd pass, do only the primary node
1908   for inst_disk in instance.disks:
1909     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1910       if node != instance.primary_node:
1911         continue
1912       cfg.SetDiskID(node_disk, node)
1913       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1914       if not result:
1915         logger.Error("could not prepare block device %s on node %s"
1916                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1917         disks_ok = False
1918     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1919
1920   # leave the disks configured for the primary node
1921   # this is a workaround that would be fixed better by
1922   # improving the logical/physical id handling
1923   for disk in instance.disks:
1924     cfg.SetDiskID(disk, instance.primary_node)
1925
1926   return disks_ok, device_info
1927
1928
1929 def _StartInstanceDisks(cfg, instance, force):
1930   """Start the disks of an instance.
1931
1932   """
1933   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1934                                            ignore_secondaries=force)
1935   if not disks_ok:
1936     _ShutdownInstanceDisks(instance, cfg)
1937     if force is not None and not force:
1938       logger.Error("If the message above refers to a secondary node,"
1939                    " you can retry the operation using '--force'.")
1940     raise errors.OpExecError("Disk consistency error")
1941
1942
1943 class LUDeactivateInstanceDisks(NoHooksLU):
1944   """Shutdown an instance's disks.
1945
1946   """
1947   _OP_REQP = ["instance_name"]
1948   REQ_BGL = False
1949
1950   def ExpandNames(self):
1951     self._ExpandAndLockInstance()
1952     self.needed_locks[locking.LEVEL_NODE] = []
1953     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1954
1955   def DeclareLocks(self, level):
1956     if level == locking.LEVEL_NODE:
1957       self._LockInstancesNodes()
1958
1959   def CheckPrereq(self):
1960     """Check prerequisites.
1961
1962     This checks that the instance is in the cluster.
1963
1964     """
1965     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1966     assert self.instance is not None, \
1967       "Cannot retrieve locked instance %s" % self.op.instance_name
1968
1969   def Exec(self, feedback_fn):
1970     """Deactivate the disks
1971
1972     """
1973     instance = self.instance
1974     _SafeShutdownInstanceDisks(instance, self.cfg)
1975
1976
1977 def _SafeShutdownInstanceDisks(instance, cfg):
1978   """Shutdown block devices of an instance.
1979
1980   This function checks if an instance is running, before calling
1981   _ShutdownInstanceDisks.
1982
1983   """
1984   ins_l = rpc.call_instance_list([instance.primary_node])
1985   ins_l = ins_l[instance.primary_node]
1986   if not type(ins_l) is list:
1987     raise errors.OpExecError("Can't contact node '%s'" %
1988                              instance.primary_node)
1989
1990   if instance.name in ins_l:
1991     raise errors.OpExecError("Instance is running, can't shutdown"
1992                              " block devices.")
1993
1994   _ShutdownInstanceDisks(instance, cfg)
1995
1996
1997 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1998   """Shutdown block devices of an instance.
1999
2000   This does the shutdown on all nodes of the instance.
2001
2002   If the ignore_primary is false, errors on the primary node are
2003   ignored.
2004
2005   """
2006   result = True
2007   for disk in instance.disks:
2008     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2009       cfg.SetDiskID(top_disk, node)
2010       if not rpc.call_blockdev_shutdown(node, top_disk):
2011         logger.Error("could not shutdown block device %s on node %s" %
2012                      (disk.iv_name, node))
2013         if not ignore_primary or node != instance.primary_node:
2014           result = False
2015   return result
2016
2017
2018 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2019   """Checks if a node has enough free memory.
2020
2021   This function check if a given node has the needed amount of free
2022   memory. In case the node has less memory or we cannot get the
2023   information from the node, this function raise an OpPrereqError
2024   exception.
2025
2026   Args:
2027     - cfg: a ConfigWriter instance
2028     - node: the node name
2029     - reason: string to use in the error message
2030     - requested: the amount of memory in MiB
2031
2032   """
2033   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2034   if not nodeinfo or not isinstance(nodeinfo, dict):
2035     raise errors.OpPrereqError("Could not contact node %s for resource"
2036                              " information" % (node,))
2037
2038   free_mem = nodeinfo[node].get('memory_free')
2039   if not isinstance(free_mem, int):
2040     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2041                              " was '%s'" % (node, free_mem))
2042   if requested > free_mem:
2043     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2044                              " needed %s MiB, available %s MiB" %
2045                              (node, reason, requested, free_mem))
2046
2047
2048 class LUStartupInstance(LogicalUnit):
2049   """Starts an instance.
2050
2051   """
2052   HPATH = "instance-start"
2053   HTYPE = constants.HTYPE_INSTANCE
2054   _OP_REQP = ["instance_name", "force"]
2055   REQ_BGL = False
2056
2057   def ExpandNames(self):
2058     self._ExpandAndLockInstance()
2059     self.needed_locks[locking.LEVEL_NODE] = []
2060     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2061
2062   def DeclareLocks(self, level):
2063     if level == locking.LEVEL_NODE:
2064       self._LockInstancesNodes()
2065
2066   def BuildHooksEnv(self):
2067     """Build hooks env.
2068
2069     This runs on master, primary and secondary nodes of the instance.
2070
2071     """
2072     env = {
2073       "FORCE": self.op.force,
2074       }
2075     env.update(_BuildInstanceHookEnvByObject(self.instance))
2076     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2077           list(self.instance.secondary_nodes))
2078     return env, nl, nl
2079
2080   def CheckPrereq(self):
2081     """Check prerequisites.
2082
2083     This checks that the instance is in the cluster.
2084
2085     """
2086     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2087     assert self.instance is not None, \
2088       "Cannot retrieve locked instance %s" % self.op.instance_name
2089
2090     # check bridges existance
2091     _CheckInstanceBridgesExist(instance)
2092
2093     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2094                          "starting instance %s" % instance.name,
2095                          instance.memory)
2096
2097   def Exec(self, feedback_fn):
2098     """Start the instance.
2099
2100     """
2101     instance = self.instance
2102     force = self.op.force
2103     extra_args = getattr(self.op, "extra_args", "")
2104
2105     self.cfg.MarkInstanceUp(instance.name)
2106
2107     node_current = instance.primary_node
2108
2109     _StartInstanceDisks(self.cfg, instance, force)
2110
2111     if not rpc.call_instance_start(node_current, instance, extra_args):
2112       _ShutdownInstanceDisks(instance, self.cfg)
2113       raise errors.OpExecError("Could not start instance")
2114
2115
2116 class LURebootInstance(LogicalUnit):
2117   """Reboot an instance.
2118
2119   """
2120   HPATH = "instance-reboot"
2121   HTYPE = constants.HTYPE_INSTANCE
2122   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2123   REQ_BGL = False
2124
2125   def ExpandNames(self):
2126     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127                                    constants.INSTANCE_REBOOT_HARD,
2128                                    constants.INSTANCE_REBOOT_FULL]:
2129       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130                                   (constants.INSTANCE_REBOOT_SOFT,
2131                                    constants.INSTANCE_REBOOT_HARD,
2132                                    constants.INSTANCE_REBOOT_FULL))
2133     self._ExpandAndLockInstance()
2134     self.needed_locks[locking.LEVEL_NODE] = []
2135     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2136
2137   def DeclareLocks(self, level):
2138     if level == locking.LEVEL_NODE:
2139       primary_only = not constants.INSTANCE_REBOOT_FULL
2140       self._LockInstancesNodes(primary_only=primary_only)
2141
2142   def BuildHooksEnv(self):
2143     """Build hooks env.
2144
2145     This runs on master, primary and secondary nodes of the instance.
2146
2147     """
2148     env = {
2149       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2150       }
2151     env.update(_BuildInstanceHookEnvByObject(self.instance))
2152     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2153           list(self.instance.secondary_nodes))
2154     return env, nl, nl
2155
2156   def CheckPrereq(self):
2157     """Check prerequisites.
2158
2159     This checks that the instance is in the cluster.
2160
2161     """
2162     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2163     assert self.instance is not None, \
2164       "Cannot retrieve locked instance %s" % self.op.instance_name
2165
2166     # check bridges existance
2167     _CheckInstanceBridgesExist(instance)
2168
2169   def Exec(self, feedback_fn):
2170     """Reboot the instance.
2171
2172     """
2173     instance = self.instance
2174     ignore_secondaries = self.op.ignore_secondaries
2175     reboot_type = self.op.reboot_type
2176     extra_args = getattr(self.op, "extra_args", "")
2177
2178     node_current = instance.primary_node
2179
2180     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2181                        constants.INSTANCE_REBOOT_HARD]:
2182       if not rpc.call_instance_reboot(node_current, instance,
2183                                       reboot_type, extra_args):
2184         raise errors.OpExecError("Could not reboot instance")
2185     else:
2186       if not rpc.call_instance_shutdown(node_current, instance):
2187         raise errors.OpExecError("could not shutdown instance for full reboot")
2188       _ShutdownInstanceDisks(instance, self.cfg)
2189       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2190       if not rpc.call_instance_start(node_current, instance, extra_args):
2191         _ShutdownInstanceDisks(instance, self.cfg)
2192         raise errors.OpExecError("Could not start instance for full reboot")
2193
2194     self.cfg.MarkInstanceUp(instance.name)
2195
2196
2197 class LUShutdownInstance(LogicalUnit):
2198   """Shutdown an instance.
2199
2200   """
2201   HPATH = "instance-stop"
2202   HTYPE = constants.HTYPE_INSTANCE
2203   _OP_REQP = ["instance_name"]
2204   REQ_BGL = False
2205
2206   def ExpandNames(self):
2207     self._ExpandAndLockInstance()
2208     self.needed_locks[locking.LEVEL_NODE] = []
2209     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2210
2211   def DeclareLocks(self, level):
2212     if level == locking.LEVEL_NODE:
2213       self._LockInstancesNodes()
2214
2215   def BuildHooksEnv(self):
2216     """Build hooks env.
2217
2218     This runs on master, primary and secondary nodes of the instance.
2219
2220     """
2221     env = _BuildInstanceHookEnvByObject(self.instance)
2222     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223           list(self.instance.secondary_nodes))
2224     return env, nl, nl
2225
2226   def CheckPrereq(self):
2227     """Check prerequisites.
2228
2229     This checks that the instance is in the cluster.
2230
2231     """
2232     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2233     assert self.instance is not None, \
2234       "Cannot retrieve locked instance %s" % self.op.instance_name
2235
2236   def Exec(self, feedback_fn):
2237     """Shutdown the instance.
2238
2239     """
2240     instance = self.instance
2241     node_current = instance.primary_node
2242     self.cfg.MarkInstanceDown(instance.name)
2243     if not rpc.call_instance_shutdown(node_current, instance):
2244       logger.Error("could not shutdown instance")
2245
2246     _ShutdownInstanceDisks(instance, self.cfg)
2247
2248
2249 class LUReinstallInstance(LogicalUnit):
2250   """Reinstall an instance.
2251
2252   """
2253   HPATH = "instance-reinstall"
2254   HTYPE = constants.HTYPE_INSTANCE
2255   _OP_REQP = ["instance_name"]
2256   REQ_BGL = False
2257
2258   def ExpandNames(self):
2259     self._ExpandAndLockInstance()
2260     self.needed_locks[locking.LEVEL_NODE] = []
2261     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2262
2263   def DeclareLocks(self, level):
2264     if level == locking.LEVEL_NODE:
2265       self._LockInstancesNodes()
2266
2267   def BuildHooksEnv(self):
2268     """Build hooks env.
2269
2270     This runs on master, primary and secondary nodes of the instance.
2271
2272     """
2273     env = _BuildInstanceHookEnvByObject(self.instance)
2274     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2275           list(self.instance.secondary_nodes))
2276     return env, nl, nl
2277
2278   def CheckPrereq(self):
2279     """Check prerequisites.
2280
2281     This checks that the instance is in the cluster and is not running.
2282
2283     """
2284     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2285     assert instance is not None, \
2286       "Cannot retrieve locked instance %s" % self.op.instance_name
2287
2288     if instance.disk_template == constants.DT_DISKLESS:
2289       raise errors.OpPrereqError("Instance '%s' has no disks" %
2290                                  self.op.instance_name)
2291     if instance.status != "down":
2292       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2293                                  self.op.instance_name)
2294     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2295     if remote_info:
2296       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2297                                  (self.op.instance_name,
2298                                   instance.primary_node))
2299
2300     self.op.os_type = getattr(self.op, "os_type", None)
2301     if self.op.os_type is not None:
2302       # OS verification
2303       pnode = self.cfg.GetNodeInfo(
2304         self.cfg.ExpandNodeName(instance.primary_node))
2305       if pnode is None:
2306         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2307                                    self.op.pnode)
2308       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2309       if not os_obj:
2310         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2311                                    " primary node"  % self.op.os_type)
2312
2313     self.instance = instance
2314
2315   def Exec(self, feedback_fn):
2316     """Reinstall the instance.
2317
2318     """
2319     inst = self.instance
2320
2321     if self.op.os_type is not None:
2322       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2323       inst.os = self.op.os_type
2324       self.cfg.AddInstance(inst)
2325
2326     _StartInstanceDisks(self.cfg, inst, None)
2327     try:
2328       feedback_fn("Running the instance OS create scripts...")
2329       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2330         raise errors.OpExecError("Could not install OS for instance %s"
2331                                  " on node %s" %
2332                                  (inst.name, inst.primary_node))
2333     finally:
2334       _ShutdownInstanceDisks(inst, self.cfg)
2335
2336
2337 class LURenameInstance(LogicalUnit):
2338   """Rename an instance.
2339
2340   """
2341   HPATH = "instance-rename"
2342   HTYPE = constants.HTYPE_INSTANCE
2343   _OP_REQP = ["instance_name", "new_name"]
2344
2345   def BuildHooksEnv(self):
2346     """Build hooks env.
2347
2348     This runs on master, primary and secondary nodes of the instance.
2349
2350     """
2351     env = _BuildInstanceHookEnvByObject(self.instance)
2352     env["INSTANCE_NEW_NAME"] = self.op.new_name
2353     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2354           list(self.instance.secondary_nodes))
2355     return env, nl, nl
2356
2357   def CheckPrereq(self):
2358     """Check prerequisites.
2359
2360     This checks that the instance is in the cluster and is not running.
2361
2362     """
2363     instance = self.cfg.GetInstanceInfo(
2364       self.cfg.ExpandInstanceName(self.op.instance_name))
2365     if instance is None:
2366       raise errors.OpPrereqError("Instance '%s' not known" %
2367                                  self.op.instance_name)
2368     if instance.status != "down":
2369       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2370                                  self.op.instance_name)
2371     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2372     if remote_info:
2373       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2374                                  (self.op.instance_name,
2375                                   instance.primary_node))
2376     self.instance = instance
2377
2378     # new name verification
2379     name_info = utils.HostInfo(self.op.new_name)
2380
2381     self.op.new_name = new_name = name_info.name
2382     instance_list = self.cfg.GetInstanceList()
2383     if new_name in instance_list:
2384       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2385                                  new_name)
2386
2387     if not getattr(self.op, "ignore_ip", False):
2388       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2389         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2390                                    (name_info.ip, new_name))
2391
2392
2393   def Exec(self, feedback_fn):
2394     """Reinstall the instance.
2395
2396     """
2397     inst = self.instance
2398     old_name = inst.name
2399
2400     if inst.disk_template == constants.DT_FILE:
2401       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2402
2403     self.cfg.RenameInstance(inst.name, self.op.new_name)
2404     # Change the instance lock. This is definitely safe while we hold the BGL
2405     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2406     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2407
2408     # re-read the instance from the configuration after rename
2409     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2410
2411     if inst.disk_template == constants.DT_FILE:
2412       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2413       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2414                                                 old_file_storage_dir,
2415                                                 new_file_storage_dir)
2416
2417       if not result:
2418         raise errors.OpExecError("Could not connect to node '%s' to rename"
2419                                  " directory '%s' to '%s' (but the instance"
2420                                  " has been renamed in Ganeti)" % (
2421                                  inst.primary_node, old_file_storage_dir,
2422                                  new_file_storage_dir))
2423
2424       if not result[0]:
2425         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2426                                  " (but the instance has been renamed in"
2427                                  " Ganeti)" % (old_file_storage_dir,
2428                                                new_file_storage_dir))
2429
2430     _StartInstanceDisks(self.cfg, inst, None)
2431     try:
2432       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2433                                           "sda", "sdb"):
2434         msg = ("Could not run OS rename script for instance %s on node %s"
2435                " (but the instance has been renamed in Ganeti)" %
2436                (inst.name, inst.primary_node))
2437         logger.Error(msg)
2438     finally:
2439       _ShutdownInstanceDisks(inst, self.cfg)
2440
2441
2442 class LURemoveInstance(LogicalUnit):
2443   """Remove an instance.
2444
2445   """
2446   HPATH = "instance-remove"
2447   HTYPE = constants.HTYPE_INSTANCE
2448   _OP_REQP = ["instance_name", "ignore_failures"]
2449
2450   def BuildHooksEnv(self):
2451     """Build hooks env.
2452
2453     This runs on master, primary and secondary nodes of the instance.
2454
2455     """
2456     env = _BuildInstanceHookEnvByObject(self.instance)
2457     nl = [self.sstore.GetMasterNode()]
2458     return env, nl, nl
2459
2460   def CheckPrereq(self):
2461     """Check prerequisites.
2462
2463     This checks that the instance is in the cluster.
2464
2465     """
2466     instance = self.cfg.GetInstanceInfo(
2467       self.cfg.ExpandInstanceName(self.op.instance_name))
2468     if instance is None:
2469       raise errors.OpPrereqError("Instance '%s' not known" %
2470                                  self.op.instance_name)
2471     self.instance = instance
2472
2473   def Exec(self, feedback_fn):
2474     """Remove the instance.
2475
2476     """
2477     instance = self.instance
2478     logger.Info("shutting down instance %s on node %s" %
2479                 (instance.name, instance.primary_node))
2480
2481     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2482       if self.op.ignore_failures:
2483         feedback_fn("Warning: can't shutdown instance")
2484       else:
2485         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2486                                  (instance.name, instance.primary_node))
2487
2488     logger.Info("removing block devices for instance %s" % instance.name)
2489
2490     if not _RemoveDisks(instance, self.cfg):
2491       if self.op.ignore_failures:
2492         feedback_fn("Warning: can't remove instance's disks")
2493       else:
2494         raise errors.OpExecError("Can't remove instance's disks")
2495
2496     logger.Info("removing instance %s out of cluster config" % instance.name)
2497
2498     self.cfg.RemoveInstance(instance.name)
2499     # Remove the new instance from the Ganeti Lock Manager
2500     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2501
2502
2503 class LUQueryInstances(NoHooksLU):
2504   """Logical unit for querying instances.
2505
2506   """
2507   _OP_REQP = ["output_fields", "names"]
2508   REQ_BGL = False
2509
2510   def ExpandNames(self):
2511     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2512     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2513                                "admin_state", "admin_ram",
2514                                "disk_template", "ip", "mac", "bridge",
2515                                "sda_size", "sdb_size", "vcpus", "tags",
2516                                "auto_balance",
2517                                "network_port", "kernel_path", "initrd_path",
2518                                "hvm_boot_order", "hvm_acpi", "hvm_pae",
2519                                "hvm_cdrom_image_path", "hvm_nic_type",
2520                                "hvm_disk_type", "vnc_bind_address"],
2521                        dynamic=self.dynamic_fields,
2522                        selected=self.op.output_fields)
2523
2524     self.needed_locks = {}
2525     self.share_locks[locking.LEVEL_INSTANCE] = 1
2526     self.share_locks[locking.LEVEL_NODE] = 1
2527
2528     # TODO: we could lock instances (and nodes) only if the user asked for
2529     # dynamic fields. For that we need atomic ways to get info for a group of
2530     # instances from the config, though.
2531     if not self.op.names:
2532       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2533     else:
2534       self.needed_locks[locking.LEVEL_INSTANCE] = \
2535         _GetWantedInstances(self, self.op.names)
2536
2537     self.needed_locks[locking.LEVEL_NODE] = []
2538     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2539
2540   def DeclareLocks(self, level):
2541     # TODO: locking of nodes could be avoided when not querying them
2542     if level == locking.LEVEL_NODE:
2543       self._LockInstancesNodes()
2544
2545   def CheckPrereq(self):
2546     """Check prerequisites.
2547
2548     """
2549     # This of course is valid only if we locked the instances
2550     self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2551
2552   def Exec(self, feedback_fn):
2553     """Computes the list of nodes and their attributes.
2554
2555     """
2556     instance_names = self.wanted
2557     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2558                      in instance_names]
2559
2560     # begin data gathering
2561
2562     nodes = frozenset([inst.primary_node for inst in instance_list])
2563
2564     bad_nodes = []
2565     if self.dynamic_fields.intersection(self.op.output_fields):
2566       live_data = {}
2567       node_data = rpc.call_all_instances_info(nodes)
2568       for name in nodes:
2569         result = node_data[name]
2570         if result:
2571           live_data.update(result)
2572         elif result == False:
2573           bad_nodes.append(name)
2574         # else no instance is alive
2575     else:
2576       live_data = dict([(name, {}) for name in instance_names])
2577
2578     # end data gathering
2579
2580     output = []
2581     for instance in instance_list:
2582       iout = []
2583       for field in self.op.output_fields:
2584         if field == "name":
2585           val = instance.name
2586         elif field == "os":
2587           val = instance.os
2588         elif field == "pnode":
2589           val = instance.primary_node
2590         elif field == "snodes":
2591           val = list(instance.secondary_nodes)
2592         elif field == "admin_state":
2593           val = (instance.status != "down")
2594         elif field == "oper_state":
2595           if instance.primary_node in bad_nodes:
2596             val = None
2597           else:
2598             val = bool(live_data.get(instance.name))
2599         elif field == "status":
2600           if instance.primary_node in bad_nodes:
2601             val = "ERROR_nodedown"
2602           else:
2603             running = bool(live_data.get(instance.name))
2604             if running:
2605               if instance.status != "down":
2606                 val = "running"
2607               else:
2608                 val = "ERROR_up"
2609             else:
2610               if instance.status != "down":
2611                 val = "ERROR_down"
2612               else:
2613                 val = "ADMIN_down"
2614         elif field == "admin_ram":
2615           val = instance.memory
2616         elif field == "oper_ram":
2617           if instance.primary_node in bad_nodes:
2618             val = None
2619           elif instance.name in live_data:
2620             val = live_data[instance.name].get("memory", "?")
2621           else:
2622             val = "-"
2623         elif field == "disk_template":
2624           val = instance.disk_template
2625         elif field == "ip":
2626           val = instance.nics[0].ip
2627         elif field == "bridge":
2628           val = instance.nics[0].bridge
2629         elif field == "mac":
2630           val = instance.nics[0].mac
2631         elif field == "sda_size" or field == "sdb_size":
2632           disk = instance.FindDisk(field[:3])
2633           if disk is None:
2634             val = None
2635           else:
2636             val = disk.size
2637         elif field == "vcpus":
2638           val = instance.vcpus
2639         elif field == "tags":
2640           val = list(instance.GetTags())
2641         elif field in ("network_port", "kernel_path", "initrd_path",
2642                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2643                        "hvm_cdrom_image_path", "hvm_nic_type",
2644                        "hvm_disk_type", "vnc_bind_address"):
2645           val = getattr(instance, field, None)
2646           if val is not None:
2647             pass
2648           elif field in ("hvm_nic_type", "hvm_disk_type",
2649                          "kernel_path", "initrd_path"):
2650             val = "default"
2651           else:
2652             val = "-"
2653         else:
2654           raise errors.ParameterError(field)
2655         iout.append(val)
2656       output.append(iout)
2657
2658     return output
2659
2660
2661 class LUFailoverInstance(LogicalUnit):
2662   """Failover an instance.
2663
2664   """
2665   HPATH = "instance-failover"
2666   HTYPE = constants.HTYPE_INSTANCE
2667   _OP_REQP = ["instance_name", "ignore_consistency"]
2668   REQ_BGL = False
2669
2670   def ExpandNames(self):
2671     self._ExpandAndLockInstance()
2672     self.needed_locks[locking.LEVEL_NODE] = []
2673     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2674
2675   def DeclareLocks(self, level):
2676     if level == locking.LEVEL_NODE:
2677       self._LockInstancesNodes()
2678
2679   def BuildHooksEnv(self):
2680     """Build hooks env.
2681
2682     This runs on master, primary and secondary nodes of the instance.
2683
2684     """
2685     env = {
2686       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2687       }
2688     env.update(_BuildInstanceHookEnvByObject(self.instance))
2689     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2690     return env, nl, nl
2691
2692   def CheckPrereq(self):
2693     """Check prerequisites.
2694
2695     This checks that the instance is in the cluster.
2696
2697     """
2698     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2699     assert self.instance is not None, \
2700       "Cannot retrieve locked instance %s" % self.op.instance_name
2701
2702     if instance.disk_template not in constants.DTS_NET_MIRROR:
2703       raise errors.OpPrereqError("Instance's disk layout is not"
2704                                  " network mirrored, cannot failover.")
2705
2706     secondary_nodes = instance.secondary_nodes
2707     if not secondary_nodes:
2708       raise errors.ProgrammerError("no secondary node but using "
2709                                    "a mirrored disk template")
2710
2711     target_node = secondary_nodes[0]
2712     # check memory requirements on the secondary node
2713     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2714                          instance.name, instance.memory)
2715
2716     # check bridge existance
2717     brlist = [nic.bridge for nic in instance.nics]
2718     if not rpc.call_bridges_exist(target_node, brlist):
2719       raise errors.OpPrereqError("One or more target bridges %s does not"
2720                                  " exist on destination node '%s'" %
2721                                  (brlist, target_node))
2722
2723   def Exec(self, feedback_fn):
2724     """Failover an instance.
2725
2726     The failover is done by shutting it down on its present node and
2727     starting it on the secondary.
2728
2729     """
2730     instance = self.instance
2731
2732     source_node = instance.primary_node
2733     target_node = instance.secondary_nodes[0]
2734
2735     feedback_fn("* checking disk consistency between source and target")
2736     for dev in instance.disks:
2737       # for drbd, these are drbd over lvm
2738       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2739         if instance.status == "up" and not self.op.ignore_consistency:
2740           raise errors.OpExecError("Disk %s is degraded on target node,"
2741                                    " aborting failover." % dev.iv_name)
2742
2743     feedback_fn("* shutting down instance on source node")
2744     logger.Info("Shutting down instance %s on node %s" %
2745                 (instance.name, source_node))
2746
2747     if not rpc.call_instance_shutdown(source_node, instance):
2748       if self.op.ignore_consistency:
2749         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2750                      " anyway. Please make sure node %s is down"  %
2751                      (instance.name, source_node, source_node))
2752       else:
2753         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2754                                  (instance.name, source_node))
2755
2756     feedback_fn("* deactivating the instance's disks on source node")
2757     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2758       raise errors.OpExecError("Can't shut down the instance's disks.")
2759
2760     instance.primary_node = target_node
2761     # distribute new instance config to the other nodes
2762     self.cfg.Update(instance)
2763
2764     # Only start the instance if it's marked as up
2765     if instance.status == "up":
2766       feedback_fn("* activating the instance's disks on target node")
2767       logger.Info("Starting instance %s on node %s" %
2768                   (instance.name, target_node))
2769
2770       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2771                                                ignore_secondaries=True)
2772       if not disks_ok:
2773         _ShutdownInstanceDisks(instance, self.cfg)
2774         raise errors.OpExecError("Can't activate the instance's disks")
2775
2776       feedback_fn("* starting the instance on the target node")
2777       if not rpc.call_instance_start(target_node, instance, None):
2778         _ShutdownInstanceDisks(instance, self.cfg)
2779         raise errors.OpExecError("Could not start instance %s on node %s." %
2780                                  (instance.name, target_node))
2781
2782
2783 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2784   """Create a tree of block devices on the primary node.
2785
2786   This always creates all devices.
2787
2788   """
2789   if device.children:
2790     for child in device.children:
2791       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2792         return False
2793
2794   cfg.SetDiskID(device, node)
2795   new_id = rpc.call_blockdev_create(node, device, device.size,
2796                                     instance.name, True, info)
2797   if not new_id:
2798     return False
2799   if device.physical_id is None:
2800     device.physical_id = new_id
2801   return True
2802
2803
2804 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2805   """Create a tree of block devices on a secondary node.
2806
2807   If this device type has to be created on secondaries, create it and
2808   all its children.
2809
2810   If not, just recurse to children keeping the same 'force' value.
2811
2812   """
2813   if device.CreateOnSecondary():
2814     force = True
2815   if device.children:
2816     for child in device.children:
2817       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2818                                         child, force, info):
2819         return False
2820
2821   if not force:
2822     return True
2823   cfg.SetDiskID(device, node)
2824   new_id = rpc.call_blockdev_create(node, device, device.size,
2825                                     instance.name, False, info)
2826   if not new_id:
2827     return False
2828   if device.physical_id is None:
2829     device.physical_id = new_id
2830   return True
2831
2832
2833 def _GenerateUniqueNames(cfg, exts):
2834   """Generate a suitable LV name.
2835
2836   This will generate a logical volume name for the given instance.
2837
2838   """
2839   results = []
2840   for val in exts:
2841     new_id = cfg.GenerateUniqueID()
2842     results.append("%s%s" % (new_id, val))
2843   return results
2844
2845
2846 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2847   """Generate a drbd8 device complete with its children.
2848
2849   """
2850   port = cfg.AllocatePort()
2851   vgname = cfg.GetVGName()
2852   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2853                           logical_id=(vgname, names[0]))
2854   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2855                           logical_id=(vgname, names[1]))
2856   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2857                           logical_id = (primary, secondary, port),
2858                           children = [dev_data, dev_meta],
2859                           iv_name=iv_name)
2860   return drbd_dev
2861
2862
2863 def _GenerateDiskTemplate(cfg, template_name,
2864                           instance_name, primary_node,
2865                           secondary_nodes, disk_sz, swap_sz,
2866                           file_storage_dir, file_driver):
2867   """Generate the entire disk layout for a given template type.
2868
2869   """
2870   #TODO: compute space requirements
2871
2872   vgname = cfg.GetVGName()
2873   if template_name == constants.DT_DISKLESS:
2874     disks = []
2875   elif template_name == constants.DT_PLAIN:
2876     if len(secondary_nodes) != 0:
2877       raise errors.ProgrammerError("Wrong template configuration")
2878
2879     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2880     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2881                            logical_id=(vgname, names[0]),
2882                            iv_name = "sda")
2883     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2884                            logical_id=(vgname, names[1]),
2885                            iv_name = "sdb")
2886     disks = [sda_dev, sdb_dev]
2887   elif template_name == constants.DT_DRBD8:
2888     if len(secondary_nodes) != 1:
2889       raise errors.ProgrammerError("Wrong template configuration")
2890     remote_node = secondary_nodes[0]
2891     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2892                                        ".sdb_data", ".sdb_meta"])
2893     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2894                                          disk_sz, names[0:2], "sda")
2895     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2896                                          swap_sz, names[2:4], "sdb")
2897     disks = [drbd_sda_dev, drbd_sdb_dev]
2898   elif template_name == constants.DT_FILE:
2899     if len(secondary_nodes) != 0:
2900       raise errors.ProgrammerError("Wrong template configuration")
2901
2902     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2903                                 iv_name="sda", logical_id=(file_driver,
2904                                 "%s/sda" % file_storage_dir))
2905     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2906                                 iv_name="sdb", logical_id=(file_driver,
2907                                 "%s/sdb" % file_storage_dir))
2908     disks = [file_sda_dev, file_sdb_dev]
2909   else:
2910     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2911   return disks
2912
2913
2914 def _GetInstanceInfoText(instance):
2915   """Compute that text that should be added to the disk's metadata.
2916
2917   """
2918   return "originstname+%s" % instance.name
2919
2920
2921 def _CreateDisks(cfg, instance):
2922   """Create all disks for an instance.
2923
2924   This abstracts away some work from AddInstance.
2925
2926   Args:
2927     instance: the instance object
2928
2929   Returns:
2930     True or False showing the success of the creation process
2931
2932   """
2933   info = _GetInstanceInfoText(instance)
2934
2935   if instance.disk_template == constants.DT_FILE:
2936     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2937     result = rpc.call_file_storage_dir_create(instance.primary_node,
2938                                               file_storage_dir)
2939
2940     if not result:
2941       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2942       return False
2943
2944     if not result[0]:
2945       logger.Error("failed to create directory '%s'" % file_storage_dir)
2946       return False
2947
2948   for device in instance.disks:
2949     logger.Info("creating volume %s for instance %s" %
2950                 (device.iv_name, instance.name))
2951     #HARDCODE
2952     for secondary_node in instance.secondary_nodes:
2953       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2954                                         device, False, info):
2955         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2956                      (device.iv_name, device, secondary_node))
2957         return False
2958     #HARDCODE
2959     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2960                                     instance, device, info):
2961       logger.Error("failed to create volume %s on primary!" %
2962                    device.iv_name)
2963       return False
2964
2965   return True
2966
2967
2968 def _RemoveDisks(instance, cfg):
2969   """Remove all disks for an instance.
2970
2971   This abstracts away some work from `AddInstance()` and
2972   `RemoveInstance()`. Note that in case some of the devices couldn't
2973   be removed, the removal will continue with the other ones (compare
2974   with `_CreateDisks()`).
2975
2976   Args:
2977     instance: the instance object
2978
2979   Returns:
2980     True or False showing the success of the removal proces
2981
2982   """
2983   logger.Info("removing block devices for instance %s" % instance.name)
2984
2985   result = True
2986   for device in instance.disks:
2987     for node, disk in device.ComputeNodeTree(instance.primary_node):
2988       cfg.SetDiskID(disk, node)
2989       if not rpc.call_blockdev_remove(node, disk):
2990         logger.Error("could not remove block device %s on node %s,"
2991                      " continuing anyway" %
2992                      (device.iv_name, node))
2993         result = False
2994
2995   if instance.disk_template == constants.DT_FILE:
2996     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2997     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2998                                             file_storage_dir):
2999       logger.Error("could not remove directory '%s'" % file_storage_dir)
3000       result = False
3001
3002   return result
3003
3004
3005 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3006   """Compute disk size requirements in the volume group
3007
3008   This is currently hard-coded for the two-drive layout.
3009
3010   """
3011   # Required free disk space as a function of disk and swap space
3012   req_size_dict = {
3013     constants.DT_DISKLESS: None,
3014     constants.DT_PLAIN: disk_size + swap_size,
3015     # 256 MB are added for drbd metadata, 128MB for each drbd device
3016     constants.DT_DRBD8: disk_size + swap_size + 256,
3017     constants.DT_FILE: None,
3018   }
3019
3020   if disk_template not in req_size_dict:
3021     raise errors.ProgrammerError("Disk template '%s' size requirement"
3022                                  " is unknown" %  disk_template)
3023
3024   return req_size_dict[disk_template]
3025
3026
3027 class LUCreateInstance(LogicalUnit):
3028   """Create an instance.
3029
3030   """
3031   HPATH = "instance-add"
3032   HTYPE = constants.HTYPE_INSTANCE
3033   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3034               "disk_template", "swap_size", "mode", "start", "vcpus",
3035               "wait_for_sync", "ip_check", "mac"]
3036
3037   def _RunAllocator(self):
3038     """Run the allocator based on input opcode.
3039
3040     """
3041     disks = [{"size": self.op.disk_size, "mode": "w"},
3042              {"size": self.op.swap_size, "mode": "w"}]
3043     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3044              "bridge": self.op.bridge}]
3045     ial = IAllocator(self.cfg, self.sstore,
3046                      mode=constants.IALLOCATOR_MODE_ALLOC,
3047                      name=self.op.instance_name,
3048                      disk_template=self.op.disk_template,
3049                      tags=[],
3050                      os=self.op.os_type,
3051                      vcpus=self.op.vcpus,
3052                      mem_size=self.op.mem_size,
3053                      disks=disks,
3054                      nics=nics,
3055                      )
3056
3057     ial.Run(self.op.iallocator)
3058
3059     if not ial.success:
3060       raise errors.OpPrereqError("Can't compute nodes using"
3061                                  " iallocator '%s': %s" % (self.op.iallocator,
3062                                                            ial.info))
3063     if len(ial.nodes) != ial.required_nodes:
3064       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3065                                  " of nodes (%s), required %s" %
3066                                  (len(ial.nodes), ial.required_nodes))
3067     self.op.pnode = ial.nodes[0]
3068     logger.ToStdout("Selected nodes for the instance: %s" %
3069                     (", ".join(ial.nodes),))
3070     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3071                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3072     if ial.required_nodes == 2:
3073       self.op.snode = ial.nodes[1]
3074
3075   def BuildHooksEnv(self):
3076     """Build hooks env.
3077
3078     This runs on master, primary and secondary nodes of the instance.
3079
3080     """
3081     env = {
3082       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3083       "INSTANCE_DISK_SIZE": self.op.disk_size,
3084       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3085       "INSTANCE_ADD_MODE": self.op.mode,
3086       }
3087     if self.op.mode == constants.INSTANCE_IMPORT:
3088       env["INSTANCE_SRC_NODE"] = self.op.src_node
3089       env["INSTANCE_SRC_PATH"] = self.op.src_path
3090       env["INSTANCE_SRC_IMAGE"] = self.src_image
3091
3092     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3093       primary_node=self.op.pnode,
3094       secondary_nodes=self.secondaries,
3095       status=self.instance_status,
3096       os_type=self.op.os_type,
3097       memory=self.op.mem_size,
3098       vcpus=self.op.vcpus,
3099       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3100     ))
3101
3102     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3103           self.secondaries)
3104     return env, nl, nl
3105
3106
3107   def CheckPrereq(self):
3108     """Check prerequisites.
3109
3110     """
3111     # set optional parameters to none if they don't exist
3112     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3113                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3114                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3115       if not hasattr(self.op, attr):
3116         setattr(self.op, attr, None)
3117
3118     if self.op.mode not in (constants.INSTANCE_CREATE,
3119                             constants.INSTANCE_IMPORT):
3120       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3121                                  self.op.mode)
3122
3123     if (not self.cfg.GetVGName() and
3124         self.op.disk_template not in constants.DTS_NOT_LVM):
3125       raise errors.OpPrereqError("Cluster does not support lvm-based"
3126                                  " instances")
3127
3128     if self.op.mode == constants.INSTANCE_IMPORT:
3129       src_node = getattr(self.op, "src_node", None)
3130       src_path = getattr(self.op, "src_path", None)
3131       if src_node is None or src_path is None:
3132         raise errors.OpPrereqError("Importing an instance requires source"
3133                                    " node and path options")
3134       src_node_full = self.cfg.ExpandNodeName(src_node)
3135       if src_node_full is None:
3136         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3137       self.op.src_node = src_node = src_node_full
3138
3139       if not os.path.isabs(src_path):
3140         raise errors.OpPrereqError("The source path must be absolute")
3141
3142       export_info = rpc.call_export_info(src_node, src_path)
3143
3144       if not export_info:
3145         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3146
3147       if not export_info.has_section(constants.INISECT_EXP):
3148         raise errors.ProgrammerError("Corrupted export config")
3149
3150       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3151       if (int(ei_version) != constants.EXPORT_VERSION):
3152         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3153                                    (ei_version, constants.EXPORT_VERSION))
3154
3155       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3156         raise errors.OpPrereqError("Can't import instance with more than"
3157                                    " one data disk")
3158
3159       # FIXME: are the old os-es, disk sizes, etc. useful?
3160       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3161       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3162                                                          'disk0_dump'))
3163       self.src_image = diskimage
3164     else: # INSTANCE_CREATE
3165       if getattr(self.op, "os_type", None) is None:
3166         raise errors.OpPrereqError("No guest OS specified")
3167
3168     #### instance parameters check
3169
3170     # disk template and mirror node verification
3171     if self.op.disk_template not in constants.DISK_TEMPLATES:
3172       raise errors.OpPrereqError("Invalid disk template name")
3173
3174     # instance name verification
3175     hostname1 = utils.HostInfo(self.op.instance_name)
3176
3177     self.op.instance_name = instance_name = hostname1.name
3178     instance_list = self.cfg.GetInstanceList()
3179     if instance_name in instance_list:
3180       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3181                                  instance_name)
3182
3183     # ip validity checks
3184     ip = getattr(self.op, "ip", None)
3185     if ip is None or ip.lower() == "none":
3186       inst_ip = None
3187     elif ip.lower() == "auto":
3188       inst_ip = hostname1.ip
3189     else:
3190       if not utils.IsValidIP(ip):
3191         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3192                                    " like a valid IP" % ip)
3193       inst_ip = ip
3194     self.inst_ip = self.op.ip = inst_ip
3195
3196     if self.op.start and not self.op.ip_check:
3197       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3198                                  " adding an instance in start mode")
3199
3200     if self.op.ip_check:
3201       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3202         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3203                                    (hostname1.ip, instance_name))
3204
3205     # MAC address verification
3206     if self.op.mac != "auto":
3207       if not utils.IsValidMac(self.op.mac.lower()):
3208         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3209                                    self.op.mac)
3210
3211     # bridge verification
3212     bridge = getattr(self.op, "bridge", None)
3213     if bridge is None:
3214       self.op.bridge = self.cfg.GetDefBridge()
3215     else:
3216       self.op.bridge = bridge
3217
3218     # boot order verification
3219     if self.op.hvm_boot_order is not None:
3220       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3221         raise errors.OpPrereqError("invalid boot order specified,"
3222                                    " must be one or more of [acdn]")
3223     # file storage checks
3224     if (self.op.file_driver and
3225         not self.op.file_driver in constants.FILE_DRIVER):
3226       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3227                                  self.op.file_driver)
3228
3229     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3230       raise errors.OpPrereqError("File storage directory not a relative"
3231                                  " path")
3232     #### allocator run
3233
3234     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3235       raise errors.OpPrereqError("One and only one of iallocator and primary"
3236                                  " node must be given")
3237
3238     if self.op.iallocator is not None:
3239       self._RunAllocator()
3240
3241     #### node related checks
3242
3243     # check primary node
3244     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3245     if pnode is None:
3246       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3247                                  self.op.pnode)
3248     self.op.pnode = pnode.name
3249     self.pnode = pnode
3250     self.secondaries = []
3251
3252     # mirror node verification
3253     if self.op.disk_template in constants.DTS_NET_MIRROR:
3254       if getattr(self.op, "snode", None) is None:
3255         raise errors.OpPrereqError("The networked disk templates need"
3256                                    " a mirror node")
3257
3258       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3259       if snode_name is None:
3260         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3261                                    self.op.snode)
3262       elif snode_name == pnode.name:
3263         raise errors.OpPrereqError("The secondary node cannot be"
3264                                    " the primary node.")
3265       self.secondaries.append(snode_name)
3266
3267     req_size = _ComputeDiskSize(self.op.disk_template,
3268                                 self.op.disk_size, self.op.swap_size)
3269
3270     # Check lv size requirements
3271     if req_size is not None:
3272       nodenames = [pnode.name] + self.secondaries
3273       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3274       for node in nodenames:
3275         info = nodeinfo.get(node, None)
3276         if not info:
3277           raise errors.OpPrereqError("Cannot get current information"
3278                                      " from node '%s'" % node)
3279         vg_free = info.get('vg_free', None)
3280         if not isinstance(vg_free, int):
3281           raise errors.OpPrereqError("Can't compute free disk space on"
3282                                      " node %s" % node)
3283         if req_size > info['vg_free']:
3284           raise errors.OpPrereqError("Not enough disk space on target node %s."
3285                                      " %d MB available, %d MB required" %
3286                                      (node, info['vg_free'], req_size))
3287
3288     # os verification
3289     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3290     if not os_obj:
3291       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3292                                  " primary node"  % self.op.os_type)
3293
3294     if self.op.kernel_path == constants.VALUE_NONE:
3295       raise errors.OpPrereqError("Can't set instance kernel to none")
3296
3297
3298     # bridge check on primary node
3299     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3300       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3301                                  " destination node '%s'" %
3302                                  (self.op.bridge, pnode.name))
3303
3304     # memory check on primary node
3305     if self.op.start:
3306       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3307                            "creating instance %s" % self.op.instance_name,
3308                            self.op.mem_size)
3309
3310     # hvm_cdrom_image_path verification
3311     if self.op.hvm_cdrom_image_path is not None:
3312       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3313         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3314                                    " be an absolute path or None, not %s" %
3315                                    self.op.hvm_cdrom_image_path)
3316       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3317         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3318                                    " regular file or a symlink pointing to"
3319                                    " an existing regular file, not %s" %
3320                                    self.op.hvm_cdrom_image_path)
3321
3322     # vnc_bind_address verification
3323     if self.op.vnc_bind_address is not None:
3324       if not utils.IsValidIP(self.op.vnc_bind_address):
3325         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3326                                    " like a valid IP address" %
3327                                    self.op.vnc_bind_address)
3328
3329     # Xen HVM device type checks
3330     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3331       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3332         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3333                                    " hypervisor" % self.op.hvm_nic_type)
3334       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3335         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3336                                    " hypervisor" % self.op.hvm_disk_type)
3337
3338     if self.op.start:
3339       self.instance_status = 'up'
3340     else:
3341       self.instance_status = 'down'
3342
3343   def Exec(self, feedback_fn):
3344     """Create and add the instance to the cluster.
3345
3346     """
3347     instance = self.op.instance_name
3348     pnode_name = self.pnode.name
3349
3350     if self.op.mac == "auto":
3351       mac_address = self.cfg.GenerateMAC()
3352     else:
3353       mac_address = self.op.mac
3354
3355     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3356     if self.inst_ip is not None:
3357       nic.ip = self.inst_ip
3358
3359     ht_kind = self.sstore.GetHypervisorType()
3360     if ht_kind in constants.HTS_REQ_PORT:
3361       network_port = self.cfg.AllocatePort()
3362     else:
3363       network_port = None
3364
3365     if self.op.vnc_bind_address is None:
3366       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3367
3368     # this is needed because os.path.join does not accept None arguments
3369     if self.op.file_storage_dir is None:
3370       string_file_storage_dir = ""
3371     else:
3372       string_file_storage_dir = self.op.file_storage_dir
3373
3374     # build the full file storage dir path
3375     file_storage_dir = os.path.normpath(os.path.join(
3376                                         self.sstore.GetFileStorageDir(),
3377                                         string_file_storage_dir, instance))
3378
3379
3380     disks = _GenerateDiskTemplate(self.cfg,
3381                                   self.op.disk_template,
3382                                   instance, pnode_name,
3383                                   self.secondaries, self.op.disk_size,
3384                                   self.op.swap_size,
3385                                   file_storage_dir,
3386                                   self.op.file_driver)
3387
3388     iobj = objects.Instance(name=instance, os=self.op.os_type,
3389                             primary_node=pnode_name,
3390                             memory=self.op.mem_size,
3391                             vcpus=self.op.vcpus,
3392                             nics=[nic], disks=disks,
3393                             disk_template=self.op.disk_template,
3394                             status=self.instance_status,
3395                             network_port=network_port,
3396                             kernel_path=self.op.kernel_path,
3397                             initrd_path=self.op.initrd_path,
3398                             hvm_boot_order=self.op.hvm_boot_order,
3399                             hvm_acpi=self.op.hvm_acpi,
3400                             hvm_pae=self.op.hvm_pae,
3401                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3402                             vnc_bind_address=self.op.vnc_bind_address,
3403                             hvm_nic_type=self.op.hvm_nic_type,
3404                             hvm_disk_type=self.op.hvm_disk_type,
3405                             )
3406
3407     feedback_fn("* creating instance disks...")
3408     if not _CreateDisks(self.cfg, iobj):
3409       _RemoveDisks(iobj, self.cfg)
3410       raise errors.OpExecError("Device creation failed, reverting...")
3411
3412     feedback_fn("adding instance %s to cluster config" % instance)
3413
3414     self.cfg.AddInstance(iobj)
3415     # Add the new instance to the Ganeti Lock Manager
3416     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3417
3418     if self.op.wait_for_sync:
3419       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3420     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3421       # make sure the disks are not degraded (still sync-ing is ok)
3422       time.sleep(15)
3423       feedback_fn("* checking mirrors status")
3424       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3425     else:
3426       disk_abort = False
3427
3428     if disk_abort:
3429       _RemoveDisks(iobj, self.cfg)
3430       self.cfg.RemoveInstance(iobj.name)
3431       # Remove the new instance from the Ganeti Lock Manager
3432       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3433       raise errors.OpExecError("There are some degraded disks for"
3434                                " this instance")
3435
3436     feedback_fn("creating os for instance %s on node %s" %
3437                 (instance, pnode_name))
3438
3439     if iobj.disk_template != constants.DT_DISKLESS:
3440       if self.op.mode == constants.INSTANCE_CREATE:
3441         feedback_fn("* running the instance OS create scripts...")
3442         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3443           raise errors.OpExecError("could not add os for instance %s"
3444                                    " on node %s" %
3445                                    (instance, pnode_name))
3446
3447       elif self.op.mode == constants.INSTANCE_IMPORT:
3448         feedback_fn("* running the instance OS import scripts...")
3449         src_node = self.op.src_node
3450         src_image = self.src_image
3451         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3452                                                 src_node, src_image):
3453           raise errors.OpExecError("Could not import os for instance"
3454                                    " %s on node %s" %
3455                                    (instance, pnode_name))
3456       else:
3457         # also checked in the prereq part
3458         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3459                                      % self.op.mode)
3460
3461     if self.op.start:
3462       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3463       feedback_fn("* starting instance...")
3464       if not rpc.call_instance_start(pnode_name, iobj, None):
3465         raise errors.OpExecError("Could not start instance")
3466
3467
3468 class LUConnectConsole(NoHooksLU):
3469   """Connect to an instance's console.
3470
3471   This is somewhat special in that it returns the command line that
3472   you need to run on the master node in order to connect to the
3473   console.
3474
3475   """
3476   _OP_REQP = ["instance_name"]
3477   REQ_BGL = False
3478
3479   def ExpandNames(self):
3480     self._ExpandAndLockInstance()
3481
3482   def CheckPrereq(self):
3483     """Check prerequisites.
3484
3485     This checks that the instance is in the cluster.
3486
3487     """
3488     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3489     assert self.instance is not None, \
3490       "Cannot retrieve locked instance %s" % self.op.instance_name
3491
3492   def Exec(self, feedback_fn):
3493     """Connect to the console of an instance
3494
3495     """
3496     instance = self.instance
3497     node = instance.primary_node
3498
3499     node_insts = rpc.call_instance_list([node])[node]
3500     if node_insts is False:
3501       raise errors.OpExecError("Can't connect to node %s." % node)
3502
3503     if instance.name not in node_insts:
3504       raise errors.OpExecError("Instance %s is not running." % instance.name)
3505
3506     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3507
3508     hyper = hypervisor.GetHypervisor()
3509     console_cmd = hyper.GetShellCommandForConsole(instance)
3510
3511     # build ssh cmdline
3512     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3513
3514
3515 class LUReplaceDisks(LogicalUnit):
3516   """Replace the disks of an instance.
3517
3518   """
3519   HPATH = "mirrors-replace"
3520   HTYPE = constants.HTYPE_INSTANCE
3521   _OP_REQP = ["instance_name", "mode", "disks"]
3522
3523   def _RunAllocator(self):
3524     """Compute a new secondary node using an IAllocator.
3525
3526     """
3527     ial = IAllocator(self.cfg, self.sstore,
3528                      mode=constants.IALLOCATOR_MODE_RELOC,
3529                      name=self.op.instance_name,
3530                      relocate_from=[self.sec_node])
3531
3532     ial.Run(self.op.iallocator)
3533
3534     if not ial.success:
3535       raise errors.OpPrereqError("Can't compute nodes using"
3536                                  " iallocator '%s': %s" % (self.op.iallocator,
3537                                                            ial.info))
3538     if len(ial.nodes) != ial.required_nodes:
3539       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3540                                  " of nodes (%s), required %s" %
3541                                  (len(ial.nodes), ial.required_nodes))
3542     self.op.remote_node = ial.nodes[0]
3543     logger.ToStdout("Selected new secondary for the instance: %s" %
3544                     self.op.remote_node)
3545
3546   def BuildHooksEnv(self):
3547     """Build hooks env.
3548
3549     This runs on the master, the primary and all the secondaries.
3550
3551     """
3552     env = {
3553       "MODE": self.op.mode,
3554       "NEW_SECONDARY": self.op.remote_node,
3555       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3556       }
3557     env.update(_BuildInstanceHookEnvByObject(self.instance))
3558     nl = [
3559       self.sstore.GetMasterNode(),
3560       self.instance.primary_node,
3561       ]
3562     if self.op.remote_node is not None:
3563       nl.append(self.op.remote_node)
3564     return env, nl, nl
3565
3566   def CheckPrereq(self):
3567     """Check prerequisites.
3568
3569     This checks that the instance is in the cluster.
3570
3571     """
3572     if not hasattr(self.op, "remote_node"):
3573       self.op.remote_node = None
3574
3575     instance = self.cfg.GetInstanceInfo(
3576       self.cfg.ExpandInstanceName(self.op.instance_name))
3577     if instance is None:
3578       raise errors.OpPrereqError("Instance '%s' not known" %
3579                                  self.op.instance_name)
3580     self.instance = instance
3581     self.op.instance_name = instance.name
3582
3583     if instance.disk_template not in constants.DTS_NET_MIRROR:
3584       raise errors.OpPrereqError("Instance's disk layout is not"
3585                                  " network mirrored.")
3586
3587     if len(instance.secondary_nodes) != 1:
3588       raise errors.OpPrereqError("The instance has a strange layout,"
3589                                  " expected one secondary but found %d" %
3590                                  len(instance.secondary_nodes))
3591
3592     self.sec_node = instance.secondary_nodes[0]
3593
3594     ia_name = getattr(self.op, "iallocator", None)
3595     if ia_name is not None:
3596       if self.op.remote_node is not None:
3597         raise errors.OpPrereqError("Give either the iallocator or the new"
3598                                    " secondary, not both")
3599       self._RunAllocator()
3600
3601     remote_node = self.op.remote_node
3602     if remote_node is not None:
3603       remote_node = self.cfg.ExpandNodeName(remote_node)
3604       if remote_node is None:
3605         raise errors.OpPrereqError("Node '%s' not known" %
3606                                    self.op.remote_node)
3607       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3608     else:
3609       self.remote_node_info = None
3610     if remote_node == instance.primary_node:
3611       raise errors.OpPrereqError("The specified node is the primary node of"
3612                                  " the instance.")
3613     elif remote_node == self.sec_node:
3614       if self.op.mode == constants.REPLACE_DISK_SEC:
3615         # this is for DRBD8, where we can't execute the same mode of
3616         # replacement as for drbd7 (no different port allocated)
3617         raise errors.OpPrereqError("Same secondary given, cannot execute"
3618                                    " replacement")
3619     if instance.disk_template == constants.DT_DRBD8:
3620       if (self.op.mode == constants.REPLACE_DISK_ALL and
3621           remote_node is not None):
3622         # switch to replace secondary mode
3623         self.op.mode = constants.REPLACE_DISK_SEC
3624
3625       if self.op.mode == constants.REPLACE_DISK_ALL:
3626         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3627                                    " secondary disk replacement, not"
3628                                    " both at once")
3629       elif self.op.mode == constants.REPLACE_DISK_PRI:
3630         if remote_node is not None:
3631           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3632                                      " the secondary while doing a primary"
3633                                      " node disk replacement")
3634         self.tgt_node = instance.primary_node
3635         self.oth_node = instance.secondary_nodes[0]
3636       elif self.op.mode == constants.REPLACE_DISK_SEC:
3637         self.new_node = remote_node # this can be None, in which case
3638                                     # we don't change the secondary
3639         self.tgt_node = instance.secondary_nodes[0]
3640         self.oth_node = instance.primary_node
3641       else:
3642         raise errors.ProgrammerError("Unhandled disk replace mode")
3643
3644     for name in self.op.disks:
3645       if instance.FindDisk(name) is None:
3646         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3647                                    (name, instance.name))
3648     self.op.remote_node = remote_node
3649
3650   def _ExecD8DiskOnly(self, feedback_fn):
3651     """Replace a disk on the primary or secondary for dbrd8.
3652
3653     The algorithm for replace is quite complicated:
3654       - for each disk to be replaced:
3655         - create new LVs on the target node with unique names
3656         - detach old LVs from the drbd device
3657         - rename old LVs to name_replaced.<time_t>
3658         - rename new LVs to old LVs
3659         - attach the new LVs (with the old names now) to the drbd device
3660       - wait for sync across all devices
3661       - for each modified disk:
3662         - remove old LVs (which have the name name_replaces.<time_t>)
3663
3664     Failures are not very well handled.
3665
3666     """
3667     steps_total = 6
3668     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3669     instance = self.instance
3670     iv_names = {}
3671     vgname = self.cfg.GetVGName()
3672     # start of work
3673     cfg = self.cfg
3674     tgt_node = self.tgt_node
3675     oth_node = self.oth_node
3676
3677     # Step: check device activation
3678     self.proc.LogStep(1, steps_total, "check device existence")
3679     info("checking volume groups")
3680     my_vg = cfg.GetVGName()
3681     results = rpc.call_vg_list([oth_node, tgt_node])
3682     if not results:
3683       raise errors.OpExecError("Can't list volume groups on the nodes")
3684     for node in oth_node, tgt_node:
3685       res = results.get(node, False)
3686       if not res or my_vg not in res:
3687         raise errors.OpExecError("Volume group '%s' not found on %s" %
3688                                  (my_vg, node))
3689     for dev in instance.disks:
3690       if not dev.iv_name in self.op.disks:
3691         continue
3692       for node in tgt_node, oth_node:
3693         info("checking %s on %s" % (dev.iv_name, node))
3694         cfg.SetDiskID(dev, node)
3695         if not rpc.call_blockdev_find(node, dev):
3696           raise errors.OpExecError("Can't find device %s on node %s" %
3697                                    (dev.iv_name, node))
3698
3699     # Step: check other node consistency
3700     self.proc.LogStep(2, steps_total, "check peer consistency")
3701     for dev in instance.disks:
3702       if not dev.iv_name in self.op.disks:
3703         continue
3704       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3705       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3706                                    oth_node==instance.primary_node):
3707         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3708                                  " to replace disks on this node (%s)" %
3709                                  (oth_node, tgt_node))
3710
3711     # Step: create new storage
3712     self.proc.LogStep(3, steps_total, "allocate new storage")
3713     for dev in instance.disks:
3714       if not dev.iv_name in self.op.disks:
3715         continue
3716       size = dev.size
3717       cfg.SetDiskID(dev, tgt_node)
3718       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3719       names = _GenerateUniqueNames(cfg, lv_names)
3720       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3721                              logical_id=(vgname, names[0]))
3722       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3723                              logical_id=(vgname, names[1]))
3724       new_lvs = [lv_data, lv_meta]
3725       old_lvs = dev.children
3726       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3727       info("creating new local storage on %s for %s" %
3728            (tgt_node, dev.iv_name))
3729       # since we *always* want to create this LV, we use the
3730       # _Create...OnPrimary (which forces the creation), even if we
3731       # are talking about the secondary node
3732       for new_lv in new_lvs:
3733         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3734                                         _GetInstanceInfoText(instance)):
3735           raise errors.OpExecError("Failed to create new LV named '%s' on"
3736                                    " node '%s'" %
3737                                    (new_lv.logical_id[1], tgt_node))
3738
3739     # Step: for each lv, detach+rename*2+attach
3740     self.proc.LogStep(4, steps_total, "change drbd configuration")
3741     for dev, old_lvs, new_lvs in iv_names.itervalues():
3742       info("detaching %s drbd from local storage" % dev.iv_name)
3743       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3744         raise errors.OpExecError("Can't detach drbd from local storage on node"
3745                                  " %s for device %s" % (tgt_node, dev.iv_name))
3746       #dev.children = []
3747       #cfg.Update(instance)
3748
3749       # ok, we created the new LVs, so now we know we have the needed
3750       # storage; as such, we proceed on the target node to rename
3751       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3752       # using the assumption that logical_id == physical_id (which in
3753       # turn is the unique_id on that node)
3754
3755       # FIXME(iustin): use a better name for the replaced LVs
3756       temp_suffix = int(time.time())
3757       ren_fn = lambda d, suff: (d.physical_id[0],
3758                                 d.physical_id[1] + "_replaced-%s" % suff)
3759       # build the rename list based on what LVs exist on the node
3760       rlist = []
3761       for to_ren in old_lvs:
3762         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3763         if find_res is not None: # device exists
3764           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3765
3766       info("renaming the old LVs on the target node")
3767       if not rpc.call_blockdev_rename(tgt_node, rlist):
3768         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3769       # now we rename the new LVs to the old LVs
3770       info("renaming the new LVs on the target node")
3771       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3772       if not rpc.call_blockdev_rename(tgt_node, rlist):
3773         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3774
3775       for old, new in zip(old_lvs, new_lvs):
3776         new.logical_id = old.logical_id
3777         cfg.SetDiskID(new, tgt_node)
3778
3779       for disk in old_lvs:
3780         disk.logical_id = ren_fn(disk, temp_suffix)
3781         cfg.SetDiskID(disk, tgt_node)
3782
3783       # now that the new lvs have the old name, we can add them to the device
3784       info("adding new mirror component on %s" % tgt_node)
3785       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3786         for new_lv in new_lvs:
3787           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3788             warning("Can't rollback device %s", hint="manually cleanup unused"
3789                     " logical volumes")
3790         raise errors.OpExecError("Can't add local storage to drbd")
3791
3792       dev.children = new_lvs
3793       cfg.Update(instance)
3794
3795     # Step: wait for sync
3796
3797     # this can fail as the old devices are degraded and _WaitForSync
3798     # does a combined result over all disks, so we don't check its
3799     # return value
3800     self.proc.LogStep(5, steps_total, "sync devices")
3801     _WaitForSync(cfg, instance, self.proc, unlock=True)
3802
3803     # so check manually all the devices
3804     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3805       cfg.SetDiskID(dev, instance.primary_node)
3806       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3807       if is_degr:
3808         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3809
3810     # Step: remove old storage
3811     self.proc.LogStep(6, steps_total, "removing old storage")
3812     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3813       info("remove logical volumes for %s" % name)
3814       for lv in old_lvs:
3815         cfg.SetDiskID(lv, tgt_node)
3816         if not rpc.call_blockdev_remove(tgt_node, lv):
3817           warning("Can't remove old LV", hint="manually remove unused LVs")
3818           continue
3819
3820   def _ExecD8Secondary(self, feedback_fn):
3821     """Replace the secondary node for drbd8.
3822
3823     The algorithm for replace is quite complicated:
3824       - for all disks of the instance:
3825         - create new LVs on the new node with same names
3826         - shutdown the drbd device on the old secondary
3827         - disconnect the drbd network on the primary
3828         - create the drbd device on the new secondary
3829         - network attach the drbd on the primary, using an artifice:
3830           the drbd code for Attach() will connect to the network if it
3831           finds a device which is connected to the good local disks but
3832           not network enabled
3833       - wait for sync across all devices
3834       - remove all disks from the old secondary
3835
3836     Failures are not very well handled.
3837
3838     """
3839     steps_total = 6
3840     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3841     instance = self.instance
3842     iv_names = {}
3843     vgname = self.cfg.GetVGName()
3844     # start of work
3845     cfg = self.cfg
3846     old_node = self.tgt_node
3847     new_node = self.new_node
3848     pri_node = instance.primary_node
3849
3850     # Step: check device activation
3851     self.proc.LogStep(1, steps_total, "check device existence")
3852     info("checking volume groups")
3853     my_vg = cfg.GetVGName()
3854     results = rpc.call_vg_list([pri_node, new_node])
3855     if not results:
3856       raise errors.OpExecError("Can't list volume groups on the nodes")
3857     for node in pri_node, new_node:
3858       res = results.get(node, False)
3859       if not res or my_vg not in res:
3860         raise errors.OpExecError("Volume group '%s' not found on %s" %
3861                                  (my_vg, node))
3862     for dev in instance.disks:
3863       if not dev.iv_name in self.op.disks:
3864         continue
3865       info("checking %s on %s" % (dev.iv_name, pri_node))
3866       cfg.SetDiskID(dev, pri_node)
3867       if not rpc.call_blockdev_find(pri_node, dev):
3868         raise errors.OpExecError("Can't find device %s on node %s" %
3869                                  (dev.iv_name, pri_node))
3870
3871     # Step: check other node consistency
3872     self.proc.LogStep(2, steps_total, "check peer consistency")
3873     for dev in instance.disks:
3874       if not dev.iv_name in self.op.disks:
3875         continue
3876       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3877       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3878         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3879                                  " unsafe to replace the secondary" %
3880                                  pri_node)
3881
3882     # Step: create new storage
3883     self.proc.LogStep(3, steps_total, "allocate new storage")
3884     for dev in instance.disks:
3885       size = dev.size
3886       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3887       # since we *always* want to create this LV, we use the
3888       # _Create...OnPrimary (which forces the creation), even if we
3889       # are talking about the secondary node
3890       for new_lv in dev.children:
3891         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3892                                         _GetInstanceInfoText(instance)):
3893           raise errors.OpExecError("Failed to create new LV named '%s' on"
3894                                    " node '%s'" %
3895                                    (new_lv.logical_id[1], new_node))
3896
3897       iv_names[dev.iv_name] = (dev, dev.children)
3898
3899     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3900     for dev in instance.disks:
3901       size = dev.size
3902       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3903       # create new devices on new_node
3904       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3905                               logical_id=(pri_node, new_node,
3906                                           dev.logical_id[2]),
3907                               children=dev.children)
3908       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3909                                         new_drbd, False,
3910                                       _GetInstanceInfoText(instance)):
3911         raise errors.OpExecError("Failed to create new DRBD on"
3912                                  " node '%s'" % new_node)
3913
3914     for dev in instance.disks:
3915       # we have new devices, shutdown the drbd on the old secondary
3916       info("shutting down drbd for %s on old node" % dev.iv_name)
3917       cfg.SetDiskID(dev, old_node)
3918       if not rpc.call_blockdev_shutdown(old_node, dev):
3919         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3920                 hint="Please cleanup this device manually as soon as possible")
3921
3922     info("detaching primary drbds from the network (=> standalone)")
3923     done = 0
3924     for dev in instance.disks:
3925       cfg.SetDiskID(dev, pri_node)
3926       # set the physical (unique in bdev terms) id to None, meaning
3927       # detach from network
3928       dev.physical_id = (None,) * len(dev.physical_id)
3929       # and 'find' the device, which will 'fix' it to match the
3930       # standalone state
3931       if rpc.call_blockdev_find(pri_node, dev):
3932         done += 1
3933       else:
3934         warning("Failed to detach drbd %s from network, unusual case" %
3935                 dev.iv_name)
3936
3937     if not done:
3938       # no detaches succeeded (very unlikely)
3939       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3940
3941     # if we managed to detach at least one, we update all the disks of
3942     # the instance to point to the new secondary
3943     info("updating instance configuration")
3944     for dev in instance.disks:
3945       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3946       cfg.SetDiskID(dev, pri_node)
3947     cfg.Update(instance)
3948
3949     # and now perform the drbd attach
3950     info("attaching primary drbds to new secondary (standalone => connected)")
3951     failures = []
3952     for dev in instance.disks:
3953       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3954       # since the attach is smart, it's enough to 'find' the device,
3955       # it will automatically activate the network, if the physical_id
3956       # is correct
3957       cfg.SetDiskID(dev, pri_node)
3958       if not rpc.call_blockdev_find(pri_node, dev):
3959         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3960                 "please do a gnt-instance info to see the status of disks")
3961
3962     # this can fail as the old devices are degraded and _WaitForSync
3963     # does a combined result over all disks, so we don't check its
3964     # return value
3965     self.proc.LogStep(5, steps_total, "sync devices")
3966     _WaitForSync(cfg, instance, self.proc, unlock=True)
3967
3968     # so check manually all the devices
3969     for name, (dev, old_lvs) in iv_names.iteritems():
3970       cfg.SetDiskID(dev, pri_node)
3971       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3972       if is_degr:
3973         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3974
3975     self.proc.LogStep(6, steps_total, "removing old storage")
3976     for name, (dev, old_lvs) in iv_names.iteritems():
3977       info("remove logical volumes for %s" % name)
3978       for lv in old_lvs:
3979         cfg.SetDiskID(lv, old_node)
3980         if not rpc.call_blockdev_remove(old_node, lv):
3981           warning("Can't remove LV on old secondary",
3982                   hint="Cleanup stale volumes by hand")
3983
3984   def Exec(self, feedback_fn):
3985     """Execute disk replacement.
3986
3987     This dispatches the disk replacement to the appropriate handler.
3988
3989     """
3990     instance = self.instance
3991
3992     # Activate the instance disks if we're replacing them on a down instance
3993     if instance.status == "down":
3994       _StartInstanceDisks(self.cfg, instance, True)
3995
3996     if instance.disk_template == constants.DT_DRBD8:
3997       if self.op.remote_node is None:
3998         fn = self._ExecD8DiskOnly
3999       else:
4000         fn = self._ExecD8Secondary
4001     else:
4002       raise errors.ProgrammerError("Unhandled disk replacement case")
4003
4004     ret = fn(feedback_fn)
4005
4006     # Deactivate the instance disks if we're replacing them on a down instance
4007     if instance.status == "down":
4008       _SafeShutdownInstanceDisks(instance, self.cfg)
4009
4010     return ret
4011
4012
4013 class LUGrowDisk(LogicalUnit):
4014   """Grow a disk of an instance.
4015
4016   """
4017   HPATH = "disk-grow"
4018   HTYPE = constants.HTYPE_INSTANCE
4019   _OP_REQP = ["instance_name", "disk", "amount"]
4020   REQ_BGL = False
4021
4022   def ExpandNames(self):
4023     self._ExpandAndLockInstance()
4024     self.needed_locks[locking.LEVEL_NODE] = []
4025     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4026
4027   def DeclareLocks(self, level):
4028     if level == locking.LEVEL_NODE:
4029       self._LockInstancesNodes()
4030
4031   def BuildHooksEnv(self):
4032     """Build hooks env.
4033
4034     This runs on the master, the primary and all the secondaries.
4035
4036     """
4037     env = {
4038       "DISK": self.op.disk,
4039       "AMOUNT": self.op.amount,
4040       }
4041     env.update(_BuildInstanceHookEnvByObject(self.instance))
4042     nl = [
4043       self.sstore.GetMasterNode(),
4044       self.instance.primary_node,
4045       ]
4046     return env, nl, nl
4047
4048   def CheckPrereq(self):
4049     """Check prerequisites.
4050
4051     This checks that the instance is in the cluster.
4052
4053     """
4054     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4055     assert instance is not None, \
4056       "Cannot retrieve locked instance %s" % self.op.instance_name
4057
4058     self.instance = instance
4059
4060     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4061       raise errors.OpPrereqError("Instance's disk layout does not support"
4062                                  " growing.")
4063
4064     if instance.FindDisk(self.op.disk) is None:
4065       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4066                                  (self.op.disk, instance.name))
4067
4068     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4069     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4070     for node in nodenames:
4071       info = nodeinfo.get(node, None)
4072       if not info:
4073         raise errors.OpPrereqError("Cannot get current information"
4074                                    " from node '%s'" % node)
4075       vg_free = info.get('vg_free', None)
4076       if not isinstance(vg_free, int):
4077         raise errors.OpPrereqError("Can't compute free disk space on"
4078                                    " node %s" % node)
4079       if self.op.amount > info['vg_free']:
4080         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4081                                    " %d MiB available, %d MiB required" %
4082                                    (node, info['vg_free'], self.op.amount))
4083
4084   def Exec(self, feedback_fn):
4085     """Execute disk grow.
4086
4087     """
4088     instance = self.instance
4089     disk = instance.FindDisk(self.op.disk)
4090     for node in (instance.secondary_nodes + (instance.primary_node,)):
4091       self.cfg.SetDiskID(disk, node)
4092       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4093       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4094         raise errors.OpExecError("grow request failed to node %s" % node)
4095       elif not result[0]:
4096         raise errors.OpExecError("grow request failed to node %s: %s" %
4097                                  (node, result[1]))
4098     disk.RecordGrow(self.op.amount)
4099     self.cfg.Update(instance)
4100     return
4101
4102
4103 class LUQueryInstanceData(NoHooksLU):
4104   """Query runtime instance data.
4105
4106   """
4107   _OP_REQP = ["instances"]
4108
4109   def CheckPrereq(self):
4110     """Check prerequisites.
4111
4112     This only checks the optional instance list against the existing names.
4113
4114     """
4115     if not isinstance(self.op.instances, list):
4116       raise errors.OpPrereqError("Invalid argument type 'instances'")
4117     if self.op.instances:
4118       self.wanted_instances = []
4119       names = self.op.instances
4120       for name in names:
4121         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4122         if instance is None:
4123           raise errors.OpPrereqError("No such instance name '%s'" % name)
4124         self.wanted_instances.append(instance)
4125     else:
4126       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4127                                in self.cfg.GetInstanceList()]
4128     return
4129
4130
4131   def _ComputeDiskStatus(self, instance, snode, dev):
4132     """Compute block device status.
4133
4134     """
4135     self.cfg.SetDiskID(dev, instance.primary_node)
4136     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4137     if dev.dev_type in constants.LDS_DRBD:
4138       # we change the snode then (otherwise we use the one passed in)
4139       if dev.logical_id[0] == instance.primary_node:
4140         snode = dev.logical_id[1]
4141       else:
4142         snode = dev.logical_id[0]
4143
4144     if snode:
4145       self.cfg.SetDiskID(dev, snode)
4146       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4147     else:
4148       dev_sstatus = None
4149
4150     if dev.children:
4151       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4152                       for child in dev.children]
4153     else:
4154       dev_children = []
4155
4156     data = {
4157       "iv_name": dev.iv_name,
4158       "dev_type": dev.dev_type,
4159       "logical_id": dev.logical_id,
4160       "physical_id": dev.physical_id,
4161       "pstatus": dev_pstatus,
4162       "sstatus": dev_sstatus,
4163       "children": dev_children,
4164       }
4165
4166     return data
4167
4168   def Exec(self, feedback_fn):
4169     """Gather and return data"""
4170     result = {}
4171     for instance in self.wanted_instances:
4172       remote_info = rpc.call_instance_info(instance.primary_node,
4173                                                 instance.name)
4174       if remote_info and "state" in remote_info:
4175         remote_state = "up"
4176       else:
4177         remote_state = "down"
4178       if instance.status == "down":
4179         config_state = "down"
4180       else:
4181         config_state = "up"
4182
4183       disks = [self._ComputeDiskStatus(instance, None, device)
4184                for device in instance.disks]
4185
4186       idict = {
4187         "name": instance.name,
4188         "config_state": config_state,
4189         "run_state": remote_state,
4190         "pnode": instance.primary_node,
4191         "snodes": instance.secondary_nodes,
4192         "os": instance.os,
4193         "memory": instance.memory,
4194         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4195         "disks": disks,
4196         "vcpus": instance.vcpus,
4197         }
4198
4199       htkind = self.sstore.GetHypervisorType()
4200       if htkind == constants.HT_XEN_PVM30:
4201         idict["kernel_path"] = instance.kernel_path
4202         idict["initrd_path"] = instance.initrd_path
4203
4204       if htkind == constants.HT_XEN_HVM31:
4205         idict["hvm_boot_order"] = instance.hvm_boot_order
4206         idict["hvm_acpi"] = instance.hvm_acpi
4207         idict["hvm_pae"] = instance.hvm_pae
4208         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4209         idict["hvm_nic_type"] = instance.hvm_nic_type
4210         idict["hvm_disk_type"] = instance.hvm_disk_type
4211
4212       if htkind in constants.HTS_REQ_PORT:
4213         if instance.vnc_bind_address is None:
4214           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4215         else:
4216           vnc_bind_address = instance.vnc_bind_address
4217         if instance.network_port is None:
4218           vnc_console_port = None
4219         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4220           vnc_console_port = "%s:%s" % (instance.primary_node,
4221                                        instance.network_port)
4222         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4223           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4224                                                    instance.network_port,
4225                                                    instance.primary_node)
4226         else:
4227           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4228                                         instance.network_port)
4229         idict["vnc_console_port"] = vnc_console_port
4230         idict["vnc_bind_address"] = vnc_bind_address
4231         idict["network_port"] = instance.network_port
4232
4233       result[instance.name] = idict
4234
4235     return result
4236
4237
4238 class LUSetInstanceParams(LogicalUnit):
4239   """Modifies an instances's parameters.
4240
4241   """
4242   HPATH = "instance-modify"
4243   HTYPE = constants.HTYPE_INSTANCE
4244   _OP_REQP = ["instance_name"]
4245   REQ_BGL = False
4246
4247   def ExpandNames(self):
4248     self._ExpandAndLockInstance()
4249
4250   def BuildHooksEnv(self):
4251     """Build hooks env.
4252
4253     This runs on the master, primary and secondaries.
4254
4255     """
4256     args = dict()
4257     if self.mem:
4258       args['memory'] = self.mem
4259     if self.vcpus:
4260       args['vcpus'] = self.vcpus
4261     if self.do_ip or self.do_bridge or self.mac:
4262       if self.do_ip:
4263         ip = self.ip
4264       else:
4265         ip = self.instance.nics[0].ip
4266       if self.bridge:
4267         bridge = self.bridge
4268       else:
4269         bridge = self.instance.nics[0].bridge
4270       if self.mac:
4271         mac = self.mac
4272       else:
4273         mac = self.instance.nics[0].mac
4274       args['nics'] = [(ip, bridge, mac)]
4275     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4276     nl = [self.sstore.GetMasterNode(),
4277           self.instance.primary_node] + list(self.instance.secondary_nodes)
4278     return env, nl, nl
4279
4280   def CheckPrereq(self):
4281     """Check prerequisites.
4282
4283     This only checks the instance list against the existing names.
4284
4285     """
4286     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4287     # a separate CheckArguments function, if we implement one, so the operation
4288     # can be aborted without waiting for any lock, should it have an error...
4289     self.mem = getattr(self.op, "mem", None)
4290     self.vcpus = getattr(self.op, "vcpus", None)
4291     self.ip = getattr(self.op, "ip", None)
4292     self.mac = getattr(self.op, "mac", None)
4293     self.bridge = getattr(self.op, "bridge", None)
4294     self.kernel_path = getattr(self.op, "kernel_path", None)
4295     self.initrd_path = getattr(self.op, "initrd_path", None)
4296     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4297     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4298     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4299     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4300     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4301     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4302     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4303     self.force = getattr(self.op, "force", None)
4304     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4305                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4306                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4307                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4308     if all_parms.count(None) == len(all_parms):
4309       raise errors.OpPrereqError("No changes submitted")
4310     if self.mem is not None:
4311       try:
4312         self.mem = int(self.mem)
4313       except ValueError, err:
4314         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4315     if self.vcpus is not None:
4316       try:
4317         self.vcpus = int(self.vcpus)
4318       except ValueError, err:
4319         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4320     if self.ip is not None:
4321       self.do_ip = True
4322       if self.ip.lower() == "none":
4323         self.ip = None
4324       else:
4325         if not utils.IsValidIP(self.ip):
4326           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4327     else:
4328       self.do_ip = False
4329     self.do_bridge = (self.bridge is not None)
4330     if self.mac is not None:
4331       if self.cfg.IsMacInUse(self.mac):
4332         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4333                                    self.mac)
4334       if not utils.IsValidMac(self.mac):
4335         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4336
4337     if self.kernel_path is not None:
4338       self.do_kernel_path = True
4339       if self.kernel_path == constants.VALUE_NONE:
4340         raise errors.OpPrereqError("Can't set instance to no kernel")
4341
4342       if self.kernel_path != constants.VALUE_DEFAULT:
4343         if not os.path.isabs(self.kernel_path):
4344           raise errors.OpPrereqError("The kernel path must be an absolute"
4345                                     " filename")
4346     else:
4347       self.do_kernel_path = False
4348
4349     if self.initrd_path is not None:
4350       self.do_initrd_path = True
4351       if self.initrd_path not in (constants.VALUE_NONE,
4352                                   constants.VALUE_DEFAULT):
4353         if not os.path.isabs(self.initrd_path):
4354           raise errors.OpPrereqError("The initrd path must be an absolute"
4355                                     " filename")
4356     else:
4357       self.do_initrd_path = False
4358
4359     # boot order verification
4360     if self.hvm_boot_order is not None:
4361       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4362         if len(self.hvm_boot_order.strip("acdn")) != 0:
4363           raise errors.OpPrereqError("invalid boot order specified,"
4364                                      " must be one or more of [acdn]"
4365                                      " or 'default'")
4366
4367     # hvm_cdrom_image_path verification
4368     if self.op.hvm_cdrom_image_path is not None:
4369       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4370               self.op.hvm_cdrom_image_path.lower() == "none"):
4371         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4372                                    " be an absolute path or None, not %s" %
4373                                    self.op.hvm_cdrom_image_path)
4374       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4375               self.op.hvm_cdrom_image_path.lower() == "none"):
4376         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4377                                    " regular file or a symlink pointing to"
4378                                    " an existing regular file, not %s" %
4379                                    self.op.hvm_cdrom_image_path)
4380
4381     # vnc_bind_address verification
4382     if self.op.vnc_bind_address is not None:
4383       if not utils.IsValidIP(self.op.vnc_bind_address):
4384         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4385                                    " like a valid IP address" %
4386                                    self.op.vnc_bind_address)
4387
4388     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4389     assert self.instance is not None, \
4390       "Cannot retrieve locked instance %s" % self.op.instance_name
4391     self.warn = []
4392     if self.mem is not None and not self.force:
4393       pnode = self.instance.primary_node
4394       nodelist = [pnode]
4395       nodelist.extend(instance.secondary_nodes)
4396       instance_info = rpc.call_instance_info(pnode, instance.name)
4397       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4398
4399       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4400         # Assume the primary node is unreachable and go ahead
4401         self.warn.append("Can't get info from primary node %s" % pnode)
4402       else:
4403         if instance_info:
4404           current_mem = instance_info['memory']
4405         else:
4406           # Assume instance not running
4407           # (there is a slight race condition here, but it's not very probable,
4408           # and we have no other way to check)
4409           current_mem = 0
4410         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4411         if miss_mem > 0:
4412           raise errors.OpPrereqError("This change will prevent the instance"
4413                                      " from starting, due to %d MB of memory"
4414                                      " missing on its primary node" % miss_mem)
4415
4416       for node in instance.secondary_nodes:
4417         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4418           self.warn.append("Can't get info from secondary node %s" % node)
4419         elif self.mem > nodeinfo[node]['memory_free']:
4420           self.warn.append("Not enough memory to failover instance to secondary"
4421                            " node %s" % node)
4422
4423     # Xen HVM device type checks
4424     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4425       if self.op.hvm_nic_type is not None:
4426         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4427           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4428                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4429       if self.op.hvm_disk_type is not None:
4430         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4431           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4432                                      " HVM hypervisor" % self.op.hvm_disk_type)
4433
4434     return
4435
4436   def Exec(self, feedback_fn):
4437     """Modifies an instance.
4438
4439     All parameters take effect only at the next restart of the instance.
4440     """
4441     # Process here the warnings from CheckPrereq, as we don't have a
4442     # feedback_fn there.
4443     for warn in self.warn:
4444       feedback_fn("WARNING: %s" % warn)
4445
4446     result = []
4447     instance = self.instance
4448     if self.mem:
4449       instance.memory = self.mem
4450       result.append(("mem", self.mem))
4451     if self.vcpus:
4452       instance.vcpus = self.vcpus
4453       result.append(("vcpus",  self.vcpus))
4454     if self.do_ip:
4455       instance.nics[0].ip = self.ip
4456       result.append(("ip", self.ip))
4457     if self.bridge:
4458       instance.nics[0].bridge = self.bridge
4459       result.append(("bridge", self.bridge))
4460     if self.mac:
4461       instance.nics[0].mac = self.mac
4462       result.append(("mac", self.mac))
4463     if self.do_kernel_path:
4464       instance.kernel_path = self.kernel_path
4465       result.append(("kernel_path", self.kernel_path))
4466     if self.do_initrd_path:
4467       instance.initrd_path = self.initrd_path
4468       result.append(("initrd_path", self.initrd_path))
4469     if self.hvm_boot_order:
4470       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4471         instance.hvm_boot_order = None
4472       else:
4473         instance.hvm_boot_order = self.hvm_boot_order
4474       result.append(("hvm_boot_order", self.hvm_boot_order))
4475     if self.hvm_acpi is not None:
4476       instance.hvm_acpi = self.hvm_acpi
4477       result.append(("hvm_acpi", self.hvm_acpi))
4478     if self.hvm_pae is not None:
4479       instance.hvm_pae = self.hvm_pae
4480       result.append(("hvm_pae", self.hvm_pae))
4481     if self.hvm_nic_type is not None:
4482       instance.hvm_nic_type = self.hvm_nic_type
4483       result.append(("hvm_nic_type", self.hvm_nic_type))
4484     if self.hvm_disk_type is not None:
4485       instance.hvm_disk_type = self.hvm_disk_type
4486       result.append(("hvm_disk_type", self.hvm_disk_type))
4487     if self.hvm_cdrom_image_path:
4488       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4489         instance.hvm_cdrom_image_path = None
4490       else:
4491         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4492       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4493     if self.vnc_bind_address:
4494       instance.vnc_bind_address = self.vnc_bind_address
4495       result.append(("vnc_bind_address", self.vnc_bind_address))
4496
4497     self.cfg.Update(instance)
4498
4499     return result
4500
4501
4502 class LUQueryExports(NoHooksLU):
4503   """Query the exports list
4504
4505   """
4506   _OP_REQP = ['nodes']
4507   REQ_BGL = False
4508
4509   def ExpandNames(self):
4510     self.needed_locks = {}
4511     self.share_locks[locking.LEVEL_NODE] = 1
4512     if not self.op.nodes:
4513       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4514     else:
4515       self.needed_locks[locking.LEVEL_NODE] = \
4516         _GetWantedNodes(self, self.op.nodes)
4517
4518   def CheckPrereq(self):
4519     """Check prerequisites.
4520
4521     """
4522     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4523
4524   def Exec(self, feedback_fn):
4525     """Compute the list of all the exported system images.
4526
4527     Returns:
4528       a dictionary with the structure node->(export-list)
4529       where export-list is a list of the instances exported on
4530       that node.
4531
4532     """
4533     return rpc.call_export_list(self.nodes)
4534
4535
4536 class LUExportInstance(LogicalUnit):
4537   """Export an instance to an image in the cluster.
4538
4539   """
4540   HPATH = "instance-export"
4541   HTYPE = constants.HTYPE_INSTANCE
4542   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4543   REQ_BGL = False
4544
4545   def ExpandNames(self):
4546     self._ExpandAndLockInstance()
4547     # FIXME: lock only instance primary and destination node
4548     #
4549     # Sad but true, for now we have do lock all nodes, as we don't know where
4550     # the previous export might be, and and in this LU we search for it and
4551     # remove it from its current node. In the future we could fix this by:
4552     #  - making a tasklet to search (share-lock all), then create the new one,
4553     #    then one to remove, after
4554     #  - removing the removal operation altoghether
4555     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4556
4557   def DeclareLocks(self, level):
4558     """Last minute lock declaration."""
4559     # All nodes are locked anyway, so nothing to do here.
4560
4561   def BuildHooksEnv(self):
4562     """Build hooks env.
4563
4564     This will run on the master, primary node and target node.
4565
4566     """
4567     env = {
4568       "EXPORT_NODE": self.op.target_node,
4569       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4570       }
4571     env.update(_BuildInstanceHookEnvByObject(self.instance))
4572     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4573           self.op.target_node]
4574     return env, nl, nl
4575
4576   def CheckPrereq(self):
4577     """Check prerequisites.
4578
4579     This checks that the instance and node names are valid.
4580
4581     """
4582     instance_name = self.op.instance_name
4583     self.instance = self.cfg.GetInstanceInfo(instance_name)
4584     assert self.instance is not None, \
4585           "Cannot retrieve locked instance %s" % self.op.instance_name
4586
4587     self.dst_node = self.cfg.GetNodeInfo(
4588       self.cfg.ExpandNodeName(self.op.target_node))
4589
4590     assert self.dst_node is not None, \
4591           "Cannot retrieve locked node %s" % self.op.target_node
4592
4593     # instance disk type verification
4594     for disk in self.instance.disks:
4595       if disk.dev_type == constants.LD_FILE:
4596         raise errors.OpPrereqError("Export not supported for instances with"
4597                                    " file-based disks")
4598
4599   def Exec(self, feedback_fn):
4600     """Export an instance to an image in the cluster.
4601
4602     """
4603     instance = self.instance
4604     dst_node = self.dst_node
4605     src_node = instance.primary_node
4606     if self.op.shutdown:
4607       # shutdown the instance, but not the disks
4608       if not rpc.call_instance_shutdown(src_node, instance):
4609         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4610                                  (instance.name, src_node))
4611
4612     vgname = self.cfg.GetVGName()
4613
4614     snap_disks = []
4615
4616     try:
4617       for disk in instance.disks:
4618         if disk.iv_name == "sda":
4619           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4620           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4621
4622           if not new_dev_name:
4623             logger.Error("could not snapshot block device %s on node %s" %
4624                          (disk.logical_id[1], src_node))
4625           else:
4626             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4627                                       logical_id=(vgname, new_dev_name),
4628                                       physical_id=(vgname, new_dev_name),
4629                                       iv_name=disk.iv_name)
4630             snap_disks.append(new_dev)
4631
4632     finally:
4633       if self.op.shutdown and instance.status == "up":
4634         if not rpc.call_instance_start(src_node, instance, None):
4635           _ShutdownInstanceDisks(instance, self.cfg)
4636           raise errors.OpExecError("Could not start instance")
4637
4638     # TODO: check for size
4639
4640     for dev in snap_disks:
4641       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4642         logger.Error("could not export block device %s from node %s to node %s"
4643                      % (dev.logical_id[1], src_node, dst_node.name))
4644       if not rpc.call_blockdev_remove(src_node, dev):
4645         logger.Error("could not remove snapshot block device %s from node %s" %
4646                      (dev.logical_id[1], src_node))
4647
4648     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4649       logger.Error("could not finalize export for instance %s on node %s" %
4650                    (instance.name, dst_node.name))
4651
4652     nodelist = self.cfg.GetNodeList()
4653     nodelist.remove(dst_node.name)
4654
4655     # on one-node clusters nodelist will be empty after the removal
4656     # if we proceed the backup would be removed because OpQueryExports
4657     # substitutes an empty list with the full cluster node list.
4658     if nodelist:
4659       exportlist = rpc.call_export_list(nodelist)
4660       for node in exportlist:
4661         if instance.name in exportlist[node]:
4662           if not rpc.call_export_remove(node, instance.name):
4663             logger.Error("could not remove older export for instance %s"
4664                          " on node %s" % (instance.name, node))
4665
4666
4667 class LURemoveExport(NoHooksLU):
4668   """Remove exports related to the named instance.
4669
4670   """
4671   _OP_REQP = ["instance_name"]
4672
4673   def CheckPrereq(self):
4674     """Check prerequisites.
4675     """
4676     pass
4677
4678   def Exec(self, feedback_fn):
4679     """Remove any export.
4680
4681     """
4682     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4683     # If the instance was not found we'll try with the name that was passed in.
4684     # This will only work if it was an FQDN, though.
4685     fqdn_warn = False
4686     if not instance_name:
4687       fqdn_warn = True
4688       instance_name = self.op.instance_name
4689
4690     exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4691     found = False
4692     for node in exportlist:
4693       if instance_name in exportlist[node]:
4694         found = True
4695         if not rpc.call_export_remove(node, instance_name):
4696           logger.Error("could not remove export for instance %s"
4697                        " on node %s" % (instance_name, node))
4698
4699     if fqdn_warn and not found:
4700       feedback_fn("Export not found. If trying to remove an export belonging"
4701                   " to a deleted instance please use its Fully Qualified"
4702                   " Domain Name.")
4703
4704
4705 class TagsLU(NoHooksLU):
4706   """Generic tags LU.
4707
4708   This is an abstract class which is the parent of all the other tags LUs.
4709
4710   """
4711   def CheckPrereq(self):
4712     """Check prerequisites.
4713
4714     """
4715     if self.op.kind == constants.TAG_CLUSTER:
4716       self.target = self.cfg.GetClusterInfo()
4717     elif self.op.kind == constants.TAG_NODE:
4718       name = self.cfg.ExpandNodeName(self.op.name)
4719       if name is None:
4720         raise errors.OpPrereqError("Invalid node name (%s)" %
4721                                    (self.op.name,))
4722       self.op.name = name
4723       self.target = self.cfg.GetNodeInfo(name)
4724     elif self.op.kind == constants.TAG_INSTANCE:
4725       name = self.cfg.ExpandInstanceName(self.op.name)
4726       if name is None:
4727         raise errors.OpPrereqError("Invalid instance name (%s)" %
4728                                    (self.op.name,))
4729       self.op.name = name
4730       self.target = self.cfg.GetInstanceInfo(name)
4731     else:
4732       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4733                                  str(self.op.kind))
4734
4735
4736 class LUGetTags(TagsLU):
4737   """Returns the tags of a given object.
4738
4739   """
4740   _OP_REQP = ["kind", "name"]
4741
4742   def Exec(self, feedback_fn):
4743     """Returns the tag list.
4744
4745     """
4746     return list(self.target.GetTags())
4747
4748
4749 class LUSearchTags(NoHooksLU):
4750   """Searches the tags for a given pattern.
4751
4752   """
4753   _OP_REQP = ["pattern"]
4754
4755   def CheckPrereq(self):
4756     """Check prerequisites.
4757
4758     This checks the pattern passed for validity by compiling it.
4759
4760     """
4761     try:
4762       self.re = re.compile(self.op.pattern)
4763     except re.error, err:
4764       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4765                                  (self.op.pattern, err))
4766
4767   def Exec(self, feedback_fn):
4768     """Returns the tag list.
4769
4770     """
4771     cfg = self.cfg
4772     tgts = [("/cluster", cfg.GetClusterInfo())]
4773     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4774     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4775     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4776     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4777     results = []
4778     for path, target in tgts:
4779       for tag in target.GetTags():
4780         if self.re.search(tag):
4781           results.append((path, tag))
4782     return results
4783
4784
4785 class LUAddTags(TagsLU):
4786   """Sets a tag on a given object.
4787
4788   """
4789   _OP_REQP = ["kind", "name", "tags"]
4790
4791   def CheckPrereq(self):
4792     """Check prerequisites.
4793
4794     This checks the type and length of the tag name and value.
4795
4796     """
4797     TagsLU.CheckPrereq(self)
4798     for tag in self.op.tags:
4799       objects.TaggableObject.ValidateTag(tag)
4800
4801   def Exec(self, feedback_fn):
4802     """Sets the tag.
4803
4804     """
4805     try:
4806       for tag in self.op.tags:
4807         self.target.AddTag(tag)
4808     except errors.TagError, err:
4809       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4810     try:
4811       self.cfg.Update(self.target)
4812     except errors.ConfigurationError:
4813       raise errors.OpRetryError("There has been a modification to the"
4814                                 " config file and the operation has been"
4815                                 " aborted. Please retry.")
4816
4817
4818 class LUDelTags(TagsLU):
4819   """Delete a list of tags from a given object.
4820
4821   """
4822   _OP_REQP = ["kind", "name", "tags"]
4823
4824   def CheckPrereq(self):
4825     """Check prerequisites.
4826
4827     This checks that we have the given tag.
4828
4829     """
4830     TagsLU.CheckPrereq(self)
4831     for tag in self.op.tags:
4832       objects.TaggableObject.ValidateTag(tag)
4833     del_tags = frozenset(self.op.tags)
4834     cur_tags = self.target.GetTags()
4835     if not del_tags <= cur_tags:
4836       diff_tags = del_tags - cur_tags
4837       diff_names = ["'%s'" % tag for tag in diff_tags]
4838       diff_names.sort()
4839       raise errors.OpPrereqError("Tag(s) %s not found" %
4840                                  (",".join(diff_names)))
4841
4842   def Exec(self, feedback_fn):
4843     """Remove the tag from the object.
4844
4845     """
4846     for tag in self.op.tags:
4847       self.target.RemoveTag(tag)
4848     try:
4849       self.cfg.Update(self.target)
4850     except errors.ConfigurationError:
4851       raise errors.OpRetryError("There has been a modification to the"
4852                                 " config file and the operation has been"
4853                                 " aborted. Please retry.")
4854
4855
4856 class LUTestDelay(NoHooksLU):
4857   """Sleep for a specified amount of time.
4858
4859   This LU sleeps on the master and/or nodes for a specified amount of
4860   time.
4861
4862   """
4863   _OP_REQP = ["duration", "on_master", "on_nodes"]
4864   REQ_BGL = False
4865
4866   def ExpandNames(self):
4867     """Expand names and set required locks.
4868
4869     This expands the node list, if any.
4870
4871     """
4872     self.needed_locks = {}
4873     if self.op.on_nodes:
4874       # _GetWantedNodes can be used here, but is not always appropriate to use
4875       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4876       # more information.
4877       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4878       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4879
4880   def CheckPrereq(self):
4881     """Check prerequisites.
4882
4883     """
4884
4885   def Exec(self, feedback_fn):
4886     """Do the actual sleep.
4887
4888     """
4889     if self.op.on_master:
4890       if not utils.TestDelay(self.op.duration):
4891         raise errors.OpExecError("Error during master delay test")
4892     if self.op.on_nodes:
4893       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4894       if not result:
4895         raise errors.OpExecError("Complete failure from rpc call")
4896       for node, node_result in result.items():
4897         if not node_result:
4898           raise errors.OpExecError("Failure during rpc call to node %s,"
4899                                    " result: %s" % (node, node_result))
4900
4901
4902 class IAllocator(object):
4903   """IAllocator framework.
4904
4905   An IAllocator instance has three sets of attributes:
4906     - cfg/sstore that are needed to query the cluster
4907     - input data (all members of the _KEYS class attribute are required)
4908     - four buffer attributes (in|out_data|text), that represent the
4909       input (to the external script) in text and data structure format,
4910       and the output from it, again in two formats
4911     - the result variables from the script (success, info, nodes) for
4912       easy usage
4913
4914   """
4915   _ALLO_KEYS = [
4916     "mem_size", "disks", "disk_template",
4917     "os", "tags", "nics", "vcpus",
4918     ]
4919   _RELO_KEYS = [
4920     "relocate_from",
4921     ]
4922
4923   def __init__(self, cfg, sstore, mode, name, **kwargs):
4924     self.cfg = cfg
4925     self.sstore = sstore
4926     # init buffer variables
4927     self.in_text = self.out_text = self.in_data = self.out_data = None
4928     # init all input fields so that pylint is happy
4929     self.mode = mode
4930     self.name = name
4931     self.mem_size = self.disks = self.disk_template = None
4932     self.os = self.tags = self.nics = self.vcpus = None
4933     self.relocate_from = None
4934     # computed fields
4935     self.required_nodes = None
4936     # init result fields
4937     self.success = self.info = self.nodes = None
4938     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4939       keyset = self._ALLO_KEYS
4940     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4941       keyset = self._RELO_KEYS
4942     else:
4943       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4944                                    " IAllocator" % self.mode)
4945     for key in kwargs:
4946       if key not in keyset:
4947         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4948                                      " IAllocator" % key)
4949       setattr(self, key, kwargs[key])
4950     for key in keyset:
4951       if key not in kwargs:
4952         raise errors.ProgrammerError("Missing input parameter '%s' to"
4953                                      " IAllocator" % key)
4954     self._BuildInputData()
4955
4956   def _ComputeClusterData(self):
4957     """Compute the generic allocator input data.
4958
4959     This is the data that is independent of the actual operation.
4960
4961     """
4962     cfg = self.cfg
4963     # cluster data
4964     data = {
4965       "version": 1,
4966       "cluster_name": self.sstore.GetClusterName(),
4967       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4968       "hypervisor_type": self.sstore.GetHypervisorType(),
4969       # we don't have job IDs
4970       }
4971
4972     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4973
4974     # node data
4975     node_results = {}
4976     node_list = cfg.GetNodeList()
4977     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4978     for nname in node_list:
4979       ninfo = cfg.GetNodeInfo(nname)
4980       if nname not in node_data or not isinstance(node_data[nname], dict):
4981         raise errors.OpExecError("Can't get data for node %s" % nname)
4982       remote_info = node_data[nname]
4983       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4984                    'vg_size', 'vg_free', 'cpu_total']:
4985         if attr not in remote_info:
4986           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4987                                    (nname, attr))
4988         try:
4989           remote_info[attr] = int(remote_info[attr])
4990         except ValueError, err:
4991           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4992                                    " %s" % (nname, attr, str(err)))
4993       # compute memory used by primary instances
4994       i_p_mem = i_p_up_mem = 0
4995       for iinfo in i_list:
4996         if iinfo.primary_node == nname:
4997           i_p_mem += iinfo.memory
4998           if iinfo.status == "up":
4999             i_p_up_mem += iinfo.memory
5000
5001       # compute memory used by instances
5002       pnr = {
5003         "tags": list(ninfo.GetTags()),
5004         "total_memory": remote_info['memory_total'],
5005         "reserved_memory": remote_info['memory_dom0'],
5006         "free_memory": remote_info['memory_free'],
5007         "i_pri_memory": i_p_mem,
5008         "i_pri_up_memory": i_p_up_mem,
5009         "total_disk": remote_info['vg_size'],
5010         "free_disk": remote_info['vg_free'],
5011         "primary_ip": ninfo.primary_ip,
5012         "secondary_ip": ninfo.secondary_ip,
5013         "total_cpus": remote_info['cpu_total'],
5014         }
5015       node_results[nname] = pnr
5016     data["nodes"] = node_results
5017
5018     # instance data
5019     instance_data = {}
5020     for iinfo in i_list:
5021       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5022                   for n in iinfo.nics]
5023       pir = {
5024         "tags": list(iinfo.GetTags()),
5025         "should_run": iinfo.status == "up",
5026         "vcpus": iinfo.vcpus,
5027         "memory": iinfo.memory,
5028         "os": iinfo.os,
5029         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5030         "nics": nic_data,
5031         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5032         "disk_template": iinfo.disk_template,
5033         }
5034       instance_data[iinfo.name] = pir
5035
5036     data["instances"] = instance_data
5037
5038     self.in_data = data
5039
5040   def _AddNewInstance(self):
5041     """Add new instance data to allocator structure.
5042
5043     This in combination with _AllocatorGetClusterData will create the
5044     correct structure needed as input for the allocator.
5045
5046     The checks for the completeness of the opcode must have already been
5047     done.
5048
5049     """
5050     data = self.in_data
5051     if len(self.disks) != 2:
5052       raise errors.OpExecError("Only two-disk configurations supported")
5053
5054     disk_space = _ComputeDiskSize(self.disk_template,
5055                                   self.disks[0]["size"], self.disks[1]["size"])
5056
5057     if self.disk_template in constants.DTS_NET_MIRROR:
5058       self.required_nodes = 2
5059     else:
5060       self.required_nodes = 1
5061     request = {
5062       "type": "allocate",
5063       "name": self.name,
5064       "disk_template": self.disk_template,
5065       "tags": self.tags,
5066       "os": self.os,
5067       "vcpus": self.vcpus,
5068       "memory": self.mem_size,
5069       "disks": self.disks,
5070       "disk_space_total": disk_space,
5071       "nics": self.nics,
5072       "required_nodes": self.required_nodes,
5073       }
5074     data["request"] = request
5075
5076   def _AddRelocateInstance(self):
5077     """Add relocate instance data to allocator structure.
5078
5079     This in combination with _IAllocatorGetClusterData will create the
5080     correct structure needed as input for the allocator.
5081
5082     The checks for the completeness of the opcode must have already been
5083     done.
5084
5085     """
5086     instance = self.cfg.GetInstanceInfo(self.name)
5087     if instance is None:
5088       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5089                                    " IAllocator" % self.name)
5090
5091     if instance.disk_template not in constants.DTS_NET_MIRROR:
5092       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5093
5094     if len(instance.secondary_nodes) != 1:
5095       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5096
5097     self.required_nodes = 1
5098
5099     disk_space = _ComputeDiskSize(instance.disk_template,
5100                                   instance.disks[0].size,
5101                                   instance.disks[1].size)
5102
5103     request = {
5104       "type": "relocate",
5105       "name": self.name,
5106       "disk_space_total": disk_space,
5107       "required_nodes": self.required_nodes,
5108       "relocate_from": self.relocate_from,
5109       }
5110     self.in_data["request"] = request
5111
5112   def _BuildInputData(self):
5113     """Build input data structures.
5114
5115     """
5116     self._ComputeClusterData()
5117
5118     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5119       self._AddNewInstance()
5120     else:
5121       self._AddRelocateInstance()
5122
5123     self.in_text = serializer.Dump(self.in_data)
5124
5125   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5126     """Run an instance allocator and return the results.
5127
5128     """
5129     data = self.in_text
5130
5131     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5132
5133     if not isinstance(result, (list, tuple)) or len(result) != 4:
5134       raise errors.OpExecError("Invalid result from master iallocator runner")
5135
5136     rcode, stdout, stderr, fail = result
5137
5138     if rcode == constants.IARUN_NOTFOUND:
5139       raise errors.OpExecError("Can't find allocator '%s'" % name)
5140     elif rcode == constants.IARUN_FAILURE:
5141       raise errors.OpExecError("Instance allocator call failed: %s,"
5142                                " output: %s" % (fail, stdout+stderr))
5143     self.out_text = stdout
5144     if validate:
5145       self._ValidateResult()
5146
5147   def _ValidateResult(self):
5148     """Process the allocator results.
5149
5150     This will process and if successful save the result in
5151     self.out_data and the other parameters.
5152
5153     """
5154     try:
5155       rdict = serializer.Load(self.out_text)
5156     except Exception, err:
5157       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5158
5159     if not isinstance(rdict, dict):
5160       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5161
5162     for key in "success", "info", "nodes":
5163       if key not in rdict:
5164         raise errors.OpExecError("Can't parse iallocator results:"
5165                                  " missing key '%s'" % key)
5166       setattr(self, key, rdict[key])
5167
5168     if not isinstance(rdict["nodes"], list):
5169       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5170                                " is not a list")
5171     self.out_data = rdict
5172
5173
5174 class LUTestAllocator(NoHooksLU):
5175   """Run allocator tests.
5176
5177   This LU runs the allocator tests
5178
5179   """
5180   _OP_REQP = ["direction", "mode", "name"]
5181
5182   def CheckPrereq(self):
5183     """Check prerequisites.
5184
5185     This checks the opcode parameters depending on the director and mode test.
5186
5187     """
5188     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5189       for attr in ["name", "mem_size", "disks", "disk_template",
5190                    "os", "tags", "nics", "vcpus"]:
5191         if not hasattr(self.op, attr):
5192           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5193                                      attr)
5194       iname = self.cfg.ExpandInstanceName(self.op.name)
5195       if iname is not None:
5196         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5197                                    iname)
5198       if not isinstance(self.op.nics, list):
5199         raise errors.OpPrereqError("Invalid parameter 'nics'")
5200       for row in self.op.nics:
5201         if (not isinstance(row, dict) or
5202             "mac" not in row or
5203             "ip" not in row or
5204             "bridge" not in row):
5205           raise errors.OpPrereqError("Invalid contents of the"
5206                                      " 'nics' parameter")
5207       if not isinstance(self.op.disks, list):
5208         raise errors.OpPrereqError("Invalid parameter 'disks'")
5209       if len(self.op.disks) != 2:
5210         raise errors.OpPrereqError("Only two-disk configurations supported")
5211       for row in self.op.disks:
5212         if (not isinstance(row, dict) or
5213             "size" not in row or
5214             not isinstance(row["size"], int) or
5215             "mode" not in row or
5216             row["mode"] not in ['r', 'w']):
5217           raise errors.OpPrereqError("Invalid contents of the"
5218                                      " 'disks' parameter")
5219     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5220       if not hasattr(self.op, "name"):
5221         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5222       fname = self.cfg.ExpandInstanceName(self.op.name)
5223       if fname is None:
5224         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5225                                    self.op.name)
5226       self.op.name = fname
5227       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5228     else:
5229       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5230                                  self.op.mode)
5231
5232     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5233       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5234         raise errors.OpPrereqError("Missing allocator name")
5235     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5236       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5237                                  self.op.direction)
5238
5239   def Exec(self, feedback_fn):
5240     """Run the allocator test.
5241
5242     """
5243     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5244       ial = IAllocator(self.cfg, self.sstore,
5245                        mode=self.op.mode,
5246                        name=self.op.name,
5247                        mem_size=self.op.mem_size,
5248                        disks=self.op.disks,
5249                        disk_template=self.op.disk_template,
5250                        os=self.op.os,
5251                        tags=self.op.tags,
5252                        nics=self.op.nics,
5253                        vcpus=self.op.vcpus,
5254                        )
5255     else:
5256       ial = IAllocator(self.cfg, self.sstore,
5257                        mode=self.op.mode,
5258                        name=self.op.name,
5259                        relocate_from=list(self.relocate_from),
5260                        )
5261
5262     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5263       result = ial.in_text
5264     else:
5265       ial.Run(self.op.allocator, validate=False)
5266       result = ial.out_text
5267     return result