Xen: remove two end-of-line semicolons
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_WSSTORE: the LU needs a writable SimpleStore
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_WSSTORE = False
69   REQ_BGL = True
70
71   def __init__(self, processor, op, context, sstore):
72     """Constructor for LogicalUnit.
73
74     This needs to be overriden in derived classes in order to check op
75     validity.
76
77     """
78     self.proc = processor
79     self.op = op
80     self.cfg = context.cfg
81     self.sstore = sstore
82     self.context = context
83     self.needed_locks = None
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     # Used to force good behavior when calling helper functions
86     self.recalculate_locks = {}
87     self.__ssh = None
88
89     for attr_name in self._OP_REQP:
90       attr_val = getattr(op, attr_name, None)
91       if attr_val is None:
92         raise errors.OpPrereqError("Required parameter '%s' missing" %
93                                    attr_name)
94
95     if not self.cfg.IsCluster():
96       raise errors.OpPrereqError("Cluster not initialized yet,"
97                                  " use 'gnt-cluster init' first.")
98     if self.REQ_MASTER:
99       master = sstore.GetMasterNode()
100       if master != utils.HostInfo().name:
101         raise errors.OpPrereqError("Commands must be run on the master"
102                                    " node %s" % master)
103
104   def __GetSSH(self):
105     """Returns the SshRunner object
106
107     """
108     if not self.__ssh:
109       self.__ssh = ssh.SshRunner(self.sstore)
110     return self.__ssh
111
112   ssh = property(fget=__GetSSH)
113
114   def ExpandNames(self):
115     """Expand names for this LU.
116
117     This method is called before starting to execute the opcode, and it should
118     update all the parameters of the opcode to their canonical form (e.g. a
119     short node name must be fully expanded after this method has successfully
120     completed). This way locking, hooks, logging, ecc. can work correctly.
121
122     LUs which implement this method must also populate the self.needed_locks
123     member, as a dict with lock levels as keys, and a list of needed lock names
124     as values. Rules:
125       - Use an empty dict if you don't need any lock
126       - If you don't need any lock at a particular level omit that level
127       - Don't put anything for the BGL level
128       - If you want all locks at a level use None as a value
129         (this reflects what LockSet does, and will be replaced before
130         CheckPrereq with the full list of nodes that have been locked)
131
132     If you need to share locks (rather than acquire them exclusively) at one
133     level you can modify self.share_locks, setting a true value (usually 1) for
134     that level. By default locks are not shared.
135
136     Examples:
137     # Acquire all nodes and one instance
138     self.needed_locks = {
139       locking.LEVEL_NODE: None,
140       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
141     }
142     # Acquire just two nodes
143     self.needed_locks = {
144       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145     }
146     # Acquire no locks
147     self.needed_locks = {} # No, you can't leave it to the default value None
148
149     """
150     # The implementation of this method is mandatory only if the new LU is
151     # concurrent, so that old LUs don't need to be changed all at the same
152     # time.
153     if self.REQ_BGL:
154       self.needed_locks = {} # Exclusive LUs don't need locks.
155     else:
156       raise NotImplementedError
157
158   def DeclareLocks(self, level):
159     """Declare LU locking needs for a level
160
161     While most LUs can just declare their locking needs at ExpandNames time,
162     sometimes there's the need to calculate some locks after having acquired
163     the ones before. This function is called just before acquiring locks at a
164     particular level, but after acquiring the ones at lower levels, and permits
165     such calculations. It can be used to modify self.needed_locks, and by
166     default it does nothing.
167
168     This function is only called if you have something already set in
169     self.needed_locks for the level.
170
171     @param level: Locking level which is going to be locked
172     @type level: member of ganeti.locking.LEVELS
173
174     """
175
176   def CheckPrereq(self):
177     """Check prerequisites for this LU.
178
179     This method should check that the prerequisites for the execution
180     of this LU are fulfilled. It can do internode communication, but
181     it should be idempotent - no cluster or system changes are
182     allowed.
183
184     The method should raise errors.OpPrereqError in case something is
185     not fulfilled. Its return value is ignored.
186
187     This method should also update all the parameters of the opcode to
188     their canonical form if it hasn't been done by ExpandNames before.
189
190     """
191     raise NotImplementedError
192
193   def Exec(self, feedback_fn):
194     """Execute the LU.
195
196     This method should implement the actual work. It should raise
197     errors.OpExecError for failures that are somewhat dealt with in
198     code, or expected.
199
200     """
201     raise NotImplementedError
202
203   def BuildHooksEnv(self):
204     """Build hooks environment for this LU.
205
206     This method should return a three-node tuple consisting of: a dict
207     containing the environment that will be used for running the
208     specific hook for this LU, a list of node names on which the hook
209     should run before the execution, and a list of node names on which
210     the hook should run after the execution.
211
212     The keys of the dict must not have 'GANETI_' prefixed as this will
213     be handled in the hooks runner. Also note additional keys will be
214     added by the hooks runner. If the LU doesn't define any
215     environment, an empty dict (and not None) should be returned.
216
217     No nodes should be returned as an empty list (and not None).
218
219     Note that if the HPATH for a LU class is None, this function will
220     not be called.
221
222     """
223     raise NotImplementedError
224
225   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226     """Notify the LU about the results of its hooks.
227
228     This method is called every time a hooks phase is executed, and notifies
229     the Logical Unit about the hooks' result. The LU can then use it to alter
230     its result based on the hooks.  By default the method does nothing and the
231     previous result is passed back unchanged but any LU can define it if it
232     wants to use the local cluster hook-scripts somehow.
233
234     Args:
235       phase: the hooks phase that has just been run
236       hooks_results: the results of the multi-node hooks rpc call
237       feedback_fn: function to send feedback back to the caller
238       lu_result: the previous result this LU had, or None in the PRE phase.
239
240     """
241     return lu_result
242
243   def _ExpandAndLockInstance(self):
244     """Helper function to expand and lock an instance.
245
246     Many LUs that work on an instance take its name in self.op.instance_name
247     and need to expand it and then declare the expanded name for locking. This
248     function does it, and then updates self.op.instance_name to the expanded
249     name. It also initializes needed_locks as a dict, if this hasn't been done
250     before.
251
252     """
253     if self.needed_locks is None:
254       self.needed_locks = {}
255     else:
256       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257         "_ExpandAndLockInstance called with instance-level locks set"
258     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259     if expanded_name is None:
260       raise errors.OpPrereqError("Instance '%s' not known" %
261                                   self.op.instance_name)
262     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263     self.op.instance_name = expanded_name
264
265   def _LockInstancesNodes(self):
266     """Helper function to declare instances' nodes for locking.
267
268     This function should be called after locking one or more instances to lock
269     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270     with all primary or secondary nodes for instances already locked and
271     present in self.needed_locks[locking.LEVEL_INSTANCE].
272
273     It should be called from DeclareLocks, and for safety only works if
274     self.recalculate_locks[locking.LEVEL_NODE] is set.
275
276     In the future it may grow parameters to just lock some instance's nodes, or
277     to just lock primaries or secondary nodes, if needed.
278
279     If should be called in DeclareLocks in a way similar to:
280
281     if level == locking.LEVEL_NODE:
282       self._LockInstancesNodes()
283
284     """
285     assert locking.LEVEL_NODE in self.recalculate_locks, \
286       "_LockInstancesNodes helper function called with no nodes to recalculate"
287
288     # TODO: check if we're really been called with the instance locks held
289
290     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
291     # future we might want to have different behaviors depending on the value
292     # of self.recalculate_locks[locking.LEVEL_NODE]
293     wanted_nodes = []
294     for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
295       instance = self.context.cfg.GetInstanceInfo(instance_name)
296       wanted_nodes.append(instance.primary_node)
297       wanted_nodes.extend(instance.secondary_nodes)
298     self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
299
300     del self.recalculate_locks[locking.LEVEL_NODE]
301
302
303 class NoHooksLU(LogicalUnit):
304   """Simple LU which runs no hooks.
305
306   This LU is intended as a parent for other LogicalUnits which will
307   run no hooks, in order to reduce duplicate code.
308
309   """
310   HPATH = None
311   HTYPE = None
312
313
314 def _GetWantedNodes(lu, nodes):
315   """Returns list of checked and expanded node names.
316
317   Args:
318     nodes: List of nodes (strings) or None for all
319
320   """
321   if not isinstance(nodes, list):
322     raise errors.OpPrereqError("Invalid argument type 'nodes'")
323
324   if nodes:
325     wanted = []
326
327     for name in nodes:
328       node = lu.cfg.ExpandNodeName(name)
329       if node is None:
330         raise errors.OpPrereqError("No such node name '%s'" % name)
331       wanted.append(node)
332
333   else:
334     wanted = lu.cfg.GetNodeList()
335   return utils.NiceSort(wanted)
336
337
338 def _GetWantedInstances(lu, instances):
339   """Returns list of checked and expanded instance names.
340
341   Args:
342     instances: List of instances (strings) or None for all
343
344   """
345   if not isinstance(instances, list):
346     raise errors.OpPrereqError("Invalid argument type 'instances'")
347
348   if instances:
349     wanted = []
350
351     for name in instances:
352       instance = lu.cfg.ExpandInstanceName(name)
353       if instance is None:
354         raise errors.OpPrereqError("No such instance name '%s'" % name)
355       wanted.append(instance)
356
357   else:
358     wanted = lu.cfg.GetInstanceList()
359   return utils.NiceSort(wanted)
360
361
362 def _CheckOutputFields(static, dynamic, selected):
363   """Checks whether all selected fields are valid.
364
365   Args:
366     static: Static fields
367     dynamic: Dynamic fields
368
369   """
370   static_fields = frozenset(static)
371   dynamic_fields = frozenset(dynamic)
372
373   all_fields = static_fields | dynamic_fields
374
375   if not all_fields.issuperset(selected):
376     raise errors.OpPrereqError("Unknown output fields selected: %s"
377                                % ",".join(frozenset(selected).
378                                           difference(all_fields)))
379
380
381 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
382                           memory, vcpus, nics):
383   """Builds instance related env variables for hooks from single variables.
384
385   Args:
386     secondary_nodes: List of secondary nodes as strings
387   """
388   env = {
389     "OP_TARGET": name,
390     "INSTANCE_NAME": name,
391     "INSTANCE_PRIMARY": primary_node,
392     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
393     "INSTANCE_OS_TYPE": os_type,
394     "INSTANCE_STATUS": status,
395     "INSTANCE_MEMORY": memory,
396     "INSTANCE_VCPUS": vcpus,
397   }
398
399   if nics:
400     nic_count = len(nics)
401     for idx, (ip, bridge, mac) in enumerate(nics):
402       if ip is None:
403         ip = ""
404       env["INSTANCE_NIC%d_IP" % idx] = ip
405       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
406       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
407   else:
408     nic_count = 0
409
410   env["INSTANCE_NIC_COUNT"] = nic_count
411
412   return env
413
414
415 def _BuildInstanceHookEnvByObject(instance, override=None):
416   """Builds instance related env variables for hooks from an object.
417
418   Args:
419     instance: objects.Instance object of instance
420     override: dict of values to override
421   """
422   args = {
423     'name': instance.name,
424     'primary_node': instance.primary_node,
425     'secondary_nodes': instance.secondary_nodes,
426     'os_type': instance.os,
427     'status': instance.os,
428     'memory': instance.memory,
429     'vcpus': instance.vcpus,
430     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
431   }
432   if override:
433     args.update(override)
434   return _BuildInstanceHookEnv(**args)
435
436
437 def _CheckInstanceBridgesExist(instance):
438   """Check that the brigdes needed by an instance exist.
439
440   """
441   # check bridges existance
442   brlist = [nic.bridge for nic in instance.nics]
443   if not rpc.call_bridges_exist(instance.primary_node, brlist):
444     raise errors.OpPrereqError("one or more target bridges %s does not"
445                                " exist on destination node '%s'" %
446                                (brlist, instance.primary_node))
447
448
449 class LUDestroyCluster(NoHooksLU):
450   """Logical unit for destroying the cluster.
451
452   """
453   _OP_REQP = []
454
455   def CheckPrereq(self):
456     """Check prerequisites.
457
458     This checks whether the cluster is empty.
459
460     Any errors are signalled by raising errors.OpPrereqError.
461
462     """
463     master = self.sstore.GetMasterNode()
464
465     nodelist = self.cfg.GetNodeList()
466     if len(nodelist) != 1 or nodelist[0] != master:
467       raise errors.OpPrereqError("There are still %d node(s) in"
468                                  " this cluster." % (len(nodelist) - 1))
469     instancelist = self.cfg.GetInstanceList()
470     if instancelist:
471       raise errors.OpPrereqError("There are still %d instance(s) in"
472                                  " this cluster." % len(instancelist))
473
474   def Exec(self, feedback_fn):
475     """Destroys the cluster.
476
477     """
478     master = self.sstore.GetMasterNode()
479     if not rpc.call_node_stop_master(master, False):
480       raise errors.OpExecError("Could not disable the master role")
481     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
482     utils.CreateBackup(priv_key)
483     utils.CreateBackup(pub_key)
484     rpc.call_node_leave_cluster(master)
485
486
487 class LUVerifyCluster(LogicalUnit):
488   """Verifies the cluster status.
489
490   """
491   HPATH = "cluster-verify"
492   HTYPE = constants.HTYPE_CLUSTER
493   _OP_REQP = ["skip_checks"]
494
495   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
496                   remote_version, feedback_fn):
497     """Run multiple tests against a node.
498
499     Test list:
500       - compares ganeti version
501       - checks vg existance and size > 20G
502       - checks config file checksum
503       - checks ssh to other nodes
504
505     Args:
506       node: name of the node to check
507       file_list: required list of files
508       local_cksum: dictionary of local files and their checksums
509
510     """
511     # compares ganeti version
512     local_version = constants.PROTOCOL_VERSION
513     if not remote_version:
514       feedback_fn("  - ERROR: connection to %s failed" % (node))
515       return True
516
517     if local_version != remote_version:
518       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
519                       (local_version, node, remote_version))
520       return True
521
522     # checks vg existance and size > 20G
523
524     bad = False
525     if not vglist:
526       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
527                       (node,))
528       bad = True
529     else:
530       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
531                                             constants.MIN_VG_SIZE)
532       if vgstatus:
533         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
534         bad = True
535
536     # checks config file checksum
537     # checks ssh to any
538
539     if 'filelist' not in node_result:
540       bad = True
541       feedback_fn("  - ERROR: node hasn't returned file checksum data")
542     else:
543       remote_cksum = node_result['filelist']
544       for file_name in file_list:
545         if file_name not in remote_cksum:
546           bad = True
547           feedback_fn("  - ERROR: file '%s' missing" % file_name)
548         elif remote_cksum[file_name] != local_cksum[file_name]:
549           bad = True
550           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
551
552     if 'nodelist' not in node_result:
553       bad = True
554       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
555     else:
556       if node_result['nodelist']:
557         bad = True
558         for node in node_result['nodelist']:
559           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
560                           (node, node_result['nodelist'][node]))
561     if 'node-net-test' not in node_result:
562       bad = True
563       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
564     else:
565       if node_result['node-net-test']:
566         bad = True
567         nlist = utils.NiceSort(node_result['node-net-test'].keys())
568         for node in nlist:
569           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
570                           (node, node_result['node-net-test'][node]))
571
572     hyp_result = node_result.get('hypervisor', None)
573     if hyp_result is not None:
574       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
575     return bad
576
577   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
578                       node_instance, feedback_fn):
579     """Verify an instance.
580
581     This function checks to see if the required block devices are
582     available on the instance's node.
583
584     """
585     bad = False
586
587     node_current = instanceconfig.primary_node
588
589     node_vol_should = {}
590     instanceconfig.MapLVsByNode(node_vol_should)
591
592     for node in node_vol_should:
593       for volume in node_vol_should[node]:
594         if node not in node_vol_is or volume not in node_vol_is[node]:
595           feedback_fn("  - ERROR: volume %s missing on node %s" %
596                           (volume, node))
597           bad = True
598
599     if not instanceconfig.status == 'down':
600       if (node_current not in node_instance or
601           not instance in node_instance[node_current]):
602         feedback_fn("  - ERROR: instance %s not running on node %s" %
603                         (instance, node_current))
604         bad = True
605
606     for node in node_instance:
607       if (not node == node_current):
608         if instance in node_instance[node]:
609           feedback_fn("  - ERROR: instance %s should not run on node %s" %
610                           (instance, node))
611           bad = True
612
613     return bad
614
615   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
616     """Verify if there are any unknown volumes in the cluster.
617
618     The .os, .swap and backup volumes are ignored. All other volumes are
619     reported as unknown.
620
621     """
622     bad = False
623
624     for node in node_vol_is:
625       for volume in node_vol_is[node]:
626         if node not in node_vol_should or volume not in node_vol_should[node]:
627           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
628                       (volume, node))
629           bad = True
630     return bad
631
632   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
633     """Verify the list of running instances.
634
635     This checks what instances are running but unknown to the cluster.
636
637     """
638     bad = False
639     for node in node_instance:
640       for runninginstance in node_instance[node]:
641         if runninginstance not in instancelist:
642           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
643                           (runninginstance, node))
644           bad = True
645     return bad
646
647   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
648     """Verify N+1 Memory Resilience.
649
650     Check that if one single node dies we can still start all the instances it
651     was primary for.
652
653     """
654     bad = False
655
656     for node, nodeinfo in node_info.iteritems():
657       # This code checks that every node which is now listed as secondary has
658       # enough memory to host all instances it is supposed to should a single
659       # other node in the cluster fail.
660       # FIXME: not ready for failover to an arbitrary node
661       # FIXME: does not support file-backed instances
662       # WARNING: we currently take into account down instances as well as up
663       # ones, considering that even if they're down someone might want to start
664       # them even in the event of a node failure.
665       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
666         needed_mem = 0
667         for instance in instances:
668           needed_mem += instance_cfg[instance].memory
669         if nodeinfo['mfree'] < needed_mem:
670           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
671                       " failovers should node %s fail" % (node, prinode))
672           bad = True
673     return bad
674
675   def CheckPrereq(self):
676     """Check prerequisites.
677
678     Transform the list of checks we're going to skip into a set and check that
679     all its members are valid.
680
681     """
682     self.skip_set = frozenset(self.op.skip_checks)
683     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
684       raise errors.OpPrereqError("Invalid checks to be skipped specified")
685
686   def BuildHooksEnv(self):
687     """Build hooks env.
688
689     Cluster-Verify hooks just rone in the post phase and their failure makes
690     the output be logged in the verify output and the verification to fail.
691
692     """
693     all_nodes = self.cfg.GetNodeList()
694     # TODO: populate the environment with useful information for verify hooks
695     env = {}
696     return env, [], all_nodes
697
698   def Exec(self, feedback_fn):
699     """Verify integrity of cluster, performing various test on nodes.
700
701     """
702     bad = False
703     feedback_fn("* Verifying global settings")
704     for msg in self.cfg.VerifyConfig():
705       feedback_fn("  - ERROR: %s" % msg)
706
707     vg_name = self.cfg.GetVGName()
708     nodelist = utils.NiceSort(self.cfg.GetNodeList())
709     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
710     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
711     i_non_redundant = [] # Non redundant instances
712     node_volume = {}
713     node_instance = {}
714     node_info = {}
715     instance_cfg = {}
716
717     # FIXME: verify OS list
718     # do local checksums
719     file_names = list(self.sstore.GetFileList())
720     file_names.append(constants.SSL_CERT_FILE)
721     file_names.append(constants.CLUSTER_CONF_FILE)
722     local_checksums = utils.FingerprintFiles(file_names)
723
724     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
725     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
726     all_instanceinfo = rpc.call_instance_list(nodelist)
727     all_vglist = rpc.call_vg_list(nodelist)
728     node_verify_param = {
729       'filelist': file_names,
730       'nodelist': nodelist,
731       'hypervisor': None,
732       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
733                         for node in nodeinfo]
734       }
735     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
736     all_rversion = rpc.call_version(nodelist)
737     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
738
739     for node in nodelist:
740       feedback_fn("* Verifying node %s" % node)
741       result = self._VerifyNode(node, file_names, local_checksums,
742                                 all_vglist[node], all_nvinfo[node],
743                                 all_rversion[node], feedback_fn)
744       bad = bad or result
745
746       # node_volume
747       volumeinfo = all_volumeinfo[node]
748
749       if isinstance(volumeinfo, basestring):
750         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
751                     (node, volumeinfo[-400:].encode('string_escape')))
752         bad = True
753         node_volume[node] = {}
754       elif not isinstance(volumeinfo, dict):
755         feedback_fn("  - ERROR: connection to %s failed" % (node,))
756         bad = True
757         continue
758       else:
759         node_volume[node] = volumeinfo
760
761       # node_instance
762       nodeinstance = all_instanceinfo[node]
763       if type(nodeinstance) != list:
764         feedback_fn("  - ERROR: connection to %s failed" % (node,))
765         bad = True
766         continue
767
768       node_instance[node] = nodeinstance
769
770       # node_info
771       nodeinfo = all_ninfo[node]
772       if not isinstance(nodeinfo, dict):
773         feedback_fn("  - ERROR: connection to %s failed" % (node,))
774         bad = True
775         continue
776
777       try:
778         node_info[node] = {
779           "mfree": int(nodeinfo['memory_free']),
780           "dfree": int(nodeinfo['vg_free']),
781           "pinst": [],
782           "sinst": [],
783           # dictionary holding all instances this node is secondary for,
784           # grouped by their primary node. Each key is a cluster node, and each
785           # value is a list of instances which have the key as primary and the
786           # current node as secondary.  this is handy to calculate N+1 memory
787           # availability if you can only failover from a primary to its
788           # secondary.
789           "sinst-by-pnode": {},
790         }
791       except ValueError:
792         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
793         bad = True
794         continue
795
796     node_vol_should = {}
797
798     for instance in instancelist:
799       feedback_fn("* Verifying instance %s" % instance)
800       inst_config = self.cfg.GetInstanceInfo(instance)
801       result =  self._VerifyInstance(instance, inst_config, node_volume,
802                                      node_instance, feedback_fn)
803       bad = bad or result
804
805       inst_config.MapLVsByNode(node_vol_should)
806
807       instance_cfg[instance] = inst_config
808
809       pnode = inst_config.primary_node
810       if pnode in node_info:
811         node_info[pnode]['pinst'].append(instance)
812       else:
813         feedback_fn("  - ERROR: instance %s, connection to primary node"
814                     " %s failed" % (instance, pnode))
815         bad = True
816
817       # If the instance is non-redundant we cannot survive losing its primary
818       # node, so we are not N+1 compliant. On the other hand we have no disk
819       # templates with more than one secondary so that situation is not well
820       # supported either.
821       # FIXME: does not support file-backed instances
822       if len(inst_config.secondary_nodes) == 0:
823         i_non_redundant.append(instance)
824       elif len(inst_config.secondary_nodes) > 1:
825         feedback_fn("  - WARNING: multiple secondaries for instance %s"
826                     % instance)
827
828       for snode in inst_config.secondary_nodes:
829         if snode in node_info:
830           node_info[snode]['sinst'].append(instance)
831           if pnode not in node_info[snode]['sinst-by-pnode']:
832             node_info[snode]['sinst-by-pnode'][pnode] = []
833           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
834         else:
835           feedback_fn("  - ERROR: instance %s, connection to secondary node"
836                       " %s failed" % (instance, snode))
837
838     feedback_fn("* Verifying orphan volumes")
839     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840                                        feedback_fn)
841     bad = bad or result
842
843     feedback_fn("* Verifying remaining instances")
844     result = self._VerifyOrphanInstances(instancelist, node_instance,
845                                          feedback_fn)
846     bad = bad or result
847
848     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
849       feedback_fn("* Verifying N+1 Memory redundancy")
850       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
851       bad = bad or result
852
853     feedback_fn("* Other Notes")
854     if i_non_redundant:
855       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
856                   % len(i_non_redundant))
857
858     return int(bad)
859
860   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
861     """Analize the post-hooks' result, handle it, and send some
862     nicely-formatted feedback back to the user.
863
864     Args:
865       phase: the hooks phase that has just been run
866       hooks_results: the results of the multi-node hooks rpc call
867       feedback_fn: function to send feedback back to the caller
868       lu_result: previous Exec result
869
870     """
871     # We only really run POST phase hooks, and are only interested in
872     # their results
873     if phase == constants.HOOKS_PHASE_POST:
874       # Used to change hooks' output to proper indentation
875       indent_re = re.compile('^', re.M)
876       feedback_fn("* Hooks Results")
877       if not hooks_results:
878         feedback_fn("  - ERROR: general communication failure")
879         lu_result = 1
880       else:
881         for node_name in hooks_results:
882           show_node_header = True
883           res = hooks_results[node_name]
884           if res is False or not isinstance(res, list):
885             feedback_fn("    Communication failure")
886             lu_result = 1
887             continue
888           for script, hkr, output in res:
889             if hkr == constants.HKR_FAIL:
890               # The node header is only shown once, if there are
891               # failing hooks on that node
892               if show_node_header:
893                 feedback_fn("  Node %s:" % node_name)
894                 show_node_header = False
895               feedback_fn("    ERROR: Script %s failed, output:" % script)
896               output = indent_re.sub('      ', output)
897               feedback_fn("%s" % output)
898               lu_result = 1
899
900       return lu_result
901
902
903 class LUVerifyDisks(NoHooksLU):
904   """Verifies the cluster disks status.
905
906   """
907   _OP_REQP = []
908
909   def CheckPrereq(self):
910     """Check prerequisites.
911
912     This has no prerequisites.
913
914     """
915     pass
916
917   def Exec(self, feedback_fn):
918     """Verify integrity of cluster disks.
919
920     """
921     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
922
923     vg_name = self.cfg.GetVGName()
924     nodes = utils.NiceSort(self.cfg.GetNodeList())
925     instances = [self.cfg.GetInstanceInfo(name)
926                  for name in self.cfg.GetInstanceList()]
927
928     nv_dict = {}
929     for inst in instances:
930       inst_lvs = {}
931       if (inst.status != "up" or
932           inst.disk_template not in constants.DTS_NET_MIRROR):
933         continue
934       inst.MapLVsByNode(inst_lvs)
935       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
936       for node, vol_list in inst_lvs.iteritems():
937         for vol in vol_list:
938           nv_dict[(node, vol)] = inst
939
940     if not nv_dict:
941       return result
942
943     node_lvs = rpc.call_volume_list(nodes, vg_name)
944
945     to_act = set()
946     for node in nodes:
947       # node_volume
948       lvs = node_lvs[node]
949
950       if isinstance(lvs, basestring):
951         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
952         res_nlvm[node] = lvs
953       elif not isinstance(lvs, dict):
954         logger.Info("connection to node %s failed or invalid data returned" %
955                     (node,))
956         res_nodes.append(node)
957         continue
958
959       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
960         inst = nv_dict.pop((node, lv_name), None)
961         if (not lv_online and inst is not None
962             and inst.name not in res_instances):
963           res_instances.append(inst.name)
964
965     # any leftover items in nv_dict are missing LVs, let's arrange the
966     # data better
967     for key, inst in nv_dict.iteritems():
968       if inst.name not in res_missing:
969         res_missing[inst.name] = []
970       res_missing[inst.name].append(key)
971
972     return result
973
974
975 class LURenameCluster(LogicalUnit):
976   """Rename the cluster.
977
978   """
979   HPATH = "cluster-rename"
980   HTYPE = constants.HTYPE_CLUSTER
981   _OP_REQP = ["name"]
982   REQ_WSSTORE = True
983
984   def BuildHooksEnv(self):
985     """Build hooks env.
986
987     """
988     env = {
989       "OP_TARGET": self.sstore.GetClusterName(),
990       "NEW_NAME": self.op.name,
991       }
992     mn = self.sstore.GetMasterNode()
993     return env, [mn], [mn]
994
995   def CheckPrereq(self):
996     """Verify that the passed name is a valid one.
997
998     """
999     hostname = utils.HostInfo(self.op.name)
1000
1001     new_name = hostname.name
1002     self.ip = new_ip = hostname.ip
1003     old_name = self.sstore.GetClusterName()
1004     old_ip = self.sstore.GetMasterIP()
1005     if new_name == old_name and new_ip == old_ip:
1006       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1007                                  " cluster has changed")
1008     if new_ip != old_ip:
1009       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1010         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1011                                    " reachable on the network. Aborting." %
1012                                    new_ip)
1013
1014     self.op.name = new_name
1015
1016   def Exec(self, feedback_fn):
1017     """Rename the cluster.
1018
1019     """
1020     clustername = self.op.name
1021     ip = self.ip
1022     ss = self.sstore
1023
1024     # shutdown the master IP
1025     master = ss.GetMasterNode()
1026     if not rpc.call_node_stop_master(master, False):
1027       raise errors.OpExecError("Could not disable the master role")
1028
1029     try:
1030       # modify the sstore
1031       ss.SetKey(ss.SS_MASTER_IP, ip)
1032       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1033
1034       # Distribute updated ss config to all nodes
1035       myself = self.cfg.GetNodeInfo(master)
1036       dist_nodes = self.cfg.GetNodeList()
1037       if myself.name in dist_nodes:
1038         dist_nodes.remove(myself.name)
1039
1040       logger.Debug("Copying updated ssconf data to all nodes")
1041       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1042         fname = ss.KeyToFilename(keyname)
1043         result = rpc.call_upload_file(dist_nodes, fname)
1044         for to_node in dist_nodes:
1045           if not result[to_node]:
1046             logger.Error("copy of file %s to node %s failed" %
1047                          (fname, to_node))
1048     finally:
1049       if not rpc.call_node_start_master(master, False):
1050         logger.Error("Could not re-enable the master role on the master,"
1051                      " please restart manually.")
1052
1053
1054 def _RecursiveCheckIfLVMBased(disk):
1055   """Check if the given disk or its children are lvm-based.
1056
1057   Args:
1058     disk: ganeti.objects.Disk object
1059
1060   Returns:
1061     boolean indicating whether a LD_LV dev_type was found or not
1062
1063   """
1064   if disk.children:
1065     for chdisk in disk.children:
1066       if _RecursiveCheckIfLVMBased(chdisk):
1067         return True
1068   return disk.dev_type == constants.LD_LV
1069
1070
1071 class LUSetClusterParams(LogicalUnit):
1072   """Change the parameters of the cluster.
1073
1074   """
1075   HPATH = "cluster-modify"
1076   HTYPE = constants.HTYPE_CLUSTER
1077   _OP_REQP = []
1078
1079   def BuildHooksEnv(self):
1080     """Build hooks env.
1081
1082     """
1083     env = {
1084       "OP_TARGET": self.sstore.GetClusterName(),
1085       "NEW_VG_NAME": self.op.vg_name,
1086       }
1087     mn = self.sstore.GetMasterNode()
1088     return env, [mn], [mn]
1089
1090   def CheckPrereq(self):
1091     """Check prerequisites.
1092
1093     This checks whether the given params don't conflict and
1094     if the given volume group is valid.
1095
1096     """
1097     if not self.op.vg_name:
1098       instances = [self.cfg.GetInstanceInfo(name)
1099                    for name in self.cfg.GetInstanceList()]
1100       for inst in instances:
1101         for disk in inst.disks:
1102           if _RecursiveCheckIfLVMBased(disk):
1103             raise errors.OpPrereqError("Cannot disable lvm storage while"
1104                                        " lvm-based instances exist")
1105
1106     # if vg_name not None, checks given volume group on all nodes
1107     if self.op.vg_name:
1108       node_list = self.cfg.GetNodeList()
1109       vglist = rpc.call_vg_list(node_list)
1110       for node in node_list:
1111         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1112                                               constants.MIN_VG_SIZE)
1113         if vgstatus:
1114           raise errors.OpPrereqError("Error on node '%s': %s" %
1115                                      (node, vgstatus))
1116
1117   def Exec(self, feedback_fn):
1118     """Change the parameters of the cluster.
1119
1120     """
1121     if self.op.vg_name != self.cfg.GetVGName():
1122       self.cfg.SetVGName(self.op.vg_name)
1123     else:
1124       feedback_fn("Cluster LVM configuration already in desired"
1125                   " state, not changing")
1126
1127
1128 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1129   """Sleep and poll for an instance's disk to sync.
1130
1131   """
1132   if not instance.disks:
1133     return True
1134
1135   if not oneshot:
1136     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1137
1138   node = instance.primary_node
1139
1140   for dev in instance.disks:
1141     cfgw.SetDiskID(dev, node)
1142
1143   retries = 0
1144   while True:
1145     max_time = 0
1146     done = True
1147     cumul_degraded = False
1148     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1149     if not rstats:
1150       proc.LogWarning("Can't get any data from node %s" % node)
1151       retries += 1
1152       if retries >= 10:
1153         raise errors.RemoteError("Can't contact node %s for mirror data,"
1154                                  " aborting." % node)
1155       time.sleep(6)
1156       continue
1157     retries = 0
1158     for i in range(len(rstats)):
1159       mstat = rstats[i]
1160       if mstat is None:
1161         proc.LogWarning("Can't compute data for node %s/%s" %
1162                         (node, instance.disks[i].iv_name))
1163         continue
1164       # we ignore the ldisk parameter
1165       perc_done, est_time, is_degraded, _ = mstat
1166       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1167       if perc_done is not None:
1168         done = False
1169         if est_time is not None:
1170           rem_time = "%d estimated seconds remaining" % est_time
1171           max_time = est_time
1172         else:
1173           rem_time = "no time estimate"
1174         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1175                      (instance.disks[i].iv_name, perc_done, rem_time))
1176     if done or oneshot:
1177       break
1178
1179     time.sleep(min(60, max_time))
1180
1181   if done:
1182     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1183   return not cumul_degraded
1184
1185
1186 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1187   """Check that mirrors are not degraded.
1188
1189   The ldisk parameter, if True, will change the test from the
1190   is_degraded attribute (which represents overall non-ok status for
1191   the device(s)) to the ldisk (representing the local storage status).
1192
1193   """
1194   cfgw.SetDiskID(dev, node)
1195   if ldisk:
1196     idx = 6
1197   else:
1198     idx = 5
1199
1200   result = True
1201   if on_primary or dev.AssembleOnSecondary():
1202     rstats = rpc.call_blockdev_find(node, dev)
1203     if not rstats:
1204       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1205       result = False
1206     else:
1207       result = result and (not rstats[idx])
1208   if dev.children:
1209     for child in dev.children:
1210       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1211
1212   return result
1213
1214
1215 class LUDiagnoseOS(NoHooksLU):
1216   """Logical unit for OS diagnose/query.
1217
1218   """
1219   _OP_REQP = ["output_fields", "names"]
1220
1221   def CheckPrereq(self):
1222     """Check prerequisites.
1223
1224     This always succeeds, since this is a pure query LU.
1225
1226     """
1227     if self.op.names:
1228       raise errors.OpPrereqError("Selective OS query not supported")
1229
1230     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1231     _CheckOutputFields(static=[],
1232                        dynamic=self.dynamic_fields,
1233                        selected=self.op.output_fields)
1234
1235   @staticmethod
1236   def _DiagnoseByOS(node_list, rlist):
1237     """Remaps a per-node return list into an a per-os per-node dictionary
1238
1239       Args:
1240         node_list: a list with the names of all nodes
1241         rlist: a map with node names as keys and OS objects as values
1242
1243       Returns:
1244         map: a map with osnames as keys and as value another map, with
1245              nodes as
1246              keys and list of OS objects as values
1247              e.g. {"debian-etch": {"node1": [<object>,...],
1248                                    "node2": [<object>,]}
1249                   }
1250
1251     """
1252     all_os = {}
1253     for node_name, nr in rlist.iteritems():
1254       if not nr:
1255         continue
1256       for os_obj in nr:
1257         if os_obj.name not in all_os:
1258           # build a list of nodes for this os containing empty lists
1259           # for each node in node_list
1260           all_os[os_obj.name] = {}
1261           for nname in node_list:
1262             all_os[os_obj.name][nname] = []
1263         all_os[os_obj.name][node_name].append(os_obj)
1264     return all_os
1265
1266   def Exec(self, feedback_fn):
1267     """Compute the list of OSes.
1268
1269     """
1270     node_list = self.cfg.GetNodeList()
1271     node_data = rpc.call_os_diagnose(node_list)
1272     if node_data == False:
1273       raise errors.OpExecError("Can't gather the list of OSes")
1274     pol = self._DiagnoseByOS(node_list, node_data)
1275     output = []
1276     for os_name, os_data in pol.iteritems():
1277       row = []
1278       for field in self.op.output_fields:
1279         if field == "name":
1280           val = os_name
1281         elif field == "valid":
1282           val = utils.all([osl and osl[0] for osl in os_data.values()])
1283         elif field == "node_status":
1284           val = {}
1285           for node_name, nos_list in os_data.iteritems():
1286             val[node_name] = [(v.status, v.path) for v in nos_list]
1287         else:
1288           raise errors.ParameterError(field)
1289         row.append(val)
1290       output.append(row)
1291
1292     return output
1293
1294
1295 class LURemoveNode(LogicalUnit):
1296   """Logical unit for removing a node.
1297
1298   """
1299   HPATH = "node-remove"
1300   HTYPE = constants.HTYPE_NODE
1301   _OP_REQP = ["node_name"]
1302
1303   def BuildHooksEnv(self):
1304     """Build hooks env.
1305
1306     This doesn't run on the target node in the pre phase as a failed
1307     node would then be impossible to remove.
1308
1309     """
1310     env = {
1311       "OP_TARGET": self.op.node_name,
1312       "NODE_NAME": self.op.node_name,
1313       }
1314     all_nodes = self.cfg.GetNodeList()
1315     all_nodes.remove(self.op.node_name)
1316     return env, all_nodes, all_nodes
1317
1318   def CheckPrereq(self):
1319     """Check prerequisites.
1320
1321     This checks:
1322      - the node exists in the configuration
1323      - it does not have primary or secondary instances
1324      - it's not the master
1325
1326     Any errors are signalled by raising errors.OpPrereqError.
1327
1328     """
1329     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1330     if node is None:
1331       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1332
1333     instance_list = self.cfg.GetInstanceList()
1334
1335     masternode = self.sstore.GetMasterNode()
1336     if node.name == masternode:
1337       raise errors.OpPrereqError("Node is the master node,"
1338                                  " you need to failover first.")
1339
1340     for instance_name in instance_list:
1341       instance = self.cfg.GetInstanceInfo(instance_name)
1342       if node.name == instance.primary_node:
1343         raise errors.OpPrereqError("Instance %s still running on the node,"
1344                                    " please remove first." % instance_name)
1345       if node.name in instance.secondary_nodes:
1346         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1347                                    " please remove first." % instance_name)
1348     self.op.node_name = node.name
1349     self.node = node
1350
1351   def Exec(self, feedback_fn):
1352     """Removes the node from the cluster.
1353
1354     """
1355     node = self.node
1356     logger.Info("stopping the node daemon and removing configs from node %s" %
1357                 node.name)
1358
1359     rpc.call_node_leave_cluster(node.name)
1360
1361     logger.Info("Removing node %s from config" % node.name)
1362
1363     self.cfg.RemoveNode(node.name)
1364     # Remove the node from the Ganeti Lock Manager
1365     self.context.glm.remove(locking.LEVEL_NODE, node.name)
1366
1367     utils.RemoveHostFromEtcHosts(node.name)
1368
1369
1370 class LUQueryNodes(NoHooksLU):
1371   """Logical unit for querying nodes.
1372
1373   """
1374   _OP_REQP = ["output_fields", "names"]
1375
1376   def CheckPrereq(self):
1377     """Check prerequisites.
1378
1379     This checks that the fields required are valid output fields.
1380
1381     """
1382     self.dynamic_fields = frozenset([
1383       "dtotal", "dfree",
1384       "mtotal", "mnode", "mfree",
1385       "bootid",
1386       "ctotal",
1387       ])
1388
1389     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1390                                "pinst_list", "sinst_list",
1391                                "pip", "sip", "tags"],
1392                        dynamic=self.dynamic_fields,
1393                        selected=self.op.output_fields)
1394
1395     self.wanted = _GetWantedNodes(self, self.op.names)
1396
1397   def Exec(self, feedback_fn):
1398     """Computes the list of nodes and their attributes.
1399
1400     """
1401     nodenames = self.wanted
1402     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1403
1404     # begin data gathering
1405
1406     if self.dynamic_fields.intersection(self.op.output_fields):
1407       live_data = {}
1408       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1409       for name in nodenames:
1410         nodeinfo = node_data.get(name, None)
1411         if nodeinfo:
1412           live_data[name] = {
1413             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1414             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1415             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1416             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1417             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1418             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1419             "bootid": nodeinfo['bootid'],
1420             }
1421         else:
1422           live_data[name] = {}
1423     else:
1424       live_data = dict.fromkeys(nodenames, {})
1425
1426     node_to_primary = dict([(name, set()) for name in nodenames])
1427     node_to_secondary = dict([(name, set()) for name in nodenames])
1428
1429     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1430                              "sinst_cnt", "sinst_list"))
1431     if inst_fields & frozenset(self.op.output_fields):
1432       instancelist = self.cfg.GetInstanceList()
1433
1434       for instance_name in instancelist:
1435         inst = self.cfg.GetInstanceInfo(instance_name)
1436         if inst.primary_node in node_to_primary:
1437           node_to_primary[inst.primary_node].add(inst.name)
1438         for secnode in inst.secondary_nodes:
1439           if secnode in node_to_secondary:
1440             node_to_secondary[secnode].add(inst.name)
1441
1442     # end data gathering
1443
1444     output = []
1445     for node in nodelist:
1446       node_output = []
1447       for field in self.op.output_fields:
1448         if field == "name":
1449           val = node.name
1450         elif field == "pinst_list":
1451           val = list(node_to_primary[node.name])
1452         elif field == "sinst_list":
1453           val = list(node_to_secondary[node.name])
1454         elif field == "pinst_cnt":
1455           val = len(node_to_primary[node.name])
1456         elif field == "sinst_cnt":
1457           val = len(node_to_secondary[node.name])
1458         elif field == "pip":
1459           val = node.primary_ip
1460         elif field == "sip":
1461           val = node.secondary_ip
1462         elif field == "tags":
1463           val = list(node.GetTags())
1464         elif field in self.dynamic_fields:
1465           val = live_data[node.name].get(field, None)
1466         else:
1467           raise errors.ParameterError(field)
1468         node_output.append(val)
1469       output.append(node_output)
1470
1471     return output
1472
1473
1474 class LUQueryNodeVolumes(NoHooksLU):
1475   """Logical unit for getting volumes on node(s).
1476
1477   """
1478   _OP_REQP = ["nodes", "output_fields"]
1479
1480   def CheckPrereq(self):
1481     """Check prerequisites.
1482
1483     This checks that the fields required are valid output fields.
1484
1485     """
1486     self.nodes = _GetWantedNodes(self, self.op.nodes)
1487
1488     _CheckOutputFields(static=["node"],
1489                        dynamic=["phys", "vg", "name", "size", "instance"],
1490                        selected=self.op.output_fields)
1491
1492
1493   def Exec(self, feedback_fn):
1494     """Computes the list of nodes and their attributes.
1495
1496     """
1497     nodenames = self.nodes
1498     volumes = rpc.call_node_volumes(nodenames)
1499
1500     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1501              in self.cfg.GetInstanceList()]
1502
1503     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1504
1505     output = []
1506     for node in nodenames:
1507       if node not in volumes or not volumes[node]:
1508         continue
1509
1510       node_vols = volumes[node][:]
1511       node_vols.sort(key=lambda vol: vol['dev'])
1512
1513       for vol in node_vols:
1514         node_output = []
1515         for field in self.op.output_fields:
1516           if field == "node":
1517             val = node
1518           elif field == "phys":
1519             val = vol['dev']
1520           elif field == "vg":
1521             val = vol['vg']
1522           elif field == "name":
1523             val = vol['name']
1524           elif field == "size":
1525             val = int(float(vol['size']))
1526           elif field == "instance":
1527             for inst in ilist:
1528               if node not in lv_by_node[inst]:
1529                 continue
1530               if vol['name'] in lv_by_node[inst][node]:
1531                 val = inst.name
1532                 break
1533             else:
1534               val = '-'
1535           else:
1536             raise errors.ParameterError(field)
1537           node_output.append(str(val))
1538
1539         output.append(node_output)
1540
1541     return output
1542
1543
1544 class LUAddNode(LogicalUnit):
1545   """Logical unit for adding node to the cluster.
1546
1547   """
1548   HPATH = "node-add"
1549   HTYPE = constants.HTYPE_NODE
1550   _OP_REQP = ["node_name"]
1551
1552   def BuildHooksEnv(self):
1553     """Build hooks env.
1554
1555     This will run on all nodes before, and on all nodes + the new node after.
1556
1557     """
1558     env = {
1559       "OP_TARGET": self.op.node_name,
1560       "NODE_NAME": self.op.node_name,
1561       "NODE_PIP": self.op.primary_ip,
1562       "NODE_SIP": self.op.secondary_ip,
1563       }
1564     nodes_0 = self.cfg.GetNodeList()
1565     nodes_1 = nodes_0 + [self.op.node_name, ]
1566     return env, nodes_0, nodes_1
1567
1568   def CheckPrereq(self):
1569     """Check prerequisites.
1570
1571     This checks:
1572      - the new node is not already in the config
1573      - it is resolvable
1574      - its parameters (single/dual homed) matches the cluster
1575
1576     Any errors are signalled by raising errors.OpPrereqError.
1577
1578     """
1579     node_name = self.op.node_name
1580     cfg = self.cfg
1581
1582     dns_data = utils.HostInfo(node_name)
1583
1584     node = dns_data.name
1585     primary_ip = self.op.primary_ip = dns_data.ip
1586     secondary_ip = getattr(self.op, "secondary_ip", None)
1587     if secondary_ip is None:
1588       secondary_ip = primary_ip
1589     if not utils.IsValidIP(secondary_ip):
1590       raise errors.OpPrereqError("Invalid secondary IP given")
1591     self.op.secondary_ip = secondary_ip
1592
1593     node_list = cfg.GetNodeList()
1594     if not self.op.readd and node in node_list:
1595       raise errors.OpPrereqError("Node %s is already in the configuration" %
1596                                  node)
1597     elif self.op.readd and node not in node_list:
1598       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1599
1600     for existing_node_name in node_list:
1601       existing_node = cfg.GetNodeInfo(existing_node_name)
1602
1603       if self.op.readd and node == existing_node_name:
1604         if (existing_node.primary_ip != primary_ip or
1605             existing_node.secondary_ip != secondary_ip):
1606           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1607                                      " address configuration as before")
1608         continue
1609
1610       if (existing_node.primary_ip == primary_ip or
1611           existing_node.secondary_ip == primary_ip or
1612           existing_node.primary_ip == secondary_ip or
1613           existing_node.secondary_ip == secondary_ip):
1614         raise errors.OpPrereqError("New node ip address(es) conflict with"
1615                                    " existing node %s" % existing_node.name)
1616
1617     # check that the type of the node (single versus dual homed) is the
1618     # same as for the master
1619     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1620     master_singlehomed = myself.secondary_ip == myself.primary_ip
1621     newbie_singlehomed = secondary_ip == primary_ip
1622     if master_singlehomed != newbie_singlehomed:
1623       if master_singlehomed:
1624         raise errors.OpPrereqError("The master has no private ip but the"
1625                                    " new node has one")
1626       else:
1627         raise errors.OpPrereqError("The master has a private ip but the"
1628                                    " new node doesn't have one")
1629
1630     # checks reachablity
1631     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1632       raise errors.OpPrereqError("Node not reachable by ping")
1633
1634     if not newbie_singlehomed:
1635       # check reachability from my secondary ip to newbie's secondary ip
1636       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1637                            source=myself.secondary_ip):
1638         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1639                                    " based ping to noded port")
1640
1641     self.new_node = objects.Node(name=node,
1642                                  primary_ip=primary_ip,
1643                                  secondary_ip=secondary_ip)
1644
1645   def Exec(self, feedback_fn):
1646     """Adds the new node to the cluster.
1647
1648     """
1649     new_node = self.new_node
1650     node = new_node.name
1651
1652     # check connectivity
1653     result = rpc.call_version([node])[node]
1654     if result:
1655       if constants.PROTOCOL_VERSION == result:
1656         logger.Info("communication to node %s fine, sw version %s match" %
1657                     (node, result))
1658       else:
1659         raise errors.OpExecError("Version mismatch master version %s,"
1660                                  " node version %s" %
1661                                  (constants.PROTOCOL_VERSION, result))
1662     else:
1663       raise errors.OpExecError("Cannot get version from the new node")
1664
1665     # setup ssh on node
1666     logger.Info("copy ssh key to node %s" % node)
1667     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1668     keyarray = []
1669     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1670                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1671                 priv_key, pub_key]
1672
1673     for i in keyfiles:
1674       f = open(i, 'r')
1675       try:
1676         keyarray.append(f.read())
1677       finally:
1678         f.close()
1679
1680     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1681                                keyarray[3], keyarray[4], keyarray[5])
1682
1683     if not result:
1684       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1685
1686     # Add node to our /etc/hosts, and add key to known_hosts
1687     utils.AddHostToEtcHosts(new_node.name)
1688
1689     if new_node.secondary_ip != new_node.primary_ip:
1690       if not rpc.call_node_tcp_ping(new_node.name,
1691                                     constants.LOCALHOST_IP_ADDRESS,
1692                                     new_node.secondary_ip,
1693                                     constants.DEFAULT_NODED_PORT,
1694                                     10, False):
1695         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1696                                  " you gave (%s). Please fix and re-run this"
1697                                  " command." % new_node.secondary_ip)
1698
1699     node_verify_list = [self.sstore.GetMasterNode()]
1700     node_verify_param = {
1701       'nodelist': [node],
1702       # TODO: do a node-net-test as well?
1703     }
1704
1705     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1706     for verifier in node_verify_list:
1707       if not result[verifier]:
1708         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1709                                  " for remote verification" % verifier)
1710       if result[verifier]['nodelist']:
1711         for failed in result[verifier]['nodelist']:
1712           feedback_fn("ssh/hostname verification failed %s -> %s" %
1713                       (verifier, result[verifier]['nodelist'][failed]))
1714         raise errors.OpExecError("ssh/hostname verification failed.")
1715
1716     # Distribute updated /etc/hosts and known_hosts to all nodes,
1717     # including the node just added
1718     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1719     dist_nodes = self.cfg.GetNodeList()
1720     if not self.op.readd:
1721       dist_nodes.append(node)
1722     if myself.name in dist_nodes:
1723       dist_nodes.remove(myself.name)
1724
1725     logger.Debug("Copying hosts and known_hosts to all nodes")
1726     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1727       result = rpc.call_upload_file(dist_nodes, fname)
1728       for to_node in dist_nodes:
1729         if not result[to_node]:
1730           logger.Error("copy of file %s to node %s failed" %
1731                        (fname, to_node))
1732
1733     to_copy = self.sstore.GetFileList()
1734     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1735       to_copy.append(constants.VNC_PASSWORD_FILE)
1736     for fname in to_copy:
1737       result = rpc.call_upload_file([node], fname)
1738       if not result[node]:
1739         logger.Error("could not copy file %s to node %s" % (fname, node))
1740
1741     if not self.op.readd:
1742       logger.Info("adding node %s to cluster.conf" % node)
1743       self.cfg.AddNode(new_node)
1744       # Add the new node to the Ganeti Lock Manager
1745       self.context.glm.add(locking.LEVEL_NODE, node)
1746
1747
1748 class LUQueryClusterInfo(NoHooksLU):
1749   """Query cluster configuration.
1750
1751   """
1752   _OP_REQP = []
1753   REQ_MASTER = False
1754   REQ_BGL = False
1755
1756   def ExpandNames(self):
1757     self.needed_locks = {}
1758
1759   def CheckPrereq(self):
1760     """No prerequsites needed for this LU.
1761
1762     """
1763     pass
1764
1765   def Exec(self, feedback_fn):
1766     """Return cluster config.
1767
1768     """
1769     result = {
1770       "name": self.sstore.GetClusterName(),
1771       "software_version": constants.RELEASE_VERSION,
1772       "protocol_version": constants.PROTOCOL_VERSION,
1773       "config_version": constants.CONFIG_VERSION,
1774       "os_api_version": constants.OS_API_VERSION,
1775       "export_version": constants.EXPORT_VERSION,
1776       "master": self.sstore.GetMasterNode(),
1777       "architecture": (platform.architecture()[0], platform.machine()),
1778       "hypervisor_type": self.sstore.GetHypervisorType(),
1779       }
1780
1781     return result
1782
1783
1784 class LUDumpClusterConfig(NoHooksLU):
1785   """Return a text-representation of the cluster-config.
1786
1787   """
1788   _OP_REQP = []
1789   REQ_BGL = False
1790
1791   def ExpandNames(self):
1792     self.needed_locks = {}
1793
1794   def CheckPrereq(self):
1795     """No prerequisites.
1796
1797     """
1798     pass
1799
1800   def Exec(self, feedback_fn):
1801     """Dump a representation of the cluster config to the standard output.
1802
1803     """
1804     return self.cfg.DumpConfig()
1805
1806
1807 class LUActivateInstanceDisks(NoHooksLU):
1808   """Bring up an instance's disks.
1809
1810   """
1811   _OP_REQP = ["instance_name"]
1812
1813   def CheckPrereq(self):
1814     """Check prerequisites.
1815
1816     This checks that the instance is in the cluster.
1817
1818     """
1819     instance = self.cfg.GetInstanceInfo(
1820       self.cfg.ExpandInstanceName(self.op.instance_name))
1821     if instance is None:
1822       raise errors.OpPrereqError("Instance '%s' not known" %
1823                                  self.op.instance_name)
1824     self.instance = instance
1825
1826
1827   def Exec(self, feedback_fn):
1828     """Activate the disks.
1829
1830     """
1831     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1832     if not disks_ok:
1833       raise errors.OpExecError("Cannot activate block devices")
1834
1835     return disks_info
1836
1837
1838 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1839   """Prepare the block devices for an instance.
1840
1841   This sets up the block devices on all nodes.
1842
1843   Args:
1844     instance: a ganeti.objects.Instance object
1845     ignore_secondaries: if true, errors on secondary nodes won't result
1846                         in an error return from the function
1847
1848   Returns:
1849     false if the operation failed
1850     list of (host, instance_visible_name, node_visible_name) if the operation
1851          suceeded with the mapping from node devices to instance devices
1852   """
1853   device_info = []
1854   disks_ok = True
1855   iname = instance.name
1856   # With the two passes mechanism we try to reduce the window of
1857   # opportunity for the race condition of switching DRBD to primary
1858   # before handshaking occured, but we do not eliminate it
1859
1860   # The proper fix would be to wait (with some limits) until the
1861   # connection has been made and drbd transitions from WFConnection
1862   # into any other network-connected state (Connected, SyncTarget,
1863   # SyncSource, etc.)
1864
1865   # 1st pass, assemble on all nodes in secondary mode
1866   for inst_disk in instance.disks:
1867     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1868       cfg.SetDiskID(node_disk, node)
1869       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1870       if not result:
1871         logger.Error("could not prepare block device %s on node %s"
1872                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1873         if not ignore_secondaries:
1874           disks_ok = False
1875
1876   # FIXME: race condition on drbd migration to primary
1877
1878   # 2nd pass, do only the primary node
1879   for inst_disk in instance.disks:
1880     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1881       if node != instance.primary_node:
1882         continue
1883       cfg.SetDiskID(node_disk, node)
1884       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1885       if not result:
1886         logger.Error("could not prepare block device %s on node %s"
1887                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1888         disks_ok = False
1889     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1890
1891   # leave the disks configured for the primary node
1892   # this is a workaround that would be fixed better by
1893   # improving the logical/physical id handling
1894   for disk in instance.disks:
1895     cfg.SetDiskID(disk, instance.primary_node)
1896
1897   return disks_ok, device_info
1898
1899
1900 def _StartInstanceDisks(cfg, instance, force):
1901   """Start the disks of an instance.
1902
1903   """
1904   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1905                                            ignore_secondaries=force)
1906   if not disks_ok:
1907     _ShutdownInstanceDisks(instance, cfg)
1908     if force is not None and not force:
1909       logger.Error("If the message above refers to a secondary node,"
1910                    " you can retry the operation using '--force'.")
1911     raise errors.OpExecError("Disk consistency error")
1912
1913
1914 class LUDeactivateInstanceDisks(NoHooksLU):
1915   """Shutdown an instance's disks.
1916
1917   """
1918   _OP_REQP = ["instance_name"]
1919
1920   def CheckPrereq(self):
1921     """Check prerequisites.
1922
1923     This checks that the instance is in the cluster.
1924
1925     """
1926     instance = self.cfg.GetInstanceInfo(
1927       self.cfg.ExpandInstanceName(self.op.instance_name))
1928     if instance is None:
1929       raise errors.OpPrereqError("Instance '%s' not known" %
1930                                  self.op.instance_name)
1931     self.instance = instance
1932
1933   def Exec(self, feedback_fn):
1934     """Deactivate the disks
1935
1936     """
1937     instance = self.instance
1938     ins_l = rpc.call_instance_list([instance.primary_node])
1939     ins_l = ins_l[instance.primary_node]
1940     if not type(ins_l) is list:
1941       raise errors.OpExecError("Can't contact node '%s'" %
1942                                instance.primary_node)
1943
1944     if self.instance.name in ins_l:
1945       raise errors.OpExecError("Instance is running, can't shutdown"
1946                                " block devices.")
1947
1948     _ShutdownInstanceDisks(instance, self.cfg)
1949
1950
1951 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1952   """Shutdown block devices of an instance.
1953
1954   This does the shutdown on all nodes of the instance.
1955
1956   If the ignore_primary is false, errors on the primary node are
1957   ignored.
1958
1959   """
1960   result = True
1961   for disk in instance.disks:
1962     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1963       cfg.SetDiskID(top_disk, node)
1964       if not rpc.call_blockdev_shutdown(node, top_disk):
1965         logger.Error("could not shutdown block device %s on node %s" %
1966                      (disk.iv_name, node))
1967         if not ignore_primary or node != instance.primary_node:
1968           result = False
1969   return result
1970
1971
1972 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1973   """Checks if a node has enough free memory.
1974
1975   This function check if a given node has the needed amount of free
1976   memory. In case the node has less memory or we cannot get the
1977   information from the node, this function raise an OpPrereqError
1978   exception.
1979
1980   Args:
1981     - cfg: a ConfigWriter instance
1982     - node: the node name
1983     - reason: string to use in the error message
1984     - requested: the amount of memory in MiB
1985
1986   """
1987   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1988   if not nodeinfo or not isinstance(nodeinfo, dict):
1989     raise errors.OpPrereqError("Could not contact node %s for resource"
1990                              " information" % (node,))
1991
1992   free_mem = nodeinfo[node].get('memory_free')
1993   if not isinstance(free_mem, int):
1994     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1995                              " was '%s'" % (node, free_mem))
1996   if requested > free_mem:
1997     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1998                              " needed %s MiB, available %s MiB" %
1999                              (node, reason, requested, free_mem))
2000
2001
2002 class LUStartupInstance(LogicalUnit):
2003   """Starts an instance.
2004
2005   """
2006   HPATH = "instance-start"
2007   HTYPE = constants.HTYPE_INSTANCE
2008   _OP_REQP = ["instance_name", "force"]
2009   REQ_BGL = False
2010
2011   def ExpandNames(self):
2012     self._ExpandAndLockInstance()
2013     self.needed_locks[locking.LEVEL_NODE] = []
2014     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2015
2016   def DeclareLocks(self, level):
2017     if level == locking.LEVEL_NODE:
2018       self._LockInstancesNodes()
2019
2020   def BuildHooksEnv(self):
2021     """Build hooks env.
2022
2023     This runs on master, primary and secondary nodes of the instance.
2024
2025     """
2026     env = {
2027       "FORCE": self.op.force,
2028       }
2029     env.update(_BuildInstanceHookEnvByObject(self.instance))
2030     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2031           list(self.instance.secondary_nodes))
2032     return env, nl, nl
2033
2034   def CheckPrereq(self):
2035     """Check prerequisites.
2036
2037     This checks that the instance is in the cluster.
2038
2039     """
2040     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2041     assert self.instance is not None, \
2042       "Cannot retrieve locked instance %s" % self.op.instance_name
2043
2044     # check bridges existance
2045     _CheckInstanceBridgesExist(instance)
2046
2047     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048                          "starting instance %s" % instance.name,
2049                          instance.memory)
2050
2051   def Exec(self, feedback_fn):
2052     """Start the instance.
2053
2054     """
2055     instance = self.instance
2056     force = self.op.force
2057     extra_args = getattr(self.op, "extra_args", "")
2058
2059     self.cfg.MarkInstanceUp(instance.name)
2060
2061     node_current = instance.primary_node
2062
2063     _StartInstanceDisks(self.cfg, instance, force)
2064
2065     if not rpc.call_instance_start(node_current, instance, extra_args):
2066       _ShutdownInstanceDisks(instance, self.cfg)
2067       raise errors.OpExecError("Could not start instance")
2068
2069
2070 class LURebootInstance(LogicalUnit):
2071   """Reboot an instance.
2072
2073   """
2074   HPATH = "instance-reboot"
2075   HTYPE = constants.HTYPE_INSTANCE
2076   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2077   REQ_BGL = False
2078
2079   def ExpandNames(self):
2080     self._ExpandAndLockInstance()
2081     self.needed_locks[locking.LEVEL_NODE] = []
2082     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2083
2084   def DeclareLocks(self, level):
2085     if level == locking.LEVEL_NODE:
2086       self._LockInstancesNodes()
2087
2088   def BuildHooksEnv(self):
2089     """Build hooks env.
2090
2091     This runs on master, primary and secondary nodes of the instance.
2092
2093     """
2094     env = {
2095       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2096       }
2097     env.update(_BuildInstanceHookEnvByObject(self.instance))
2098     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2099           list(self.instance.secondary_nodes))
2100     return env, nl, nl
2101
2102   def CheckPrereq(self):
2103     """Check prerequisites.
2104
2105     This checks that the instance is in the cluster.
2106
2107     """
2108     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109     assert self.instance is not None, \
2110       "Cannot retrieve locked instance %s" % self.op.instance_name
2111
2112     # check bridges existance
2113     _CheckInstanceBridgesExist(instance)
2114
2115   def Exec(self, feedback_fn):
2116     """Reboot the instance.
2117
2118     """
2119     instance = self.instance
2120     ignore_secondaries = self.op.ignore_secondaries
2121     reboot_type = self.op.reboot_type
2122     extra_args = getattr(self.op, "extra_args", "")
2123
2124     node_current = instance.primary_node
2125
2126     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127                            constants.INSTANCE_REBOOT_HARD,
2128                            constants.INSTANCE_REBOOT_FULL]:
2129       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130                                   (constants.INSTANCE_REBOOT_SOFT,
2131                                    constants.INSTANCE_REBOOT_HARD,
2132                                    constants.INSTANCE_REBOOT_FULL))
2133
2134     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2135                        constants.INSTANCE_REBOOT_HARD]:
2136       if not rpc.call_instance_reboot(node_current, instance,
2137                                       reboot_type, extra_args):
2138         raise errors.OpExecError("Could not reboot instance")
2139     else:
2140       if not rpc.call_instance_shutdown(node_current, instance):
2141         raise errors.OpExecError("could not shutdown instance for full reboot")
2142       _ShutdownInstanceDisks(instance, self.cfg)
2143       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2144       if not rpc.call_instance_start(node_current, instance, extra_args):
2145         _ShutdownInstanceDisks(instance, self.cfg)
2146         raise errors.OpExecError("Could not start instance for full reboot")
2147
2148     self.cfg.MarkInstanceUp(instance.name)
2149
2150
2151 class LUShutdownInstance(LogicalUnit):
2152   """Shutdown an instance.
2153
2154   """
2155   HPATH = "instance-stop"
2156   HTYPE = constants.HTYPE_INSTANCE
2157   _OP_REQP = ["instance_name"]
2158   REQ_BGL = False
2159
2160   def ExpandNames(self):
2161     self._ExpandAndLockInstance()
2162     self.needed_locks[locking.LEVEL_NODE] = []
2163     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2164
2165   def DeclareLocks(self, level):
2166     if level == locking.LEVEL_NODE:
2167       self._LockInstancesNodes()
2168
2169   def BuildHooksEnv(self):
2170     """Build hooks env.
2171
2172     This runs on master, primary and secondary nodes of the instance.
2173
2174     """
2175     env = _BuildInstanceHookEnvByObject(self.instance)
2176     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2177           list(self.instance.secondary_nodes))
2178     return env, nl, nl
2179
2180   def CheckPrereq(self):
2181     """Check prerequisites.
2182
2183     This checks that the instance is in the cluster.
2184
2185     """
2186     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2187     assert self.instance is not None, \
2188       "Cannot retrieve locked instance %s" % self.op.instance_name
2189
2190   def Exec(self, feedback_fn):
2191     """Shutdown the instance.
2192
2193     """
2194     instance = self.instance
2195     node_current = instance.primary_node
2196     self.cfg.MarkInstanceDown(instance.name)
2197     if not rpc.call_instance_shutdown(node_current, instance):
2198       logger.Error("could not shutdown instance")
2199
2200     _ShutdownInstanceDisks(instance, self.cfg)
2201
2202
2203 class LUReinstallInstance(LogicalUnit):
2204   """Reinstall an instance.
2205
2206   """
2207   HPATH = "instance-reinstall"
2208   HTYPE = constants.HTYPE_INSTANCE
2209   _OP_REQP = ["instance_name"]
2210   REQ_BGL = False
2211
2212   def ExpandNames(self):
2213     self._ExpandAndLockInstance()
2214     self.needed_locks[locking.LEVEL_NODE] = []
2215     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2216
2217   def DeclareLocks(self, level):
2218     if level == locking.LEVEL_NODE:
2219       self._LockInstancesNodes()
2220
2221   def BuildHooksEnv(self):
2222     """Build hooks env.
2223
2224     This runs on master, primary and secondary nodes of the instance.
2225
2226     """
2227     env = _BuildInstanceHookEnvByObject(self.instance)
2228     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2229           list(self.instance.secondary_nodes))
2230     return env, nl, nl
2231
2232   def CheckPrereq(self):
2233     """Check prerequisites.
2234
2235     This checks that the instance is in the cluster and is not running.
2236
2237     """
2238     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2239     assert instance is not None, \
2240       "Cannot retrieve locked instance %s" % self.op.instance_name
2241
2242     if instance.disk_template == constants.DT_DISKLESS:
2243       raise errors.OpPrereqError("Instance '%s' has no disks" %
2244                                  self.op.instance_name)
2245     if instance.status != "down":
2246       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2247                                  self.op.instance_name)
2248     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2249     if remote_info:
2250       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2251                                  (self.op.instance_name,
2252                                   instance.primary_node))
2253
2254     self.op.os_type = getattr(self.op, "os_type", None)
2255     if self.op.os_type is not None:
2256       # OS verification
2257       pnode = self.cfg.GetNodeInfo(
2258         self.cfg.ExpandNodeName(instance.primary_node))
2259       if pnode is None:
2260         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2261                                    self.op.pnode)
2262       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2263       if not os_obj:
2264         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2265                                    " primary node"  % self.op.os_type)
2266
2267     self.instance = instance
2268
2269   def Exec(self, feedback_fn):
2270     """Reinstall the instance.
2271
2272     """
2273     inst = self.instance
2274
2275     if self.op.os_type is not None:
2276       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2277       inst.os = self.op.os_type
2278       self.cfg.AddInstance(inst)
2279
2280     _StartInstanceDisks(self.cfg, inst, None)
2281     try:
2282       feedback_fn("Running the instance OS create scripts...")
2283       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2284         raise errors.OpExecError("Could not install OS for instance %s"
2285                                  " on node %s" %
2286                                  (inst.name, inst.primary_node))
2287     finally:
2288       _ShutdownInstanceDisks(inst, self.cfg)
2289
2290
2291 class LURenameInstance(LogicalUnit):
2292   """Rename an instance.
2293
2294   """
2295   HPATH = "instance-rename"
2296   HTYPE = constants.HTYPE_INSTANCE
2297   _OP_REQP = ["instance_name", "new_name"]
2298
2299   def BuildHooksEnv(self):
2300     """Build hooks env.
2301
2302     This runs on master, primary and secondary nodes of the instance.
2303
2304     """
2305     env = _BuildInstanceHookEnvByObject(self.instance)
2306     env["INSTANCE_NEW_NAME"] = self.op.new_name
2307     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2308           list(self.instance.secondary_nodes))
2309     return env, nl, nl
2310
2311   def CheckPrereq(self):
2312     """Check prerequisites.
2313
2314     This checks that the instance is in the cluster and is not running.
2315
2316     """
2317     instance = self.cfg.GetInstanceInfo(
2318       self.cfg.ExpandInstanceName(self.op.instance_name))
2319     if instance is None:
2320       raise errors.OpPrereqError("Instance '%s' not known" %
2321                                  self.op.instance_name)
2322     if instance.status != "down":
2323       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2324                                  self.op.instance_name)
2325     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2326     if remote_info:
2327       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2328                                  (self.op.instance_name,
2329                                   instance.primary_node))
2330     self.instance = instance
2331
2332     # new name verification
2333     name_info = utils.HostInfo(self.op.new_name)
2334
2335     self.op.new_name = new_name = name_info.name
2336     instance_list = self.cfg.GetInstanceList()
2337     if new_name in instance_list:
2338       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2339                                  new_name)
2340
2341     if not getattr(self.op, "ignore_ip", False):
2342       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2343         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2344                                    (name_info.ip, new_name))
2345
2346
2347   def Exec(self, feedback_fn):
2348     """Reinstall the instance.
2349
2350     """
2351     inst = self.instance
2352     old_name = inst.name
2353
2354     if inst.disk_template == constants.DT_FILE:
2355       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2356
2357     self.cfg.RenameInstance(inst.name, self.op.new_name)
2358     # Change the instance lock. This is definitely safe while we hold the BGL
2359     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2360     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2361
2362     # re-read the instance from the configuration after rename
2363     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2364
2365     if inst.disk_template == constants.DT_FILE:
2366       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2367       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2368                                                 old_file_storage_dir,
2369                                                 new_file_storage_dir)
2370
2371       if not result:
2372         raise errors.OpExecError("Could not connect to node '%s' to rename"
2373                                  " directory '%s' to '%s' (but the instance"
2374                                  " has been renamed in Ganeti)" % (
2375                                  inst.primary_node, old_file_storage_dir,
2376                                  new_file_storage_dir))
2377
2378       if not result[0]:
2379         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2380                                  " (but the instance has been renamed in"
2381                                  " Ganeti)" % (old_file_storage_dir,
2382                                                new_file_storage_dir))
2383
2384     _StartInstanceDisks(self.cfg, inst, None)
2385     try:
2386       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2387                                           "sda", "sdb"):
2388         msg = ("Could run OS rename script for instance %s on node %s (but the"
2389                " instance has been renamed in Ganeti)" %
2390                (inst.name, inst.primary_node))
2391         logger.Error(msg)
2392     finally:
2393       _ShutdownInstanceDisks(inst, self.cfg)
2394
2395
2396 class LURemoveInstance(LogicalUnit):
2397   """Remove an instance.
2398
2399   """
2400   HPATH = "instance-remove"
2401   HTYPE = constants.HTYPE_INSTANCE
2402   _OP_REQP = ["instance_name", "ignore_failures"]
2403
2404   def BuildHooksEnv(self):
2405     """Build hooks env.
2406
2407     This runs on master, primary and secondary nodes of the instance.
2408
2409     """
2410     env = _BuildInstanceHookEnvByObject(self.instance)
2411     nl = [self.sstore.GetMasterNode()]
2412     return env, nl, nl
2413
2414   def CheckPrereq(self):
2415     """Check prerequisites.
2416
2417     This checks that the instance is in the cluster.
2418
2419     """
2420     instance = self.cfg.GetInstanceInfo(
2421       self.cfg.ExpandInstanceName(self.op.instance_name))
2422     if instance is None:
2423       raise errors.OpPrereqError("Instance '%s' not known" %
2424                                  self.op.instance_name)
2425     self.instance = instance
2426
2427   def Exec(self, feedback_fn):
2428     """Remove the instance.
2429
2430     """
2431     instance = self.instance
2432     logger.Info("shutting down instance %s on node %s" %
2433                 (instance.name, instance.primary_node))
2434
2435     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2436       if self.op.ignore_failures:
2437         feedback_fn("Warning: can't shutdown instance")
2438       else:
2439         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2440                                  (instance.name, instance.primary_node))
2441
2442     logger.Info("removing block devices for instance %s" % instance.name)
2443
2444     if not _RemoveDisks(instance, self.cfg):
2445       if self.op.ignore_failures:
2446         feedback_fn("Warning: can't remove instance's disks")
2447       else:
2448         raise errors.OpExecError("Can't remove instance's disks")
2449
2450     logger.Info("removing instance %s out of cluster config" % instance.name)
2451
2452     self.cfg.RemoveInstance(instance.name)
2453     # Remove the new instance from the Ganeti Lock Manager
2454     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2455
2456
2457 class LUQueryInstances(NoHooksLU):
2458   """Logical unit for querying instances.
2459
2460   """
2461   _OP_REQP = ["output_fields", "names"]
2462
2463   def CheckPrereq(self):
2464     """Check prerequisites.
2465
2466     This checks that the fields required are valid output fields.
2467
2468     """
2469     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2470     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2471                                "admin_state", "admin_ram",
2472                                "disk_template", "ip", "mac", "bridge",
2473                                "sda_size", "sdb_size", "vcpus", "tags"],
2474                        dynamic=self.dynamic_fields,
2475                        selected=self.op.output_fields)
2476
2477     self.wanted = _GetWantedInstances(self, self.op.names)
2478
2479   def Exec(self, feedback_fn):
2480     """Computes the list of nodes and their attributes.
2481
2482     """
2483     instance_names = self.wanted
2484     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2485                      in instance_names]
2486
2487     # begin data gathering
2488
2489     nodes = frozenset([inst.primary_node for inst in instance_list])
2490
2491     bad_nodes = []
2492     if self.dynamic_fields.intersection(self.op.output_fields):
2493       live_data = {}
2494       node_data = rpc.call_all_instances_info(nodes)
2495       for name in nodes:
2496         result = node_data[name]
2497         if result:
2498           live_data.update(result)
2499         elif result == False:
2500           bad_nodes.append(name)
2501         # else no instance is alive
2502     else:
2503       live_data = dict([(name, {}) for name in instance_names])
2504
2505     # end data gathering
2506
2507     output = []
2508     for instance in instance_list:
2509       iout = []
2510       for field in self.op.output_fields:
2511         if field == "name":
2512           val = instance.name
2513         elif field == "os":
2514           val = instance.os
2515         elif field == "pnode":
2516           val = instance.primary_node
2517         elif field == "snodes":
2518           val = list(instance.secondary_nodes)
2519         elif field == "admin_state":
2520           val = (instance.status != "down")
2521         elif field == "oper_state":
2522           if instance.primary_node in bad_nodes:
2523             val = None
2524           else:
2525             val = bool(live_data.get(instance.name))
2526         elif field == "status":
2527           if instance.primary_node in bad_nodes:
2528             val = "ERROR_nodedown"
2529           else:
2530             running = bool(live_data.get(instance.name))
2531             if running:
2532               if instance.status != "down":
2533                 val = "running"
2534               else:
2535                 val = "ERROR_up"
2536             else:
2537               if instance.status != "down":
2538                 val = "ERROR_down"
2539               else:
2540                 val = "ADMIN_down"
2541         elif field == "admin_ram":
2542           val = instance.memory
2543         elif field == "oper_ram":
2544           if instance.primary_node in bad_nodes:
2545             val = None
2546           elif instance.name in live_data:
2547             val = live_data[instance.name].get("memory", "?")
2548           else:
2549             val = "-"
2550         elif field == "disk_template":
2551           val = instance.disk_template
2552         elif field == "ip":
2553           val = instance.nics[0].ip
2554         elif field == "bridge":
2555           val = instance.nics[0].bridge
2556         elif field == "mac":
2557           val = instance.nics[0].mac
2558         elif field == "sda_size" or field == "sdb_size":
2559           disk = instance.FindDisk(field[:3])
2560           if disk is None:
2561             val = None
2562           else:
2563             val = disk.size
2564         elif field == "vcpus":
2565           val = instance.vcpus
2566         elif field == "tags":
2567           val = list(instance.GetTags())
2568         else:
2569           raise errors.ParameterError(field)
2570         iout.append(val)
2571       output.append(iout)
2572
2573     return output
2574
2575
2576 class LUFailoverInstance(LogicalUnit):
2577   """Failover an instance.
2578
2579   """
2580   HPATH = "instance-failover"
2581   HTYPE = constants.HTYPE_INSTANCE
2582   _OP_REQP = ["instance_name", "ignore_consistency"]
2583   REQ_BGL = False
2584
2585   def ExpandNames(self):
2586     self._ExpandAndLockInstance()
2587     self.needed_locks[locking.LEVEL_NODE] = []
2588     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2589
2590   def DeclareLocks(self, level):
2591     if level == locking.LEVEL_NODE:
2592       self._LockInstancesNodes()
2593
2594   def BuildHooksEnv(self):
2595     """Build hooks env.
2596
2597     This runs on master, primary and secondary nodes of the instance.
2598
2599     """
2600     env = {
2601       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2602       }
2603     env.update(_BuildInstanceHookEnvByObject(self.instance))
2604     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2605     return env, nl, nl
2606
2607   def CheckPrereq(self):
2608     """Check prerequisites.
2609
2610     This checks that the instance is in the cluster.
2611
2612     """
2613     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2614     assert self.instance is not None, \
2615       "Cannot retrieve locked instance %s" % self.op.instance_name
2616
2617     if instance.disk_template not in constants.DTS_NET_MIRROR:
2618       raise errors.OpPrereqError("Instance's disk layout is not"
2619                                  " network mirrored, cannot failover.")
2620
2621     secondary_nodes = instance.secondary_nodes
2622     if not secondary_nodes:
2623       raise errors.ProgrammerError("no secondary node but using "
2624                                    "a mirrored disk template")
2625
2626     target_node = secondary_nodes[0]
2627     # check memory requirements on the secondary node
2628     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2629                          instance.name, instance.memory)
2630
2631     # check bridge existance
2632     brlist = [nic.bridge for nic in instance.nics]
2633     if not rpc.call_bridges_exist(target_node, brlist):
2634       raise errors.OpPrereqError("One or more target bridges %s does not"
2635                                  " exist on destination node '%s'" %
2636                                  (brlist, target_node))
2637
2638   def Exec(self, feedback_fn):
2639     """Failover an instance.
2640
2641     The failover is done by shutting it down on its present node and
2642     starting it on the secondary.
2643
2644     """
2645     instance = self.instance
2646
2647     source_node = instance.primary_node
2648     target_node = instance.secondary_nodes[0]
2649
2650     feedback_fn("* checking disk consistency between source and target")
2651     for dev in instance.disks:
2652       # for drbd, these are drbd over lvm
2653       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2654         if instance.status == "up" and not self.op.ignore_consistency:
2655           raise errors.OpExecError("Disk %s is degraded on target node,"
2656                                    " aborting failover." % dev.iv_name)
2657
2658     feedback_fn("* shutting down instance on source node")
2659     logger.Info("Shutting down instance %s on node %s" %
2660                 (instance.name, source_node))
2661
2662     if not rpc.call_instance_shutdown(source_node, instance):
2663       if self.op.ignore_consistency:
2664         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2665                      " anyway. Please make sure node %s is down"  %
2666                      (instance.name, source_node, source_node))
2667       else:
2668         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2669                                  (instance.name, source_node))
2670
2671     feedback_fn("* deactivating the instance's disks on source node")
2672     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2673       raise errors.OpExecError("Can't shut down the instance's disks.")
2674
2675     instance.primary_node = target_node
2676     # distribute new instance config to the other nodes
2677     self.cfg.Update(instance)
2678
2679     # Only start the instance if it's marked as up
2680     if instance.status == "up":
2681       feedback_fn("* activating the instance's disks on target node")
2682       logger.Info("Starting instance %s on node %s" %
2683                   (instance.name, target_node))
2684
2685       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2686                                                ignore_secondaries=True)
2687       if not disks_ok:
2688         _ShutdownInstanceDisks(instance, self.cfg)
2689         raise errors.OpExecError("Can't activate the instance's disks")
2690
2691       feedback_fn("* starting the instance on the target node")
2692       if not rpc.call_instance_start(target_node, instance, None):
2693         _ShutdownInstanceDisks(instance, self.cfg)
2694         raise errors.OpExecError("Could not start instance %s on node %s." %
2695                                  (instance.name, target_node))
2696
2697
2698 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2699   """Create a tree of block devices on the primary node.
2700
2701   This always creates all devices.
2702
2703   """
2704   if device.children:
2705     for child in device.children:
2706       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2707         return False
2708
2709   cfg.SetDiskID(device, node)
2710   new_id = rpc.call_blockdev_create(node, device, device.size,
2711                                     instance.name, True, info)
2712   if not new_id:
2713     return False
2714   if device.physical_id is None:
2715     device.physical_id = new_id
2716   return True
2717
2718
2719 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2720   """Create a tree of block devices on a secondary node.
2721
2722   If this device type has to be created on secondaries, create it and
2723   all its children.
2724
2725   If not, just recurse to children keeping the same 'force' value.
2726
2727   """
2728   if device.CreateOnSecondary():
2729     force = True
2730   if device.children:
2731     for child in device.children:
2732       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2733                                         child, force, info):
2734         return False
2735
2736   if not force:
2737     return True
2738   cfg.SetDiskID(device, node)
2739   new_id = rpc.call_blockdev_create(node, device, device.size,
2740                                     instance.name, False, info)
2741   if not new_id:
2742     return False
2743   if device.physical_id is None:
2744     device.physical_id = new_id
2745   return True
2746
2747
2748 def _GenerateUniqueNames(cfg, exts):
2749   """Generate a suitable LV name.
2750
2751   This will generate a logical volume name for the given instance.
2752
2753   """
2754   results = []
2755   for val in exts:
2756     new_id = cfg.GenerateUniqueID()
2757     results.append("%s%s" % (new_id, val))
2758   return results
2759
2760
2761 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2762   """Generate a drbd8 device complete with its children.
2763
2764   """
2765   port = cfg.AllocatePort()
2766   vgname = cfg.GetVGName()
2767   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2768                           logical_id=(vgname, names[0]))
2769   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2770                           logical_id=(vgname, names[1]))
2771   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2772                           logical_id = (primary, secondary, port),
2773                           children = [dev_data, dev_meta],
2774                           iv_name=iv_name)
2775   return drbd_dev
2776
2777
2778 def _GenerateDiskTemplate(cfg, template_name,
2779                           instance_name, primary_node,
2780                           secondary_nodes, disk_sz, swap_sz,
2781                           file_storage_dir, file_driver):
2782   """Generate the entire disk layout for a given template type.
2783
2784   """
2785   #TODO: compute space requirements
2786
2787   vgname = cfg.GetVGName()
2788   if template_name == constants.DT_DISKLESS:
2789     disks = []
2790   elif template_name == constants.DT_PLAIN:
2791     if len(secondary_nodes) != 0:
2792       raise errors.ProgrammerError("Wrong template configuration")
2793
2794     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2795     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2796                            logical_id=(vgname, names[0]),
2797                            iv_name = "sda")
2798     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2799                            logical_id=(vgname, names[1]),
2800                            iv_name = "sdb")
2801     disks = [sda_dev, sdb_dev]
2802   elif template_name == constants.DT_DRBD8:
2803     if len(secondary_nodes) != 1:
2804       raise errors.ProgrammerError("Wrong template configuration")
2805     remote_node = secondary_nodes[0]
2806     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2807                                        ".sdb_data", ".sdb_meta"])
2808     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809                                          disk_sz, names[0:2], "sda")
2810     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811                                          swap_sz, names[2:4], "sdb")
2812     disks = [drbd_sda_dev, drbd_sdb_dev]
2813   elif template_name == constants.DT_FILE:
2814     if len(secondary_nodes) != 0:
2815       raise errors.ProgrammerError("Wrong template configuration")
2816
2817     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2818                                 iv_name="sda", logical_id=(file_driver,
2819                                 "%s/sda" % file_storage_dir))
2820     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2821                                 iv_name="sdb", logical_id=(file_driver,
2822                                 "%s/sdb" % file_storage_dir))
2823     disks = [file_sda_dev, file_sdb_dev]
2824   else:
2825     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2826   return disks
2827
2828
2829 def _GetInstanceInfoText(instance):
2830   """Compute that text that should be added to the disk's metadata.
2831
2832   """
2833   return "originstname+%s" % instance.name
2834
2835
2836 def _CreateDisks(cfg, instance):
2837   """Create all disks for an instance.
2838
2839   This abstracts away some work from AddInstance.
2840
2841   Args:
2842     instance: the instance object
2843
2844   Returns:
2845     True or False showing the success of the creation process
2846
2847   """
2848   info = _GetInstanceInfoText(instance)
2849
2850   if instance.disk_template == constants.DT_FILE:
2851     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2852     result = rpc.call_file_storage_dir_create(instance.primary_node,
2853                                               file_storage_dir)
2854
2855     if not result:
2856       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2857       return False
2858
2859     if not result[0]:
2860       logger.Error("failed to create directory '%s'" % file_storage_dir)
2861       return False
2862
2863   for device in instance.disks:
2864     logger.Info("creating volume %s for instance %s" %
2865                 (device.iv_name, instance.name))
2866     #HARDCODE
2867     for secondary_node in instance.secondary_nodes:
2868       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2869                                         device, False, info):
2870         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2871                      (device.iv_name, device, secondary_node))
2872         return False
2873     #HARDCODE
2874     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2875                                     instance, device, info):
2876       logger.Error("failed to create volume %s on primary!" %
2877                    device.iv_name)
2878       return False
2879
2880   return True
2881
2882
2883 def _RemoveDisks(instance, cfg):
2884   """Remove all disks for an instance.
2885
2886   This abstracts away some work from `AddInstance()` and
2887   `RemoveInstance()`. Note that in case some of the devices couldn't
2888   be removed, the removal will continue with the other ones (compare
2889   with `_CreateDisks()`).
2890
2891   Args:
2892     instance: the instance object
2893
2894   Returns:
2895     True or False showing the success of the removal proces
2896
2897   """
2898   logger.Info("removing block devices for instance %s" % instance.name)
2899
2900   result = True
2901   for device in instance.disks:
2902     for node, disk in device.ComputeNodeTree(instance.primary_node):
2903       cfg.SetDiskID(disk, node)
2904       if not rpc.call_blockdev_remove(node, disk):
2905         logger.Error("could not remove block device %s on node %s,"
2906                      " continuing anyway" %
2907                      (device.iv_name, node))
2908         result = False
2909
2910   if instance.disk_template == constants.DT_FILE:
2911     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2912     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2913                                             file_storage_dir):
2914       logger.Error("could not remove directory '%s'" % file_storage_dir)
2915       result = False
2916
2917   return result
2918
2919
2920 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2921   """Compute disk size requirements in the volume group
2922
2923   This is currently hard-coded for the two-drive layout.
2924
2925   """
2926   # Required free disk space as a function of disk and swap space
2927   req_size_dict = {
2928     constants.DT_DISKLESS: None,
2929     constants.DT_PLAIN: disk_size + swap_size,
2930     # 256 MB are added for drbd metadata, 128MB for each drbd device
2931     constants.DT_DRBD8: disk_size + swap_size + 256,
2932     constants.DT_FILE: None,
2933   }
2934
2935   if disk_template not in req_size_dict:
2936     raise errors.ProgrammerError("Disk template '%s' size requirement"
2937                                  " is unknown" %  disk_template)
2938
2939   return req_size_dict[disk_template]
2940
2941
2942 class LUCreateInstance(LogicalUnit):
2943   """Create an instance.
2944
2945   """
2946   HPATH = "instance-add"
2947   HTYPE = constants.HTYPE_INSTANCE
2948   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2949               "disk_template", "swap_size", "mode", "start", "vcpus",
2950               "wait_for_sync", "ip_check", "mac"]
2951
2952   def _RunAllocator(self):
2953     """Run the allocator based on input opcode.
2954
2955     """
2956     disks = [{"size": self.op.disk_size, "mode": "w"},
2957              {"size": self.op.swap_size, "mode": "w"}]
2958     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2959              "bridge": self.op.bridge}]
2960     ial = IAllocator(self.cfg, self.sstore,
2961                      mode=constants.IALLOCATOR_MODE_ALLOC,
2962                      name=self.op.instance_name,
2963                      disk_template=self.op.disk_template,
2964                      tags=[],
2965                      os=self.op.os_type,
2966                      vcpus=self.op.vcpus,
2967                      mem_size=self.op.mem_size,
2968                      disks=disks,
2969                      nics=nics,
2970                      )
2971
2972     ial.Run(self.op.iallocator)
2973
2974     if not ial.success:
2975       raise errors.OpPrereqError("Can't compute nodes using"
2976                                  " iallocator '%s': %s" % (self.op.iallocator,
2977                                                            ial.info))
2978     if len(ial.nodes) != ial.required_nodes:
2979       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2980                                  " of nodes (%s), required %s" %
2981                                  (len(ial.nodes), ial.required_nodes))
2982     self.op.pnode = ial.nodes[0]
2983     logger.ToStdout("Selected nodes for the instance: %s" %
2984                     (", ".join(ial.nodes),))
2985     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2986                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2987     if ial.required_nodes == 2:
2988       self.op.snode = ial.nodes[1]
2989
2990   def BuildHooksEnv(self):
2991     """Build hooks env.
2992
2993     This runs on master, primary and secondary nodes of the instance.
2994
2995     """
2996     env = {
2997       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2998       "INSTANCE_DISK_SIZE": self.op.disk_size,
2999       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3000       "INSTANCE_ADD_MODE": self.op.mode,
3001       }
3002     if self.op.mode == constants.INSTANCE_IMPORT:
3003       env["INSTANCE_SRC_NODE"] = self.op.src_node
3004       env["INSTANCE_SRC_PATH"] = self.op.src_path
3005       env["INSTANCE_SRC_IMAGE"] = self.src_image
3006
3007     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3008       primary_node=self.op.pnode,
3009       secondary_nodes=self.secondaries,
3010       status=self.instance_status,
3011       os_type=self.op.os_type,
3012       memory=self.op.mem_size,
3013       vcpus=self.op.vcpus,
3014       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3015     ))
3016
3017     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3018           self.secondaries)
3019     return env, nl, nl
3020
3021
3022   def CheckPrereq(self):
3023     """Check prerequisites.
3024
3025     """
3026     # set optional parameters to none if they don't exist
3027     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3028                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3029                  "vnc_bind_address"]:
3030       if not hasattr(self.op, attr):
3031         setattr(self.op, attr, None)
3032
3033     if self.op.mode not in (constants.INSTANCE_CREATE,
3034                             constants.INSTANCE_IMPORT):
3035       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3036                                  self.op.mode)
3037
3038     if (not self.cfg.GetVGName() and
3039         self.op.disk_template not in constants.DTS_NOT_LVM):
3040       raise errors.OpPrereqError("Cluster does not support lvm-based"
3041                                  " instances")
3042
3043     if self.op.mode == constants.INSTANCE_IMPORT:
3044       src_node = getattr(self.op, "src_node", None)
3045       src_path = getattr(self.op, "src_path", None)
3046       if src_node is None or src_path is None:
3047         raise errors.OpPrereqError("Importing an instance requires source"
3048                                    " node and path options")
3049       src_node_full = self.cfg.ExpandNodeName(src_node)
3050       if src_node_full is None:
3051         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3052       self.op.src_node = src_node = src_node_full
3053
3054       if not os.path.isabs(src_path):
3055         raise errors.OpPrereqError("The source path must be absolute")
3056
3057       export_info = rpc.call_export_info(src_node, src_path)
3058
3059       if not export_info:
3060         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3061
3062       if not export_info.has_section(constants.INISECT_EXP):
3063         raise errors.ProgrammerError("Corrupted export config")
3064
3065       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3066       if (int(ei_version) != constants.EXPORT_VERSION):
3067         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3068                                    (ei_version, constants.EXPORT_VERSION))
3069
3070       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3071         raise errors.OpPrereqError("Can't import instance with more than"
3072                                    " one data disk")
3073
3074       # FIXME: are the old os-es, disk sizes, etc. useful?
3075       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3076       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3077                                                          'disk0_dump'))
3078       self.src_image = diskimage
3079     else: # INSTANCE_CREATE
3080       if getattr(self.op, "os_type", None) is None:
3081         raise errors.OpPrereqError("No guest OS specified")
3082
3083     #### instance parameters check
3084
3085     # disk template and mirror node verification
3086     if self.op.disk_template not in constants.DISK_TEMPLATES:
3087       raise errors.OpPrereqError("Invalid disk template name")
3088
3089     # instance name verification
3090     hostname1 = utils.HostInfo(self.op.instance_name)
3091
3092     self.op.instance_name = instance_name = hostname1.name
3093     instance_list = self.cfg.GetInstanceList()
3094     if instance_name in instance_list:
3095       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3096                                  instance_name)
3097
3098     # ip validity checks
3099     ip = getattr(self.op, "ip", None)
3100     if ip is None or ip.lower() == "none":
3101       inst_ip = None
3102     elif ip.lower() == "auto":
3103       inst_ip = hostname1.ip
3104     else:
3105       if not utils.IsValidIP(ip):
3106         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3107                                    " like a valid IP" % ip)
3108       inst_ip = ip
3109     self.inst_ip = self.op.ip = inst_ip
3110
3111     if self.op.start and not self.op.ip_check:
3112       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3113                                  " adding an instance in start mode")
3114
3115     if self.op.ip_check:
3116       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3117         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3118                                    (hostname1.ip, instance_name))
3119
3120     # MAC address verification
3121     if self.op.mac != "auto":
3122       if not utils.IsValidMac(self.op.mac.lower()):
3123         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3124                                    self.op.mac)
3125
3126     # bridge verification
3127     bridge = getattr(self.op, "bridge", None)
3128     if bridge is None:
3129       self.op.bridge = self.cfg.GetDefBridge()
3130     else:
3131       self.op.bridge = bridge
3132
3133     # boot order verification
3134     if self.op.hvm_boot_order is not None:
3135       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3136         raise errors.OpPrereqError("invalid boot order specified,"
3137                                    " must be one or more of [acdn]")
3138     # file storage checks
3139     if (self.op.file_driver and
3140         not self.op.file_driver in constants.FILE_DRIVER):
3141       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3142                                  self.op.file_driver)
3143
3144     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3145       raise errors.OpPrereqError("File storage directory not a relative"
3146                                  " path")
3147     #### allocator run
3148
3149     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3150       raise errors.OpPrereqError("One and only one of iallocator and primary"
3151                                  " node must be given")
3152
3153     if self.op.iallocator is not None:
3154       self._RunAllocator()
3155
3156     #### node related checks
3157
3158     # check primary node
3159     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3160     if pnode is None:
3161       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3162                                  self.op.pnode)
3163     self.op.pnode = pnode.name
3164     self.pnode = pnode
3165     self.secondaries = []
3166
3167     # mirror node verification
3168     if self.op.disk_template in constants.DTS_NET_MIRROR:
3169       if getattr(self.op, "snode", None) is None:
3170         raise errors.OpPrereqError("The networked disk templates need"
3171                                    " a mirror node")
3172
3173       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3174       if snode_name is None:
3175         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3176                                    self.op.snode)
3177       elif snode_name == pnode.name:
3178         raise errors.OpPrereqError("The secondary node cannot be"
3179                                    " the primary node.")
3180       self.secondaries.append(snode_name)
3181
3182     req_size = _ComputeDiskSize(self.op.disk_template,
3183                                 self.op.disk_size, self.op.swap_size)
3184
3185     # Check lv size requirements
3186     if req_size is not None:
3187       nodenames = [pnode.name] + self.secondaries
3188       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3189       for node in nodenames:
3190         info = nodeinfo.get(node, None)
3191         if not info:
3192           raise errors.OpPrereqError("Cannot get current information"
3193                                      " from node '%s'" % node)
3194         vg_free = info.get('vg_free', None)
3195         if not isinstance(vg_free, int):
3196           raise errors.OpPrereqError("Can't compute free disk space on"
3197                                      " node %s" % node)
3198         if req_size > info['vg_free']:
3199           raise errors.OpPrereqError("Not enough disk space on target node %s."
3200                                      " %d MB available, %d MB required" %
3201                                      (node, info['vg_free'], req_size))
3202
3203     # os verification
3204     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3205     if not os_obj:
3206       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3207                                  " primary node"  % self.op.os_type)
3208
3209     if self.op.kernel_path == constants.VALUE_NONE:
3210       raise errors.OpPrereqError("Can't set instance kernel to none")
3211
3212
3213     # bridge check on primary node
3214     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3215       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3216                                  " destination node '%s'" %
3217                                  (self.op.bridge, pnode.name))
3218
3219     # memory check on primary node
3220     if self.op.start:
3221       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3222                            "creating instance %s" % self.op.instance_name,
3223                            self.op.mem_size)
3224
3225     # hvm_cdrom_image_path verification
3226     if self.op.hvm_cdrom_image_path is not None:
3227       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3228         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3229                                    " be an absolute path or None, not %s" %
3230                                    self.op.hvm_cdrom_image_path)
3231       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3232         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3233                                    " regular file or a symlink pointing to"
3234                                    " an existing regular file, not %s" %
3235                                    self.op.hvm_cdrom_image_path)
3236
3237     # vnc_bind_address verification
3238     if self.op.vnc_bind_address is not None:
3239       if not utils.IsValidIP(self.op.vnc_bind_address):
3240         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3241                                    " like a valid IP address" %
3242                                    self.op.vnc_bind_address)
3243
3244     if self.op.start:
3245       self.instance_status = 'up'
3246     else:
3247       self.instance_status = 'down'
3248
3249   def Exec(self, feedback_fn):
3250     """Create and add the instance to the cluster.
3251
3252     """
3253     instance = self.op.instance_name
3254     pnode_name = self.pnode.name
3255
3256     if self.op.mac == "auto":
3257       mac_address = self.cfg.GenerateMAC()
3258     else:
3259       mac_address = self.op.mac
3260
3261     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3262     if self.inst_ip is not None:
3263       nic.ip = self.inst_ip
3264
3265     ht_kind = self.sstore.GetHypervisorType()
3266     if ht_kind in constants.HTS_REQ_PORT:
3267       network_port = self.cfg.AllocatePort()
3268     else:
3269       network_port = None
3270
3271     if self.op.vnc_bind_address is None:
3272       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3273
3274     # this is needed because os.path.join does not accept None arguments
3275     if self.op.file_storage_dir is None:
3276       string_file_storage_dir = ""
3277     else:
3278       string_file_storage_dir = self.op.file_storage_dir
3279
3280     # build the full file storage dir path
3281     file_storage_dir = os.path.normpath(os.path.join(
3282                                         self.sstore.GetFileStorageDir(),
3283                                         string_file_storage_dir, instance))
3284
3285
3286     disks = _GenerateDiskTemplate(self.cfg,
3287                                   self.op.disk_template,
3288                                   instance, pnode_name,
3289                                   self.secondaries, self.op.disk_size,
3290                                   self.op.swap_size,
3291                                   file_storage_dir,
3292                                   self.op.file_driver)
3293
3294     iobj = objects.Instance(name=instance, os=self.op.os_type,
3295                             primary_node=pnode_name,
3296                             memory=self.op.mem_size,
3297                             vcpus=self.op.vcpus,
3298                             nics=[nic], disks=disks,
3299                             disk_template=self.op.disk_template,
3300                             status=self.instance_status,
3301                             network_port=network_port,
3302                             kernel_path=self.op.kernel_path,
3303                             initrd_path=self.op.initrd_path,
3304                             hvm_boot_order=self.op.hvm_boot_order,
3305                             hvm_acpi=self.op.hvm_acpi,
3306                             hvm_pae=self.op.hvm_pae,
3307                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3308                             vnc_bind_address=self.op.vnc_bind_address,
3309                             )
3310
3311     feedback_fn("* creating instance disks...")
3312     if not _CreateDisks(self.cfg, iobj):
3313       _RemoveDisks(iobj, self.cfg)
3314       raise errors.OpExecError("Device creation failed, reverting...")
3315
3316     feedback_fn("adding instance %s to cluster config" % instance)
3317
3318     self.cfg.AddInstance(iobj)
3319     # Add the new instance to the Ganeti Lock Manager
3320     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3321
3322     if self.op.wait_for_sync:
3323       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3324     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3325       # make sure the disks are not degraded (still sync-ing is ok)
3326       time.sleep(15)
3327       feedback_fn("* checking mirrors status")
3328       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3329     else:
3330       disk_abort = False
3331
3332     if disk_abort:
3333       _RemoveDisks(iobj, self.cfg)
3334       self.cfg.RemoveInstance(iobj.name)
3335       # Remove the new instance from the Ganeti Lock Manager
3336       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3337       raise errors.OpExecError("There are some degraded disks for"
3338                                " this instance")
3339
3340     feedback_fn("creating os for instance %s on node %s" %
3341                 (instance, pnode_name))
3342
3343     if iobj.disk_template != constants.DT_DISKLESS:
3344       if self.op.mode == constants.INSTANCE_CREATE:
3345         feedback_fn("* running the instance OS create scripts...")
3346         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3347           raise errors.OpExecError("could not add os for instance %s"
3348                                    " on node %s" %
3349                                    (instance, pnode_name))
3350
3351       elif self.op.mode == constants.INSTANCE_IMPORT:
3352         feedback_fn("* running the instance OS import scripts...")
3353         src_node = self.op.src_node
3354         src_image = self.src_image
3355         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3356                                                 src_node, src_image):
3357           raise errors.OpExecError("Could not import os for instance"
3358                                    " %s on node %s" %
3359                                    (instance, pnode_name))
3360       else:
3361         # also checked in the prereq part
3362         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3363                                      % self.op.mode)
3364
3365     if self.op.start:
3366       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3367       feedback_fn("* starting instance...")
3368       if not rpc.call_instance_start(pnode_name, iobj, None):
3369         raise errors.OpExecError("Could not start instance")
3370
3371
3372 class LUConnectConsole(NoHooksLU):
3373   """Connect to an instance's console.
3374
3375   This is somewhat special in that it returns the command line that
3376   you need to run on the master node in order to connect to the
3377   console.
3378
3379   """
3380   _OP_REQP = ["instance_name"]
3381   REQ_BGL = False
3382
3383   def ExpandNames(self):
3384     self._ExpandAndLockInstance()
3385
3386   def CheckPrereq(self):
3387     """Check prerequisites.
3388
3389     This checks that the instance is in the cluster.
3390
3391     """
3392     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3393     assert self.instance is not None, \
3394       "Cannot retrieve locked instance %s" % self.op.instance_name
3395
3396   def Exec(self, feedback_fn):
3397     """Connect to the console of an instance
3398
3399     """
3400     instance = self.instance
3401     node = instance.primary_node
3402
3403     node_insts = rpc.call_instance_list([node])[node]
3404     if node_insts is False:
3405       raise errors.OpExecError("Can't connect to node %s." % node)
3406
3407     if instance.name not in node_insts:
3408       raise errors.OpExecError("Instance %s is not running." % instance.name)
3409
3410     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3411
3412     hyper = hypervisor.GetHypervisor()
3413     console_cmd = hyper.GetShellCommandForConsole(instance)
3414
3415     # build ssh cmdline
3416     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3417
3418
3419 class LUReplaceDisks(LogicalUnit):
3420   """Replace the disks of an instance.
3421
3422   """
3423   HPATH = "mirrors-replace"
3424   HTYPE = constants.HTYPE_INSTANCE
3425   _OP_REQP = ["instance_name", "mode", "disks"]
3426
3427   def _RunAllocator(self):
3428     """Compute a new secondary node using an IAllocator.
3429
3430     """
3431     ial = IAllocator(self.cfg, self.sstore,
3432                      mode=constants.IALLOCATOR_MODE_RELOC,
3433                      name=self.op.instance_name,
3434                      relocate_from=[self.sec_node])
3435
3436     ial.Run(self.op.iallocator)
3437
3438     if not ial.success:
3439       raise errors.OpPrereqError("Can't compute nodes using"
3440                                  " iallocator '%s': %s" % (self.op.iallocator,
3441                                                            ial.info))
3442     if len(ial.nodes) != ial.required_nodes:
3443       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3444                                  " of nodes (%s), required %s" %
3445                                  (len(ial.nodes), ial.required_nodes))
3446     self.op.remote_node = ial.nodes[0]
3447     logger.ToStdout("Selected new secondary for the instance: %s" %
3448                     self.op.remote_node)
3449
3450   def BuildHooksEnv(self):
3451     """Build hooks env.
3452
3453     This runs on the master, the primary and all the secondaries.
3454
3455     """
3456     env = {
3457       "MODE": self.op.mode,
3458       "NEW_SECONDARY": self.op.remote_node,
3459       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3460       }
3461     env.update(_BuildInstanceHookEnvByObject(self.instance))
3462     nl = [
3463       self.sstore.GetMasterNode(),
3464       self.instance.primary_node,
3465       ]
3466     if self.op.remote_node is not None:
3467       nl.append(self.op.remote_node)
3468     return env, nl, nl
3469
3470   def CheckPrereq(self):
3471     """Check prerequisites.
3472
3473     This checks that the instance is in the cluster.
3474
3475     """
3476     if not hasattr(self.op, "remote_node"):
3477       self.op.remote_node = None
3478
3479     instance = self.cfg.GetInstanceInfo(
3480       self.cfg.ExpandInstanceName(self.op.instance_name))
3481     if instance is None:
3482       raise errors.OpPrereqError("Instance '%s' not known" %
3483                                  self.op.instance_name)
3484     self.instance = instance
3485     self.op.instance_name = instance.name
3486
3487     if instance.disk_template not in constants.DTS_NET_MIRROR:
3488       raise errors.OpPrereqError("Instance's disk layout is not"
3489                                  " network mirrored.")
3490
3491     if len(instance.secondary_nodes) != 1:
3492       raise errors.OpPrereqError("The instance has a strange layout,"
3493                                  " expected one secondary but found %d" %
3494                                  len(instance.secondary_nodes))
3495
3496     self.sec_node = instance.secondary_nodes[0]
3497
3498     ia_name = getattr(self.op, "iallocator", None)
3499     if ia_name is not None:
3500       if self.op.remote_node is not None:
3501         raise errors.OpPrereqError("Give either the iallocator or the new"
3502                                    " secondary, not both")
3503       self.op.remote_node = self._RunAllocator()
3504
3505     remote_node = self.op.remote_node
3506     if remote_node is not None:
3507       remote_node = self.cfg.ExpandNodeName(remote_node)
3508       if remote_node is None:
3509         raise errors.OpPrereqError("Node '%s' not known" %
3510                                    self.op.remote_node)
3511       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3512     else:
3513       self.remote_node_info = None
3514     if remote_node == instance.primary_node:
3515       raise errors.OpPrereqError("The specified node is the primary node of"
3516                                  " the instance.")
3517     elif remote_node == self.sec_node:
3518       if self.op.mode == constants.REPLACE_DISK_SEC:
3519         # this is for DRBD8, where we can't execute the same mode of
3520         # replacement as for drbd7 (no different port allocated)
3521         raise errors.OpPrereqError("Same secondary given, cannot execute"
3522                                    " replacement")
3523     if instance.disk_template == constants.DT_DRBD8:
3524       if (self.op.mode == constants.REPLACE_DISK_ALL and
3525           remote_node is not None):
3526         # switch to replace secondary mode
3527         self.op.mode = constants.REPLACE_DISK_SEC
3528
3529       if self.op.mode == constants.REPLACE_DISK_ALL:
3530         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3531                                    " secondary disk replacement, not"
3532                                    " both at once")
3533       elif self.op.mode == constants.REPLACE_DISK_PRI:
3534         if remote_node is not None:
3535           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3536                                      " the secondary while doing a primary"
3537                                      " node disk replacement")
3538         self.tgt_node = instance.primary_node
3539         self.oth_node = instance.secondary_nodes[0]
3540       elif self.op.mode == constants.REPLACE_DISK_SEC:
3541         self.new_node = remote_node # this can be None, in which case
3542                                     # we don't change the secondary
3543         self.tgt_node = instance.secondary_nodes[0]
3544         self.oth_node = instance.primary_node
3545       else:
3546         raise errors.ProgrammerError("Unhandled disk replace mode")
3547
3548     for name in self.op.disks:
3549       if instance.FindDisk(name) is None:
3550         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3551                                    (name, instance.name))
3552     self.op.remote_node = remote_node
3553
3554   def _ExecD8DiskOnly(self, feedback_fn):
3555     """Replace a disk on the primary or secondary for dbrd8.
3556
3557     The algorithm for replace is quite complicated:
3558       - for each disk to be replaced:
3559         - create new LVs on the target node with unique names
3560         - detach old LVs from the drbd device
3561         - rename old LVs to name_replaced.<time_t>
3562         - rename new LVs to old LVs
3563         - attach the new LVs (with the old names now) to the drbd device
3564       - wait for sync across all devices
3565       - for each modified disk:
3566         - remove old LVs (which have the name name_replaces.<time_t>)
3567
3568     Failures are not very well handled.
3569
3570     """
3571     steps_total = 6
3572     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3573     instance = self.instance
3574     iv_names = {}
3575     vgname = self.cfg.GetVGName()
3576     # start of work
3577     cfg = self.cfg
3578     tgt_node = self.tgt_node
3579     oth_node = self.oth_node
3580
3581     # Step: check device activation
3582     self.proc.LogStep(1, steps_total, "check device existence")
3583     info("checking volume groups")
3584     my_vg = cfg.GetVGName()
3585     results = rpc.call_vg_list([oth_node, tgt_node])
3586     if not results:
3587       raise errors.OpExecError("Can't list volume groups on the nodes")
3588     for node in oth_node, tgt_node:
3589       res = results.get(node, False)
3590       if not res or my_vg not in res:
3591         raise errors.OpExecError("Volume group '%s' not found on %s" %
3592                                  (my_vg, node))
3593     for dev in instance.disks:
3594       if not dev.iv_name in self.op.disks:
3595         continue
3596       for node in tgt_node, oth_node:
3597         info("checking %s on %s" % (dev.iv_name, node))
3598         cfg.SetDiskID(dev, node)
3599         if not rpc.call_blockdev_find(node, dev):
3600           raise errors.OpExecError("Can't find device %s on node %s" %
3601                                    (dev.iv_name, node))
3602
3603     # Step: check other node consistency
3604     self.proc.LogStep(2, steps_total, "check peer consistency")
3605     for dev in instance.disks:
3606       if not dev.iv_name in self.op.disks:
3607         continue
3608       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3609       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3610                                    oth_node==instance.primary_node):
3611         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3612                                  " to replace disks on this node (%s)" %
3613                                  (oth_node, tgt_node))
3614
3615     # Step: create new storage
3616     self.proc.LogStep(3, steps_total, "allocate new storage")
3617     for dev in instance.disks:
3618       if not dev.iv_name in self.op.disks:
3619         continue
3620       size = dev.size
3621       cfg.SetDiskID(dev, tgt_node)
3622       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3623       names = _GenerateUniqueNames(cfg, lv_names)
3624       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3625                              logical_id=(vgname, names[0]))
3626       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3627                              logical_id=(vgname, names[1]))
3628       new_lvs = [lv_data, lv_meta]
3629       old_lvs = dev.children
3630       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3631       info("creating new local storage on %s for %s" %
3632            (tgt_node, dev.iv_name))
3633       # since we *always* want to create this LV, we use the
3634       # _Create...OnPrimary (which forces the creation), even if we
3635       # are talking about the secondary node
3636       for new_lv in new_lvs:
3637         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3638                                         _GetInstanceInfoText(instance)):
3639           raise errors.OpExecError("Failed to create new LV named '%s' on"
3640                                    " node '%s'" %
3641                                    (new_lv.logical_id[1], tgt_node))
3642
3643     # Step: for each lv, detach+rename*2+attach
3644     self.proc.LogStep(4, steps_total, "change drbd configuration")
3645     for dev, old_lvs, new_lvs in iv_names.itervalues():
3646       info("detaching %s drbd from local storage" % dev.iv_name)
3647       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3648         raise errors.OpExecError("Can't detach drbd from local storage on node"
3649                                  " %s for device %s" % (tgt_node, dev.iv_name))
3650       #dev.children = []
3651       #cfg.Update(instance)
3652
3653       # ok, we created the new LVs, so now we know we have the needed
3654       # storage; as such, we proceed on the target node to rename
3655       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3656       # using the assumption that logical_id == physical_id (which in
3657       # turn is the unique_id on that node)
3658
3659       # FIXME(iustin): use a better name for the replaced LVs
3660       temp_suffix = int(time.time())
3661       ren_fn = lambda d, suff: (d.physical_id[0],
3662                                 d.physical_id[1] + "_replaced-%s" % suff)
3663       # build the rename list based on what LVs exist on the node
3664       rlist = []
3665       for to_ren in old_lvs:
3666         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3667         if find_res is not None: # device exists
3668           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3669
3670       info("renaming the old LVs on the target node")
3671       if not rpc.call_blockdev_rename(tgt_node, rlist):
3672         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3673       # now we rename the new LVs to the old LVs
3674       info("renaming the new LVs on the target node")
3675       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3676       if not rpc.call_blockdev_rename(tgt_node, rlist):
3677         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3678
3679       for old, new in zip(old_lvs, new_lvs):
3680         new.logical_id = old.logical_id
3681         cfg.SetDiskID(new, tgt_node)
3682
3683       for disk in old_lvs:
3684         disk.logical_id = ren_fn(disk, temp_suffix)
3685         cfg.SetDiskID(disk, tgt_node)
3686
3687       # now that the new lvs have the old name, we can add them to the device
3688       info("adding new mirror component on %s" % tgt_node)
3689       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3690         for new_lv in new_lvs:
3691           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3692             warning("Can't rollback device %s", hint="manually cleanup unused"
3693                     " logical volumes")
3694         raise errors.OpExecError("Can't add local storage to drbd")
3695
3696       dev.children = new_lvs
3697       cfg.Update(instance)
3698
3699     # Step: wait for sync
3700
3701     # this can fail as the old devices are degraded and _WaitForSync
3702     # does a combined result over all disks, so we don't check its
3703     # return value
3704     self.proc.LogStep(5, steps_total, "sync devices")
3705     _WaitForSync(cfg, instance, self.proc, unlock=True)
3706
3707     # so check manually all the devices
3708     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3709       cfg.SetDiskID(dev, instance.primary_node)
3710       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3711       if is_degr:
3712         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3713
3714     # Step: remove old storage
3715     self.proc.LogStep(6, steps_total, "removing old storage")
3716     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3717       info("remove logical volumes for %s" % name)
3718       for lv in old_lvs:
3719         cfg.SetDiskID(lv, tgt_node)
3720         if not rpc.call_blockdev_remove(tgt_node, lv):
3721           warning("Can't remove old LV", hint="manually remove unused LVs")
3722           continue
3723
3724   def _ExecD8Secondary(self, feedback_fn):
3725     """Replace the secondary node for drbd8.
3726
3727     The algorithm for replace is quite complicated:
3728       - for all disks of the instance:
3729         - create new LVs on the new node with same names
3730         - shutdown the drbd device on the old secondary
3731         - disconnect the drbd network on the primary
3732         - create the drbd device on the new secondary
3733         - network attach the drbd on the primary, using an artifice:
3734           the drbd code for Attach() will connect to the network if it
3735           finds a device which is connected to the good local disks but
3736           not network enabled
3737       - wait for sync across all devices
3738       - remove all disks from the old secondary
3739
3740     Failures are not very well handled.
3741
3742     """
3743     steps_total = 6
3744     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3745     instance = self.instance
3746     iv_names = {}
3747     vgname = self.cfg.GetVGName()
3748     # start of work
3749     cfg = self.cfg
3750     old_node = self.tgt_node
3751     new_node = self.new_node
3752     pri_node = instance.primary_node
3753
3754     # Step: check device activation
3755     self.proc.LogStep(1, steps_total, "check device existence")
3756     info("checking volume groups")
3757     my_vg = cfg.GetVGName()
3758     results = rpc.call_vg_list([pri_node, new_node])
3759     if not results:
3760       raise errors.OpExecError("Can't list volume groups on the nodes")
3761     for node in pri_node, new_node:
3762       res = results.get(node, False)
3763       if not res or my_vg not in res:
3764         raise errors.OpExecError("Volume group '%s' not found on %s" %
3765                                  (my_vg, node))
3766     for dev in instance.disks:
3767       if not dev.iv_name in self.op.disks:
3768         continue
3769       info("checking %s on %s" % (dev.iv_name, pri_node))
3770       cfg.SetDiskID(dev, pri_node)
3771       if not rpc.call_blockdev_find(pri_node, dev):
3772         raise errors.OpExecError("Can't find device %s on node %s" %
3773                                  (dev.iv_name, pri_node))
3774
3775     # Step: check other node consistency
3776     self.proc.LogStep(2, steps_total, "check peer consistency")
3777     for dev in instance.disks:
3778       if not dev.iv_name in self.op.disks:
3779         continue
3780       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3781       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3782         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3783                                  " unsafe to replace the secondary" %
3784                                  pri_node)
3785
3786     # Step: create new storage
3787     self.proc.LogStep(3, steps_total, "allocate new storage")
3788     for dev in instance.disks:
3789       size = dev.size
3790       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3791       # since we *always* want to create this LV, we use the
3792       # _Create...OnPrimary (which forces the creation), even if we
3793       # are talking about the secondary node
3794       for new_lv in dev.children:
3795         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3796                                         _GetInstanceInfoText(instance)):
3797           raise errors.OpExecError("Failed to create new LV named '%s' on"
3798                                    " node '%s'" %
3799                                    (new_lv.logical_id[1], new_node))
3800
3801       iv_names[dev.iv_name] = (dev, dev.children)
3802
3803     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3804     for dev in instance.disks:
3805       size = dev.size
3806       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3807       # create new devices on new_node
3808       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3809                               logical_id=(pri_node, new_node,
3810                                           dev.logical_id[2]),
3811                               children=dev.children)
3812       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3813                                         new_drbd, False,
3814                                       _GetInstanceInfoText(instance)):
3815         raise errors.OpExecError("Failed to create new DRBD on"
3816                                  " node '%s'" % new_node)
3817
3818     for dev in instance.disks:
3819       # we have new devices, shutdown the drbd on the old secondary
3820       info("shutting down drbd for %s on old node" % dev.iv_name)
3821       cfg.SetDiskID(dev, old_node)
3822       if not rpc.call_blockdev_shutdown(old_node, dev):
3823         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3824                 hint="Please cleanup this device manually as soon as possible")
3825
3826     info("detaching primary drbds from the network (=> standalone)")
3827     done = 0
3828     for dev in instance.disks:
3829       cfg.SetDiskID(dev, pri_node)
3830       # set the physical (unique in bdev terms) id to None, meaning
3831       # detach from network
3832       dev.physical_id = (None,) * len(dev.physical_id)
3833       # and 'find' the device, which will 'fix' it to match the
3834       # standalone state
3835       if rpc.call_blockdev_find(pri_node, dev):
3836         done += 1
3837       else:
3838         warning("Failed to detach drbd %s from network, unusual case" %
3839                 dev.iv_name)
3840
3841     if not done:
3842       # no detaches succeeded (very unlikely)
3843       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3844
3845     # if we managed to detach at least one, we update all the disks of
3846     # the instance to point to the new secondary
3847     info("updating instance configuration")
3848     for dev in instance.disks:
3849       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3850       cfg.SetDiskID(dev, pri_node)
3851     cfg.Update(instance)
3852
3853     # and now perform the drbd attach
3854     info("attaching primary drbds to new secondary (standalone => connected)")
3855     failures = []
3856     for dev in instance.disks:
3857       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3858       # since the attach is smart, it's enough to 'find' the device,
3859       # it will automatically activate the network, if the physical_id
3860       # is correct
3861       cfg.SetDiskID(dev, pri_node)
3862       if not rpc.call_blockdev_find(pri_node, dev):
3863         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3864                 "please do a gnt-instance info to see the status of disks")
3865
3866     # this can fail as the old devices are degraded and _WaitForSync
3867     # does a combined result over all disks, so we don't check its
3868     # return value
3869     self.proc.LogStep(5, steps_total, "sync devices")
3870     _WaitForSync(cfg, instance, self.proc, unlock=True)
3871
3872     # so check manually all the devices
3873     for name, (dev, old_lvs) in iv_names.iteritems():
3874       cfg.SetDiskID(dev, pri_node)
3875       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3876       if is_degr:
3877         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3878
3879     self.proc.LogStep(6, steps_total, "removing old storage")
3880     for name, (dev, old_lvs) in iv_names.iteritems():
3881       info("remove logical volumes for %s" % name)
3882       for lv in old_lvs:
3883         cfg.SetDiskID(lv, old_node)
3884         if not rpc.call_blockdev_remove(old_node, lv):
3885           warning("Can't remove LV on old secondary",
3886                   hint="Cleanup stale volumes by hand")
3887
3888   def Exec(self, feedback_fn):
3889     """Execute disk replacement.
3890
3891     This dispatches the disk replacement to the appropriate handler.
3892
3893     """
3894     instance = self.instance
3895
3896     # Activate the instance disks if we're replacing them on a down instance
3897     if instance.status == "down":
3898       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3899       self.proc.ChainOpCode(op)
3900
3901     if instance.disk_template == constants.DT_DRBD8:
3902       if self.op.remote_node is None:
3903         fn = self._ExecD8DiskOnly
3904       else:
3905         fn = self._ExecD8Secondary
3906     else:
3907       raise errors.ProgrammerError("Unhandled disk replacement case")
3908
3909     ret = fn(feedback_fn)
3910
3911     # Deactivate the instance disks if we're replacing them on a down instance
3912     if instance.status == "down":
3913       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3914       self.proc.ChainOpCode(op)
3915
3916     return ret
3917
3918
3919 class LUGrowDisk(LogicalUnit):
3920   """Grow a disk of an instance.
3921
3922   """
3923   HPATH = "disk-grow"
3924   HTYPE = constants.HTYPE_INSTANCE
3925   _OP_REQP = ["instance_name", "disk", "amount"]
3926
3927   def BuildHooksEnv(self):
3928     """Build hooks env.
3929
3930     This runs on the master, the primary and all the secondaries.
3931
3932     """
3933     env = {
3934       "DISK": self.op.disk,
3935       "AMOUNT": self.op.amount,
3936       }
3937     env.update(_BuildInstanceHookEnvByObject(self.instance))
3938     nl = [
3939       self.sstore.GetMasterNode(),
3940       self.instance.primary_node,
3941       ]
3942     return env, nl, nl
3943
3944   def CheckPrereq(self):
3945     """Check prerequisites.
3946
3947     This checks that the instance is in the cluster.
3948
3949     """
3950     instance = self.cfg.GetInstanceInfo(
3951       self.cfg.ExpandInstanceName(self.op.instance_name))
3952     if instance is None:
3953       raise errors.OpPrereqError("Instance '%s' not known" %
3954                                  self.op.instance_name)
3955     self.instance = instance
3956     self.op.instance_name = instance.name
3957
3958     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3959       raise errors.OpPrereqError("Instance's disk layout does not support"
3960                                  " growing.")
3961
3962     if instance.FindDisk(self.op.disk) is None:
3963       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3964                                  (self.op.disk, instance.name))
3965
3966     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3967     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3968     for node in nodenames:
3969       info = nodeinfo.get(node, None)
3970       if not info:
3971         raise errors.OpPrereqError("Cannot get current information"
3972                                    " from node '%s'" % node)
3973       vg_free = info.get('vg_free', None)
3974       if not isinstance(vg_free, int):
3975         raise errors.OpPrereqError("Can't compute free disk space on"
3976                                    " node %s" % node)
3977       if self.op.amount > info['vg_free']:
3978         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3979                                    " %d MiB available, %d MiB required" %
3980                                    (node, info['vg_free'], self.op.amount))
3981
3982   def Exec(self, feedback_fn):
3983     """Execute disk grow.
3984
3985     """
3986     instance = self.instance
3987     disk = instance.FindDisk(self.op.disk)
3988     for node in (instance.secondary_nodes + (instance.primary_node,)):
3989       self.cfg.SetDiskID(disk, node)
3990       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3991       if not result or not isinstance(result, tuple) or len(result) != 2:
3992         raise errors.OpExecError("grow request failed to node %s" % node)
3993       elif not result[0]:
3994         raise errors.OpExecError("grow request failed to node %s: %s" %
3995                                  (node, result[1]))
3996     disk.RecordGrow(self.op.amount)
3997     self.cfg.Update(instance)
3998     return
3999
4000
4001 class LUQueryInstanceData(NoHooksLU):
4002   """Query runtime instance data.
4003
4004   """
4005   _OP_REQP = ["instances"]
4006
4007   def CheckPrereq(self):
4008     """Check prerequisites.
4009
4010     This only checks the optional instance list against the existing names.
4011
4012     """
4013     if not isinstance(self.op.instances, list):
4014       raise errors.OpPrereqError("Invalid argument type 'instances'")
4015     if self.op.instances:
4016       self.wanted_instances = []
4017       names = self.op.instances
4018       for name in names:
4019         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4020         if instance is None:
4021           raise errors.OpPrereqError("No such instance name '%s'" % name)
4022         self.wanted_instances.append(instance)
4023     else:
4024       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4025                                in self.cfg.GetInstanceList()]
4026     return
4027
4028
4029   def _ComputeDiskStatus(self, instance, snode, dev):
4030     """Compute block device status.
4031
4032     """
4033     self.cfg.SetDiskID(dev, instance.primary_node)
4034     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4035     if dev.dev_type in constants.LDS_DRBD:
4036       # we change the snode then (otherwise we use the one passed in)
4037       if dev.logical_id[0] == instance.primary_node:
4038         snode = dev.logical_id[1]
4039       else:
4040         snode = dev.logical_id[0]
4041
4042     if snode:
4043       self.cfg.SetDiskID(dev, snode)
4044       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4045     else:
4046       dev_sstatus = None
4047
4048     if dev.children:
4049       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4050                       for child in dev.children]
4051     else:
4052       dev_children = []
4053
4054     data = {
4055       "iv_name": dev.iv_name,
4056       "dev_type": dev.dev_type,
4057       "logical_id": dev.logical_id,
4058       "physical_id": dev.physical_id,
4059       "pstatus": dev_pstatus,
4060       "sstatus": dev_sstatus,
4061       "children": dev_children,
4062       }
4063
4064     return data
4065
4066   def Exec(self, feedback_fn):
4067     """Gather and return data"""
4068     result = {}
4069     for instance in self.wanted_instances:
4070       remote_info = rpc.call_instance_info(instance.primary_node,
4071                                                 instance.name)
4072       if remote_info and "state" in remote_info:
4073         remote_state = "up"
4074       else:
4075         remote_state = "down"
4076       if instance.status == "down":
4077         config_state = "down"
4078       else:
4079         config_state = "up"
4080
4081       disks = [self._ComputeDiskStatus(instance, None, device)
4082                for device in instance.disks]
4083
4084       idict = {
4085         "name": instance.name,
4086         "config_state": config_state,
4087         "run_state": remote_state,
4088         "pnode": instance.primary_node,
4089         "snodes": instance.secondary_nodes,
4090         "os": instance.os,
4091         "memory": instance.memory,
4092         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4093         "disks": disks,
4094         "vcpus": instance.vcpus,
4095         }
4096
4097       htkind = self.sstore.GetHypervisorType()
4098       if htkind == constants.HT_XEN_PVM30:
4099         idict["kernel_path"] = instance.kernel_path
4100         idict["initrd_path"] = instance.initrd_path
4101
4102       if htkind == constants.HT_XEN_HVM31:
4103         idict["hvm_boot_order"] = instance.hvm_boot_order
4104         idict["hvm_acpi"] = instance.hvm_acpi
4105         idict["hvm_pae"] = instance.hvm_pae
4106         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4107
4108       if htkind in constants.HTS_REQ_PORT:
4109         idict["vnc_bind_address"] = instance.vnc_bind_address
4110         idict["network_port"] = instance.network_port
4111
4112       result[instance.name] = idict
4113
4114     return result
4115
4116
4117 class LUSetInstanceParams(LogicalUnit):
4118   """Modifies an instances's parameters.
4119
4120   """
4121   HPATH = "instance-modify"
4122   HTYPE = constants.HTYPE_INSTANCE
4123   _OP_REQP = ["instance_name"]
4124   REQ_BGL = False
4125
4126   def ExpandNames(self):
4127     self._ExpandAndLockInstance()
4128
4129   def BuildHooksEnv(self):
4130     """Build hooks env.
4131
4132     This runs on the master, primary and secondaries.
4133
4134     """
4135     args = dict()
4136     if self.mem:
4137       args['memory'] = self.mem
4138     if self.vcpus:
4139       args['vcpus'] = self.vcpus
4140     if self.do_ip or self.do_bridge or self.mac:
4141       if self.do_ip:
4142         ip = self.ip
4143       else:
4144         ip = self.instance.nics[0].ip
4145       if self.bridge:
4146         bridge = self.bridge
4147       else:
4148         bridge = self.instance.nics[0].bridge
4149       if self.mac:
4150         mac = self.mac
4151       else:
4152         mac = self.instance.nics[0].mac
4153       args['nics'] = [(ip, bridge, mac)]
4154     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4155     nl = [self.sstore.GetMasterNode(),
4156           self.instance.primary_node] + list(self.instance.secondary_nodes)
4157     return env, nl, nl
4158
4159   def CheckPrereq(self):
4160     """Check prerequisites.
4161
4162     This only checks the instance list against the existing names.
4163
4164     """
4165     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4166     # a separate CheckArguments function, if we implement one, so the operation
4167     # can be aborted without waiting for any lock, should it have an error...
4168     self.mem = getattr(self.op, "mem", None)
4169     self.vcpus = getattr(self.op, "vcpus", None)
4170     self.ip = getattr(self.op, "ip", None)
4171     self.mac = getattr(self.op, "mac", None)
4172     self.bridge = getattr(self.op, "bridge", None)
4173     self.kernel_path = getattr(self.op, "kernel_path", None)
4174     self.initrd_path = getattr(self.op, "initrd_path", None)
4175     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4176     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4177     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4178     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4179     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4180     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4181                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4182                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4183                  self.vnc_bind_address]
4184     if all_parms.count(None) == len(all_parms):
4185       raise errors.OpPrereqError("No changes submitted")
4186     if self.mem is not None:
4187       try:
4188         self.mem = int(self.mem)
4189       except ValueError, err:
4190         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4191     if self.vcpus is not None:
4192       try:
4193         self.vcpus = int(self.vcpus)
4194       except ValueError, err:
4195         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4196     if self.ip is not None:
4197       self.do_ip = True
4198       if self.ip.lower() == "none":
4199         self.ip = None
4200       else:
4201         if not utils.IsValidIP(self.ip):
4202           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4203     else:
4204       self.do_ip = False
4205     self.do_bridge = (self.bridge is not None)
4206     if self.mac is not None:
4207       if self.cfg.IsMacInUse(self.mac):
4208         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4209                                    self.mac)
4210       if not utils.IsValidMac(self.mac):
4211         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4212
4213     if self.kernel_path is not None:
4214       self.do_kernel_path = True
4215       if self.kernel_path == constants.VALUE_NONE:
4216         raise errors.OpPrereqError("Can't set instance to no kernel")
4217
4218       if self.kernel_path != constants.VALUE_DEFAULT:
4219         if not os.path.isabs(self.kernel_path):
4220           raise errors.OpPrereqError("The kernel path must be an absolute"
4221                                     " filename")
4222     else:
4223       self.do_kernel_path = False
4224
4225     if self.initrd_path is not None:
4226       self.do_initrd_path = True
4227       if self.initrd_path not in (constants.VALUE_NONE,
4228                                   constants.VALUE_DEFAULT):
4229         if not os.path.isabs(self.initrd_path):
4230           raise errors.OpPrereqError("The initrd path must be an absolute"
4231                                     " filename")
4232     else:
4233       self.do_initrd_path = False
4234
4235     # boot order verification
4236     if self.hvm_boot_order is not None:
4237       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4238         if len(self.hvm_boot_order.strip("acdn")) != 0:
4239           raise errors.OpPrereqError("invalid boot order specified,"
4240                                      " must be one or more of [acdn]"
4241                                      " or 'default'")
4242
4243     # hvm_cdrom_image_path verification
4244     if self.op.hvm_cdrom_image_path is not None:
4245       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4246         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4247                                    " be an absolute path or None, not %s" %
4248                                    self.op.hvm_cdrom_image_path)
4249       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4250         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4251                                    " regular file or a symlink pointing to"
4252                                    " an existing regular file, not %s" %
4253                                    self.op.hvm_cdrom_image_path)
4254
4255     # vnc_bind_address verification
4256     if self.op.vnc_bind_address is not None:
4257       if not utils.IsValidIP(self.op.vnc_bind_address):
4258         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4259                                    " like a valid IP address" %
4260                                    self.op.vnc_bind_address)
4261
4262     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4263     assert self.instance is not None, \
4264       "Cannot retrieve locked instance %s" % self.op.instance_name
4265     return
4266
4267   def Exec(self, feedback_fn):
4268     """Modifies an instance.
4269
4270     All parameters take effect only at the next restart of the instance.
4271     """
4272     result = []
4273     instance = self.instance
4274     if self.mem:
4275       instance.memory = self.mem
4276       result.append(("mem", self.mem))
4277     if self.vcpus:
4278       instance.vcpus = self.vcpus
4279       result.append(("vcpus",  self.vcpus))
4280     if self.do_ip:
4281       instance.nics[0].ip = self.ip
4282       result.append(("ip", self.ip))
4283     if self.bridge:
4284       instance.nics[0].bridge = self.bridge
4285       result.append(("bridge", self.bridge))
4286     if self.mac:
4287       instance.nics[0].mac = self.mac
4288       result.append(("mac", self.mac))
4289     if self.do_kernel_path:
4290       instance.kernel_path = self.kernel_path
4291       result.append(("kernel_path", self.kernel_path))
4292     if self.do_initrd_path:
4293       instance.initrd_path = self.initrd_path
4294       result.append(("initrd_path", self.initrd_path))
4295     if self.hvm_boot_order:
4296       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4297         instance.hvm_boot_order = None
4298       else:
4299         instance.hvm_boot_order = self.hvm_boot_order
4300       result.append(("hvm_boot_order", self.hvm_boot_order))
4301     if self.hvm_acpi:
4302       instance.hvm_acpi = self.hvm_acpi
4303       result.append(("hvm_acpi", self.hvm_acpi))
4304     if self.hvm_pae:
4305       instance.hvm_pae = self.hvm_pae
4306       result.append(("hvm_pae", self.hvm_pae))
4307     if self.hvm_cdrom_image_path:
4308       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4309       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4310     if self.vnc_bind_address:
4311       instance.vnc_bind_address = self.vnc_bind_address
4312       result.append(("vnc_bind_address", self.vnc_bind_address))
4313
4314     self.cfg.Update(instance)
4315
4316     return result
4317
4318
4319 class LUQueryExports(NoHooksLU):
4320   """Query the exports list
4321
4322   """
4323   _OP_REQP = []
4324
4325   def CheckPrereq(self):
4326     """Check that the nodelist contains only existing nodes.
4327
4328     """
4329     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4330
4331   def Exec(self, feedback_fn):
4332     """Compute the list of all the exported system images.
4333
4334     Returns:
4335       a dictionary with the structure node->(export-list)
4336       where export-list is a list of the instances exported on
4337       that node.
4338
4339     """
4340     return rpc.call_export_list(self.nodes)
4341
4342
4343 class LUExportInstance(LogicalUnit):
4344   """Export an instance to an image in the cluster.
4345
4346   """
4347   HPATH = "instance-export"
4348   HTYPE = constants.HTYPE_INSTANCE
4349   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4350
4351   def BuildHooksEnv(self):
4352     """Build hooks env.
4353
4354     This will run on the master, primary node and target node.
4355
4356     """
4357     env = {
4358       "EXPORT_NODE": self.op.target_node,
4359       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4360       }
4361     env.update(_BuildInstanceHookEnvByObject(self.instance))
4362     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4363           self.op.target_node]
4364     return env, nl, nl
4365
4366   def CheckPrereq(self):
4367     """Check prerequisites.
4368
4369     This checks that the instance and node names are valid.
4370
4371     """
4372     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4373     self.instance = self.cfg.GetInstanceInfo(instance_name)
4374     if self.instance is None:
4375       raise errors.OpPrereqError("Instance '%s' not found" %
4376                                  self.op.instance_name)
4377
4378     # node verification
4379     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4380     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4381
4382     if self.dst_node is None:
4383       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4384                                  self.op.target_node)
4385     self.op.target_node = self.dst_node.name
4386
4387     # instance disk type verification
4388     for disk in self.instance.disks:
4389       if disk.dev_type == constants.LD_FILE:
4390         raise errors.OpPrereqError("Export not supported for instances with"
4391                                    " file-based disks")
4392
4393   def Exec(self, feedback_fn):
4394     """Export an instance to an image in the cluster.
4395
4396     """
4397     instance = self.instance
4398     dst_node = self.dst_node
4399     src_node = instance.primary_node
4400     if self.op.shutdown:
4401       # shutdown the instance, but not the disks
4402       if not rpc.call_instance_shutdown(src_node, instance):
4403         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4404                                  (instance.name, src_node))
4405
4406     vgname = self.cfg.GetVGName()
4407
4408     snap_disks = []
4409
4410     try:
4411       for disk in instance.disks:
4412         if disk.iv_name == "sda":
4413           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4414           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4415
4416           if not new_dev_name:
4417             logger.Error("could not snapshot block device %s on node %s" %
4418                          (disk.logical_id[1], src_node))
4419           else:
4420             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4421                                       logical_id=(vgname, new_dev_name),
4422                                       physical_id=(vgname, new_dev_name),
4423                                       iv_name=disk.iv_name)
4424             snap_disks.append(new_dev)
4425
4426     finally:
4427       if self.op.shutdown and instance.status == "up":
4428         if not rpc.call_instance_start(src_node, instance, None):
4429           _ShutdownInstanceDisks(instance, self.cfg)
4430           raise errors.OpExecError("Could not start instance")
4431
4432     # TODO: check for size
4433
4434     for dev in snap_disks:
4435       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4436         logger.Error("could not export block device %s from node %s to node %s"
4437                      % (dev.logical_id[1], src_node, dst_node.name))
4438       if not rpc.call_blockdev_remove(src_node, dev):
4439         logger.Error("could not remove snapshot block device %s from node %s" %
4440                      (dev.logical_id[1], src_node))
4441
4442     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4443       logger.Error("could not finalize export for instance %s on node %s" %
4444                    (instance.name, dst_node.name))
4445
4446     nodelist = self.cfg.GetNodeList()
4447     nodelist.remove(dst_node.name)
4448
4449     # on one-node clusters nodelist will be empty after the removal
4450     # if we proceed the backup would be removed because OpQueryExports
4451     # substitutes an empty list with the full cluster node list.
4452     if nodelist:
4453       op = opcodes.OpQueryExports(nodes=nodelist)
4454       exportlist = self.proc.ChainOpCode(op)
4455       for node in exportlist:
4456         if instance.name in exportlist[node]:
4457           if not rpc.call_export_remove(node, instance.name):
4458             logger.Error("could not remove older export for instance %s"
4459                          " on node %s" % (instance.name, node))
4460
4461
4462 class LURemoveExport(NoHooksLU):
4463   """Remove exports related to the named instance.
4464
4465   """
4466   _OP_REQP = ["instance_name"]
4467
4468   def CheckPrereq(self):
4469     """Check prerequisites.
4470     """
4471     pass
4472
4473   def Exec(self, feedback_fn):
4474     """Remove any export.
4475
4476     """
4477     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4478     # If the instance was not found we'll try with the name that was passed in.
4479     # This will only work if it was an FQDN, though.
4480     fqdn_warn = False
4481     if not instance_name:
4482       fqdn_warn = True
4483       instance_name = self.op.instance_name
4484
4485     op = opcodes.OpQueryExports(nodes=[])
4486     exportlist = self.proc.ChainOpCode(op)
4487     found = False
4488     for node in exportlist:
4489       if instance_name in exportlist[node]:
4490         found = True
4491         if not rpc.call_export_remove(node, instance_name):
4492           logger.Error("could not remove export for instance %s"
4493                        " on node %s" % (instance_name, node))
4494
4495     if fqdn_warn and not found:
4496       feedback_fn("Export not found. If trying to remove an export belonging"
4497                   " to a deleted instance please use its Fully Qualified"
4498                   " Domain Name.")
4499
4500
4501 class TagsLU(NoHooksLU):
4502   """Generic tags LU.
4503
4504   This is an abstract class which is the parent of all the other tags LUs.
4505
4506   """
4507   def CheckPrereq(self):
4508     """Check prerequisites.
4509
4510     """
4511     if self.op.kind == constants.TAG_CLUSTER:
4512       self.target = self.cfg.GetClusterInfo()
4513     elif self.op.kind == constants.TAG_NODE:
4514       name = self.cfg.ExpandNodeName(self.op.name)
4515       if name is None:
4516         raise errors.OpPrereqError("Invalid node name (%s)" %
4517                                    (self.op.name,))
4518       self.op.name = name
4519       self.target = self.cfg.GetNodeInfo(name)
4520     elif self.op.kind == constants.TAG_INSTANCE:
4521       name = self.cfg.ExpandInstanceName(self.op.name)
4522       if name is None:
4523         raise errors.OpPrereqError("Invalid instance name (%s)" %
4524                                    (self.op.name,))
4525       self.op.name = name
4526       self.target = self.cfg.GetInstanceInfo(name)
4527     else:
4528       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4529                                  str(self.op.kind))
4530
4531
4532 class LUGetTags(TagsLU):
4533   """Returns the tags of a given object.
4534
4535   """
4536   _OP_REQP = ["kind", "name"]
4537
4538   def Exec(self, feedback_fn):
4539     """Returns the tag list.
4540
4541     """
4542     return list(self.target.GetTags())
4543
4544
4545 class LUSearchTags(NoHooksLU):
4546   """Searches the tags for a given pattern.
4547
4548   """
4549   _OP_REQP = ["pattern"]
4550
4551   def CheckPrereq(self):
4552     """Check prerequisites.
4553
4554     This checks the pattern passed for validity by compiling it.
4555
4556     """
4557     try:
4558       self.re = re.compile(self.op.pattern)
4559     except re.error, err:
4560       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4561                                  (self.op.pattern, err))
4562
4563   def Exec(self, feedback_fn):
4564     """Returns the tag list.
4565
4566     """
4567     cfg = self.cfg
4568     tgts = [("/cluster", cfg.GetClusterInfo())]
4569     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4570     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4571     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4572     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4573     results = []
4574     for path, target in tgts:
4575       for tag in target.GetTags():
4576         if self.re.search(tag):
4577           results.append((path, tag))
4578     return results
4579
4580
4581 class LUAddTags(TagsLU):
4582   """Sets a tag on a given object.
4583
4584   """
4585   _OP_REQP = ["kind", "name", "tags"]
4586
4587   def CheckPrereq(self):
4588     """Check prerequisites.
4589
4590     This checks the type and length of the tag name and value.
4591
4592     """
4593     TagsLU.CheckPrereq(self)
4594     for tag in self.op.tags:
4595       objects.TaggableObject.ValidateTag(tag)
4596
4597   def Exec(self, feedback_fn):
4598     """Sets the tag.
4599
4600     """
4601     try:
4602       for tag in self.op.tags:
4603         self.target.AddTag(tag)
4604     except errors.TagError, err:
4605       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4606     try:
4607       self.cfg.Update(self.target)
4608     except errors.ConfigurationError:
4609       raise errors.OpRetryError("There has been a modification to the"
4610                                 " config file and the operation has been"
4611                                 " aborted. Please retry.")
4612
4613
4614 class LUDelTags(TagsLU):
4615   """Delete a list of tags from a given object.
4616
4617   """
4618   _OP_REQP = ["kind", "name", "tags"]
4619
4620   def CheckPrereq(self):
4621     """Check prerequisites.
4622
4623     This checks that we have the given tag.
4624
4625     """
4626     TagsLU.CheckPrereq(self)
4627     for tag in self.op.tags:
4628       objects.TaggableObject.ValidateTag(tag)
4629     del_tags = frozenset(self.op.tags)
4630     cur_tags = self.target.GetTags()
4631     if not del_tags <= cur_tags:
4632       diff_tags = del_tags - cur_tags
4633       diff_names = ["'%s'" % tag for tag in diff_tags]
4634       diff_names.sort()
4635       raise errors.OpPrereqError("Tag(s) %s not found" %
4636                                  (",".join(diff_names)))
4637
4638   def Exec(self, feedback_fn):
4639     """Remove the tag from the object.
4640
4641     """
4642     for tag in self.op.tags:
4643       self.target.RemoveTag(tag)
4644     try:
4645       self.cfg.Update(self.target)
4646     except errors.ConfigurationError:
4647       raise errors.OpRetryError("There has been a modification to the"
4648                                 " config file and the operation has been"
4649                                 " aborted. Please retry.")
4650
4651
4652 class LUTestDelay(NoHooksLU):
4653   """Sleep for a specified amount of time.
4654
4655   This LU sleeps on the master and/or nodes for a specified amount of
4656   time.
4657
4658   """
4659   _OP_REQP = ["duration", "on_master", "on_nodes"]
4660   REQ_BGL = False
4661
4662   def ExpandNames(self):
4663     """Expand names and set required locks.
4664
4665     This expands the node list, if any.
4666
4667     """
4668     self.needed_locks = {}
4669     if self.op.on_nodes:
4670       # _GetWantedNodes can be used here, but is not always appropriate to use
4671       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4672       # more information.
4673       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4674       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4675
4676   def CheckPrereq(self):
4677     """Check prerequisites.
4678
4679     """
4680
4681   def Exec(self, feedback_fn):
4682     """Do the actual sleep.
4683
4684     """
4685     if self.op.on_master:
4686       if not utils.TestDelay(self.op.duration):
4687         raise errors.OpExecError("Error during master delay test")
4688     if self.op.on_nodes:
4689       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4690       if not result:
4691         raise errors.OpExecError("Complete failure from rpc call")
4692       for node, node_result in result.items():
4693         if not node_result:
4694           raise errors.OpExecError("Failure during rpc call to node %s,"
4695                                    " result: %s" % (node, node_result))
4696
4697
4698 class IAllocator(object):
4699   """IAllocator framework.
4700
4701   An IAllocator instance has three sets of attributes:
4702     - cfg/sstore that are needed to query the cluster
4703     - input data (all members of the _KEYS class attribute are required)
4704     - four buffer attributes (in|out_data|text), that represent the
4705       input (to the external script) in text and data structure format,
4706       and the output from it, again in two formats
4707     - the result variables from the script (success, info, nodes) for
4708       easy usage
4709
4710   """
4711   _ALLO_KEYS = [
4712     "mem_size", "disks", "disk_template",
4713     "os", "tags", "nics", "vcpus",
4714     ]
4715   _RELO_KEYS = [
4716     "relocate_from",
4717     ]
4718
4719   def __init__(self, cfg, sstore, mode, name, **kwargs):
4720     self.cfg = cfg
4721     self.sstore = sstore
4722     # init buffer variables
4723     self.in_text = self.out_text = self.in_data = self.out_data = None
4724     # init all input fields so that pylint is happy
4725     self.mode = mode
4726     self.name = name
4727     self.mem_size = self.disks = self.disk_template = None
4728     self.os = self.tags = self.nics = self.vcpus = None
4729     self.relocate_from = None
4730     # computed fields
4731     self.required_nodes = None
4732     # init result fields
4733     self.success = self.info = self.nodes = None
4734     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4735       keyset = self._ALLO_KEYS
4736     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4737       keyset = self._RELO_KEYS
4738     else:
4739       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4740                                    " IAllocator" % self.mode)
4741     for key in kwargs:
4742       if key not in keyset:
4743         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4744                                      " IAllocator" % key)
4745       setattr(self, key, kwargs[key])
4746     for key in keyset:
4747       if key not in kwargs:
4748         raise errors.ProgrammerError("Missing input parameter '%s' to"
4749                                      " IAllocator" % key)
4750     self._BuildInputData()
4751
4752   def _ComputeClusterData(self):
4753     """Compute the generic allocator input data.
4754
4755     This is the data that is independent of the actual operation.
4756
4757     """
4758     cfg = self.cfg
4759     # cluster data
4760     data = {
4761       "version": 1,
4762       "cluster_name": self.sstore.GetClusterName(),
4763       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4764       "hypervisor_type": self.sstore.GetHypervisorType(),
4765       # we don't have job IDs
4766       }
4767
4768     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4769
4770     # node data
4771     node_results = {}
4772     node_list = cfg.GetNodeList()
4773     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4774     for nname in node_list:
4775       ninfo = cfg.GetNodeInfo(nname)
4776       if nname not in node_data or not isinstance(node_data[nname], dict):
4777         raise errors.OpExecError("Can't get data for node %s" % nname)
4778       remote_info = node_data[nname]
4779       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4780                    'vg_size', 'vg_free', 'cpu_total']:
4781         if attr not in remote_info:
4782           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4783                                    (nname, attr))
4784         try:
4785           remote_info[attr] = int(remote_info[attr])
4786         except ValueError, err:
4787           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4788                                    " %s" % (nname, attr, str(err)))
4789       # compute memory used by primary instances
4790       i_p_mem = i_p_up_mem = 0
4791       for iinfo in i_list:
4792         if iinfo.primary_node == nname:
4793           i_p_mem += iinfo.memory
4794           if iinfo.status == "up":
4795             i_p_up_mem += iinfo.memory
4796
4797       # compute memory used by instances
4798       pnr = {
4799         "tags": list(ninfo.GetTags()),
4800         "total_memory": remote_info['memory_total'],
4801         "reserved_memory": remote_info['memory_dom0'],
4802         "free_memory": remote_info['memory_free'],
4803         "i_pri_memory": i_p_mem,
4804         "i_pri_up_memory": i_p_up_mem,
4805         "total_disk": remote_info['vg_size'],
4806         "free_disk": remote_info['vg_free'],
4807         "primary_ip": ninfo.primary_ip,
4808         "secondary_ip": ninfo.secondary_ip,
4809         "total_cpus": remote_info['cpu_total'],
4810         }
4811       node_results[nname] = pnr
4812     data["nodes"] = node_results
4813
4814     # instance data
4815     instance_data = {}
4816     for iinfo in i_list:
4817       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4818                   for n in iinfo.nics]
4819       pir = {
4820         "tags": list(iinfo.GetTags()),
4821         "should_run": iinfo.status == "up",
4822         "vcpus": iinfo.vcpus,
4823         "memory": iinfo.memory,
4824         "os": iinfo.os,
4825         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4826         "nics": nic_data,
4827         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4828         "disk_template": iinfo.disk_template,
4829         }
4830       instance_data[iinfo.name] = pir
4831
4832     data["instances"] = instance_data
4833
4834     self.in_data = data
4835
4836   def _AddNewInstance(self):
4837     """Add new instance data to allocator structure.
4838
4839     This in combination with _AllocatorGetClusterData will create the
4840     correct structure needed as input for the allocator.
4841
4842     The checks for the completeness of the opcode must have already been
4843     done.
4844
4845     """
4846     data = self.in_data
4847     if len(self.disks) != 2:
4848       raise errors.OpExecError("Only two-disk configurations supported")
4849
4850     disk_space = _ComputeDiskSize(self.disk_template,
4851                                   self.disks[0]["size"], self.disks[1]["size"])
4852
4853     if self.disk_template in constants.DTS_NET_MIRROR:
4854       self.required_nodes = 2
4855     else:
4856       self.required_nodes = 1
4857     request = {
4858       "type": "allocate",
4859       "name": self.name,
4860       "disk_template": self.disk_template,
4861       "tags": self.tags,
4862       "os": self.os,
4863       "vcpus": self.vcpus,
4864       "memory": self.mem_size,
4865       "disks": self.disks,
4866       "disk_space_total": disk_space,
4867       "nics": self.nics,
4868       "required_nodes": self.required_nodes,
4869       }
4870     data["request"] = request
4871
4872   def _AddRelocateInstance(self):
4873     """Add relocate instance data to allocator structure.
4874
4875     This in combination with _IAllocatorGetClusterData will create the
4876     correct structure needed as input for the allocator.
4877
4878     The checks for the completeness of the opcode must have already been
4879     done.
4880
4881     """
4882     instance = self.cfg.GetInstanceInfo(self.name)
4883     if instance is None:
4884       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4885                                    " IAllocator" % self.name)
4886
4887     if instance.disk_template not in constants.DTS_NET_MIRROR:
4888       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4889
4890     if len(instance.secondary_nodes) != 1:
4891       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4892
4893     self.required_nodes = 1
4894
4895     disk_space = _ComputeDiskSize(instance.disk_template,
4896                                   instance.disks[0].size,
4897                                   instance.disks[1].size)
4898
4899     request = {
4900       "type": "relocate",
4901       "name": self.name,
4902       "disk_space_total": disk_space,
4903       "required_nodes": self.required_nodes,
4904       "relocate_from": self.relocate_from,
4905       }
4906     self.in_data["request"] = request
4907
4908   def _BuildInputData(self):
4909     """Build input data structures.
4910
4911     """
4912     self._ComputeClusterData()
4913
4914     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4915       self._AddNewInstance()
4916     else:
4917       self._AddRelocateInstance()
4918
4919     self.in_text = serializer.Dump(self.in_data)
4920
4921   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4922     """Run an instance allocator and return the results.
4923
4924     """
4925     data = self.in_text
4926
4927     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4928
4929     if not isinstance(result, tuple) or len(result) != 4:
4930       raise errors.OpExecError("Invalid result from master iallocator runner")
4931
4932     rcode, stdout, stderr, fail = result
4933
4934     if rcode == constants.IARUN_NOTFOUND:
4935       raise errors.OpExecError("Can't find allocator '%s'" % name)
4936     elif rcode == constants.IARUN_FAILURE:
4937       raise errors.OpExecError("Instance allocator call failed: %s,"
4938                                " output: %s" % (fail, stdout+stderr))
4939     self.out_text = stdout
4940     if validate:
4941       self._ValidateResult()
4942
4943   def _ValidateResult(self):
4944     """Process the allocator results.
4945
4946     This will process and if successful save the result in
4947     self.out_data and the other parameters.
4948
4949     """
4950     try:
4951       rdict = serializer.Load(self.out_text)
4952     except Exception, err:
4953       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4954
4955     if not isinstance(rdict, dict):
4956       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4957
4958     for key in "success", "info", "nodes":
4959       if key not in rdict:
4960         raise errors.OpExecError("Can't parse iallocator results:"
4961                                  " missing key '%s'" % key)
4962       setattr(self, key, rdict[key])
4963
4964     if not isinstance(rdict["nodes"], list):
4965       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4966                                " is not a list")
4967     self.out_data = rdict
4968
4969
4970 class LUTestAllocator(NoHooksLU):
4971   """Run allocator tests.
4972
4973   This LU runs the allocator tests
4974
4975   """
4976   _OP_REQP = ["direction", "mode", "name"]
4977
4978   def CheckPrereq(self):
4979     """Check prerequisites.
4980
4981     This checks the opcode parameters depending on the director and mode test.
4982
4983     """
4984     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4985       for attr in ["name", "mem_size", "disks", "disk_template",
4986                    "os", "tags", "nics", "vcpus"]:
4987         if not hasattr(self.op, attr):
4988           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4989                                      attr)
4990       iname = self.cfg.ExpandInstanceName(self.op.name)
4991       if iname is not None:
4992         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4993                                    iname)
4994       if not isinstance(self.op.nics, list):
4995         raise errors.OpPrereqError("Invalid parameter 'nics'")
4996       for row in self.op.nics:
4997         if (not isinstance(row, dict) or
4998             "mac" not in row or
4999             "ip" not in row or
5000             "bridge" not in row):
5001           raise errors.OpPrereqError("Invalid contents of the"
5002                                      " 'nics' parameter")
5003       if not isinstance(self.op.disks, list):
5004         raise errors.OpPrereqError("Invalid parameter 'disks'")
5005       if len(self.op.disks) != 2:
5006         raise errors.OpPrereqError("Only two-disk configurations supported")
5007       for row in self.op.disks:
5008         if (not isinstance(row, dict) or
5009             "size" not in row or
5010             not isinstance(row["size"], int) or
5011             "mode" not in row or
5012             row["mode"] not in ['r', 'w']):
5013           raise errors.OpPrereqError("Invalid contents of the"
5014                                      " 'disks' parameter")
5015     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5016       if not hasattr(self.op, "name"):
5017         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5018       fname = self.cfg.ExpandInstanceName(self.op.name)
5019       if fname is None:
5020         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5021                                    self.op.name)
5022       self.op.name = fname
5023       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5024     else:
5025       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5026                                  self.op.mode)
5027
5028     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5029       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5030         raise errors.OpPrereqError("Missing allocator name")
5031     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5032       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5033                                  self.op.direction)
5034
5035   def Exec(self, feedback_fn):
5036     """Run the allocator test.
5037
5038     """
5039     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5040       ial = IAllocator(self.cfg, self.sstore,
5041                        mode=self.op.mode,
5042                        name=self.op.name,
5043                        mem_size=self.op.mem_size,
5044                        disks=self.op.disks,
5045                        disk_template=self.op.disk_template,
5046                        os=self.op.os,
5047                        tags=self.op.tags,
5048                        nics=self.op.nics,
5049                        vcpus=self.op.vcpus,
5050                        )
5051     else:
5052       ial = IAllocator(self.cfg, self.sstore,
5053                        mode=self.op.mode,
5054                        name=self.op.name,
5055                        relocate_from=list(self.relocate_from),
5056                        )
5057
5058     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5059       result = ial.in_text
5060     else:
5061       ial.Run(self.op.allocator, validate=False)
5062       result = ial.out_text
5063     return result