Parallelize {Startup,Shutdown,Reboot}Instance
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_MASTER: the LU needs to run on the master node
60         REQ_WSSTORE: the LU needs a writable SimpleStore
61         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62
63   Note that all commands require root permissions.
64
65   """
66   HPATH = None
67   HTYPE = None
68   _OP_REQP = []
69   REQ_MASTER = True
70   REQ_WSSTORE = False
71   REQ_BGL = True
72
73   def __init__(self, processor, op, context, sstore):
74     """Constructor for LogicalUnit.
75
76     This needs to be overriden in derived classes in order to check op
77     validity.
78
79     """
80     self.proc = processor
81     self.op = op
82     self.cfg = context.cfg
83     self.sstore = sstore
84     self.context = context
85     self.needed_locks = None
86     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90
91     for attr_name in self._OP_REQP:
92       attr_val = getattr(op, attr_name, None)
93       if attr_val is None:
94         raise errors.OpPrereqError("Required parameter '%s' missing" %
95                                    attr_name)
96
97     if not self.cfg.IsCluster():
98       raise errors.OpPrereqError("Cluster not initialized yet,"
99                                  " use 'gnt-cluster init' first.")
100     if self.REQ_MASTER:
101       master = sstore.GetMasterNode()
102       if master != utils.HostInfo().name:
103         raise errors.OpPrereqError("Commands must be run on the master"
104                                    " node %s" % master)
105
106   def __GetSSH(self):
107     """Returns the SshRunner object
108
109     """
110     if not self.__ssh:
111       self.__ssh = ssh.SshRunner(self.sstore)
112     return self.__ssh
113
114   ssh = property(fget=__GetSSH)
115
116   def ExpandNames(self):
117     """Expand names for this LU.
118
119     This method is called before starting to execute the opcode, and it should
120     update all the parameters of the opcode to their canonical form (e.g. a
121     short node name must be fully expanded after this method has successfully
122     completed). This way locking, hooks, logging, ecc. can work correctly.
123
124     LUs which implement this method must also populate the self.needed_locks
125     member, as a dict with lock levels as keys, and a list of needed lock names
126     as values. Rules:
127       - Use an empty dict if you don't need any lock
128       - If you don't need any lock at a particular level omit that level
129       - Don't put anything for the BGL level
130       - If you want all locks at a level use None as a value
131         (this reflects what LockSet does, and will be replaced before
132         CheckPrereq with the full list of nodes that have been locked)
133
134     If you need to share locks (rather than acquire them exclusively) at one
135     level you can modify self.share_locks, setting a true value (usually 1) for
136     that level. By default locks are not shared.
137
138     Examples:
139     # Acquire all nodes and one instance
140     self.needed_locks = {
141       locking.LEVEL_NODE: None,
142       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
143     }
144     # Acquire just two nodes
145     self.needed_locks = {
146       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
147     }
148     # Acquire no locks
149     self.needed_locks = {} # No, you can't leave it to the default value None
150
151     """
152     # The implementation of this method is mandatory only if the new LU is
153     # concurrent, so that old LUs don't need to be changed all at the same
154     # time.
155     if self.REQ_BGL:
156       self.needed_locks = {} # Exclusive LUs don't need locks.
157     else:
158       raise NotImplementedError
159
160   def DeclareLocks(self, level):
161     """Declare LU locking needs for a level
162
163     While most LUs can just declare their locking needs at ExpandNames time,
164     sometimes there's the need to calculate some locks after having acquired
165     the ones before. This function is called just before acquiring locks at a
166     particular level, but after acquiring the ones at lower levels, and permits
167     such calculations. It can be used to modify self.needed_locks, and by
168     default it does nothing.
169
170     This function is only called if you have something already set in
171     self.needed_locks for the level.
172
173     @param level: Locking level which is going to be locked
174     @type level: member of ganeti.locking.LEVELS
175
176     """
177
178   def CheckPrereq(self):
179     """Check prerequisites for this LU.
180
181     This method should check that the prerequisites for the execution
182     of this LU are fulfilled. It can do internode communication, but
183     it should be idempotent - no cluster or system changes are
184     allowed.
185
186     The method should raise errors.OpPrereqError in case something is
187     not fulfilled. Its return value is ignored.
188
189     This method should also update all the parameters of the opcode to
190     their canonical form if it hasn't been done by ExpandNames before.
191
192     """
193     raise NotImplementedError
194
195   def Exec(self, feedback_fn):
196     """Execute the LU.
197
198     This method should implement the actual work. It should raise
199     errors.OpExecError for failures that are somewhat dealt with in
200     code, or expected.
201
202     """
203     raise NotImplementedError
204
205   def BuildHooksEnv(self):
206     """Build hooks environment for this LU.
207
208     This method should return a three-node tuple consisting of: a dict
209     containing the environment that will be used for running the
210     specific hook for this LU, a list of node names on which the hook
211     should run before the execution, and a list of node names on which
212     the hook should run after the execution.
213
214     The keys of the dict must not have 'GANETI_' prefixed as this will
215     be handled in the hooks runner. Also note additional keys will be
216     added by the hooks runner. If the LU doesn't define any
217     environment, an empty dict (and not None) should be returned.
218
219     No nodes should be returned as an empty list (and not None).
220
221     Note that if the HPATH for a LU class is None, this function will
222     not be called.
223
224     """
225     raise NotImplementedError
226
227   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
228     """Notify the LU about the results of its hooks.
229
230     This method is called every time a hooks phase is executed, and notifies
231     the Logical Unit about the hooks' result. The LU can then use it to alter
232     its result based on the hooks.  By default the method does nothing and the
233     previous result is passed back unchanged but any LU can define it if it
234     wants to use the local cluster hook-scripts somehow.
235
236     Args:
237       phase: the hooks phase that has just been run
238       hooks_results: the results of the multi-node hooks rpc call
239       feedback_fn: function to send feedback back to the caller
240       lu_result: the previous result this LU had, or None in the PRE phase.
241
242     """
243     return lu_result
244
245   def _ExpandAndLockInstance(self):
246     """Helper function to expand and lock an instance.
247
248     Many LUs that work on an instance take its name in self.op.instance_name
249     and need to expand it and then declare the expanded name for locking. This
250     function does it, and then updates self.op.instance_name to the expanded
251     name. It also initializes needed_locks as a dict, if this hasn't been done
252     before.
253
254     """
255     if self.needed_locks is None:
256       self.needed_locks = {}
257     else:
258       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
259         "_ExpandAndLockInstance called with instance-level locks set"
260     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
261     if expanded_name is None:
262       raise errors.OpPrereqError("Instance '%s' not known" %
263                                   self.op.instance_name)
264     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
265     self.op.instance_name = expanded_name
266
267   def _LockInstancesNodes(self):
268     """Helper function to declare instances' nodes for locking.
269
270     This function should be called after locking one or more instances to lock
271     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
272     with all primary or secondary nodes for instances already locked and
273     present in self.needed_locks[locking.LEVEL_INSTANCE].
274
275     It should be called from DeclareLocks, and for safety only works if
276     self.recalculate_locks[locking.LEVEL_NODE] is set.
277
278     In the future it may grow parameters to just lock some instance's nodes, or
279     to just lock primaries or secondary nodes, if needed.
280
281     If should be called in DeclareLocks in a way similar to:
282
283     if level == locking.LEVEL_NODE:
284       self._LockInstancesNodes()
285
286     """
287     assert locking.LEVEL_NODE in self.recalculate_locks, \
288       "_LockInstancesNodes helper function called with no nodes to recalculate"
289
290     # TODO: check if we're really been called with the instance locks held
291
292     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293     # future we might want to have different behaviors depending on the value
294     # of self.recalculate_locks[locking.LEVEL_NODE]
295     wanted_nodes = []
296     for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
297       instance = self.context.cfg.GetInstanceInfo(instance_name)
298       wanted_nodes.append(instance.primary_node)
299       wanted_nodes.extend(instance.secondary_nodes)
300     self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
301
302     del self.recalculate_locks[locking.LEVEL_NODE]
303
304
305 class NoHooksLU(LogicalUnit):
306   """Simple LU which runs no hooks.
307
308   This LU is intended as a parent for other LogicalUnits which will
309   run no hooks, in order to reduce duplicate code.
310
311   """
312   HPATH = None
313   HTYPE = None
314
315
316 def _GetWantedNodes(lu, nodes):
317   """Returns list of checked and expanded node names.
318
319   Args:
320     nodes: List of nodes (strings) or None for all
321
322   """
323   if not isinstance(nodes, list):
324     raise errors.OpPrereqError("Invalid argument type 'nodes'")
325
326   if nodes:
327     wanted = []
328
329     for name in nodes:
330       node = lu.cfg.ExpandNodeName(name)
331       if node is None:
332         raise errors.OpPrereqError("No such node name '%s'" % name)
333       wanted.append(node)
334
335   else:
336     wanted = lu.cfg.GetNodeList()
337   return utils.NiceSort(wanted)
338
339
340 def _GetWantedInstances(lu, instances):
341   """Returns list of checked and expanded instance names.
342
343   Args:
344     instances: List of instances (strings) or None for all
345
346   """
347   if not isinstance(instances, list):
348     raise errors.OpPrereqError("Invalid argument type 'instances'")
349
350   if instances:
351     wanted = []
352
353     for name in instances:
354       instance = lu.cfg.ExpandInstanceName(name)
355       if instance is None:
356         raise errors.OpPrereqError("No such instance name '%s'" % name)
357       wanted.append(instance)
358
359   else:
360     wanted = lu.cfg.GetInstanceList()
361   return utils.NiceSort(wanted)
362
363
364 def _CheckOutputFields(static, dynamic, selected):
365   """Checks whether all selected fields are valid.
366
367   Args:
368     static: Static fields
369     dynamic: Dynamic fields
370
371   """
372   static_fields = frozenset(static)
373   dynamic_fields = frozenset(dynamic)
374
375   all_fields = static_fields | dynamic_fields
376
377   if not all_fields.issuperset(selected):
378     raise errors.OpPrereqError("Unknown output fields selected: %s"
379                                % ",".join(frozenset(selected).
380                                           difference(all_fields)))
381
382
383 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
384                           memory, vcpus, nics):
385   """Builds instance related env variables for hooks from single variables.
386
387   Args:
388     secondary_nodes: List of secondary nodes as strings
389   """
390   env = {
391     "OP_TARGET": name,
392     "INSTANCE_NAME": name,
393     "INSTANCE_PRIMARY": primary_node,
394     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
395     "INSTANCE_OS_TYPE": os_type,
396     "INSTANCE_STATUS": status,
397     "INSTANCE_MEMORY": memory,
398     "INSTANCE_VCPUS": vcpus,
399   }
400
401   if nics:
402     nic_count = len(nics)
403     for idx, (ip, bridge, mac) in enumerate(nics):
404       if ip is None:
405         ip = ""
406       env["INSTANCE_NIC%d_IP" % idx] = ip
407       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
408       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
409   else:
410     nic_count = 0
411
412   env["INSTANCE_NIC_COUNT"] = nic_count
413
414   return env
415
416
417 def _BuildInstanceHookEnvByObject(instance, override=None):
418   """Builds instance related env variables for hooks from an object.
419
420   Args:
421     instance: objects.Instance object of instance
422     override: dict of values to override
423   """
424   args = {
425     'name': instance.name,
426     'primary_node': instance.primary_node,
427     'secondary_nodes': instance.secondary_nodes,
428     'os_type': instance.os,
429     'status': instance.os,
430     'memory': instance.memory,
431     'vcpus': instance.vcpus,
432     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
433   }
434   if override:
435     args.update(override)
436   return _BuildInstanceHookEnv(**args)
437
438
439 def _CheckInstanceBridgesExist(instance):
440   """Check that the brigdes needed by an instance exist.
441
442   """
443   # check bridges existance
444   brlist = [nic.bridge for nic in instance.nics]
445   if not rpc.call_bridges_exist(instance.primary_node, brlist):
446     raise errors.OpPrereqError("one or more target bridges %s does not"
447                                " exist on destination node '%s'" %
448                                (brlist, instance.primary_node))
449
450
451 class LUDestroyCluster(NoHooksLU):
452   """Logical unit for destroying the cluster.
453
454   """
455   _OP_REQP = []
456
457   def CheckPrereq(self):
458     """Check prerequisites.
459
460     This checks whether the cluster is empty.
461
462     Any errors are signalled by raising errors.OpPrereqError.
463
464     """
465     master = self.sstore.GetMasterNode()
466
467     nodelist = self.cfg.GetNodeList()
468     if len(nodelist) != 1 or nodelist[0] != master:
469       raise errors.OpPrereqError("There are still %d node(s) in"
470                                  " this cluster." % (len(nodelist) - 1))
471     instancelist = self.cfg.GetInstanceList()
472     if instancelist:
473       raise errors.OpPrereqError("There are still %d instance(s) in"
474                                  " this cluster." % len(instancelist))
475
476   def Exec(self, feedback_fn):
477     """Destroys the cluster.
478
479     """
480     master = self.sstore.GetMasterNode()
481     if not rpc.call_node_stop_master(master, False):
482       raise errors.OpExecError("Could not disable the master role")
483     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
484     utils.CreateBackup(priv_key)
485     utils.CreateBackup(pub_key)
486     rpc.call_node_leave_cluster(master)
487
488
489 class LUVerifyCluster(LogicalUnit):
490   """Verifies the cluster status.
491
492   """
493   HPATH = "cluster-verify"
494   HTYPE = constants.HTYPE_CLUSTER
495   _OP_REQP = ["skip_checks"]
496
497   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
498                   remote_version, feedback_fn):
499     """Run multiple tests against a node.
500
501     Test list:
502       - compares ganeti version
503       - checks vg existance and size > 20G
504       - checks config file checksum
505       - checks ssh to other nodes
506
507     Args:
508       node: name of the node to check
509       file_list: required list of files
510       local_cksum: dictionary of local files and their checksums
511
512     """
513     # compares ganeti version
514     local_version = constants.PROTOCOL_VERSION
515     if not remote_version:
516       feedback_fn("  - ERROR: connection to %s failed" % (node))
517       return True
518
519     if local_version != remote_version:
520       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
521                       (local_version, node, remote_version))
522       return True
523
524     # checks vg existance and size > 20G
525
526     bad = False
527     if not vglist:
528       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
529                       (node,))
530       bad = True
531     else:
532       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
533                                             constants.MIN_VG_SIZE)
534       if vgstatus:
535         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
536         bad = True
537
538     # checks config file checksum
539     # checks ssh to any
540
541     if 'filelist' not in node_result:
542       bad = True
543       feedback_fn("  - ERROR: node hasn't returned file checksum data")
544     else:
545       remote_cksum = node_result['filelist']
546       for file_name in file_list:
547         if file_name not in remote_cksum:
548           bad = True
549           feedback_fn("  - ERROR: file '%s' missing" % file_name)
550         elif remote_cksum[file_name] != local_cksum[file_name]:
551           bad = True
552           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
553
554     if 'nodelist' not in node_result:
555       bad = True
556       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
557     else:
558       if node_result['nodelist']:
559         bad = True
560         for node in node_result['nodelist']:
561           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
562                           (node, node_result['nodelist'][node]))
563     if 'node-net-test' not in node_result:
564       bad = True
565       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
566     else:
567       if node_result['node-net-test']:
568         bad = True
569         nlist = utils.NiceSort(node_result['node-net-test'].keys())
570         for node in nlist:
571           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
572                           (node, node_result['node-net-test'][node]))
573
574     hyp_result = node_result.get('hypervisor', None)
575     if hyp_result is not None:
576       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
577     return bad
578
579   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
580                       node_instance, feedback_fn):
581     """Verify an instance.
582
583     This function checks to see if the required block devices are
584     available on the instance's node.
585
586     """
587     bad = False
588
589     node_current = instanceconfig.primary_node
590
591     node_vol_should = {}
592     instanceconfig.MapLVsByNode(node_vol_should)
593
594     for node in node_vol_should:
595       for volume in node_vol_should[node]:
596         if node not in node_vol_is or volume not in node_vol_is[node]:
597           feedback_fn("  - ERROR: volume %s missing on node %s" %
598                           (volume, node))
599           bad = True
600
601     if not instanceconfig.status == 'down':
602       if (node_current not in node_instance or
603           not instance in node_instance[node_current]):
604         feedback_fn("  - ERROR: instance %s not running on node %s" %
605                         (instance, node_current))
606         bad = True
607
608     for node in node_instance:
609       if (not node == node_current):
610         if instance in node_instance[node]:
611           feedback_fn("  - ERROR: instance %s should not run on node %s" %
612                           (instance, node))
613           bad = True
614
615     return bad
616
617   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
618     """Verify if there are any unknown volumes in the cluster.
619
620     The .os, .swap and backup volumes are ignored. All other volumes are
621     reported as unknown.
622
623     """
624     bad = False
625
626     for node in node_vol_is:
627       for volume in node_vol_is[node]:
628         if node not in node_vol_should or volume not in node_vol_should[node]:
629           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
630                       (volume, node))
631           bad = True
632     return bad
633
634   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
635     """Verify the list of running instances.
636
637     This checks what instances are running but unknown to the cluster.
638
639     """
640     bad = False
641     for node in node_instance:
642       for runninginstance in node_instance[node]:
643         if runninginstance not in instancelist:
644           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
645                           (runninginstance, node))
646           bad = True
647     return bad
648
649   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
650     """Verify N+1 Memory Resilience.
651
652     Check that if one single node dies we can still start all the instances it
653     was primary for.
654
655     """
656     bad = False
657
658     for node, nodeinfo in node_info.iteritems():
659       # This code checks that every node which is now listed as secondary has
660       # enough memory to host all instances it is supposed to should a single
661       # other node in the cluster fail.
662       # FIXME: not ready for failover to an arbitrary node
663       # FIXME: does not support file-backed instances
664       # WARNING: we currently take into account down instances as well as up
665       # ones, considering that even if they're down someone might want to start
666       # them even in the event of a node failure.
667       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
668         needed_mem = 0
669         for instance in instances:
670           needed_mem += instance_cfg[instance].memory
671         if nodeinfo['mfree'] < needed_mem:
672           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
673                       " failovers should node %s fail" % (node, prinode))
674           bad = True
675     return bad
676
677   def CheckPrereq(self):
678     """Check prerequisites.
679
680     Transform the list of checks we're going to skip into a set and check that
681     all its members are valid.
682
683     """
684     self.skip_set = frozenset(self.op.skip_checks)
685     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
686       raise errors.OpPrereqError("Invalid checks to be skipped specified")
687
688   def BuildHooksEnv(self):
689     """Build hooks env.
690
691     Cluster-Verify hooks just rone in the post phase and their failure makes
692     the output be logged in the verify output and the verification to fail.
693
694     """
695     all_nodes = self.cfg.GetNodeList()
696     # TODO: populate the environment with useful information for verify hooks
697     env = {}
698     return env, [], all_nodes
699
700   def Exec(self, feedback_fn):
701     """Verify integrity of cluster, performing various test on nodes.
702
703     """
704     bad = False
705     feedback_fn("* Verifying global settings")
706     for msg in self.cfg.VerifyConfig():
707       feedback_fn("  - ERROR: %s" % msg)
708
709     vg_name = self.cfg.GetVGName()
710     nodelist = utils.NiceSort(self.cfg.GetNodeList())
711     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
712     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
713     i_non_redundant = [] # Non redundant instances
714     node_volume = {}
715     node_instance = {}
716     node_info = {}
717     instance_cfg = {}
718
719     # FIXME: verify OS list
720     # do local checksums
721     file_names = list(self.sstore.GetFileList())
722     file_names.append(constants.SSL_CERT_FILE)
723     file_names.append(constants.CLUSTER_CONF_FILE)
724     local_checksums = utils.FingerprintFiles(file_names)
725
726     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
727     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
728     all_instanceinfo = rpc.call_instance_list(nodelist)
729     all_vglist = rpc.call_vg_list(nodelist)
730     node_verify_param = {
731       'filelist': file_names,
732       'nodelist': nodelist,
733       'hypervisor': None,
734       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
735                         for node in nodeinfo]
736       }
737     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
738     all_rversion = rpc.call_version(nodelist)
739     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
740
741     for node in nodelist:
742       feedback_fn("* Verifying node %s" % node)
743       result = self._VerifyNode(node, file_names, local_checksums,
744                                 all_vglist[node], all_nvinfo[node],
745                                 all_rversion[node], feedback_fn)
746       bad = bad or result
747
748       # node_volume
749       volumeinfo = all_volumeinfo[node]
750
751       if isinstance(volumeinfo, basestring):
752         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
753                     (node, volumeinfo[-400:].encode('string_escape')))
754         bad = True
755         node_volume[node] = {}
756       elif not isinstance(volumeinfo, dict):
757         feedback_fn("  - ERROR: connection to %s failed" % (node,))
758         bad = True
759         continue
760       else:
761         node_volume[node] = volumeinfo
762
763       # node_instance
764       nodeinstance = all_instanceinfo[node]
765       if type(nodeinstance) != list:
766         feedback_fn("  - ERROR: connection to %s failed" % (node,))
767         bad = True
768         continue
769
770       node_instance[node] = nodeinstance
771
772       # node_info
773       nodeinfo = all_ninfo[node]
774       if not isinstance(nodeinfo, dict):
775         feedback_fn("  - ERROR: connection to %s failed" % (node,))
776         bad = True
777         continue
778
779       try:
780         node_info[node] = {
781           "mfree": int(nodeinfo['memory_free']),
782           "dfree": int(nodeinfo['vg_free']),
783           "pinst": [],
784           "sinst": [],
785           # dictionary holding all instances this node is secondary for,
786           # grouped by their primary node. Each key is a cluster node, and each
787           # value is a list of instances which have the key as primary and the
788           # current node as secondary.  this is handy to calculate N+1 memory
789           # availability if you can only failover from a primary to its
790           # secondary.
791           "sinst-by-pnode": {},
792         }
793       except ValueError:
794         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
795         bad = True
796         continue
797
798     node_vol_should = {}
799
800     for instance in instancelist:
801       feedback_fn("* Verifying instance %s" % instance)
802       inst_config = self.cfg.GetInstanceInfo(instance)
803       result =  self._VerifyInstance(instance, inst_config, node_volume,
804                                      node_instance, feedback_fn)
805       bad = bad or result
806
807       inst_config.MapLVsByNode(node_vol_should)
808
809       instance_cfg[instance] = inst_config
810
811       pnode = inst_config.primary_node
812       if pnode in node_info:
813         node_info[pnode]['pinst'].append(instance)
814       else:
815         feedback_fn("  - ERROR: instance %s, connection to primary node"
816                     " %s failed" % (instance, pnode))
817         bad = True
818
819       # If the instance is non-redundant we cannot survive losing its primary
820       # node, so we are not N+1 compliant. On the other hand we have no disk
821       # templates with more than one secondary so that situation is not well
822       # supported either.
823       # FIXME: does not support file-backed instances
824       if len(inst_config.secondary_nodes) == 0:
825         i_non_redundant.append(instance)
826       elif len(inst_config.secondary_nodes) > 1:
827         feedback_fn("  - WARNING: multiple secondaries for instance %s"
828                     % instance)
829
830       for snode in inst_config.secondary_nodes:
831         if snode in node_info:
832           node_info[snode]['sinst'].append(instance)
833           if pnode not in node_info[snode]['sinst-by-pnode']:
834             node_info[snode]['sinst-by-pnode'][pnode] = []
835           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
836         else:
837           feedback_fn("  - ERROR: instance %s, connection to secondary node"
838                       " %s failed" % (instance, snode))
839
840     feedback_fn("* Verifying orphan volumes")
841     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
842                                        feedback_fn)
843     bad = bad or result
844
845     feedback_fn("* Verifying remaining instances")
846     result = self._VerifyOrphanInstances(instancelist, node_instance,
847                                          feedback_fn)
848     bad = bad or result
849
850     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
851       feedback_fn("* Verifying N+1 Memory redundancy")
852       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
853       bad = bad or result
854
855     feedback_fn("* Other Notes")
856     if i_non_redundant:
857       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
858                   % len(i_non_redundant))
859
860     return int(bad)
861
862   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
863     """Analize the post-hooks' result, handle it, and send some
864     nicely-formatted feedback back to the user.
865
866     Args:
867       phase: the hooks phase that has just been run
868       hooks_results: the results of the multi-node hooks rpc call
869       feedback_fn: function to send feedback back to the caller
870       lu_result: previous Exec result
871
872     """
873     # We only really run POST phase hooks, and are only interested in their results
874     if phase == constants.HOOKS_PHASE_POST:
875       # Used to change hooks' output to proper indentation
876       indent_re = re.compile('^', re.M)
877       feedback_fn("* Hooks Results")
878       if not hooks_results:
879         feedback_fn("  - ERROR: general communication failure")
880         lu_result = 1
881       else:
882         for node_name in hooks_results:
883           show_node_header = True
884           res = hooks_results[node_name]
885           if res is False or not isinstance(res, list):
886             feedback_fn("    Communication failure")
887             lu_result = 1
888             continue
889           for script, hkr, output in res:
890             if hkr == constants.HKR_FAIL:
891               # The node header is only shown once, if there are
892               # failing hooks on that node
893               if show_node_header:
894                 feedback_fn("  Node %s:" % node_name)
895                 show_node_header = False
896               feedback_fn("    ERROR: Script %s failed, output:" % script)
897               output = indent_re.sub('      ', output)
898               feedback_fn("%s" % output)
899               lu_result = 1
900
901       return lu_result
902
903
904 class LUVerifyDisks(NoHooksLU):
905   """Verifies the cluster disks status.
906
907   """
908   _OP_REQP = []
909
910   def CheckPrereq(self):
911     """Check prerequisites.
912
913     This has no prerequisites.
914
915     """
916     pass
917
918   def Exec(self, feedback_fn):
919     """Verify integrity of cluster disks.
920
921     """
922     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
923
924     vg_name = self.cfg.GetVGName()
925     nodes = utils.NiceSort(self.cfg.GetNodeList())
926     instances = [self.cfg.GetInstanceInfo(name)
927                  for name in self.cfg.GetInstanceList()]
928
929     nv_dict = {}
930     for inst in instances:
931       inst_lvs = {}
932       if (inst.status != "up" or
933           inst.disk_template not in constants.DTS_NET_MIRROR):
934         continue
935       inst.MapLVsByNode(inst_lvs)
936       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
937       for node, vol_list in inst_lvs.iteritems():
938         for vol in vol_list:
939           nv_dict[(node, vol)] = inst
940
941     if not nv_dict:
942       return result
943
944     node_lvs = rpc.call_volume_list(nodes, vg_name)
945
946     to_act = set()
947     for node in nodes:
948       # node_volume
949       lvs = node_lvs[node]
950
951       if isinstance(lvs, basestring):
952         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
953         res_nlvm[node] = lvs
954       elif not isinstance(lvs, dict):
955         logger.Info("connection to node %s failed or invalid data returned" %
956                     (node,))
957         res_nodes.append(node)
958         continue
959
960       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
961         inst = nv_dict.pop((node, lv_name), None)
962         if (not lv_online and inst is not None
963             and inst.name not in res_instances):
964           res_instances.append(inst.name)
965
966     # any leftover items in nv_dict are missing LVs, let's arrange the
967     # data better
968     for key, inst in nv_dict.iteritems():
969       if inst.name not in res_missing:
970         res_missing[inst.name] = []
971       res_missing[inst.name].append(key)
972
973     return result
974
975
976 class LURenameCluster(LogicalUnit):
977   """Rename the cluster.
978
979   """
980   HPATH = "cluster-rename"
981   HTYPE = constants.HTYPE_CLUSTER
982   _OP_REQP = ["name"]
983   REQ_WSSTORE = True
984
985   def BuildHooksEnv(self):
986     """Build hooks env.
987
988     """
989     env = {
990       "OP_TARGET": self.sstore.GetClusterName(),
991       "NEW_NAME": self.op.name,
992       }
993     mn = self.sstore.GetMasterNode()
994     return env, [mn], [mn]
995
996   def CheckPrereq(self):
997     """Verify that the passed name is a valid one.
998
999     """
1000     hostname = utils.HostInfo(self.op.name)
1001
1002     new_name = hostname.name
1003     self.ip = new_ip = hostname.ip
1004     old_name = self.sstore.GetClusterName()
1005     old_ip = self.sstore.GetMasterIP()
1006     if new_name == old_name and new_ip == old_ip:
1007       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1008                                  " cluster has changed")
1009     if new_ip != old_ip:
1010       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1011         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1012                                    " reachable on the network. Aborting." %
1013                                    new_ip)
1014
1015     self.op.name = new_name
1016
1017   def Exec(self, feedback_fn):
1018     """Rename the cluster.
1019
1020     """
1021     clustername = self.op.name
1022     ip = self.ip
1023     ss = self.sstore
1024
1025     # shutdown the master IP
1026     master = ss.GetMasterNode()
1027     if not rpc.call_node_stop_master(master, False):
1028       raise errors.OpExecError("Could not disable the master role")
1029
1030     try:
1031       # modify the sstore
1032       ss.SetKey(ss.SS_MASTER_IP, ip)
1033       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1034
1035       # Distribute updated ss config to all nodes
1036       myself = self.cfg.GetNodeInfo(master)
1037       dist_nodes = self.cfg.GetNodeList()
1038       if myself.name in dist_nodes:
1039         dist_nodes.remove(myself.name)
1040
1041       logger.Debug("Copying updated ssconf data to all nodes")
1042       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1043         fname = ss.KeyToFilename(keyname)
1044         result = rpc.call_upload_file(dist_nodes, fname)
1045         for to_node in dist_nodes:
1046           if not result[to_node]:
1047             logger.Error("copy of file %s to node %s failed" %
1048                          (fname, to_node))
1049     finally:
1050       if not rpc.call_node_start_master(master, False):
1051         logger.Error("Could not re-enable the master role on the master,"
1052                      " please restart manually.")
1053
1054
1055 def _RecursiveCheckIfLVMBased(disk):
1056   """Check if the given disk or its children are lvm-based.
1057
1058   Args:
1059     disk: ganeti.objects.Disk object
1060
1061   Returns:
1062     boolean indicating whether a LD_LV dev_type was found or not
1063
1064   """
1065   if disk.children:
1066     for chdisk in disk.children:
1067       if _RecursiveCheckIfLVMBased(chdisk):
1068         return True
1069   return disk.dev_type == constants.LD_LV
1070
1071
1072 class LUSetClusterParams(LogicalUnit):
1073   """Change the parameters of the cluster.
1074
1075   """
1076   HPATH = "cluster-modify"
1077   HTYPE = constants.HTYPE_CLUSTER
1078   _OP_REQP = []
1079
1080   def BuildHooksEnv(self):
1081     """Build hooks env.
1082
1083     """
1084     env = {
1085       "OP_TARGET": self.sstore.GetClusterName(),
1086       "NEW_VG_NAME": self.op.vg_name,
1087       }
1088     mn = self.sstore.GetMasterNode()
1089     return env, [mn], [mn]
1090
1091   def CheckPrereq(self):
1092     """Check prerequisites.
1093
1094     This checks whether the given params don't conflict and
1095     if the given volume group is valid.
1096
1097     """
1098     if not self.op.vg_name:
1099       instances = [self.cfg.GetInstanceInfo(name)
1100                    for name in self.cfg.GetInstanceList()]
1101       for inst in instances:
1102         for disk in inst.disks:
1103           if _RecursiveCheckIfLVMBased(disk):
1104             raise errors.OpPrereqError("Cannot disable lvm storage while"
1105                                        " lvm-based instances exist")
1106
1107     # if vg_name not None, checks given volume group on all nodes
1108     if self.op.vg_name:
1109       node_list = self.cfg.GetNodeList()
1110       vglist = rpc.call_vg_list(node_list)
1111       for node in node_list:
1112         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1113                                               constants.MIN_VG_SIZE)
1114         if vgstatus:
1115           raise errors.OpPrereqError("Error on node '%s': %s" %
1116                                      (node, vgstatus))
1117
1118   def Exec(self, feedback_fn):
1119     """Change the parameters of the cluster.
1120
1121     """
1122     if self.op.vg_name != self.cfg.GetVGName():
1123       self.cfg.SetVGName(self.op.vg_name)
1124     else:
1125       feedback_fn("Cluster LVM configuration already in desired"
1126                   " state, not changing")
1127
1128
1129 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1130   """Sleep and poll for an instance's disk to sync.
1131
1132   """
1133   if not instance.disks:
1134     return True
1135
1136   if not oneshot:
1137     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1138
1139   node = instance.primary_node
1140
1141   for dev in instance.disks:
1142     cfgw.SetDiskID(dev, node)
1143
1144   retries = 0
1145   while True:
1146     max_time = 0
1147     done = True
1148     cumul_degraded = False
1149     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1150     if not rstats:
1151       proc.LogWarning("Can't get any data from node %s" % node)
1152       retries += 1
1153       if retries >= 10:
1154         raise errors.RemoteError("Can't contact node %s for mirror data,"
1155                                  " aborting." % node)
1156       time.sleep(6)
1157       continue
1158     retries = 0
1159     for i in range(len(rstats)):
1160       mstat = rstats[i]
1161       if mstat is None:
1162         proc.LogWarning("Can't compute data for node %s/%s" %
1163                         (node, instance.disks[i].iv_name))
1164         continue
1165       # we ignore the ldisk parameter
1166       perc_done, est_time, is_degraded, _ = mstat
1167       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1168       if perc_done is not None:
1169         done = False
1170         if est_time is not None:
1171           rem_time = "%d estimated seconds remaining" % est_time
1172           max_time = est_time
1173         else:
1174           rem_time = "no time estimate"
1175         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1176                      (instance.disks[i].iv_name, perc_done, rem_time))
1177     if done or oneshot:
1178       break
1179
1180     time.sleep(min(60, max_time))
1181
1182   if done:
1183     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1184   return not cumul_degraded
1185
1186
1187 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1188   """Check that mirrors are not degraded.
1189
1190   The ldisk parameter, if True, will change the test from the
1191   is_degraded attribute (which represents overall non-ok status for
1192   the device(s)) to the ldisk (representing the local storage status).
1193
1194   """
1195   cfgw.SetDiskID(dev, node)
1196   if ldisk:
1197     idx = 6
1198   else:
1199     idx = 5
1200
1201   result = True
1202   if on_primary or dev.AssembleOnSecondary():
1203     rstats = rpc.call_blockdev_find(node, dev)
1204     if not rstats:
1205       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1206       result = False
1207     else:
1208       result = result and (not rstats[idx])
1209   if dev.children:
1210     for child in dev.children:
1211       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1212
1213   return result
1214
1215
1216 class LUDiagnoseOS(NoHooksLU):
1217   """Logical unit for OS diagnose/query.
1218
1219   """
1220   _OP_REQP = ["output_fields", "names"]
1221
1222   def CheckPrereq(self):
1223     """Check prerequisites.
1224
1225     This always succeeds, since this is a pure query LU.
1226
1227     """
1228     if self.op.names:
1229       raise errors.OpPrereqError("Selective OS query not supported")
1230
1231     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1232     _CheckOutputFields(static=[],
1233                        dynamic=self.dynamic_fields,
1234                        selected=self.op.output_fields)
1235
1236   @staticmethod
1237   def _DiagnoseByOS(node_list, rlist):
1238     """Remaps a per-node return list into an a per-os per-node dictionary
1239
1240       Args:
1241         node_list: a list with the names of all nodes
1242         rlist: a map with node names as keys and OS objects as values
1243
1244       Returns:
1245         map: a map with osnames as keys and as value another map, with
1246              nodes as
1247              keys and list of OS objects as values
1248              e.g. {"debian-etch": {"node1": [<object>,...],
1249                                    "node2": [<object>,]}
1250                   }
1251
1252     """
1253     all_os = {}
1254     for node_name, nr in rlist.iteritems():
1255       if not nr:
1256         continue
1257       for os_obj in nr:
1258         if os_obj.name not in all_os:
1259           # build a list of nodes for this os containing empty lists
1260           # for each node in node_list
1261           all_os[os_obj.name] = {}
1262           for nname in node_list:
1263             all_os[os_obj.name][nname] = []
1264         all_os[os_obj.name][node_name].append(os_obj)
1265     return all_os
1266
1267   def Exec(self, feedback_fn):
1268     """Compute the list of OSes.
1269
1270     """
1271     node_list = self.cfg.GetNodeList()
1272     node_data = rpc.call_os_diagnose(node_list)
1273     if node_data == False:
1274       raise errors.OpExecError("Can't gather the list of OSes")
1275     pol = self._DiagnoseByOS(node_list, node_data)
1276     output = []
1277     for os_name, os_data in pol.iteritems():
1278       row = []
1279       for field in self.op.output_fields:
1280         if field == "name":
1281           val = os_name
1282         elif field == "valid":
1283           val = utils.all([osl and osl[0] for osl in os_data.values()])
1284         elif field == "node_status":
1285           val = {}
1286           for node_name, nos_list in os_data.iteritems():
1287             val[node_name] = [(v.status, v.path) for v in nos_list]
1288         else:
1289           raise errors.ParameterError(field)
1290         row.append(val)
1291       output.append(row)
1292
1293     return output
1294
1295
1296 class LURemoveNode(LogicalUnit):
1297   """Logical unit for removing a node.
1298
1299   """
1300   HPATH = "node-remove"
1301   HTYPE = constants.HTYPE_NODE
1302   _OP_REQP = ["node_name"]
1303
1304   def BuildHooksEnv(self):
1305     """Build hooks env.
1306
1307     This doesn't run on the target node in the pre phase as a failed
1308     node would then be impossible to remove.
1309
1310     """
1311     env = {
1312       "OP_TARGET": self.op.node_name,
1313       "NODE_NAME": self.op.node_name,
1314       }
1315     all_nodes = self.cfg.GetNodeList()
1316     all_nodes.remove(self.op.node_name)
1317     return env, all_nodes, all_nodes
1318
1319   def CheckPrereq(self):
1320     """Check prerequisites.
1321
1322     This checks:
1323      - the node exists in the configuration
1324      - it does not have primary or secondary instances
1325      - it's not the master
1326
1327     Any errors are signalled by raising errors.OpPrereqError.
1328
1329     """
1330     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1331     if node is None:
1332       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1333
1334     instance_list = self.cfg.GetInstanceList()
1335
1336     masternode = self.sstore.GetMasterNode()
1337     if node.name == masternode:
1338       raise errors.OpPrereqError("Node is the master node,"
1339                                  " you need to failover first.")
1340
1341     for instance_name in instance_list:
1342       instance = self.cfg.GetInstanceInfo(instance_name)
1343       if node.name == instance.primary_node:
1344         raise errors.OpPrereqError("Instance %s still running on the node,"
1345                                    " please remove first." % instance_name)
1346       if node.name in instance.secondary_nodes:
1347         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1348                                    " please remove first." % instance_name)
1349     self.op.node_name = node.name
1350     self.node = node
1351
1352   def Exec(self, feedback_fn):
1353     """Removes the node from the cluster.
1354
1355     """
1356     node = self.node
1357     logger.Info("stopping the node daemon and removing configs from node %s" %
1358                 node.name)
1359
1360     rpc.call_node_leave_cluster(node.name)
1361
1362     logger.Info("Removing node %s from config" % node.name)
1363
1364     self.cfg.RemoveNode(node.name)
1365     # Remove the node from the Ganeti Lock Manager
1366     self.context.glm.remove(locking.LEVEL_NODE, node.name)
1367
1368     utils.RemoveHostFromEtcHosts(node.name)
1369
1370
1371 class LUQueryNodes(NoHooksLU):
1372   """Logical unit for querying nodes.
1373
1374   """
1375   _OP_REQP = ["output_fields", "names"]
1376
1377   def CheckPrereq(self):
1378     """Check prerequisites.
1379
1380     This checks that the fields required are valid output fields.
1381
1382     """
1383     self.dynamic_fields = frozenset([
1384       "dtotal", "dfree",
1385       "mtotal", "mnode", "mfree",
1386       "bootid",
1387       "ctotal",
1388       ])
1389
1390     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1391                                "pinst_list", "sinst_list",
1392                                "pip", "sip", "tags"],
1393                        dynamic=self.dynamic_fields,
1394                        selected=self.op.output_fields)
1395
1396     self.wanted = _GetWantedNodes(self, self.op.names)
1397
1398   def Exec(self, feedback_fn):
1399     """Computes the list of nodes and their attributes.
1400
1401     """
1402     nodenames = self.wanted
1403     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1404
1405     # begin data gathering
1406
1407     if self.dynamic_fields.intersection(self.op.output_fields):
1408       live_data = {}
1409       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1410       for name in nodenames:
1411         nodeinfo = node_data.get(name, None)
1412         if nodeinfo:
1413           live_data[name] = {
1414             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1415             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1416             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1417             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1418             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1419             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1420             "bootid": nodeinfo['bootid'],
1421             }
1422         else:
1423           live_data[name] = {}
1424     else:
1425       live_data = dict.fromkeys(nodenames, {})
1426
1427     node_to_primary = dict([(name, set()) for name in nodenames])
1428     node_to_secondary = dict([(name, set()) for name in nodenames])
1429
1430     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1431                              "sinst_cnt", "sinst_list"))
1432     if inst_fields & frozenset(self.op.output_fields):
1433       instancelist = self.cfg.GetInstanceList()
1434
1435       for instance_name in instancelist:
1436         inst = self.cfg.GetInstanceInfo(instance_name)
1437         if inst.primary_node in node_to_primary:
1438           node_to_primary[inst.primary_node].add(inst.name)
1439         for secnode in inst.secondary_nodes:
1440           if secnode in node_to_secondary:
1441             node_to_secondary[secnode].add(inst.name)
1442
1443     # end data gathering
1444
1445     output = []
1446     for node in nodelist:
1447       node_output = []
1448       for field in self.op.output_fields:
1449         if field == "name":
1450           val = node.name
1451         elif field == "pinst_list":
1452           val = list(node_to_primary[node.name])
1453         elif field == "sinst_list":
1454           val = list(node_to_secondary[node.name])
1455         elif field == "pinst_cnt":
1456           val = len(node_to_primary[node.name])
1457         elif field == "sinst_cnt":
1458           val = len(node_to_secondary[node.name])
1459         elif field == "pip":
1460           val = node.primary_ip
1461         elif field == "sip":
1462           val = node.secondary_ip
1463         elif field == "tags":
1464           val = list(node.GetTags())
1465         elif field in self.dynamic_fields:
1466           val = live_data[node.name].get(field, None)
1467         else:
1468           raise errors.ParameterError(field)
1469         node_output.append(val)
1470       output.append(node_output)
1471
1472     return output
1473
1474
1475 class LUQueryNodeVolumes(NoHooksLU):
1476   """Logical unit for getting volumes on node(s).
1477
1478   """
1479   _OP_REQP = ["nodes", "output_fields"]
1480
1481   def CheckPrereq(self):
1482     """Check prerequisites.
1483
1484     This checks that the fields required are valid output fields.
1485
1486     """
1487     self.nodes = _GetWantedNodes(self, self.op.nodes)
1488
1489     _CheckOutputFields(static=["node"],
1490                        dynamic=["phys", "vg", "name", "size", "instance"],
1491                        selected=self.op.output_fields)
1492
1493
1494   def Exec(self, feedback_fn):
1495     """Computes the list of nodes and their attributes.
1496
1497     """
1498     nodenames = self.nodes
1499     volumes = rpc.call_node_volumes(nodenames)
1500
1501     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1502              in self.cfg.GetInstanceList()]
1503
1504     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1505
1506     output = []
1507     for node in nodenames:
1508       if node not in volumes or not volumes[node]:
1509         continue
1510
1511       node_vols = volumes[node][:]
1512       node_vols.sort(key=lambda vol: vol['dev'])
1513
1514       for vol in node_vols:
1515         node_output = []
1516         for field in self.op.output_fields:
1517           if field == "node":
1518             val = node
1519           elif field == "phys":
1520             val = vol['dev']
1521           elif field == "vg":
1522             val = vol['vg']
1523           elif field == "name":
1524             val = vol['name']
1525           elif field == "size":
1526             val = int(float(vol['size']))
1527           elif field == "instance":
1528             for inst in ilist:
1529               if node not in lv_by_node[inst]:
1530                 continue
1531               if vol['name'] in lv_by_node[inst][node]:
1532                 val = inst.name
1533                 break
1534             else:
1535               val = '-'
1536           else:
1537             raise errors.ParameterError(field)
1538           node_output.append(str(val))
1539
1540         output.append(node_output)
1541
1542     return output
1543
1544
1545 class LUAddNode(LogicalUnit):
1546   """Logical unit for adding node to the cluster.
1547
1548   """
1549   HPATH = "node-add"
1550   HTYPE = constants.HTYPE_NODE
1551   _OP_REQP = ["node_name"]
1552
1553   def BuildHooksEnv(self):
1554     """Build hooks env.
1555
1556     This will run on all nodes before, and on all nodes + the new node after.
1557
1558     """
1559     env = {
1560       "OP_TARGET": self.op.node_name,
1561       "NODE_NAME": self.op.node_name,
1562       "NODE_PIP": self.op.primary_ip,
1563       "NODE_SIP": self.op.secondary_ip,
1564       }
1565     nodes_0 = self.cfg.GetNodeList()
1566     nodes_1 = nodes_0 + [self.op.node_name, ]
1567     return env, nodes_0, nodes_1
1568
1569   def CheckPrereq(self):
1570     """Check prerequisites.
1571
1572     This checks:
1573      - the new node is not already in the config
1574      - it is resolvable
1575      - its parameters (single/dual homed) matches the cluster
1576
1577     Any errors are signalled by raising errors.OpPrereqError.
1578
1579     """
1580     node_name = self.op.node_name
1581     cfg = self.cfg
1582
1583     dns_data = utils.HostInfo(node_name)
1584
1585     node = dns_data.name
1586     primary_ip = self.op.primary_ip = dns_data.ip
1587     secondary_ip = getattr(self.op, "secondary_ip", None)
1588     if secondary_ip is None:
1589       secondary_ip = primary_ip
1590     if not utils.IsValidIP(secondary_ip):
1591       raise errors.OpPrereqError("Invalid secondary IP given")
1592     self.op.secondary_ip = secondary_ip
1593
1594     node_list = cfg.GetNodeList()
1595     if not self.op.readd and node in node_list:
1596       raise errors.OpPrereqError("Node %s is already in the configuration" %
1597                                  node)
1598     elif self.op.readd and node not in node_list:
1599       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1600
1601     for existing_node_name in node_list:
1602       existing_node = cfg.GetNodeInfo(existing_node_name)
1603
1604       if self.op.readd and node == existing_node_name:
1605         if (existing_node.primary_ip != primary_ip or
1606             existing_node.secondary_ip != secondary_ip):
1607           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1608                                      " address configuration as before")
1609         continue
1610
1611       if (existing_node.primary_ip == primary_ip or
1612           existing_node.secondary_ip == primary_ip or
1613           existing_node.primary_ip == secondary_ip or
1614           existing_node.secondary_ip == secondary_ip):
1615         raise errors.OpPrereqError("New node ip address(es) conflict with"
1616                                    " existing node %s" % existing_node.name)
1617
1618     # check that the type of the node (single versus dual homed) is the
1619     # same as for the master
1620     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1621     master_singlehomed = myself.secondary_ip == myself.primary_ip
1622     newbie_singlehomed = secondary_ip == primary_ip
1623     if master_singlehomed != newbie_singlehomed:
1624       if master_singlehomed:
1625         raise errors.OpPrereqError("The master has no private ip but the"
1626                                    " new node has one")
1627       else:
1628         raise errors.OpPrereqError("The master has a private ip but the"
1629                                    " new node doesn't have one")
1630
1631     # checks reachablity
1632     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1633       raise errors.OpPrereqError("Node not reachable by ping")
1634
1635     if not newbie_singlehomed:
1636       # check reachability from my secondary ip to newbie's secondary ip
1637       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1638                            source=myself.secondary_ip):
1639         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1640                                    " based ping to noded port")
1641
1642     self.new_node = objects.Node(name=node,
1643                                  primary_ip=primary_ip,
1644                                  secondary_ip=secondary_ip)
1645
1646   def Exec(self, feedback_fn):
1647     """Adds the new node to the cluster.
1648
1649     """
1650     new_node = self.new_node
1651     node = new_node.name
1652
1653     # check connectivity
1654     result = rpc.call_version([node])[node]
1655     if result:
1656       if constants.PROTOCOL_VERSION == result:
1657         logger.Info("communication to node %s fine, sw version %s match" %
1658                     (node, result))
1659       else:
1660         raise errors.OpExecError("Version mismatch master version %s,"
1661                                  " node version %s" %
1662                                  (constants.PROTOCOL_VERSION, result))
1663     else:
1664       raise errors.OpExecError("Cannot get version from the new node")
1665
1666     # setup ssh on node
1667     logger.Info("copy ssh key to node %s" % node)
1668     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1669     keyarray = []
1670     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1671                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1672                 priv_key, pub_key]
1673
1674     for i in keyfiles:
1675       f = open(i, 'r')
1676       try:
1677         keyarray.append(f.read())
1678       finally:
1679         f.close()
1680
1681     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1682                                keyarray[3], keyarray[4], keyarray[5])
1683
1684     if not result:
1685       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1686
1687     # Add node to our /etc/hosts, and add key to known_hosts
1688     utils.AddHostToEtcHosts(new_node.name)
1689
1690     if new_node.secondary_ip != new_node.primary_ip:
1691       if not rpc.call_node_tcp_ping(new_node.name,
1692                                     constants.LOCALHOST_IP_ADDRESS,
1693                                     new_node.secondary_ip,
1694                                     constants.DEFAULT_NODED_PORT,
1695                                     10, False):
1696         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1697                                  " you gave (%s). Please fix and re-run this"
1698                                  " command." % new_node.secondary_ip)
1699
1700     node_verify_list = [self.sstore.GetMasterNode()]
1701     node_verify_param = {
1702       'nodelist': [node],
1703       # TODO: do a node-net-test as well?
1704     }
1705
1706     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1707     for verifier in node_verify_list:
1708       if not result[verifier]:
1709         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1710                                  " for remote verification" % verifier)
1711       if result[verifier]['nodelist']:
1712         for failed in result[verifier]['nodelist']:
1713           feedback_fn("ssh/hostname verification failed %s -> %s" %
1714                       (verifier, result[verifier]['nodelist'][failed]))
1715         raise errors.OpExecError("ssh/hostname verification failed.")
1716
1717     # Distribute updated /etc/hosts and known_hosts to all nodes,
1718     # including the node just added
1719     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1720     dist_nodes = self.cfg.GetNodeList()
1721     if not self.op.readd:
1722       dist_nodes.append(node)
1723     if myself.name in dist_nodes:
1724       dist_nodes.remove(myself.name)
1725
1726     logger.Debug("Copying hosts and known_hosts to all nodes")
1727     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1728       result = rpc.call_upload_file(dist_nodes, fname)
1729       for to_node in dist_nodes:
1730         if not result[to_node]:
1731           logger.Error("copy of file %s to node %s failed" %
1732                        (fname, to_node))
1733
1734     to_copy = self.sstore.GetFileList()
1735     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1736       to_copy.append(constants.VNC_PASSWORD_FILE)
1737     for fname in to_copy:
1738       result = rpc.call_upload_file([node], fname)
1739       if not result[node]:
1740         logger.Error("could not copy file %s to node %s" % (fname, node))
1741
1742     if not self.op.readd:
1743       logger.Info("adding node %s to cluster.conf" % node)
1744       self.cfg.AddNode(new_node)
1745       # Add the new node to the Ganeti Lock Manager
1746       self.context.glm.add(locking.LEVEL_NODE, node)
1747
1748
1749 class LUQueryClusterInfo(NoHooksLU):
1750   """Query cluster configuration.
1751
1752   """
1753   _OP_REQP = []
1754   REQ_MASTER = False
1755   REQ_BGL = False
1756
1757   def ExpandNames(self):
1758     self.needed_locks = {}
1759
1760   def CheckPrereq(self):
1761     """No prerequsites needed for this LU.
1762
1763     """
1764     pass
1765
1766   def Exec(self, feedback_fn):
1767     """Return cluster config.
1768
1769     """
1770     result = {
1771       "name": self.sstore.GetClusterName(),
1772       "software_version": constants.RELEASE_VERSION,
1773       "protocol_version": constants.PROTOCOL_VERSION,
1774       "config_version": constants.CONFIG_VERSION,
1775       "os_api_version": constants.OS_API_VERSION,
1776       "export_version": constants.EXPORT_VERSION,
1777       "master": self.sstore.GetMasterNode(),
1778       "architecture": (platform.architecture()[0], platform.machine()),
1779       "hypervisor_type": self.sstore.GetHypervisorType(),
1780       }
1781
1782     return result
1783
1784
1785 class LUDumpClusterConfig(NoHooksLU):
1786   """Return a text-representation of the cluster-config.
1787
1788   """
1789   _OP_REQP = []
1790   REQ_BGL = False
1791
1792   def ExpandNames(self):
1793     self.needed_locks = {}
1794
1795   def CheckPrereq(self):
1796     """No prerequisites.
1797
1798     """
1799     pass
1800
1801   def Exec(self, feedback_fn):
1802     """Dump a representation of the cluster config to the standard output.
1803
1804     """
1805     return self.cfg.DumpConfig()
1806
1807
1808 class LUActivateInstanceDisks(NoHooksLU):
1809   """Bring up an instance's disks.
1810
1811   """
1812   _OP_REQP = ["instance_name"]
1813
1814   def CheckPrereq(self):
1815     """Check prerequisites.
1816
1817     This checks that the instance is in the cluster.
1818
1819     """
1820     instance = self.cfg.GetInstanceInfo(
1821       self.cfg.ExpandInstanceName(self.op.instance_name))
1822     if instance is None:
1823       raise errors.OpPrereqError("Instance '%s' not known" %
1824                                  self.op.instance_name)
1825     self.instance = instance
1826
1827
1828   def Exec(self, feedback_fn):
1829     """Activate the disks.
1830
1831     """
1832     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1833     if not disks_ok:
1834       raise errors.OpExecError("Cannot activate block devices")
1835
1836     return disks_info
1837
1838
1839 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1840   """Prepare the block devices for an instance.
1841
1842   This sets up the block devices on all nodes.
1843
1844   Args:
1845     instance: a ganeti.objects.Instance object
1846     ignore_secondaries: if true, errors on secondary nodes won't result
1847                         in an error return from the function
1848
1849   Returns:
1850     false if the operation failed
1851     list of (host, instance_visible_name, node_visible_name) if the operation
1852          suceeded with the mapping from node devices to instance devices
1853   """
1854   device_info = []
1855   disks_ok = True
1856   iname = instance.name
1857   # With the two passes mechanism we try to reduce the window of
1858   # opportunity for the race condition of switching DRBD to primary
1859   # before handshaking occured, but we do not eliminate it
1860
1861   # The proper fix would be to wait (with some limits) until the
1862   # connection has been made and drbd transitions from WFConnection
1863   # into any other network-connected state (Connected, SyncTarget,
1864   # SyncSource, etc.)
1865
1866   # 1st pass, assemble on all nodes in secondary mode
1867   for inst_disk in instance.disks:
1868     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1869       cfg.SetDiskID(node_disk, node)
1870       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1871       if not result:
1872         logger.Error("could not prepare block device %s on node %s"
1873                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1874         if not ignore_secondaries:
1875           disks_ok = False
1876
1877   # FIXME: race condition on drbd migration to primary
1878
1879   # 2nd pass, do only the primary node
1880   for inst_disk in instance.disks:
1881     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1882       if node != instance.primary_node:
1883         continue
1884       cfg.SetDiskID(node_disk, node)
1885       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1886       if not result:
1887         logger.Error("could not prepare block device %s on node %s"
1888                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1889         disks_ok = False
1890     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1891
1892   # leave the disks configured for the primary node
1893   # this is a workaround that would be fixed better by
1894   # improving the logical/physical id handling
1895   for disk in instance.disks:
1896     cfg.SetDiskID(disk, instance.primary_node)
1897
1898   return disks_ok, device_info
1899
1900
1901 def _StartInstanceDisks(cfg, instance, force):
1902   """Start the disks of an instance.
1903
1904   """
1905   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1906                                            ignore_secondaries=force)
1907   if not disks_ok:
1908     _ShutdownInstanceDisks(instance, cfg)
1909     if force is not None and not force:
1910       logger.Error("If the message above refers to a secondary node,"
1911                    " you can retry the operation using '--force'.")
1912     raise errors.OpExecError("Disk consistency error")
1913
1914
1915 class LUDeactivateInstanceDisks(NoHooksLU):
1916   """Shutdown an instance's disks.
1917
1918   """
1919   _OP_REQP = ["instance_name"]
1920
1921   def CheckPrereq(self):
1922     """Check prerequisites.
1923
1924     This checks that the instance is in the cluster.
1925
1926     """
1927     instance = self.cfg.GetInstanceInfo(
1928       self.cfg.ExpandInstanceName(self.op.instance_name))
1929     if instance is None:
1930       raise errors.OpPrereqError("Instance '%s' not known" %
1931                                  self.op.instance_name)
1932     self.instance = instance
1933
1934   def Exec(self, feedback_fn):
1935     """Deactivate the disks
1936
1937     """
1938     instance = self.instance
1939     ins_l = rpc.call_instance_list([instance.primary_node])
1940     ins_l = ins_l[instance.primary_node]
1941     if not type(ins_l) is list:
1942       raise errors.OpExecError("Can't contact node '%s'" %
1943                                instance.primary_node)
1944
1945     if self.instance.name in ins_l:
1946       raise errors.OpExecError("Instance is running, can't shutdown"
1947                                " block devices.")
1948
1949     _ShutdownInstanceDisks(instance, self.cfg)
1950
1951
1952 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1953   """Shutdown block devices of an instance.
1954
1955   This does the shutdown on all nodes of the instance.
1956
1957   If the ignore_primary is false, errors on the primary node are
1958   ignored.
1959
1960   """
1961   result = True
1962   for disk in instance.disks:
1963     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1964       cfg.SetDiskID(top_disk, node)
1965       if not rpc.call_blockdev_shutdown(node, top_disk):
1966         logger.Error("could not shutdown block device %s on node %s" %
1967                      (disk.iv_name, node))
1968         if not ignore_primary or node != instance.primary_node:
1969           result = False
1970   return result
1971
1972
1973 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1974   """Checks if a node has enough free memory.
1975
1976   This function check if a given node has the needed amount of free
1977   memory. In case the node has less memory or we cannot get the
1978   information from the node, this function raise an OpPrereqError
1979   exception.
1980
1981   Args:
1982     - cfg: a ConfigWriter instance
1983     - node: the node name
1984     - reason: string to use in the error message
1985     - requested: the amount of memory in MiB
1986
1987   """
1988   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1989   if not nodeinfo or not isinstance(nodeinfo, dict):
1990     raise errors.OpPrereqError("Could not contact node %s for resource"
1991                              " information" % (node,))
1992
1993   free_mem = nodeinfo[node].get('memory_free')
1994   if not isinstance(free_mem, int):
1995     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1996                              " was '%s'" % (node, free_mem))
1997   if requested > free_mem:
1998     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1999                              " needed %s MiB, available %s MiB" %
2000                              (node, reason, requested, free_mem))
2001
2002
2003 class LUStartupInstance(LogicalUnit):
2004   """Starts an instance.
2005
2006   """
2007   HPATH = "instance-start"
2008   HTYPE = constants.HTYPE_INSTANCE
2009   _OP_REQP = ["instance_name", "force"]
2010   REQ_BGL = False
2011
2012   def ExpandNames(self):
2013     self._ExpandAndLockInstance()
2014     self.needed_locks[locking.LEVEL_NODE] = []
2015     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2016
2017   def DeclareLocks(self, level):
2018     if level == locking.LEVEL_NODE:
2019       self._LockInstancesNodes()
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This runs on master, primary and secondary nodes of the instance.
2025
2026     """
2027     env = {
2028       "FORCE": self.op.force,
2029       }
2030     env.update(_BuildInstanceHookEnvByObject(self.instance))
2031     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032           list(self.instance.secondary_nodes))
2033     return env, nl, nl
2034
2035   def CheckPrereq(self):
2036     """Check prerequisites.
2037
2038     This checks that the instance is in the cluster.
2039
2040     """
2041     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2042     assert self.instance is not None, \
2043       "Cannot retrieve locked instance %s" % self.op.instance_name
2044
2045     # check bridges existance
2046     _CheckInstanceBridgesExist(instance)
2047
2048     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2049                          "starting instance %s" % instance.name,
2050                          instance.memory)
2051
2052   def Exec(self, feedback_fn):
2053     """Start the instance.
2054
2055     """
2056     instance = self.instance
2057     force = self.op.force
2058     extra_args = getattr(self.op, "extra_args", "")
2059
2060     self.cfg.MarkInstanceUp(instance.name)
2061
2062     node_current = instance.primary_node
2063
2064     _StartInstanceDisks(self.cfg, instance, force)
2065
2066     if not rpc.call_instance_start(node_current, instance, extra_args):
2067       _ShutdownInstanceDisks(instance, self.cfg)
2068       raise errors.OpExecError("Could not start instance")
2069
2070
2071 class LURebootInstance(LogicalUnit):
2072   """Reboot an instance.
2073
2074   """
2075   HPATH = "instance-reboot"
2076   HTYPE = constants.HTYPE_INSTANCE
2077   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2078   REQ_BGL = False
2079
2080   def ExpandNames(self):
2081     self._ExpandAndLockInstance()
2082     self.needed_locks[locking.LEVEL_NODE] = []
2083     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2084
2085   def DeclareLocks(self, level):
2086     if level == locking.LEVEL_NODE:
2087       self._LockInstancesNodes()
2088
2089   def BuildHooksEnv(self):
2090     """Build hooks env.
2091
2092     This runs on master, primary and secondary nodes of the instance.
2093
2094     """
2095     env = {
2096       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2097       }
2098     env.update(_BuildInstanceHookEnvByObject(self.instance))
2099     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2100           list(self.instance.secondary_nodes))
2101     return env, nl, nl
2102
2103   def CheckPrereq(self):
2104     """Check prerequisites.
2105
2106     This checks that the instance is in the cluster.
2107
2108     """
2109     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2110     assert self.instance is not None, \
2111       "Cannot retrieve locked instance %s" % self.op.instance_name
2112
2113     # check bridges existance
2114     _CheckInstanceBridgesExist(instance)
2115
2116   def Exec(self, feedback_fn):
2117     """Reboot the instance.
2118
2119     """
2120     instance = self.instance
2121     ignore_secondaries = self.op.ignore_secondaries
2122     reboot_type = self.op.reboot_type
2123     extra_args = getattr(self.op, "extra_args", "")
2124
2125     node_current = instance.primary_node
2126
2127     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128                            constants.INSTANCE_REBOOT_HARD,
2129                            constants.INSTANCE_REBOOT_FULL]:
2130       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131                                   (constants.INSTANCE_REBOOT_SOFT,
2132                                    constants.INSTANCE_REBOOT_HARD,
2133                                    constants.INSTANCE_REBOOT_FULL))
2134
2135     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136                        constants.INSTANCE_REBOOT_HARD]:
2137       if not rpc.call_instance_reboot(node_current, instance,
2138                                       reboot_type, extra_args):
2139         raise errors.OpExecError("Could not reboot instance")
2140     else:
2141       if not rpc.call_instance_shutdown(node_current, instance):
2142         raise errors.OpExecError("could not shutdown instance for full reboot")
2143       _ShutdownInstanceDisks(instance, self.cfg)
2144       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145       if not rpc.call_instance_start(node_current, instance, extra_args):
2146         _ShutdownInstanceDisks(instance, self.cfg)
2147         raise errors.OpExecError("Could not start instance for full reboot")
2148
2149     self.cfg.MarkInstanceUp(instance.name)
2150
2151
2152 class LUShutdownInstance(LogicalUnit):
2153   """Shutdown an instance.
2154
2155   """
2156   HPATH = "instance-stop"
2157   HTYPE = constants.HTYPE_INSTANCE
2158   _OP_REQP = ["instance_name"]
2159   REQ_BGL = False
2160
2161   def ExpandNames(self):
2162     self._ExpandAndLockInstance()
2163     self.needed_locks[locking.LEVEL_NODE] = []
2164     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2165
2166   def DeclareLocks(self, level):
2167     if level == locking.LEVEL_NODE:
2168       self._LockInstancesNodes()
2169
2170   def BuildHooksEnv(self):
2171     """Build hooks env.
2172
2173     This runs on master, primary and secondary nodes of the instance.
2174
2175     """
2176     env = _BuildInstanceHookEnvByObject(self.instance)
2177     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2178           list(self.instance.secondary_nodes))
2179     return env, nl, nl
2180
2181   def CheckPrereq(self):
2182     """Check prerequisites.
2183
2184     This checks that the instance is in the cluster.
2185
2186     """
2187     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2188     assert self.instance is not None, \
2189       "Cannot retrieve locked instance %s" % self.op.instance_name
2190
2191   def Exec(self, feedback_fn):
2192     """Shutdown the instance.
2193
2194     """
2195     instance = self.instance
2196     node_current = instance.primary_node
2197     self.cfg.MarkInstanceDown(instance.name)
2198     if not rpc.call_instance_shutdown(node_current, instance):
2199       logger.Error("could not shutdown instance")
2200
2201     _ShutdownInstanceDisks(instance, self.cfg)
2202
2203
2204 class LUReinstallInstance(LogicalUnit):
2205   """Reinstall an instance.
2206
2207   """
2208   HPATH = "instance-reinstall"
2209   HTYPE = constants.HTYPE_INSTANCE
2210   _OP_REQP = ["instance_name"]
2211   REQ_BGL = False
2212
2213   def ExpandNames(self):
2214     self._ExpandAndLockInstance()
2215     self.needed_locks[locking.LEVEL_NODE] = []
2216     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2217
2218   def DeclareLocks(self, level):
2219     if level == locking.LEVEL_NODE:
2220       self._LockInstancesNodes()
2221
2222   def BuildHooksEnv(self):
2223     """Build hooks env.
2224
2225     This runs on master, primary and secondary nodes of the instance.
2226
2227     """
2228     env = _BuildInstanceHookEnvByObject(self.instance)
2229     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2230           list(self.instance.secondary_nodes))
2231     return env, nl, nl
2232
2233   def CheckPrereq(self):
2234     """Check prerequisites.
2235
2236     This checks that the instance is in the cluster and is not running.
2237
2238     """
2239     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2240     assert instance is not None, \
2241       "Cannot retrieve locked instance %s" % self.op.instance_name
2242
2243     if instance.disk_template == constants.DT_DISKLESS:
2244       raise errors.OpPrereqError("Instance '%s' has no disks" %
2245                                  self.op.instance_name)
2246     if instance.status != "down":
2247       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2248                                  self.op.instance_name)
2249     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2250     if remote_info:
2251       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2252                                  (self.op.instance_name,
2253                                   instance.primary_node))
2254
2255     self.op.os_type = getattr(self.op, "os_type", None)
2256     if self.op.os_type is not None:
2257       # OS verification
2258       pnode = self.cfg.GetNodeInfo(
2259         self.cfg.ExpandNodeName(instance.primary_node))
2260       if pnode is None:
2261         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2262                                    self.op.pnode)
2263       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2264       if not os_obj:
2265         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2266                                    " primary node"  % self.op.os_type)
2267
2268     self.instance = instance
2269
2270   def Exec(self, feedback_fn):
2271     """Reinstall the instance.
2272
2273     """
2274     inst = self.instance
2275
2276     if self.op.os_type is not None:
2277       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2278       inst.os = self.op.os_type
2279       self.cfg.AddInstance(inst)
2280
2281     _StartInstanceDisks(self.cfg, inst, None)
2282     try:
2283       feedback_fn("Running the instance OS create scripts...")
2284       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2285         raise errors.OpExecError("Could not install OS for instance %s"
2286                                  " on node %s" %
2287                                  (inst.name, inst.primary_node))
2288     finally:
2289       _ShutdownInstanceDisks(inst, self.cfg)
2290
2291
2292 class LURenameInstance(LogicalUnit):
2293   """Rename an instance.
2294
2295   """
2296   HPATH = "instance-rename"
2297   HTYPE = constants.HTYPE_INSTANCE
2298   _OP_REQP = ["instance_name", "new_name"]
2299
2300   def BuildHooksEnv(self):
2301     """Build hooks env.
2302
2303     This runs on master, primary and secondary nodes of the instance.
2304
2305     """
2306     env = _BuildInstanceHookEnvByObject(self.instance)
2307     env["INSTANCE_NEW_NAME"] = self.op.new_name
2308     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2309           list(self.instance.secondary_nodes))
2310     return env, nl, nl
2311
2312   def CheckPrereq(self):
2313     """Check prerequisites.
2314
2315     This checks that the instance is in the cluster and is not running.
2316
2317     """
2318     instance = self.cfg.GetInstanceInfo(
2319       self.cfg.ExpandInstanceName(self.op.instance_name))
2320     if instance is None:
2321       raise errors.OpPrereqError("Instance '%s' not known" %
2322                                  self.op.instance_name)
2323     if instance.status != "down":
2324       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2325                                  self.op.instance_name)
2326     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2327     if remote_info:
2328       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2329                                  (self.op.instance_name,
2330                                   instance.primary_node))
2331     self.instance = instance
2332
2333     # new name verification
2334     name_info = utils.HostInfo(self.op.new_name)
2335
2336     self.op.new_name = new_name = name_info.name
2337     instance_list = self.cfg.GetInstanceList()
2338     if new_name in instance_list:
2339       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2340                                  new_name)
2341
2342     if not getattr(self.op, "ignore_ip", False):
2343       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2344         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2345                                    (name_info.ip, new_name))
2346
2347
2348   def Exec(self, feedback_fn):
2349     """Reinstall the instance.
2350
2351     """
2352     inst = self.instance
2353     old_name = inst.name
2354
2355     if inst.disk_template == constants.DT_FILE:
2356       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2357
2358     self.cfg.RenameInstance(inst.name, self.op.new_name)
2359     # Change the instance lock. This is definitely safe while we hold the BGL
2360     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2361     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2362
2363     # re-read the instance from the configuration after rename
2364     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2365
2366     if inst.disk_template == constants.DT_FILE:
2367       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2368       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2369                                                 old_file_storage_dir,
2370                                                 new_file_storage_dir)
2371
2372       if not result:
2373         raise errors.OpExecError("Could not connect to node '%s' to rename"
2374                                  " directory '%s' to '%s' (but the instance"
2375                                  " has been renamed in Ganeti)" % (
2376                                  inst.primary_node, old_file_storage_dir,
2377                                  new_file_storage_dir))
2378
2379       if not result[0]:
2380         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2381                                  " (but the instance has been renamed in"
2382                                  " Ganeti)" % (old_file_storage_dir,
2383                                                new_file_storage_dir))
2384
2385     _StartInstanceDisks(self.cfg, inst, None)
2386     try:
2387       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2388                                           "sda", "sdb"):
2389         msg = ("Could run OS rename script for instance %s on node %s (but the"
2390                " instance has been renamed in Ganeti)" %
2391                (inst.name, inst.primary_node))
2392         logger.Error(msg)
2393     finally:
2394       _ShutdownInstanceDisks(inst, self.cfg)
2395
2396
2397 class LURemoveInstance(LogicalUnit):
2398   """Remove an instance.
2399
2400   """
2401   HPATH = "instance-remove"
2402   HTYPE = constants.HTYPE_INSTANCE
2403   _OP_REQP = ["instance_name", "ignore_failures"]
2404
2405   def BuildHooksEnv(self):
2406     """Build hooks env.
2407
2408     This runs on master, primary and secondary nodes of the instance.
2409
2410     """
2411     env = _BuildInstanceHookEnvByObject(self.instance)
2412     nl = [self.sstore.GetMasterNode()]
2413     return env, nl, nl
2414
2415   def CheckPrereq(self):
2416     """Check prerequisites.
2417
2418     This checks that the instance is in the cluster.
2419
2420     """
2421     instance = self.cfg.GetInstanceInfo(
2422       self.cfg.ExpandInstanceName(self.op.instance_name))
2423     if instance is None:
2424       raise errors.OpPrereqError("Instance '%s' not known" %
2425                                  self.op.instance_name)
2426     self.instance = instance
2427
2428   def Exec(self, feedback_fn):
2429     """Remove the instance.
2430
2431     """
2432     instance = self.instance
2433     logger.Info("shutting down instance %s on node %s" %
2434                 (instance.name, instance.primary_node))
2435
2436     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2437       if self.op.ignore_failures:
2438         feedback_fn("Warning: can't shutdown instance")
2439       else:
2440         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2441                                  (instance.name, instance.primary_node))
2442
2443     logger.Info("removing block devices for instance %s" % instance.name)
2444
2445     if not _RemoveDisks(instance, self.cfg):
2446       if self.op.ignore_failures:
2447         feedback_fn("Warning: can't remove instance's disks")
2448       else:
2449         raise errors.OpExecError("Can't remove instance's disks")
2450
2451     logger.Info("removing instance %s out of cluster config" % instance.name)
2452
2453     self.cfg.RemoveInstance(instance.name)
2454     # Remove the new instance from the Ganeti Lock Manager
2455     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2456
2457
2458 class LUQueryInstances(NoHooksLU):
2459   """Logical unit for querying instances.
2460
2461   """
2462   _OP_REQP = ["output_fields", "names"]
2463
2464   def CheckPrereq(self):
2465     """Check prerequisites.
2466
2467     This checks that the fields required are valid output fields.
2468
2469     """
2470     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2471     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2472                                "admin_state", "admin_ram",
2473                                "disk_template", "ip", "mac", "bridge",
2474                                "sda_size", "sdb_size", "vcpus", "tags"],
2475                        dynamic=self.dynamic_fields,
2476                        selected=self.op.output_fields)
2477
2478     self.wanted = _GetWantedInstances(self, self.op.names)
2479
2480   def Exec(self, feedback_fn):
2481     """Computes the list of nodes and their attributes.
2482
2483     """
2484     instance_names = self.wanted
2485     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2486                      in instance_names]
2487
2488     # begin data gathering
2489
2490     nodes = frozenset([inst.primary_node for inst in instance_list])
2491
2492     bad_nodes = []
2493     if self.dynamic_fields.intersection(self.op.output_fields):
2494       live_data = {}
2495       node_data = rpc.call_all_instances_info(nodes)
2496       for name in nodes:
2497         result = node_data[name]
2498         if result:
2499           live_data.update(result)
2500         elif result == False:
2501           bad_nodes.append(name)
2502         # else no instance is alive
2503     else:
2504       live_data = dict([(name, {}) for name in instance_names])
2505
2506     # end data gathering
2507
2508     output = []
2509     for instance in instance_list:
2510       iout = []
2511       for field in self.op.output_fields:
2512         if field == "name":
2513           val = instance.name
2514         elif field == "os":
2515           val = instance.os
2516         elif field == "pnode":
2517           val = instance.primary_node
2518         elif field == "snodes":
2519           val = list(instance.secondary_nodes)
2520         elif field == "admin_state":
2521           val = (instance.status != "down")
2522         elif field == "oper_state":
2523           if instance.primary_node in bad_nodes:
2524             val = None
2525           else:
2526             val = bool(live_data.get(instance.name))
2527         elif field == "status":
2528           if instance.primary_node in bad_nodes:
2529             val = "ERROR_nodedown"
2530           else:
2531             running = bool(live_data.get(instance.name))
2532             if running:
2533               if instance.status != "down":
2534                 val = "running"
2535               else:
2536                 val = "ERROR_up"
2537             else:
2538               if instance.status != "down":
2539                 val = "ERROR_down"
2540               else:
2541                 val = "ADMIN_down"
2542         elif field == "admin_ram":
2543           val = instance.memory
2544         elif field == "oper_ram":
2545           if instance.primary_node in bad_nodes:
2546             val = None
2547           elif instance.name in live_data:
2548             val = live_data[instance.name].get("memory", "?")
2549           else:
2550             val = "-"
2551         elif field == "disk_template":
2552           val = instance.disk_template
2553         elif field == "ip":
2554           val = instance.nics[0].ip
2555         elif field == "bridge":
2556           val = instance.nics[0].bridge
2557         elif field == "mac":
2558           val = instance.nics[0].mac
2559         elif field == "sda_size" or field == "sdb_size":
2560           disk = instance.FindDisk(field[:3])
2561           if disk is None:
2562             val = None
2563           else:
2564             val = disk.size
2565         elif field == "vcpus":
2566           val = instance.vcpus
2567         elif field == "tags":
2568           val = list(instance.GetTags())
2569         else:
2570           raise errors.ParameterError(field)
2571         iout.append(val)
2572       output.append(iout)
2573
2574     return output
2575
2576
2577 class LUFailoverInstance(LogicalUnit):
2578   """Failover an instance.
2579
2580   """
2581   HPATH = "instance-failover"
2582   HTYPE = constants.HTYPE_INSTANCE
2583   _OP_REQP = ["instance_name", "ignore_consistency"]
2584
2585   def BuildHooksEnv(self):
2586     """Build hooks env.
2587
2588     This runs on master, primary and secondary nodes of the instance.
2589
2590     """
2591     env = {
2592       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2593       }
2594     env.update(_BuildInstanceHookEnvByObject(self.instance))
2595     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2596     return env, nl, nl
2597
2598   def CheckPrereq(self):
2599     """Check prerequisites.
2600
2601     This checks that the instance is in the cluster.
2602
2603     """
2604     instance = self.cfg.GetInstanceInfo(
2605       self.cfg.ExpandInstanceName(self.op.instance_name))
2606     if instance is None:
2607       raise errors.OpPrereqError("Instance '%s' not known" %
2608                                  self.op.instance_name)
2609
2610     if instance.disk_template not in constants.DTS_NET_MIRROR:
2611       raise errors.OpPrereqError("Instance's disk layout is not"
2612                                  " network mirrored, cannot failover.")
2613
2614     secondary_nodes = instance.secondary_nodes
2615     if not secondary_nodes:
2616       raise errors.ProgrammerError("no secondary node but using "
2617                                    "a mirrored disk template")
2618
2619     target_node = secondary_nodes[0]
2620     # check memory requirements on the secondary node
2621     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2622                          instance.name, instance.memory)
2623
2624     # check bridge existance
2625     brlist = [nic.bridge for nic in instance.nics]
2626     if not rpc.call_bridges_exist(target_node, brlist):
2627       raise errors.OpPrereqError("One or more target bridges %s does not"
2628                                  " exist on destination node '%s'" %
2629                                  (brlist, target_node))
2630
2631     self.instance = instance
2632
2633   def Exec(self, feedback_fn):
2634     """Failover an instance.
2635
2636     The failover is done by shutting it down on its present node and
2637     starting it on the secondary.
2638
2639     """
2640     instance = self.instance
2641
2642     source_node = instance.primary_node
2643     target_node = instance.secondary_nodes[0]
2644
2645     feedback_fn("* checking disk consistency between source and target")
2646     for dev in instance.disks:
2647       # for drbd, these are drbd over lvm
2648       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2649         if instance.status == "up" and not self.op.ignore_consistency:
2650           raise errors.OpExecError("Disk %s is degraded on target node,"
2651                                    " aborting failover." % dev.iv_name)
2652
2653     feedback_fn("* shutting down instance on source node")
2654     logger.Info("Shutting down instance %s on node %s" %
2655                 (instance.name, source_node))
2656
2657     if not rpc.call_instance_shutdown(source_node, instance):
2658       if self.op.ignore_consistency:
2659         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2660                      " anyway. Please make sure node %s is down"  %
2661                      (instance.name, source_node, source_node))
2662       else:
2663         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2664                                  (instance.name, source_node))
2665
2666     feedback_fn("* deactivating the instance's disks on source node")
2667     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2668       raise errors.OpExecError("Can't shut down the instance's disks.")
2669
2670     instance.primary_node = target_node
2671     # distribute new instance config to the other nodes
2672     self.cfg.Update(instance)
2673
2674     # Only start the instance if it's marked as up
2675     if instance.status == "up":
2676       feedback_fn("* activating the instance's disks on target node")
2677       logger.Info("Starting instance %s on node %s" %
2678                   (instance.name, target_node))
2679
2680       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2681                                                ignore_secondaries=True)
2682       if not disks_ok:
2683         _ShutdownInstanceDisks(instance, self.cfg)
2684         raise errors.OpExecError("Can't activate the instance's disks")
2685
2686       feedback_fn("* starting the instance on the target node")
2687       if not rpc.call_instance_start(target_node, instance, None):
2688         _ShutdownInstanceDisks(instance, self.cfg)
2689         raise errors.OpExecError("Could not start instance %s on node %s." %
2690                                  (instance.name, target_node))
2691
2692
2693 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2694   """Create a tree of block devices on the primary node.
2695
2696   This always creates all devices.
2697
2698   """
2699   if device.children:
2700     for child in device.children:
2701       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2702         return False
2703
2704   cfg.SetDiskID(device, node)
2705   new_id = rpc.call_blockdev_create(node, device, device.size,
2706                                     instance.name, True, info)
2707   if not new_id:
2708     return False
2709   if device.physical_id is None:
2710     device.physical_id = new_id
2711   return True
2712
2713
2714 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2715   """Create a tree of block devices on a secondary node.
2716
2717   If this device type has to be created on secondaries, create it and
2718   all its children.
2719
2720   If not, just recurse to children keeping the same 'force' value.
2721
2722   """
2723   if device.CreateOnSecondary():
2724     force = True
2725   if device.children:
2726     for child in device.children:
2727       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2728                                         child, force, info):
2729         return False
2730
2731   if not force:
2732     return True
2733   cfg.SetDiskID(device, node)
2734   new_id = rpc.call_blockdev_create(node, device, device.size,
2735                                     instance.name, False, info)
2736   if not new_id:
2737     return False
2738   if device.physical_id is None:
2739     device.physical_id = new_id
2740   return True
2741
2742
2743 def _GenerateUniqueNames(cfg, exts):
2744   """Generate a suitable LV name.
2745
2746   This will generate a logical volume name for the given instance.
2747
2748   """
2749   results = []
2750   for val in exts:
2751     new_id = cfg.GenerateUniqueID()
2752     results.append("%s%s" % (new_id, val))
2753   return results
2754
2755
2756 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2757   """Generate a drbd8 device complete with its children.
2758
2759   """
2760   port = cfg.AllocatePort()
2761   vgname = cfg.GetVGName()
2762   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2763                           logical_id=(vgname, names[0]))
2764   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2765                           logical_id=(vgname, names[1]))
2766   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2767                           logical_id = (primary, secondary, port),
2768                           children = [dev_data, dev_meta],
2769                           iv_name=iv_name)
2770   return drbd_dev
2771
2772
2773 def _GenerateDiskTemplate(cfg, template_name,
2774                           instance_name, primary_node,
2775                           secondary_nodes, disk_sz, swap_sz,
2776                           file_storage_dir, file_driver):
2777   """Generate the entire disk layout for a given template type.
2778
2779   """
2780   #TODO: compute space requirements
2781
2782   vgname = cfg.GetVGName()
2783   if template_name == constants.DT_DISKLESS:
2784     disks = []
2785   elif template_name == constants.DT_PLAIN:
2786     if len(secondary_nodes) != 0:
2787       raise errors.ProgrammerError("Wrong template configuration")
2788
2789     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2790     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2791                            logical_id=(vgname, names[0]),
2792                            iv_name = "sda")
2793     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2794                            logical_id=(vgname, names[1]),
2795                            iv_name = "sdb")
2796     disks = [sda_dev, sdb_dev]
2797   elif template_name == constants.DT_DRBD8:
2798     if len(secondary_nodes) != 1:
2799       raise errors.ProgrammerError("Wrong template configuration")
2800     remote_node = secondary_nodes[0]
2801     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2802                                        ".sdb_data", ".sdb_meta"])
2803     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2804                                          disk_sz, names[0:2], "sda")
2805     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2806                                          swap_sz, names[2:4], "sdb")
2807     disks = [drbd_sda_dev, drbd_sdb_dev]
2808   elif template_name == constants.DT_FILE:
2809     if len(secondary_nodes) != 0:
2810       raise errors.ProgrammerError("Wrong template configuration")
2811
2812     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2813                                 iv_name="sda", logical_id=(file_driver,
2814                                 "%s/sda" % file_storage_dir))
2815     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2816                                 iv_name="sdb", logical_id=(file_driver,
2817                                 "%s/sdb" % file_storage_dir))
2818     disks = [file_sda_dev, file_sdb_dev]
2819   else:
2820     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2821   return disks
2822
2823
2824 def _GetInstanceInfoText(instance):
2825   """Compute that text that should be added to the disk's metadata.
2826
2827   """
2828   return "originstname+%s" % instance.name
2829
2830
2831 def _CreateDisks(cfg, instance):
2832   """Create all disks for an instance.
2833
2834   This abstracts away some work from AddInstance.
2835
2836   Args:
2837     instance: the instance object
2838
2839   Returns:
2840     True or False showing the success of the creation process
2841
2842   """
2843   info = _GetInstanceInfoText(instance)
2844
2845   if instance.disk_template == constants.DT_FILE:
2846     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2847     result = rpc.call_file_storage_dir_create(instance.primary_node,
2848                                               file_storage_dir)
2849
2850     if not result:
2851       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2852       return False
2853
2854     if not result[0]:
2855       logger.Error("failed to create directory '%s'" % file_storage_dir)
2856       return False
2857
2858   for device in instance.disks:
2859     logger.Info("creating volume %s for instance %s" %
2860                 (device.iv_name, instance.name))
2861     #HARDCODE
2862     for secondary_node in instance.secondary_nodes:
2863       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2864                                         device, False, info):
2865         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2866                      (device.iv_name, device, secondary_node))
2867         return False
2868     #HARDCODE
2869     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2870                                     instance, device, info):
2871       logger.Error("failed to create volume %s on primary!" %
2872                    device.iv_name)
2873       return False
2874
2875   return True
2876
2877
2878 def _RemoveDisks(instance, cfg):
2879   """Remove all disks for an instance.
2880
2881   This abstracts away some work from `AddInstance()` and
2882   `RemoveInstance()`. Note that in case some of the devices couldn't
2883   be removed, the removal will continue with the other ones (compare
2884   with `_CreateDisks()`).
2885
2886   Args:
2887     instance: the instance object
2888
2889   Returns:
2890     True or False showing the success of the removal proces
2891
2892   """
2893   logger.Info("removing block devices for instance %s" % instance.name)
2894
2895   result = True
2896   for device in instance.disks:
2897     for node, disk in device.ComputeNodeTree(instance.primary_node):
2898       cfg.SetDiskID(disk, node)
2899       if not rpc.call_blockdev_remove(node, disk):
2900         logger.Error("could not remove block device %s on node %s,"
2901                      " continuing anyway" %
2902                      (device.iv_name, node))
2903         result = False
2904
2905   if instance.disk_template == constants.DT_FILE:
2906     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2907     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2908                                             file_storage_dir):
2909       logger.Error("could not remove directory '%s'" % file_storage_dir)
2910       result = False
2911
2912   return result
2913
2914
2915 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2916   """Compute disk size requirements in the volume group
2917
2918   This is currently hard-coded for the two-drive layout.
2919
2920   """
2921   # Required free disk space as a function of disk and swap space
2922   req_size_dict = {
2923     constants.DT_DISKLESS: None,
2924     constants.DT_PLAIN: disk_size + swap_size,
2925     # 256 MB are added for drbd metadata, 128MB for each drbd device
2926     constants.DT_DRBD8: disk_size + swap_size + 256,
2927     constants.DT_FILE: None,
2928   }
2929
2930   if disk_template not in req_size_dict:
2931     raise errors.ProgrammerError("Disk template '%s' size requirement"
2932                                  " is unknown" %  disk_template)
2933
2934   return req_size_dict[disk_template]
2935
2936
2937 class LUCreateInstance(LogicalUnit):
2938   """Create an instance.
2939
2940   """
2941   HPATH = "instance-add"
2942   HTYPE = constants.HTYPE_INSTANCE
2943   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2944               "disk_template", "swap_size", "mode", "start", "vcpus",
2945               "wait_for_sync", "ip_check", "mac"]
2946
2947   def _RunAllocator(self):
2948     """Run the allocator based on input opcode.
2949
2950     """
2951     disks = [{"size": self.op.disk_size, "mode": "w"},
2952              {"size": self.op.swap_size, "mode": "w"}]
2953     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2954              "bridge": self.op.bridge}]
2955     ial = IAllocator(self.cfg, self.sstore,
2956                      mode=constants.IALLOCATOR_MODE_ALLOC,
2957                      name=self.op.instance_name,
2958                      disk_template=self.op.disk_template,
2959                      tags=[],
2960                      os=self.op.os_type,
2961                      vcpus=self.op.vcpus,
2962                      mem_size=self.op.mem_size,
2963                      disks=disks,
2964                      nics=nics,
2965                      )
2966
2967     ial.Run(self.op.iallocator)
2968
2969     if not ial.success:
2970       raise errors.OpPrereqError("Can't compute nodes using"
2971                                  " iallocator '%s': %s" % (self.op.iallocator,
2972                                                            ial.info))
2973     if len(ial.nodes) != ial.required_nodes:
2974       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2975                                  " of nodes (%s), required %s" %
2976                                  (len(ial.nodes), ial.required_nodes))
2977     self.op.pnode = ial.nodes[0]
2978     logger.ToStdout("Selected nodes for the instance: %s" %
2979                     (", ".join(ial.nodes),))
2980     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2981                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2982     if ial.required_nodes == 2:
2983       self.op.snode = ial.nodes[1]
2984
2985   def BuildHooksEnv(self):
2986     """Build hooks env.
2987
2988     This runs on master, primary and secondary nodes of the instance.
2989
2990     """
2991     env = {
2992       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2993       "INSTANCE_DISK_SIZE": self.op.disk_size,
2994       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2995       "INSTANCE_ADD_MODE": self.op.mode,
2996       }
2997     if self.op.mode == constants.INSTANCE_IMPORT:
2998       env["INSTANCE_SRC_NODE"] = self.op.src_node
2999       env["INSTANCE_SRC_PATH"] = self.op.src_path
3000       env["INSTANCE_SRC_IMAGE"] = self.src_image
3001
3002     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3003       primary_node=self.op.pnode,
3004       secondary_nodes=self.secondaries,
3005       status=self.instance_status,
3006       os_type=self.op.os_type,
3007       memory=self.op.mem_size,
3008       vcpus=self.op.vcpus,
3009       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3010     ))
3011
3012     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3013           self.secondaries)
3014     return env, nl, nl
3015
3016
3017   def CheckPrereq(self):
3018     """Check prerequisites.
3019
3020     """
3021     # set optional parameters to none if they don't exist
3022     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3023                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3024                  "vnc_bind_address"]:
3025       if not hasattr(self.op, attr):
3026         setattr(self.op, attr, None)
3027
3028     if self.op.mode not in (constants.INSTANCE_CREATE,
3029                             constants.INSTANCE_IMPORT):
3030       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3031                                  self.op.mode)
3032
3033     if (not self.cfg.GetVGName() and
3034         self.op.disk_template not in constants.DTS_NOT_LVM):
3035       raise errors.OpPrereqError("Cluster does not support lvm-based"
3036                                  " instances")
3037
3038     if self.op.mode == constants.INSTANCE_IMPORT:
3039       src_node = getattr(self.op, "src_node", None)
3040       src_path = getattr(self.op, "src_path", None)
3041       if src_node is None or src_path is None:
3042         raise errors.OpPrereqError("Importing an instance requires source"
3043                                    " node and path options")
3044       src_node_full = self.cfg.ExpandNodeName(src_node)
3045       if src_node_full is None:
3046         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3047       self.op.src_node = src_node = src_node_full
3048
3049       if not os.path.isabs(src_path):
3050         raise errors.OpPrereqError("The source path must be absolute")
3051
3052       export_info = rpc.call_export_info(src_node, src_path)
3053
3054       if not export_info:
3055         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3056
3057       if not export_info.has_section(constants.INISECT_EXP):
3058         raise errors.ProgrammerError("Corrupted export config")
3059
3060       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3061       if (int(ei_version) != constants.EXPORT_VERSION):
3062         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3063                                    (ei_version, constants.EXPORT_VERSION))
3064
3065       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3066         raise errors.OpPrereqError("Can't import instance with more than"
3067                                    " one data disk")
3068
3069       # FIXME: are the old os-es, disk sizes, etc. useful?
3070       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3071       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3072                                                          'disk0_dump'))
3073       self.src_image = diskimage
3074     else: # INSTANCE_CREATE
3075       if getattr(self.op, "os_type", None) is None:
3076         raise errors.OpPrereqError("No guest OS specified")
3077
3078     #### instance parameters check
3079
3080     # disk template and mirror node verification
3081     if self.op.disk_template not in constants.DISK_TEMPLATES:
3082       raise errors.OpPrereqError("Invalid disk template name")
3083
3084     # instance name verification
3085     hostname1 = utils.HostInfo(self.op.instance_name)
3086
3087     self.op.instance_name = instance_name = hostname1.name
3088     instance_list = self.cfg.GetInstanceList()
3089     if instance_name in instance_list:
3090       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3091                                  instance_name)
3092
3093     # ip validity checks
3094     ip = getattr(self.op, "ip", None)
3095     if ip is None or ip.lower() == "none":
3096       inst_ip = None
3097     elif ip.lower() == "auto":
3098       inst_ip = hostname1.ip
3099     else:
3100       if not utils.IsValidIP(ip):
3101         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3102                                    " like a valid IP" % ip)
3103       inst_ip = ip
3104     self.inst_ip = self.op.ip = inst_ip
3105
3106     if self.op.start and not self.op.ip_check:
3107       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3108                                  " adding an instance in start mode")
3109
3110     if self.op.ip_check:
3111       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3112         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3113                                    (hostname1.ip, instance_name))
3114
3115     # MAC address verification
3116     if self.op.mac != "auto":
3117       if not utils.IsValidMac(self.op.mac.lower()):
3118         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3119                                    self.op.mac)
3120
3121     # bridge verification
3122     bridge = getattr(self.op, "bridge", None)
3123     if bridge is None:
3124       self.op.bridge = self.cfg.GetDefBridge()
3125     else:
3126       self.op.bridge = bridge
3127
3128     # boot order verification
3129     if self.op.hvm_boot_order is not None:
3130       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3131         raise errors.OpPrereqError("invalid boot order specified,"
3132                                    " must be one or more of [acdn]")
3133     # file storage checks
3134     if (self.op.file_driver and
3135         not self.op.file_driver in constants.FILE_DRIVER):
3136       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3137                                  self.op.file_driver)
3138
3139     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3140       raise errors.OpPrereqError("File storage directory not a relative"
3141                                  " path")
3142     #### allocator run
3143
3144     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3145       raise errors.OpPrereqError("One and only one of iallocator and primary"
3146                                  " node must be given")
3147
3148     if self.op.iallocator is not None:
3149       self._RunAllocator()
3150
3151     #### node related checks
3152
3153     # check primary node
3154     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3155     if pnode is None:
3156       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3157                                  self.op.pnode)
3158     self.op.pnode = pnode.name
3159     self.pnode = pnode
3160     self.secondaries = []
3161
3162     # mirror node verification
3163     if self.op.disk_template in constants.DTS_NET_MIRROR:
3164       if getattr(self.op, "snode", None) is None:
3165         raise errors.OpPrereqError("The networked disk templates need"
3166                                    " a mirror node")
3167
3168       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3169       if snode_name is None:
3170         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3171                                    self.op.snode)
3172       elif snode_name == pnode.name:
3173         raise errors.OpPrereqError("The secondary node cannot be"
3174                                    " the primary node.")
3175       self.secondaries.append(snode_name)
3176
3177     req_size = _ComputeDiskSize(self.op.disk_template,
3178                                 self.op.disk_size, self.op.swap_size)
3179
3180     # Check lv size requirements
3181     if req_size is not None:
3182       nodenames = [pnode.name] + self.secondaries
3183       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3184       for node in nodenames:
3185         info = nodeinfo.get(node, None)
3186         if not info:
3187           raise errors.OpPrereqError("Cannot get current information"
3188                                      " from node '%s'" % node)
3189         vg_free = info.get('vg_free', None)
3190         if not isinstance(vg_free, int):
3191           raise errors.OpPrereqError("Can't compute free disk space on"
3192                                      " node %s" % node)
3193         if req_size > info['vg_free']:
3194           raise errors.OpPrereqError("Not enough disk space on target node %s."
3195                                      " %d MB available, %d MB required" %
3196                                      (node, info['vg_free'], req_size))
3197
3198     # os verification
3199     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3200     if not os_obj:
3201       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3202                                  " primary node"  % self.op.os_type)
3203
3204     if self.op.kernel_path == constants.VALUE_NONE:
3205       raise errors.OpPrereqError("Can't set instance kernel to none")
3206
3207
3208     # bridge check on primary node
3209     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3210       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3211                                  " destination node '%s'" %
3212                                  (self.op.bridge, pnode.name))
3213
3214     # memory check on primary node
3215     if self.op.start:
3216       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3217                            "creating instance %s" % self.op.instance_name,
3218                            self.op.mem_size)
3219
3220     # hvm_cdrom_image_path verification
3221     if self.op.hvm_cdrom_image_path is not None:
3222       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3223         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3224                                    " be an absolute path or None, not %s" %
3225                                    self.op.hvm_cdrom_image_path)
3226       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3227         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3228                                    " regular file or a symlink pointing to"
3229                                    " an existing regular file, not %s" %
3230                                    self.op.hvm_cdrom_image_path)
3231
3232     # vnc_bind_address verification
3233     if self.op.vnc_bind_address is not None:
3234       if not utils.IsValidIP(self.op.vnc_bind_address):
3235         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3236                                    " like a valid IP address" %
3237                                    self.op.vnc_bind_address)
3238
3239     if self.op.start:
3240       self.instance_status = 'up'
3241     else:
3242       self.instance_status = 'down'
3243
3244   def Exec(self, feedback_fn):
3245     """Create and add the instance to the cluster.
3246
3247     """
3248     instance = self.op.instance_name
3249     pnode_name = self.pnode.name
3250
3251     if self.op.mac == "auto":
3252       mac_address = self.cfg.GenerateMAC()
3253     else:
3254       mac_address = self.op.mac
3255
3256     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3257     if self.inst_ip is not None:
3258       nic.ip = self.inst_ip
3259
3260     ht_kind = self.sstore.GetHypervisorType()
3261     if ht_kind in constants.HTS_REQ_PORT:
3262       network_port = self.cfg.AllocatePort()
3263     else:
3264       network_port = None
3265
3266     if self.op.vnc_bind_address is None:
3267       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3268
3269     # this is needed because os.path.join does not accept None arguments
3270     if self.op.file_storage_dir is None:
3271       string_file_storage_dir = ""
3272     else:
3273       string_file_storage_dir = self.op.file_storage_dir
3274
3275     # build the full file storage dir path
3276     file_storage_dir = os.path.normpath(os.path.join(
3277                                         self.sstore.GetFileStorageDir(),
3278                                         string_file_storage_dir, instance))
3279
3280
3281     disks = _GenerateDiskTemplate(self.cfg,
3282                                   self.op.disk_template,
3283                                   instance, pnode_name,
3284                                   self.secondaries, self.op.disk_size,
3285                                   self.op.swap_size,
3286                                   file_storage_dir,
3287                                   self.op.file_driver)
3288
3289     iobj = objects.Instance(name=instance, os=self.op.os_type,
3290                             primary_node=pnode_name,
3291                             memory=self.op.mem_size,
3292                             vcpus=self.op.vcpus,
3293                             nics=[nic], disks=disks,
3294                             disk_template=self.op.disk_template,
3295                             status=self.instance_status,
3296                             network_port=network_port,
3297                             kernel_path=self.op.kernel_path,
3298                             initrd_path=self.op.initrd_path,
3299                             hvm_boot_order=self.op.hvm_boot_order,
3300                             hvm_acpi=self.op.hvm_acpi,
3301                             hvm_pae=self.op.hvm_pae,
3302                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3303                             vnc_bind_address=self.op.vnc_bind_address,
3304                             )
3305
3306     feedback_fn("* creating instance disks...")
3307     if not _CreateDisks(self.cfg, iobj):
3308       _RemoveDisks(iobj, self.cfg)
3309       raise errors.OpExecError("Device creation failed, reverting...")
3310
3311     feedback_fn("adding instance %s to cluster config" % instance)
3312
3313     self.cfg.AddInstance(iobj)
3314     # Add the new instance to the Ganeti Lock Manager
3315     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3316
3317     if self.op.wait_for_sync:
3318       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3319     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3320       # make sure the disks are not degraded (still sync-ing is ok)
3321       time.sleep(15)
3322       feedback_fn("* checking mirrors status")
3323       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3324     else:
3325       disk_abort = False
3326
3327     if disk_abort:
3328       _RemoveDisks(iobj, self.cfg)
3329       self.cfg.RemoveInstance(iobj.name)
3330       # Remove the new instance from the Ganeti Lock Manager
3331       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3332       raise errors.OpExecError("There are some degraded disks for"
3333                                " this instance")
3334
3335     feedback_fn("creating os for instance %s on node %s" %
3336                 (instance, pnode_name))
3337
3338     if iobj.disk_template != constants.DT_DISKLESS:
3339       if self.op.mode == constants.INSTANCE_CREATE:
3340         feedback_fn("* running the instance OS create scripts...")
3341         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3342           raise errors.OpExecError("could not add os for instance %s"
3343                                    " on node %s" %
3344                                    (instance, pnode_name))
3345
3346       elif self.op.mode == constants.INSTANCE_IMPORT:
3347         feedback_fn("* running the instance OS import scripts...")
3348         src_node = self.op.src_node
3349         src_image = self.src_image
3350         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3351                                                 src_node, src_image):
3352           raise errors.OpExecError("Could not import os for instance"
3353                                    " %s on node %s" %
3354                                    (instance, pnode_name))
3355       else:
3356         # also checked in the prereq part
3357         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3358                                      % self.op.mode)
3359
3360     if self.op.start:
3361       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3362       feedback_fn("* starting instance...")
3363       if not rpc.call_instance_start(pnode_name, iobj, None):
3364         raise errors.OpExecError("Could not start instance")
3365
3366
3367 class LUConnectConsole(NoHooksLU):
3368   """Connect to an instance's console.
3369
3370   This is somewhat special in that it returns the command line that
3371   you need to run on the master node in order to connect to the
3372   console.
3373
3374   """
3375   _OP_REQP = ["instance_name"]
3376   REQ_BGL = False
3377
3378   def ExpandNames(self):
3379     self._ExpandAndLockInstance()
3380
3381   def CheckPrereq(self):
3382     """Check prerequisites.
3383
3384     This checks that the instance is in the cluster.
3385
3386     """
3387     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3388     assert self.instance is not None, \
3389       "Cannot retrieve locked instance %s" % self.op.instance_name
3390
3391   def Exec(self, feedback_fn):
3392     """Connect to the console of an instance
3393
3394     """
3395     instance = self.instance
3396     node = instance.primary_node
3397
3398     node_insts = rpc.call_instance_list([node])[node]
3399     if node_insts is False:
3400       raise errors.OpExecError("Can't connect to node %s." % node)
3401
3402     if instance.name not in node_insts:
3403       raise errors.OpExecError("Instance %s is not running." % instance.name)
3404
3405     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3406
3407     hyper = hypervisor.GetHypervisor()
3408     console_cmd = hyper.GetShellCommandForConsole(instance)
3409
3410     # build ssh cmdline
3411     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3412
3413
3414 class LUReplaceDisks(LogicalUnit):
3415   """Replace the disks of an instance.
3416
3417   """
3418   HPATH = "mirrors-replace"
3419   HTYPE = constants.HTYPE_INSTANCE
3420   _OP_REQP = ["instance_name", "mode", "disks"]
3421
3422   def _RunAllocator(self):
3423     """Compute a new secondary node using an IAllocator.
3424
3425     """
3426     ial = IAllocator(self.cfg, self.sstore,
3427                      mode=constants.IALLOCATOR_MODE_RELOC,
3428                      name=self.op.instance_name,
3429                      relocate_from=[self.sec_node])
3430
3431     ial.Run(self.op.iallocator)
3432
3433     if not ial.success:
3434       raise errors.OpPrereqError("Can't compute nodes using"
3435                                  " iallocator '%s': %s" % (self.op.iallocator,
3436                                                            ial.info))
3437     if len(ial.nodes) != ial.required_nodes:
3438       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3439                                  " of nodes (%s), required %s" %
3440                                  (len(ial.nodes), ial.required_nodes))
3441     self.op.remote_node = ial.nodes[0]
3442     logger.ToStdout("Selected new secondary for the instance: %s" %
3443                     self.op.remote_node)
3444
3445   def BuildHooksEnv(self):
3446     """Build hooks env.
3447
3448     This runs on the master, the primary and all the secondaries.
3449
3450     """
3451     env = {
3452       "MODE": self.op.mode,
3453       "NEW_SECONDARY": self.op.remote_node,
3454       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3455       }
3456     env.update(_BuildInstanceHookEnvByObject(self.instance))
3457     nl = [
3458       self.sstore.GetMasterNode(),
3459       self.instance.primary_node,
3460       ]
3461     if self.op.remote_node is not None:
3462       nl.append(self.op.remote_node)
3463     return env, nl, nl
3464
3465   def CheckPrereq(self):
3466     """Check prerequisites.
3467
3468     This checks that the instance is in the cluster.
3469
3470     """
3471     if not hasattr(self.op, "remote_node"):
3472       self.op.remote_node = None
3473
3474     instance = self.cfg.GetInstanceInfo(
3475       self.cfg.ExpandInstanceName(self.op.instance_name))
3476     if instance is None:
3477       raise errors.OpPrereqError("Instance '%s' not known" %
3478                                  self.op.instance_name)
3479     self.instance = instance
3480     self.op.instance_name = instance.name
3481
3482     if instance.disk_template not in constants.DTS_NET_MIRROR:
3483       raise errors.OpPrereqError("Instance's disk layout is not"
3484                                  " network mirrored.")
3485
3486     if len(instance.secondary_nodes) != 1:
3487       raise errors.OpPrereqError("The instance has a strange layout,"
3488                                  " expected one secondary but found %d" %
3489                                  len(instance.secondary_nodes))
3490
3491     self.sec_node = instance.secondary_nodes[0]
3492
3493     ia_name = getattr(self.op, "iallocator", None)
3494     if ia_name is not None:
3495       if self.op.remote_node is not None:
3496         raise errors.OpPrereqError("Give either the iallocator or the new"
3497                                    " secondary, not both")
3498       self.op.remote_node = self._RunAllocator()
3499
3500     remote_node = self.op.remote_node
3501     if remote_node is not None:
3502       remote_node = self.cfg.ExpandNodeName(remote_node)
3503       if remote_node is None:
3504         raise errors.OpPrereqError("Node '%s' not known" %
3505                                    self.op.remote_node)
3506       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3507     else:
3508       self.remote_node_info = None
3509     if remote_node == instance.primary_node:
3510       raise errors.OpPrereqError("The specified node is the primary node of"
3511                                  " the instance.")
3512     elif remote_node == self.sec_node:
3513       if self.op.mode == constants.REPLACE_DISK_SEC:
3514         # this is for DRBD8, where we can't execute the same mode of
3515         # replacement as for drbd7 (no different port allocated)
3516         raise errors.OpPrereqError("Same secondary given, cannot execute"
3517                                    " replacement")
3518     if instance.disk_template == constants.DT_DRBD8:
3519       if (self.op.mode == constants.REPLACE_DISK_ALL and
3520           remote_node is not None):
3521         # switch to replace secondary mode
3522         self.op.mode = constants.REPLACE_DISK_SEC
3523
3524       if self.op.mode == constants.REPLACE_DISK_ALL:
3525         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3526                                    " secondary disk replacement, not"
3527                                    " both at once")
3528       elif self.op.mode == constants.REPLACE_DISK_PRI:
3529         if remote_node is not None:
3530           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3531                                      " the secondary while doing a primary"
3532                                      " node disk replacement")
3533         self.tgt_node = instance.primary_node
3534         self.oth_node = instance.secondary_nodes[0]
3535       elif self.op.mode == constants.REPLACE_DISK_SEC:
3536         self.new_node = remote_node # this can be None, in which case
3537                                     # we don't change the secondary
3538         self.tgt_node = instance.secondary_nodes[0]
3539         self.oth_node = instance.primary_node
3540       else:
3541         raise errors.ProgrammerError("Unhandled disk replace mode")
3542
3543     for name in self.op.disks:
3544       if instance.FindDisk(name) is None:
3545         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3546                                    (name, instance.name))
3547     self.op.remote_node = remote_node
3548
3549   def _ExecD8DiskOnly(self, feedback_fn):
3550     """Replace a disk on the primary or secondary for dbrd8.
3551
3552     The algorithm for replace is quite complicated:
3553       - for each disk to be replaced:
3554         - create new LVs on the target node with unique names
3555         - detach old LVs from the drbd device
3556         - rename old LVs to name_replaced.<time_t>
3557         - rename new LVs to old LVs
3558         - attach the new LVs (with the old names now) to the drbd device
3559       - wait for sync across all devices
3560       - for each modified disk:
3561         - remove old LVs (which have the name name_replaces.<time_t>)
3562
3563     Failures are not very well handled.
3564
3565     """
3566     steps_total = 6
3567     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3568     instance = self.instance
3569     iv_names = {}
3570     vgname = self.cfg.GetVGName()
3571     # start of work
3572     cfg = self.cfg
3573     tgt_node = self.tgt_node
3574     oth_node = self.oth_node
3575
3576     # Step: check device activation
3577     self.proc.LogStep(1, steps_total, "check device existence")
3578     info("checking volume groups")
3579     my_vg = cfg.GetVGName()
3580     results = rpc.call_vg_list([oth_node, tgt_node])
3581     if not results:
3582       raise errors.OpExecError("Can't list volume groups on the nodes")
3583     for node in oth_node, tgt_node:
3584       res = results.get(node, False)
3585       if not res or my_vg not in res:
3586         raise errors.OpExecError("Volume group '%s' not found on %s" %
3587                                  (my_vg, node))
3588     for dev in instance.disks:
3589       if not dev.iv_name in self.op.disks:
3590         continue
3591       for node in tgt_node, oth_node:
3592         info("checking %s on %s" % (dev.iv_name, node))
3593         cfg.SetDiskID(dev, node)
3594         if not rpc.call_blockdev_find(node, dev):
3595           raise errors.OpExecError("Can't find device %s on node %s" %
3596                                    (dev.iv_name, node))
3597
3598     # Step: check other node consistency
3599     self.proc.LogStep(2, steps_total, "check peer consistency")
3600     for dev in instance.disks:
3601       if not dev.iv_name in self.op.disks:
3602         continue
3603       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3604       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3605                                    oth_node==instance.primary_node):
3606         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3607                                  " to replace disks on this node (%s)" %
3608                                  (oth_node, tgt_node))
3609
3610     # Step: create new storage
3611     self.proc.LogStep(3, steps_total, "allocate new storage")
3612     for dev in instance.disks:
3613       if not dev.iv_name in self.op.disks:
3614         continue
3615       size = dev.size
3616       cfg.SetDiskID(dev, tgt_node)
3617       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3618       names = _GenerateUniqueNames(cfg, lv_names)
3619       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3620                              logical_id=(vgname, names[0]))
3621       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3622                              logical_id=(vgname, names[1]))
3623       new_lvs = [lv_data, lv_meta]
3624       old_lvs = dev.children
3625       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3626       info("creating new local storage on %s for %s" %
3627            (tgt_node, dev.iv_name))
3628       # since we *always* want to create this LV, we use the
3629       # _Create...OnPrimary (which forces the creation), even if we
3630       # are talking about the secondary node
3631       for new_lv in new_lvs:
3632         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3633                                         _GetInstanceInfoText(instance)):
3634           raise errors.OpExecError("Failed to create new LV named '%s' on"
3635                                    " node '%s'" %
3636                                    (new_lv.logical_id[1], tgt_node))
3637
3638     # Step: for each lv, detach+rename*2+attach
3639     self.proc.LogStep(4, steps_total, "change drbd configuration")
3640     for dev, old_lvs, new_lvs in iv_names.itervalues():
3641       info("detaching %s drbd from local storage" % dev.iv_name)
3642       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3643         raise errors.OpExecError("Can't detach drbd from local storage on node"
3644                                  " %s for device %s" % (tgt_node, dev.iv_name))
3645       #dev.children = []
3646       #cfg.Update(instance)
3647
3648       # ok, we created the new LVs, so now we know we have the needed
3649       # storage; as such, we proceed on the target node to rename
3650       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3651       # using the assumption that logical_id == physical_id (which in
3652       # turn is the unique_id on that node)
3653
3654       # FIXME(iustin): use a better name for the replaced LVs
3655       temp_suffix = int(time.time())
3656       ren_fn = lambda d, suff: (d.physical_id[0],
3657                                 d.physical_id[1] + "_replaced-%s" % suff)
3658       # build the rename list based on what LVs exist on the node
3659       rlist = []
3660       for to_ren in old_lvs:
3661         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3662         if find_res is not None: # device exists
3663           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3664
3665       info("renaming the old LVs on the target node")
3666       if not rpc.call_blockdev_rename(tgt_node, rlist):
3667         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3668       # now we rename the new LVs to the old LVs
3669       info("renaming the new LVs on the target node")
3670       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3671       if not rpc.call_blockdev_rename(tgt_node, rlist):
3672         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3673
3674       for old, new in zip(old_lvs, new_lvs):
3675         new.logical_id = old.logical_id
3676         cfg.SetDiskID(new, tgt_node)
3677
3678       for disk in old_lvs:
3679         disk.logical_id = ren_fn(disk, temp_suffix)
3680         cfg.SetDiskID(disk, tgt_node)
3681
3682       # now that the new lvs have the old name, we can add them to the device
3683       info("adding new mirror component on %s" % tgt_node)
3684       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3685         for new_lv in new_lvs:
3686           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3687             warning("Can't rollback device %s", hint="manually cleanup unused"
3688                     " logical volumes")
3689         raise errors.OpExecError("Can't add local storage to drbd")
3690
3691       dev.children = new_lvs
3692       cfg.Update(instance)
3693
3694     # Step: wait for sync
3695
3696     # this can fail as the old devices are degraded and _WaitForSync
3697     # does a combined result over all disks, so we don't check its
3698     # return value
3699     self.proc.LogStep(5, steps_total, "sync devices")
3700     _WaitForSync(cfg, instance, self.proc, unlock=True)
3701
3702     # so check manually all the devices
3703     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3704       cfg.SetDiskID(dev, instance.primary_node)
3705       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3706       if is_degr:
3707         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3708
3709     # Step: remove old storage
3710     self.proc.LogStep(6, steps_total, "removing old storage")
3711     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3712       info("remove logical volumes for %s" % name)
3713       for lv in old_lvs:
3714         cfg.SetDiskID(lv, tgt_node)
3715         if not rpc.call_blockdev_remove(tgt_node, lv):
3716           warning("Can't remove old LV", hint="manually remove unused LVs")
3717           continue
3718
3719   def _ExecD8Secondary(self, feedback_fn):
3720     """Replace the secondary node for drbd8.
3721
3722     The algorithm for replace is quite complicated:
3723       - for all disks of the instance:
3724         - create new LVs on the new node with same names
3725         - shutdown the drbd device on the old secondary
3726         - disconnect the drbd network on the primary
3727         - create the drbd device on the new secondary
3728         - network attach the drbd on the primary, using an artifice:
3729           the drbd code for Attach() will connect to the network if it
3730           finds a device which is connected to the good local disks but
3731           not network enabled
3732       - wait for sync across all devices
3733       - remove all disks from the old secondary
3734
3735     Failures are not very well handled.
3736
3737     """
3738     steps_total = 6
3739     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3740     instance = self.instance
3741     iv_names = {}
3742     vgname = self.cfg.GetVGName()
3743     # start of work
3744     cfg = self.cfg
3745     old_node = self.tgt_node
3746     new_node = self.new_node
3747     pri_node = instance.primary_node
3748
3749     # Step: check device activation
3750     self.proc.LogStep(1, steps_total, "check device existence")
3751     info("checking volume groups")
3752     my_vg = cfg.GetVGName()
3753     results = rpc.call_vg_list([pri_node, new_node])
3754     if not results:
3755       raise errors.OpExecError("Can't list volume groups on the nodes")
3756     for node in pri_node, new_node:
3757       res = results.get(node, False)
3758       if not res or my_vg not in res:
3759         raise errors.OpExecError("Volume group '%s' not found on %s" %
3760                                  (my_vg, node))
3761     for dev in instance.disks:
3762       if not dev.iv_name in self.op.disks:
3763         continue
3764       info("checking %s on %s" % (dev.iv_name, pri_node))
3765       cfg.SetDiskID(dev, pri_node)
3766       if not rpc.call_blockdev_find(pri_node, dev):
3767         raise errors.OpExecError("Can't find device %s on node %s" %
3768                                  (dev.iv_name, pri_node))
3769
3770     # Step: check other node consistency
3771     self.proc.LogStep(2, steps_total, "check peer consistency")
3772     for dev in instance.disks:
3773       if not dev.iv_name in self.op.disks:
3774         continue
3775       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3776       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3777         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3778                                  " unsafe to replace the secondary" %
3779                                  pri_node)
3780
3781     # Step: create new storage
3782     self.proc.LogStep(3, steps_total, "allocate new storage")
3783     for dev in instance.disks:
3784       size = dev.size
3785       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3786       # since we *always* want to create this LV, we use the
3787       # _Create...OnPrimary (which forces the creation), even if we
3788       # are talking about the secondary node
3789       for new_lv in dev.children:
3790         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3791                                         _GetInstanceInfoText(instance)):
3792           raise errors.OpExecError("Failed to create new LV named '%s' on"
3793                                    " node '%s'" %
3794                                    (new_lv.logical_id[1], new_node))
3795
3796       iv_names[dev.iv_name] = (dev, dev.children)
3797
3798     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3799     for dev in instance.disks:
3800       size = dev.size
3801       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3802       # create new devices on new_node
3803       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3804                               logical_id=(pri_node, new_node,
3805                                           dev.logical_id[2]),
3806                               children=dev.children)
3807       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3808                                         new_drbd, False,
3809                                       _GetInstanceInfoText(instance)):
3810         raise errors.OpExecError("Failed to create new DRBD on"
3811                                  " node '%s'" % new_node)
3812
3813     for dev in instance.disks:
3814       # we have new devices, shutdown the drbd on the old secondary
3815       info("shutting down drbd for %s on old node" % dev.iv_name)
3816       cfg.SetDiskID(dev, old_node)
3817       if not rpc.call_blockdev_shutdown(old_node, dev):
3818         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3819                 hint="Please cleanup this device manually as soon as possible")
3820
3821     info("detaching primary drbds from the network (=> standalone)")
3822     done = 0
3823     for dev in instance.disks:
3824       cfg.SetDiskID(dev, pri_node)
3825       # set the physical (unique in bdev terms) id to None, meaning
3826       # detach from network
3827       dev.physical_id = (None,) * len(dev.physical_id)
3828       # and 'find' the device, which will 'fix' it to match the
3829       # standalone state
3830       if rpc.call_blockdev_find(pri_node, dev):
3831         done += 1
3832       else:
3833         warning("Failed to detach drbd %s from network, unusual case" %
3834                 dev.iv_name)
3835
3836     if not done:
3837       # no detaches succeeded (very unlikely)
3838       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3839
3840     # if we managed to detach at least one, we update all the disks of
3841     # the instance to point to the new secondary
3842     info("updating instance configuration")
3843     for dev in instance.disks:
3844       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3845       cfg.SetDiskID(dev, pri_node)
3846     cfg.Update(instance)
3847
3848     # and now perform the drbd attach
3849     info("attaching primary drbds to new secondary (standalone => connected)")
3850     failures = []
3851     for dev in instance.disks:
3852       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3853       # since the attach is smart, it's enough to 'find' the device,
3854       # it will automatically activate the network, if the physical_id
3855       # is correct
3856       cfg.SetDiskID(dev, pri_node)
3857       if not rpc.call_blockdev_find(pri_node, dev):
3858         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3859                 "please do a gnt-instance info to see the status of disks")
3860
3861     # this can fail as the old devices are degraded and _WaitForSync
3862     # does a combined result over all disks, so we don't check its
3863     # return value
3864     self.proc.LogStep(5, steps_total, "sync devices")
3865     _WaitForSync(cfg, instance, self.proc, unlock=True)
3866
3867     # so check manually all the devices
3868     for name, (dev, old_lvs) in iv_names.iteritems():
3869       cfg.SetDiskID(dev, pri_node)
3870       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3871       if is_degr:
3872         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3873
3874     self.proc.LogStep(6, steps_total, "removing old storage")
3875     for name, (dev, old_lvs) in iv_names.iteritems():
3876       info("remove logical volumes for %s" % name)
3877       for lv in old_lvs:
3878         cfg.SetDiskID(lv, old_node)
3879         if not rpc.call_blockdev_remove(old_node, lv):
3880           warning("Can't remove LV on old secondary",
3881                   hint="Cleanup stale volumes by hand")
3882
3883   def Exec(self, feedback_fn):
3884     """Execute disk replacement.
3885
3886     This dispatches the disk replacement to the appropriate handler.
3887
3888     """
3889     instance = self.instance
3890
3891     # Activate the instance disks if we're replacing them on a down instance
3892     if instance.status == "down":
3893       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3894       self.proc.ChainOpCode(op)
3895
3896     if instance.disk_template == constants.DT_DRBD8:
3897       if self.op.remote_node is None:
3898         fn = self._ExecD8DiskOnly
3899       else:
3900         fn = self._ExecD8Secondary
3901     else:
3902       raise errors.ProgrammerError("Unhandled disk replacement case")
3903
3904     ret = fn(feedback_fn)
3905
3906     # Deactivate the instance disks if we're replacing them on a down instance
3907     if instance.status == "down":
3908       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3909       self.proc.ChainOpCode(op)
3910
3911     return ret
3912
3913
3914 class LUGrowDisk(LogicalUnit):
3915   """Grow a disk of an instance.
3916
3917   """
3918   HPATH = "disk-grow"
3919   HTYPE = constants.HTYPE_INSTANCE
3920   _OP_REQP = ["instance_name", "disk", "amount"]
3921
3922   def BuildHooksEnv(self):
3923     """Build hooks env.
3924
3925     This runs on the master, the primary and all the secondaries.
3926
3927     """
3928     env = {
3929       "DISK": self.op.disk,
3930       "AMOUNT": self.op.amount,
3931       }
3932     env.update(_BuildInstanceHookEnvByObject(self.instance))
3933     nl = [
3934       self.sstore.GetMasterNode(),
3935       self.instance.primary_node,
3936       ]
3937     return env, nl, nl
3938
3939   def CheckPrereq(self):
3940     """Check prerequisites.
3941
3942     This checks that the instance is in the cluster.
3943
3944     """
3945     instance = self.cfg.GetInstanceInfo(
3946       self.cfg.ExpandInstanceName(self.op.instance_name))
3947     if instance is None:
3948       raise errors.OpPrereqError("Instance '%s' not known" %
3949                                  self.op.instance_name)
3950     self.instance = instance
3951     self.op.instance_name = instance.name
3952
3953     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3954       raise errors.OpPrereqError("Instance's disk layout does not support"
3955                                  " growing.")
3956
3957     if instance.FindDisk(self.op.disk) is None:
3958       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3959                                  (self.op.disk, instance.name))
3960
3961     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3962     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3963     for node in nodenames:
3964       info = nodeinfo.get(node, None)
3965       if not info:
3966         raise errors.OpPrereqError("Cannot get current information"
3967                                    " from node '%s'" % node)
3968       vg_free = info.get('vg_free', None)
3969       if not isinstance(vg_free, int):
3970         raise errors.OpPrereqError("Can't compute free disk space on"
3971                                    " node %s" % node)
3972       if self.op.amount > info['vg_free']:
3973         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3974                                    " %d MiB available, %d MiB required" %
3975                                    (node, info['vg_free'], self.op.amount))
3976
3977   def Exec(self, feedback_fn):
3978     """Execute disk grow.
3979
3980     """
3981     instance = self.instance
3982     disk = instance.FindDisk(self.op.disk)
3983     for node in (instance.secondary_nodes + (instance.primary_node,)):
3984       self.cfg.SetDiskID(disk, node)
3985       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3986       if not result or not isinstance(result, tuple) or len(result) != 2:
3987         raise errors.OpExecError("grow request failed to node %s" % node)
3988       elif not result[0]:
3989         raise errors.OpExecError("grow request failed to node %s: %s" %
3990                                  (node, result[1]))
3991     disk.RecordGrow(self.op.amount)
3992     self.cfg.Update(instance)
3993     return
3994
3995
3996 class LUQueryInstanceData(NoHooksLU):
3997   """Query runtime instance data.
3998
3999   """
4000   _OP_REQP = ["instances"]
4001
4002   def CheckPrereq(self):
4003     """Check prerequisites.
4004
4005     This only checks the optional instance list against the existing names.
4006
4007     """
4008     if not isinstance(self.op.instances, list):
4009       raise errors.OpPrereqError("Invalid argument type 'instances'")
4010     if self.op.instances:
4011       self.wanted_instances = []
4012       names = self.op.instances
4013       for name in names:
4014         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4015         if instance is None:
4016           raise errors.OpPrereqError("No such instance name '%s'" % name)
4017         self.wanted_instances.append(instance)
4018     else:
4019       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4020                                in self.cfg.GetInstanceList()]
4021     return
4022
4023
4024   def _ComputeDiskStatus(self, instance, snode, dev):
4025     """Compute block device status.
4026
4027     """
4028     self.cfg.SetDiskID(dev, instance.primary_node)
4029     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4030     if dev.dev_type in constants.LDS_DRBD:
4031       # we change the snode then (otherwise we use the one passed in)
4032       if dev.logical_id[0] == instance.primary_node:
4033         snode = dev.logical_id[1]
4034       else:
4035         snode = dev.logical_id[0]
4036
4037     if snode:
4038       self.cfg.SetDiskID(dev, snode)
4039       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4040     else:
4041       dev_sstatus = None
4042
4043     if dev.children:
4044       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4045                       for child in dev.children]
4046     else:
4047       dev_children = []
4048
4049     data = {
4050       "iv_name": dev.iv_name,
4051       "dev_type": dev.dev_type,
4052       "logical_id": dev.logical_id,
4053       "physical_id": dev.physical_id,
4054       "pstatus": dev_pstatus,
4055       "sstatus": dev_sstatus,
4056       "children": dev_children,
4057       }
4058
4059     return data
4060
4061   def Exec(self, feedback_fn):
4062     """Gather and return data"""
4063     result = {}
4064     for instance in self.wanted_instances:
4065       remote_info = rpc.call_instance_info(instance.primary_node,
4066                                                 instance.name)
4067       if remote_info and "state" in remote_info:
4068         remote_state = "up"
4069       else:
4070         remote_state = "down"
4071       if instance.status == "down":
4072         config_state = "down"
4073       else:
4074         config_state = "up"
4075
4076       disks = [self._ComputeDiskStatus(instance, None, device)
4077                for device in instance.disks]
4078
4079       idict = {
4080         "name": instance.name,
4081         "config_state": config_state,
4082         "run_state": remote_state,
4083         "pnode": instance.primary_node,
4084         "snodes": instance.secondary_nodes,
4085         "os": instance.os,
4086         "memory": instance.memory,
4087         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4088         "disks": disks,
4089         "vcpus": instance.vcpus,
4090         }
4091
4092       htkind = self.sstore.GetHypervisorType()
4093       if htkind == constants.HT_XEN_PVM30:
4094         idict["kernel_path"] = instance.kernel_path
4095         idict["initrd_path"] = instance.initrd_path
4096
4097       if htkind == constants.HT_XEN_HVM31:
4098         idict["hvm_boot_order"] = instance.hvm_boot_order
4099         idict["hvm_acpi"] = instance.hvm_acpi
4100         idict["hvm_pae"] = instance.hvm_pae
4101         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4102
4103       if htkind in constants.HTS_REQ_PORT:
4104         idict["vnc_bind_address"] = instance.vnc_bind_address
4105         idict["network_port"] = instance.network_port
4106
4107       result[instance.name] = idict
4108
4109     return result
4110
4111
4112 class LUSetInstanceParams(LogicalUnit):
4113   """Modifies an instances's parameters.
4114
4115   """
4116   HPATH = "instance-modify"
4117   HTYPE = constants.HTYPE_INSTANCE
4118   _OP_REQP = ["instance_name"]
4119   REQ_BGL = False
4120
4121   def ExpandNames(self):
4122     self._ExpandAndLockInstance()
4123
4124   def BuildHooksEnv(self):
4125     """Build hooks env.
4126
4127     This runs on the master, primary and secondaries.
4128
4129     """
4130     args = dict()
4131     if self.mem:
4132       args['memory'] = self.mem
4133     if self.vcpus:
4134       args['vcpus'] = self.vcpus
4135     if self.do_ip or self.do_bridge or self.mac:
4136       if self.do_ip:
4137         ip = self.ip
4138       else:
4139         ip = self.instance.nics[0].ip
4140       if self.bridge:
4141         bridge = self.bridge
4142       else:
4143         bridge = self.instance.nics[0].bridge
4144       if self.mac:
4145         mac = self.mac
4146       else:
4147         mac = self.instance.nics[0].mac
4148       args['nics'] = [(ip, bridge, mac)]
4149     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4150     nl = [self.sstore.GetMasterNode(),
4151           self.instance.primary_node] + list(self.instance.secondary_nodes)
4152     return env, nl, nl
4153
4154   def CheckPrereq(self):
4155     """Check prerequisites.
4156
4157     This only checks the instance list against the existing names.
4158
4159     """
4160     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4161     # a separate CheckArguments function, if we implement one, so the operation
4162     # can be aborted without waiting for any lock, should it have an error...
4163     self.mem = getattr(self.op, "mem", None)
4164     self.vcpus = getattr(self.op, "vcpus", None)
4165     self.ip = getattr(self.op, "ip", None)
4166     self.mac = getattr(self.op, "mac", None)
4167     self.bridge = getattr(self.op, "bridge", None)
4168     self.kernel_path = getattr(self.op, "kernel_path", None)
4169     self.initrd_path = getattr(self.op, "initrd_path", None)
4170     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4171     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4172     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4173     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4174     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4175     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4176                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4177                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4178                  self.vnc_bind_address]
4179     if all_parms.count(None) == len(all_parms):
4180       raise errors.OpPrereqError("No changes submitted")
4181     if self.mem is not None:
4182       try:
4183         self.mem = int(self.mem)
4184       except ValueError, err:
4185         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4186     if self.vcpus is not None:
4187       try:
4188         self.vcpus = int(self.vcpus)
4189       except ValueError, err:
4190         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4191     if self.ip is not None:
4192       self.do_ip = True
4193       if self.ip.lower() == "none":
4194         self.ip = None
4195       else:
4196         if not utils.IsValidIP(self.ip):
4197           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4198     else:
4199       self.do_ip = False
4200     self.do_bridge = (self.bridge is not None)
4201     if self.mac is not None:
4202       if self.cfg.IsMacInUse(self.mac):
4203         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4204                                    self.mac)
4205       if not utils.IsValidMac(self.mac):
4206         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4207
4208     if self.kernel_path is not None:
4209       self.do_kernel_path = True
4210       if self.kernel_path == constants.VALUE_NONE:
4211         raise errors.OpPrereqError("Can't set instance to no kernel")
4212
4213       if self.kernel_path != constants.VALUE_DEFAULT:
4214         if not os.path.isabs(self.kernel_path):
4215           raise errors.OpPrereqError("The kernel path must be an absolute"
4216                                     " filename")
4217     else:
4218       self.do_kernel_path = False
4219
4220     if self.initrd_path is not None:
4221       self.do_initrd_path = True
4222       if self.initrd_path not in (constants.VALUE_NONE,
4223                                   constants.VALUE_DEFAULT):
4224         if not os.path.isabs(self.initrd_path):
4225           raise errors.OpPrereqError("The initrd path must be an absolute"
4226                                     " filename")
4227     else:
4228       self.do_initrd_path = False
4229
4230     # boot order verification
4231     if self.hvm_boot_order is not None:
4232       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4233         if len(self.hvm_boot_order.strip("acdn")) != 0:
4234           raise errors.OpPrereqError("invalid boot order specified,"
4235                                      " must be one or more of [acdn]"
4236                                      " or 'default'")
4237
4238     # hvm_cdrom_image_path verification
4239     if self.op.hvm_cdrom_image_path is not None:
4240       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4241         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4242                                    " be an absolute path or None, not %s" %
4243                                    self.op.hvm_cdrom_image_path)
4244       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4245         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4246                                    " regular file or a symlink pointing to"
4247                                    " an existing regular file, not %s" %
4248                                    self.op.hvm_cdrom_image_path)
4249
4250     # vnc_bind_address verification
4251     if self.op.vnc_bind_address is not None:
4252       if not utils.IsValidIP(self.op.vnc_bind_address):
4253         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4254                                    " like a valid IP address" %
4255                                    self.op.vnc_bind_address)
4256
4257     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4258     assert self.instance is not None, \
4259       "Cannot retrieve locked instance %s" % self.op.instance_name
4260     return
4261
4262   def Exec(self, feedback_fn):
4263     """Modifies an instance.
4264
4265     All parameters take effect only at the next restart of the instance.
4266     """
4267     result = []
4268     instance = self.instance
4269     if self.mem:
4270       instance.memory = self.mem
4271       result.append(("mem", self.mem))
4272     if self.vcpus:
4273       instance.vcpus = self.vcpus
4274       result.append(("vcpus",  self.vcpus))
4275     if self.do_ip:
4276       instance.nics[0].ip = self.ip
4277       result.append(("ip", self.ip))
4278     if self.bridge:
4279       instance.nics[0].bridge = self.bridge
4280       result.append(("bridge", self.bridge))
4281     if self.mac:
4282       instance.nics[0].mac = self.mac
4283       result.append(("mac", self.mac))
4284     if self.do_kernel_path:
4285       instance.kernel_path = self.kernel_path
4286       result.append(("kernel_path", self.kernel_path))
4287     if self.do_initrd_path:
4288       instance.initrd_path = self.initrd_path
4289       result.append(("initrd_path", self.initrd_path))
4290     if self.hvm_boot_order:
4291       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4292         instance.hvm_boot_order = None
4293       else:
4294         instance.hvm_boot_order = self.hvm_boot_order
4295       result.append(("hvm_boot_order", self.hvm_boot_order))
4296     if self.hvm_acpi:
4297       instance.hvm_acpi = self.hvm_acpi
4298       result.append(("hvm_acpi", self.hvm_acpi))
4299     if self.hvm_pae:
4300       instance.hvm_pae = self.hvm_pae
4301       result.append(("hvm_pae", self.hvm_pae))
4302     if self.hvm_cdrom_image_path:
4303       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4304       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4305     if self.vnc_bind_address:
4306       instance.vnc_bind_address = self.vnc_bind_address
4307       result.append(("vnc_bind_address", self.vnc_bind_address))
4308
4309     self.cfg.Update(instance)
4310
4311     return result
4312
4313
4314 class LUQueryExports(NoHooksLU):
4315   """Query the exports list
4316
4317   """
4318   _OP_REQP = []
4319
4320   def CheckPrereq(self):
4321     """Check that the nodelist contains only existing nodes.
4322
4323     """
4324     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4325
4326   def Exec(self, feedback_fn):
4327     """Compute the list of all the exported system images.
4328
4329     Returns:
4330       a dictionary with the structure node->(export-list)
4331       where export-list is a list of the instances exported on
4332       that node.
4333
4334     """
4335     return rpc.call_export_list(self.nodes)
4336
4337
4338 class LUExportInstance(LogicalUnit):
4339   """Export an instance to an image in the cluster.
4340
4341   """
4342   HPATH = "instance-export"
4343   HTYPE = constants.HTYPE_INSTANCE
4344   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4345
4346   def BuildHooksEnv(self):
4347     """Build hooks env.
4348
4349     This will run on the master, primary node and target node.
4350
4351     """
4352     env = {
4353       "EXPORT_NODE": self.op.target_node,
4354       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4355       }
4356     env.update(_BuildInstanceHookEnvByObject(self.instance))
4357     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4358           self.op.target_node]
4359     return env, nl, nl
4360
4361   def CheckPrereq(self):
4362     """Check prerequisites.
4363
4364     This checks that the instance and node names are valid.
4365
4366     """
4367     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4368     self.instance = self.cfg.GetInstanceInfo(instance_name)
4369     if self.instance is None:
4370       raise errors.OpPrereqError("Instance '%s' not found" %
4371                                  self.op.instance_name)
4372
4373     # node verification
4374     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4375     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4376
4377     if self.dst_node is None:
4378       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4379                                  self.op.target_node)
4380     self.op.target_node = self.dst_node.name
4381
4382     # instance disk type verification
4383     for disk in self.instance.disks:
4384       if disk.dev_type == constants.LD_FILE:
4385         raise errors.OpPrereqError("Export not supported for instances with"
4386                                    " file-based disks")
4387
4388   def Exec(self, feedback_fn):
4389     """Export an instance to an image in the cluster.
4390
4391     """
4392     instance = self.instance
4393     dst_node = self.dst_node
4394     src_node = instance.primary_node
4395     if self.op.shutdown:
4396       # shutdown the instance, but not the disks
4397       if not rpc.call_instance_shutdown(src_node, instance):
4398          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4399                                   (instance.name, src_node))
4400
4401     vgname = self.cfg.GetVGName()
4402
4403     snap_disks = []
4404
4405     try:
4406       for disk in instance.disks:
4407         if disk.iv_name == "sda":
4408           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4409           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4410
4411           if not new_dev_name:
4412             logger.Error("could not snapshot block device %s on node %s" %
4413                          (disk.logical_id[1], src_node))
4414           else:
4415             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4416                                       logical_id=(vgname, new_dev_name),
4417                                       physical_id=(vgname, new_dev_name),
4418                                       iv_name=disk.iv_name)
4419             snap_disks.append(new_dev)
4420
4421     finally:
4422       if self.op.shutdown and instance.status == "up":
4423         if not rpc.call_instance_start(src_node, instance, None):
4424           _ShutdownInstanceDisks(instance, self.cfg)
4425           raise errors.OpExecError("Could not start instance")
4426
4427     # TODO: check for size
4428
4429     for dev in snap_disks:
4430       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4431         logger.Error("could not export block device %s from node %s to node %s"
4432                      % (dev.logical_id[1], src_node, dst_node.name))
4433       if not rpc.call_blockdev_remove(src_node, dev):
4434         logger.Error("could not remove snapshot block device %s from node %s" %
4435                      (dev.logical_id[1], src_node))
4436
4437     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4438       logger.Error("could not finalize export for instance %s on node %s" %
4439                    (instance.name, dst_node.name))
4440
4441     nodelist = self.cfg.GetNodeList()
4442     nodelist.remove(dst_node.name)
4443
4444     # on one-node clusters nodelist will be empty after the removal
4445     # if we proceed the backup would be removed because OpQueryExports
4446     # substitutes an empty list with the full cluster node list.
4447     if nodelist:
4448       op = opcodes.OpQueryExports(nodes=nodelist)
4449       exportlist = self.proc.ChainOpCode(op)
4450       for node in exportlist:
4451         if instance.name in exportlist[node]:
4452           if not rpc.call_export_remove(node, instance.name):
4453             logger.Error("could not remove older export for instance %s"
4454                          " on node %s" % (instance.name, node))
4455
4456
4457 class LURemoveExport(NoHooksLU):
4458   """Remove exports related to the named instance.
4459
4460   """
4461   _OP_REQP = ["instance_name"]
4462
4463   def CheckPrereq(self):
4464     """Check prerequisites.
4465     """
4466     pass
4467
4468   def Exec(self, feedback_fn):
4469     """Remove any export.
4470
4471     """
4472     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4473     # If the instance was not found we'll try with the name that was passed in.
4474     # This will only work if it was an FQDN, though.
4475     fqdn_warn = False
4476     if not instance_name:
4477       fqdn_warn = True
4478       instance_name = self.op.instance_name
4479
4480     op = opcodes.OpQueryExports(nodes=[])
4481     exportlist = self.proc.ChainOpCode(op)
4482     found = False
4483     for node in exportlist:
4484       if instance_name in exportlist[node]:
4485         found = True
4486         if not rpc.call_export_remove(node, instance_name):
4487           logger.Error("could not remove export for instance %s"
4488                        " on node %s" % (instance_name, node))
4489
4490     if fqdn_warn and not found:
4491       feedback_fn("Export not found. If trying to remove an export belonging"
4492                   " to a deleted instance please use its Fully Qualified"
4493                   " Domain Name.")
4494
4495
4496 class TagsLU(NoHooksLU):
4497   """Generic tags LU.
4498
4499   This is an abstract class which is the parent of all the other tags LUs.
4500
4501   """
4502   def CheckPrereq(self):
4503     """Check prerequisites.
4504
4505     """
4506     if self.op.kind == constants.TAG_CLUSTER:
4507       self.target = self.cfg.GetClusterInfo()
4508     elif self.op.kind == constants.TAG_NODE:
4509       name = self.cfg.ExpandNodeName(self.op.name)
4510       if name is None:
4511         raise errors.OpPrereqError("Invalid node name (%s)" %
4512                                    (self.op.name,))
4513       self.op.name = name
4514       self.target = self.cfg.GetNodeInfo(name)
4515     elif self.op.kind == constants.TAG_INSTANCE:
4516       name = self.cfg.ExpandInstanceName(self.op.name)
4517       if name is None:
4518         raise errors.OpPrereqError("Invalid instance name (%s)" %
4519                                    (self.op.name,))
4520       self.op.name = name
4521       self.target = self.cfg.GetInstanceInfo(name)
4522     else:
4523       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4524                                  str(self.op.kind))
4525
4526
4527 class LUGetTags(TagsLU):
4528   """Returns the tags of a given object.
4529
4530   """
4531   _OP_REQP = ["kind", "name"]
4532
4533   def Exec(self, feedback_fn):
4534     """Returns the tag list.
4535
4536     """
4537     return list(self.target.GetTags())
4538
4539
4540 class LUSearchTags(NoHooksLU):
4541   """Searches the tags for a given pattern.
4542
4543   """
4544   _OP_REQP = ["pattern"]
4545
4546   def CheckPrereq(self):
4547     """Check prerequisites.
4548
4549     This checks the pattern passed for validity by compiling it.
4550
4551     """
4552     try:
4553       self.re = re.compile(self.op.pattern)
4554     except re.error, err:
4555       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4556                                  (self.op.pattern, err))
4557
4558   def Exec(self, feedback_fn):
4559     """Returns the tag list.
4560
4561     """
4562     cfg = self.cfg
4563     tgts = [("/cluster", cfg.GetClusterInfo())]
4564     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4565     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4566     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4567     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4568     results = []
4569     for path, target in tgts:
4570       for tag in target.GetTags():
4571         if self.re.search(tag):
4572           results.append((path, tag))
4573     return results
4574
4575
4576 class LUAddTags(TagsLU):
4577   """Sets a tag on a given object.
4578
4579   """
4580   _OP_REQP = ["kind", "name", "tags"]
4581
4582   def CheckPrereq(self):
4583     """Check prerequisites.
4584
4585     This checks the type and length of the tag name and value.
4586
4587     """
4588     TagsLU.CheckPrereq(self)
4589     for tag in self.op.tags:
4590       objects.TaggableObject.ValidateTag(tag)
4591
4592   def Exec(self, feedback_fn):
4593     """Sets the tag.
4594
4595     """
4596     try:
4597       for tag in self.op.tags:
4598         self.target.AddTag(tag)
4599     except errors.TagError, err:
4600       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4601     try:
4602       self.cfg.Update(self.target)
4603     except errors.ConfigurationError:
4604       raise errors.OpRetryError("There has been a modification to the"
4605                                 " config file and the operation has been"
4606                                 " aborted. Please retry.")
4607
4608
4609 class LUDelTags(TagsLU):
4610   """Delete a list of tags from a given object.
4611
4612   """
4613   _OP_REQP = ["kind", "name", "tags"]
4614
4615   def CheckPrereq(self):
4616     """Check prerequisites.
4617
4618     This checks that we have the given tag.
4619
4620     """
4621     TagsLU.CheckPrereq(self)
4622     for tag in self.op.tags:
4623       objects.TaggableObject.ValidateTag(tag)
4624     del_tags = frozenset(self.op.tags)
4625     cur_tags = self.target.GetTags()
4626     if not del_tags <= cur_tags:
4627       diff_tags = del_tags - cur_tags
4628       diff_names = ["'%s'" % tag for tag in diff_tags]
4629       diff_names.sort()
4630       raise errors.OpPrereqError("Tag(s) %s not found" %
4631                                  (",".join(diff_names)))
4632
4633   def Exec(self, feedback_fn):
4634     """Remove the tag from the object.
4635
4636     """
4637     for tag in self.op.tags:
4638       self.target.RemoveTag(tag)
4639     try:
4640       self.cfg.Update(self.target)
4641     except errors.ConfigurationError:
4642       raise errors.OpRetryError("There has been a modification to the"
4643                                 " config file and the operation has been"
4644                                 " aborted. Please retry.")
4645
4646
4647 class LUTestDelay(NoHooksLU):
4648   """Sleep for a specified amount of time.
4649
4650   This LU sleeps on the master and/or nodes for a specified amount of
4651   time.
4652
4653   """
4654   _OP_REQP = ["duration", "on_master", "on_nodes"]
4655   REQ_BGL = False
4656
4657   def ExpandNames(self):
4658     """Expand names and set required locks.
4659
4660     This expands the node list, if any.
4661
4662     """
4663     self.needed_locks = {}
4664     if self.op.on_nodes:
4665       # _GetWantedNodes can be used here, but is not always appropriate to use
4666       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4667       # more information.
4668       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4669       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4670
4671   def CheckPrereq(self):
4672     """Check prerequisites.
4673
4674     """
4675
4676   def Exec(self, feedback_fn):
4677     """Do the actual sleep.
4678
4679     """
4680     if self.op.on_master:
4681       if not utils.TestDelay(self.op.duration):
4682         raise errors.OpExecError("Error during master delay test")
4683     if self.op.on_nodes:
4684       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4685       if not result:
4686         raise errors.OpExecError("Complete failure from rpc call")
4687       for node, node_result in result.items():
4688         if not node_result:
4689           raise errors.OpExecError("Failure during rpc call to node %s,"
4690                                    " result: %s" % (node, node_result))
4691
4692
4693 class IAllocator(object):
4694   """IAllocator framework.
4695
4696   An IAllocator instance has three sets of attributes:
4697     - cfg/sstore that are needed to query the cluster
4698     - input data (all members of the _KEYS class attribute are required)
4699     - four buffer attributes (in|out_data|text), that represent the
4700       input (to the external script) in text and data structure format,
4701       and the output from it, again in two formats
4702     - the result variables from the script (success, info, nodes) for
4703       easy usage
4704
4705   """
4706   _ALLO_KEYS = [
4707     "mem_size", "disks", "disk_template",
4708     "os", "tags", "nics", "vcpus",
4709     ]
4710   _RELO_KEYS = [
4711     "relocate_from",
4712     ]
4713
4714   def __init__(self, cfg, sstore, mode, name, **kwargs):
4715     self.cfg = cfg
4716     self.sstore = sstore
4717     # init buffer variables
4718     self.in_text = self.out_text = self.in_data = self.out_data = None
4719     # init all input fields so that pylint is happy
4720     self.mode = mode
4721     self.name = name
4722     self.mem_size = self.disks = self.disk_template = None
4723     self.os = self.tags = self.nics = self.vcpus = None
4724     self.relocate_from = None
4725     # computed fields
4726     self.required_nodes = None
4727     # init result fields
4728     self.success = self.info = self.nodes = None
4729     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4730       keyset = self._ALLO_KEYS
4731     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4732       keyset = self._RELO_KEYS
4733     else:
4734       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4735                                    " IAllocator" % self.mode)
4736     for key in kwargs:
4737       if key not in keyset:
4738         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4739                                      " IAllocator" % key)
4740       setattr(self, key, kwargs[key])
4741     for key in keyset:
4742       if key not in kwargs:
4743         raise errors.ProgrammerError("Missing input parameter '%s' to"
4744                                      " IAllocator" % key)
4745     self._BuildInputData()
4746
4747   def _ComputeClusterData(self):
4748     """Compute the generic allocator input data.
4749
4750     This is the data that is independent of the actual operation.
4751
4752     """
4753     cfg = self.cfg
4754     # cluster data
4755     data = {
4756       "version": 1,
4757       "cluster_name": self.sstore.GetClusterName(),
4758       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4759       "hypervisor_type": self.sstore.GetHypervisorType(),
4760       # we don't have job IDs
4761       }
4762
4763     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4764
4765     # node data
4766     node_results = {}
4767     node_list = cfg.GetNodeList()
4768     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4769     for nname in node_list:
4770       ninfo = cfg.GetNodeInfo(nname)
4771       if nname not in node_data or not isinstance(node_data[nname], dict):
4772         raise errors.OpExecError("Can't get data for node %s" % nname)
4773       remote_info = node_data[nname]
4774       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4775                    'vg_size', 'vg_free', 'cpu_total']:
4776         if attr not in remote_info:
4777           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4778                                    (nname, attr))
4779         try:
4780           remote_info[attr] = int(remote_info[attr])
4781         except ValueError, err:
4782           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4783                                    " %s" % (nname, attr, str(err)))
4784       # compute memory used by primary instances
4785       i_p_mem = i_p_up_mem = 0
4786       for iinfo in i_list:
4787         if iinfo.primary_node == nname:
4788           i_p_mem += iinfo.memory
4789           if iinfo.status == "up":
4790             i_p_up_mem += iinfo.memory
4791
4792       # compute memory used by instances
4793       pnr = {
4794         "tags": list(ninfo.GetTags()),
4795         "total_memory": remote_info['memory_total'],
4796         "reserved_memory": remote_info['memory_dom0'],
4797         "free_memory": remote_info['memory_free'],
4798         "i_pri_memory": i_p_mem,
4799         "i_pri_up_memory": i_p_up_mem,
4800         "total_disk": remote_info['vg_size'],
4801         "free_disk": remote_info['vg_free'],
4802         "primary_ip": ninfo.primary_ip,
4803         "secondary_ip": ninfo.secondary_ip,
4804         "total_cpus": remote_info['cpu_total'],
4805         }
4806       node_results[nname] = pnr
4807     data["nodes"] = node_results
4808
4809     # instance data
4810     instance_data = {}
4811     for iinfo in i_list:
4812       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4813                   for n in iinfo.nics]
4814       pir = {
4815         "tags": list(iinfo.GetTags()),
4816         "should_run": iinfo.status == "up",
4817         "vcpus": iinfo.vcpus,
4818         "memory": iinfo.memory,
4819         "os": iinfo.os,
4820         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4821         "nics": nic_data,
4822         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4823         "disk_template": iinfo.disk_template,
4824         }
4825       instance_data[iinfo.name] = pir
4826
4827     data["instances"] = instance_data
4828
4829     self.in_data = data
4830
4831   def _AddNewInstance(self):
4832     """Add new instance data to allocator structure.
4833
4834     This in combination with _AllocatorGetClusterData will create the
4835     correct structure needed as input for the allocator.
4836
4837     The checks for the completeness of the opcode must have already been
4838     done.
4839
4840     """
4841     data = self.in_data
4842     if len(self.disks) != 2:
4843       raise errors.OpExecError("Only two-disk configurations supported")
4844
4845     disk_space = _ComputeDiskSize(self.disk_template,
4846                                   self.disks[0]["size"], self.disks[1]["size"])
4847
4848     if self.disk_template in constants.DTS_NET_MIRROR:
4849       self.required_nodes = 2
4850     else:
4851       self.required_nodes = 1
4852     request = {
4853       "type": "allocate",
4854       "name": self.name,
4855       "disk_template": self.disk_template,
4856       "tags": self.tags,
4857       "os": self.os,
4858       "vcpus": self.vcpus,
4859       "memory": self.mem_size,
4860       "disks": self.disks,
4861       "disk_space_total": disk_space,
4862       "nics": self.nics,
4863       "required_nodes": self.required_nodes,
4864       }
4865     data["request"] = request
4866
4867   def _AddRelocateInstance(self):
4868     """Add relocate instance data to allocator structure.
4869
4870     This in combination with _IAllocatorGetClusterData will create the
4871     correct structure needed as input for the allocator.
4872
4873     The checks for the completeness of the opcode must have already been
4874     done.
4875
4876     """
4877     instance = self.cfg.GetInstanceInfo(self.name)
4878     if instance is None:
4879       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4880                                    " IAllocator" % self.name)
4881
4882     if instance.disk_template not in constants.DTS_NET_MIRROR:
4883       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4884
4885     if len(instance.secondary_nodes) != 1:
4886       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4887
4888     self.required_nodes = 1
4889
4890     disk_space = _ComputeDiskSize(instance.disk_template,
4891                                   instance.disks[0].size,
4892                                   instance.disks[1].size)
4893
4894     request = {
4895       "type": "relocate",
4896       "name": self.name,
4897       "disk_space_total": disk_space,
4898       "required_nodes": self.required_nodes,
4899       "relocate_from": self.relocate_from,
4900       }
4901     self.in_data["request"] = request
4902
4903   def _BuildInputData(self):
4904     """Build input data structures.
4905
4906     """
4907     self._ComputeClusterData()
4908
4909     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4910       self._AddNewInstance()
4911     else:
4912       self._AddRelocateInstance()
4913
4914     self.in_text = serializer.Dump(self.in_data)
4915
4916   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4917     """Run an instance allocator and return the results.
4918
4919     """
4920     data = self.in_text
4921
4922     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4923
4924     if not isinstance(result, tuple) or len(result) != 4:
4925       raise errors.OpExecError("Invalid result from master iallocator runner")
4926
4927     rcode, stdout, stderr, fail = result
4928
4929     if rcode == constants.IARUN_NOTFOUND:
4930       raise errors.OpExecError("Can't find allocator '%s'" % name)
4931     elif rcode == constants.IARUN_FAILURE:
4932         raise errors.OpExecError("Instance allocator call failed: %s,"
4933                                  " output: %s" %
4934                                  (fail, stdout+stderr))
4935     self.out_text = stdout
4936     if validate:
4937       self._ValidateResult()
4938
4939   def _ValidateResult(self):
4940     """Process the allocator results.
4941
4942     This will process and if successful save the result in
4943     self.out_data and the other parameters.
4944
4945     """
4946     try:
4947       rdict = serializer.Load(self.out_text)
4948     except Exception, err:
4949       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4950
4951     if not isinstance(rdict, dict):
4952       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4953
4954     for key in "success", "info", "nodes":
4955       if key not in rdict:
4956         raise errors.OpExecError("Can't parse iallocator results:"
4957                                  " missing key '%s'" % key)
4958       setattr(self, key, rdict[key])
4959
4960     if not isinstance(rdict["nodes"], list):
4961       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4962                                " is not a list")
4963     self.out_data = rdict
4964
4965
4966 class LUTestAllocator(NoHooksLU):
4967   """Run allocator tests.
4968
4969   This LU runs the allocator tests
4970
4971   """
4972   _OP_REQP = ["direction", "mode", "name"]
4973
4974   def CheckPrereq(self):
4975     """Check prerequisites.
4976
4977     This checks the opcode parameters depending on the director and mode test.
4978
4979     """
4980     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4981       for attr in ["name", "mem_size", "disks", "disk_template",
4982                    "os", "tags", "nics", "vcpus"]:
4983         if not hasattr(self.op, attr):
4984           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4985                                      attr)
4986       iname = self.cfg.ExpandInstanceName(self.op.name)
4987       if iname is not None:
4988         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4989                                    iname)
4990       if not isinstance(self.op.nics, list):
4991         raise errors.OpPrereqError("Invalid parameter 'nics'")
4992       for row in self.op.nics:
4993         if (not isinstance(row, dict) or
4994             "mac" not in row or
4995             "ip" not in row or
4996             "bridge" not in row):
4997           raise errors.OpPrereqError("Invalid contents of the"
4998                                      " 'nics' parameter")
4999       if not isinstance(self.op.disks, list):
5000         raise errors.OpPrereqError("Invalid parameter 'disks'")
5001       if len(self.op.disks) != 2:
5002         raise errors.OpPrereqError("Only two-disk configurations supported")
5003       for row in self.op.disks:
5004         if (not isinstance(row, dict) or
5005             "size" not in row or
5006             not isinstance(row["size"], int) or
5007             "mode" not in row or
5008             row["mode"] not in ['r', 'w']):
5009           raise errors.OpPrereqError("Invalid contents of the"
5010                                      " 'disks' parameter")
5011     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5012       if not hasattr(self.op, "name"):
5013         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5014       fname = self.cfg.ExpandInstanceName(self.op.name)
5015       if fname is None:
5016         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5017                                    self.op.name)
5018       self.op.name = fname
5019       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5020     else:
5021       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5022                                  self.op.mode)
5023
5024     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5025       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5026         raise errors.OpPrereqError("Missing allocator name")
5027     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5028       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5029                                  self.op.direction)
5030
5031   def Exec(self, feedback_fn):
5032     """Run the allocator test.
5033
5034     """
5035     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5036       ial = IAllocator(self.cfg, self.sstore,
5037                        mode=self.op.mode,
5038                        name=self.op.name,
5039                        mem_size=self.op.mem_size,
5040                        disks=self.op.disks,
5041                        disk_template=self.op.disk_template,
5042                        os=self.op.os,
5043                        tags=self.op.tags,
5044                        nics=self.op.nics,
5045                        vcpus=self.op.vcpus,
5046                        )
5047     else:
5048       ial = IAllocator(self.cfg, self.sstore,
5049                        mode=self.op.mode,
5050                        name=self.op.name,
5051                        relocate_from=list(self.relocate_from),
5052                        )
5053
5054     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5055       result = ial.in_text
5056     else:
5057       ial.Run(self.op.allocator, validate=False)
5058       result = ial.out_text
5059     return result