9859c8590e3c7a7e03228a3317232fc5ecc32c44
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61
62   Note that all commands require root permissions.
63
64   """
65   HPATH = None
66   HTYPE = None
67   _OP_REQP = []
68   REQ_MASTER = True
69   REQ_WSSTORE = False
70   REQ_BGL = True
71
72   def __init__(self, processor, op, context, sstore):
73     """Constructor for LogicalUnit.
74
75     This needs to be overriden in derived classes in order to check op
76     validity.
77
78     """
79     self.proc = processor
80     self.op = op
81     self.cfg = context.cfg
82     self.sstore = sstore
83     self.context = context
84     self.__ssh = None
85
86     for attr_name in self._OP_REQP:
87       attr_val = getattr(op, attr_name, None)
88       if attr_val is None:
89         raise errors.OpPrereqError("Required parameter '%s' missing" %
90                                    attr_name)
91
92     if not cfg.IsCluster():
93       raise errors.OpPrereqError("Cluster not initialized yet,"
94                                  " use 'gnt-cluster init' first.")
95     if self.REQ_MASTER:
96       master = sstore.GetMasterNode()
97       if master != utils.HostInfo().name:
98         raise errors.OpPrereqError("Commands must be run on the master"
99                                    " node %s" % master)
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.sstore)
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckPrereq(self):
112     """Check prerequisites for this LU.
113
114     This method should check that the prerequisites for the execution
115     of this LU are fulfilled. It can do internode communication, but
116     it should be idempotent - no cluster or system changes are
117     allowed.
118
119     The method should raise errors.OpPrereqError in case something is
120     not fulfilled. Its return value is ignored.
121
122     This method should also update all the parameters of the opcode to
123     their canonical form; e.g. a short node name must be fully
124     expanded after this method has successfully completed (so that
125     hooks, logging, etc. work correctly).
126
127     """
128     raise NotImplementedError
129
130   def Exec(self, feedback_fn):
131     """Execute the LU.
132
133     This method should implement the actual work. It should raise
134     errors.OpExecError for failures that are somewhat dealt with in
135     code, or expected.
136
137     """
138     raise NotImplementedError
139
140   def BuildHooksEnv(self):
141     """Build hooks environment for this LU.
142
143     This method should return a three-node tuple consisting of: a dict
144     containing the environment that will be used for running the
145     specific hook for this LU, a list of node names on which the hook
146     should run before the execution, and a list of node names on which
147     the hook should run after the execution.
148
149     The keys of the dict must not have 'GANETI_' prefixed as this will
150     be handled in the hooks runner. Also note additional keys will be
151     added by the hooks runner. If the LU doesn't define any
152     environment, an empty dict (and not None) should be returned.
153
154     No nodes should be returned as an empty list (and not None).
155
156     Note that if the HPATH for a LU class is None, this function will
157     not be called.
158
159     """
160     raise NotImplementedError
161
162   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
163     """Notify the LU about the results of its hooks.
164
165     This method is called every time a hooks phase is executed, and notifies
166     the Logical Unit about the hooks' result. The LU can then use it to alter
167     its result based on the hooks.  By default the method does nothing and the
168     previous result is passed back unchanged but any LU can define it if it
169     wants to use the local cluster hook-scripts somehow.
170
171     Args:
172       phase: the hooks phase that has just been run
173       hooks_results: the results of the multi-node hooks rpc call
174       feedback_fn: function to send feedback back to the caller
175       lu_result: the previous result this LU had, or None in the PRE phase.
176
177     """
178     return lu_result
179
180
181 class NoHooksLU(LogicalUnit):
182   """Simple LU which runs no hooks.
183
184   This LU is intended as a parent for other LogicalUnits which will
185   run no hooks, in order to reduce duplicate code.
186
187   """
188   HPATH = None
189   HTYPE = None
190
191
192 def _GetWantedNodes(lu, nodes):
193   """Returns list of checked and expanded node names.
194
195   Args:
196     nodes: List of nodes (strings) or None for all
197
198   """
199   if not isinstance(nodes, list):
200     raise errors.OpPrereqError("Invalid argument type 'nodes'")
201
202   if nodes:
203     wanted = []
204
205     for name in nodes:
206       node = lu.cfg.ExpandNodeName(name)
207       if node is None:
208         raise errors.OpPrereqError("No such node name '%s'" % name)
209       wanted.append(node)
210
211   else:
212     wanted = lu.cfg.GetNodeList()
213   return utils.NiceSort(wanted)
214
215
216 def _GetWantedInstances(lu, instances):
217   """Returns list of checked and expanded instance names.
218
219   Args:
220     instances: List of instances (strings) or None for all
221
222   """
223   if not isinstance(instances, list):
224     raise errors.OpPrereqError("Invalid argument type 'instances'")
225
226   if instances:
227     wanted = []
228
229     for name in instances:
230       instance = lu.cfg.ExpandInstanceName(name)
231       if instance is None:
232         raise errors.OpPrereqError("No such instance name '%s'" % name)
233       wanted.append(instance)
234
235   else:
236     wanted = lu.cfg.GetInstanceList()
237   return utils.NiceSort(wanted)
238
239
240 def _CheckOutputFields(static, dynamic, selected):
241   """Checks whether all selected fields are valid.
242
243   Args:
244     static: Static fields
245     dynamic: Dynamic fields
246
247   """
248   static_fields = frozenset(static)
249   dynamic_fields = frozenset(dynamic)
250
251   all_fields = static_fields | dynamic_fields
252
253   if not all_fields.issuperset(selected):
254     raise errors.OpPrereqError("Unknown output fields selected: %s"
255                                % ",".join(frozenset(selected).
256                                           difference(all_fields)))
257
258
259 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
260                           memory, vcpus, nics):
261   """Builds instance related env variables for hooks from single variables.
262
263   Args:
264     secondary_nodes: List of secondary nodes as strings
265   """
266   env = {
267     "OP_TARGET": name,
268     "INSTANCE_NAME": name,
269     "INSTANCE_PRIMARY": primary_node,
270     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
271     "INSTANCE_OS_TYPE": os_type,
272     "INSTANCE_STATUS": status,
273     "INSTANCE_MEMORY": memory,
274     "INSTANCE_VCPUS": vcpus,
275   }
276
277   if nics:
278     nic_count = len(nics)
279     for idx, (ip, bridge, mac) in enumerate(nics):
280       if ip is None:
281         ip = ""
282       env["INSTANCE_NIC%d_IP" % idx] = ip
283       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
284       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
285   else:
286     nic_count = 0
287
288   env["INSTANCE_NIC_COUNT"] = nic_count
289
290   return env
291
292
293 def _BuildInstanceHookEnvByObject(instance, override=None):
294   """Builds instance related env variables for hooks from an object.
295
296   Args:
297     instance: objects.Instance object of instance
298     override: dict of values to override
299   """
300   args = {
301     'name': instance.name,
302     'primary_node': instance.primary_node,
303     'secondary_nodes': instance.secondary_nodes,
304     'os_type': instance.os,
305     'status': instance.os,
306     'memory': instance.memory,
307     'vcpus': instance.vcpus,
308     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
309   }
310   if override:
311     args.update(override)
312   return _BuildInstanceHookEnv(**args)
313
314
315 def _CheckInstanceBridgesExist(instance):
316   """Check that the brigdes needed by an instance exist.
317
318   """
319   # check bridges existance
320   brlist = [nic.bridge for nic in instance.nics]
321   if not rpc.call_bridges_exist(instance.primary_node, brlist):
322     raise errors.OpPrereqError("one or more target bridges %s does not"
323                                " exist on destination node '%s'" %
324                                (brlist, instance.primary_node))
325
326
327 class LUDestroyCluster(NoHooksLU):
328   """Logical unit for destroying the cluster.
329
330   """
331   _OP_REQP = []
332
333   def CheckPrereq(self):
334     """Check prerequisites.
335
336     This checks whether the cluster is empty.
337
338     Any errors are signalled by raising errors.OpPrereqError.
339
340     """
341     master = self.sstore.GetMasterNode()
342
343     nodelist = self.cfg.GetNodeList()
344     if len(nodelist) != 1 or nodelist[0] != master:
345       raise errors.OpPrereqError("There are still %d node(s) in"
346                                  " this cluster." % (len(nodelist) - 1))
347     instancelist = self.cfg.GetInstanceList()
348     if instancelist:
349       raise errors.OpPrereqError("There are still %d instance(s) in"
350                                  " this cluster." % len(instancelist))
351
352   def Exec(self, feedback_fn):
353     """Destroys the cluster.
354
355     """
356     master = self.sstore.GetMasterNode()
357     if not rpc.call_node_stop_master(master):
358       raise errors.OpExecError("Could not disable the master role")
359     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
360     utils.CreateBackup(priv_key)
361     utils.CreateBackup(pub_key)
362     rpc.call_node_leave_cluster(master)
363
364
365 class LUVerifyCluster(LogicalUnit):
366   """Verifies the cluster status.
367
368   """
369   HPATH = "cluster-verify"
370   HTYPE = constants.HTYPE_CLUSTER
371   _OP_REQP = ["skip_checks"]
372
373   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
374                   remote_version, feedback_fn):
375     """Run multiple tests against a node.
376
377     Test list:
378       - compares ganeti version
379       - checks vg existance and size > 20G
380       - checks config file checksum
381       - checks ssh to other nodes
382
383     Args:
384       node: name of the node to check
385       file_list: required list of files
386       local_cksum: dictionary of local files and their checksums
387
388     """
389     # compares ganeti version
390     local_version = constants.PROTOCOL_VERSION
391     if not remote_version:
392       feedback_fn("  - ERROR: connection to %s failed" % (node))
393       return True
394
395     if local_version != remote_version:
396       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
397                       (local_version, node, remote_version))
398       return True
399
400     # checks vg existance and size > 20G
401
402     bad = False
403     if not vglist:
404       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
405                       (node,))
406       bad = True
407     else:
408       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
409                                             constants.MIN_VG_SIZE)
410       if vgstatus:
411         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
412         bad = True
413
414     # checks config file checksum
415     # checks ssh to any
416
417     if 'filelist' not in node_result:
418       bad = True
419       feedback_fn("  - ERROR: node hasn't returned file checksum data")
420     else:
421       remote_cksum = node_result['filelist']
422       for file_name in file_list:
423         if file_name not in remote_cksum:
424           bad = True
425           feedback_fn("  - ERROR: file '%s' missing" % file_name)
426         elif remote_cksum[file_name] != local_cksum[file_name]:
427           bad = True
428           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
429
430     if 'nodelist' not in node_result:
431       bad = True
432       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
433     else:
434       if node_result['nodelist']:
435         bad = True
436         for node in node_result['nodelist']:
437           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
438                           (node, node_result['nodelist'][node]))
439     if 'node-net-test' not in node_result:
440       bad = True
441       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
442     else:
443       if node_result['node-net-test']:
444         bad = True
445         nlist = utils.NiceSort(node_result['node-net-test'].keys())
446         for node in nlist:
447           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
448                           (node, node_result['node-net-test'][node]))
449
450     hyp_result = node_result.get('hypervisor', None)
451     if hyp_result is not None:
452       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
453     return bad
454
455   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
456                       node_instance, feedback_fn):
457     """Verify an instance.
458
459     This function checks to see if the required block devices are
460     available on the instance's node.
461
462     """
463     bad = False
464
465     node_current = instanceconfig.primary_node
466
467     node_vol_should = {}
468     instanceconfig.MapLVsByNode(node_vol_should)
469
470     for node in node_vol_should:
471       for volume in node_vol_should[node]:
472         if node not in node_vol_is or volume not in node_vol_is[node]:
473           feedback_fn("  - ERROR: volume %s missing on node %s" %
474                           (volume, node))
475           bad = True
476
477     if not instanceconfig.status == 'down':
478       if (node_current not in node_instance or
479           not instance in node_instance[node_current]):
480         feedback_fn("  - ERROR: instance %s not running on node %s" %
481                         (instance, node_current))
482         bad = True
483
484     for node in node_instance:
485       if (not node == node_current):
486         if instance in node_instance[node]:
487           feedback_fn("  - ERROR: instance %s should not run on node %s" %
488                           (instance, node))
489           bad = True
490
491     return bad
492
493   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
494     """Verify if there are any unknown volumes in the cluster.
495
496     The .os, .swap and backup volumes are ignored. All other volumes are
497     reported as unknown.
498
499     """
500     bad = False
501
502     for node in node_vol_is:
503       for volume in node_vol_is[node]:
504         if node not in node_vol_should or volume not in node_vol_should[node]:
505           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
506                       (volume, node))
507           bad = True
508     return bad
509
510   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
511     """Verify the list of running instances.
512
513     This checks what instances are running but unknown to the cluster.
514
515     """
516     bad = False
517     for node in node_instance:
518       for runninginstance in node_instance[node]:
519         if runninginstance not in instancelist:
520           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
521                           (runninginstance, node))
522           bad = True
523     return bad
524
525   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
526     """Verify N+1 Memory Resilience.
527
528     Check that if one single node dies we can still start all the instances it
529     was primary for.
530
531     """
532     bad = False
533
534     for node, nodeinfo in node_info.iteritems():
535       # This code checks that every node which is now listed as secondary has
536       # enough memory to host all instances it is supposed to should a single
537       # other node in the cluster fail.
538       # FIXME: not ready for failover to an arbitrary node
539       # FIXME: does not support file-backed instances
540       # WARNING: we currently take into account down instances as well as up
541       # ones, considering that even if they're down someone might want to start
542       # them even in the event of a node failure.
543       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
544         needed_mem = 0
545         for instance in instances:
546           needed_mem += instance_cfg[instance].memory
547         if nodeinfo['mfree'] < needed_mem:
548           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
549                       " failovers should node %s fail" % (node, prinode))
550           bad = True
551     return bad
552
553   def CheckPrereq(self):
554     """Check prerequisites.
555
556     Transform the list of checks we're going to skip into a set and check that
557     all its members are valid.
558
559     """
560     self.skip_set = frozenset(self.op.skip_checks)
561     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
562       raise errors.OpPrereqError("Invalid checks to be skipped specified")
563
564   def BuildHooksEnv(self):
565     """Build hooks env.
566
567     Cluster-Verify hooks just rone in the post phase and their failure makes
568     the output be logged in the verify output and the verification to fail.
569
570     """
571     all_nodes = self.cfg.GetNodeList()
572     # TODO: populate the environment with useful information for verify hooks
573     env = {}
574     return env, [], all_nodes
575
576   def Exec(self, feedback_fn):
577     """Verify integrity of cluster, performing various test on nodes.
578
579     """
580     bad = False
581     feedback_fn("* Verifying global settings")
582     for msg in self.cfg.VerifyConfig():
583       feedback_fn("  - ERROR: %s" % msg)
584
585     vg_name = self.cfg.GetVGName()
586     nodelist = utils.NiceSort(self.cfg.GetNodeList())
587     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
588     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
589     i_non_redundant = [] # Non redundant instances
590     node_volume = {}
591     node_instance = {}
592     node_info = {}
593     instance_cfg = {}
594
595     # FIXME: verify OS list
596     # do local checksums
597     file_names = list(self.sstore.GetFileList())
598     file_names.append(constants.SSL_CERT_FILE)
599     file_names.append(constants.CLUSTER_CONF_FILE)
600     local_checksums = utils.FingerprintFiles(file_names)
601
602     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
603     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
604     all_instanceinfo = rpc.call_instance_list(nodelist)
605     all_vglist = rpc.call_vg_list(nodelist)
606     node_verify_param = {
607       'filelist': file_names,
608       'nodelist': nodelist,
609       'hypervisor': None,
610       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
611                         for node in nodeinfo]
612       }
613     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
614     all_rversion = rpc.call_version(nodelist)
615     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
616
617     for node in nodelist:
618       feedback_fn("* Verifying node %s" % node)
619       result = self._VerifyNode(node, file_names, local_checksums,
620                                 all_vglist[node], all_nvinfo[node],
621                                 all_rversion[node], feedback_fn)
622       bad = bad or result
623
624       # node_volume
625       volumeinfo = all_volumeinfo[node]
626
627       if isinstance(volumeinfo, basestring):
628         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
629                     (node, volumeinfo[-400:].encode('string_escape')))
630         bad = True
631         node_volume[node] = {}
632       elif not isinstance(volumeinfo, dict):
633         feedback_fn("  - ERROR: connection to %s failed" % (node,))
634         bad = True
635         continue
636       else:
637         node_volume[node] = volumeinfo
638
639       # node_instance
640       nodeinstance = all_instanceinfo[node]
641       if type(nodeinstance) != list:
642         feedback_fn("  - ERROR: connection to %s failed" % (node,))
643         bad = True
644         continue
645
646       node_instance[node] = nodeinstance
647
648       # node_info
649       nodeinfo = all_ninfo[node]
650       if not isinstance(nodeinfo, dict):
651         feedback_fn("  - ERROR: connection to %s failed" % (node,))
652         bad = True
653         continue
654
655       try:
656         node_info[node] = {
657           "mfree": int(nodeinfo['memory_free']),
658           "dfree": int(nodeinfo['vg_free']),
659           "pinst": [],
660           "sinst": [],
661           # dictionary holding all instances this node is secondary for,
662           # grouped by their primary node. Each key is a cluster node, and each
663           # value is a list of instances which have the key as primary and the
664           # current node as secondary.  this is handy to calculate N+1 memory
665           # availability if you can only failover from a primary to its
666           # secondary.
667           "sinst-by-pnode": {},
668         }
669       except ValueError:
670         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
671         bad = True
672         continue
673
674     node_vol_should = {}
675
676     for instance in instancelist:
677       feedback_fn("* Verifying instance %s" % instance)
678       inst_config = self.cfg.GetInstanceInfo(instance)
679       result =  self._VerifyInstance(instance, inst_config, node_volume,
680                                      node_instance, feedback_fn)
681       bad = bad or result
682
683       inst_config.MapLVsByNode(node_vol_should)
684
685       instance_cfg[instance] = inst_config
686
687       pnode = inst_config.primary_node
688       if pnode in node_info:
689         node_info[pnode]['pinst'].append(instance)
690       else:
691         feedback_fn("  - ERROR: instance %s, connection to primary node"
692                     " %s failed" % (instance, pnode))
693         bad = True
694
695       # If the instance is non-redundant we cannot survive losing its primary
696       # node, so we are not N+1 compliant. On the other hand we have no disk
697       # templates with more than one secondary so that situation is not well
698       # supported either.
699       # FIXME: does not support file-backed instances
700       if len(inst_config.secondary_nodes) == 0:
701         i_non_redundant.append(instance)
702       elif len(inst_config.secondary_nodes) > 1:
703         feedback_fn("  - WARNING: multiple secondaries for instance %s"
704                     % instance)
705
706       for snode in inst_config.secondary_nodes:
707         if snode in node_info:
708           node_info[snode]['sinst'].append(instance)
709           if pnode not in node_info[snode]['sinst-by-pnode']:
710             node_info[snode]['sinst-by-pnode'][pnode] = []
711           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
712         else:
713           feedback_fn("  - ERROR: instance %s, connection to secondary node"
714                       " %s failed" % (instance, snode))
715
716     feedback_fn("* Verifying orphan volumes")
717     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
718                                        feedback_fn)
719     bad = bad or result
720
721     feedback_fn("* Verifying remaining instances")
722     result = self._VerifyOrphanInstances(instancelist, node_instance,
723                                          feedback_fn)
724     bad = bad or result
725
726     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
727       feedback_fn("* Verifying N+1 Memory redundancy")
728       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
729       bad = bad or result
730
731     feedback_fn("* Other Notes")
732     if i_non_redundant:
733       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
734                   % len(i_non_redundant))
735
736     return int(bad)
737
738   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
739     """Analize the post-hooks' result, handle it, and send some
740     nicely-formatted feedback back to the user.
741
742     Args:
743       phase: the hooks phase that has just been run
744       hooks_results: the results of the multi-node hooks rpc call
745       feedback_fn: function to send feedback back to the caller
746       lu_result: previous Exec result
747
748     """
749     # We only really run POST phase hooks, and are only interested in their results
750     if phase == constants.HOOKS_PHASE_POST:
751       # Used to change hooks' output to proper indentation
752       indent_re = re.compile('^', re.M)
753       feedback_fn("* Hooks Results")
754       if not hooks_results:
755         feedback_fn("  - ERROR: general communication failure")
756         lu_result = 1
757       else:
758         for node_name in hooks_results:
759           show_node_header = True
760           res = hooks_results[node_name]
761           if res is False or not isinstance(res, list):
762             feedback_fn("    Communication failure")
763             lu_result = 1
764             continue
765           for script, hkr, output in res:
766             if hkr == constants.HKR_FAIL:
767               # The node header is only shown once, if there are
768               # failing hooks on that node
769               if show_node_header:
770                 feedback_fn("  Node %s:" % node_name)
771                 show_node_header = False
772               feedback_fn("    ERROR: Script %s failed, output:" % script)
773               output = indent_re.sub('      ', output)
774               feedback_fn("%s" % output)
775               lu_result = 1
776
777       return lu_result
778
779
780 class LUVerifyDisks(NoHooksLU):
781   """Verifies the cluster disks status.
782
783   """
784   _OP_REQP = []
785
786   def CheckPrereq(self):
787     """Check prerequisites.
788
789     This has no prerequisites.
790
791     """
792     pass
793
794   def Exec(self, feedback_fn):
795     """Verify integrity of cluster disks.
796
797     """
798     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
799
800     vg_name = self.cfg.GetVGName()
801     nodes = utils.NiceSort(self.cfg.GetNodeList())
802     instances = [self.cfg.GetInstanceInfo(name)
803                  for name in self.cfg.GetInstanceList()]
804
805     nv_dict = {}
806     for inst in instances:
807       inst_lvs = {}
808       if (inst.status != "up" or
809           inst.disk_template not in constants.DTS_NET_MIRROR):
810         continue
811       inst.MapLVsByNode(inst_lvs)
812       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
813       for node, vol_list in inst_lvs.iteritems():
814         for vol in vol_list:
815           nv_dict[(node, vol)] = inst
816
817     if not nv_dict:
818       return result
819
820     node_lvs = rpc.call_volume_list(nodes, vg_name)
821
822     to_act = set()
823     for node in nodes:
824       # node_volume
825       lvs = node_lvs[node]
826
827       if isinstance(lvs, basestring):
828         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
829         res_nlvm[node] = lvs
830       elif not isinstance(lvs, dict):
831         logger.Info("connection to node %s failed or invalid data returned" %
832                     (node,))
833         res_nodes.append(node)
834         continue
835
836       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
837         inst = nv_dict.pop((node, lv_name), None)
838         if (not lv_online and inst is not None
839             and inst.name not in res_instances):
840           res_instances.append(inst.name)
841
842     # any leftover items in nv_dict are missing LVs, let's arrange the
843     # data better
844     for key, inst in nv_dict.iteritems():
845       if inst.name not in res_missing:
846         res_missing[inst.name] = []
847       res_missing[inst.name].append(key)
848
849     return result
850
851
852 class LURenameCluster(LogicalUnit):
853   """Rename the cluster.
854
855   """
856   HPATH = "cluster-rename"
857   HTYPE = constants.HTYPE_CLUSTER
858   _OP_REQP = ["name"]
859   REQ_WSSTORE = True
860
861   def BuildHooksEnv(self):
862     """Build hooks env.
863
864     """
865     env = {
866       "OP_TARGET": self.sstore.GetClusterName(),
867       "NEW_NAME": self.op.name,
868       }
869     mn = self.sstore.GetMasterNode()
870     return env, [mn], [mn]
871
872   def CheckPrereq(self):
873     """Verify that the passed name is a valid one.
874
875     """
876     hostname = utils.HostInfo(self.op.name)
877
878     new_name = hostname.name
879     self.ip = new_ip = hostname.ip
880     old_name = self.sstore.GetClusterName()
881     old_ip = self.sstore.GetMasterIP()
882     if new_name == old_name and new_ip == old_ip:
883       raise errors.OpPrereqError("Neither the name nor the IP address of the"
884                                  " cluster has changed")
885     if new_ip != old_ip:
886       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
887         raise errors.OpPrereqError("The given cluster IP address (%s) is"
888                                    " reachable on the network. Aborting." %
889                                    new_ip)
890
891     self.op.name = new_name
892
893   def Exec(self, feedback_fn):
894     """Rename the cluster.
895
896     """
897     clustername = self.op.name
898     ip = self.ip
899     ss = self.sstore
900
901     # shutdown the master IP
902     master = ss.GetMasterNode()
903     if not rpc.call_node_stop_master(master):
904       raise errors.OpExecError("Could not disable the master role")
905
906     try:
907       # modify the sstore
908       ss.SetKey(ss.SS_MASTER_IP, ip)
909       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
910
911       # Distribute updated ss config to all nodes
912       myself = self.cfg.GetNodeInfo(master)
913       dist_nodes = self.cfg.GetNodeList()
914       if myself.name in dist_nodes:
915         dist_nodes.remove(myself.name)
916
917       logger.Debug("Copying updated ssconf data to all nodes")
918       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
919         fname = ss.KeyToFilename(keyname)
920         result = rpc.call_upload_file(dist_nodes, fname)
921         for to_node in dist_nodes:
922           if not result[to_node]:
923             logger.Error("copy of file %s to node %s failed" %
924                          (fname, to_node))
925     finally:
926       if not rpc.call_node_start_master(master):
927         logger.Error("Could not re-enable the master role on the master,"
928                      " please restart manually.")
929
930
931 def _RecursiveCheckIfLVMBased(disk):
932   """Check if the given disk or its children are lvm-based.
933
934   Args:
935     disk: ganeti.objects.Disk object
936
937   Returns:
938     boolean indicating whether a LD_LV dev_type was found or not
939
940   """
941   if disk.children:
942     for chdisk in disk.children:
943       if _RecursiveCheckIfLVMBased(chdisk):
944         return True
945   return disk.dev_type == constants.LD_LV
946
947
948 class LUSetClusterParams(LogicalUnit):
949   """Change the parameters of the cluster.
950
951   """
952   HPATH = "cluster-modify"
953   HTYPE = constants.HTYPE_CLUSTER
954   _OP_REQP = []
955
956   def BuildHooksEnv(self):
957     """Build hooks env.
958
959     """
960     env = {
961       "OP_TARGET": self.sstore.GetClusterName(),
962       "NEW_VG_NAME": self.op.vg_name,
963       }
964     mn = self.sstore.GetMasterNode()
965     return env, [mn], [mn]
966
967   def CheckPrereq(self):
968     """Check prerequisites.
969
970     This checks whether the given params don't conflict and
971     if the given volume group is valid.
972
973     """
974     if not self.op.vg_name:
975       instances = [self.cfg.GetInstanceInfo(name)
976                    for name in self.cfg.GetInstanceList()]
977       for inst in instances:
978         for disk in inst.disks:
979           if _RecursiveCheckIfLVMBased(disk):
980             raise errors.OpPrereqError("Cannot disable lvm storage while"
981                                        " lvm-based instances exist")
982
983     # if vg_name not None, checks given volume group on all nodes
984     if self.op.vg_name:
985       node_list = self.cfg.GetNodeList()
986       vglist = rpc.call_vg_list(node_list)
987       for node in node_list:
988         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
989                                               constants.MIN_VG_SIZE)
990         if vgstatus:
991           raise errors.OpPrereqError("Error on node '%s': %s" %
992                                      (node, vgstatus))
993
994   def Exec(self, feedback_fn):
995     """Change the parameters of the cluster.
996
997     """
998     if self.op.vg_name != self.cfg.GetVGName():
999       self.cfg.SetVGName(self.op.vg_name)
1000     else:
1001       feedback_fn("Cluster LVM configuration already in desired"
1002                   " state, not changing")
1003
1004
1005 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1006   """Sleep and poll for an instance's disk to sync.
1007
1008   """
1009   if not instance.disks:
1010     return True
1011
1012   if not oneshot:
1013     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1014
1015   node = instance.primary_node
1016
1017   for dev in instance.disks:
1018     cfgw.SetDiskID(dev, node)
1019
1020   retries = 0
1021   while True:
1022     max_time = 0
1023     done = True
1024     cumul_degraded = False
1025     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1026     if not rstats:
1027       proc.LogWarning("Can't get any data from node %s" % node)
1028       retries += 1
1029       if retries >= 10:
1030         raise errors.RemoteError("Can't contact node %s for mirror data,"
1031                                  " aborting." % node)
1032       time.sleep(6)
1033       continue
1034     retries = 0
1035     for i in range(len(rstats)):
1036       mstat = rstats[i]
1037       if mstat is None:
1038         proc.LogWarning("Can't compute data for node %s/%s" %
1039                         (node, instance.disks[i].iv_name))
1040         continue
1041       # we ignore the ldisk parameter
1042       perc_done, est_time, is_degraded, _ = mstat
1043       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1044       if perc_done is not None:
1045         done = False
1046         if est_time is not None:
1047           rem_time = "%d estimated seconds remaining" % est_time
1048           max_time = est_time
1049         else:
1050           rem_time = "no time estimate"
1051         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1052                      (instance.disks[i].iv_name, perc_done, rem_time))
1053     if done or oneshot:
1054       break
1055
1056     if unlock:
1057       #utils.Unlock('cmd')
1058       pass
1059     try:
1060       time.sleep(min(60, max_time))
1061     finally:
1062       if unlock:
1063         #utils.Lock('cmd')
1064         pass
1065
1066   if done:
1067     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1068   return not cumul_degraded
1069
1070
1071 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1072   """Check that mirrors are not degraded.
1073
1074   The ldisk parameter, if True, will change the test from the
1075   is_degraded attribute (which represents overall non-ok status for
1076   the device(s)) to the ldisk (representing the local storage status).
1077
1078   """
1079   cfgw.SetDiskID(dev, node)
1080   if ldisk:
1081     idx = 6
1082   else:
1083     idx = 5
1084
1085   result = True
1086   if on_primary or dev.AssembleOnSecondary():
1087     rstats = rpc.call_blockdev_find(node, dev)
1088     if not rstats:
1089       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1090       result = False
1091     else:
1092       result = result and (not rstats[idx])
1093   if dev.children:
1094     for child in dev.children:
1095       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1096
1097   return result
1098
1099
1100 class LUDiagnoseOS(NoHooksLU):
1101   """Logical unit for OS diagnose/query.
1102
1103   """
1104   _OP_REQP = ["output_fields", "names"]
1105
1106   def CheckPrereq(self):
1107     """Check prerequisites.
1108
1109     This always succeeds, since this is a pure query LU.
1110
1111     """
1112     if self.op.names:
1113       raise errors.OpPrereqError("Selective OS query not supported")
1114
1115     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1116     _CheckOutputFields(static=[],
1117                        dynamic=self.dynamic_fields,
1118                        selected=self.op.output_fields)
1119
1120   @staticmethod
1121   def _DiagnoseByOS(node_list, rlist):
1122     """Remaps a per-node return list into an a per-os per-node dictionary
1123
1124       Args:
1125         node_list: a list with the names of all nodes
1126         rlist: a map with node names as keys and OS objects as values
1127
1128       Returns:
1129         map: a map with osnames as keys and as value another map, with
1130              nodes as
1131              keys and list of OS objects as values
1132              e.g. {"debian-etch": {"node1": [<object>,...],
1133                                    "node2": [<object>,]}
1134                   }
1135
1136     """
1137     all_os = {}
1138     for node_name, nr in rlist.iteritems():
1139       if not nr:
1140         continue
1141       for os_obj in nr:
1142         if os_obj.name not in all_os:
1143           # build a list of nodes for this os containing empty lists
1144           # for each node in node_list
1145           all_os[os_obj.name] = {}
1146           for nname in node_list:
1147             all_os[os_obj.name][nname] = []
1148         all_os[os_obj.name][node_name].append(os_obj)
1149     return all_os
1150
1151   def Exec(self, feedback_fn):
1152     """Compute the list of OSes.
1153
1154     """
1155     node_list = self.cfg.GetNodeList()
1156     node_data = rpc.call_os_diagnose(node_list)
1157     if node_data == False:
1158       raise errors.OpExecError("Can't gather the list of OSes")
1159     pol = self._DiagnoseByOS(node_list, node_data)
1160     output = []
1161     for os_name, os_data in pol.iteritems():
1162       row = []
1163       for field in self.op.output_fields:
1164         if field == "name":
1165           val = os_name
1166         elif field == "valid":
1167           val = utils.all([osl and osl[0] for osl in os_data.values()])
1168         elif field == "node_status":
1169           val = {}
1170           for node_name, nos_list in os_data.iteritems():
1171             val[node_name] = [(v.status, v.path) for v in nos_list]
1172         else:
1173           raise errors.ParameterError(field)
1174         row.append(val)
1175       output.append(row)
1176
1177     return output
1178
1179
1180 class LURemoveNode(LogicalUnit):
1181   """Logical unit for removing a node.
1182
1183   """
1184   HPATH = "node-remove"
1185   HTYPE = constants.HTYPE_NODE
1186   _OP_REQP = ["node_name"]
1187
1188   def BuildHooksEnv(self):
1189     """Build hooks env.
1190
1191     This doesn't run on the target node in the pre phase as a failed
1192     node would then be impossible to remove.
1193
1194     """
1195     env = {
1196       "OP_TARGET": self.op.node_name,
1197       "NODE_NAME": self.op.node_name,
1198       }
1199     all_nodes = self.cfg.GetNodeList()
1200     all_nodes.remove(self.op.node_name)
1201     return env, all_nodes, all_nodes
1202
1203   def CheckPrereq(self):
1204     """Check prerequisites.
1205
1206     This checks:
1207      - the node exists in the configuration
1208      - it does not have primary or secondary instances
1209      - it's not the master
1210
1211     Any errors are signalled by raising errors.OpPrereqError.
1212
1213     """
1214     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1215     if node is None:
1216       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1217
1218     instance_list = self.cfg.GetInstanceList()
1219
1220     masternode = self.sstore.GetMasterNode()
1221     if node.name == masternode:
1222       raise errors.OpPrereqError("Node is the master node,"
1223                                  " you need to failover first.")
1224
1225     for instance_name in instance_list:
1226       instance = self.cfg.GetInstanceInfo(instance_name)
1227       if node.name == instance.primary_node:
1228         raise errors.OpPrereqError("Instance %s still running on the node,"
1229                                    " please remove first." % instance_name)
1230       if node.name in instance.secondary_nodes:
1231         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1232                                    " please remove first." % instance_name)
1233     self.op.node_name = node.name
1234     self.node = node
1235
1236   def Exec(self, feedback_fn):
1237     """Removes the node from the cluster.
1238
1239     """
1240     node = self.node
1241     logger.Info("stopping the node daemon and removing configs from node %s" %
1242                 node.name)
1243
1244     rpc.call_node_leave_cluster(node.name)
1245
1246     logger.Info("Removing node %s from config" % node.name)
1247
1248     self.cfg.RemoveNode(node.name)
1249
1250     utils.RemoveHostFromEtcHosts(node.name)
1251
1252
1253 class LUQueryNodes(NoHooksLU):
1254   """Logical unit for querying nodes.
1255
1256   """
1257   _OP_REQP = ["output_fields", "names"]
1258
1259   def CheckPrereq(self):
1260     """Check prerequisites.
1261
1262     This checks that the fields required are valid output fields.
1263
1264     """
1265     self.dynamic_fields = frozenset([
1266       "dtotal", "dfree",
1267       "mtotal", "mnode", "mfree",
1268       "bootid",
1269       "ctotal",
1270       ])
1271
1272     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273                                "pinst_list", "sinst_list",
1274                                "pip", "sip", "tags"],
1275                        dynamic=self.dynamic_fields,
1276                        selected=self.op.output_fields)
1277
1278     self.wanted = _GetWantedNodes(self, self.op.names)
1279
1280   def Exec(self, feedback_fn):
1281     """Computes the list of nodes and their attributes.
1282
1283     """
1284     nodenames = self.wanted
1285     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1286
1287     # begin data gathering
1288
1289     if self.dynamic_fields.intersection(self.op.output_fields):
1290       live_data = {}
1291       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292       for name in nodenames:
1293         nodeinfo = node_data.get(name, None)
1294         if nodeinfo:
1295           live_data[name] = {
1296             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302             "bootid": nodeinfo['bootid'],
1303             }
1304         else:
1305           live_data[name] = {}
1306     else:
1307       live_data = dict.fromkeys(nodenames, {})
1308
1309     node_to_primary = dict([(name, set()) for name in nodenames])
1310     node_to_secondary = dict([(name, set()) for name in nodenames])
1311
1312     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313                              "sinst_cnt", "sinst_list"))
1314     if inst_fields & frozenset(self.op.output_fields):
1315       instancelist = self.cfg.GetInstanceList()
1316
1317       for instance_name in instancelist:
1318         inst = self.cfg.GetInstanceInfo(instance_name)
1319         if inst.primary_node in node_to_primary:
1320           node_to_primary[inst.primary_node].add(inst.name)
1321         for secnode in inst.secondary_nodes:
1322           if secnode in node_to_secondary:
1323             node_to_secondary[secnode].add(inst.name)
1324
1325     # end data gathering
1326
1327     output = []
1328     for node in nodelist:
1329       node_output = []
1330       for field in self.op.output_fields:
1331         if field == "name":
1332           val = node.name
1333         elif field == "pinst_list":
1334           val = list(node_to_primary[node.name])
1335         elif field == "sinst_list":
1336           val = list(node_to_secondary[node.name])
1337         elif field == "pinst_cnt":
1338           val = len(node_to_primary[node.name])
1339         elif field == "sinst_cnt":
1340           val = len(node_to_secondary[node.name])
1341         elif field == "pip":
1342           val = node.primary_ip
1343         elif field == "sip":
1344           val = node.secondary_ip
1345         elif field == "tags":
1346           val = list(node.GetTags())
1347         elif field in self.dynamic_fields:
1348           val = live_data[node.name].get(field, None)
1349         else:
1350           raise errors.ParameterError(field)
1351         node_output.append(val)
1352       output.append(node_output)
1353
1354     return output
1355
1356
1357 class LUQueryNodeVolumes(NoHooksLU):
1358   """Logical unit for getting volumes on node(s).
1359
1360   """
1361   _OP_REQP = ["nodes", "output_fields"]
1362
1363   def CheckPrereq(self):
1364     """Check prerequisites.
1365
1366     This checks that the fields required are valid output fields.
1367
1368     """
1369     self.nodes = _GetWantedNodes(self, self.op.nodes)
1370
1371     _CheckOutputFields(static=["node"],
1372                        dynamic=["phys", "vg", "name", "size", "instance"],
1373                        selected=self.op.output_fields)
1374
1375
1376   def Exec(self, feedback_fn):
1377     """Computes the list of nodes and their attributes.
1378
1379     """
1380     nodenames = self.nodes
1381     volumes = rpc.call_node_volumes(nodenames)
1382
1383     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1384              in self.cfg.GetInstanceList()]
1385
1386     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1387
1388     output = []
1389     for node in nodenames:
1390       if node not in volumes or not volumes[node]:
1391         continue
1392
1393       node_vols = volumes[node][:]
1394       node_vols.sort(key=lambda vol: vol['dev'])
1395
1396       for vol in node_vols:
1397         node_output = []
1398         for field in self.op.output_fields:
1399           if field == "node":
1400             val = node
1401           elif field == "phys":
1402             val = vol['dev']
1403           elif field == "vg":
1404             val = vol['vg']
1405           elif field == "name":
1406             val = vol['name']
1407           elif field == "size":
1408             val = int(float(vol['size']))
1409           elif field == "instance":
1410             for inst in ilist:
1411               if node not in lv_by_node[inst]:
1412                 continue
1413               if vol['name'] in lv_by_node[inst][node]:
1414                 val = inst.name
1415                 break
1416             else:
1417               val = '-'
1418           else:
1419             raise errors.ParameterError(field)
1420           node_output.append(str(val))
1421
1422         output.append(node_output)
1423
1424     return output
1425
1426
1427 class LUAddNode(LogicalUnit):
1428   """Logical unit for adding node to the cluster.
1429
1430   """
1431   HPATH = "node-add"
1432   HTYPE = constants.HTYPE_NODE
1433   _OP_REQP = ["node_name"]
1434
1435   def BuildHooksEnv(self):
1436     """Build hooks env.
1437
1438     This will run on all nodes before, and on all nodes + the new node after.
1439
1440     """
1441     env = {
1442       "OP_TARGET": self.op.node_name,
1443       "NODE_NAME": self.op.node_name,
1444       "NODE_PIP": self.op.primary_ip,
1445       "NODE_SIP": self.op.secondary_ip,
1446       }
1447     nodes_0 = self.cfg.GetNodeList()
1448     nodes_1 = nodes_0 + [self.op.node_name, ]
1449     return env, nodes_0, nodes_1
1450
1451   def CheckPrereq(self):
1452     """Check prerequisites.
1453
1454     This checks:
1455      - the new node is not already in the config
1456      - it is resolvable
1457      - its parameters (single/dual homed) matches the cluster
1458
1459     Any errors are signalled by raising errors.OpPrereqError.
1460
1461     """
1462     node_name = self.op.node_name
1463     cfg = self.cfg
1464
1465     dns_data = utils.HostInfo(node_name)
1466
1467     node = dns_data.name
1468     primary_ip = self.op.primary_ip = dns_data.ip
1469     secondary_ip = getattr(self.op, "secondary_ip", None)
1470     if secondary_ip is None:
1471       secondary_ip = primary_ip
1472     if not utils.IsValidIP(secondary_ip):
1473       raise errors.OpPrereqError("Invalid secondary IP given")
1474     self.op.secondary_ip = secondary_ip
1475
1476     node_list = cfg.GetNodeList()
1477     if not self.op.readd and node in node_list:
1478       raise errors.OpPrereqError("Node %s is already in the configuration" %
1479                                  node)
1480     elif self.op.readd and node not in node_list:
1481       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1482
1483     for existing_node_name in node_list:
1484       existing_node = cfg.GetNodeInfo(existing_node_name)
1485
1486       if self.op.readd and node == existing_node_name:
1487         if (existing_node.primary_ip != primary_ip or
1488             existing_node.secondary_ip != secondary_ip):
1489           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1490                                      " address configuration as before")
1491         continue
1492
1493       if (existing_node.primary_ip == primary_ip or
1494           existing_node.secondary_ip == primary_ip or
1495           existing_node.primary_ip == secondary_ip or
1496           existing_node.secondary_ip == secondary_ip):
1497         raise errors.OpPrereqError("New node ip address(es) conflict with"
1498                                    " existing node %s" % existing_node.name)
1499
1500     # check that the type of the node (single versus dual homed) is the
1501     # same as for the master
1502     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1503     master_singlehomed = myself.secondary_ip == myself.primary_ip
1504     newbie_singlehomed = secondary_ip == primary_ip
1505     if master_singlehomed != newbie_singlehomed:
1506       if master_singlehomed:
1507         raise errors.OpPrereqError("The master has no private ip but the"
1508                                    " new node has one")
1509       else:
1510         raise errors.OpPrereqError("The master has a private ip but the"
1511                                    " new node doesn't have one")
1512
1513     # checks reachablity
1514     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1515       raise errors.OpPrereqError("Node not reachable by ping")
1516
1517     if not newbie_singlehomed:
1518       # check reachability from my secondary ip to newbie's secondary ip
1519       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1520                            source=myself.secondary_ip):
1521         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1522                                    " based ping to noded port")
1523
1524     self.new_node = objects.Node(name=node,
1525                                  primary_ip=primary_ip,
1526                                  secondary_ip=secondary_ip)
1527
1528   def Exec(self, feedback_fn):
1529     """Adds the new node to the cluster.
1530
1531     """
1532     new_node = self.new_node
1533     node = new_node.name
1534
1535     # check connectivity
1536     result = rpc.call_version([node])[node]
1537     if result:
1538       if constants.PROTOCOL_VERSION == result:
1539         logger.Info("communication to node %s fine, sw version %s match" %
1540                     (node, result))
1541       else:
1542         raise errors.OpExecError("Version mismatch master version %s,"
1543                                  " node version %s" %
1544                                  (constants.PROTOCOL_VERSION, result))
1545     else:
1546       raise errors.OpExecError("Cannot get version from the new node")
1547
1548     # setup ssh on node
1549     logger.Info("copy ssh key to node %s" % node)
1550     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1551     keyarray = []
1552     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1553                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1554                 priv_key, pub_key]
1555
1556     for i in keyfiles:
1557       f = open(i, 'r')
1558       try:
1559         keyarray.append(f.read())
1560       finally:
1561         f.close()
1562
1563     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1564                                keyarray[3], keyarray[4], keyarray[5])
1565
1566     if not result:
1567       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1568
1569     # Add node to our /etc/hosts, and add key to known_hosts
1570     utils.AddHostToEtcHosts(new_node.name)
1571
1572     if new_node.secondary_ip != new_node.primary_ip:
1573       if not rpc.call_node_tcp_ping(new_node.name,
1574                                     constants.LOCALHOST_IP_ADDRESS,
1575                                     new_node.secondary_ip,
1576                                     constants.DEFAULT_NODED_PORT,
1577                                     10, False):
1578         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1579                                  " you gave (%s). Please fix and re-run this"
1580                                  " command." % new_node.secondary_ip)
1581
1582     node_verify_list = [self.sstore.GetMasterNode()]
1583     node_verify_param = {
1584       'nodelist': [node],
1585       # TODO: do a node-net-test as well?
1586     }
1587
1588     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1589     for verifier in node_verify_list:
1590       if not result[verifier]:
1591         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1592                                  " for remote verification" % verifier)
1593       if result[verifier]['nodelist']:
1594         for failed in result[verifier]['nodelist']:
1595           feedback_fn("ssh/hostname verification failed %s -> %s" %
1596                       (verifier, result[verifier]['nodelist'][failed]))
1597         raise errors.OpExecError("ssh/hostname verification failed.")
1598
1599     # Distribute updated /etc/hosts and known_hosts to all nodes,
1600     # including the node just added
1601     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1602     dist_nodes = self.cfg.GetNodeList()
1603     if not self.op.readd:
1604       dist_nodes.append(node)
1605     if myself.name in dist_nodes:
1606       dist_nodes.remove(myself.name)
1607
1608     logger.Debug("Copying hosts and known_hosts to all nodes")
1609     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1610       result = rpc.call_upload_file(dist_nodes, fname)
1611       for to_node in dist_nodes:
1612         if not result[to_node]:
1613           logger.Error("copy of file %s to node %s failed" %
1614                        (fname, to_node))
1615
1616     to_copy = self.sstore.GetFileList()
1617     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1618       to_copy.append(constants.VNC_PASSWORD_FILE)
1619     for fname in to_copy:
1620       result = rpc.call_upload_file([node], fname)
1621       if not result[node]:
1622         logger.Error("could not copy file %s to node %s" % (fname, node))
1623
1624     if not self.op.readd:
1625       logger.Info("adding node %s to cluster.conf" % node)
1626       self.cfg.AddNode(new_node)
1627
1628
1629 class LUMasterFailover(LogicalUnit):
1630   """Failover the master node to the current node.
1631
1632   This is a special LU in that it must run on a non-master node.
1633
1634   """
1635   HPATH = "master-failover"
1636   HTYPE = constants.HTYPE_CLUSTER
1637   REQ_MASTER = False
1638   REQ_WSSTORE = True
1639   _OP_REQP = []
1640
1641   def BuildHooksEnv(self):
1642     """Build hooks env.
1643
1644     This will run on the new master only in the pre phase, and on all
1645     the nodes in the post phase.
1646
1647     """
1648     env = {
1649       "OP_TARGET": self.new_master,
1650       "NEW_MASTER": self.new_master,
1651       "OLD_MASTER": self.old_master,
1652       }
1653     return env, [self.new_master], self.cfg.GetNodeList()
1654
1655   def CheckPrereq(self):
1656     """Check prerequisites.
1657
1658     This checks that we are not already the master.
1659
1660     """
1661     self.new_master = utils.HostInfo().name
1662     self.old_master = self.sstore.GetMasterNode()
1663
1664     if self.old_master == self.new_master:
1665       raise errors.OpPrereqError("This commands must be run on the node"
1666                                  " where you want the new master to be."
1667                                  " %s is already the master" %
1668                                  self.old_master)
1669
1670   def Exec(self, feedback_fn):
1671     """Failover the master node.
1672
1673     This command, when run on a non-master node, will cause the current
1674     master to cease being master, and the non-master to become new
1675     master.
1676
1677     """
1678     #TODO: do not rely on gethostname returning the FQDN
1679     logger.Info("setting master to %s, old master: %s" %
1680                 (self.new_master, self.old_master))
1681
1682     if not rpc.call_node_stop_master(self.old_master):
1683       logger.Error("could disable the master role on the old master"
1684                    " %s, please disable manually" % self.old_master)
1685
1686     ss = self.sstore
1687     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1688     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1689                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1690       logger.Error("could not distribute the new simple store master file"
1691                    " to the other nodes, please check.")
1692
1693     if not rpc.call_node_start_master(self.new_master):
1694       logger.Error("could not start the master role on the new master"
1695                    " %s, please check" % self.new_master)
1696       feedback_fn("Error in activating the master IP on the new master,"
1697                   " please fix manually.")
1698
1699
1700
1701 class LUQueryClusterInfo(NoHooksLU):
1702   """Query cluster configuration.
1703
1704   """
1705   _OP_REQP = []
1706   REQ_MASTER = False
1707
1708   def CheckPrereq(self):
1709     """No prerequsites needed for this LU.
1710
1711     """
1712     pass
1713
1714   def Exec(self, feedback_fn):
1715     """Return cluster config.
1716
1717     """
1718     result = {
1719       "name": self.sstore.GetClusterName(),
1720       "software_version": constants.RELEASE_VERSION,
1721       "protocol_version": constants.PROTOCOL_VERSION,
1722       "config_version": constants.CONFIG_VERSION,
1723       "os_api_version": constants.OS_API_VERSION,
1724       "export_version": constants.EXPORT_VERSION,
1725       "master": self.sstore.GetMasterNode(),
1726       "architecture": (platform.architecture()[0], platform.machine()),
1727       "hypervisor_type": self.sstore.GetHypervisorType(),
1728       }
1729
1730     return result
1731
1732
1733 class LUDumpClusterConfig(NoHooksLU):
1734   """Return a text-representation of the cluster-config.
1735
1736   """
1737   _OP_REQP = []
1738
1739   def CheckPrereq(self):
1740     """No prerequisites.
1741
1742     """
1743     pass
1744
1745   def Exec(self, feedback_fn):
1746     """Dump a representation of the cluster config to the standard output.
1747
1748     """
1749     return self.cfg.DumpConfig()
1750
1751
1752 class LUActivateInstanceDisks(NoHooksLU):
1753   """Bring up an instance's disks.
1754
1755   """
1756   _OP_REQP = ["instance_name"]
1757
1758   def CheckPrereq(self):
1759     """Check prerequisites.
1760
1761     This checks that the instance is in the cluster.
1762
1763     """
1764     instance = self.cfg.GetInstanceInfo(
1765       self.cfg.ExpandInstanceName(self.op.instance_name))
1766     if instance is None:
1767       raise errors.OpPrereqError("Instance '%s' not known" %
1768                                  self.op.instance_name)
1769     self.instance = instance
1770
1771
1772   def Exec(self, feedback_fn):
1773     """Activate the disks.
1774
1775     """
1776     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1777     if not disks_ok:
1778       raise errors.OpExecError("Cannot activate block devices")
1779
1780     return disks_info
1781
1782
1783 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1784   """Prepare the block devices for an instance.
1785
1786   This sets up the block devices on all nodes.
1787
1788   Args:
1789     instance: a ganeti.objects.Instance object
1790     ignore_secondaries: if true, errors on secondary nodes won't result
1791                         in an error return from the function
1792
1793   Returns:
1794     false if the operation failed
1795     list of (host, instance_visible_name, node_visible_name) if the operation
1796          suceeded with the mapping from node devices to instance devices
1797   """
1798   device_info = []
1799   disks_ok = True
1800   iname = instance.name
1801   # With the two passes mechanism we try to reduce the window of
1802   # opportunity for the race condition of switching DRBD to primary
1803   # before handshaking occured, but we do not eliminate it
1804
1805   # The proper fix would be to wait (with some limits) until the
1806   # connection has been made and drbd transitions from WFConnection
1807   # into any other network-connected state (Connected, SyncTarget,
1808   # SyncSource, etc.)
1809
1810   # 1st pass, assemble on all nodes in secondary mode
1811   for inst_disk in instance.disks:
1812     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1813       cfg.SetDiskID(node_disk, node)
1814       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1815       if not result:
1816         logger.Error("could not prepare block device %s on node %s"
1817                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1818         if not ignore_secondaries:
1819           disks_ok = False
1820
1821   # FIXME: race condition on drbd migration to primary
1822
1823   # 2nd pass, do only the primary node
1824   for inst_disk in instance.disks:
1825     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1826       if node != instance.primary_node:
1827         continue
1828       cfg.SetDiskID(node_disk, node)
1829       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1830       if not result:
1831         logger.Error("could not prepare block device %s on node %s"
1832                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1833         disks_ok = False
1834     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1835
1836   # leave the disks configured for the primary node
1837   # this is a workaround that would be fixed better by
1838   # improving the logical/physical id handling
1839   for disk in instance.disks:
1840     cfg.SetDiskID(disk, instance.primary_node)
1841
1842   return disks_ok, device_info
1843
1844
1845 def _StartInstanceDisks(cfg, instance, force):
1846   """Start the disks of an instance.
1847
1848   """
1849   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1850                                            ignore_secondaries=force)
1851   if not disks_ok:
1852     _ShutdownInstanceDisks(instance, cfg)
1853     if force is not None and not force:
1854       logger.Error("If the message above refers to a secondary node,"
1855                    " you can retry the operation using '--force'.")
1856     raise errors.OpExecError("Disk consistency error")
1857
1858
1859 class LUDeactivateInstanceDisks(NoHooksLU):
1860   """Shutdown an instance's disks.
1861
1862   """
1863   _OP_REQP = ["instance_name"]
1864
1865   def CheckPrereq(self):
1866     """Check prerequisites.
1867
1868     This checks that the instance is in the cluster.
1869
1870     """
1871     instance = self.cfg.GetInstanceInfo(
1872       self.cfg.ExpandInstanceName(self.op.instance_name))
1873     if instance is None:
1874       raise errors.OpPrereqError("Instance '%s' not known" %
1875                                  self.op.instance_name)
1876     self.instance = instance
1877
1878   def Exec(self, feedback_fn):
1879     """Deactivate the disks
1880
1881     """
1882     instance = self.instance
1883     ins_l = rpc.call_instance_list([instance.primary_node])
1884     ins_l = ins_l[instance.primary_node]
1885     if not type(ins_l) is list:
1886       raise errors.OpExecError("Can't contact node '%s'" %
1887                                instance.primary_node)
1888
1889     if self.instance.name in ins_l:
1890       raise errors.OpExecError("Instance is running, can't shutdown"
1891                                " block devices.")
1892
1893     _ShutdownInstanceDisks(instance, self.cfg)
1894
1895
1896 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1897   """Shutdown block devices of an instance.
1898
1899   This does the shutdown on all nodes of the instance.
1900
1901   If the ignore_primary is false, errors on the primary node are
1902   ignored.
1903
1904   """
1905   result = True
1906   for disk in instance.disks:
1907     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1908       cfg.SetDiskID(top_disk, node)
1909       if not rpc.call_blockdev_shutdown(node, top_disk):
1910         logger.Error("could not shutdown block device %s on node %s" %
1911                      (disk.iv_name, node))
1912         if not ignore_primary or node != instance.primary_node:
1913           result = False
1914   return result
1915
1916
1917 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1918   """Checks if a node has enough free memory.
1919
1920   This function check if a given node has the needed amount of free
1921   memory. In case the node has less memory or we cannot get the
1922   information from the node, this function raise an OpPrereqError
1923   exception.
1924
1925   Args:
1926     - cfg: a ConfigWriter instance
1927     - node: the node name
1928     - reason: string to use in the error message
1929     - requested: the amount of memory in MiB
1930
1931   """
1932   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1933   if not nodeinfo or not isinstance(nodeinfo, dict):
1934     raise errors.OpPrereqError("Could not contact node %s for resource"
1935                              " information" % (node,))
1936
1937   free_mem = nodeinfo[node].get('memory_free')
1938   if not isinstance(free_mem, int):
1939     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1940                              " was '%s'" % (node, free_mem))
1941   if requested > free_mem:
1942     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1943                              " needed %s MiB, available %s MiB" %
1944                              (node, reason, requested, free_mem))
1945
1946
1947 class LUStartupInstance(LogicalUnit):
1948   """Starts an instance.
1949
1950   """
1951   HPATH = "instance-start"
1952   HTYPE = constants.HTYPE_INSTANCE
1953   _OP_REQP = ["instance_name", "force"]
1954
1955   def BuildHooksEnv(self):
1956     """Build hooks env.
1957
1958     This runs on master, primary and secondary nodes of the instance.
1959
1960     """
1961     env = {
1962       "FORCE": self.op.force,
1963       }
1964     env.update(_BuildInstanceHookEnvByObject(self.instance))
1965     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1966           list(self.instance.secondary_nodes))
1967     return env, nl, nl
1968
1969   def CheckPrereq(self):
1970     """Check prerequisites.
1971
1972     This checks that the instance is in the cluster.
1973
1974     """
1975     instance = self.cfg.GetInstanceInfo(
1976       self.cfg.ExpandInstanceName(self.op.instance_name))
1977     if instance is None:
1978       raise errors.OpPrereqError("Instance '%s' not known" %
1979                                  self.op.instance_name)
1980
1981     # check bridges existance
1982     _CheckInstanceBridgesExist(instance)
1983
1984     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1985                          "starting instance %s" % instance.name,
1986                          instance.memory)
1987
1988     self.instance = instance
1989     self.op.instance_name = instance.name
1990
1991   def Exec(self, feedback_fn):
1992     """Start the instance.
1993
1994     """
1995     instance = self.instance
1996     force = self.op.force
1997     extra_args = getattr(self.op, "extra_args", "")
1998
1999     self.cfg.MarkInstanceUp(instance.name)
2000
2001     node_current = instance.primary_node
2002
2003     _StartInstanceDisks(self.cfg, instance, force)
2004
2005     if not rpc.call_instance_start(node_current, instance, extra_args):
2006       _ShutdownInstanceDisks(instance, self.cfg)
2007       raise errors.OpExecError("Could not start instance")
2008
2009
2010 class LURebootInstance(LogicalUnit):
2011   """Reboot an instance.
2012
2013   """
2014   HPATH = "instance-reboot"
2015   HTYPE = constants.HTYPE_INSTANCE
2016   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2017
2018   def BuildHooksEnv(self):
2019     """Build hooks env.
2020
2021     This runs on master, primary and secondary nodes of the instance.
2022
2023     """
2024     env = {
2025       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2026       }
2027     env.update(_BuildInstanceHookEnvByObject(self.instance))
2028     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029           list(self.instance.secondary_nodes))
2030     return env, nl, nl
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     instance = self.cfg.GetInstanceInfo(
2039       self.cfg.ExpandInstanceName(self.op.instance_name))
2040     if instance is None:
2041       raise errors.OpPrereqError("Instance '%s' not known" %
2042                                  self.op.instance_name)
2043
2044     # check bridges existance
2045     _CheckInstanceBridgesExist(instance)
2046
2047     self.instance = instance
2048     self.op.instance_name = instance.name
2049
2050   def Exec(self, feedback_fn):
2051     """Reboot the instance.
2052
2053     """
2054     instance = self.instance
2055     ignore_secondaries = self.op.ignore_secondaries
2056     reboot_type = self.op.reboot_type
2057     extra_args = getattr(self.op, "extra_args", "")
2058
2059     node_current = instance.primary_node
2060
2061     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2062                            constants.INSTANCE_REBOOT_HARD,
2063                            constants.INSTANCE_REBOOT_FULL]:
2064       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2065                                   (constants.INSTANCE_REBOOT_SOFT,
2066                                    constants.INSTANCE_REBOOT_HARD,
2067                                    constants.INSTANCE_REBOOT_FULL))
2068
2069     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2070                        constants.INSTANCE_REBOOT_HARD]:
2071       if not rpc.call_instance_reboot(node_current, instance,
2072                                       reboot_type, extra_args):
2073         raise errors.OpExecError("Could not reboot instance")
2074     else:
2075       if not rpc.call_instance_shutdown(node_current, instance):
2076         raise errors.OpExecError("could not shutdown instance for full reboot")
2077       _ShutdownInstanceDisks(instance, self.cfg)
2078       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2079       if not rpc.call_instance_start(node_current, instance, extra_args):
2080         _ShutdownInstanceDisks(instance, self.cfg)
2081         raise errors.OpExecError("Could not start instance for full reboot")
2082
2083     self.cfg.MarkInstanceUp(instance.name)
2084
2085
2086 class LUShutdownInstance(LogicalUnit):
2087   """Shutdown an instance.
2088
2089   """
2090   HPATH = "instance-stop"
2091   HTYPE = constants.HTYPE_INSTANCE
2092   _OP_REQP = ["instance_name"]
2093
2094   def BuildHooksEnv(self):
2095     """Build hooks env.
2096
2097     This runs on master, primary and secondary nodes of the instance.
2098
2099     """
2100     env = _BuildInstanceHookEnvByObject(self.instance)
2101     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2102           list(self.instance.secondary_nodes))
2103     return env, nl, nl
2104
2105   def CheckPrereq(self):
2106     """Check prerequisites.
2107
2108     This checks that the instance is in the cluster.
2109
2110     """
2111     instance = self.cfg.GetInstanceInfo(
2112       self.cfg.ExpandInstanceName(self.op.instance_name))
2113     if instance is None:
2114       raise errors.OpPrereqError("Instance '%s' not known" %
2115                                  self.op.instance_name)
2116     self.instance = instance
2117
2118   def Exec(self, feedback_fn):
2119     """Shutdown the instance.
2120
2121     """
2122     instance = self.instance
2123     node_current = instance.primary_node
2124     self.cfg.MarkInstanceDown(instance.name)
2125     if not rpc.call_instance_shutdown(node_current, instance):
2126       logger.Error("could not shutdown instance")
2127
2128     _ShutdownInstanceDisks(instance, self.cfg)
2129
2130
2131 class LUReinstallInstance(LogicalUnit):
2132   """Reinstall an instance.
2133
2134   """
2135   HPATH = "instance-reinstall"
2136   HTYPE = constants.HTYPE_INSTANCE
2137   _OP_REQP = ["instance_name"]
2138
2139   def BuildHooksEnv(self):
2140     """Build hooks env.
2141
2142     This runs on master, primary and secondary nodes of the instance.
2143
2144     """
2145     env = _BuildInstanceHookEnvByObject(self.instance)
2146     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2147           list(self.instance.secondary_nodes))
2148     return env, nl, nl
2149
2150   def CheckPrereq(self):
2151     """Check prerequisites.
2152
2153     This checks that the instance is in the cluster and is not running.
2154
2155     """
2156     instance = self.cfg.GetInstanceInfo(
2157       self.cfg.ExpandInstanceName(self.op.instance_name))
2158     if instance is None:
2159       raise errors.OpPrereqError("Instance '%s' not known" %
2160                                  self.op.instance_name)
2161     if instance.disk_template == constants.DT_DISKLESS:
2162       raise errors.OpPrereqError("Instance '%s' has no disks" %
2163                                  self.op.instance_name)
2164     if instance.status != "down":
2165       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2166                                  self.op.instance_name)
2167     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2168     if remote_info:
2169       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2170                                  (self.op.instance_name,
2171                                   instance.primary_node))
2172
2173     self.op.os_type = getattr(self.op, "os_type", None)
2174     if self.op.os_type is not None:
2175       # OS verification
2176       pnode = self.cfg.GetNodeInfo(
2177         self.cfg.ExpandNodeName(instance.primary_node))
2178       if pnode is None:
2179         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2180                                    self.op.pnode)
2181       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2182       if not os_obj:
2183         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2184                                    " primary node"  % self.op.os_type)
2185
2186     self.instance = instance
2187
2188   def Exec(self, feedback_fn):
2189     """Reinstall the instance.
2190
2191     """
2192     inst = self.instance
2193
2194     if self.op.os_type is not None:
2195       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2196       inst.os = self.op.os_type
2197       self.cfg.AddInstance(inst)
2198
2199     _StartInstanceDisks(self.cfg, inst, None)
2200     try:
2201       feedback_fn("Running the instance OS create scripts...")
2202       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2203         raise errors.OpExecError("Could not install OS for instance %s"
2204                                  " on node %s" %
2205                                  (inst.name, inst.primary_node))
2206     finally:
2207       _ShutdownInstanceDisks(inst, self.cfg)
2208
2209
2210 class LURenameInstance(LogicalUnit):
2211   """Rename an instance.
2212
2213   """
2214   HPATH = "instance-rename"
2215   HTYPE = constants.HTYPE_INSTANCE
2216   _OP_REQP = ["instance_name", "new_name"]
2217
2218   def BuildHooksEnv(self):
2219     """Build hooks env.
2220
2221     This runs on master, primary and secondary nodes of the instance.
2222
2223     """
2224     env = _BuildInstanceHookEnvByObject(self.instance)
2225     env["INSTANCE_NEW_NAME"] = self.op.new_name
2226     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2227           list(self.instance.secondary_nodes))
2228     return env, nl, nl
2229
2230   def CheckPrereq(self):
2231     """Check prerequisites.
2232
2233     This checks that the instance is in the cluster and is not running.
2234
2235     """
2236     instance = self.cfg.GetInstanceInfo(
2237       self.cfg.ExpandInstanceName(self.op.instance_name))
2238     if instance is None:
2239       raise errors.OpPrereqError("Instance '%s' not known" %
2240                                  self.op.instance_name)
2241     if instance.status != "down":
2242       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2243                                  self.op.instance_name)
2244     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2245     if remote_info:
2246       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2247                                  (self.op.instance_name,
2248                                   instance.primary_node))
2249     self.instance = instance
2250
2251     # new name verification
2252     name_info = utils.HostInfo(self.op.new_name)
2253
2254     self.op.new_name = new_name = name_info.name
2255     instance_list = self.cfg.GetInstanceList()
2256     if new_name in instance_list:
2257       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2258                                  new_name)
2259
2260     if not getattr(self.op, "ignore_ip", False):
2261       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2262         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2263                                    (name_info.ip, new_name))
2264
2265
2266   def Exec(self, feedback_fn):
2267     """Reinstall the instance.
2268
2269     """
2270     inst = self.instance
2271     old_name = inst.name
2272
2273     if inst.disk_template == constants.DT_FILE:
2274       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2275
2276     self.cfg.RenameInstance(inst.name, self.op.new_name)
2277
2278     # re-read the instance from the configuration after rename
2279     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2280
2281     if inst.disk_template == constants.DT_FILE:
2282       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2283       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2284                                                 old_file_storage_dir,
2285                                                 new_file_storage_dir)
2286
2287       if not result:
2288         raise errors.OpExecError("Could not connect to node '%s' to rename"
2289                                  " directory '%s' to '%s' (but the instance"
2290                                  " has been renamed in Ganeti)" % (
2291                                  inst.primary_node, old_file_storage_dir,
2292                                  new_file_storage_dir))
2293
2294       if not result[0]:
2295         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2296                                  " (but the instance has been renamed in"
2297                                  " Ganeti)" % (old_file_storage_dir,
2298                                                new_file_storage_dir))
2299
2300     _StartInstanceDisks(self.cfg, inst, None)
2301     try:
2302       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2303                                           "sda", "sdb"):
2304         msg = ("Could run OS rename script for instance %s on node %s (but the"
2305                " instance has been renamed in Ganeti)" %
2306                (inst.name, inst.primary_node))
2307         logger.Error(msg)
2308     finally:
2309       _ShutdownInstanceDisks(inst, self.cfg)
2310
2311
2312 class LURemoveInstance(LogicalUnit):
2313   """Remove an instance.
2314
2315   """
2316   HPATH = "instance-remove"
2317   HTYPE = constants.HTYPE_INSTANCE
2318   _OP_REQP = ["instance_name", "ignore_failures"]
2319
2320   def BuildHooksEnv(self):
2321     """Build hooks env.
2322
2323     This runs on master, primary and secondary nodes of the instance.
2324
2325     """
2326     env = _BuildInstanceHookEnvByObject(self.instance)
2327     nl = [self.sstore.GetMasterNode()]
2328     return env, nl, nl
2329
2330   def CheckPrereq(self):
2331     """Check prerequisites.
2332
2333     This checks that the instance is in the cluster.
2334
2335     """
2336     instance = self.cfg.GetInstanceInfo(
2337       self.cfg.ExpandInstanceName(self.op.instance_name))
2338     if instance is None:
2339       raise errors.OpPrereqError("Instance '%s' not known" %
2340                                  self.op.instance_name)
2341     self.instance = instance
2342
2343   def Exec(self, feedback_fn):
2344     """Remove the instance.
2345
2346     """
2347     instance = self.instance
2348     logger.Info("shutting down instance %s on node %s" %
2349                 (instance.name, instance.primary_node))
2350
2351     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2352       if self.op.ignore_failures:
2353         feedback_fn("Warning: can't shutdown instance")
2354       else:
2355         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2356                                  (instance.name, instance.primary_node))
2357
2358     logger.Info("removing block devices for instance %s" % instance.name)
2359
2360     if not _RemoveDisks(instance, self.cfg):
2361       if self.op.ignore_failures:
2362         feedback_fn("Warning: can't remove instance's disks")
2363       else:
2364         raise errors.OpExecError("Can't remove instance's disks")
2365
2366     logger.Info("removing instance %s out of cluster config" % instance.name)
2367
2368     self.cfg.RemoveInstance(instance.name)
2369
2370
2371 class LUQueryInstances(NoHooksLU):
2372   """Logical unit for querying instances.
2373
2374   """
2375   _OP_REQP = ["output_fields", "names"]
2376
2377   def CheckPrereq(self):
2378     """Check prerequisites.
2379
2380     This checks that the fields required are valid output fields.
2381
2382     """
2383     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2384     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2385                                "admin_state", "admin_ram",
2386                                "disk_template", "ip", "mac", "bridge",
2387                                "sda_size", "sdb_size", "vcpus", "tags"],
2388                        dynamic=self.dynamic_fields,
2389                        selected=self.op.output_fields)
2390
2391     self.wanted = _GetWantedInstances(self, self.op.names)
2392
2393   def Exec(self, feedback_fn):
2394     """Computes the list of nodes and their attributes.
2395
2396     """
2397     instance_names = self.wanted
2398     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2399                      in instance_names]
2400
2401     # begin data gathering
2402
2403     nodes = frozenset([inst.primary_node for inst in instance_list])
2404
2405     bad_nodes = []
2406     if self.dynamic_fields.intersection(self.op.output_fields):
2407       live_data = {}
2408       node_data = rpc.call_all_instances_info(nodes)
2409       for name in nodes:
2410         result = node_data[name]
2411         if result:
2412           live_data.update(result)
2413         elif result == False:
2414           bad_nodes.append(name)
2415         # else no instance is alive
2416     else:
2417       live_data = dict([(name, {}) for name in instance_names])
2418
2419     # end data gathering
2420
2421     output = []
2422     for instance in instance_list:
2423       iout = []
2424       for field in self.op.output_fields:
2425         if field == "name":
2426           val = instance.name
2427         elif field == "os":
2428           val = instance.os
2429         elif field == "pnode":
2430           val = instance.primary_node
2431         elif field == "snodes":
2432           val = list(instance.secondary_nodes)
2433         elif field == "admin_state":
2434           val = (instance.status != "down")
2435         elif field == "oper_state":
2436           if instance.primary_node in bad_nodes:
2437             val = None
2438           else:
2439             val = bool(live_data.get(instance.name))
2440         elif field == "status":
2441           if instance.primary_node in bad_nodes:
2442             val = "ERROR_nodedown"
2443           else:
2444             running = bool(live_data.get(instance.name))
2445             if running:
2446               if instance.status != "down":
2447                 val = "running"
2448               else:
2449                 val = "ERROR_up"
2450             else:
2451               if instance.status != "down":
2452                 val = "ERROR_down"
2453               else:
2454                 val = "ADMIN_down"
2455         elif field == "admin_ram":
2456           val = instance.memory
2457         elif field == "oper_ram":
2458           if instance.primary_node in bad_nodes:
2459             val = None
2460           elif instance.name in live_data:
2461             val = live_data[instance.name].get("memory", "?")
2462           else:
2463             val = "-"
2464         elif field == "disk_template":
2465           val = instance.disk_template
2466         elif field == "ip":
2467           val = instance.nics[0].ip
2468         elif field == "bridge":
2469           val = instance.nics[0].bridge
2470         elif field == "mac":
2471           val = instance.nics[0].mac
2472         elif field == "sda_size" or field == "sdb_size":
2473           disk = instance.FindDisk(field[:3])
2474           if disk is None:
2475             val = None
2476           else:
2477             val = disk.size
2478         elif field == "vcpus":
2479           val = instance.vcpus
2480         elif field == "tags":
2481           val = list(instance.GetTags())
2482         else:
2483           raise errors.ParameterError(field)
2484         iout.append(val)
2485       output.append(iout)
2486
2487     return output
2488
2489
2490 class LUFailoverInstance(LogicalUnit):
2491   """Failover an instance.
2492
2493   """
2494   HPATH = "instance-failover"
2495   HTYPE = constants.HTYPE_INSTANCE
2496   _OP_REQP = ["instance_name", "ignore_consistency"]
2497
2498   def BuildHooksEnv(self):
2499     """Build hooks env.
2500
2501     This runs on master, primary and secondary nodes of the instance.
2502
2503     """
2504     env = {
2505       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2506       }
2507     env.update(_BuildInstanceHookEnvByObject(self.instance))
2508     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2509     return env, nl, nl
2510
2511   def CheckPrereq(self):
2512     """Check prerequisites.
2513
2514     This checks that the instance is in the cluster.
2515
2516     """
2517     instance = self.cfg.GetInstanceInfo(
2518       self.cfg.ExpandInstanceName(self.op.instance_name))
2519     if instance is None:
2520       raise errors.OpPrereqError("Instance '%s' not known" %
2521                                  self.op.instance_name)
2522
2523     if instance.disk_template not in constants.DTS_NET_MIRROR:
2524       raise errors.OpPrereqError("Instance's disk layout is not"
2525                                  " network mirrored, cannot failover.")
2526
2527     secondary_nodes = instance.secondary_nodes
2528     if not secondary_nodes:
2529       raise errors.ProgrammerError("no secondary node but using "
2530                                    "a mirrored disk template")
2531
2532     target_node = secondary_nodes[0]
2533     # check memory requirements on the secondary node
2534     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2535                          instance.name, instance.memory)
2536
2537     # check bridge existance
2538     brlist = [nic.bridge for nic in instance.nics]
2539     if not rpc.call_bridges_exist(target_node, brlist):
2540       raise errors.OpPrereqError("One or more target bridges %s does not"
2541                                  " exist on destination node '%s'" %
2542                                  (brlist, target_node))
2543
2544     self.instance = instance
2545
2546   def Exec(self, feedback_fn):
2547     """Failover an instance.
2548
2549     The failover is done by shutting it down on its present node and
2550     starting it on the secondary.
2551
2552     """
2553     instance = self.instance
2554
2555     source_node = instance.primary_node
2556     target_node = instance.secondary_nodes[0]
2557
2558     feedback_fn("* checking disk consistency between source and target")
2559     for dev in instance.disks:
2560       # for drbd, these are drbd over lvm
2561       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2562         if instance.status == "up" and not self.op.ignore_consistency:
2563           raise errors.OpExecError("Disk %s is degraded on target node,"
2564                                    " aborting failover." % dev.iv_name)
2565
2566     feedback_fn("* shutting down instance on source node")
2567     logger.Info("Shutting down instance %s on node %s" %
2568                 (instance.name, source_node))
2569
2570     if not rpc.call_instance_shutdown(source_node, instance):
2571       if self.op.ignore_consistency:
2572         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2573                      " anyway. Please make sure node %s is down"  %
2574                      (instance.name, source_node, source_node))
2575       else:
2576         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2577                                  (instance.name, source_node))
2578
2579     feedback_fn("* deactivating the instance's disks on source node")
2580     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2581       raise errors.OpExecError("Can't shut down the instance's disks.")
2582
2583     instance.primary_node = target_node
2584     # distribute new instance config to the other nodes
2585     self.cfg.Update(instance)
2586
2587     # Only start the instance if it's marked as up
2588     if instance.status == "up":
2589       feedback_fn("* activating the instance's disks on target node")
2590       logger.Info("Starting instance %s on node %s" %
2591                   (instance.name, target_node))
2592
2593       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2594                                                ignore_secondaries=True)
2595       if not disks_ok:
2596         _ShutdownInstanceDisks(instance, self.cfg)
2597         raise errors.OpExecError("Can't activate the instance's disks")
2598
2599       feedback_fn("* starting the instance on the target node")
2600       if not rpc.call_instance_start(target_node, instance, None):
2601         _ShutdownInstanceDisks(instance, self.cfg)
2602         raise errors.OpExecError("Could not start instance %s on node %s." %
2603                                  (instance.name, target_node))
2604
2605
2606 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2607   """Create a tree of block devices on the primary node.
2608
2609   This always creates all devices.
2610
2611   """
2612   if device.children:
2613     for child in device.children:
2614       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2615         return False
2616
2617   cfg.SetDiskID(device, node)
2618   new_id = rpc.call_blockdev_create(node, device, device.size,
2619                                     instance.name, True, info)
2620   if not new_id:
2621     return False
2622   if device.physical_id is None:
2623     device.physical_id = new_id
2624   return True
2625
2626
2627 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2628   """Create a tree of block devices on a secondary node.
2629
2630   If this device type has to be created on secondaries, create it and
2631   all its children.
2632
2633   If not, just recurse to children keeping the same 'force' value.
2634
2635   """
2636   if device.CreateOnSecondary():
2637     force = True
2638   if device.children:
2639     for child in device.children:
2640       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2641                                         child, force, info):
2642         return False
2643
2644   if not force:
2645     return True
2646   cfg.SetDiskID(device, node)
2647   new_id = rpc.call_blockdev_create(node, device, device.size,
2648                                     instance.name, False, info)
2649   if not new_id:
2650     return False
2651   if device.physical_id is None:
2652     device.physical_id = new_id
2653   return True
2654
2655
2656 def _GenerateUniqueNames(cfg, exts):
2657   """Generate a suitable LV name.
2658
2659   This will generate a logical volume name for the given instance.
2660
2661   """
2662   results = []
2663   for val in exts:
2664     new_id = cfg.GenerateUniqueID()
2665     results.append("%s%s" % (new_id, val))
2666   return results
2667
2668
2669 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2670   """Generate a drbd8 device complete with its children.
2671
2672   """
2673   port = cfg.AllocatePort()
2674   vgname = cfg.GetVGName()
2675   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2676                           logical_id=(vgname, names[0]))
2677   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2678                           logical_id=(vgname, names[1]))
2679   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2680                           logical_id = (primary, secondary, port),
2681                           children = [dev_data, dev_meta],
2682                           iv_name=iv_name)
2683   return drbd_dev
2684
2685
2686 def _GenerateDiskTemplate(cfg, template_name,
2687                           instance_name, primary_node,
2688                           secondary_nodes, disk_sz, swap_sz,
2689                           file_storage_dir, file_driver):
2690   """Generate the entire disk layout for a given template type.
2691
2692   """
2693   #TODO: compute space requirements
2694
2695   vgname = cfg.GetVGName()
2696   if template_name == constants.DT_DISKLESS:
2697     disks = []
2698   elif template_name == constants.DT_PLAIN:
2699     if len(secondary_nodes) != 0:
2700       raise errors.ProgrammerError("Wrong template configuration")
2701
2702     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704                            logical_id=(vgname, names[0]),
2705                            iv_name = "sda")
2706     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707                            logical_id=(vgname, names[1]),
2708                            iv_name = "sdb")
2709     disks = [sda_dev, sdb_dev]
2710   elif template_name == constants.DT_DRBD8:
2711     if len(secondary_nodes) != 1:
2712       raise errors.ProgrammerError("Wrong template configuration")
2713     remote_node = secondary_nodes[0]
2714     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2715                                        ".sdb_data", ".sdb_meta"])
2716     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2717                                          disk_sz, names[0:2], "sda")
2718     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2719                                          swap_sz, names[2:4], "sdb")
2720     disks = [drbd_sda_dev, drbd_sdb_dev]
2721   elif template_name == constants.DT_FILE:
2722     if len(secondary_nodes) != 0:
2723       raise errors.ProgrammerError("Wrong template configuration")
2724
2725     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2726                                 iv_name="sda", logical_id=(file_driver,
2727                                 "%s/sda" % file_storage_dir))
2728     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2729                                 iv_name="sdb", logical_id=(file_driver,
2730                                 "%s/sdb" % file_storage_dir))
2731     disks = [file_sda_dev, file_sdb_dev]
2732   else:
2733     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2734   return disks
2735
2736
2737 def _GetInstanceInfoText(instance):
2738   """Compute that text that should be added to the disk's metadata.
2739
2740   """
2741   return "originstname+%s" % instance.name
2742
2743
2744 def _CreateDisks(cfg, instance):
2745   """Create all disks for an instance.
2746
2747   This abstracts away some work from AddInstance.
2748
2749   Args:
2750     instance: the instance object
2751
2752   Returns:
2753     True or False showing the success of the creation process
2754
2755   """
2756   info = _GetInstanceInfoText(instance)
2757
2758   if instance.disk_template == constants.DT_FILE:
2759     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2760     result = rpc.call_file_storage_dir_create(instance.primary_node,
2761                                               file_storage_dir)
2762
2763     if not result:
2764       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2765       return False
2766
2767     if not result[0]:
2768       logger.Error("failed to create directory '%s'" % file_storage_dir)
2769       return False
2770
2771   for device in instance.disks:
2772     logger.Info("creating volume %s for instance %s" %
2773                 (device.iv_name, instance.name))
2774     #HARDCODE
2775     for secondary_node in instance.secondary_nodes:
2776       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777                                         device, False, info):
2778         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779                      (device.iv_name, device, secondary_node))
2780         return False
2781     #HARDCODE
2782     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783                                     instance, device, info):
2784       logger.Error("failed to create volume %s on primary!" %
2785                    device.iv_name)
2786       return False
2787
2788   return True
2789
2790
2791 def _RemoveDisks(instance, cfg):
2792   """Remove all disks for an instance.
2793
2794   This abstracts away some work from `AddInstance()` and
2795   `RemoveInstance()`. Note that in case some of the devices couldn't
2796   be removed, the removal will continue with the other ones (compare
2797   with `_CreateDisks()`).
2798
2799   Args:
2800     instance: the instance object
2801
2802   Returns:
2803     True or False showing the success of the removal proces
2804
2805   """
2806   logger.Info("removing block devices for instance %s" % instance.name)
2807
2808   result = True
2809   for device in instance.disks:
2810     for node, disk in device.ComputeNodeTree(instance.primary_node):
2811       cfg.SetDiskID(disk, node)
2812       if not rpc.call_blockdev_remove(node, disk):
2813         logger.Error("could not remove block device %s on node %s,"
2814                      " continuing anyway" %
2815                      (device.iv_name, node))
2816         result = False
2817
2818   if instance.disk_template == constants.DT_FILE:
2819     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2820     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2821                                             file_storage_dir):
2822       logger.Error("could not remove directory '%s'" % file_storage_dir)
2823       result = False
2824
2825   return result
2826
2827
2828 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2829   """Compute disk size requirements in the volume group
2830
2831   This is currently hard-coded for the two-drive layout.
2832
2833   """
2834   # Required free disk space as a function of disk and swap space
2835   req_size_dict = {
2836     constants.DT_DISKLESS: None,
2837     constants.DT_PLAIN: disk_size + swap_size,
2838     # 256 MB are added for drbd metadata, 128MB for each drbd device
2839     constants.DT_DRBD8: disk_size + swap_size + 256,
2840     constants.DT_FILE: None,
2841   }
2842
2843   if disk_template not in req_size_dict:
2844     raise errors.ProgrammerError("Disk template '%s' size requirement"
2845                                  " is unknown" %  disk_template)
2846
2847   return req_size_dict[disk_template]
2848
2849
2850 class LUCreateInstance(LogicalUnit):
2851   """Create an instance.
2852
2853   """
2854   HPATH = "instance-add"
2855   HTYPE = constants.HTYPE_INSTANCE
2856   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2857               "disk_template", "swap_size", "mode", "start", "vcpus",
2858               "wait_for_sync", "ip_check", "mac"]
2859
2860   def _RunAllocator(self):
2861     """Run the allocator based on input opcode.
2862
2863     """
2864     disks = [{"size": self.op.disk_size, "mode": "w"},
2865              {"size": self.op.swap_size, "mode": "w"}]
2866     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2867              "bridge": self.op.bridge}]
2868     ial = IAllocator(self.cfg, self.sstore,
2869                      mode=constants.IALLOCATOR_MODE_ALLOC,
2870                      name=self.op.instance_name,
2871                      disk_template=self.op.disk_template,
2872                      tags=[],
2873                      os=self.op.os_type,
2874                      vcpus=self.op.vcpus,
2875                      mem_size=self.op.mem_size,
2876                      disks=disks,
2877                      nics=nics,
2878                      )
2879
2880     ial.Run(self.op.iallocator)
2881
2882     if not ial.success:
2883       raise errors.OpPrereqError("Can't compute nodes using"
2884                                  " iallocator '%s': %s" % (self.op.iallocator,
2885                                                            ial.info))
2886     if len(ial.nodes) != ial.required_nodes:
2887       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2888                                  " of nodes (%s), required %s" %
2889                                  (len(ial.nodes), ial.required_nodes))
2890     self.op.pnode = ial.nodes[0]
2891     logger.ToStdout("Selected nodes for the instance: %s" %
2892                     (", ".join(ial.nodes),))
2893     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2894                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2895     if ial.required_nodes == 2:
2896       self.op.snode = ial.nodes[1]
2897
2898   def BuildHooksEnv(self):
2899     """Build hooks env.
2900
2901     This runs on master, primary and secondary nodes of the instance.
2902
2903     """
2904     env = {
2905       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2906       "INSTANCE_DISK_SIZE": self.op.disk_size,
2907       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2908       "INSTANCE_ADD_MODE": self.op.mode,
2909       }
2910     if self.op.mode == constants.INSTANCE_IMPORT:
2911       env["INSTANCE_SRC_NODE"] = self.op.src_node
2912       env["INSTANCE_SRC_PATH"] = self.op.src_path
2913       env["INSTANCE_SRC_IMAGE"] = self.src_image
2914
2915     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2916       primary_node=self.op.pnode,
2917       secondary_nodes=self.secondaries,
2918       status=self.instance_status,
2919       os_type=self.op.os_type,
2920       memory=self.op.mem_size,
2921       vcpus=self.op.vcpus,
2922       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2923     ))
2924
2925     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2926           self.secondaries)
2927     return env, nl, nl
2928
2929
2930   def CheckPrereq(self):
2931     """Check prerequisites.
2932
2933     """
2934     # set optional parameters to none if they don't exist
2935     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2936                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2937                  "vnc_bind_address"]:
2938       if not hasattr(self.op, attr):
2939         setattr(self.op, attr, None)
2940
2941     if self.op.mode not in (constants.INSTANCE_CREATE,
2942                             constants.INSTANCE_IMPORT):
2943       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2944                                  self.op.mode)
2945
2946     if (not self.cfg.GetVGName() and
2947         self.op.disk_template not in constants.DTS_NOT_LVM):
2948       raise errors.OpPrereqError("Cluster does not support lvm-based"
2949                                  " instances")
2950
2951     if self.op.mode == constants.INSTANCE_IMPORT:
2952       src_node = getattr(self.op, "src_node", None)
2953       src_path = getattr(self.op, "src_path", None)
2954       if src_node is None or src_path is None:
2955         raise errors.OpPrereqError("Importing an instance requires source"
2956                                    " node and path options")
2957       src_node_full = self.cfg.ExpandNodeName(src_node)
2958       if src_node_full is None:
2959         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2960       self.op.src_node = src_node = src_node_full
2961
2962       if not os.path.isabs(src_path):
2963         raise errors.OpPrereqError("The source path must be absolute")
2964
2965       export_info = rpc.call_export_info(src_node, src_path)
2966
2967       if not export_info:
2968         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2969
2970       if not export_info.has_section(constants.INISECT_EXP):
2971         raise errors.ProgrammerError("Corrupted export config")
2972
2973       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2974       if (int(ei_version) != constants.EXPORT_VERSION):
2975         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2976                                    (ei_version, constants.EXPORT_VERSION))
2977
2978       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2979         raise errors.OpPrereqError("Can't import instance with more than"
2980                                    " one data disk")
2981
2982       # FIXME: are the old os-es, disk sizes, etc. useful?
2983       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2984       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2985                                                          'disk0_dump'))
2986       self.src_image = diskimage
2987     else: # INSTANCE_CREATE
2988       if getattr(self.op, "os_type", None) is None:
2989         raise errors.OpPrereqError("No guest OS specified")
2990
2991     #### instance parameters check
2992
2993     # disk template and mirror node verification
2994     if self.op.disk_template not in constants.DISK_TEMPLATES:
2995       raise errors.OpPrereqError("Invalid disk template name")
2996
2997     # instance name verification
2998     hostname1 = utils.HostInfo(self.op.instance_name)
2999
3000     self.op.instance_name = instance_name = hostname1.name
3001     instance_list = self.cfg.GetInstanceList()
3002     if instance_name in instance_list:
3003       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3004                                  instance_name)
3005
3006     # ip validity checks
3007     ip = getattr(self.op, "ip", None)
3008     if ip is None or ip.lower() == "none":
3009       inst_ip = None
3010     elif ip.lower() == "auto":
3011       inst_ip = hostname1.ip
3012     else:
3013       if not utils.IsValidIP(ip):
3014         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3015                                    " like a valid IP" % ip)
3016       inst_ip = ip
3017     self.inst_ip = self.op.ip = inst_ip
3018
3019     if self.op.start and not self.op.ip_check:
3020       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3021                                  " adding an instance in start mode")
3022
3023     if self.op.ip_check:
3024       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3025         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3026                                    (hostname1.ip, instance_name))
3027
3028     # MAC address verification
3029     if self.op.mac != "auto":
3030       if not utils.IsValidMac(self.op.mac.lower()):
3031         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3032                                    self.op.mac)
3033
3034     # bridge verification
3035     bridge = getattr(self.op, "bridge", None)
3036     if bridge is None:
3037       self.op.bridge = self.cfg.GetDefBridge()
3038     else:
3039       self.op.bridge = bridge
3040
3041     # boot order verification
3042     if self.op.hvm_boot_order is not None:
3043       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3044         raise errors.OpPrereqError("invalid boot order specified,"
3045                                    " must be one or more of [acdn]")
3046     # file storage checks
3047     if (self.op.file_driver and
3048         not self.op.file_driver in constants.FILE_DRIVER):
3049       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3050                                  self.op.file_driver)
3051
3052     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3053       raise errors.OpPrereqError("File storage directory not a relative"
3054                                  " path")
3055     #### allocator run
3056
3057     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3058       raise errors.OpPrereqError("One and only one of iallocator and primary"
3059                                  " node must be given")
3060
3061     if self.op.iallocator is not None:
3062       self._RunAllocator()
3063
3064     #### node related checks
3065
3066     # check primary node
3067     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3068     if pnode is None:
3069       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3070                                  self.op.pnode)
3071     self.op.pnode = pnode.name
3072     self.pnode = pnode
3073     self.secondaries = []
3074
3075     # mirror node verification
3076     if self.op.disk_template in constants.DTS_NET_MIRROR:
3077       if getattr(self.op, "snode", None) is None:
3078         raise errors.OpPrereqError("The networked disk templates need"
3079                                    " a mirror node")
3080
3081       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3082       if snode_name is None:
3083         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3084                                    self.op.snode)
3085       elif snode_name == pnode.name:
3086         raise errors.OpPrereqError("The secondary node cannot be"
3087                                    " the primary node.")
3088       self.secondaries.append(snode_name)
3089
3090     req_size = _ComputeDiskSize(self.op.disk_template,
3091                                 self.op.disk_size, self.op.swap_size)
3092
3093     # Check lv size requirements
3094     if req_size is not None:
3095       nodenames = [pnode.name] + self.secondaries
3096       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3097       for node in nodenames:
3098         info = nodeinfo.get(node, None)
3099         if not info:
3100           raise errors.OpPrereqError("Cannot get current information"
3101                                      " from node '%s'" % node)
3102         vg_free = info.get('vg_free', None)
3103         if not isinstance(vg_free, int):
3104           raise errors.OpPrereqError("Can't compute free disk space on"
3105                                      " node %s" % node)
3106         if req_size > info['vg_free']:
3107           raise errors.OpPrereqError("Not enough disk space on target node %s."
3108                                      " %d MB available, %d MB required" %
3109                                      (node, info['vg_free'], req_size))
3110
3111     # os verification
3112     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3113     if not os_obj:
3114       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3115                                  " primary node"  % self.op.os_type)
3116
3117     if self.op.kernel_path == constants.VALUE_NONE:
3118       raise errors.OpPrereqError("Can't set instance kernel to none")
3119
3120
3121     # bridge check on primary node
3122     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3123       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3124                                  " destination node '%s'" %
3125                                  (self.op.bridge, pnode.name))
3126
3127     # memory check on primary node
3128     if self.op.start:
3129       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3130                            "creating instance %s" % self.op.instance_name,
3131                            self.op.mem_size)
3132
3133     # hvm_cdrom_image_path verification
3134     if self.op.hvm_cdrom_image_path is not None:
3135       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3136         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3137                                    " be an absolute path or None, not %s" %
3138                                    self.op.hvm_cdrom_image_path)
3139       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3140         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3141                                    " regular file or a symlink pointing to"
3142                                    " an existing regular file, not %s" %
3143                                    self.op.hvm_cdrom_image_path)
3144
3145     # vnc_bind_address verification
3146     if self.op.vnc_bind_address is not None:
3147       if not utils.IsValidIP(self.op.vnc_bind_address):
3148         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3149                                    " like a valid IP address" %
3150                                    self.op.vnc_bind_address)
3151
3152     if self.op.start:
3153       self.instance_status = 'up'
3154     else:
3155       self.instance_status = 'down'
3156
3157   def Exec(self, feedback_fn):
3158     """Create and add the instance to the cluster.
3159
3160     """
3161     instance = self.op.instance_name
3162     pnode_name = self.pnode.name
3163
3164     if self.op.mac == "auto":
3165       mac_address = self.cfg.GenerateMAC()
3166     else:
3167       mac_address = self.op.mac
3168
3169     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3170     if self.inst_ip is not None:
3171       nic.ip = self.inst_ip
3172
3173     ht_kind = self.sstore.GetHypervisorType()
3174     if ht_kind in constants.HTS_REQ_PORT:
3175       network_port = self.cfg.AllocatePort()
3176     else:
3177       network_port = None
3178
3179     if self.op.vnc_bind_address is None:
3180       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3181
3182     # this is needed because os.path.join does not accept None arguments
3183     if self.op.file_storage_dir is None:
3184       string_file_storage_dir = ""
3185     else:
3186       string_file_storage_dir = self.op.file_storage_dir
3187
3188     # build the full file storage dir path
3189     file_storage_dir = os.path.normpath(os.path.join(
3190                                         self.sstore.GetFileStorageDir(),
3191                                         string_file_storage_dir, instance))
3192
3193
3194     disks = _GenerateDiskTemplate(self.cfg,
3195                                   self.op.disk_template,
3196                                   instance, pnode_name,
3197                                   self.secondaries, self.op.disk_size,
3198                                   self.op.swap_size,
3199                                   file_storage_dir,
3200                                   self.op.file_driver)
3201
3202     iobj = objects.Instance(name=instance, os=self.op.os_type,
3203                             primary_node=pnode_name,
3204                             memory=self.op.mem_size,
3205                             vcpus=self.op.vcpus,
3206                             nics=[nic], disks=disks,
3207                             disk_template=self.op.disk_template,
3208                             status=self.instance_status,
3209                             network_port=network_port,
3210                             kernel_path=self.op.kernel_path,
3211                             initrd_path=self.op.initrd_path,
3212                             hvm_boot_order=self.op.hvm_boot_order,
3213                             hvm_acpi=self.op.hvm_acpi,
3214                             hvm_pae=self.op.hvm_pae,
3215                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3216                             vnc_bind_address=self.op.vnc_bind_address,
3217                             )
3218
3219     feedback_fn("* creating instance disks...")
3220     if not _CreateDisks(self.cfg, iobj):
3221       _RemoveDisks(iobj, self.cfg)
3222       raise errors.OpExecError("Device creation failed, reverting...")
3223
3224     feedback_fn("adding instance %s to cluster config" % instance)
3225
3226     self.cfg.AddInstance(iobj)
3227
3228     if self.op.wait_for_sync:
3229       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3230     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3231       # make sure the disks are not degraded (still sync-ing is ok)
3232       time.sleep(15)
3233       feedback_fn("* checking mirrors status")
3234       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3235     else:
3236       disk_abort = False
3237
3238     if disk_abort:
3239       _RemoveDisks(iobj, self.cfg)
3240       self.cfg.RemoveInstance(iobj.name)
3241       raise errors.OpExecError("There are some degraded disks for"
3242                                " this instance")
3243
3244     feedback_fn("creating os for instance %s on node %s" %
3245                 (instance, pnode_name))
3246
3247     if iobj.disk_template != constants.DT_DISKLESS:
3248       if self.op.mode == constants.INSTANCE_CREATE:
3249         feedback_fn("* running the instance OS create scripts...")
3250         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3251           raise errors.OpExecError("could not add os for instance %s"
3252                                    " on node %s" %
3253                                    (instance, pnode_name))
3254
3255       elif self.op.mode == constants.INSTANCE_IMPORT:
3256         feedback_fn("* running the instance OS import scripts...")
3257         src_node = self.op.src_node
3258         src_image = self.src_image
3259         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3260                                                 src_node, src_image):
3261           raise errors.OpExecError("Could not import os for instance"
3262                                    " %s on node %s" %
3263                                    (instance, pnode_name))
3264       else:
3265         # also checked in the prereq part
3266         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3267                                      % self.op.mode)
3268
3269     if self.op.start:
3270       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3271       feedback_fn("* starting instance...")
3272       if not rpc.call_instance_start(pnode_name, iobj, None):
3273         raise errors.OpExecError("Could not start instance")
3274
3275
3276 class LUConnectConsole(NoHooksLU):
3277   """Connect to an instance's console.
3278
3279   This is somewhat special in that it returns the command line that
3280   you need to run on the master node in order to connect to the
3281   console.
3282
3283   """
3284   _OP_REQP = ["instance_name"]
3285
3286   def CheckPrereq(self):
3287     """Check prerequisites.
3288
3289     This checks that the instance is in the cluster.
3290
3291     """
3292     instance = self.cfg.GetInstanceInfo(
3293       self.cfg.ExpandInstanceName(self.op.instance_name))
3294     if instance is None:
3295       raise errors.OpPrereqError("Instance '%s' not known" %
3296                                  self.op.instance_name)
3297     self.instance = instance
3298
3299   def Exec(self, feedback_fn):
3300     """Connect to the console of an instance
3301
3302     """
3303     instance = self.instance
3304     node = instance.primary_node
3305
3306     node_insts = rpc.call_instance_list([node])[node]
3307     if node_insts is False:
3308       raise errors.OpExecError("Can't connect to node %s." % node)
3309
3310     if instance.name not in node_insts:
3311       raise errors.OpExecError("Instance %s is not running." % instance.name)
3312
3313     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3314
3315     hyper = hypervisor.GetHypervisor()
3316     console_cmd = hyper.GetShellCommandForConsole(instance)
3317
3318     # build ssh cmdline
3319     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3320
3321
3322 class LUReplaceDisks(LogicalUnit):
3323   """Replace the disks of an instance.
3324
3325   """
3326   HPATH = "mirrors-replace"
3327   HTYPE = constants.HTYPE_INSTANCE
3328   _OP_REQP = ["instance_name", "mode", "disks"]
3329
3330   def _RunAllocator(self):
3331     """Compute a new secondary node using an IAllocator.
3332
3333     """
3334     ial = IAllocator(self.cfg, self.sstore,
3335                      mode=constants.IALLOCATOR_MODE_RELOC,
3336                      name=self.op.instance_name,
3337                      relocate_from=[self.sec_node])
3338
3339     ial.Run(self.op.iallocator)
3340
3341     if not ial.success:
3342       raise errors.OpPrereqError("Can't compute nodes using"
3343                                  " iallocator '%s': %s" % (self.op.iallocator,
3344                                                            ial.info))
3345     if len(ial.nodes) != ial.required_nodes:
3346       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3347                                  " of nodes (%s), required %s" %
3348                                  (len(ial.nodes), ial.required_nodes))
3349     self.op.remote_node = ial.nodes[0]
3350     logger.ToStdout("Selected new secondary for the instance: %s" %
3351                     self.op.remote_node)
3352
3353   def BuildHooksEnv(self):
3354     """Build hooks env.
3355
3356     This runs on the master, the primary and all the secondaries.
3357
3358     """
3359     env = {
3360       "MODE": self.op.mode,
3361       "NEW_SECONDARY": self.op.remote_node,
3362       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3363       }
3364     env.update(_BuildInstanceHookEnvByObject(self.instance))
3365     nl = [
3366       self.sstore.GetMasterNode(),
3367       self.instance.primary_node,
3368       ]
3369     if self.op.remote_node is not None:
3370       nl.append(self.op.remote_node)
3371     return env, nl, nl
3372
3373   def CheckPrereq(self):
3374     """Check prerequisites.
3375
3376     This checks that the instance is in the cluster.
3377
3378     """
3379     if not hasattr(self.op, "remote_node"):
3380       self.op.remote_node = None
3381
3382     instance = self.cfg.GetInstanceInfo(
3383       self.cfg.ExpandInstanceName(self.op.instance_name))
3384     if instance is None:
3385       raise errors.OpPrereqError("Instance '%s' not known" %
3386                                  self.op.instance_name)
3387     self.instance = instance
3388     self.op.instance_name = instance.name
3389
3390     if instance.disk_template not in constants.DTS_NET_MIRROR:
3391       raise errors.OpPrereqError("Instance's disk layout is not"
3392                                  " network mirrored.")
3393
3394     if len(instance.secondary_nodes) != 1:
3395       raise errors.OpPrereqError("The instance has a strange layout,"
3396                                  " expected one secondary but found %d" %
3397                                  len(instance.secondary_nodes))
3398
3399     self.sec_node = instance.secondary_nodes[0]
3400
3401     ia_name = getattr(self.op, "iallocator", None)
3402     if ia_name is not None:
3403       if self.op.remote_node is not None:
3404         raise errors.OpPrereqError("Give either the iallocator or the new"
3405                                    " secondary, not both")
3406       self.op.remote_node = self._RunAllocator()
3407
3408     remote_node = self.op.remote_node
3409     if remote_node is not None:
3410       remote_node = self.cfg.ExpandNodeName(remote_node)
3411       if remote_node is None:
3412         raise errors.OpPrereqError("Node '%s' not known" %
3413                                    self.op.remote_node)
3414       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3415     else:
3416       self.remote_node_info = None
3417     if remote_node == instance.primary_node:
3418       raise errors.OpPrereqError("The specified node is the primary node of"
3419                                  " the instance.")
3420     elif remote_node == self.sec_node:
3421       if self.op.mode == constants.REPLACE_DISK_SEC:
3422         # this is for DRBD8, where we can't execute the same mode of
3423         # replacement as for drbd7 (no different port allocated)
3424         raise errors.OpPrereqError("Same secondary given, cannot execute"
3425                                    " replacement")
3426     if instance.disk_template == constants.DT_DRBD8:
3427       if (self.op.mode == constants.REPLACE_DISK_ALL and
3428           remote_node is not None):
3429         # switch to replace secondary mode
3430         self.op.mode = constants.REPLACE_DISK_SEC
3431
3432       if self.op.mode == constants.REPLACE_DISK_ALL:
3433         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3434                                    " secondary disk replacement, not"
3435                                    " both at once")
3436       elif self.op.mode == constants.REPLACE_DISK_PRI:
3437         if remote_node is not None:
3438           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3439                                      " the secondary while doing a primary"
3440                                      " node disk replacement")
3441         self.tgt_node = instance.primary_node
3442         self.oth_node = instance.secondary_nodes[0]
3443       elif self.op.mode == constants.REPLACE_DISK_SEC:
3444         self.new_node = remote_node # this can be None, in which case
3445                                     # we don't change the secondary
3446         self.tgt_node = instance.secondary_nodes[0]
3447         self.oth_node = instance.primary_node
3448       else:
3449         raise errors.ProgrammerError("Unhandled disk replace mode")
3450
3451     for name in self.op.disks:
3452       if instance.FindDisk(name) is None:
3453         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3454                                    (name, instance.name))
3455     self.op.remote_node = remote_node
3456
3457   def _ExecD8DiskOnly(self, feedback_fn):
3458     """Replace a disk on the primary or secondary for dbrd8.
3459
3460     The algorithm for replace is quite complicated:
3461       - for each disk to be replaced:
3462         - create new LVs on the target node with unique names
3463         - detach old LVs from the drbd device
3464         - rename old LVs to name_replaced.<time_t>
3465         - rename new LVs to old LVs
3466         - attach the new LVs (with the old names now) to the drbd device
3467       - wait for sync across all devices
3468       - for each modified disk:
3469         - remove old LVs (which have the name name_replaces.<time_t>)
3470
3471     Failures are not very well handled.
3472
3473     """
3474     steps_total = 6
3475     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3476     instance = self.instance
3477     iv_names = {}
3478     vgname = self.cfg.GetVGName()
3479     # start of work
3480     cfg = self.cfg
3481     tgt_node = self.tgt_node
3482     oth_node = self.oth_node
3483
3484     # Step: check device activation
3485     self.proc.LogStep(1, steps_total, "check device existence")
3486     info("checking volume groups")
3487     my_vg = cfg.GetVGName()
3488     results = rpc.call_vg_list([oth_node, tgt_node])
3489     if not results:
3490       raise errors.OpExecError("Can't list volume groups on the nodes")
3491     for node in oth_node, tgt_node:
3492       res = results.get(node, False)
3493       if not res or my_vg not in res:
3494         raise errors.OpExecError("Volume group '%s' not found on %s" %
3495                                  (my_vg, node))
3496     for dev in instance.disks:
3497       if not dev.iv_name in self.op.disks:
3498         continue
3499       for node in tgt_node, oth_node:
3500         info("checking %s on %s" % (dev.iv_name, node))
3501         cfg.SetDiskID(dev, node)
3502         if not rpc.call_blockdev_find(node, dev):
3503           raise errors.OpExecError("Can't find device %s on node %s" %
3504                                    (dev.iv_name, node))
3505
3506     # Step: check other node consistency
3507     self.proc.LogStep(2, steps_total, "check peer consistency")
3508     for dev in instance.disks:
3509       if not dev.iv_name in self.op.disks:
3510         continue
3511       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3512       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3513                                    oth_node==instance.primary_node):
3514         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3515                                  " to replace disks on this node (%s)" %
3516                                  (oth_node, tgt_node))
3517
3518     # Step: create new storage
3519     self.proc.LogStep(3, steps_total, "allocate new storage")
3520     for dev in instance.disks:
3521       if not dev.iv_name in self.op.disks:
3522         continue
3523       size = dev.size
3524       cfg.SetDiskID(dev, tgt_node)
3525       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3526       names = _GenerateUniqueNames(cfg, lv_names)
3527       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3528                              logical_id=(vgname, names[0]))
3529       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3530                              logical_id=(vgname, names[1]))
3531       new_lvs = [lv_data, lv_meta]
3532       old_lvs = dev.children
3533       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3534       info("creating new local storage on %s for %s" %
3535            (tgt_node, dev.iv_name))
3536       # since we *always* want to create this LV, we use the
3537       # _Create...OnPrimary (which forces the creation), even if we
3538       # are talking about the secondary node
3539       for new_lv in new_lvs:
3540         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3541                                         _GetInstanceInfoText(instance)):
3542           raise errors.OpExecError("Failed to create new LV named '%s' on"
3543                                    " node '%s'" %
3544                                    (new_lv.logical_id[1], tgt_node))
3545
3546     # Step: for each lv, detach+rename*2+attach
3547     self.proc.LogStep(4, steps_total, "change drbd configuration")
3548     for dev, old_lvs, new_lvs in iv_names.itervalues():
3549       info("detaching %s drbd from local storage" % dev.iv_name)
3550       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3551         raise errors.OpExecError("Can't detach drbd from local storage on node"
3552                                  " %s for device %s" % (tgt_node, dev.iv_name))
3553       #dev.children = []
3554       #cfg.Update(instance)
3555
3556       # ok, we created the new LVs, so now we know we have the needed
3557       # storage; as such, we proceed on the target node to rename
3558       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3559       # using the assumption that logical_id == physical_id (which in
3560       # turn is the unique_id on that node)
3561
3562       # FIXME(iustin): use a better name for the replaced LVs
3563       temp_suffix = int(time.time())
3564       ren_fn = lambda d, suff: (d.physical_id[0],
3565                                 d.physical_id[1] + "_replaced-%s" % suff)
3566       # build the rename list based on what LVs exist on the node
3567       rlist = []
3568       for to_ren in old_lvs:
3569         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3570         if find_res is not None: # device exists
3571           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3572
3573       info("renaming the old LVs on the target node")
3574       if not rpc.call_blockdev_rename(tgt_node, rlist):
3575         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3576       # now we rename the new LVs to the old LVs
3577       info("renaming the new LVs on the target node")
3578       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3579       if not rpc.call_blockdev_rename(tgt_node, rlist):
3580         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3581
3582       for old, new in zip(old_lvs, new_lvs):
3583         new.logical_id = old.logical_id
3584         cfg.SetDiskID(new, tgt_node)
3585
3586       for disk in old_lvs:
3587         disk.logical_id = ren_fn(disk, temp_suffix)
3588         cfg.SetDiskID(disk, tgt_node)
3589
3590       # now that the new lvs have the old name, we can add them to the device
3591       info("adding new mirror component on %s" % tgt_node)
3592       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3593         for new_lv in new_lvs:
3594           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3595             warning("Can't rollback device %s", hint="manually cleanup unused"
3596                     " logical volumes")
3597         raise errors.OpExecError("Can't add local storage to drbd")
3598
3599       dev.children = new_lvs
3600       cfg.Update(instance)
3601
3602     # Step: wait for sync
3603
3604     # this can fail as the old devices are degraded and _WaitForSync
3605     # does a combined result over all disks, so we don't check its
3606     # return value
3607     self.proc.LogStep(5, steps_total, "sync devices")
3608     _WaitForSync(cfg, instance, self.proc, unlock=True)
3609
3610     # so check manually all the devices
3611     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3612       cfg.SetDiskID(dev, instance.primary_node)
3613       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3614       if is_degr:
3615         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3616
3617     # Step: remove old storage
3618     self.proc.LogStep(6, steps_total, "removing old storage")
3619     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3620       info("remove logical volumes for %s" % name)
3621       for lv in old_lvs:
3622         cfg.SetDiskID(lv, tgt_node)
3623         if not rpc.call_blockdev_remove(tgt_node, lv):
3624           warning("Can't remove old LV", hint="manually remove unused LVs")
3625           continue
3626
3627   def _ExecD8Secondary(self, feedback_fn):
3628     """Replace the secondary node for drbd8.
3629
3630     The algorithm for replace is quite complicated:
3631       - for all disks of the instance:
3632         - create new LVs on the new node with same names
3633         - shutdown the drbd device on the old secondary
3634         - disconnect the drbd network on the primary
3635         - create the drbd device on the new secondary
3636         - network attach the drbd on the primary, using an artifice:
3637           the drbd code for Attach() will connect to the network if it
3638           finds a device which is connected to the good local disks but
3639           not network enabled
3640       - wait for sync across all devices
3641       - remove all disks from the old secondary
3642
3643     Failures are not very well handled.
3644
3645     """
3646     steps_total = 6
3647     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3648     instance = self.instance
3649     iv_names = {}
3650     vgname = self.cfg.GetVGName()
3651     # start of work
3652     cfg = self.cfg
3653     old_node = self.tgt_node
3654     new_node = self.new_node
3655     pri_node = instance.primary_node
3656
3657     # Step: check device activation
3658     self.proc.LogStep(1, steps_total, "check device existence")
3659     info("checking volume groups")
3660     my_vg = cfg.GetVGName()
3661     results = rpc.call_vg_list([pri_node, new_node])
3662     if not results:
3663       raise errors.OpExecError("Can't list volume groups on the nodes")
3664     for node in pri_node, new_node:
3665       res = results.get(node, False)
3666       if not res or my_vg not in res:
3667         raise errors.OpExecError("Volume group '%s' not found on %s" %
3668                                  (my_vg, node))
3669     for dev in instance.disks:
3670       if not dev.iv_name in self.op.disks:
3671         continue
3672       info("checking %s on %s" % (dev.iv_name, pri_node))
3673       cfg.SetDiskID(dev, pri_node)
3674       if not rpc.call_blockdev_find(pri_node, dev):
3675         raise errors.OpExecError("Can't find device %s on node %s" %
3676                                  (dev.iv_name, pri_node))
3677
3678     # Step: check other node consistency
3679     self.proc.LogStep(2, steps_total, "check peer consistency")
3680     for dev in instance.disks:
3681       if not dev.iv_name in self.op.disks:
3682         continue
3683       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3684       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3685         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3686                                  " unsafe to replace the secondary" %
3687                                  pri_node)
3688
3689     # Step: create new storage
3690     self.proc.LogStep(3, steps_total, "allocate new storage")
3691     for dev in instance.disks:
3692       size = dev.size
3693       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3694       # since we *always* want to create this LV, we use the
3695       # _Create...OnPrimary (which forces the creation), even if we
3696       # are talking about the secondary node
3697       for new_lv in dev.children:
3698         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3699                                         _GetInstanceInfoText(instance)):
3700           raise errors.OpExecError("Failed to create new LV named '%s' on"
3701                                    " node '%s'" %
3702                                    (new_lv.logical_id[1], new_node))
3703
3704       iv_names[dev.iv_name] = (dev, dev.children)
3705
3706     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3707     for dev in instance.disks:
3708       size = dev.size
3709       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3710       # create new devices on new_node
3711       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3712                               logical_id=(pri_node, new_node,
3713                                           dev.logical_id[2]),
3714                               children=dev.children)
3715       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3716                                         new_drbd, False,
3717                                       _GetInstanceInfoText(instance)):
3718         raise errors.OpExecError("Failed to create new DRBD on"
3719                                  " node '%s'" % new_node)
3720
3721     for dev in instance.disks:
3722       # we have new devices, shutdown the drbd on the old secondary
3723       info("shutting down drbd for %s on old node" % dev.iv_name)
3724       cfg.SetDiskID(dev, old_node)
3725       if not rpc.call_blockdev_shutdown(old_node, dev):
3726         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3727                 hint="Please cleanup this device manually as soon as possible")
3728
3729     info("detaching primary drbds from the network (=> standalone)")
3730     done = 0
3731     for dev in instance.disks:
3732       cfg.SetDiskID(dev, pri_node)
3733       # set the physical (unique in bdev terms) id to None, meaning
3734       # detach from network
3735       dev.physical_id = (None,) * len(dev.physical_id)
3736       # and 'find' the device, which will 'fix' it to match the
3737       # standalone state
3738       if rpc.call_blockdev_find(pri_node, dev):
3739         done += 1
3740       else:
3741         warning("Failed to detach drbd %s from network, unusual case" %
3742                 dev.iv_name)
3743
3744     if not done:
3745       # no detaches succeeded (very unlikely)
3746       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3747
3748     # if we managed to detach at least one, we update all the disks of
3749     # the instance to point to the new secondary
3750     info("updating instance configuration")
3751     for dev in instance.disks:
3752       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3753       cfg.SetDiskID(dev, pri_node)
3754     cfg.Update(instance)
3755
3756     # and now perform the drbd attach
3757     info("attaching primary drbds to new secondary (standalone => connected)")
3758     failures = []
3759     for dev in instance.disks:
3760       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3761       # since the attach is smart, it's enough to 'find' the device,
3762       # it will automatically activate the network, if the physical_id
3763       # is correct
3764       cfg.SetDiskID(dev, pri_node)
3765       if not rpc.call_blockdev_find(pri_node, dev):
3766         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3767                 "please do a gnt-instance info to see the status of disks")
3768
3769     # this can fail as the old devices are degraded and _WaitForSync
3770     # does a combined result over all disks, so we don't check its
3771     # return value
3772     self.proc.LogStep(5, steps_total, "sync devices")
3773     _WaitForSync(cfg, instance, self.proc, unlock=True)
3774
3775     # so check manually all the devices
3776     for name, (dev, old_lvs) in iv_names.iteritems():
3777       cfg.SetDiskID(dev, pri_node)
3778       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3779       if is_degr:
3780         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3781
3782     self.proc.LogStep(6, steps_total, "removing old storage")
3783     for name, (dev, old_lvs) in iv_names.iteritems():
3784       info("remove logical volumes for %s" % name)
3785       for lv in old_lvs:
3786         cfg.SetDiskID(lv, old_node)
3787         if not rpc.call_blockdev_remove(old_node, lv):
3788           warning("Can't remove LV on old secondary",
3789                   hint="Cleanup stale volumes by hand")
3790
3791   def Exec(self, feedback_fn):
3792     """Execute disk replacement.
3793
3794     This dispatches the disk replacement to the appropriate handler.
3795
3796     """
3797     instance = self.instance
3798
3799     # Activate the instance disks if we're replacing them on a down instance
3800     if instance.status == "down":
3801       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3802       self.proc.ChainOpCode(op)
3803
3804     if instance.disk_template == constants.DT_DRBD8:
3805       if self.op.remote_node is None:
3806         fn = self._ExecD8DiskOnly
3807       else:
3808         fn = self._ExecD8Secondary
3809     else:
3810       raise errors.ProgrammerError("Unhandled disk replacement case")
3811
3812     ret = fn(feedback_fn)
3813
3814     # Deactivate the instance disks if we're replacing them on a down instance
3815     if instance.status == "down":
3816       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3817       self.proc.ChainOpCode(op)
3818
3819     return ret
3820
3821
3822 class LUGrowDisk(LogicalUnit):
3823   """Grow a disk of an instance.
3824
3825   """
3826   HPATH = "disk-grow"
3827   HTYPE = constants.HTYPE_INSTANCE
3828   _OP_REQP = ["instance_name", "disk", "amount"]
3829
3830   def BuildHooksEnv(self):
3831     """Build hooks env.
3832
3833     This runs on the master, the primary and all the secondaries.
3834
3835     """
3836     env = {
3837       "DISK": self.op.disk,
3838       "AMOUNT": self.op.amount,
3839       }
3840     env.update(_BuildInstanceHookEnvByObject(self.instance))
3841     nl = [
3842       self.sstore.GetMasterNode(),
3843       self.instance.primary_node,
3844       ]
3845     return env, nl, nl
3846
3847   def CheckPrereq(self):
3848     """Check prerequisites.
3849
3850     This checks that the instance is in the cluster.
3851
3852     """
3853     instance = self.cfg.GetInstanceInfo(
3854       self.cfg.ExpandInstanceName(self.op.instance_name))
3855     if instance is None:
3856       raise errors.OpPrereqError("Instance '%s' not known" %
3857                                  self.op.instance_name)
3858     self.instance = instance
3859     self.op.instance_name = instance.name
3860
3861     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3862       raise errors.OpPrereqError("Instance's disk layout does not support"
3863                                  " growing.")
3864
3865     if instance.FindDisk(self.op.disk) is None:
3866       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3867                                  (self.op.disk, instance.name))
3868
3869     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3870     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3871     for node in nodenames:
3872       info = nodeinfo.get(node, None)
3873       if not info:
3874         raise errors.OpPrereqError("Cannot get current information"
3875                                    " from node '%s'" % node)
3876       vg_free = info.get('vg_free', None)
3877       if not isinstance(vg_free, int):
3878         raise errors.OpPrereqError("Can't compute free disk space on"
3879                                    " node %s" % node)
3880       if self.op.amount > info['vg_free']:
3881         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3882                                    " %d MiB available, %d MiB required" %
3883                                    (node, info['vg_free'], self.op.amount))
3884
3885   def Exec(self, feedback_fn):
3886     """Execute disk grow.
3887
3888     """
3889     instance = self.instance
3890     disk = instance.FindDisk(self.op.disk)
3891     for node in (instance.secondary_nodes + (instance.primary_node,)):
3892       self.cfg.SetDiskID(disk, node)
3893       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3894       if not result or not isinstance(result, tuple) or len(result) != 2:
3895         raise errors.OpExecError("grow request failed to node %s" % node)
3896       elif not result[0]:
3897         raise errors.OpExecError("grow request failed to node %s: %s" %
3898                                  (node, result[1]))
3899     disk.RecordGrow(self.op.amount)
3900     self.cfg.Update(instance)
3901     return
3902
3903
3904 class LUQueryInstanceData(NoHooksLU):
3905   """Query runtime instance data.
3906
3907   """
3908   _OP_REQP = ["instances"]
3909
3910   def CheckPrereq(self):
3911     """Check prerequisites.
3912
3913     This only checks the optional instance list against the existing names.
3914
3915     """
3916     if not isinstance(self.op.instances, list):
3917       raise errors.OpPrereqError("Invalid argument type 'instances'")
3918     if self.op.instances:
3919       self.wanted_instances = []
3920       names = self.op.instances
3921       for name in names:
3922         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3923         if instance is None:
3924           raise errors.OpPrereqError("No such instance name '%s'" % name)
3925         self.wanted_instances.append(instance)
3926     else:
3927       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3928                                in self.cfg.GetInstanceList()]
3929     return
3930
3931
3932   def _ComputeDiskStatus(self, instance, snode, dev):
3933     """Compute block device status.
3934
3935     """
3936     self.cfg.SetDiskID(dev, instance.primary_node)
3937     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3938     if dev.dev_type in constants.LDS_DRBD:
3939       # we change the snode then (otherwise we use the one passed in)
3940       if dev.logical_id[0] == instance.primary_node:
3941         snode = dev.logical_id[1]
3942       else:
3943         snode = dev.logical_id[0]
3944
3945     if snode:
3946       self.cfg.SetDiskID(dev, snode)
3947       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3948     else:
3949       dev_sstatus = None
3950
3951     if dev.children:
3952       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3953                       for child in dev.children]
3954     else:
3955       dev_children = []
3956
3957     data = {
3958       "iv_name": dev.iv_name,
3959       "dev_type": dev.dev_type,
3960       "logical_id": dev.logical_id,
3961       "physical_id": dev.physical_id,
3962       "pstatus": dev_pstatus,
3963       "sstatus": dev_sstatus,
3964       "children": dev_children,
3965       }
3966
3967     return data
3968
3969   def Exec(self, feedback_fn):
3970     """Gather and return data"""
3971     result = {}
3972     for instance in self.wanted_instances:
3973       remote_info = rpc.call_instance_info(instance.primary_node,
3974                                                 instance.name)
3975       if remote_info and "state" in remote_info:
3976         remote_state = "up"
3977       else:
3978         remote_state = "down"
3979       if instance.status == "down":
3980         config_state = "down"
3981       else:
3982         config_state = "up"
3983
3984       disks = [self._ComputeDiskStatus(instance, None, device)
3985                for device in instance.disks]
3986
3987       idict = {
3988         "name": instance.name,
3989         "config_state": config_state,
3990         "run_state": remote_state,
3991         "pnode": instance.primary_node,
3992         "snodes": instance.secondary_nodes,
3993         "os": instance.os,
3994         "memory": instance.memory,
3995         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3996         "disks": disks,
3997         "vcpus": instance.vcpus,
3998         }
3999
4000       htkind = self.sstore.GetHypervisorType()
4001       if htkind == constants.HT_XEN_PVM30:
4002         idict["kernel_path"] = instance.kernel_path
4003         idict["initrd_path"] = instance.initrd_path
4004
4005       if htkind == constants.HT_XEN_HVM31:
4006         idict["hvm_boot_order"] = instance.hvm_boot_order
4007         idict["hvm_acpi"] = instance.hvm_acpi
4008         idict["hvm_pae"] = instance.hvm_pae
4009         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4010
4011       if htkind in constants.HTS_REQ_PORT:
4012         idict["vnc_bind_address"] = instance.vnc_bind_address
4013         idict["network_port"] = instance.network_port
4014
4015       result[instance.name] = idict
4016
4017     return result
4018
4019
4020 class LUSetInstanceParams(LogicalUnit):
4021   """Modifies an instances's parameters.
4022
4023   """
4024   HPATH = "instance-modify"
4025   HTYPE = constants.HTYPE_INSTANCE
4026   _OP_REQP = ["instance_name"]
4027
4028   def BuildHooksEnv(self):
4029     """Build hooks env.
4030
4031     This runs on the master, primary and secondaries.
4032
4033     """
4034     args = dict()
4035     if self.mem:
4036       args['memory'] = self.mem
4037     if self.vcpus:
4038       args['vcpus'] = self.vcpus
4039     if self.do_ip or self.do_bridge or self.mac:
4040       if self.do_ip:
4041         ip = self.ip
4042       else:
4043         ip = self.instance.nics[0].ip
4044       if self.bridge:
4045         bridge = self.bridge
4046       else:
4047         bridge = self.instance.nics[0].bridge
4048       if self.mac:
4049         mac = self.mac
4050       else:
4051         mac = self.instance.nics[0].mac
4052       args['nics'] = [(ip, bridge, mac)]
4053     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4054     nl = [self.sstore.GetMasterNode(),
4055           self.instance.primary_node] + list(self.instance.secondary_nodes)
4056     return env, nl, nl
4057
4058   def CheckPrereq(self):
4059     """Check prerequisites.
4060
4061     This only checks the instance list against the existing names.
4062
4063     """
4064     self.mem = getattr(self.op, "mem", None)
4065     self.vcpus = getattr(self.op, "vcpus", None)
4066     self.ip = getattr(self.op, "ip", None)
4067     self.mac = getattr(self.op, "mac", None)
4068     self.bridge = getattr(self.op, "bridge", None)
4069     self.kernel_path = getattr(self.op, "kernel_path", None)
4070     self.initrd_path = getattr(self.op, "initrd_path", None)
4071     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4072     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4073     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4074     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4075     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4076     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4077                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4078                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4079                  self.vnc_bind_address]
4080     if all_parms.count(None) == len(all_parms):
4081       raise errors.OpPrereqError("No changes submitted")
4082     if self.mem is not None:
4083       try:
4084         self.mem = int(self.mem)
4085       except ValueError, err:
4086         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4087     if self.vcpus is not None:
4088       try:
4089         self.vcpus = int(self.vcpus)
4090       except ValueError, err:
4091         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4092     if self.ip is not None:
4093       self.do_ip = True
4094       if self.ip.lower() == "none":
4095         self.ip = None
4096       else:
4097         if not utils.IsValidIP(self.ip):
4098           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4099     else:
4100       self.do_ip = False
4101     self.do_bridge = (self.bridge is not None)
4102     if self.mac is not None:
4103       if self.cfg.IsMacInUse(self.mac):
4104         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4105                                    self.mac)
4106       if not utils.IsValidMac(self.mac):
4107         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4108
4109     if self.kernel_path is not None:
4110       self.do_kernel_path = True
4111       if self.kernel_path == constants.VALUE_NONE:
4112         raise errors.OpPrereqError("Can't set instance to no kernel")
4113
4114       if self.kernel_path != constants.VALUE_DEFAULT:
4115         if not os.path.isabs(self.kernel_path):
4116           raise errors.OpPrereqError("The kernel path must be an absolute"
4117                                     " filename")
4118     else:
4119       self.do_kernel_path = False
4120
4121     if self.initrd_path is not None:
4122       self.do_initrd_path = True
4123       if self.initrd_path not in (constants.VALUE_NONE,
4124                                   constants.VALUE_DEFAULT):
4125         if not os.path.isabs(self.initrd_path):
4126           raise errors.OpPrereqError("The initrd path must be an absolute"
4127                                     " filename")
4128     else:
4129       self.do_initrd_path = False
4130
4131     # boot order verification
4132     if self.hvm_boot_order is not None:
4133       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4134         if len(self.hvm_boot_order.strip("acdn")) != 0:
4135           raise errors.OpPrereqError("invalid boot order specified,"
4136                                      " must be one or more of [acdn]"
4137                                      " or 'default'")
4138
4139     # hvm_cdrom_image_path verification
4140     if self.op.hvm_cdrom_image_path is not None:
4141       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4142         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4143                                    " be an absolute path or None, not %s" %
4144                                    self.op.hvm_cdrom_image_path)
4145       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4146         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4147                                    " regular file or a symlink pointing to"
4148                                    " an existing regular file, not %s" %
4149                                    self.op.hvm_cdrom_image_path)
4150
4151     # vnc_bind_address verification
4152     if self.op.vnc_bind_address is not None:
4153       if not utils.IsValidIP(self.op.vnc_bind_address):
4154         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4155                                    " like a valid IP address" %
4156                                    self.op.vnc_bind_address)
4157
4158     instance = self.cfg.GetInstanceInfo(
4159       self.cfg.ExpandInstanceName(self.op.instance_name))
4160     if instance is None:
4161       raise errors.OpPrereqError("No such instance name '%s'" %
4162                                  self.op.instance_name)
4163     self.op.instance_name = instance.name
4164     self.instance = instance
4165     return
4166
4167   def Exec(self, feedback_fn):
4168     """Modifies an instance.
4169
4170     All parameters take effect only at the next restart of the instance.
4171     """
4172     result = []
4173     instance = self.instance
4174     if self.mem:
4175       instance.memory = self.mem
4176       result.append(("mem", self.mem))
4177     if self.vcpus:
4178       instance.vcpus = self.vcpus
4179       result.append(("vcpus",  self.vcpus))
4180     if self.do_ip:
4181       instance.nics[0].ip = self.ip
4182       result.append(("ip", self.ip))
4183     if self.bridge:
4184       instance.nics[0].bridge = self.bridge
4185       result.append(("bridge", self.bridge))
4186     if self.mac:
4187       instance.nics[0].mac = self.mac
4188       result.append(("mac", self.mac))
4189     if self.do_kernel_path:
4190       instance.kernel_path = self.kernel_path
4191       result.append(("kernel_path", self.kernel_path))
4192     if self.do_initrd_path:
4193       instance.initrd_path = self.initrd_path
4194       result.append(("initrd_path", self.initrd_path))
4195     if self.hvm_boot_order:
4196       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4197         instance.hvm_boot_order = None
4198       else:
4199         instance.hvm_boot_order = self.hvm_boot_order
4200       result.append(("hvm_boot_order", self.hvm_boot_order))
4201     if self.hvm_acpi:
4202       instance.hvm_acpi = self.hvm_acpi
4203       result.append(("hvm_acpi", self.hvm_acpi))
4204     if self.hvm_pae:
4205       instance.hvm_pae = self.hvm_pae
4206       result.append(("hvm_pae", self.hvm_pae))
4207     if self.hvm_cdrom_image_path:
4208       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4209       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4210     if self.vnc_bind_address:
4211       instance.vnc_bind_address = self.vnc_bind_address
4212       result.append(("vnc_bind_address", self.vnc_bind_address))
4213
4214     self.cfg.AddInstance(instance)
4215
4216     return result
4217
4218
4219 class LUQueryExports(NoHooksLU):
4220   """Query the exports list
4221
4222   """
4223   _OP_REQP = []
4224
4225   def CheckPrereq(self):
4226     """Check that the nodelist contains only existing nodes.
4227
4228     """
4229     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4230
4231   def Exec(self, feedback_fn):
4232     """Compute the list of all the exported system images.
4233
4234     Returns:
4235       a dictionary with the structure node->(export-list)
4236       where export-list is a list of the instances exported on
4237       that node.
4238
4239     """
4240     return rpc.call_export_list(self.nodes)
4241
4242
4243 class LUExportInstance(LogicalUnit):
4244   """Export an instance to an image in the cluster.
4245
4246   """
4247   HPATH = "instance-export"
4248   HTYPE = constants.HTYPE_INSTANCE
4249   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4250
4251   def BuildHooksEnv(self):
4252     """Build hooks env.
4253
4254     This will run on the master, primary node and target node.
4255
4256     """
4257     env = {
4258       "EXPORT_NODE": self.op.target_node,
4259       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4260       }
4261     env.update(_BuildInstanceHookEnvByObject(self.instance))
4262     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4263           self.op.target_node]
4264     return env, nl, nl
4265
4266   def CheckPrereq(self):
4267     """Check prerequisites.
4268
4269     This checks that the instance and node names are valid.
4270
4271     """
4272     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4273     self.instance = self.cfg.GetInstanceInfo(instance_name)
4274     if self.instance is None:
4275       raise errors.OpPrereqError("Instance '%s' not found" %
4276                                  self.op.instance_name)
4277
4278     # node verification
4279     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4280     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4281
4282     if self.dst_node is None:
4283       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4284                                  self.op.target_node)
4285     self.op.target_node = self.dst_node.name
4286
4287     # instance disk type verification
4288     for disk in self.instance.disks:
4289       if disk.dev_type == constants.LD_FILE:
4290         raise errors.OpPrereqError("Export not supported for instances with"
4291                                    " file-based disks")
4292
4293   def Exec(self, feedback_fn):
4294     """Export an instance to an image in the cluster.
4295
4296     """
4297     instance = self.instance
4298     dst_node = self.dst_node
4299     src_node = instance.primary_node
4300     if self.op.shutdown:
4301       # shutdown the instance, but not the disks
4302       if not rpc.call_instance_shutdown(src_node, instance):
4303          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4304                                   (instance.name, src_node))
4305
4306     vgname = self.cfg.GetVGName()
4307
4308     snap_disks = []
4309
4310     try:
4311       for disk in instance.disks:
4312         if disk.iv_name == "sda":
4313           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4314           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4315
4316           if not new_dev_name:
4317             logger.Error("could not snapshot block device %s on node %s" %
4318                          (disk.logical_id[1], src_node))
4319           else:
4320             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4321                                       logical_id=(vgname, new_dev_name),
4322                                       physical_id=(vgname, new_dev_name),
4323                                       iv_name=disk.iv_name)
4324             snap_disks.append(new_dev)
4325
4326     finally:
4327       if self.op.shutdown and instance.status == "up":
4328         if not rpc.call_instance_start(src_node, instance, None):
4329           _ShutdownInstanceDisks(instance, self.cfg)
4330           raise errors.OpExecError("Could not start instance")
4331
4332     # TODO: check for size
4333
4334     for dev in snap_disks:
4335       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4336         logger.Error("could not export block device %s from node %s to node %s"
4337                      % (dev.logical_id[1], src_node, dst_node.name))
4338       if not rpc.call_blockdev_remove(src_node, dev):
4339         logger.Error("could not remove snapshot block device %s from node %s" %
4340                      (dev.logical_id[1], src_node))
4341
4342     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4343       logger.Error("could not finalize export for instance %s on node %s" %
4344                    (instance.name, dst_node.name))
4345
4346     nodelist = self.cfg.GetNodeList()
4347     nodelist.remove(dst_node.name)
4348
4349     # on one-node clusters nodelist will be empty after the removal
4350     # if we proceed the backup would be removed because OpQueryExports
4351     # substitutes an empty list with the full cluster node list.
4352     if nodelist:
4353       op = opcodes.OpQueryExports(nodes=nodelist)
4354       exportlist = self.proc.ChainOpCode(op)
4355       for node in exportlist:
4356         if instance.name in exportlist[node]:
4357           if not rpc.call_export_remove(node, instance.name):
4358             logger.Error("could not remove older export for instance %s"
4359                          " on node %s" % (instance.name, node))
4360
4361
4362 class LURemoveExport(NoHooksLU):
4363   """Remove exports related to the named instance.
4364
4365   """
4366   _OP_REQP = ["instance_name"]
4367
4368   def CheckPrereq(self):
4369     """Check prerequisites.
4370     """
4371     pass
4372
4373   def Exec(self, feedback_fn):
4374     """Remove any export.
4375
4376     """
4377     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4378     # If the instance was not found we'll try with the name that was passed in.
4379     # This will only work if it was an FQDN, though.
4380     fqdn_warn = False
4381     if not instance_name:
4382       fqdn_warn = True
4383       instance_name = self.op.instance_name
4384
4385     op = opcodes.OpQueryExports(nodes=[])
4386     exportlist = self.proc.ChainOpCode(op)
4387     found = False
4388     for node in exportlist:
4389       if instance_name in exportlist[node]:
4390         found = True
4391         if not rpc.call_export_remove(node, instance_name):
4392           logger.Error("could not remove export for instance %s"
4393                        " on node %s" % (instance_name, node))
4394
4395     if fqdn_warn and not found:
4396       feedback_fn("Export not found. If trying to remove an export belonging"
4397                   " to a deleted instance please use its Fully Qualified"
4398                   " Domain Name.")
4399
4400
4401 class TagsLU(NoHooksLU):
4402   """Generic tags LU.
4403
4404   This is an abstract class which is the parent of all the other tags LUs.
4405
4406   """
4407   def CheckPrereq(self):
4408     """Check prerequisites.
4409
4410     """
4411     if self.op.kind == constants.TAG_CLUSTER:
4412       self.target = self.cfg.GetClusterInfo()
4413     elif self.op.kind == constants.TAG_NODE:
4414       name = self.cfg.ExpandNodeName(self.op.name)
4415       if name is None:
4416         raise errors.OpPrereqError("Invalid node name (%s)" %
4417                                    (self.op.name,))
4418       self.op.name = name
4419       self.target = self.cfg.GetNodeInfo(name)
4420     elif self.op.kind == constants.TAG_INSTANCE:
4421       name = self.cfg.ExpandInstanceName(self.op.name)
4422       if name is None:
4423         raise errors.OpPrereqError("Invalid instance name (%s)" %
4424                                    (self.op.name,))
4425       self.op.name = name
4426       self.target = self.cfg.GetInstanceInfo(name)
4427     else:
4428       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4429                                  str(self.op.kind))
4430
4431
4432 class LUGetTags(TagsLU):
4433   """Returns the tags of a given object.
4434
4435   """
4436   _OP_REQP = ["kind", "name"]
4437
4438   def Exec(self, feedback_fn):
4439     """Returns the tag list.
4440
4441     """
4442     return self.target.GetTags()
4443
4444
4445 class LUSearchTags(NoHooksLU):
4446   """Searches the tags for a given pattern.
4447
4448   """
4449   _OP_REQP = ["pattern"]
4450
4451   def CheckPrereq(self):
4452     """Check prerequisites.
4453
4454     This checks the pattern passed for validity by compiling it.
4455
4456     """
4457     try:
4458       self.re = re.compile(self.op.pattern)
4459     except re.error, err:
4460       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4461                                  (self.op.pattern, err))
4462
4463   def Exec(self, feedback_fn):
4464     """Returns the tag list.
4465
4466     """
4467     cfg = self.cfg
4468     tgts = [("/cluster", cfg.GetClusterInfo())]
4469     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4470     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4471     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4472     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4473     results = []
4474     for path, target in tgts:
4475       for tag in target.GetTags():
4476         if self.re.search(tag):
4477           results.append((path, tag))
4478     return results
4479
4480
4481 class LUAddTags(TagsLU):
4482   """Sets a tag on a given object.
4483
4484   """
4485   _OP_REQP = ["kind", "name", "tags"]
4486
4487   def CheckPrereq(self):
4488     """Check prerequisites.
4489
4490     This checks the type and length of the tag name and value.
4491
4492     """
4493     TagsLU.CheckPrereq(self)
4494     for tag in self.op.tags:
4495       objects.TaggableObject.ValidateTag(tag)
4496
4497   def Exec(self, feedback_fn):
4498     """Sets the tag.
4499
4500     """
4501     try:
4502       for tag in self.op.tags:
4503         self.target.AddTag(tag)
4504     except errors.TagError, err:
4505       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4506     try:
4507       self.cfg.Update(self.target)
4508     except errors.ConfigurationError:
4509       raise errors.OpRetryError("There has been a modification to the"
4510                                 " config file and the operation has been"
4511                                 " aborted. Please retry.")
4512
4513
4514 class LUDelTags(TagsLU):
4515   """Delete a list of tags from a given object.
4516
4517   """
4518   _OP_REQP = ["kind", "name", "tags"]
4519
4520   def CheckPrereq(self):
4521     """Check prerequisites.
4522
4523     This checks that we have the given tag.
4524
4525     """
4526     TagsLU.CheckPrereq(self)
4527     for tag in self.op.tags:
4528       objects.TaggableObject.ValidateTag(tag)
4529     del_tags = frozenset(self.op.tags)
4530     cur_tags = self.target.GetTags()
4531     if not del_tags <= cur_tags:
4532       diff_tags = del_tags - cur_tags
4533       diff_names = ["'%s'" % tag for tag in diff_tags]
4534       diff_names.sort()
4535       raise errors.OpPrereqError("Tag(s) %s not found" %
4536                                  (",".join(diff_names)))
4537
4538   def Exec(self, feedback_fn):
4539     """Remove the tag from the object.
4540
4541     """
4542     for tag in self.op.tags:
4543       self.target.RemoveTag(tag)
4544     try:
4545       self.cfg.Update(self.target)
4546     except errors.ConfigurationError:
4547       raise errors.OpRetryError("There has been a modification to the"
4548                                 " config file and the operation has been"
4549                                 " aborted. Please retry.")
4550
4551 class LUTestDelay(NoHooksLU):
4552   """Sleep for a specified amount of time.
4553
4554   This LU sleeps on the master and/or nodes for a specified amount of
4555   time.
4556
4557   """
4558   _OP_REQP = ["duration", "on_master", "on_nodes"]
4559
4560   def CheckPrereq(self):
4561     """Check prerequisites.
4562
4563     This checks that we have a good list of nodes and/or the duration
4564     is valid.
4565
4566     """
4567
4568     if self.op.on_nodes:
4569       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4570
4571   def Exec(self, feedback_fn):
4572     """Do the actual sleep.
4573
4574     """
4575     if self.op.on_master:
4576       if not utils.TestDelay(self.op.duration):
4577         raise errors.OpExecError("Error during master delay test")
4578     if self.op.on_nodes:
4579       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4580       if not result:
4581         raise errors.OpExecError("Complete failure from rpc call")
4582       for node, node_result in result.items():
4583         if not node_result:
4584           raise errors.OpExecError("Failure during rpc call to node %s,"
4585                                    " result: %s" % (node, node_result))
4586
4587
4588 class IAllocator(object):
4589   """IAllocator framework.
4590
4591   An IAllocator instance has three sets of attributes:
4592     - cfg/sstore that are needed to query the cluster
4593     - input data (all members of the _KEYS class attribute are required)
4594     - four buffer attributes (in|out_data|text), that represent the
4595       input (to the external script) in text and data structure format,
4596       and the output from it, again in two formats
4597     - the result variables from the script (success, info, nodes) for
4598       easy usage
4599
4600   """
4601   _ALLO_KEYS = [
4602     "mem_size", "disks", "disk_template",
4603     "os", "tags", "nics", "vcpus",
4604     ]
4605   _RELO_KEYS = [
4606     "relocate_from",
4607     ]
4608
4609   def __init__(self, cfg, sstore, mode, name, **kwargs):
4610     self.cfg = cfg
4611     self.sstore = sstore
4612     # init buffer variables
4613     self.in_text = self.out_text = self.in_data = self.out_data = None
4614     # init all input fields so that pylint is happy
4615     self.mode = mode
4616     self.name = name
4617     self.mem_size = self.disks = self.disk_template = None
4618     self.os = self.tags = self.nics = self.vcpus = None
4619     self.relocate_from = None
4620     # computed fields
4621     self.required_nodes = None
4622     # init result fields
4623     self.success = self.info = self.nodes = None
4624     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4625       keyset = self._ALLO_KEYS
4626     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4627       keyset = self._RELO_KEYS
4628     else:
4629       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4630                                    " IAllocator" % self.mode)
4631     for key in kwargs:
4632       if key not in keyset:
4633         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4634                                      " IAllocator" % key)
4635       setattr(self, key, kwargs[key])
4636     for key in keyset:
4637       if key not in kwargs:
4638         raise errors.ProgrammerError("Missing input parameter '%s' to"
4639                                      " IAllocator" % key)
4640     self._BuildInputData()
4641
4642   def _ComputeClusterData(self):
4643     """Compute the generic allocator input data.
4644
4645     This is the data that is independent of the actual operation.
4646
4647     """
4648     cfg = self.cfg
4649     # cluster data
4650     data = {
4651       "version": 1,
4652       "cluster_name": self.sstore.GetClusterName(),
4653       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4654       "hypervisor_type": self.sstore.GetHypervisorType(),
4655       # we don't have job IDs
4656       }
4657
4658     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4659
4660     # node data
4661     node_results = {}
4662     node_list = cfg.GetNodeList()
4663     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4664     for nname in node_list:
4665       ninfo = cfg.GetNodeInfo(nname)
4666       if nname not in node_data or not isinstance(node_data[nname], dict):
4667         raise errors.OpExecError("Can't get data for node %s" % nname)
4668       remote_info = node_data[nname]
4669       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4670                    'vg_size', 'vg_free', 'cpu_total']:
4671         if attr not in remote_info:
4672           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4673                                    (nname, attr))
4674         try:
4675           remote_info[attr] = int(remote_info[attr])
4676         except ValueError, err:
4677           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4678                                    " %s" % (nname, attr, str(err)))
4679       # compute memory used by primary instances
4680       i_p_mem = i_p_up_mem = 0
4681       for iinfo in i_list:
4682         if iinfo.primary_node == nname:
4683           i_p_mem += iinfo.memory
4684           if iinfo.status == "up":
4685             i_p_up_mem += iinfo.memory
4686
4687       # compute memory used by instances
4688       pnr = {
4689         "tags": list(ninfo.GetTags()),
4690         "total_memory": remote_info['memory_total'],
4691         "reserved_memory": remote_info['memory_dom0'],
4692         "free_memory": remote_info['memory_free'],
4693         "i_pri_memory": i_p_mem,
4694         "i_pri_up_memory": i_p_up_mem,
4695         "total_disk": remote_info['vg_size'],
4696         "free_disk": remote_info['vg_free'],
4697         "primary_ip": ninfo.primary_ip,
4698         "secondary_ip": ninfo.secondary_ip,
4699         "total_cpus": remote_info['cpu_total'],
4700         }
4701       node_results[nname] = pnr
4702     data["nodes"] = node_results
4703
4704     # instance data
4705     instance_data = {}
4706     for iinfo in i_list:
4707       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4708                   for n in iinfo.nics]
4709       pir = {
4710         "tags": list(iinfo.GetTags()),
4711         "should_run": iinfo.status == "up",
4712         "vcpus": iinfo.vcpus,
4713         "memory": iinfo.memory,
4714         "os": iinfo.os,
4715         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4716         "nics": nic_data,
4717         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4718         "disk_template": iinfo.disk_template,
4719         }
4720       instance_data[iinfo.name] = pir
4721
4722     data["instances"] = instance_data
4723
4724     self.in_data = data
4725
4726   def _AddNewInstance(self):
4727     """Add new instance data to allocator structure.
4728
4729     This in combination with _AllocatorGetClusterData will create the
4730     correct structure needed as input for the allocator.
4731
4732     The checks for the completeness of the opcode must have already been
4733     done.
4734
4735     """
4736     data = self.in_data
4737     if len(self.disks) != 2:
4738       raise errors.OpExecError("Only two-disk configurations supported")
4739
4740     disk_space = _ComputeDiskSize(self.disk_template,
4741                                   self.disks[0]["size"], self.disks[1]["size"])
4742
4743     if self.disk_template in constants.DTS_NET_MIRROR:
4744       self.required_nodes = 2
4745     else:
4746       self.required_nodes = 1
4747     request = {
4748       "type": "allocate",
4749       "name": self.name,
4750       "disk_template": self.disk_template,
4751       "tags": self.tags,
4752       "os": self.os,
4753       "vcpus": self.vcpus,
4754       "memory": self.mem_size,
4755       "disks": self.disks,
4756       "disk_space_total": disk_space,
4757       "nics": self.nics,
4758       "required_nodes": self.required_nodes,
4759       }
4760     data["request"] = request
4761
4762   def _AddRelocateInstance(self):
4763     """Add relocate instance data to allocator structure.
4764
4765     This in combination with _IAllocatorGetClusterData will create the
4766     correct structure needed as input for the allocator.
4767
4768     The checks for the completeness of the opcode must have already been
4769     done.
4770
4771     """
4772     instance = self.cfg.GetInstanceInfo(self.name)
4773     if instance is None:
4774       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4775                                    " IAllocator" % self.name)
4776
4777     if instance.disk_template not in constants.DTS_NET_MIRROR:
4778       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4779
4780     if len(instance.secondary_nodes) != 1:
4781       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4782
4783     self.required_nodes = 1
4784
4785     disk_space = _ComputeDiskSize(instance.disk_template,
4786                                   instance.disks[0].size,
4787                                   instance.disks[1].size)
4788
4789     request = {
4790       "type": "relocate",
4791       "name": self.name,
4792       "disk_space_total": disk_space,
4793       "required_nodes": self.required_nodes,
4794       "relocate_from": self.relocate_from,
4795       }
4796     self.in_data["request"] = request
4797
4798   def _BuildInputData(self):
4799     """Build input data structures.
4800
4801     """
4802     self._ComputeClusterData()
4803
4804     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4805       self._AddNewInstance()
4806     else:
4807       self._AddRelocateInstance()
4808
4809     self.in_text = serializer.Dump(self.in_data)
4810
4811   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4812     """Run an instance allocator and return the results.
4813
4814     """
4815     data = self.in_text
4816
4817     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4818
4819     if not isinstance(result, tuple) or len(result) != 4:
4820       raise errors.OpExecError("Invalid result from master iallocator runner")
4821
4822     rcode, stdout, stderr, fail = result
4823
4824     if rcode == constants.IARUN_NOTFOUND:
4825       raise errors.OpExecError("Can't find allocator '%s'" % name)
4826     elif rcode == constants.IARUN_FAILURE:
4827         raise errors.OpExecError("Instance allocator call failed: %s,"
4828                                  " output: %s" %
4829                                  (fail, stdout+stderr))
4830     self.out_text = stdout
4831     if validate:
4832       self._ValidateResult()
4833
4834   def _ValidateResult(self):
4835     """Process the allocator results.
4836
4837     This will process and if successful save the result in
4838     self.out_data and the other parameters.
4839
4840     """
4841     try:
4842       rdict = serializer.Load(self.out_text)
4843     except Exception, err:
4844       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4845
4846     if not isinstance(rdict, dict):
4847       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4848
4849     for key in "success", "info", "nodes":
4850       if key not in rdict:
4851         raise errors.OpExecError("Can't parse iallocator results:"
4852                                  " missing key '%s'" % key)
4853       setattr(self, key, rdict[key])
4854
4855     if not isinstance(rdict["nodes"], list):
4856       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4857                                " is not a list")
4858     self.out_data = rdict
4859
4860
4861 class LUTestAllocator(NoHooksLU):
4862   """Run allocator tests.
4863
4864   This LU runs the allocator tests
4865
4866   """
4867   _OP_REQP = ["direction", "mode", "name"]
4868
4869   def CheckPrereq(self):
4870     """Check prerequisites.
4871
4872     This checks the opcode parameters depending on the director and mode test.
4873
4874     """
4875     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4876       for attr in ["name", "mem_size", "disks", "disk_template",
4877                    "os", "tags", "nics", "vcpus"]:
4878         if not hasattr(self.op, attr):
4879           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4880                                      attr)
4881       iname = self.cfg.ExpandInstanceName(self.op.name)
4882       if iname is not None:
4883         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4884                                    iname)
4885       if not isinstance(self.op.nics, list):
4886         raise errors.OpPrereqError("Invalid parameter 'nics'")
4887       for row in self.op.nics:
4888         if (not isinstance(row, dict) or
4889             "mac" not in row or
4890             "ip" not in row or
4891             "bridge" not in row):
4892           raise errors.OpPrereqError("Invalid contents of the"
4893                                      " 'nics' parameter")
4894       if not isinstance(self.op.disks, list):
4895         raise errors.OpPrereqError("Invalid parameter 'disks'")
4896       if len(self.op.disks) != 2:
4897         raise errors.OpPrereqError("Only two-disk configurations supported")
4898       for row in self.op.disks:
4899         if (not isinstance(row, dict) or
4900             "size" not in row or
4901             not isinstance(row["size"], int) or
4902             "mode" not in row or
4903             row["mode"] not in ['r', 'w']):
4904           raise errors.OpPrereqError("Invalid contents of the"
4905                                      " 'disks' parameter")
4906     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4907       if not hasattr(self.op, "name"):
4908         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4909       fname = self.cfg.ExpandInstanceName(self.op.name)
4910       if fname is None:
4911         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4912                                    self.op.name)
4913       self.op.name = fname
4914       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4915     else:
4916       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4917                                  self.op.mode)
4918
4919     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4920       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4921         raise errors.OpPrereqError("Missing allocator name")
4922     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4923       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4924                                  self.op.direction)
4925
4926   def Exec(self, feedback_fn):
4927     """Run the allocator test.
4928
4929     """
4930     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4931       ial = IAllocator(self.cfg, self.sstore,
4932                        mode=self.op.mode,
4933                        name=self.op.name,
4934                        mem_size=self.op.mem_size,
4935                        disks=self.op.disks,
4936                        disk_template=self.op.disk_template,
4937                        os=self.op.os,
4938                        tags=self.op.tags,
4939                        nics=self.op.nics,
4940                        vcpus=self.op.vcpus,
4941                        )
4942     else:
4943       ial = IAllocator(self.cfg, self.sstore,
4944                        mode=self.op.mode,
4945                        name=self.op.name,
4946                        relocate_from=list(self.relocate_from),
4947                        )
4948
4949     if self.op.direction == constants.IALLOCATOR_DIR_IN:
4950       result = ial.in_text
4951     else:
4952       ial.Run(self.op.allocator, validate=False)
4953       result = ial.out_text
4954     return result