Use Update in SetInstanceParams
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_MASTER: the LU needs to run on the master node
60         REQ_WSSTORE: the LU needs a writable SimpleStore
61         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62
63   Note that all commands require root permissions.
64
65   """
66   HPATH = None
67   HTYPE = None
68   _OP_REQP = []
69   REQ_MASTER = True
70   REQ_WSSTORE = False
71   REQ_BGL = True
72
73   def __init__(self, processor, op, context, sstore):
74     """Constructor for LogicalUnit.
75
76     This needs to be overriden in derived classes in order to check op
77     validity.
78
79     """
80     self.proc = processor
81     self.op = op
82     self.cfg = context.cfg
83     self.sstore = sstore
84     self.context = context
85     self.needed_locks = None
86     self.__ssh = None
87
88     for attr_name in self._OP_REQP:
89       attr_val = getattr(op, attr_name, None)
90       if attr_val is None:
91         raise errors.OpPrereqError("Required parameter '%s' missing" %
92                                    attr_name)
93
94     if not self.cfg.IsCluster():
95       raise errors.OpPrereqError("Cluster not initialized yet,"
96                                  " use 'gnt-cluster init' first.")
97     if self.REQ_MASTER:
98       master = sstore.GetMasterNode()
99       if master != utils.HostInfo().name:
100         raise errors.OpPrereqError("Commands must be run on the master"
101                                    " node %s" % master)
102
103   def __GetSSH(self):
104     """Returns the SshRunner object
105
106     """
107     if not self.__ssh:
108       self.__ssh = ssh.SshRunner(self.sstore)
109     return self.__ssh
110
111   ssh = property(fget=__GetSSH)
112
113   def ExpandNames(self):
114     """Expand names for this LU.
115
116     This method is called before starting to execute the opcode, and it should
117     update all the parameters of the opcode to their canonical form (e.g. a
118     short node name must be fully expanded after this method has successfully
119     completed). This way locking, hooks, logging, ecc. can work correctly.
120
121     LUs which implement this method must also populate the self.needed_locks
122     member, as a dict with lock levels as keys, and a list of needed lock names
123     as values. Rules:
124       - Use an empty dict if you don't need any lock
125       - If you don't need any lock at a particular level omit that level
126       - Don't put anything for the BGL level
127       - If you want all locks at a level use None as a value
128         (this reflects what LockSet does, and will be replaced before
129         CheckPrereq with the full list of nodes that have been locked)
130
131     Examples:
132     # Acquire all nodes and one instance
133     self.needed_locks = {
134       locking.LEVEL_NODE: None,
135       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
136     }
137     # Acquire just two nodes
138     self.needed_locks = {
139       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
140     }
141     # Acquire no locks
142     self.needed_locks = {} # No, you can't leave it to the default value None
143
144     """
145     # The implementation of this method is mandatory only if the new LU is
146     # concurrent, so that old LUs don't need to be changed all at the same
147     # time.
148     if self.REQ_BGL:
149       self.needed_locks = {} # Exclusive LUs don't need locks.
150     else:
151       raise NotImplementedError
152
153   def CheckPrereq(self):
154     """Check prerequisites for this LU.
155
156     This method should check that the prerequisites for the execution
157     of this LU are fulfilled. It can do internode communication, but
158     it should be idempotent - no cluster or system changes are
159     allowed.
160
161     The method should raise errors.OpPrereqError in case something is
162     not fulfilled. Its return value is ignored.
163
164     This method should also update all the parameters of the opcode to
165     their canonical form if it hasn't been done by ExpandNames before.
166
167     """
168     raise NotImplementedError
169
170   def Exec(self, feedback_fn):
171     """Execute the LU.
172
173     This method should implement the actual work. It should raise
174     errors.OpExecError for failures that are somewhat dealt with in
175     code, or expected.
176
177     """
178     raise NotImplementedError
179
180   def BuildHooksEnv(self):
181     """Build hooks environment for this LU.
182
183     This method should return a three-node tuple consisting of: a dict
184     containing the environment that will be used for running the
185     specific hook for this LU, a list of node names on which the hook
186     should run before the execution, and a list of node names on which
187     the hook should run after the execution.
188
189     The keys of the dict must not have 'GANETI_' prefixed as this will
190     be handled in the hooks runner. Also note additional keys will be
191     added by the hooks runner. If the LU doesn't define any
192     environment, an empty dict (and not None) should be returned.
193
194     No nodes should be returned as an empty list (and not None).
195
196     Note that if the HPATH for a LU class is None, this function will
197     not be called.
198
199     """
200     raise NotImplementedError
201
202   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
203     """Notify the LU about the results of its hooks.
204
205     This method is called every time a hooks phase is executed, and notifies
206     the Logical Unit about the hooks' result. The LU can then use it to alter
207     its result based on the hooks.  By default the method does nothing and the
208     previous result is passed back unchanged but any LU can define it if it
209     wants to use the local cluster hook-scripts somehow.
210
211     Args:
212       phase: the hooks phase that has just been run
213       hooks_results: the results of the multi-node hooks rpc call
214       feedback_fn: function to send feedback back to the caller
215       lu_result: the previous result this LU had, or None in the PRE phase.
216
217     """
218     return lu_result
219
220   def _ExpandAndLockInstance(self):
221     """Helper function to expand and lock an instance.
222
223     Many LUs that work on an instance take its name in self.op.instance_name
224     and need to expand it and then declare the expanded name for locking. This
225     function does it, and then updates self.op.instance_name to the expanded
226     name. It also initializes needed_locks as a dict, if this hasn't been done
227     before.
228
229     """
230     if self.needed_locks is None:
231       self.needed_locks = {}
232     else:
233       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
234         "_ExpandAndLockInstance called with instance-level locks set"
235     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
236     if expanded_name is None:
237       raise errors.OpPrereqError("Instance '%s' not known" %
238                                   self.op.instance_name)
239     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
240     self.op.instance_name = expanded_name
241
242
243 class NoHooksLU(LogicalUnit):
244   """Simple LU which runs no hooks.
245
246   This LU is intended as a parent for other LogicalUnits which will
247   run no hooks, in order to reduce duplicate code.
248
249   """
250   HPATH = None
251   HTYPE = None
252
253
254 def _GetWantedNodes(lu, nodes):
255   """Returns list of checked and expanded node names.
256
257   Args:
258     nodes: List of nodes (strings) or None for all
259
260   """
261   if not isinstance(nodes, list):
262     raise errors.OpPrereqError("Invalid argument type 'nodes'")
263
264   if nodes:
265     wanted = []
266
267     for name in nodes:
268       node = lu.cfg.ExpandNodeName(name)
269       if node is None:
270         raise errors.OpPrereqError("No such node name '%s'" % name)
271       wanted.append(node)
272
273   else:
274     wanted = lu.cfg.GetNodeList()
275   return utils.NiceSort(wanted)
276
277
278 def _GetWantedInstances(lu, instances):
279   """Returns list of checked and expanded instance names.
280
281   Args:
282     instances: List of instances (strings) or None for all
283
284   """
285   if not isinstance(instances, list):
286     raise errors.OpPrereqError("Invalid argument type 'instances'")
287
288   if instances:
289     wanted = []
290
291     for name in instances:
292       instance = lu.cfg.ExpandInstanceName(name)
293       if instance is None:
294         raise errors.OpPrereqError("No such instance name '%s'" % name)
295       wanted.append(instance)
296
297   else:
298     wanted = lu.cfg.GetInstanceList()
299   return utils.NiceSort(wanted)
300
301
302 def _CheckOutputFields(static, dynamic, selected):
303   """Checks whether all selected fields are valid.
304
305   Args:
306     static: Static fields
307     dynamic: Dynamic fields
308
309   """
310   static_fields = frozenset(static)
311   dynamic_fields = frozenset(dynamic)
312
313   all_fields = static_fields | dynamic_fields
314
315   if not all_fields.issuperset(selected):
316     raise errors.OpPrereqError("Unknown output fields selected: %s"
317                                % ",".join(frozenset(selected).
318                                           difference(all_fields)))
319
320
321 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
322                           memory, vcpus, nics):
323   """Builds instance related env variables for hooks from single variables.
324
325   Args:
326     secondary_nodes: List of secondary nodes as strings
327   """
328   env = {
329     "OP_TARGET": name,
330     "INSTANCE_NAME": name,
331     "INSTANCE_PRIMARY": primary_node,
332     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
333     "INSTANCE_OS_TYPE": os_type,
334     "INSTANCE_STATUS": status,
335     "INSTANCE_MEMORY": memory,
336     "INSTANCE_VCPUS": vcpus,
337   }
338
339   if nics:
340     nic_count = len(nics)
341     for idx, (ip, bridge, mac) in enumerate(nics):
342       if ip is None:
343         ip = ""
344       env["INSTANCE_NIC%d_IP" % idx] = ip
345       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
346       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
347   else:
348     nic_count = 0
349
350   env["INSTANCE_NIC_COUNT"] = nic_count
351
352   return env
353
354
355 def _BuildInstanceHookEnvByObject(instance, override=None):
356   """Builds instance related env variables for hooks from an object.
357
358   Args:
359     instance: objects.Instance object of instance
360     override: dict of values to override
361   """
362   args = {
363     'name': instance.name,
364     'primary_node': instance.primary_node,
365     'secondary_nodes': instance.secondary_nodes,
366     'os_type': instance.os,
367     'status': instance.os,
368     'memory': instance.memory,
369     'vcpus': instance.vcpus,
370     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
371   }
372   if override:
373     args.update(override)
374   return _BuildInstanceHookEnv(**args)
375
376
377 def _CheckInstanceBridgesExist(instance):
378   """Check that the brigdes needed by an instance exist.
379
380   """
381   # check bridges existance
382   brlist = [nic.bridge for nic in instance.nics]
383   if not rpc.call_bridges_exist(instance.primary_node, brlist):
384     raise errors.OpPrereqError("one or more target bridges %s does not"
385                                " exist on destination node '%s'" %
386                                (brlist, instance.primary_node))
387
388
389 class LUDestroyCluster(NoHooksLU):
390   """Logical unit for destroying the cluster.
391
392   """
393   _OP_REQP = []
394
395   def CheckPrereq(self):
396     """Check prerequisites.
397
398     This checks whether the cluster is empty.
399
400     Any errors are signalled by raising errors.OpPrereqError.
401
402     """
403     master = self.sstore.GetMasterNode()
404
405     nodelist = self.cfg.GetNodeList()
406     if len(nodelist) != 1 or nodelist[0] != master:
407       raise errors.OpPrereqError("There are still %d node(s) in"
408                                  " this cluster." % (len(nodelist) - 1))
409     instancelist = self.cfg.GetInstanceList()
410     if instancelist:
411       raise errors.OpPrereqError("There are still %d instance(s) in"
412                                  " this cluster." % len(instancelist))
413
414   def Exec(self, feedback_fn):
415     """Destroys the cluster.
416
417     """
418     master = self.sstore.GetMasterNode()
419     if not rpc.call_node_stop_master(master):
420       raise errors.OpExecError("Could not disable the master role")
421     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
422     utils.CreateBackup(priv_key)
423     utils.CreateBackup(pub_key)
424     rpc.call_node_leave_cluster(master)
425
426
427 class LUVerifyCluster(LogicalUnit):
428   """Verifies the cluster status.
429
430   """
431   HPATH = "cluster-verify"
432   HTYPE = constants.HTYPE_CLUSTER
433   _OP_REQP = ["skip_checks"]
434
435   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
436                   remote_version, feedback_fn):
437     """Run multiple tests against a node.
438
439     Test list:
440       - compares ganeti version
441       - checks vg existance and size > 20G
442       - checks config file checksum
443       - checks ssh to other nodes
444
445     Args:
446       node: name of the node to check
447       file_list: required list of files
448       local_cksum: dictionary of local files and their checksums
449
450     """
451     # compares ganeti version
452     local_version = constants.PROTOCOL_VERSION
453     if not remote_version:
454       feedback_fn("  - ERROR: connection to %s failed" % (node))
455       return True
456
457     if local_version != remote_version:
458       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
459                       (local_version, node, remote_version))
460       return True
461
462     # checks vg existance and size > 20G
463
464     bad = False
465     if not vglist:
466       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
467                       (node,))
468       bad = True
469     else:
470       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
471                                             constants.MIN_VG_SIZE)
472       if vgstatus:
473         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
474         bad = True
475
476     # checks config file checksum
477     # checks ssh to any
478
479     if 'filelist' not in node_result:
480       bad = True
481       feedback_fn("  - ERROR: node hasn't returned file checksum data")
482     else:
483       remote_cksum = node_result['filelist']
484       for file_name in file_list:
485         if file_name not in remote_cksum:
486           bad = True
487           feedback_fn("  - ERROR: file '%s' missing" % file_name)
488         elif remote_cksum[file_name] != local_cksum[file_name]:
489           bad = True
490           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
491
492     if 'nodelist' not in node_result:
493       bad = True
494       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
495     else:
496       if node_result['nodelist']:
497         bad = True
498         for node in node_result['nodelist']:
499           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
500                           (node, node_result['nodelist'][node]))
501     if 'node-net-test' not in node_result:
502       bad = True
503       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
504     else:
505       if node_result['node-net-test']:
506         bad = True
507         nlist = utils.NiceSort(node_result['node-net-test'].keys())
508         for node in nlist:
509           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
510                           (node, node_result['node-net-test'][node]))
511
512     hyp_result = node_result.get('hypervisor', None)
513     if hyp_result is not None:
514       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
515     return bad
516
517   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
518                       node_instance, feedback_fn):
519     """Verify an instance.
520
521     This function checks to see if the required block devices are
522     available on the instance's node.
523
524     """
525     bad = False
526
527     node_current = instanceconfig.primary_node
528
529     node_vol_should = {}
530     instanceconfig.MapLVsByNode(node_vol_should)
531
532     for node in node_vol_should:
533       for volume in node_vol_should[node]:
534         if node not in node_vol_is or volume not in node_vol_is[node]:
535           feedback_fn("  - ERROR: volume %s missing on node %s" %
536                           (volume, node))
537           bad = True
538
539     if not instanceconfig.status == 'down':
540       if (node_current not in node_instance or
541           not instance in node_instance[node_current]):
542         feedback_fn("  - ERROR: instance %s not running on node %s" %
543                         (instance, node_current))
544         bad = True
545
546     for node in node_instance:
547       if (not node == node_current):
548         if instance in node_instance[node]:
549           feedback_fn("  - ERROR: instance %s should not run on node %s" %
550                           (instance, node))
551           bad = True
552
553     return bad
554
555   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
556     """Verify if there are any unknown volumes in the cluster.
557
558     The .os, .swap and backup volumes are ignored. All other volumes are
559     reported as unknown.
560
561     """
562     bad = False
563
564     for node in node_vol_is:
565       for volume in node_vol_is[node]:
566         if node not in node_vol_should or volume not in node_vol_should[node]:
567           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
568                       (volume, node))
569           bad = True
570     return bad
571
572   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
573     """Verify the list of running instances.
574
575     This checks what instances are running but unknown to the cluster.
576
577     """
578     bad = False
579     for node in node_instance:
580       for runninginstance in node_instance[node]:
581         if runninginstance not in instancelist:
582           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
583                           (runninginstance, node))
584           bad = True
585     return bad
586
587   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
588     """Verify N+1 Memory Resilience.
589
590     Check that if one single node dies we can still start all the instances it
591     was primary for.
592
593     """
594     bad = False
595
596     for node, nodeinfo in node_info.iteritems():
597       # This code checks that every node which is now listed as secondary has
598       # enough memory to host all instances it is supposed to should a single
599       # other node in the cluster fail.
600       # FIXME: not ready for failover to an arbitrary node
601       # FIXME: does not support file-backed instances
602       # WARNING: we currently take into account down instances as well as up
603       # ones, considering that even if they're down someone might want to start
604       # them even in the event of a node failure.
605       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
606         needed_mem = 0
607         for instance in instances:
608           needed_mem += instance_cfg[instance].memory
609         if nodeinfo['mfree'] < needed_mem:
610           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
611                       " failovers should node %s fail" % (node, prinode))
612           bad = True
613     return bad
614
615   def CheckPrereq(self):
616     """Check prerequisites.
617
618     Transform the list of checks we're going to skip into a set and check that
619     all its members are valid.
620
621     """
622     self.skip_set = frozenset(self.op.skip_checks)
623     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
624       raise errors.OpPrereqError("Invalid checks to be skipped specified")
625
626   def BuildHooksEnv(self):
627     """Build hooks env.
628
629     Cluster-Verify hooks just rone in the post phase and their failure makes
630     the output be logged in the verify output and the verification to fail.
631
632     """
633     all_nodes = self.cfg.GetNodeList()
634     # TODO: populate the environment with useful information for verify hooks
635     env = {}
636     return env, [], all_nodes
637
638   def Exec(self, feedback_fn):
639     """Verify integrity of cluster, performing various test on nodes.
640
641     """
642     bad = False
643     feedback_fn("* Verifying global settings")
644     for msg in self.cfg.VerifyConfig():
645       feedback_fn("  - ERROR: %s" % msg)
646
647     vg_name = self.cfg.GetVGName()
648     nodelist = utils.NiceSort(self.cfg.GetNodeList())
649     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
650     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
651     i_non_redundant = [] # Non redundant instances
652     node_volume = {}
653     node_instance = {}
654     node_info = {}
655     instance_cfg = {}
656
657     # FIXME: verify OS list
658     # do local checksums
659     file_names = list(self.sstore.GetFileList())
660     file_names.append(constants.SSL_CERT_FILE)
661     file_names.append(constants.CLUSTER_CONF_FILE)
662     local_checksums = utils.FingerprintFiles(file_names)
663
664     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
665     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
666     all_instanceinfo = rpc.call_instance_list(nodelist)
667     all_vglist = rpc.call_vg_list(nodelist)
668     node_verify_param = {
669       'filelist': file_names,
670       'nodelist': nodelist,
671       'hypervisor': None,
672       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
673                         for node in nodeinfo]
674       }
675     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
676     all_rversion = rpc.call_version(nodelist)
677     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
678
679     for node in nodelist:
680       feedback_fn("* Verifying node %s" % node)
681       result = self._VerifyNode(node, file_names, local_checksums,
682                                 all_vglist[node], all_nvinfo[node],
683                                 all_rversion[node], feedback_fn)
684       bad = bad or result
685
686       # node_volume
687       volumeinfo = all_volumeinfo[node]
688
689       if isinstance(volumeinfo, basestring):
690         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
691                     (node, volumeinfo[-400:].encode('string_escape')))
692         bad = True
693         node_volume[node] = {}
694       elif not isinstance(volumeinfo, dict):
695         feedback_fn("  - ERROR: connection to %s failed" % (node,))
696         bad = True
697         continue
698       else:
699         node_volume[node] = volumeinfo
700
701       # node_instance
702       nodeinstance = all_instanceinfo[node]
703       if type(nodeinstance) != list:
704         feedback_fn("  - ERROR: connection to %s failed" % (node,))
705         bad = True
706         continue
707
708       node_instance[node] = nodeinstance
709
710       # node_info
711       nodeinfo = all_ninfo[node]
712       if not isinstance(nodeinfo, dict):
713         feedback_fn("  - ERROR: connection to %s failed" % (node,))
714         bad = True
715         continue
716
717       try:
718         node_info[node] = {
719           "mfree": int(nodeinfo['memory_free']),
720           "dfree": int(nodeinfo['vg_free']),
721           "pinst": [],
722           "sinst": [],
723           # dictionary holding all instances this node is secondary for,
724           # grouped by their primary node. Each key is a cluster node, and each
725           # value is a list of instances which have the key as primary and the
726           # current node as secondary.  this is handy to calculate N+1 memory
727           # availability if you can only failover from a primary to its
728           # secondary.
729           "sinst-by-pnode": {},
730         }
731       except ValueError:
732         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
733         bad = True
734         continue
735
736     node_vol_should = {}
737
738     for instance in instancelist:
739       feedback_fn("* Verifying instance %s" % instance)
740       inst_config = self.cfg.GetInstanceInfo(instance)
741       result =  self._VerifyInstance(instance, inst_config, node_volume,
742                                      node_instance, feedback_fn)
743       bad = bad or result
744
745       inst_config.MapLVsByNode(node_vol_should)
746
747       instance_cfg[instance] = inst_config
748
749       pnode = inst_config.primary_node
750       if pnode in node_info:
751         node_info[pnode]['pinst'].append(instance)
752       else:
753         feedback_fn("  - ERROR: instance %s, connection to primary node"
754                     " %s failed" % (instance, pnode))
755         bad = True
756
757       # If the instance is non-redundant we cannot survive losing its primary
758       # node, so we are not N+1 compliant. On the other hand we have no disk
759       # templates with more than one secondary so that situation is not well
760       # supported either.
761       # FIXME: does not support file-backed instances
762       if len(inst_config.secondary_nodes) == 0:
763         i_non_redundant.append(instance)
764       elif len(inst_config.secondary_nodes) > 1:
765         feedback_fn("  - WARNING: multiple secondaries for instance %s"
766                     % instance)
767
768       for snode in inst_config.secondary_nodes:
769         if snode in node_info:
770           node_info[snode]['sinst'].append(instance)
771           if pnode not in node_info[snode]['sinst-by-pnode']:
772             node_info[snode]['sinst-by-pnode'][pnode] = []
773           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
774         else:
775           feedback_fn("  - ERROR: instance %s, connection to secondary node"
776                       " %s failed" % (instance, snode))
777
778     feedback_fn("* Verifying orphan volumes")
779     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
780                                        feedback_fn)
781     bad = bad or result
782
783     feedback_fn("* Verifying remaining instances")
784     result = self._VerifyOrphanInstances(instancelist, node_instance,
785                                          feedback_fn)
786     bad = bad or result
787
788     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
789       feedback_fn("* Verifying N+1 Memory redundancy")
790       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
791       bad = bad or result
792
793     feedback_fn("* Other Notes")
794     if i_non_redundant:
795       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
796                   % len(i_non_redundant))
797
798     return int(bad)
799
800   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
801     """Analize the post-hooks' result, handle it, and send some
802     nicely-formatted feedback back to the user.
803
804     Args:
805       phase: the hooks phase that has just been run
806       hooks_results: the results of the multi-node hooks rpc call
807       feedback_fn: function to send feedback back to the caller
808       lu_result: previous Exec result
809
810     """
811     # We only really run POST phase hooks, and are only interested in their results
812     if phase == constants.HOOKS_PHASE_POST:
813       # Used to change hooks' output to proper indentation
814       indent_re = re.compile('^', re.M)
815       feedback_fn("* Hooks Results")
816       if not hooks_results:
817         feedback_fn("  - ERROR: general communication failure")
818         lu_result = 1
819       else:
820         for node_name in hooks_results:
821           show_node_header = True
822           res = hooks_results[node_name]
823           if res is False or not isinstance(res, list):
824             feedback_fn("    Communication failure")
825             lu_result = 1
826             continue
827           for script, hkr, output in res:
828             if hkr == constants.HKR_FAIL:
829               # The node header is only shown once, if there are
830               # failing hooks on that node
831               if show_node_header:
832                 feedback_fn("  Node %s:" % node_name)
833                 show_node_header = False
834               feedback_fn("    ERROR: Script %s failed, output:" % script)
835               output = indent_re.sub('      ', output)
836               feedback_fn("%s" % output)
837               lu_result = 1
838
839       return lu_result
840
841
842 class LUVerifyDisks(NoHooksLU):
843   """Verifies the cluster disks status.
844
845   """
846   _OP_REQP = []
847
848   def CheckPrereq(self):
849     """Check prerequisites.
850
851     This has no prerequisites.
852
853     """
854     pass
855
856   def Exec(self, feedback_fn):
857     """Verify integrity of cluster disks.
858
859     """
860     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
861
862     vg_name = self.cfg.GetVGName()
863     nodes = utils.NiceSort(self.cfg.GetNodeList())
864     instances = [self.cfg.GetInstanceInfo(name)
865                  for name in self.cfg.GetInstanceList()]
866
867     nv_dict = {}
868     for inst in instances:
869       inst_lvs = {}
870       if (inst.status != "up" or
871           inst.disk_template not in constants.DTS_NET_MIRROR):
872         continue
873       inst.MapLVsByNode(inst_lvs)
874       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
875       for node, vol_list in inst_lvs.iteritems():
876         for vol in vol_list:
877           nv_dict[(node, vol)] = inst
878
879     if not nv_dict:
880       return result
881
882     node_lvs = rpc.call_volume_list(nodes, vg_name)
883
884     to_act = set()
885     for node in nodes:
886       # node_volume
887       lvs = node_lvs[node]
888
889       if isinstance(lvs, basestring):
890         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
891         res_nlvm[node] = lvs
892       elif not isinstance(lvs, dict):
893         logger.Info("connection to node %s failed or invalid data returned" %
894                     (node,))
895         res_nodes.append(node)
896         continue
897
898       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
899         inst = nv_dict.pop((node, lv_name), None)
900         if (not lv_online and inst is not None
901             and inst.name not in res_instances):
902           res_instances.append(inst.name)
903
904     # any leftover items in nv_dict are missing LVs, let's arrange the
905     # data better
906     for key, inst in nv_dict.iteritems():
907       if inst.name not in res_missing:
908         res_missing[inst.name] = []
909       res_missing[inst.name].append(key)
910
911     return result
912
913
914 class LURenameCluster(LogicalUnit):
915   """Rename the cluster.
916
917   """
918   HPATH = "cluster-rename"
919   HTYPE = constants.HTYPE_CLUSTER
920   _OP_REQP = ["name"]
921   REQ_WSSTORE = True
922
923   def BuildHooksEnv(self):
924     """Build hooks env.
925
926     """
927     env = {
928       "OP_TARGET": self.sstore.GetClusterName(),
929       "NEW_NAME": self.op.name,
930       }
931     mn = self.sstore.GetMasterNode()
932     return env, [mn], [mn]
933
934   def CheckPrereq(self):
935     """Verify that the passed name is a valid one.
936
937     """
938     hostname = utils.HostInfo(self.op.name)
939
940     new_name = hostname.name
941     self.ip = new_ip = hostname.ip
942     old_name = self.sstore.GetClusterName()
943     old_ip = self.sstore.GetMasterIP()
944     if new_name == old_name and new_ip == old_ip:
945       raise errors.OpPrereqError("Neither the name nor the IP address of the"
946                                  " cluster has changed")
947     if new_ip != old_ip:
948       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
949         raise errors.OpPrereqError("The given cluster IP address (%s) is"
950                                    " reachable on the network. Aborting." %
951                                    new_ip)
952
953     self.op.name = new_name
954
955   def Exec(self, feedback_fn):
956     """Rename the cluster.
957
958     """
959     clustername = self.op.name
960     ip = self.ip
961     ss = self.sstore
962
963     # shutdown the master IP
964     master = ss.GetMasterNode()
965     if not rpc.call_node_stop_master(master):
966       raise errors.OpExecError("Could not disable the master role")
967
968     try:
969       # modify the sstore
970       ss.SetKey(ss.SS_MASTER_IP, ip)
971       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
972
973       # Distribute updated ss config to all nodes
974       myself = self.cfg.GetNodeInfo(master)
975       dist_nodes = self.cfg.GetNodeList()
976       if myself.name in dist_nodes:
977         dist_nodes.remove(myself.name)
978
979       logger.Debug("Copying updated ssconf data to all nodes")
980       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
981         fname = ss.KeyToFilename(keyname)
982         result = rpc.call_upload_file(dist_nodes, fname)
983         for to_node in dist_nodes:
984           if not result[to_node]:
985             logger.Error("copy of file %s to node %s failed" %
986                          (fname, to_node))
987     finally:
988       if not rpc.call_node_start_master(master):
989         logger.Error("Could not re-enable the master role on the master,"
990                      " please restart manually.")
991
992
993 def _RecursiveCheckIfLVMBased(disk):
994   """Check if the given disk or its children are lvm-based.
995
996   Args:
997     disk: ganeti.objects.Disk object
998
999   Returns:
1000     boolean indicating whether a LD_LV dev_type was found or not
1001
1002   """
1003   if disk.children:
1004     for chdisk in disk.children:
1005       if _RecursiveCheckIfLVMBased(chdisk):
1006         return True
1007   return disk.dev_type == constants.LD_LV
1008
1009
1010 class LUSetClusterParams(LogicalUnit):
1011   """Change the parameters of the cluster.
1012
1013   """
1014   HPATH = "cluster-modify"
1015   HTYPE = constants.HTYPE_CLUSTER
1016   _OP_REQP = []
1017
1018   def BuildHooksEnv(self):
1019     """Build hooks env.
1020
1021     """
1022     env = {
1023       "OP_TARGET": self.sstore.GetClusterName(),
1024       "NEW_VG_NAME": self.op.vg_name,
1025       }
1026     mn = self.sstore.GetMasterNode()
1027     return env, [mn], [mn]
1028
1029   def CheckPrereq(self):
1030     """Check prerequisites.
1031
1032     This checks whether the given params don't conflict and
1033     if the given volume group is valid.
1034
1035     """
1036     if not self.op.vg_name:
1037       instances = [self.cfg.GetInstanceInfo(name)
1038                    for name in self.cfg.GetInstanceList()]
1039       for inst in instances:
1040         for disk in inst.disks:
1041           if _RecursiveCheckIfLVMBased(disk):
1042             raise errors.OpPrereqError("Cannot disable lvm storage while"
1043                                        " lvm-based instances exist")
1044
1045     # if vg_name not None, checks given volume group on all nodes
1046     if self.op.vg_name:
1047       node_list = self.cfg.GetNodeList()
1048       vglist = rpc.call_vg_list(node_list)
1049       for node in node_list:
1050         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1051                                               constants.MIN_VG_SIZE)
1052         if vgstatus:
1053           raise errors.OpPrereqError("Error on node '%s': %s" %
1054                                      (node, vgstatus))
1055
1056   def Exec(self, feedback_fn):
1057     """Change the parameters of the cluster.
1058
1059     """
1060     if self.op.vg_name != self.cfg.GetVGName():
1061       self.cfg.SetVGName(self.op.vg_name)
1062     else:
1063       feedback_fn("Cluster LVM configuration already in desired"
1064                   " state, not changing")
1065
1066
1067 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1068   """Sleep and poll for an instance's disk to sync.
1069
1070   """
1071   if not instance.disks:
1072     return True
1073
1074   if not oneshot:
1075     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1076
1077   node = instance.primary_node
1078
1079   for dev in instance.disks:
1080     cfgw.SetDiskID(dev, node)
1081
1082   retries = 0
1083   while True:
1084     max_time = 0
1085     done = True
1086     cumul_degraded = False
1087     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1088     if not rstats:
1089       proc.LogWarning("Can't get any data from node %s" % node)
1090       retries += 1
1091       if retries >= 10:
1092         raise errors.RemoteError("Can't contact node %s for mirror data,"
1093                                  " aborting." % node)
1094       time.sleep(6)
1095       continue
1096     retries = 0
1097     for i in range(len(rstats)):
1098       mstat = rstats[i]
1099       if mstat is None:
1100         proc.LogWarning("Can't compute data for node %s/%s" %
1101                         (node, instance.disks[i].iv_name))
1102         continue
1103       # we ignore the ldisk parameter
1104       perc_done, est_time, is_degraded, _ = mstat
1105       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1106       if perc_done is not None:
1107         done = False
1108         if est_time is not None:
1109           rem_time = "%d estimated seconds remaining" % est_time
1110           max_time = est_time
1111         else:
1112           rem_time = "no time estimate"
1113         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1114                      (instance.disks[i].iv_name, perc_done, rem_time))
1115     if done or oneshot:
1116       break
1117
1118     time.sleep(min(60, max_time))
1119
1120   if done:
1121     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1122   return not cumul_degraded
1123
1124
1125 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1126   """Check that mirrors are not degraded.
1127
1128   The ldisk parameter, if True, will change the test from the
1129   is_degraded attribute (which represents overall non-ok status for
1130   the device(s)) to the ldisk (representing the local storage status).
1131
1132   """
1133   cfgw.SetDiskID(dev, node)
1134   if ldisk:
1135     idx = 6
1136   else:
1137     idx = 5
1138
1139   result = True
1140   if on_primary or dev.AssembleOnSecondary():
1141     rstats = rpc.call_blockdev_find(node, dev)
1142     if not rstats:
1143       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1144       result = False
1145     else:
1146       result = result and (not rstats[idx])
1147   if dev.children:
1148     for child in dev.children:
1149       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1150
1151   return result
1152
1153
1154 class LUDiagnoseOS(NoHooksLU):
1155   """Logical unit for OS diagnose/query.
1156
1157   """
1158   _OP_REQP = ["output_fields", "names"]
1159
1160   def CheckPrereq(self):
1161     """Check prerequisites.
1162
1163     This always succeeds, since this is a pure query LU.
1164
1165     """
1166     if self.op.names:
1167       raise errors.OpPrereqError("Selective OS query not supported")
1168
1169     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1170     _CheckOutputFields(static=[],
1171                        dynamic=self.dynamic_fields,
1172                        selected=self.op.output_fields)
1173
1174   @staticmethod
1175   def _DiagnoseByOS(node_list, rlist):
1176     """Remaps a per-node return list into an a per-os per-node dictionary
1177
1178       Args:
1179         node_list: a list with the names of all nodes
1180         rlist: a map with node names as keys and OS objects as values
1181
1182       Returns:
1183         map: a map with osnames as keys and as value another map, with
1184              nodes as
1185              keys and list of OS objects as values
1186              e.g. {"debian-etch": {"node1": [<object>,...],
1187                                    "node2": [<object>,]}
1188                   }
1189
1190     """
1191     all_os = {}
1192     for node_name, nr in rlist.iteritems():
1193       if not nr:
1194         continue
1195       for os_obj in nr:
1196         if os_obj.name not in all_os:
1197           # build a list of nodes for this os containing empty lists
1198           # for each node in node_list
1199           all_os[os_obj.name] = {}
1200           for nname in node_list:
1201             all_os[os_obj.name][nname] = []
1202         all_os[os_obj.name][node_name].append(os_obj)
1203     return all_os
1204
1205   def Exec(self, feedback_fn):
1206     """Compute the list of OSes.
1207
1208     """
1209     node_list = self.cfg.GetNodeList()
1210     node_data = rpc.call_os_diagnose(node_list)
1211     if node_data == False:
1212       raise errors.OpExecError("Can't gather the list of OSes")
1213     pol = self._DiagnoseByOS(node_list, node_data)
1214     output = []
1215     for os_name, os_data in pol.iteritems():
1216       row = []
1217       for field in self.op.output_fields:
1218         if field == "name":
1219           val = os_name
1220         elif field == "valid":
1221           val = utils.all([osl and osl[0] for osl in os_data.values()])
1222         elif field == "node_status":
1223           val = {}
1224           for node_name, nos_list in os_data.iteritems():
1225             val[node_name] = [(v.status, v.path) for v in nos_list]
1226         else:
1227           raise errors.ParameterError(field)
1228         row.append(val)
1229       output.append(row)
1230
1231     return output
1232
1233
1234 class LURemoveNode(LogicalUnit):
1235   """Logical unit for removing a node.
1236
1237   """
1238   HPATH = "node-remove"
1239   HTYPE = constants.HTYPE_NODE
1240   _OP_REQP = ["node_name"]
1241
1242   def BuildHooksEnv(self):
1243     """Build hooks env.
1244
1245     This doesn't run on the target node in the pre phase as a failed
1246     node would then be impossible to remove.
1247
1248     """
1249     env = {
1250       "OP_TARGET": self.op.node_name,
1251       "NODE_NAME": self.op.node_name,
1252       }
1253     all_nodes = self.cfg.GetNodeList()
1254     all_nodes.remove(self.op.node_name)
1255     return env, all_nodes, all_nodes
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This checks:
1261      - the node exists in the configuration
1262      - it does not have primary or secondary instances
1263      - it's not the master
1264
1265     Any errors are signalled by raising errors.OpPrereqError.
1266
1267     """
1268     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1269     if node is None:
1270       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1271
1272     instance_list = self.cfg.GetInstanceList()
1273
1274     masternode = self.sstore.GetMasterNode()
1275     if node.name == masternode:
1276       raise errors.OpPrereqError("Node is the master node,"
1277                                  " you need to failover first.")
1278
1279     for instance_name in instance_list:
1280       instance = self.cfg.GetInstanceInfo(instance_name)
1281       if node.name == instance.primary_node:
1282         raise errors.OpPrereqError("Instance %s still running on the node,"
1283                                    " please remove first." % instance_name)
1284       if node.name in instance.secondary_nodes:
1285         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1286                                    " please remove first." % instance_name)
1287     self.op.node_name = node.name
1288     self.node = node
1289
1290   def Exec(self, feedback_fn):
1291     """Removes the node from the cluster.
1292
1293     """
1294     node = self.node
1295     logger.Info("stopping the node daemon and removing configs from node %s" %
1296                 node.name)
1297
1298     rpc.call_node_leave_cluster(node.name)
1299
1300     logger.Info("Removing node %s from config" % node.name)
1301
1302     self.cfg.RemoveNode(node.name)
1303     # Remove the node from the Ganeti Lock Manager
1304     self.context.glm.remove(locking.LEVEL_NODE, node.name)
1305
1306     utils.RemoveHostFromEtcHosts(node.name)
1307
1308
1309 class LUQueryNodes(NoHooksLU):
1310   """Logical unit for querying nodes.
1311
1312   """
1313   _OP_REQP = ["output_fields", "names"]
1314
1315   def CheckPrereq(self):
1316     """Check prerequisites.
1317
1318     This checks that the fields required are valid output fields.
1319
1320     """
1321     self.dynamic_fields = frozenset([
1322       "dtotal", "dfree",
1323       "mtotal", "mnode", "mfree",
1324       "bootid",
1325       "ctotal",
1326       ])
1327
1328     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1329                                "pinst_list", "sinst_list",
1330                                "pip", "sip", "tags"],
1331                        dynamic=self.dynamic_fields,
1332                        selected=self.op.output_fields)
1333
1334     self.wanted = _GetWantedNodes(self, self.op.names)
1335
1336   def Exec(self, feedback_fn):
1337     """Computes the list of nodes and their attributes.
1338
1339     """
1340     nodenames = self.wanted
1341     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1342
1343     # begin data gathering
1344
1345     if self.dynamic_fields.intersection(self.op.output_fields):
1346       live_data = {}
1347       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1348       for name in nodenames:
1349         nodeinfo = node_data.get(name, None)
1350         if nodeinfo:
1351           live_data[name] = {
1352             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1353             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1354             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1355             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1356             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1357             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1358             "bootid": nodeinfo['bootid'],
1359             }
1360         else:
1361           live_data[name] = {}
1362     else:
1363       live_data = dict.fromkeys(nodenames, {})
1364
1365     node_to_primary = dict([(name, set()) for name in nodenames])
1366     node_to_secondary = dict([(name, set()) for name in nodenames])
1367
1368     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1369                              "sinst_cnt", "sinst_list"))
1370     if inst_fields & frozenset(self.op.output_fields):
1371       instancelist = self.cfg.GetInstanceList()
1372
1373       for instance_name in instancelist:
1374         inst = self.cfg.GetInstanceInfo(instance_name)
1375         if inst.primary_node in node_to_primary:
1376           node_to_primary[inst.primary_node].add(inst.name)
1377         for secnode in inst.secondary_nodes:
1378           if secnode in node_to_secondary:
1379             node_to_secondary[secnode].add(inst.name)
1380
1381     # end data gathering
1382
1383     output = []
1384     for node in nodelist:
1385       node_output = []
1386       for field in self.op.output_fields:
1387         if field == "name":
1388           val = node.name
1389         elif field == "pinst_list":
1390           val = list(node_to_primary[node.name])
1391         elif field == "sinst_list":
1392           val = list(node_to_secondary[node.name])
1393         elif field == "pinst_cnt":
1394           val = len(node_to_primary[node.name])
1395         elif field == "sinst_cnt":
1396           val = len(node_to_secondary[node.name])
1397         elif field == "pip":
1398           val = node.primary_ip
1399         elif field == "sip":
1400           val = node.secondary_ip
1401         elif field == "tags":
1402           val = list(node.GetTags())
1403         elif field in self.dynamic_fields:
1404           val = live_data[node.name].get(field, None)
1405         else:
1406           raise errors.ParameterError(field)
1407         node_output.append(val)
1408       output.append(node_output)
1409
1410     return output
1411
1412
1413 class LUQueryNodeVolumes(NoHooksLU):
1414   """Logical unit for getting volumes on node(s).
1415
1416   """
1417   _OP_REQP = ["nodes", "output_fields"]
1418
1419   def CheckPrereq(self):
1420     """Check prerequisites.
1421
1422     This checks that the fields required are valid output fields.
1423
1424     """
1425     self.nodes = _GetWantedNodes(self, self.op.nodes)
1426
1427     _CheckOutputFields(static=["node"],
1428                        dynamic=["phys", "vg", "name", "size", "instance"],
1429                        selected=self.op.output_fields)
1430
1431
1432   def Exec(self, feedback_fn):
1433     """Computes the list of nodes and their attributes.
1434
1435     """
1436     nodenames = self.nodes
1437     volumes = rpc.call_node_volumes(nodenames)
1438
1439     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1440              in self.cfg.GetInstanceList()]
1441
1442     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1443
1444     output = []
1445     for node in nodenames:
1446       if node not in volumes or not volumes[node]:
1447         continue
1448
1449       node_vols = volumes[node][:]
1450       node_vols.sort(key=lambda vol: vol['dev'])
1451
1452       for vol in node_vols:
1453         node_output = []
1454         for field in self.op.output_fields:
1455           if field == "node":
1456             val = node
1457           elif field == "phys":
1458             val = vol['dev']
1459           elif field == "vg":
1460             val = vol['vg']
1461           elif field == "name":
1462             val = vol['name']
1463           elif field == "size":
1464             val = int(float(vol['size']))
1465           elif field == "instance":
1466             for inst in ilist:
1467               if node not in lv_by_node[inst]:
1468                 continue
1469               if vol['name'] in lv_by_node[inst][node]:
1470                 val = inst.name
1471                 break
1472             else:
1473               val = '-'
1474           else:
1475             raise errors.ParameterError(field)
1476           node_output.append(str(val))
1477
1478         output.append(node_output)
1479
1480     return output
1481
1482
1483 class LUAddNode(LogicalUnit):
1484   """Logical unit for adding node to the cluster.
1485
1486   """
1487   HPATH = "node-add"
1488   HTYPE = constants.HTYPE_NODE
1489   _OP_REQP = ["node_name"]
1490
1491   def BuildHooksEnv(self):
1492     """Build hooks env.
1493
1494     This will run on all nodes before, and on all nodes + the new node after.
1495
1496     """
1497     env = {
1498       "OP_TARGET": self.op.node_name,
1499       "NODE_NAME": self.op.node_name,
1500       "NODE_PIP": self.op.primary_ip,
1501       "NODE_SIP": self.op.secondary_ip,
1502       }
1503     nodes_0 = self.cfg.GetNodeList()
1504     nodes_1 = nodes_0 + [self.op.node_name, ]
1505     return env, nodes_0, nodes_1
1506
1507   def CheckPrereq(self):
1508     """Check prerequisites.
1509
1510     This checks:
1511      - the new node is not already in the config
1512      - it is resolvable
1513      - its parameters (single/dual homed) matches the cluster
1514
1515     Any errors are signalled by raising errors.OpPrereqError.
1516
1517     """
1518     node_name = self.op.node_name
1519     cfg = self.cfg
1520
1521     dns_data = utils.HostInfo(node_name)
1522
1523     node = dns_data.name
1524     primary_ip = self.op.primary_ip = dns_data.ip
1525     secondary_ip = getattr(self.op, "secondary_ip", None)
1526     if secondary_ip is None:
1527       secondary_ip = primary_ip
1528     if not utils.IsValidIP(secondary_ip):
1529       raise errors.OpPrereqError("Invalid secondary IP given")
1530     self.op.secondary_ip = secondary_ip
1531
1532     node_list = cfg.GetNodeList()
1533     if not self.op.readd and node in node_list:
1534       raise errors.OpPrereqError("Node %s is already in the configuration" %
1535                                  node)
1536     elif self.op.readd and node not in node_list:
1537       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1538
1539     for existing_node_name in node_list:
1540       existing_node = cfg.GetNodeInfo(existing_node_name)
1541
1542       if self.op.readd and node == existing_node_name:
1543         if (existing_node.primary_ip != primary_ip or
1544             existing_node.secondary_ip != secondary_ip):
1545           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1546                                      " address configuration as before")
1547         continue
1548
1549       if (existing_node.primary_ip == primary_ip or
1550           existing_node.secondary_ip == primary_ip or
1551           existing_node.primary_ip == secondary_ip or
1552           existing_node.secondary_ip == secondary_ip):
1553         raise errors.OpPrereqError("New node ip address(es) conflict with"
1554                                    " existing node %s" % existing_node.name)
1555
1556     # check that the type of the node (single versus dual homed) is the
1557     # same as for the master
1558     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1559     master_singlehomed = myself.secondary_ip == myself.primary_ip
1560     newbie_singlehomed = secondary_ip == primary_ip
1561     if master_singlehomed != newbie_singlehomed:
1562       if master_singlehomed:
1563         raise errors.OpPrereqError("The master has no private ip but the"
1564                                    " new node has one")
1565       else:
1566         raise errors.OpPrereqError("The master has a private ip but the"
1567                                    " new node doesn't have one")
1568
1569     # checks reachablity
1570     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1571       raise errors.OpPrereqError("Node not reachable by ping")
1572
1573     if not newbie_singlehomed:
1574       # check reachability from my secondary ip to newbie's secondary ip
1575       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1576                            source=myself.secondary_ip):
1577         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1578                                    " based ping to noded port")
1579
1580     self.new_node = objects.Node(name=node,
1581                                  primary_ip=primary_ip,
1582                                  secondary_ip=secondary_ip)
1583
1584   def Exec(self, feedback_fn):
1585     """Adds the new node to the cluster.
1586
1587     """
1588     new_node = self.new_node
1589     node = new_node.name
1590
1591     # check connectivity
1592     result = rpc.call_version([node])[node]
1593     if result:
1594       if constants.PROTOCOL_VERSION == result:
1595         logger.Info("communication to node %s fine, sw version %s match" %
1596                     (node, result))
1597       else:
1598         raise errors.OpExecError("Version mismatch master version %s,"
1599                                  " node version %s" %
1600                                  (constants.PROTOCOL_VERSION, result))
1601     else:
1602       raise errors.OpExecError("Cannot get version from the new node")
1603
1604     # setup ssh on node
1605     logger.Info("copy ssh key to node %s" % node)
1606     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1607     keyarray = []
1608     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1609                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1610                 priv_key, pub_key]
1611
1612     for i in keyfiles:
1613       f = open(i, 'r')
1614       try:
1615         keyarray.append(f.read())
1616       finally:
1617         f.close()
1618
1619     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1620                                keyarray[3], keyarray[4], keyarray[5])
1621
1622     if not result:
1623       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1624
1625     # Add node to our /etc/hosts, and add key to known_hosts
1626     utils.AddHostToEtcHosts(new_node.name)
1627
1628     if new_node.secondary_ip != new_node.primary_ip:
1629       if not rpc.call_node_tcp_ping(new_node.name,
1630                                     constants.LOCALHOST_IP_ADDRESS,
1631                                     new_node.secondary_ip,
1632                                     constants.DEFAULT_NODED_PORT,
1633                                     10, False):
1634         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1635                                  " you gave (%s). Please fix and re-run this"
1636                                  " command." % new_node.secondary_ip)
1637
1638     node_verify_list = [self.sstore.GetMasterNode()]
1639     node_verify_param = {
1640       'nodelist': [node],
1641       # TODO: do a node-net-test as well?
1642     }
1643
1644     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1645     for verifier in node_verify_list:
1646       if not result[verifier]:
1647         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1648                                  " for remote verification" % verifier)
1649       if result[verifier]['nodelist']:
1650         for failed in result[verifier]['nodelist']:
1651           feedback_fn("ssh/hostname verification failed %s -> %s" %
1652                       (verifier, result[verifier]['nodelist'][failed]))
1653         raise errors.OpExecError("ssh/hostname verification failed.")
1654
1655     # Distribute updated /etc/hosts and known_hosts to all nodes,
1656     # including the node just added
1657     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1658     dist_nodes = self.cfg.GetNodeList()
1659     if not self.op.readd:
1660       dist_nodes.append(node)
1661     if myself.name in dist_nodes:
1662       dist_nodes.remove(myself.name)
1663
1664     logger.Debug("Copying hosts and known_hosts to all nodes")
1665     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1666       result = rpc.call_upload_file(dist_nodes, fname)
1667       for to_node in dist_nodes:
1668         if not result[to_node]:
1669           logger.Error("copy of file %s to node %s failed" %
1670                        (fname, to_node))
1671
1672     to_copy = self.sstore.GetFileList()
1673     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1674       to_copy.append(constants.VNC_PASSWORD_FILE)
1675     for fname in to_copy:
1676       result = rpc.call_upload_file([node], fname)
1677       if not result[node]:
1678         logger.Error("could not copy file %s to node %s" % (fname, node))
1679
1680     if not self.op.readd:
1681       logger.Info("adding node %s to cluster.conf" % node)
1682       self.cfg.AddNode(new_node)
1683       # Add the new node to the Ganeti Lock Manager
1684       self.context.glm.add(locking.LEVEL_NODE, node)
1685
1686
1687 class LUMasterFailover(LogicalUnit):
1688   """Failover the master node to the current node.
1689
1690   This is a special LU in that it must run on a non-master node.
1691
1692   """
1693   HPATH = "master-failover"
1694   HTYPE = constants.HTYPE_CLUSTER
1695   REQ_MASTER = False
1696   REQ_WSSTORE = True
1697   _OP_REQP = []
1698
1699   def BuildHooksEnv(self):
1700     """Build hooks env.
1701
1702     This will run on the new master only in the pre phase, and on all
1703     the nodes in the post phase.
1704
1705     """
1706     env = {
1707       "OP_TARGET": self.new_master,
1708       "NEW_MASTER": self.new_master,
1709       "OLD_MASTER": self.old_master,
1710       }
1711     return env, [self.new_master], self.cfg.GetNodeList()
1712
1713   def CheckPrereq(self):
1714     """Check prerequisites.
1715
1716     This checks that we are not already the master.
1717
1718     """
1719     self.new_master = utils.HostInfo().name
1720     self.old_master = self.sstore.GetMasterNode()
1721
1722     if self.old_master == self.new_master:
1723       raise errors.OpPrereqError("This commands must be run on the node"
1724                                  " where you want the new master to be."
1725                                  " %s is already the master" %
1726                                  self.old_master)
1727
1728   def Exec(self, feedback_fn):
1729     """Failover the master node.
1730
1731     This command, when run on a non-master node, will cause the current
1732     master to cease being master, and the non-master to become new
1733     master.
1734
1735     """
1736     #TODO: do not rely on gethostname returning the FQDN
1737     logger.Info("setting master to %s, old master: %s" %
1738                 (self.new_master, self.old_master))
1739
1740     if not rpc.call_node_stop_master(self.old_master):
1741       logger.Error("could disable the master role on the old master"
1742                    " %s, please disable manually" % self.old_master)
1743
1744     ss = self.sstore
1745     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1746     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1747                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1748       logger.Error("could not distribute the new simple store master file"
1749                    " to the other nodes, please check.")
1750
1751     if not rpc.call_node_start_master(self.new_master):
1752       logger.Error("could not start the master role on the new master"
1753                    " %s, please check" % self.new_master)
1754       feedback_fn("Error in activating the master IP on the new master,"
1755                   " please fix manually.")
1756
1757
1758
1759 class LUQueryClusterInfo(NoHooksLU):
1760   """Query cluster configuration.
1761
1762   """
1763   _OP_REQP = []
1764   REQ_MASTER = False
1765   REQ_BGL = False
1766
1767   def ExpandNames(self):
1768     self.needed_locks = {}
1769
1770   def CheckPrereq(self):
1771     """No prerequsites needed for this LU.
1772
1773     """
1774     pass
1775
1776   def Exec(self, feedback_fn):
1777     """Return cluster config.
1778
1779     """
1780     result = {
1781       "name": self.sstore.GetClusterName(),
1782       "software_version": constants.RELEASE_VERSION,
1783       "protocol_version": constants.PROTOCOL_VERSION,
1784       "config_version": constants.CONFIG_VERSION,
1785       "os_api_version": constants.OS_API_VERSION,
1786       "export_version": constants.EXPORT_VERSION,
1787       "master": self.sstore.GetMasterNode(),
1788       "architecture": (platform.architecture()[0], platform.machine()),
1789       "hypervisor_type": self.sstore.GetHypervisorType(),
1790       }
1791
1792     return result
1793
1794
1795 class LUDumpClusterConfig(NoHooksLU):
1796   """Return a text-representation of the cluster-config.
1797
1798   """
1799   _OP_REQP = []
1800   REQ_BGL = False
1801
1802   def ExpandNames(self):
1803     self.needed_locks = {}
1804
1805   def CheckPrereq(self):
1806     """No prerequisites.
1807
1808     """
1809     pass
1810
1811   def Exec(self, feedback_fn):
1812     """Dump a representation of the cluster config to the standard output.
1813
1814     """
1815     return self.cfg.DumpConfig()
1816
1817
1818 class LUActivateInstanceDisks(NoHooksLU):
1819   """Bring up an instance's disks.
1820
1821   """
1822   _OP_REQP = ["instance_name"]
1823
1824   def CheckPrereq(self):
1825     """Check prerequisites.
1826
1827     This checks that the instance is in the cluster.
1828
1829     """
1830     instance = self.cfg.GetInstanceInfo(
1831       self.cfg.ExpandInstanceName(self.op.instance_name))
1832     if instance is None:
1833       raise errors.OpPrereqError("Instance '%s' not known" %
1834                                  self.op.instance_name)
1835     self.instance = instance
1836
1837
1838   def Exec(self, feedback_fn):
1839     """Activate the disks.
1840
1841     """
1842     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1843     if not disks_ok:
1844       raise errors.OpExecError("Cannot activate block devices")
1845
1846     return disks_info
1847
1848
1849 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1850   """Prepare the block devices for an instance.
1851
1852   This sets up the block devices on all nodes.
1853
1854   Args:
1855     instance: a ganeti.objects.Instance object
1856     ignore_secondaries: if true, errors on secondary nodes won't result
1857                         in an error return from the function
1858
1859   Returns:
1860     false if the operation failed
1861     list of (host, instance_visible_name, node_visible_name) if the operation
1862          suceeded with the mapping from node devices to instance devices
1863   """
1864   device_info = []
1865   disks_ok = True
1866   iname = instance.name
1867   # With the two passes mechanism we try to reduce the window of
1868   # opportunity for the race condition of switching DRBD to primary
1869   # before handshaking occured, but we do not eliminate it
1870
1871   # The proper fix would be to wait (with some limits) until the
1872   # connection has been made and drbd transitions from WFConnection
1873   # into any other network-connected state (Connected, SyncTarget,
1874   # SyncSource, etc.)
1875
1876   # 1st pass, assemble on all nodes in secondary mode
1877   for inst_disk in instance.disks:
1878     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1879       cfg.SetDiskID(node_disk, node)
1880       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1881       if not result:
1882         logger.Error("could not prepare block device %s on node %s"
1883                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1884         if not ignore_secondaries:
1885           disks_ok = False
1886
1887   # FIXME: race condition on drbd migration to primary
1888
1889   # 2nd pass, do only the primary node
1890   for inst_disk in instance.disks:
1891     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1892       if node != instance.primary_node:
1893         continue
1894       cfg.SetDiskID(node_disk, node)
1895       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1896       if not result:
1897         logger.Error("could not prepare block device %s on node %s"
1898                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1899         disks_ok = False
1900     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1901
1902   # leave the disks configured for the primary node
1903   # this is a workaround that would be fixed better by
1904   # improving the logical/physical id handling
1905   for disk in instance.disks:
1906     cfg.SetDiskID(disk, instance.primary_node)
1907
1908   return disks_ok, device_info
1909
1910
1911 def _StartInstanceDisks(cfg, instance, force):
1912   """Start the disks of an instance.
1913
1914   """
1915   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1916                                            ignore_secondaries=force)
1917   if not disks_ok:
1918     _ShutdownInstanceDisks(instance, cfg)
1919     if force is not None and not force:
1920       logger.Error("If the message above refers to a secondary node,"
1921                    " you can retry the operation using '--force'.")
1922     raise errors.OpExecError("Disk consistency error")
1923
1924
1925 class LUDeactivateInstanceDisks(NoHooksLU):
1926   """Shutdown an instance's disks.
1927
1928   """
1929   _OP_REQP = ["instance_name"]
1930
1931   def CheckPrereq(self):
1932     """Check prerequisites.
1933
1934     This checks that the instance is in the cluster.
1935
1936     """
1937     instance = self.cfg.GetInstanceInfo(
1938       self.cfg.ExpandInstanceName(self.op.instance_name))
1939     if instance is None:
1940       raise errors.OpPrereqError("Instance '%s' not known" %
1941                                  self.op.instance_name)
1942     self.instance = instance
1943
1944   def Exec(self, feedback_fn):
1945     """Deactivate the disks
1946
1947     """
1948     instance = self.instance
1949     ins_l = rpc.call_instance_list([instance.primary_node])
1950     ins_l = ins_l[instance.primary_node]
1951     if not type(ins_l) is list:
1952       raise errors.OpExecError("Can't contact node '%s'" %
1953                                instance.primary_node)
1954
1955     if self.instance.name in ins_l:
1956       raise errors.OpExecError("Instance is running, can't shutdown"
1957                                " block devices.")
1958
1959     _ShutdownInstanceDisks(instance, self.cfg)
1960
1961
1962 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1963   """Shutdown block devices of an instance.
1964
1965   This does the shutdown on all nodes of the instance.
1966
1967   If the ignore_primary is false, errors on the primary node are
1968   ignored.
1969
1970   """
1971   result = True
1972   for disk in instance.disks:
1973     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1974       cfg.SetDiskID(top_disk, node)
1975       if not rpc.call_blockdev_shutdown(node, top_disk):
1976         logger.Error("could not shutdown block device %s on node %s" %
1977                      (disk.iv_name, node))
1978         if not ignore_primary or node != instance.primary_node:
1979           result = False
1980   return result
1981
1982
1983 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1984   """Checks if a node has enough free memory.
1985
1986   This function check if a given node has the needed amount of free
1987   memory. In case the node has less memory or we cannot get the
1988   information from the node, this function raise an OpPrereqError
1989   exception.
1990
1991   Args:
1992     - cfg: a ConfigWriter instance
1993     - node: the node name
1994     - reason: string to use in the error message
1995     - requested: the amount of memory in MiB
1996
1997   """
1998   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1999   if not nodeinfo or not isinstance(nodeinfo, dict):
2000     raise errors.OpPrereqError("Could not contact node %s for resource"
2001                              " information" % (node,))
2002
2003   free_mem = nodeinfo[node].get('memory_free')
2004   if not isinstance(free_mem, int):
2005     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2006                              " was '%s'" % (node, free_mem))
2007   if requested > free_mem:
2008     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2009                              " needed %s MiB, available %s MiB" %
2010                              (node, reason, requested, free_mem))
2011
2012
2013 class LUStartupInstance(LogicalUnit):
2014   """Starts an instance.
2015
2016   """
2017   HPATH = "instance-start"
2018   HTYPE = constants.HTYPE_INSTANCE
2019   _OP_REQP = ["instance_name", "force"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This runs on master, primary and secondary nodes of the instance.
2025
2026     """
2027     env = {
2028       "FORCE": self.op.force,
2029       }
2030     env.update(_BuildInstanceHookEnvByObject(self.instance))
2031     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032           list(self.instance.secondary_nodes))
2033     return env, nl, nl
2034
2035   def CheckPrereq(self):
2036     """Check prerequisites.
2037
2038     This checks that the instance is in the cluster.
2039
2040     """
2041     instance = self.cfg.GetInstanceInfo(
2042       self.cfg.ExpandInstanceName(self.op.instance_name))
2043     if instance is None:
2044       raise errors.OpPrereqError("Instance '%s' not known" %
2045                                  self.op.instance_name)
2046
2047     # check bridges existance
2048     _CheckInstanceBridgesExist(instance)
2049
2050     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2051                          "starting instance %s" % instance.name,
2052                          instance.memory)
2053
2054     self.instance = instance
2055     self.op.instance_name = instance.name
2056
2057   def Exec(self, feedback_fn):
2058     """Start the instance.
2059
2060     """
2061     instance = self.instance
2062     force = self.op.force
2063     extra_args = getattr(self.op, "extra_args", "")
2064
2065     self.cfg.MarkInstanceUp(instance.name)
2066
2067     node_current = instance.primary_node
2068
2069     _StartInstanceDisks(self.cfg, instance, force)
2070
2071     if not rpc.call_instance_start(node_current, instance, extra_args):
2072       _ShutdownInstanceDisks(instance, self.cfg)
2073       raise errors.OpExecError("Could not start instance")
2074
2075
2076 class LURebootInstance(LogicalUnit):
2077   """Reboot an instance.
2078
2079   """
2080   HPATH = "instance-reboot"
2081   HTYPE = constants.HTYPE_INSTANCE
2082   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2083
2084   def BuildHooksEnv(self):
2085     """Build hooks env.
2086
2087     This runs on master, primary and secondary nodes of the instance.
2088
2089     """
2090     env = {
2091       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2092       }
2093     env.update(_BuildInstanceHookEnvByObject(self.instance))
2094     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2095           list(self.instance.secondary_nodes))
2096     return env, nl, nl
2097
2098   def CheckPrereq(self):
2099     """Check prerequisites.
2100
2101     This checks that the instance is in the cluster.
2102
2103     """
2104     instance = self.cfg.GetInstanceInfo(
2105       self.cfg.ExpandInstanceName(self.op.instance_name))
2106     if instance is None:
2107       raise errors.OpPrereqError("Instance '%s' not known" %
2108                                  self.op.instance_name)
2109
2110     # check bridges existance
2111     _CheckInstanceBridgesExist(instance)
2112
2113     self.instance = instance
2114     self.op.instance_name = instance.name
2115
2116   def Exec(self, feedback_fn):
2117     """Reboot the instance.
2118
2119     """
2120     instance = self.instance
2121     ignore_secondaries = self.op.ignore_secondaries
2122     reboot_type = self.op.reboot_type
2123     extra_args = getattr(self.op, "extra_args", "")
2124
2125     node_current = instance.primary_node
2126
2127     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128                            constants.INSTANCE_REBOOT_HARD,
2129                            constants.INSTANCE_REBOOT_FULL]:
2130       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131                                   (constants.INSTANCE_REBOOT_SOFT,
2132                                    constants.INSTANCE_REBOOT_HARD,
2133                                    constants.INSTANCE_REBOOT_FULL))
2134
2135     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136                        constants.INSTANCE_REBOOT_HARD]:
2137       if not rpc.call_instance_reboot(node_current, instance,
2138                                       reboot_type, extra_args):
2139         raise errors.OpExecError("Could not reboot instance")
2140     else:
2141       if not rpc.call_instance_shutdown(node_current, instance):
2142         raise errors.OpExecError("could not shutdown instance for full reboot")
2143       _ShutdownInstanceDisks(instance, self.cfg)
2144       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145       if not rpc.call_instance_start(node_current, instance, extra_args):
2146         _ShutdownInstanceDisks(instance, self.cfg)
2147         raise errors.OpExecError("Could not start instance for full reboot")
2148
2149     self.cfg.MarkInstanceUp(instance.name)
2150
2151
2152 class LUShutdownInstance(LogicalUnit):
2153   """Shutdown an instance.
2154
2155   """
2156   HPATH = "instance-stop"
2157   HTYPE = constants.HTYPE_INSTANCE
2158   _OP_REQP = ["instance_name"]
2159
2160   def BuildHooksEnv(self):
2161     """Build hooks env.
2162
2163     This runs on master, primary and secondary nodes of the instance.
2164
2165     """
2166     env = _BuildInstanceHookEnvByObject(self.instance)
2167     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2168           list(self.instance.secondary_nodes))
2169     return env, nl, nl
2170
2171   def CheckPrereq(self):
2172     """Check prerequisites.
2173
2174     This checks that the instance is in the cluster.
2175
2176     """
2177     instance = self.cfg.GetInstanceInfo(
2178       self.cfg.ExpandInstanceName(self.op.instance_name))
2179     if instance is None:
2180       raise errors.OpPrereqError("Instance '%s' not known" %
2181                                  self.op.instance_name)
2182     self.instance = instance
2183
2184   def Exec(self, feedback_fn):
2185     """Shutdown the instance.
2186
2187     """
2188     instance = self.instance
2189     node_current = instance.primary_node
2190     self.cfg.MarkInstanceDown(instance.name)
2191     if not rpc.call_instance_shutdown(node_current, instance):
2192       logger.Error("could not shutdown instance")
2193
2194     _ShutdownInstanceDisks(instance, self.cfg)
2195
2196
2197 class LUReinstallInstance(LogicalUnit):
2198   """Reinstall an instance.
2199
2200   """
2201   HPATH = "instance-reinstall"
2202   HTYPE = constants.HTYPE_INSTANCE
2203   _OP_REQP = ["instance_name"]
2204
2205   def BuildHooksEnv(self):
2206     """Build hooks env.
2207
2208     This runs on master, primary and secondary nodes of the instance.
2209
2210     """
2211     env = _BuildInstanceHookEnvByObject(self.instance)
2212     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2213           list(self.instance.secondary_nodes))
2214     return env, nl, nl
2215
2216   def CheckPrereq(self):
2217     """Check prerequisites.
2218
2219     This checks that the instance is in the cluster and is not running.
2220
2221     """
2222     instance = self.cfg.GetInstanceInfo(
2223       self.cfg.ExpandInstanceName(self.op.instance_name))
2224     if instance is None:
2225       raise errors.OpPrereqError("Instance '%s' not known" %
2226                                  self.op.instance_name)
2227     if instance.disk_template == constants.DT_DISKLESS:
2228       raise errors.OpPrereqError("Instance '%s' has no disks" %
2229                                  self.op.instance_name)
2230     if instance.status != "down":
2231       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2232                                  self.op.instance_name)
2233     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2234     if remote_info:
2235       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2236                                  (self.op.instance_name,
2237                                   instance.primary_node))
2238
2239     self.op.os_type = getattr(self.op, "os_type", None)
2240     if self.op.os_type is not None:
2241       # OS verification
2242       pnode = self.cfg.GetNodeInfo(
2243         self.cfg.ExpandNodeName(instance.primary_node))
2244       if pnode is None:
2245         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2246                                    self.op.pnode)
2247       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2248       if not os_obj:
2249         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2250                                    " primary node"  % self.op.os_type)
2251
2252     self.instance = instance
2253
2254   def Exec(self, feedback_fn):
2255     """Reinstall the instance.
2256
2257     """
2258     inst = self.instance
2259
2260     if self.op.os_type is not None:
2261       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2262       inst.os = self.op.os_type
2263       self.cfg.AddInstance(inst)
2264
2265     _StartInstanceDisks(self.cfg, inst, None)
2266     try:
2267       feedback_fn("Running the instance OS create scripts...")
2268       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2269         raise errors.OpExecError("Could not install OS for instance %s"
2270                                  " on node %s" %
2271                                  (inst.name, inst.primary_node))
2272     finally:
2273       _ShutdownInstanceDisks(inst, self.cfg)
2274
2275
2276 class LURenameInstance(LogicalUnit):
2277   """Rename an instance.
2278
2279   """
2280   HPATH = "instance-rename"
2281   HTYPE = constants.HTYPE_INSTANCE
2282   _OP_REQP = ["instance_name", "new_name"]
2283
2284   def BuildHooksEnv(self):
2285     """Build hooks env.
2286
2287     This runs on master, primary and secondary nodes of the instance.
2288
2289     """
2290     env = _BuildInstanceHookEnvByObject(self.instance)
2291     env["INSTANCE_NEW_NAME"] = self.op.new_name
2292     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2293           list(self.instance.secondary_nodes))
2294     return env, nl, nl
2295
2296   def CheckPrereq(self):
2297     """Check prerequisites.
2298
2299     This checks that the instance is in the cluster and is not running.
2300
2301     """
2302     instance = self.cfg.GetInstanceInfo(
2303       self.cfg.ExpandInstanceName(self.op.instance_name))
2304     if instance is None:
2305       raise errors.OpPrereqError("Instance '%s' not known" %
2306                                  self.op.instance_name)
2307     if instance.status != "down":
2308       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2309                                  self.op.instance_name)
2310     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2311     if remote_info:
2312       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2313                                  (self.op.instance_name,
2314                                   instance.primary_node))
2315     self.instance = instance
2316
2317     # new name verification
2318     name_info = utils.HostInfo(self.op.new_name)
2319
2320     self.op.new_name = new_name = name_info.name
2321     instance_list = self.cfg.GetInstanceList()
2322     if new_name in instance_list:
2323       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2324                                  new_name)
2325
2326     if not getattr(self.op, "ignore_ip", False):
2327       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2328         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2329                                    (name_info.ip, new_name))
2330
2331
2332   def Exec(self, feedback_fn):
2333     """Reinstall the instance.
2334
2335     """
2336     inst = self.instance
2337     old_name = inst.name
2338
2339     if inst.disk_template == constants.DT_FILE:
2340       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2341
2342     self.cfg.RenameInstance(inst.name, self.op.new_name)
2343
2344     # re-read the instance from the configuration after rename
2345     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2346
2347     if inst.disk_template == constants.DT_FILE:
2348       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2349       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2350                                                 old_file_storage_dir,
2351                                                 new_file_storage_dir)
2352
2353       if not result:
2354         raise errors.OpExecError("Could not connect to node '%s' to rename"
2355                                  " directory '%s' to '%s' (but the instance"
2356                                  " has been renamed in Ganeti)" % (
2357                                  inst.primary_node, old_file_storage_dir,
2358                                  new_file_storage_dir))
2359
2360       if not result[0]:
2361         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2362                                  " (but the instance has been renamed in"
2363                                  " Ganeti)" % (old_file_storage_dir,
2364                                                new_file_storage_dir))
2365
2366     _StartInstanceDisks(self.cfg, inst, None)
2367     try:
2368       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2369                                           "sda", "sdb"):
2370         msg = ("Could run OS rename script for instance %s on node %s (but the"
2371                " instance has been renamed in Ganeti)" %
2372                (inst.name, inst.primary_node))
2373         logger.Error(msg)
2374     finally:
2375       _ShutdownInstanceDisks(inst, self.cfg)
2376
2377
2378 class LURemoveInstance(LogicalUnit):
2379   """Remove an instance.
2380
2381   """
2382   HPATH = "instance-remove"
2383   HTYPE = constants.HTYPE_INSTANCE
2384   _OP_REQP = ["instance_name", "ignore_failures"]
2385
2386   def BuildHooksEnv(self):
2387     """Build hooks env.
2388
2389     This runs on master, primary and secondary nodes of the instance.
2390
2391     """
2392     env = _BuildInstanceHookEnvByObject(self.instance)
2393     nl = [self.sstore.GetMasterNode()]
2394     return env, nl, nl
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This checks that the instance is in the cluster.
2400
2401     """
2402     instance = self.cfg.GetInstanceInfo(
2403       self.cfg.ExpandInstanceName(self.op.instance_name))
2404     if instance is None:
2405       raise errors.OpPrereqError("Instance '%s' not known" %
2406                                  self.op.instance_name)
2407     self.instance = instance
2408
2409   def Exec(self, feedback_fn):
2410     """Remove the instance.
2411
2412     """
2413     instance = self.instance
2414     logger.Info("shutting down instance %s on node %s" %
2415                 (instance.name, instance.primary_node))
2416
2417     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2418       if self.op.ignore_failures:
2419         feedback_fn("Warning: can't shutdown instance")
2420       else:
2421         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2422                                  (instance.name, instance.primary_node))
2423
2424     logger.Info("removing block devices for instance %s" % instance.name)
2425
2426     if not _RemoveDisks(instance, self.cfg):
2427       if self.op.ignore_failures:
2428         feedback_fn("Warning: can't remove instance's disks")
2429       else:
2430         raise errors.OpExecError("Can't remove instance's disks")
2431
2432     logger.Info("removing instance %s out of cluster config" % instance.name)
2433
2434     self.cfg.RemoveInstance(instance.name)
2435     # Remove the new instance from the Ganeti Lock Manager
2436     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2437
2438
2439 class LUQueryInstances(NoHooksLU):
2440   """Logical unit for querying instances.
2441
2442   """
2443   _OP_REQP = ["output_fields", "names"]
2444
2445   def CheckPrereq(self):
2446     """Check prerequisites.
2447
2448     This checks that the fields required are valid output fields.
2449
2450     """
2451     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2452     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2453                                "admin_state", "admin_ram",
2454                                "disk_template", "ip", "mac", "bridge",
2455                                "sda_size", "sdb_size", "vcpus", "tags"],
2456                        dynamic=self.dynamic_fields,
2457                        selected=self.op.output_fields)
2458
2459     self.wanted = _GetWantedInstances(self, self.op.names)
2460
2461   def Exec(self, feedback_fn):
2462     """Computes the list of nodes and their attributes.
2463
2464     """
2465     instance_names = self.wanted
2466     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2467                      in instance_names]
2468
2469     # begin data gathering
2470
2471     nodes = frozenset([inst.primary_node for inst in instance_list])
2472
2473     bad_nodes = []
2474     if self.dynamic_fields.intersection(self.op.output_fields):
2475       live_data = {}
2476       node_data = rpc.call_all_instances_info(nodes)
2477       for name in nodes:
2478         result = node_data[name]
2479         if result:
2480           live_data.update(result)
2481         elif result == False:
2482           bad_nodes.append(name)
2483         # else no instance is alive
2484     else:
2485       live_data = dict([(name, {}) for name in instance_names])
2486
2487     # end data gathering
2488
2489     output = []
2490     for instance in instance_list:
2491       iout = []
2492       for field in self.op.output_fields:
2493         if field == "name":
2494           val = instance.name
2495         elif field == "os":
2496           val = instance.os
2497         elif field == "pnode":
2498           val = instance.primary_node
2499         elif field == "snodes":
2500           val = list(instance.secondary_nodes)
2501         elif field == "admin_state":
2502           val = (instance.status != "down")
2503         elif field == "oper_state":
2504           if instance.primary_node in bad_nodes:
2505             val = None
2506           else:
2507             val = bool(live_data.get(instance.name))
2508         elif field == "status":
2509           if instance.primary_node in bad_nodes:
2510             val = "ERROR_nodedown"
2511           else:
2512             running = bool(live_data.get(instance.name))
2513             if running:
2514               if instance.status != "down":
2515                 val = "running"
2516               else:
2517                 val = "ERROR_up"
2518             else:
2519               if instance.status != "down":
2520                 val = "ERROR_down"
2521               else:
2522                 val = "ADMIN_down"
2523         elif field == "admin_ram":
2524           val = instance.memory
2525         elif field == "oper_ram":
2526           if instance.primary_node in bad_nodes:
2527             val = None
2528           elif instance.name in live_data:
2529             val = live_data[instance.name].get("memory", "?")
2530           else:
2531             val = "-"
2532         elif field == "disk_template":
2533           val = instance.disk_template
2534         elif field == "ip":
2535           val = instance.nics[0].ip
2536         elif field == "bridge":
2537           val = instance.nics[0].bridge
2538         elif field == "mac":
2539           val = instance.nics[0].mac
2540         elif field == "sda_size" or field == "sdb_size":
2541           disk = instance.FindDisk(field[:3])
2542           if disk is None:
2543             val = None
2544           else:
2545             val = disk.size
2546         elif field == "vcpus":
2547           val = instance.vcpus
2548         elif field == "tags":
2549           val = list(instance.GetTags())
2550         else:
2551           raise errors.ParameterError(field)
2552         iout.append(val)
2553       output.append(iout)
2554
2555     return output
2556
2557
2558 class LUFailoverInstance(LogicalUnit):
2559   """Failover an instance.
2560
2561   """
2562   HPATH = "instance-failover"
2563   HTYPE = constants.HTYPE_INSTANCE
2564   _OP_REQP = ["instance_name", "ignore_consistency"]
2565
2566   def BuildHooksEnv(self):
2567     """Build hooks env.
2568
2569     This runs on master, primary and secondary nodes of the instance.
2570
2571     """
2572     env = {
2573       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2574       }
2575     env.update(_BuildInstanceHookEnvByObject(self.instance))
2576     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2577     return env, nl, nl
2578
2579   def CheckPrereq(self):
2580     """Check prerequisites.
2581
2582     This checks that the instance is in the cluster.
2583
2584     """
2585     instance = self.cfg.GetInstanceInfo(
2586       self.cfg.ExpandInstanceName(self.op.instance_name))
2587     if instance is None:
2588       raise errors.OpPrereqError("Instance '%s' not known" %
2589                                  self.op.instance_name)
2590
2591     if instance.disk_template not in constants.DTS_NET_MIRROR:
2592       raise errors.OpPrereqError("Instance's disk layout is not"
2593                                  " network mirrored, cannot failover.")
2594
2595     secondary_nodes = instance.secondary_nodes
2596     if not secondary_nodes:
2597       raise errors.ProgrammerError("no secondary node but using "
2598                                    "a mirrored disk template")
2599
2600     target_node = secondary_nodes[0]
2601     # check memory requirements on the secondary node
2602     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2603                          instance.name, instance.memory)
2604
2605     # check bridge existance
2606     brlist = [nic.bridge for nic in instance.nics]
2607     if not rpc.call_bridges_exist(target_node, brlist):
2608       raise errors.OpPrereqError("One or more target bridges %s does not"
2609                                  " exist on destination node '%s'" %
2610                                  (brlist, target_node))
2611
2612     self.instance = instance
2613
2614   def Exec(self, feedback_fn):
2615     """Failover an instance.
2616
2617     The failover is done by shutting it down on its present node and
2618     starting it on the secondary.
2619
2620     """
2621     instance = self.instance
2622
2623     source_node = instance.primary_node
2624     target_node = instance.secondary_nodes[0]
2625
2626     feedback_fn("* checking disk consistency between source and target")
2627     for dev in instance.disks:
2628       # for drbd, these are drbd over lvm
2629       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2630         if instance.status == "up" and not self.op.ignore_consistency:
2631           raise errors.OpExecError("Disk %s is degraded on target node,"
2632                                    " aborting failover." % dev.iv_name)
2633
2634     feedback_fn("* shutting down instance on source node")
2635     logger.Info("Shutting down instance %s on node %s" %
2636                 (instance.name, source_node))
2637
2638     if not rpc.call_instance_shutdown(source_node, instance):
2639       if self.op.ignore_consistency:
2640         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2641                      " anyway. Please make sure node %s is down"  %
2642                      (instance.name, source_node, source_node))
2643       else:
2644         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2645                                  (instance.name, source_node))
2646
2647     feedback_fn("* deactivating the instance's disks on source node")
2648     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2649       raise errors.OpExecError("Can't shut down the instance's disks.")
2650
2651     instance.primary_node = target_node
2652     # distribute new instance config to the other nodes
2653     self.cfg.Update(instance)
2654
2655     # Only start the instance if it's marked as up
2656     if instance.status == "up":
2657       feedback_fn("* activating the instance's disks on target node")
2658       logger.Info("Starting instance %s on node %s" %
2659                   (instance.name, target_node))
2660
2661       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2662                                                ignore_secondaries=True)
2663       if not disks_ok:
2664         _ShutdownInstanceDisks(instance, self.cfg)
2665         raise errors.OpExecError("Can't activate the instance's disks")
2666
2667       feedback_fn("* starting the instance on the target node")
2668       if not rpc.call_instance_start(target_node, instance, None):
2669         _ShutdownInstanceDisks(instance, self.cfg)
2670         raise errors.OpExecError("Could not start instance %s on node %s." %
2671                                  (instance.name, target_node))
2672
2673
2674 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2675   """Create a tree of block devices on the primary node.
2676
2677   This always creates all devices.
2678
2679   """
2680   if device.children:
2681     for child in device.children:
2682       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2683         return False
2684
2685   cfg.SetDiskID(device, node)
2686   new_id = rpc.call_blockdev_create(node, device, device.size,
2687                                     instance.name, True, info)
2688   if not new_id:
2689     return False
2690   if device.physical_id is None:
2691     device.physical_id = new_id
2692   return True
2693
2694
2695 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2696   """Create a tree of block devices on a secondary node.
2697
2698   If this device type has to be created on secondaries, create it and
2699   all its children.
2700
2701   If not, just recurse to children keeping the same 'force' value.
2702
2703   """
2704   if device.CreateOnSecondary():
2705     force = True
2706   if device.children:
2707     for child in device.children:
2708       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2709                                         child, force, info):
2710         return False
2711
2712   if not force:
2713     return True
2714   cfg.SetDiskID(device, node)
2715   new_id = rpc.call_blockdev_create(node, device, device.size,
2716                                     instance.name, False, info)
2717   if not new_id:
2718     return False
2719   if device.physical_id is None:
2720     device.physical_id = new_id
2721   return True
2722
2723
2724 def _GenerateUniqueNames(cfg, exts):
2725   """Generate a suitable LV name.
2726
2727   This will generate a logical volume name for the given instance.
2728
2729   """
2730   results = []
2731   for val in exts:
2732     new_id = cfg.GenerateUniqueID()
2733     results.append("%s%s" % (new_id, val))
2734   return results
2735
2736
2737 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2738   """Generate a drbd8 device complete with its children.
2739
2740   """
2741   port = cfg.AllocatePort()
2742   vgname = cfg.GetVGName()
2743   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2744                           logical_id=(vgname, names[0]))
2745   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2746                           logical_id=(vgname, names[1]))
2747   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2748                           logical_id = (primary, secondary, port),
2749                           children = [dev_data, dev_meta],
2750                           iv_name=iv_name)
2751   return drbd_dev
2752
2753
2754 def _GenerateDiskTemplate(cfg, template_name,
2755                           instance_name, primary_node,
2756                           secondary_nodes, disk_sz, swap_sz,
2757                           file_storage_dir, file_driver):
2758   """Generate the entire disk layout for a given template type.
2759
2760   """
2761   #TODO: compute space requirements
2762
2763   vgname = cfg.GetVGName()
2764   if template_name == constants.DT_DISKLESS:
2765     disks = []
2766   elif template_name == constants.DT_PLAIN:
2767     if len(secondary_nodes) != 0:
2768       raise errors.ProgrammerError("Wrong template configuration")
2769
2770     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2771     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2772                            logical_id=(vgname, names[0]),
2773                            iv_name = "sda")
2774     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2775                            logical_id=(vgname, names[1]),
2776                            iv_name = "sdb")
2777     disks = [sda_dev, sdb_dev]
2778   elif template_name == constants.DT_DRBD8:
2779     if len(secondary_nodes) != 1:
2780       raise errors.ProgrammerError("Wrong template configuration")
2781     remote_node = secondary_nodes[0]
2782     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2783                                        ".sdb_data", ".sdb_meta"])
2784     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2785                                          disk_sz, names[0:2], "sda")
2786     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2787                                          swap_sz, names[2:4], "sdb")
2788     disks = [drbd_sda_dev, drbd_sdb_dev]
2789   elif template_name == constants.DT_FILE:
2790     if len(secondary_nodes) != 0:
2791       raise errors.ProgrammerError("Wrong template configuration")
2792
2793     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2794                                 iv_name="sda", logical_id=(file_driver,
2795                                 "%s/sda" % file_storage_dir))
2796     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2797                                 iv_name="sdb", logical_id=(file_driver,
2798                                 "%s/sdb" % file_storage_dir))
2799     disks = [file_sda_dev, file_sdb_dev]
2800   else:
2801     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2802   return disks
2803
2804
2805 def _GetInstanceInfoText(instance):
2806   """Compute that text that should be added to the disk's metadata.
2807
2808   """
2809   return "originstname+%s" % instance.name
2810
2811
2812 def _CreateDisks(cfg, instance):
2813   """Create all disks for an instance.
2814
2815   This abstracts away some work from AddInstance.
2816
2817   Args:
2818     instance: the instance object
2819
2820   Returns:
2821     True or False showing the success of the creation process
2822
2823   """
2824   info = _GetInstanceInfoText(instance)
2825
2826   if instance.disk_template == constants.DT_FILE:
2827     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2828     result = rpc.call_file_storage_dir_create(instance.primary_node,
2829                                               file_storage_dir)
2830
2831     if not result:
2832       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2833       return False
2834
2835     if not result[0]:
2836       logger.Error("failed to create directory '%s'" % file_storage_dir)
2837       return False
2838
2839   for device in instance.disks:
2840     logger.Info("creating volume %s for instance %s" %
2841                 (device.iv_name, instance.name))
2842     #HARDCODE
2843     for secondary_node in instance.secondary_nodes:
2844       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2845                                         device, False, info):
2846         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2847                      (device.iv_name, device, secondary_node))
2848         return False
2849     #HARDCODE
2850     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2851                                     instance, device, info):
2852       logger.Error("failed to create volume %s on primary!" %
2853                    device.iv_name)
2854       return False
2855
2856   return True
2857
2858
2859 def _RemoveDisks(instance, cfg):
2860   """Remove all disks for an instance.
2861
2862   This abstracts away some work from `AddInstance()` and
2863   `RemoveInstance()`. Note that in case some of the devices couldn't
2864   be removed, the removal will continue with the other ones (compare
2865   with `_CreateDisks()`).
2866
2867   Args:
2868     instance: the instance object
2869
2870   Returns:
2871     True or False showing the success of the removal proces
2872
2873   """
2874   logger.Info("removing block devices for instance %s" % instance.name)
2875
2876   result = True
2877   for device in instance.disks:
2878     for node, disk in device.ComputeNodeTree(instance.primary_node):
2879       cfg.SetDiskID(disk, node)
2880       if not rpc.call_blockdev_remove(node, disk):
2881         logger.Error("could not remove block device %s on node %s,"
2882                      " continuing anyway" %
2883                      (device.iv_name, node))
2884         result = False
2885
2886   if instance.disk_template == constants.DT_FILE:
2887     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2888     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2889                                             file_storage_dir):
2890       logger.Error("could not remove directory '%s'" % file_storage_dir)
2891       result = False
2892
2893   return result
2894
2895
2896 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2897   """Compute disk size requirements in the volume group
2898
2899   This is currently hard-coded for the two-drive layout.
2900
2901   """
2902   # Required free disk space as a function of disk and swap space
2903   req_size_dict = {
2904     constants.DT_DISKLESS: None,
2905     constants.DT_PLAIN: disk_size + swap_size,
2906     # 256 MB are added for drbd metadata, 128MB for each drbd device
2907     constants.DT_DRBD8: disk_size + swap_size + 256,
2908     constants.DT_FILE: None,
2909   }
2910
2911   if disk_template not in req_size_dict:
2912     raise errors.ProgrammerError("Disk template '%s' size requirement"
2913                                  " is unknown" %  disk_template)
2914
2915   return req_size_dict[disk_template]
2916
2917
2918 class LUCreateInstance(LogicalUnit):
2919   """Create an instance.
2920
2921   """
2922   HPATH = "instance-add"
2923   HTYPE = constants.HTYPE_INSTANCE
2924   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2925               "disk_template", "swap_size", "mode", "start", "vcpus",
2926               "wait_for_sync", "ip_check", "mac"]
2927
2928   def _RunAllocator(self):
2929     """Run the allocator based on input opcode.
2930
2931     """
2932     disks = [{"size": self.op.disk_size, "mode": "w"},
2933              {"size": self.op.swap_size, "mode": "w"}]
2934     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2935              "bridge": self.op.bridge}]
2936     ial = IAllocator(self.cfg, self.sstore,
2937                      mode=constants.IALLOCATOR_MODE_ALLOC,
2938                      name=self.op.instance_name,
2939                      disk_template=self.op.disk_template,
2940                      tags=[],
2941                      os=self.op.os_type,
2942                      vcpus=self.op.vcpus,
2943                      mem_size=self.op.mem_size,
2944                      disks=disks,
2945                      nics=nics,
2946                      )
2947
2948     ial.Run(self.op.iallocator)
2949
2950     if not ial.success:
2951       raise errors.OpPrereqError("Can't compute nodes using"
2952                                  " iallocator '%s': %s" % (self.op.iallocator,
2953                                                            ial.info))
2954     if len(ial.nodes) != ial.required_nodes:
2955       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2956                                  " of nodes (%s), required %s" %
2957                                  (len(ial.nodes), ial.required_nodes))
2958     self.op.pnode = ial.nodes[0]
2959     logger.ToStdout("Selected nodes for the instance: %s" %
2960                     (", ".join(ial.nodes),))
2961     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2962                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2963     if ial.required_nodes == 2:
2964       self.op.snode = ial.nodes[1]
2965
2966   def BuildHooksEnv(self):
2967     """Build hooks env.
2968
2969     This runs on master, primary and secondary nodes of the instance.
2970
2971     """
2972     env = {
2973       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2974       "INSTANCE_DISK_SIZE": self.op.disk_size,
2975       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2976       "INSTANCE_ADD_MODE": self.op.mode,
2977       }
2978     if self.op.mode == constants.INSTANCE_IMPORT:
2979       env["INSTANCE_SRC_NODE"] = self.op.src_node
2980       env["INSTANCE_SRC_PATH"] = self.op.src_path
2981       env["INSTANCE_SRC_IMAGE"] = self.src_image
2982
2983     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2984       primary_node=self.op.pnode,
2985       secondary_nodes=self.secondaries,
2986       status=self.instance_status,
2987       os_type=self.op.os_type,
2988       memory=self.op.mem_size,
2989       vcpus=self.op.vcpus,
2990       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2991     ))
2992
2993     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2994           self.secondaries)
2995     return env, nl, nl
2996
2997
2998   def CheckPrereq(self):
2999     """Check prerequisites.
3000
3001     """
3002     # set optional parameters to none if they don't exist
3003     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3004                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3005                  "vnc_bind_address"]:
3006       if not hasattr(self.op, attr):
3007         setattr(self.op, attr, None)
3008
3009     if self.op.mode not in (constants.INSTANCE_CREATE,
3010                             constants.INSTANCE_IMPORT):
3011       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3012                                  self.op.mode)
3013
3014     if (not self.cfg.GetVGName() and
3015         self.op.disk_template not in constants.DTS_NOT_LVM):
3016       raise errors.OpPrereqError("Cluster does not support lvm-based"
3017                                  " instances")
3018
3019     if self.op.mode == constants.INSTANCE_IMPORT:
3020       src_node = getattr(self.op, "src_node", None)
3021       src_path = getattr(self.op, "src_path", None)
3022       if src_node is None or src_path is None:
3023         raise errors.OpPrereqError("Importing an instance requires source"
3024                                    " node and path options")
3025       src_node_full = self.cfg.ExpandNodeName(src_node)
3026       if src_node_full is None:
3027         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3028       self.op.src_node = src_node = src_node_full
3029
3030       if not os.path.isabs(src_path):
3031         raise errors.OpPrereqError("The source path must be absolute")
3032
3033       export_info = rpc.call_export_info(src_node, src_path)
3034
3035       if not export_info:
3036         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3037
3038       if not export_info.has_section(constants.INISECT_EXP):
3039         raise errors.ProgrammerError("Corrupted export config")
3040
3041       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3042       if (int(ei_version) != constants.EXPORT_VERSION):
3043         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3044                                    (ei_version, constants.EXPORT_VERSION))
3045
3046       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3047         raise errors.OpPrereqError("Can't import instance with more than"
3048                                    " one data disk")
3049
3050       # FIXME: are the old os-es, disk sizes, etc. useful?
3051       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3052       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3053                                                          'disk0_dump'))
3054       self.src_image = diskimage
3055     else: # INSTANCE_CREATE
3056       if getattr(self.op, "os_type", None) is None:
3057         raise errors.OpPrereqError("No guest OS specified")
3058
3059     #### instance parameters check
3060
3061     # disk template and mirror node verification
3062     if self.op.disk_template not in constants.DISK_TEMPLATES:
3063       raise errors.OpPrereqError("Invalid disk template name")
3064
3065     # instance name verification
3066     hostname1 = utils.HostInfo(self.op.instance_name)
3067
3068     self.op.instance_name = instance_name = hostname1.name
3069     instance_list = self.cfg.GetInstanceList()
3070     if instance_name in instance_list:
3071       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3072                                  instance_name)
3073
3074     # ip validity checks
3075     ip = getattr(self.op, "ip", None)
3076     if ip is None or ip.lower() == "none":
3077       inst_ip = None
3078     elif ip.lower() == "auto":
3079       inst_ip = hostname1.ip
3080     else:
3081       if not utils.IsValidIP(ip):
3082         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3083                                    " like a valid IP" % ip)
3084       inst_ip = ip
3085     self.inst_ip = self.op.ip = inst_ip
3086
3087     if self.op.start and not self.op.ip_check:
3088       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3089                                  " adding an instance in start mode")
3090
3091     if self.op.ip_check:
3092       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3093         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3094                                    (hostname1.ip, instance_name))
3095
3096     # MAC address verification
3097     if self.op.mac != "auto":
3098       if not utils.IsValidMac(self.op.mac.lower()):
3099         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3100                                    self.op.mac)
3101
3102     # bridge verification
3103     bridge = getattr(self.op, "bridge", None)
3104     if bridge is None:
3105       self.op.bridge = self.cfg.GetDefBridge()
3106     else:
3107       self.op.bridge = bridge
3108
3109     # boot order verification
3110     if self.op.hvm_boot_order is not None:
3111       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3112         raise errors.OpPrereqError("invalid boot order specified,"
3113                                    " must be one or more of [acdn]")
3114     # file storage checks
3115     if (self.op.file_driver and
3116         not self.op.file_driver in constants.FILE_DRIVER):
3117       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3118                                  self.op.file_driver)
3119
3120     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3121       raise errors.OpPrereqError("File storage directory not a relative"
3122                                  " path")
3123     #### allocator run
3124
3125     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3126       raise errors.OpPrereqError("One and only one of iallocator and primary"
3127                                  " node must be given")
3128
3129     if self.op.iallocator is not None:
3130       self._RunAllocator()
3131
3132     #### node related checks
3133
3134     # check primary node
3135     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3136     if pnode is None:
3137       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3138                                  self.op.pnode)
3139     self.op.pnode = pnode.name
3140     self.pnode = pnode
3141     self.secondaries = []
3142
3143     # mirror node verification
3144     if self.op.disk_template in constants.DTS_NET_MIRROR:
3145       if getattr(self.op, "snode", None) is None:
3146         raise errors.OpPrereqError("The networked disk templates need"
3147                                    " a mirror node")
3148
3149       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3150       if snode_name is None:
3151         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3152                                    self.op.snode)
3153       elif snode_name == pnode.name:
3154         raise errors.OpPrereqError("The secondary node cannot be"
3155                                    " the primary node.")
3156       self.secondaries.append(snode_name)
3157
3158     req_size = _ComputeDiskSize(self.op.disk_template,
3159                                 self.op.disk_size, self.op.swap_size)
3160
3161     # Check lv size requirements
3162     if req_size is not None:
3163       nodenames = [pnode.name] + self.secondaries
3164       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3165       for node in nodenames:
3166         info = nodeinfo.get(node, None)
3167         if not info:
3168           raise errors.OpPrereqError("Cannot get current information"
3169                                      " from node '%s'" % node)
3170         vg_free = info.get('vg_free', None)
3171         if not isinstance(vg_free, int):
3172           raise errors.OpPrereqError("Can't compute free disk space on"
3173                                      " node %s" % node)
3174         if req_size > info['vg_free']:
3175           raise errors.OpPrereqError("Not enough disk space on target node %s."
3176                                      " %d MB available, %d MB required" %
3177                                      (node, info['vg_free'], req_size))
3178
3179     # os verification
3180     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3181     if not os_obj:
3182       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3183                                  " primary node"  % self.op.os_type)
3184
3185     if self.op.kernel_path == constants.VALUE_NONE:
3186       raise errors.OpPrereqError("Can't set instance kernel to none")
3187
3188
3189     # bridge check on primary node
3190     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3191       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3192                                  " destination node '%s'" %
3193                                  (self.op.bridge, pnode.name))
3194
3195     # memory check on primary node
3196     if self.op.start:
3197       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3198                            "creating instance %s" % self.op.instance_name,
3199                            self.op.mem_size)
3200
3201     # hvm_cdrom_image_path verification
3202     if self.op.hvm_cdrom_image_path is not None:
3203       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3204         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3205                                    " be an absolute path or None, not %s" %
3206                                    self.op.hvm_cdrom_image_path)
3207       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3208         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3209                                    " regular file or a symlink pointing to"
3210                                    " an existing regular file, not %s" %
3211                                    self.op.hvm_cdrom_image_path)
3212
3213     # vnc_bind_address verification
3214     if self.op.vnc_bind_address is not None:
3215       if not utils.IsValidIP(self.op.vnc_bind_address):
3216         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3217                                    " like a valid IP address" %
3218                                    self.op.vnc_bind_address)
3219
3220     if self.op.start:
3221       self.instance_status = 'up'
3222     else:
3223       self.instance_status = 'down'
3224
3225   def Exec(self, feedback_fn):
3226     """Create and add the instance to the cluster.
3227
3228     """
3229     instance = self.op.instance_name
3230     pnode_name = self.pnode.name
3231
3232     if self.op.mac == "auto":
3233       mac_address = self.cfg.GenerateMAC()
3234     else:
3235       mac_address = self.op.mac
3236
3237     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3238     if self.inst_ip is not None:
3239       nic.ip = self.inst_ip
3240
3241     ht_kind = self.sstore.GetHypervisorType()
3242     if ht_kind in constants.HTS_REQ_PORT:
3243       network_port = self.cfg.AllocatePort()
3244     else:
3245       network_port = None
3246
3247     if self.op.vnc_bind_address is None:
3248       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3249
3250     # this is needed because os.path.join does not accept None arguments
3251     if self.op.file_storage_dir is None:
3252       string_file_storage_dir = ""
3253     else:
3254       string_file_storage_dir = self.op.file_storage_dir
3255
3256     # build the full file storage dir path
3257     file_storage_dir = os.path.normpath(os.path.join(
3258                                         self.sstore.GetFileStorageDir(),
3259                                         string_file_storage_dir, instance))
3260
3261
3262     disks = _GenerateDiskTemplate(self.cfg,
3263                                   self.op.disk_template,
3264                                   instance, pnode_name,
3265                                   self.secondaries, self.op.disk_size,
3266                                   self.op.swap_size,
3267                                   file_storage_dir,
3268                                   self.op.file_driver)
3269
3270     iobj = objects.Instance(name=instance, os=self.op.os_type,
3271                             primary_node=pnode_name,
3272                             memory=self.op.mem_size,
3273                             vcpus=self.op.vcpus,
3274                             nics=[nic], disks=disks,
3275                             disk_template=self.op.disk_template,
3276                             status=self.instance_status,
3277                             network_port=network_port,
3278                             kernel_path=self.op.kernel_path,
3279                             initrd_path=self.op.initrd_path,
3280                             hvm_boot_order=self.op.hvm_boot_order,
3281                             hvm_acpi=self.op.hvm_acpi,
3282                             hvm_pae=self.op.hvm_pae,
3283                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3284                             vnc_bind_address=self.op.vnc_bind_address,
3285                             )
3286
3287     feedback_fn("* creating instance disks...")
3288     if not _CreateDisks(self.cfg, iobj):
3289       _RemoveDisks(iobj, self.cfg)
3290       raise errors.OpExecError("Device creation failed, reverting...")
3291
3292     feedback_fn("adding instance %s to cluster config" % instance)
3293
3294     self.cfg.AddInstance(iobj)
3295     # Add the new instance to the Ganeti Lock Manager
3296     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3297
3298     if self.op.wait_for_sync:
3299       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3300     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3301       # make sure the disks are not degraded (still sync-ing is ok)
3302       time.sleep(15)
3303       feedback_fn("* checking mirrors status")
3304       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3305     else:
3306       disk_abort = False
3307
3308     if disk_abort:
3309       _RemoveDisks(iobj, self.cfg)
3310       self.cfg.RemoveInstance(iobj.name)
3311       # Remove the new instance from the Ganeti Lock Manager
3312       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3313       raise errors.OpExecError("There are some degraded disks for"
3314                                " this instance")
3315
3316     feedback_fn("creating os for instance %s on node %s" %
3317                 (instance, pnode_name))
3318
3319     if iobj.disk_template != constants.DT_DISKLESS:
3320       if self.op.mode == constants.INSTANCE_CREATE:
3321         feedback_fn("* running the instance OS create scripts...")
3322         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3323           raise errors.OpExecError("could not add os for instance %s"
3324                                    " on node %s" %
3325                                    (instance, pnode_name))
3326
3327       elif self.op.mode == constants.INSTANCE_IMPORT:
3328         feedback_fn("* running the instance OS import scripts...")
3329         src_node = self.op.src_node
3330         src_image = self.src_image
3331         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3332                                                 src_node, src_image):
3333           raise errors.OpExecError("Could not import os for instance"
3334                                    " %s on node %s" %
3335                                    (instance, pnode_name))
3336       else:
3337         # also checked in the prereq part
3338         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3339                                      % self.op.mode)
3340
3341     if self.op.start:
3342       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3343       feedback_fn("* starting instance...")
3344       if not rpc.call_instance_start(pnode_name, iobj, None):
3345         raise errors.OpExecError("Could not start instance")
3346
3347
3348 class LUConnectConsole(NoHooksLU):
3349   """Connect to an instance's console.
3350
3351   This is somewhat special in that it returns the command line that
3352   you need to run on the master node in order to connect to the
3353   console.
3354
3355   """
3356   _OP_REQP = ["instance_name"]
3357   REQ_BGL = False
3358
3359   def ExpandNames(self):
3360     self._ExpandAndLockInstance()
3361
3362   def CheckPrereq(self):
3363     """Check prerequisites.
3364
3365     This checks that the instance is in the cluster.
3366
3367     """
3368     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3369     assert self.instance is not None, \
3370       "Cannot retrieve locked instance %s" % self.op.instance_name
3371
3372   def Exec(self, feedback_fn):
3373     """Connect to the console of an instance
3374
3375     """
3376     instance = self.instance
3377     node = instance.primary_node
3378
3379     node_insts = rpc.call_instance_list([node])[node]
3380     if node_insts is False:
3381       raise errors.OpExecError("Can't connect to node %s." % node)
3382
3383     if instance.name not in node_insts:
3384       raise errors.OpExecError("Instance %s is not running." % instance.name)
3385
3386     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3387
3388     hyper = hypervisor.GetHypervisor()
3389     console_cmd = hyper.GetShellCommandForConsole(instance)
3390
3391     # build ssh cmdline
3392     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3393
3394
3395 class LUReplaceDisks(LogicalUnit):
3396   """Replace the disks of an instance.
3397
3398   """
3399   HPATH = "mirrors-replace"
3400   HTYPE = constants.HTYPE_INSTANCE
3401   _OP_REQP = ["instance_name", "mode", "disks"]
3402
3403   def _RunAllocator(self):
3404     """Compute a new secondary node using an IAllocator.
3405
3406     """
3407     ial = IAllocator(self.cfg, self.sstore,
3408                      mode=constants.IALLOCATOR_MODE_RELOC,
3409                      name=self.op.instance_name,
3410                      relocate_from=[self.sec_node])
3411
3412     ial.Run(self.op.iallocator)
3413
3414     if not ial.success:
3415       raise errors.OpPrereqError("Can't compute nodes using"
3416                                  " iallocator '%s': %s" % (self.op.iallocator,
3417                                                            ial.info))
3418     if len(ial.nodes) != ial.required_nodes:
3419       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3420                                  " of nodes (%s), required %s" %
3421                                  (len(ial.nodes), ial.required_nodes))
3422     self.op.remote_node = ial.nodes[0]
3423     logger.ToStdout("Selected new secondary for the instance: %s" %
3424                     self.op.remote_node)
3425
3426   def BuildHooksEnv(self):
3427     """Build hooks env.
3428
3429     This runs on the master, the primary and all the secondaries.
3430
3431     """
3432     env = {
3433       "MODE": self.op.mode,
3434       "NEW_SECONDARY": self.op.remote_node,
3435       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3436       }
3437     env.update(_BuildInstanceHookEnvByObject(self.instance))
3438     nl = [
3439       self.sstore.GetMasterNode(),
3440       self.instance.primary_node,
3441       ]
3442     if self.op.remote_node is not None:
3443       nl.append(self.op.remote_node)
3444     return env, nl, nl
3445
3446   def CheckPrereq(self):
3447     """Check prerequisites.
3448
3449     This checks that the instance is in the cluster.
3450
3451     """
3452     if not hasattr(self.op, "remote_node"):
3453       self.op.remote_node = None
3454
3455     instance = self.cfg.GetInstanceInfo(
3456       self.cfg.ExpandInstanceName(self.op.instance_name))
3457     if instance is None:
3458       raise errors.OpPrereqError("Instance '%s' not known" %
3459                                  self.op.instance_name)
3460     self.instance = instance
3461     self.op.instance_name = instance.name
3462
3463     if instance.disk_template not in constants.DTS_NET_MIRROR:
3464       raise errors.OpPrereqError("Instance's disk layout is not"
3465                                  " network mirrored.")
3466
3467     if len(instance.secondary_nodes) != 1:
3468       raise errors.OpPrereqError("The instance has a strange layout,"
3469                                  " expected one secondary but found %d" %
3470                                  len(instance.secondary_nodes))
3471
3472     self.sec_node = instance.secondary_nodes[0]
3473
3474     ia_name = getattr(self.op, "iallocator", None)
3475     if ia_name is not None:
3476       if self.op.remote_node is not None:
3477         raise errors.OpPrereqError("Give either the iallocator or the new"
3478                                    " secondary, not both")
3479       self.op.remote_node = self._RunAllocator()
3480
3481     remote_node = self.op.remote_node
3482     if remote_node is not None:
3483       remote_node = self.cfg.ExpandNodeName(remote_node)
3484       if remote_node is None:
3485         raise errors.OpPrereqError("Node '%s' not known" %
3486                                    self.op.remote_node)
3487       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3488     else:
3489       self.remote_node_info = None
3490     if remote_node == instance.primary_node:
3491       raise errors.OpPrereqError("The specified node is the primary node of"
3492                                  " the instance.")
3493     elif remote_node == self.sec_node:
3494       if self.op.mode == constants.REPLACE_DISK_SEC:
3495         # this is for DRBD8, where we can't execute the same mode of
3496         # replacement as for drbd7 (no different port allocated)
3497         raise errors.OpPrereqError("Same secondary given, cannot execute"
3498                                    " replacement")
3499     if instance.disk_template == constants.DT_DRBD8:
3500       if (self.op.mode == constants.REPLACE_DISK_ALL and
3501           remote_node is not None):
3502         # switch to replace secondary mode
3503         self.op.mode = constants.REPLACE_DISK_SEC
3504
3505       if self.op.mode == constants.REPLACE_DISK_ALL:
3506         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3507                                    " secondary disk replacement, not"
3508                                    " both at once")
3509       elif self.op.mode == constants.REPLACE_DISK_PRI:
3510         if remote_node is not None:
3511           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3512                                      " the secondary while doing a primary"
3513                                      " node disk replacement")
3514         self.tgt_node = instance.primary_node
3515         self.oth_node = instance.secondary_nodes[0]
3516       elif self.op.mode == constants.REPLACE_DISK_SEC:
3517         self.new_node = remote_node # this can be None, in which case
3518                                     # we don't change the secondary
3519         self.tgt_node = instance.secondary_nodes[0]
3520         self.oth_node = instance.primary_node
3521       else:
3522         raise errors.ProgrammerError("Unhandled disk replace mode")
3523
3524     for name in self.op.disks:
3525       if instance.FindDisk(name) is None:
3526         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3527                                    (name, instance.name))
3528     self.op.remote_node = remote_node
3529
3530   def _ExecD8DiskOnly(self, feedback_fn):
3531     """Replace a disk on the primary or secondary for dbrd8.
3532
3533     The algorithm for replace is quite complicated:
3534       - for each disk to be replaced:
3535         - create new LVs on the target node with unique names
3536         - detach old LVs from the drbd device
3537         - rename old LVs to name_replaced.<time_t>
3538         - rename new LVs to old LVs
3539         - attach the new LVs (with the old names now) to the drbd device
3540       - wait for sync across all devices
3541       - for each modified disk:
3542         - remove old LVs (which have the name name_replaces.<time_t>)
3543
3544     Failures are not very well handled.
3545
3546     """
3547     steps_total = 6
3548     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3549     instance = self.instance
3550     iv_names = {}
3551     vgname = self.cfg.GetVGName()
3552     # start of work
3553     cfg = self.cfg
3554     tgt_node = self.tgt_node
3555     oth_node = self.oth_node
3556
3557     # Step: check device activation
3558     self.proc.LogStep(1, steps_total, "check device existence")
3559     info("checking volume groups")
3560     my_vg = cfg.GetVGName()
3561     results = rpc.call_vg_list([oth_node, tgt_node])
3562     if not results:
3563       raise errors.OpExecError("Can't list volume groups on the nodes")
3564     for node in oth_node, tgt_node:
3565       res = results.get(node, False)
3566       if not res or my_vg not in res:
3567         raise errors.OpExecError("Volume group '%s' not found on %s" %
3568                                  (my_vg, node))
3569     for dev in instance.disks:
3570       if not dev.iv_name in self.op.disks:
3571         continue
3572       for node in tgt_node, oth_node:
3573         info("checking %s on %s" % (dev.iv_name, node))
3574         cfg.SetDiskID(dev, node)
3575         if not rpc.call_blockdev_find(node, dev):
3576           raise errors.OpExecError("Can't find device %s on node %s" %
3577                                    (dev.iv_name, node))
3578
3579     # Step: check other node consistency
3580     self.proc.LogStep(2, steps_total, "check peer consistency")
3581     for dev in instance.disks:
3582       if not dev.iv_name in self.op.disks:
3583         continue
3584       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3585       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3586                                    oth_node==instance.primary_node):
3587         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3588                                  " to replace disks on this node (%s)" %
3589                                  (oth_node, tgt_node))
3590
3591     # Step: create new storage
3592     self.proc.LogStep(3, steps_total, "allocate new storage")
3593     for dev in instance.disks:
3594       if not dev.iv_name in self.op.disks:
3595         continue
3596       size = dev.size
3597       cfg.SetDiskID(dev, tgt_node)
3598       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3599       names = _GenerateUniqueNames(cfg, lv_names)
3600       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3601                              logical_id=(vgname, names[0]))
3602       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3603                              logical_id=(vgname, names[1]))
3604       new_lvs = [lv_data, lv_meta]
3605       old_lvs = dev.children
3606       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3607       info("creating new local storage on %s for %s" %
3608            (tgt_node, dev.iv_name))
3609       # since we *always* want to create this LV, we use the
3610       # _Create...OnPrimary (which forces the creation), even if we
3611       # are talking about the secondary node
3612       for new_lv in new_lvs:
3613         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3614                                         _GetInstanceInfoText(instance)):
3615           raise errors.OpExecError("Failed to create new LV named '%s' on"
3616                                    " node '%s'" %
3617                                    (new_lv.logical_id[1], tgt_node))
3618
3619     # Step: for each lv, detach+rename*2+attach
3620     self.proc.LogStep(4, steps_total, "change drbd configuration")
3621     for dev, old_lvs, new_lvs in iv_names.itervalues():
3622       info("detaching %s drbd from local storage" % dev.iv_name)
3623       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3624         raise errors.OpExecError("Can't detach drbd from local storage on node"
3625                                  " %s for device %s" % (tgt_node, dev.iv_name))
3626       #dev.children = []
3627       #cfg.Update(instance)
3628
3629       # ok, we created the new LVs, so now we know we have the needed
3630       # storage; as such, we proceed on the target node to rename
3631       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3632       # using the assumption that logical_id == physical_id (which in
3633       # turn is the unique_id on that node)
3634
3635       # FIXME(iustin): use a better name for the replaced LVs
3636       temp_suffix = int(time.time())
3637       ren_fn = lambda d, suff: (d.physical_id[0],
3638                                 d.physical_id[1] + "_replaced-%s" % suff)
3639       # build the rename list based on what LVs exist on the node
3640       rlist = []
3641       for to_ren in old_lvs:
3642         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3643         if find_res is not None: # device exists
3644           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3645
3646       info("renaming the old LVs on the target node")
3647       if not rpc.call_blockdev_rename(tgt_node, rlist):
3648         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3649       # now we rename the new LVs to the old LVs
3650       info("renaming the new LVs on the target node")
3651       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3652       if not rpc.call_blockdev_rename(tgt_node, rlist):
3653         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3654
3655       for old, new in zip(old_lvs, new_lvs):
3656         new.logical_id = old.logical_id
3657         cfg.SetDiskID(new, tgt_node)
3658
3659       for disk in old_lvs:
3660         disk.logical_id = ren_fn(disk, temp_suffix)
3661         cfg.SetDiskID(disk, tgt_node)
3662
3663       # now that the new lvs have the old name, we can add them to the device
3664       info("adding new mirror component on %s" % tgt_node)
3665       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3666         for new_lv in new_lvs:
3667           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3668             warning("Can't rollback device %s", hint="manually cleanup unused"
3669                     " logical volumes")
3670         raise errors.OpExecError("Can't add local storage to drbd")
3671
3672       dev.children = new_lvs
3673       cfg.Update(instance)
3674
3675     # Step: wait for sync
3676
3677     # this can fail as the old devices are degraded and _WaitForSync
3678     # does a combined result over all disks, so we don't check its
3679     # return value
3680     self.proc.LogStep(5, steps_total, "sync devices")
3681     _WaitForSync(cfg, instance, self.proc, unlock=True)
3682
3683     # so check manually all the devices
3684     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3685       cfg.SetDiskID(dev, instance.primary_node)
3686       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3687       if is_degr:
3688         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3689
3690     # Step: remove old storage
3691     self.proc.LogStep(6, steps_total, "removing old storage")
3692     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3693       info("remove logical volumes for %s" % name)
3694       for lv in old_lvs:
3695         cfg.SetDiskID(lv, tgt_node)
3696         if not rpc.call_blockdev_remove(tgt_node, lv):
3697           warning("Can't remove old LV", hint="manually remove unused LVs")
3698           continue
3699
3700   def _ExecD8Secondary(self, feedback_fn):
3701     """Replace the secondary node for drbd8.
3702
3703     The algorithm for replace is quite complicated:
3704       - for all disks of the instance:
3705         - create new LVs on the new node with same names
3706         - shutdown the drbd device on the old secondary
3707         - disconnect the drbd network on the primary
3708         - create the drbd device on the new secondary
3709         - network attach the drbd on the primary, using an artifice:
3710           the drbd code for Attach() will connect to the network if it
3711           finds a device which is connected to the good local disks but
3712           not network enabled
3713       - wait for sync across all devices
3714       - remove all disks from the old secondary
3715
3716     Failures are not very well handled.
3717
3718     """
3719     steps_total = 6
3720     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3721     instance = self.instance
3722     iv_names = {}
3723     vgname = self.cfg.GetVGName()
3724     # start of work
3725     cfg = self.cfg
3726     old_node = self.tgt_node
3727     new_node = self.new_node
3728     pri_node = instance.primary_node
3729
3730     # Step: check device activation
3731     self.proc.LogStep(1, steps_total, "check device existence")
3732     info("checking volume groups")
3733     my_vg = cfg.GetVGName()
3734     results = rpc.call_vg_list([pri_node, new_node])
3735     if not results:
3736       raise errors.OpExecError("Can't list volume groups on the nodes")
3737     for node in pri_node, new_node:
3738       res = results.get(node, False)
3739       if not res or my_vg not in res:
3740         raise errors.OpExecError("Volume group '%s' not found on %s" %
3741                                  (my_vg, node))
3742     for dev in instance.disks:
3743       if not dev.iv_name in self.op.disks:
3744         continue
3745       info("checking %s on %s" % (dev.iv_name, pri_node))
3746       cfg.SetDiskID(dev, pri_node)
3747       if not rpc.call_blockdev_find(pri_node, dev):
3748         raise errors.OpExecError("Can't find device %s on node %s" %
3749                                  (dev.iv_name, pri_node))
3750
3751     # Step: check other node consistency
3752     self.proc.LogStep(2, steps_total, "check peer consistency")
3753     for dev in instance.disks:
3754       if not dev.iv_name in self.op.disks:
3755         continue
3756       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3757       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3758         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3759                                  " unsafe to replace the secondary" %
3760                                  pri_node)
3761
3762     # Step: create new storage
3763     self.proc.LogStep(3, steps_total, "allocate new storage")
3764     for dev in instance.disks:
3765       size = dev.size
3766       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3767       # since we *always* want to create this LV, we use the
3768       # _Create...OnPrimary (which forces the creation), even if we
3769       # are talking about the secondary node
3770       for new_lv in dev.children:
3771         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3772                                         _GetInstanceInfoText(instance)):
3773           raise errors.OpExecError("Failed to create new LV named '%s' on"
3774                                    " node '%s'" %
3775                                    (new_lv.logical_id[1], new_node))
3776
3777       iv_names[dev.iv_name] = (dev, dev.children)
3778
3779     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3780     for dev in instance.disks:
3781       size = dev.size
3782       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3783       # create new devices on new_node
3784       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3785                               logical_id=(pri_node, new_node,
3786                                           dev.logical_id[2]),
3787                               children=dev.children)
3788       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3789                                         new_drbd, False,
3790                                       _GetInstanceInfoText(instance)):
3791         raise errors.OpExecError("Failed to create new DRBD on"
3792                                  " node '%s'" % new_node)
3793
3794     for dev in instance.disks:
3795       # we have new devices, shutdown the drbd on the old secondary
3796       info("shutting down drbd for %s on old node" % dev.iv_name)
3797       cfg.SetDiskID(dev, old_node)
3798       if not rpc.call_blockdev_shutdown(old_node, dev):
3799         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3800                 hint="Please cleanup this device manually as soon as possible")
3801
3802     info("detaching primary drbds from the network (=> standalone)")
3803     done = 0
3804     for dev in instance.disks:
3805       cfg.SetDiskID(dev, pri_node)
3806       # set the physical (unique in bdev terms) id to None, meaning
3807       # detach from network
3808       dev.physical_id = (None,) * len(dev.physical_id)
3809       # and 'find' the device, which will 'fix' it to match the
3810       # standalone state
3811       if rpc.call_blockdev_find(pri_node, dev):
3812         done += 1
3813       else:
3814         warning("Failed to detach drbd %s from network, unusual case" %
3815                 dev.iv_name)
3816
3817     if not done:
3818       # no detaches succeeded (very unlikely)
3819       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3820
3821     # if we managed to detach at least one, we update all the disks of
3822     # the instance to point to the new secondary
3823     info("updating instance configuration")
3824     for dev in instance.disks:
3825       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3826       cfg.SetDiskID(dev, pri_node)
3827     cfg.Update(instance)
3828
3829     # and now perform the drbd attach
3830     info("attaching primary drbds to new secondary (standalone => connected)")
3831     failures = []
3832     for dev in instance.disks:
3833       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3834       # since the attach is smart, it's enough to 'find' the device,
3835       # it will automatically activate the network, if the physical_id
3836       # is correct
3837       cfg.SetDiskID(dev, pri_node)
3838       if not rpc.call_blockdev_find(pri_node, dev):
3839         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3840                 "please do a gnt-instance info to see the status of disks")
3841
3842     # this can fail as the old devices are degraded and _WaitForSync
3843     # does a combined result over all disks, so we don't check its
3844     # return value
3845     self.proc.LogStep(5, steps_total, "sync devices")
3846     _WaitForSync(cfg, instance, self.proc, unlock=True)
3847
3848     # so check manually all the devices
3849     for name, (dev, old_lvs) in iv_names.iteritems():
3850       cfg.SetDiskID(dev, pri_node)
3851       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3852       if is_degr:
3853         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3854
3855     self.proc.LogStep(6, steps_total, "removing old storage")
3856     for name, (dev, old_lvs) in iv_names.iteritems():
3857       info("remove logical volumes for %s" % name)
3858       for lv in old_lvs:
3859         cfg.SetDiskID(lv, old_node)
3860         if not rpc.call_blockdev_remove(old_node, lv):
3861           warning("Can't remove LV on old secondary",
3862                   hint="Cleanup stale volumes by hand")
3863
3864   def Exec(self, feedback_fn):
3865     """Execute disk replacement.
3866
3867     This dispatches the disk replacement to the appropriate handler.
3868
3869     """
3870     instance = self.instance
3871
3872     # Activate the instance disks if we're replacing them on a down instance
3873     if instance.status == "down":
3874       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3875       self.proc.ChainOpCode(op)
3876
3877     if instance.disk_template == constants.DT_DRBD8:
3878       if self.op.remote_node is None:
3879         fn = self._ExecD8DiskOnly
3880       else:
3881         fn = self._ExecD8Secondary
3882     else:
3883       raise errors.ProgrammerError("Unhandled disk replacement case")
3884
3885     ret = fn(feedback_fn)
3886
3887     # Deactivate the instance disks if we're replacing them on a down instance
3888     if instance.status == "down":
3889       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3890       self.proc.ChainOpCode(op)
3891
3892     return ret
3893
3894
3895 class LUGrowDisk(LogicalUnit):
3896   """Grow a disk of an instance.
3897
3898   """
3899   HPATH = "disk-grow"
3900   HTYPE = constants.HTYPE_INSTANCE
3901   _OP_REQP = ["instance_name", "disk", "amount"]
3902
3903   def BuildHooksEnv(self):
3904     """Build hooks env.
3905
3906     This runs on the master, the primary and all the secondaries.
3907
3908     """
3909     env = {
3910       "DISK": self.op.disk,
3911       "AMOUNT": self.op.amount,
3912       }
3913     env.update(_BuildInstanceHookEnvByObject(self.instance))
3914     nl = [
3915       self.sstore.GetMasterNode(),
3916       self.instance.primary_node,
3917       ]
3918     return env, nl, nl
3919
3920   def CheckPrereq(self):
3921     """Check prerequisites.
3922
3923     This checks that the instance is in the cluster.
3924
3925     """
3926     instance = self.cfg.GetInstanceInfo(
3927       self.cfg.ExpandInstanceName(self.op.instance_name))
3928     if instance is None:
3929       raise errors.OpPrereqError("Instance '%s' not known" %
3930                                  self.op.instance_name)
3931     self.instance = instance
3932     self.op.instance_name = instance.name
3933
3934     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3935       raise errors.OpPrereqError("Instance's disk layout does not support"
3936                                  " growing.")
3937
3938     if instance.FindDisk(self.op.disk) is None:
3939       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3940                                  (self.op.disk, instance.name))
3941
3942     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3943     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3944     for node in nodenames:
3945       info = nodeinfo.get(node, None)
3946       if not info:
3947         raise errors.OpPrereqError("Cannot get current information"
3948                                    " from node '%s'" % node)
3949       vg_free = info.get('vg_free', None)
3950       if not isinstance(vg_free, int):
3951         raise errors.OpPrereqError("Can't compute free disk space on"
3952                                    " node %s" % node)
3953       if self.op.amount > info['vg_free']:
3954         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3955                                    " %d MiB available, %d MiB required" %
3956                                    (node, info['vg_free'], self.op.amount))
3957
3958   def Exec(self, feedback_fn):
3959     """Execute disk grow.
3960
3961     """
3962     instance = self.instance
3963     disk = instance.FindDisk(self.op.disk)
3964     for node in (instance.secondary_nodes + (instance.primary_node,)):
3965       self.cfg.SetDiskID(disk, node)
3966       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3967       if not result or not isinstance(result, tuple) or len(result) != 2:
3968         raise errors.OpExecError("grow request failed to node %s" % node)
3969       elif not result[0]:
3970         raise errors.OpExecError("grow request failed to node %s: %s" %
3971                                  (node, result[1]))
3972     disk.RecordGrow(self.op.amount)
3973     self.cfg.Update(instance)
3974     return
3975
3976
3977 class LUQueryInstanceData(NoHooksLU):
3978   """Query runtime instance data.
3979
3980   """
3981   _OP_REQP = ["instances"]
3982
3983   def CheckPrereq(self):
3984     """Check prerequisites.
3985
3986     This only checks the optional instance list against the existing names.
3987
3988     """
3989     if not isinstance(self.op.instances, list):
3990       raise errors.OpPrereqError("Invalid argument type 'instances'")
3991     if self.op.instances:
3992       self.wanted_instances = []
3993       names = self.op.instances
3994       for name in names:
3995         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3996         if instance is None:
3997           raise errors.OpPrereqError("No such instance name '%s'" % name)
3998         self.wanted_instances.append(instance)
3999     else:
4000       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4001                                in self.cfg.GetInstanceList()]
4002     return
4003
4004
4005   def _ComputeDiskStatus(self, instance, snode, dev):
4006     """Compute block device status.
4007
4008     """
4009     self.cfg.SetDiskID(dev, instance.primary_node)
4010     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4011     if dev.dev_type in constants.LDS_DRBD:
4012       # we change the snode then (otherwise we use the one passed in)
4013       if dev.logical_id[0] == instance.primary_node:
4014         snode = dev.logical_id[1]
4015       else:
4016         snode = dev.logical_id[0]
4017
4018     if snode:
4019       self.cfg.SetDiskID(dev, snode)
4020       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4021     else:
4022       dev_sstatus = None
4023
4024     if dev.children:
4025       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4026                       for child in dev.children]
4027     else:
4028       dev_children = []
4029
4030     data = {
4031       "iv_name": dev.iv_name,
4032       "dev_type": dev.dev_type,
4033       "logical_id": dev.logical_id,
4034       "physical_id": dev.physical_id,
4035       "pstatus": dev_pstatus,
4036       "sstatus": dev_sstatus,
4037       "children": dev_children,
4038       }
4039
4040     return data
4041
4042   def Exec(self, feedback_fn):
4043     """Gather and return data"""
4044     result = {}
4045     for instance in self.wanted_instances:
4046       remote_info = rpc.call_instance_info(instance.primary_node,
4047                                                 instance.name)
4048       if remote_info and "state" in remote_info:
4049         remote_state = "up"
4050       else:
4051         remote_state = "down"
4052       if instance.status == "down":
4053         config_state = "down"
4054       else:
4055         config_state = "up"
4056
4057       disks = [self._ComputeDiskStatus(instance, None, device)
4058                for device in instance.disks]
4059
4060       idict = {
4061         "name": instance.name,
4062         "config_state": config_state,
4063         "run_state": remote_state,
4064         "pnode": instance.primary_node,
4065         "snodes": instance.secondary_nodes,
4066         "os": instance.os,
4067         "memory": instance.memory,
4068         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4069         "disks": disks,
4070         "vcpus": instance.vcpus,
4071         }
4072
4073       htkind = self.sstore.GetHypervisorType()
4074       if htkind == constants.HT_XEN_PVM30:
4075         idict["kernel_path"] = instance.kernel_path
4076         idict["initrd_path"] = instance.initrd_path
4077
4078       if htkind == constants.HT_XEN_HVM31:
4079         idict["hvm_boot_order"] = instance.hvm_boot_order
4080         idict["hvm_acpi"] = instance.hvm_acpi
4081         idict["hvm_pae"] = instance.hvm_pae
4082         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4083
4084       if htkind in constants.HTS_REQ_PORT:
4085         idict["vnc_bind_address"] = instance.vnc_bind_address
4086         idict["network_port"] = instance.network_port
4087
4088       result[instance.name] = idict
4089
4090     return result
4091
4092
4093 class LUSetInstanceParams(LogicalUnit):
4094   """Modifies an instances's parameters.
4095
4096   """
4097   HPATH = "instance-modify"
4098   HTYPE = constants.HTYPE_INSTANCE
4099   _OP_REQP = ["instance_name"]
4100
4101   def BuildHooksEnv(self):
4102     """Build hooks env.
4103
4104     This runs on the master, primary and secondaries.
4105
4106     """
4107     args = dict()
4108     if self.mem:
4109       args['memory'] = self.mem
4110     if self.vcpus:
4111       args['vcpus'] = self.vcpus
4112     if self.do_ip or self.do_bridge or self.mac:
4113       if self.do_ip:
4114         ip = self.ip
4115       else:
4116         ip = self.instance.nics[0].ip
4117       if self.bridge:
4118         bridge = self.bridge
4119       else:
4120         bridge = self.instance.nics[0].bridge
4121       if self.mac:
4122         mac = self.mac
4123       else:
4124         mac = self.instance.nics[0].mac
4125       args['nics'] = [(ip, bridge, mac)]
4126     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4127     nl = [self.sstore.GetMasterNode(),
4128           self.instance.primary_node] + list(self.instance.secondary_nodes)
4129     return env, nl, nl
4130
4131   def CheckPrereq(self):
4132     """Check prerequisites.
4133
4134     This only checks the instance list against the existing names.
4135
4136     """
4137     self.mem = getattr(self.op, "mem", None)
4138     self.vcpus = getattr(self.op, "vcpus", None)
4139     self.ip = getattr(self.op, "ip", None)
4140     self.mac = getattr(self.op, "mac", None)
4141     self.bridge = getattr(self.op, "bridge", None)
4142     self.kernel_path = getattr(self.op, "kernel_path", None)
4143     self.initrd_path = getattr(self.op, "initrd_path", None)
4144     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4145     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4146     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4147     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4148     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4149     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4150                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4151                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4152                  self.vnc_bind_address]
4153     if all_parms.count(None) == len(all_parms):
4154       raise errors.OpPrereqError("No changes submitted")
4155     if self.mem is not None:
4156       try:
4157         self.mem = int(self.mem)
4158       except ValueError, err:
4159         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4160     if self.vcpus is not None:
4161       try:
4162         self.vcpus = int(self.vcpus)
4163       except ValueError, err:
4164         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4165     if self.ip is not None:
4166       self.do_ip = True
4167       if self.ip.lower() == "none":
4168         self.ip = None
4169       else:
4170         if not utils.IsValidIP(self.ip):
4171           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4172     else:
4173       self.do_ip = False
4174     self.do_bridge = (self.bridge is not None)
4175     if self.mac is not None:
4176       if self.cfg.IsMacInUse(self.mac):
4177         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4178                                    self.mac)
4179       if not utils.IsValidMac(self.mac):
4180         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4181
4182     if self.kernel_path is not None:
4183       self.do_kernel_path = True
4184       if self.kernel_path == constants.VALUE_NONE:
4185         raise errors.OpPrereqError("Can't set instance to no kernel")
4186
4187       if self.kernel_path != constants.VALUE_DEFAULT:
4188         if not os.path.isabs(self.kernel_path):
4189           raise errors.OpPrereqError("The kernel path must be an absolute"
4190                                     " filename")
4191     else:
4192       self.do_kernel_path = False
4193
4194     if self.initrd_path is not None:
4195       self.do_initrd_path = True
4196       if self.initrd_path not in (constants.VALUE_NONE,
4197                                   constants.VALUE_DEFAULT):
4198         if not os.path.isabs(self.initrd_path):
4199           raise errors.OpPrereqError("The initrd path must be an absolute"
4200                                     " filename")
4201     else:
4202       self.do_initrd_path = False
4203
4204     # boot order verification
4205     if self.hvm_boot_order is not None:
4206       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4207         if len(self.hvm_boot_order.strip("acdn")) != 0:
4208           raise errors.OpPrereqError("invalid boot order specified,"
4209                                      " must be one or more of [acdn]"
4210                                      " or 'default'")
4211
4212     # hvm_cdrom_image_path verification
4213     if self.op.hvm_cdrom_image_path is not None:
4214       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4215         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4216                                    " be an absolute path or None, not %s" %
4217                                    self.op.hvm_cdrom_image_path)
4218       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4219         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4220                                    " regular file or a symlink pointing to"
4221                                    " an existing regular file, not %s" %
4222                                    self.op.hvm_cdrom_image_path)
4223
4224     # vnc_bind_address verification
4225     if self.op.vnc_bind_address is not None:
4226       if not utils.IsValidIP(self.op.vnc_bind_address):
4227         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4228                                    " like a valid IP address" %
4229                                    self.op.vnc_bind_address)
4230
4231     instance = self.cfg.GetInstanceInfo(
4232       self.cfg.ExpandInstanceName(self.op.instance_name))
4233     if instance is None:
4234       raise errors.OpPrereqError("No such instance name '%s'" %
4235                                  self.op.instance_name)
4236     self.op.instance_name = instance.name
4237     self.instance = instance
4238     return
4239
4240   def Exec(self, feedback_fn):
4241     """Modifies an instance.
4242
4243     All parameters take effect only at the next restart of the instance.
4244     """
4245     result = []
4246     instance = self.instance
4247     if self.mem:
4248       instance.memory = self.mem
4249       result.append(("mem", self.mem))
4250     if self.vcpus:
4251       instance.vcpus = self.vcpus
4252       result.append(("vcpus",  self.vcpus))
4253     if self.do_ip:
4254       instance.nics[0].ip = self.ip
4255       result.append(("ip", self.ip))
4256     if self.bridge:
4257       instance.nics[0].bridge = self.bridge
4258       result.append(("bridge", self.bridge))
4259     if self.mac:
4260       instance.nics[0].mac = self.mac
4261       result.append(("mac", self.mac))
4262     if self.do_kernel_path:
4263       instance.kernel_path = self.kernel_path
4264       result.append(("kernel_path", self.kernel_path))
4265     if self.do_initrd_path:
4266       instance.initrd_path = self.initrd_path
4267       result.append(("initrd_path", self.initrd_path))
4268     if self.hvm_boot_order:
4269       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4270         instance.hvm_boot_order = None
4271       else:
4272         instance.hvm_boot_order = self.hvm_boot_order
4273       result.append(("hvm_boot_order", self.hvm_boot_order))
4274     if self.hvm_acpi:
4275       instance.hvm_acpi = self.hvm_acpi
4276       result.append(("hvm_acpi", self.hvm_acpi))
4277     if self.hvm_pae:
4278       instance.hvm_pae = self.hvm_pae
4279       result.append(("hvm_pae", self.hvm_pae))
4280     if self.hvm_cdrom_image_path:
4281       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4282       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4283     if self.vnc_bind_address:
4284       instance.vnc_bind_address = self.vnc_bind_address
4285       result.append(("vnc_bind_address", self.vnc_bind_address))
4286
4287     self.cfg.Update(instance)
4288
4289     return result
4290
4291
4292 class LUQueryExports(NoHooksLU):
4293   """Query the exports list
4294
4295   """
4296   _OP_REQP = []
4297
4298   def CheckPrereq(self):
4299     """Check that the nodelist contains only existing nodes.
4300
4301     """
4302     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4303
4304   def Exec(self, feedback_fn):
4305     """Compute the list of all the exported system images.
4306
4307     Returns:
4308       a dictionary with the structure node->(export-list)
4309       where export-list is a list of the instances exported on
4310       that node.
4311
4312     """
4313     return rpc.call_export_list(self.nodes)
4314
4315
4316 class LUExportInstance(LogicalUnit):
4317   """Export an instance to an image in the cluster.
4318
4319   """
4320   HPATH = "instance-export"
4321   HTYPE = constants.HTYPE_INSTANCE
4322   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4323
4324   def BuildHooksEnv(self):
4325     """Build hooks env.
4326
4327     This will run on the master, primary node and target node.
4328
4329     """
4330     env = {
4331       "EXPORT_NODE": self.op.target_node,
4332       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4333       }
4334     env.update(_BuildInstanceHookEnvByObject(self.instance))
4335     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4336           self.op.target_node]
4337     return env, nl, nl
4338
4339   def CheckPrereq(self):
4340     """Check prerequisites.
4341
4342     This checks that the instance and node names are valid.
4343
4344     """
4345     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4346     self.instance = self.cfg.GetInstanceInfo(instance_name)
4347     if self.instance is None:
4348       raise errors.OpPrereqError("Instance '%s' not found" %
4349                                  self.op.instance_name)
4350
4351     # node verification
4352     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4353     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4354
4355     if self.dst_node is None:
4356       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4357                                  self.op.target_node)
4358     self.op.target_node = self.dst_node.name
4359
4360     # instance disk type verification
4361     for disk in self.instance.disks:
4362       if disk.dev_type == constants.LD_FILE:
4363         raise errors.OpPrereqError("Export not supported for instances with"
4364                                    " file-based disks")
4365
4366   def Exec(self, feedback_fn):
4367     """Export an instance to an image in the cluster.
4368
4369     """
4370     instance = self.instance
4371     dst_node = self.dst_node
4372     src_node = instance.primary_node
4373     if self.op.shutdown:
4374       # shutdown the instance, but not the disks
4375       if not rpc.call_instance_shutdown(src_node, instance):
4376          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4377                                   (instance.name, src_node))
4378
4379     vgname = self.cfg.GetVGName()
4380
4381     snap_disks = []
4382
4383     try:
4384       for disk in instance.disks:
4385         if disk.iv_name == "sda":
4386           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4387           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4388
4389           if not new_dev_name:
4390             logger.Error("could not snapshot block device %s on node %s" %
4391                          (disk.logical_id[1], src_node))
4392           else:
4393             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4394                                       logical_id=(vgname, new_dev_name),
4395                                       physical_id=(vgname, new_dev_name),
4396                                       iv_name=disk.iv_name)
4397             snap_disks.append(new_dev)
4398
4399     finally:
4400       if self.op.shutdown and instance.status == "up":
4401         if not rpc.call_instance_start(src_node, instance, None):
4402           _ShutdownInstanceDisks(instance, self.cfg)
4403           raise errors.OpExecError("Could not start instance")
4404
4405     # TODO: check for size
4406
4407     for dev in snap_disks:
4408       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4409         logger.Error("could not export block device %s from node %s to node %s"
4410                      % (dev.logical_id[1], src_node, dst_node.name))
4411       if not rpc.call_blockdev_remove(src_node, dev):
4412         logger.Error("could not remove snapshot block device %s from node %s" %
4413                      (dev.logical_id[1], src_node))
4414
4415     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4416       logger.Error("could not finalize export for instance %s on node %s" %
4417                    (instance.name, dst_node.name))
4418
4419     nodelist = self.cfg.GetNodeList()
4420     nodelist.remove(dst_node.name)
4421
4422     # on one-node clusters nodelist will be empty after the removal
4423     # if we proceed the backup would be removed because OpQueryExports
4424     # substitutes an empty list with the full cluster node list.
4425     if nodelist:
4426       op = opcodes.OpQueryExports(nodes=nodelist)
4427       exportlist = self.proc.ChainOpCode(op)
4428       for node in exportlist:
4429         if instance.name in exportlist[node]:
4430           if not rpc.call_export_remove(node, instance.name):
4431             logger.Error("could not remove older export for instance %s"
4432                          " on node %s" % (instance.name, node))
4433
4434
4435 class LURemoveExport(NoHooksLU):
4436   """Remove exports related to the named instance.
4437
4438   """
4439   _OP_REQP = ["instance_name"]
4440
4441   def CheckPrereq(self):
4442     """Check prerequisites.
4443     """
4444     pass
4445
4446   def Exec(self, feedback_fn):
4447     """Remove any export.
4448
4449     """
4450     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4451     # If the instance was not found we'll try with the name that was passed in.
4452     # This will only work if it was an FQDN, though.
4453     fqdn_warn = False
4454     if not instance_name:
4455       fqdn_warn = True
4456       instance_name = self.op.instance_name
4457
4458     op = opcodes.OpQueryExports(nodes=[])
4459     exportlist = self.proc.ChainOpCode(op)
4460     found = False
4461     for node in exportlist:
4462       if instance_name in exportlist[node]:
4463         found = True
4464         if not rpc.call_export_remove(node, instance_name):
4465           logger.Error("could not remove export for instance %s"
4466                        " on node %s" % (instance_name, node))
4467
4468     if fqdn_warn and not found:
4469       feedback_fn("Export not found. If trying to remove an export belonging"
4470                   " to a deleted instance please use its Fully Qualified"
4471                   " Domain Name.")
4472
4473
4474 class TagsLU(NoHooksLU):
4475   """Generic tags LU.
4476
4477   This is an abstract class which is the parent of all the other tags LUs.
4478
4479   """
4480   def CheckPrereq(self):
4481     """Check prerequisites.
4482
4483     """
4484     if self.op.kind == constants.TAG_CLUSTER:
4485       self.target = self.cfg.GetClusterInfo()
4486     elif self.op.kind == constants.TAG_NODE:
4487       name = self.cfg.ExpandNodeName(self.op.name)
4488       if name is None:
4489         raise errors.OpPrereqError("Invalid node name (%s)" %
4490                                    (self.op.name,))
4491       self.op.name = name
4492       self.target = self.cfg.GetNodeInfo(name)
4493     elif self.op.kind == constants.TAG_INSTANCE:
4494       name = self.cfg.ExpandInstanceName(self.op.name)
4495       if name is None:
4496         raise errors.OpPrereqError("Invalid instance name (%s)" %
4497                                    (self.op.name,))
4498       self.op.name = name
4499       self.target = self.cfg.GetInstanceInfo(name)
4500     else:
4501       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4502                                  str(self.op.kind))
4503
4504
4505 class LUGetTags(TagsLU):
4506   """Returns the tags of a given object.
4507
4508   """
4509   _OP_REQP = ["kind", "name"]
4510
4511   def Exec(self, feedback_fn):
4512     """Returns the tag list.
4513
4514     """
4515     return self.target.GetTags()
4516
4517
4518 class LUSearchTags(NoHooksLU):
4519   """Searches the tags for a given pattern.
4520
4521   """
4522   _OP_REQP = ["pattern"]
4523
4524   def CheckPrereq(self):
4525     """Check prerequisites.
4526
4527     This checks the pattern passed for validity by compiling it.
4528
4529     """
4530     try:
4531       self.re = re.compile(self.op.pattern)
4532     except re.error, err:
4533       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4534                                  (self.op.pattern, err))
4535
4536   def Exec(self, feedback_fn):
4537     """Returns the tag list.
4538
4539     """
4540     cfg = self.cfg
4541     tgts = [("/cluster", cfg.GetClusterInfo())]
4542     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4543     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4544     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4545     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4546     results = []
4547     for path, target in tgts:
4548       for tag in target.GetTags():
4549         if self.re.search(tag):
4550           results.append((path, tag))
4551     return results
4552
4553
4554 class LUAddTags(TagsLU):
4555   """Sets a tag on a given object.
4556
4557   """
4558   _OP_REQP = ["kind", "name", "tags"]
4559
4560   def CheckPrereq(self):
4561     """Check prerequisites.
4562
4563     This checks the type and length of the tag name and value.
4564
4565     """
4566     TagsLU.CheckPrereq(self)
4567     for tag in self.op.tags:
4568       objects.TaggableObject.ValidateTag(tag)
4569
4570   def Exec(self, feedback_fn):
4571     """Sets the tag.
4572
4573     """
4574     try:
4575       for tag in self.op.tags:
4576         self.target.AddTag(tag)
4577     except errors.TagError, err:
4578       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4579     try:
4580       self.cfg.Update(self.target)
4581     except errors.ConfigurationError:
4582       raise errors.OpRetryError("There has been a modification to the"
4583                                 " config file and the operation has been"
4584                                 " aborted. Please retry.")
4585
4586
4587 class LUDelTags(TagsLU):
4588   """Delete a list of tags from a given object.
4589
4590   """
4591   _OP_REQP = ["kind", "name", "tags"]
4592
4593   def CheckPrereq(self):
4594     """Check prerequisites.
4595
4596     This checks that we have the given tag.
4597
4598     """
4599     TagsLU.CheckPrereq(self)
4600     for tag in self.op.tags:
4601       objects.TaggableObject.ValidateTag(tag)
4602     del_tags = frozenset(self.op.tags)
4603     cur_tags = self.target.GetTags()
4604     if not del_tags <= cur_tags:
4605       diff_tags = del_tags - cur_tags
4606       diff_names = ["'%s'" % tag for tag in diff_tags]
4607       diff_names.sort()
4608       raise errors.OpPrereqError("Tag(s) %s not found" %
4609                                  (",".join(diff_names)))
4610
4611   def Exec(self, feedback_fn):
4612     """Remove the tag from the object.
4613
4614     """
4615     for tag in self.op.tags:
4616       self.target.RemoveTag(tag)
4617     try:
4618       self.cfg.Update(self.target)
4619     except errors.ConfigurationError:
4620       raise errors.OpRetryError("There has been a modification to the"
4621                                 " config file and the operation has been"
4622                                 " aborted. Please retry.")
4623
4624
4625 class LUTestDelay(NoHooksLU):
4626   """Sleep for a specified amount of time.
4627
4628   This LU sleeps on the master and/or nodes for a specified amount of
4629   time.
4630
4631   """
4632   _OP_REQP = ["duration", "on_master", "on_nodes"]
4633   REQ_BGL = False
4634
4635   def ExpandNames(self):
4636     """Expand names and set required locks.
4637
4638     This expands the node list, if any.
4639
4640     """
4641     self.needed_locks = {}
4642     if self.op.on_nodes:
4643       # _GetWantedNodes can be used here, but is not always appropriate to use
4644       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4645       # more information.
4646       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4647       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4648
4649   def CheckPrereq(self):
4650     """Check prerequisites.
4651
4652     """
4653
4654   def Exec(self, feedback_fn):
4655     """Do the actual sleep.
4656
4657     """
4658     if self.op.on_master:
4659       if not utils.TestDelay(self.op.duration):
4660         raise errors.OpExecError("Error during master delay test")
4661     if self.op.on_nodes:
4662       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4663       if not result:
4664         raise errors.OpExecError("Complete failure from rpc call")
4665       for node, node_result in result.items():
4666         if not node_result:
4667           raise errors.OpExecError("Failure during rpc call to node %s,"
4668                                    " result: %s" % (node, node_result))
4669
4670
4671 class IAllocator(object):
4672   """IAllocator framework.
4673
4674   An IAllocator instance has three sets of attributes:
4675     - cfg/sstore that are needed to query the cluster
4676     - input data (all members of the _KEYS class attribute are required)
4677     - four buffer attributes (in|out_data|text), that represent the
4678       input (to the external script) in text and data structure format,
4679       and the output from it, again in two formats
4680     - the result variables from the script (success, info, nodes) for
4681       easy usage
4682
4683   """
4684   _ALLO_KEYS = [
4685     "mem_size", "disks", "disk_template",
4686     "os", "tags", "nics", "vcpus",
4687     ]
4688   _RELO_KEYS = [
4689     "relocate_from",
4690     ]
4691
4692   def __init__(self, cfg, sstore, mode, name, **kwargs):
4693     self.cfg = cfg
4694     self.sstore = sstore
4695     # init buffer variables
4696     self.in_text = self.out_text = self.in_data = self.out_data = None
4697     # init all input fields so that pylint is happy
4698     self.mode = mode
4699     self.name = name
4700     self.mem_size = self.disks = self.disk_template = None
4701     self.os = self.tags = self.nics = self.vcpus = None
4702     self.relocate_from = None
4703     # computed fields
4704     self.required_nodes = None
4705     # init result fields
4706     self.success = self.info = self.nodes = None
4707     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4708       keyset = self._ALLO_KEYS
4709     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4710       keyset = self._RELO_KEYS
4711     else:
4712       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4713                                    " IAllocator" % self.mode)
4714     for key in kwargs:
4715       if key not in keyset:
4716         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4717                                      " IAllocator" % key)
4718       setattr(self, key, kwargs[key])
4719     for key in keyset:
4720       if key not in kwargs:
4721         raise errors.ProgrammerError("Missing input parameter '%s' to"
4722                                      " IAllocator" % key)
4723     self._BuildInputData()
4724
4725   def _ComputeClusterData(self):
4726     """Compute the generic allocator input data.
4727
4728     This is the data that is independent of the actual operation.
4729
4730     """
4731     cfg = self.cfg
4732     # cluster data
4733     data = {
4734       "version": 1,
4735       "cluster_name": self.sstore.GetClusterName(),
4736       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4737       "hypervisor_type": self.sstore.GetHypervisorType(),
4738       # we don't have job IDs
4739       }
4740
4741     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4742
4743     # node data
4744     node_results = {}
4745     node_list = cfg.GetNodeList()
4746     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4747     for nname in node_list:
4748       ninfo = cfg.GetNodeInfo(nname)
4749       if nname not in node_data or not isinstance(node_data[nname], dict):
4750         raise errors.OpExecError("Can't get data for node %s" % nname)
4751       remote_info = node_data[nname]
4752       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4753                    'vg_size', 'vg_free', 'cpu_total']:
4754         if attr not in remote_info:
4755           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4756                                    (nname, attr))
4757         try:
4758           remote_info[attr] = int(remote_info[attr])
4759         except ValueError, err:
4760           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4761                                    " %s" % (nname, attr, str(err)))
4762       # compute memory used by primary instances
4763       i_p_mem = i_p_up_mem = 0
4764       for iinfo in i_list:
4765         if iinfo.primary_node == nname:
4766           i_p_mem += iinfo.memory
4767           if iinfo.status == "up":
4768             i_p_up_mem += iinfo.memory
4769
4770       # compute memory used by instances
4771       pnr = {
4772         "tags": list(ninfo.GetTags()),
4773         "total_memory": remote_info['memory_total'],
4774         "reserved_memory": remote_info['memory_dom0'],
4775         "free_memory": remote_info['memory_free'],
4776         "i_pri_memory": i_p_mem,
4777         "i_pri_up_memory": i_p_up_mem,
4778         "total_disk": remote_info['vg_size'],
4779         "free_disk": remote_info['vg_free'],
4780         "primary_ip": ninfo.primary_ip,
4781         "secondary_ip": ninfo.secondary_ip,
4782         "total_cpus": remote_info['cpu_total'],
4783         }
4784       node_results[nname] = pnr
4785     data["nodes"] = node_results
4786
4787     # instance data
4788     instance_data = {}
4789     for iinfo in i_list:
4790       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4791                   for n in iinfo.nics]
4792       pir = {
4793         "tags": list(iinfo.GetTags()),
4794         "should_run": iinfo.status == "up",
4795         "vcpus": iinfo.vcpus,
4796         "memory": iinfo.memory,
4797         "os": iinfo.os,
4798         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4799         "nics": nic_data,
4800         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4801         "disk_template": iinfo.disk_template,
4802         }
4803       instance_data[iinfo.name] = pir
4804
4805     data["instances"] = instance_data
4806
4807     self.in_data = data
4808
4809   def _AddNewInstance(self):
4810     """Add new instance data to allocator structure.
4811
4812     This in combination with _AllocatorGetClusterData will create the
4813     correct structure needed as input for the allocator.
4814
4815     The checks for the completeness of the opcode must have already been
4816     done.
4817
4818     """
4819     data = self.in_data
4820     if len(self.disks) != 2:
4821       raise errors.OpExecError("Only two-disk configurations supported")
4822
4823     disk_space = _ComputeDiskSize(self.disk_template,
4824                                   self.disks[0]["size"], self.disks[1]["size"])
4825
4826     if self.disk_template in constants.DTS_NET_MIRROR:
4827       self.required_nodes = 2
4828     else:
4829       self.required_nodes = 1
4830     request = {
4831       "type": "allocate",
4832       "name": self.name,
4833       "disk_template": self.disk_template,
4834       "tags": self.tags,
4835       "os": self.os,
4836       "vcpus": self.vcpus,
4837       "memory": self.mem_size,
4838       "disks": self.disks,
4839       "disk_space_total": disk_space,
4840       "nics": self.nics,
4841       "required_nodes": self.required_nodes,
4842       }
4843     data["request"] = request
4844
4845   def _AddRelocateInstance(self):
4846     """Add relocate instance data to allocator structure.
4847
4848     This in combination with _IAllocatorGetClusterData will create the
4849     correct structure needed as input for the allocator.
4850
4851     The checks for the completeness of the opcode must have already been
4852     done.
4853
4854     """
4855     instance = self.cfg.GetInstanceInfo(self.name)
4856     if instance is None:
4857       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4858                                    " IAllocator" % self.name)
4859
4860     if instance.disk_template not in constants.DTS_NET_MIRROR:
4861       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4862
4863     if len(instance.secondary_nodes) != 1:
4864       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4865
4866     self.required_nodes = 1
4867
4868     disk_space = _ComputeDiskSize(instance.disk_template,
4869                                   instance.disks[0].size,
4870                                   instance.disks[1].size)
4871
4872     request = {
4873       "type": "relocate",
4874       "name": self.name,
4875       "disk_space_total": disk_space,
4876       "required_nodes": self.required_nodes,
4877       "relocate_from": self.relocate_from,
4878       }
4879     self.in_data["request"] = request
4880
4881   def _BuildInputData(self):
4882     """Build input data structures.
4883
4884     """
4885     self._ComputeClusterData()
4886
4887     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4888       self._AddNewInstance()
4889     else:
4890       self._AddRelocateInstance()
4891
4892     self.in_text = serializer.Dump(self.in_data)
4893
4894   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4895     """Run an instance allocator and return the results.
4896
4897     """
4898     data = self.in_text
4899
4900     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4901
4902     if not isinstance(result, tuple) or len(result) != 4:
4903       raise errors.OpExecError("Invalid result from master iallocator runner")
4904
4905     rcode, stdout, stderr, fail = result
4906
4907     if rcode == constants.IARUN_NOTFOUND:
4908       raise errors.OpExecError("Can't find allocator '%s'" % name)
4909     elif rcode == constants.IARUN_FAILURE:
4910         raise errors.OpExecError("Instance allocator call failed: %s,"
4911                                  " output: %s" %
4912                                  (fail, stdout+stderr))
4913     self.out_text = stdout
4914     if validate:
4915       self._ValidateResult()
4916
4917   def _ValidateResult(self):
4918     """Process the allocator results.
4919
4920     This will process and if successful save the result in
4921     self.out_data and the other parameters.
4922
4923     """
4924     try:
4925       rdict = serializer.Load(self.out_text)
4926     except Exception, err:
4927       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4928
4929     if not isinstance(rdict, dict):
4930       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4931
4932     for key in "success", "info", "nodes":
4933       if key not in rdict:
4934         raise errors.OpExecError("Can't parse iallocator results:"
4935                                  " missing key '%s'" % key)
4936       setattr(self, key, rdict[key])
4937
4938     if not isinstance(rdict["nodes"], list):
4939       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4940                                " is not a list")
4941     self.out_data = rdict
4942
4943
4944 class LUTestAllocator(NoHooksLU):
4945   """Run allocator tests.
4946
4947   This LU runs the allocator tests
4948
4949   """
4950   _OP_REQP = ["direction", "mode", "name"]
4951
4952   def CheckPrereq(self):
4953     """Check prerequisites.
4954
4955     This checks the opcode parameters depending on the director and mode test.
4956
4957     """
4958     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4959       for attr in ["name", "mem_size", "disks", "disk_template",
4960                    "os", "tags", "nics", "vcpus"]:
4961         if not hasattr(self.op, attr):
4962           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4963                                      attr)
4964       iname = self.cfg.ExpandInstanceName(self.op.name)
4965       if iname is not None:
4966         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4967                                    iname)
4968       if not isinstance(self.op.nics, list):
4969         raise errors.OpPrereqError("Invalid parameter 'nics'")
4970       for row in self.op.nics:
4971         if (not isinstance(row, dict) or
4972             "mac" not in row or
4973             "ip" not in row or
4974             "bridge" not in row):
4975           raise errors.OpPrereqError("Invalid contents of the"
4976                                      " 'nics' parameter")
4977       if not isinstance(self.op.disks, list):
4978         raise errors.OpPrereqError("Invalid parameter 'disks'")
4979       if len(self.op.disks) != 2:
4980         raise errors.OpPrereqError("Only two-disk configurations supported")
4981       for row in self.op.disks:
4982         if (not isinstance(row, dict) or
4983             "size" not in row or
4984             not isinstance(row["size"], int) or
4985             "mode" not in row or
4986             row["mode"] not in ['r', 'w']):
4987           raise errors.OpPrereqError("Invalid contents of the"
4988                                      " 'disks' parameter")
4989     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4990       if not hasattr(self.op, "name"):
4991         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4992       fname = self.cfg.ExpandInstanceName(self.op.name)
4993       if fname is None:
4994         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4995                                    self.op.name)
4996       self.op.name = fname
4997       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4998     else:
4999       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5000                                  self.op.mode)
5001
5002     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5003       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5004         raise errors.OpPrereqError("Missing allocator name")
5005     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5006       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5007                                  self.op.direction)
5008
5009   def Exec(self, feedback_fn):
5010     """Run the allocator test.
5011
5012     """
5013     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5014       ial = IAllocator(self.cfg, self.sstore,
5015                        mode=self.op.mode,
5016                        name=self.op.name,
5017                        mem_size=self.op.mem_size,
5018                        disks=self.op.disks,
5019                        disk_template=self.op.disk_template,
5020                        os=self.op.os,
5021                        tags=self.op.tags,
5022                        nics=self.op.nics,
5023                        vcpus=self.op.vcpus,
5024                        )
5025     else:
5026       ial = IAllocator(self.cfg, self.sstore,
5027                        mode=self.op.mode,
5028                        name=self.op.name,
5029                        relocate_from=list(self.relocate_from),
5030                        )
5031
5032     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5033       result = ial.in_text
5034     else:
5035       ial.Run(self.op.allocator, validate=False)
5036       result = ial.out_text
5037     return result