serializer.DumpJson: Control indentation by parameter
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement CheckPrereq which also fills in the opcode instance
54       with all the fields (even if as None)
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_MASTER: the LU needs to run on the master node
60         REQ_WSSTORE: the LU needs a writable SimpleStore
61         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62
63   Note that all commands require root permissions.
64
65   """
66   HPATH = None
67   HTYPE = None
68   _OP_REQP = []
69   REQ_MASTER = True
70   REQ_WSSTORE = False
71   REQ_BGL = True
72
73   def __init__(self, processor, op, context, sstore):
74     """Constructor for LogicalUnit.
75
76     This needs to be overriden in derived classes in order to check op
77     validity.
78
79     """
80     self.proc = processor
81     self.op = op
82     self.cfg = context.cfg
83     self.sstore = sstore
84     self.context = context
85     self.__ssh = None
86
87     for attr_name in self._OP_REQP:
88       attr_val = getattr(op, attr_name, None)
89       if attr_val is None:
90         raise errors.OpPrereqError("Required parameter '%s' missing" %
91                                    attr_name)
92
93     if not self.cfg.IsCluster():
94       raise errors.OpPrereqError("Cluster not initialized yet,"
95                                  " use 'gnt-cluster init' first.")
96     if self.REQ_MASTER:
97       master = sstore.GetMasterNode()
98       if master != utils.HostInfo().name:
99         raise errors.OpPrereqError("Commands must be run on the master"
100                                    " node %s" % master)
101
102   def __GetSSH(self):
103     """Returns the SshRunner object
104
105     """
106     if not self.__ssh:
107       self.__ssh = ssh.SshRunner(self.sstore)
108     return self.__ssh
109
110   ssh = property(fget=__GetSSH)
111
112   def CheckPrereq(self):
113     """Check prerequisites for this LU.
114
115     This method should check that the prerequisites for the execution
116     of this LU are fulfilled. It can do internode communication, but
117     it should be idempotent - no cluster or system changes are
118     allowed.
119
120     The method should raise errors.OpPrereqError in case something is
121     not fulfilled. Its return value is ignored.
122
123     This method should also update all the parameters of the opcode to
124     their canonical form; e.g. a short node name must be fully
125     expanded after this method has successfully completed (so that
126     hooks, logging, etc. work correctly).
127
128     """
129     raise NotImplementedError
130
131   def Exec(self, feedback_fn):
132     """Execute the LU.
133
134     This method should implement the actual work. It should raise
135     errors.OpExecError for failures that are somewhat dealt with in
136     code, or expected.
137
138     """
139     raise NotImplementedError
140
141   def BuildHooksEnv(self):
142     """Build hooks environment for this LU.
143
144     This method should return a three-node tuple consisting of: a dict
145     containing the environment that will be used for running the
146     specific hook for this LU, a list of node names on which the hook
147     should run before the execution, and a list of node names on which
148     the hook should run after the execution.
149
150     The keys of the dict must not have 'GANETI_' prefixed as this will
151     be handled in the hooks runner. Also note additional keys will be
152     added by the hooks runner. If the LU doesn't define any
153     environment, an empty dict (and not None) should be returned.
154
155     No nodes should be returned as an empty list (and not None).
156
157     Note that if the HPATH for a LU class is None, this function will
158     not be called.
159
160     """
161     raise NotImplementedError
162
163   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
164     """Notify the LU about the results of its hooks.
165
166     This method is called every time a hooks phase is executed, and notifies
167     the Logical Unit about the hooks' result. The LU can then use it to alter
168     its result based on the hooks.  By default the method does nothing and the
169     previous result is passed back unchanged but any LU can define it if it
170     wants to use the local cluster hook-scripts somehow.
171
172     Args:
173       phase: the hooks phase that has just been run
174       hooks_results: the results of the multi-node hooks rpc call
175       feedback_fn: function to send feedback back to the caller
176       lu_result: the previous result this LU had, or None in the PRE phase.
177
178     """
179     return lu_result
180
181
182 class NoHooksLU(LogicalUnit):
183   """Simple LU which runs no hooks.
184
185   This LU is intended as a parent for other LogicalUnits which will
186   run no hooks, in order to reduce duplicate code.
187
188   """
189   HPATH = None
190   HTYPE = None
191
192
193 def _GetWantedNodes(lu, nodes):
194   """Returns list of checked and expanded node names.
195
196   Args:
197     nodes: List of nodes (strings) or None for all
198
199   """
200   if not isinstance(nodes, list):
201     raise errors.OpPrereqError("Invalid argument type 'nodes'")
202
203   if nodes:
204     wanted = []
205
206     for name in nodes:
207       node = lu.cfg.ExpandNodeName(name)
208       if node is None:
209         raise errors.OpPrereqError("No such node name '%s'" % name)
210       wanted.append(node)
211
212   else:
213     wanted = lu.cfg.GetNodeList()
214   return utils.NiceSort(wanted)
215
216
217 def _GetWantedInstances(lu, instances):
218   """Returns list of checked and expanded instance names.
219
220   Args:
221     instances: List of instances (strings) or None for all
222
223   """
224   if not isinstance(instances, list):
225     raise errors.OpPrereqError("Invalid argument type 'instances'")
226
227   if instances:
228     wanted = []
229
230     for name in instances:
231       instance = lu.cfg.ExpandInstanceName(name)
232       if instance is None:
233         raise errors.OpPrereqError("No such instance name '%s'" % name)
234       wanted.append(instance)
235
236   else:
237     wanted = lu.cfg.GetInstanceList()
238   return utils.NiceSort(wanted)
239
240
241 def _CheckOutputFields(static, dynamic, selected):
242   """Checks whether all selected fields are valid.
243
244   Args:
245     static: Static fields
246     dynamic: Dynamic fields
247
248   """
249   static_fields = frozenset(static)
250   dynamic_fields = frozenset(dynamic)
251
252   all_fields = static_fields | dynamic_fields
253
254   if not all_fields.issuperset(selected):
255     raise errors.OpPrereqError("Unknown output fields selected: %s"
256                                % ",".join(frozenset(selected).
257                                           difference(all_fields)))
258
259
260 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261                           memory, vcpus, nics):
262   """Builds instance related env variables for hooks from single variables.
263
264   Args:
265     secondary_nodes: List of secondary nodes as strings
266   """
267   env = {
268     "OP_TARGET": name,
269     "INSTANCE_NAME": name,
270     "INSTANCE_PRIMARY": primary_node,
271     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272     "INSTANCE_OS_TYPE": os_type,
273     "INSTANCE_STATUS": status,
274     "INSTANCE_MEMORY": memory,
275     "INSTANCE_VCPUS": vcpus,
276   }
277
278   if nics:
279     nic_count = len(nics)
280     for idx, (ip, bridge, mac) in enumerate(nics):
281       if ip is None:
282         ip = ""
283       env["INSTANCE_NIC%d_IP" % idx] = ip
284       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
286   else:
287     nic_count = 0
288
289   env["INSTANCE_NIC_COUNT"] = nic_count
290
291   return env
292
293
294 def _BuildInstanceHookEnvByObject(instance, override=None):
295   """Builds instance related env variables for hooks from an object.
296
297   Args:
298     instance: objects.Instance object of instance
299     override: dict of values to override
300   """
301   args = {
302     'name': instance.name,
303     'primary_node': instance.primary_node,
304     'secondary_nodes': instance.secondary_nodes,
305     'os_type': instance.os,
306     'status': instance.os,
307     'memory': instance.memory,
308     'vcpus': instance.vcpus,
309     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310   }
311   if override:
312     args.update(override)
313   return _BuildInstanceHookEnv(**args)
314
315
316 def _CheckInstanceBridgesExist(instance):
317   """Check that the brigdes needed by an instance exist.
318
319   """
320   # check bridges existance
321   brlist = [nic.bridge for nic in instance.nics]
322   if not rpc.call_bridges_exist(instance.primary_node, brlist):
323     raise errors.OpPrereqError("one or more target bridges %s does not"
324                                " exist on destination node '%s'" %
325                                (brlist, instance.primary_node))
326
327
328 class LUDestroyCluster(NoHooksLU):
329   """Logical unit for destroying the cluster.
330
331   """
332   _OP_REQP = []
333
334   def CheckPrereq(self):
335     """Check prerequisites.
336
337     This checks whether the cluster is empty.
338
339     Any errors are signalled by raising errors.OpPrereqError.
340
341     """
342     master = self.sstore.GetMasterNode()
343
344     nodelist = self.cfg.GetNodeList()
345     if len(nodelist) != 1 or nodelist[0] != master:
346       raise errors.OpPrereqError("There are still %d node(s) in"
347                                  " this cluster." % (len(nodelist) - 1))
348     instancelist = self.cfg.GetInstanceList()
349     if instancelist:
350       raise errors.OpPrereqError("There are still %d instance(s) in"
351                                  " this cluster." % len(instancelist))
352
353   def Exec(self, feedback_fn):
354     """Destroys the cluster.
355
356     """
357     master = self.sstore.GetMasterNode()
358     if not rpc.call_node_stop_master(master):
359       raise errors.OpExecError("Could not disable the master role")
360     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
361     utils.CreateBackup(priv_key)
362     utils.CreateBackup(pub_key)
363     rpc.call_node_leave_cluster(master)
364
365
366 class LUVerifyCluster(LogicalUnit):
367   """Verifies the cluster status.
368
369   """
370   HPATH = "cluster-verify"
371   HTYPE = constants.HTYPE_CLUSTER
372   _OP_REQP = ["skip_checks"]
373
374   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
375                   remote_version, feedback_fn):
376     """Run multiple tests against a node.
377
378     Test list:
379       - compares ganeti version
380       - checks vg existance and size > 20G
381       - checks config file checksum
382       - checks ssh to other nodes
383
384     Args:
385       node: name of the node to check
386       file_list: required list of files
387       local_cksum: dictionary of local files and their checksums
388
389     """
390     # compares ganeti version
391     local_version = constants.PROTOCOL_VERSION
392     if not remote_version:
393       feedback_fn("  - ERROR: connection to %s failed" % (node))
394       return True
395
396     if local_version != remote_version:
397       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
398                       (local_version, node, remote_version))
399       return True
400
401     # checks vg existance and size > 20G
402
403     bad = False
404     if not vglist:
405       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
406                       (node,))
407       bad = True
408     else:
409       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
410                                             constants.MIN_VG_SIZE)
411       if vgstatus:
412         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
413         bad = True
414
415     # checks config file checksum
416     # checks ssh to any
417
418     if 'filelist' not in node_result:
419       bad = True
420       feedback_fn("  - ERROR: node hasn't returned file checksum data")
421     else:
422       remote_cksum = node_result['filelist']
423       for file_name in file_list:
424         if file_name not in remote_cksum:
425           bad = True
426           feedback_fn("  - ERROR: file '%s' missing" % file_name)
427         elif remote_cksum[file_name] != local_cksum[file_name]:
428           bad = True
429           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
430
431     if 'nodelist' not in node_result:
432       bad = True
433       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
434     else:
435       if node_result['nodelist']:
436         bad = True
437         for node in node_result['nodelist']:
438           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
439                           (node, node_result['nodelist'][node]))
440     if 'node-net-test' not in node_result:
441       bad = True
442       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
443     else:
444       if node_result['node-net-test']:
445         bad = True
446         nlist = utils.NiceSort(node_result['node-net-test'].keys())
447         for node in nlist:
448           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
449                           (node, node_result['node-net-test'][node]))
450
451     hyp_result = node_result.get('hypervisor', None)
452     if hyp_result is not None:
453       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
454     return bad
455
456   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
457                       node_instance, feedback_fn):
458     """Verify an instance.
459
460     This function checks to see if the required block devices are
461     available on the instance's node.
462
463     """
464     bad = False
465
466     node_current = instanceconfig.primary_node
467
468     node_vol_should = {}
469     instanceconfig.MapLVsByNode(node_vol_should)
470
471     for node in node_vol_should:
472       for volume in node_vol_should[node]:
473         if node not in node_vol_is or volume not in node_vol_is[node]:
474           feedback_fn("  - ERROR: volume %s missing on node %s" %
475                           (volume, node))
476           bad = True
477
478     if not instanceconfig.status == 'down':
479       if (node_current not in node_instance or
480           not instance in node_instance[node_current]):
481         feedback_fn("  - ERROR: instance %s not running on node %s" %
482                         (instance, node_current))
483         bad = True
484
485     for node in node_instance:
486       if (not node == node_current):
487         if instance in node_instance[node]:
488           feedback_fn("  - ERROR: instance %s should not run on node %s" %
489                           (instance, node))
490           bad = True
491
492     return bad
493
494   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
495     """Verify if there are any unknown volumes in the cluster.
496
497     The .os, .swap and backup volumes are ignored. All other volumes are
498     reported as unknown.
499
500     """
501     bad = False
502
503     for node in node_vol_is:
504       for volume in node_vol_is[node]:
505         if node not in node_vol_should or volume not in node_vol_should[node]:
506           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
507                       (volume, node))
508           bad = True
509     return bad
510
511   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
512     """Verify the list of running instances.
513
514     This checks what instances are running but unknown to the cluster.
515
516     """
517     bad = False
518     for node in node_instance:
519       for runninginstance in node_instance[node]:
520         if runninginstance not in instancelist:
521           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
522                           (runninginstance, node))
523           bad = True
524     return bad
525
526   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
527     """Verify N+1 Memory Resilience.
528
529     Check that if one single node dies we can still start all the instances it
530     was primary for.
531
532     """
533     bad = False
534
535     for node, nodeinfo in node_info.iteritems():
536       # This code checks that every node which is now listed as secondary has
537       # enough memory to host all instances it is supposed to should a single
538       # other node in the cluster fail.
539       # FIXME: not ready for failover to an arbitrary node
540       # FIXME: does not support file-backed instances
541       # WARNING: we currently take into account down instances as well as up
542       # ones, considering that even if they're down someone might want to start
543       # them even in the event of a node failure.
544       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
545         needed_mem = 0
546         for instance in instances:
547           needed_mem += instance_cfg[instance].memory
548         if nodeinfo['mfree'] < needed_mem:
549           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
550                       " failovers should node %s fail" % (node, prinode))
551           bad = True
552     return bad
553
554   def CheckPrereq(self):
555     """Check prerequisites.
556
557     Transform the list of checks we're going to skip into a set and check that
558     all its members are valid.
559
560     """
561     self.skip_set = frozenset(self.op.skip_checks)
562     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
563       raise errors.OpPrereqError("Invalid checks to be skipped specified")
564
565   def BuildHooksEnv(self):
566     """Build hooks env.
567
568     Cluster-Verify hooks just rone in the post phase and their failure makes
569     the output be logged in the verify output and the verification to fail.
570
571     """
572     all_nodes = self.cfg.GetNodeList()
573     # TODO: populate the environment with useful information for verify hooks
574     env = {}
575     return env, [], all_nodes
576
577   def Exec(self, feedback_fn):
578     """Verify integrity of cluster, performing various test on nodes.
579
580     """
581     bad = False
582     feedback_fn("* Verifying global settings")
583     for msg in self.cfg.VerifyConfig():
584       feedback_fn("  - ERROR: %s" % msg)
585
586     vg_name = self.cfg.GetVGName()
587     nodelist = utils.NiceSort(self.cfg.GetNodeList())
588     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
589     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
590     i_non_redundant = [] # Non redundant instances
591     node_volume = {}
592     node_instance = {}
593     node_info = {}
594     instance_cfg = {}
595
596     # FIXME: verify OS list
597     # do local checksums
598     file_names = list(self.sstore.GetFileList())
599     file_names.append(constants.SSL_CERT_FILE)
600     file_names.append(constants.CLUSTER_CONF_FILE)
601     local_checksums = utils.FingerprintFiles(file_names)
602
603     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
604     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
605     all_instanceinfo = rpc.call_instance_list(nodelist)
606     all_vglist = rpc.call_vg_list(nodelist)
607     node_verify_param = {
608       'filelist': file_names,
609       'nodelist': nodelist,
610       'hypervisor': None,
611       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
612                         for node in nodeinfo]
613       }
614     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
615     all_rversion = rpc.call_version(nodelist)
616     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
617
618     for node in nodelist:
619       feedback_fn("* Verifying node %s" % node)
620       result = self._VerifyNode(node, file_names, local_checksums,
621                                 all_vglist[node], all_nvinfo[node],
622                                 all_rversion[node], feedback_fn)
623       bad = bad or result
624
625       # node_volume
626       volumeinfo = all_volumeinfo[node]
627
628       if isinstance(volumeinfo, basestring):
629         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
630                     (node, volumeinfo[-400:].encode('string_escape')))
631         bad = True
632         node_volume[node] = {}
633       elif not isinstance(volumeinfo, dict):
634         feedback_fn("  - ERROR: connection to %s failed" % (node,))
635         bad = True
636         continue
637       else:
638         node_volume[node] = volumeinfo
639
640       # node_instance
641       nodeinstance = all_instanceinfo[node]
642       if type(nodeinstance) != list:
643         feedback_fn("  - ERROR: connection to %s failed" % (node,))
644         bad = True
645         continue
646
647       node_instance[node] = nodeinstance
648
649       # node_info
650       nodeinfo = all_ninfo[node]
651       if not isinstance(nodeinfo, dict):
652         feedback_fn("  - ERROR: connection to %s failed" % (node,))
653         bad = True
654         continue
655
656       try:
657         node_info[node] = {
658           "mfree": int(nodeinfo['memory_free']),
659           "dfree": int(nodeinfo['vg_free']),
660           "pinst": [],
661           "sinst": [],
662           # dictionary holding all instances this node is secondary for,
663           # grouped by their primary node. Each key is a cluster node, and each
664           # value is a list of instances which have the key as primary and the
665           # current node as secondary.  this is handy to calculate N+1 memory
666           # availability if you can only failover from a primary to its
667           # secondary.
668           "sinst-by-pnode": {},
669         }
670       except ValueError:
671         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
672         bad = True
673         continue
674
675     node_vol_should = {}
676
677     for instance in instancelist:
678       feedback_fn("* Verifying instance %s" % instance)
679       inst_config = self.cfg.GetInstanceInfo(instance)
680       result =  self._VerifyInstance(instance, inst_config, node_volume,
681                                      node_instance, feedback_fn)
682       bad = bad or result
683
684       inst_config.MapLVsByNode(node_vol_should)
685
686       instance_cfg[instance] = inst_config
687
688       pnode = inst_config.primary_node
689       if pnode in node_info:
690         node_info[pnode]['pinst'].append(instance)
691       else:
692         feedback_fn("  - ERROR: instance %s, connection to primary node"
693                     " %s failed" % (instance, pnode))
694         bad = True
695
696       # If the instance is non-redundant we cannot survive losing its primary
697       # node, so we are not N+1 compliant. On the other hand we have no disk
698       # templates with more than one secondary so that situation is not well
699       # supported either.
700       # FIXME: does not support file-backed instances
701       if len(inst_config.secondary_nodes) == 0:
702         i_non_redundant.append(instance)
703       elif len(inst_config.secondary_nodes) > 1:
704         feedback_fn("  - WARNING: multiple secondaries for instance %s"
705                     % instance)
706
707       for snode in inst_config.secondary_nodes:
708         if snode in node_info:
709           node_info[snode]['sinst'].append(instance)
710           if pnode not in node_info[snode]['sinst-by-pnode']:
711             node_info[snode]['sinst-by-pnode'][pnode] = []
712           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
713         else:
714           feedback_fn("  - ERROR: instance %s, connection to secondary node"
715                       " %s failed" % (instance, snode))
716
717     feedback_fn("* Verifying orphan volumes")
718     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
719                                        feedback_fn)
720     bad = bad or result
721
722     feedback_fn("* Verifying remaining instances")
723     result = self._VerifyOrphanInstances(instancelist, node_instance,
724                                          feedback_fn)
725     bad = bad or result
726
727     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
728       feedback_fn("* Verifying N+1 Memory redundancy")
729       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
730       bad = bad or result
731
732     feedback_fn("* Other Notes")
733     if i_non_redundant:
734       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
735                   % len(i_non_redundant))
736
737     return int(bad)
738
739   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
740     """Analize the post-hooks' result, handle it, and send some
741     nicely-formatted feedback back to the user.
742
743     Args:
744       phase: the hooks phase that has just been run
745       hooks_results: the results of the multi-node hooks rpc call
746       feedback_fn: function to send feedback back to the caller
747       lu_result: previous Exec result
748
749     """
750     # We only really run POST phase hooks, and are only interested in their results
751     if phase == constants.HOOKS_PHASE_POST:
752       # Used to change hooks' output to proper indentation
753       indent_re = re.compile('^', re.M)
754       feedback_fn("* Hooks Results")
755       if not hooks_results:
756         feedback_fn("  - ERROR: general communication failure")
757         lu_result = 1
758       else:
759         for node_name in hooks_results:
760           show_node_header = True
761           res = hooks_results[node_name]
762           if res is False or not isinstance(res, list):
763             feedback_fn("    Communication failure")
764             lu_result = 1
765             continue
766           for script, hkr, output in res:
767             if hkr == constants.HKR_FAIL:
768               # The node header is only shown once, if there are
769               # failing hooks on that node
770               if show_node_header:
771                 feedback_fn("  Node %s:" % node_name)
772                 show_node_header = False
773               feedback_fn("    ERROR: Script %s failed, output:" % script)
774               output = indent_re.sub('      ', output)
775               feedback_fn("%s" % output)
776               lu_result = 1
777
778       return lu_result
779
780
781 class LUVerifyDisks(NoHooksLU):
782   """Verifies the cluster disks status.
783
784   """
785   _OP_REQP = []
786
787   def CheckPrereq(self):
788     """Check prerequisites.
789
790     This has no prerequisites.
791
792     """
793     pass
794
795   def Exec(self, feedback_fn):
796     """Verify integrity of cluster disks.
797
798     """
799     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
800
801     vg_name = self.cfg.GetVGName()
802     nodes = utils.NiceSort(self.cfg.GetNodeList())
803     instances = [self.cfg.GetInstanceInfo(name)
804                  for name in self.cfg.GetInstanceList()]
805
806     nv_dict = {}
807     for inst in instances:
808       inst_lvs = {}
809       if (inst.status != "up" or
810           inst.disk_template not in constants.DTS_NET_MIRROR):
811         continue
812       inst.MapLVsByNode(inst_lvs)
813       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
814       for node, vol_list in inst_lvs.iteritems():
815         for vol in vol_list:
816           nv_dict[(node, vol)] = inst
817
818     if not nv_dict:
819       return result
820
821     node_lvs = rpc.call_volume_list(nodes, vg_name)
822
823     to_act = set()
824     for node in nodes:
825       # node_volume
826       lvs = node_lvs[node]
827
828       if isinstance(lvs, basestring):
829         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
830         res_nlvm[node] = lvs
831       elif not isinstance(lvs, dict):
832         logger.Info("connection to node %s failed or invalid data returned" %
833                     (node,))
834         res_nodes.append(node)
835         continue
836
837       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
838         inst = nv_dict.pop((node, lv_name), None)
839         if (not lv_online and inst is not None
840             and inst.name not in res_instances):
841           res_instances.append(inst.name)
842
843     # any leftover items in nv_dict are missing LVs, let's arrange the
844     # data better
845     for key, inst in nv_dict.iteritems():
846       if inst.name not in res_missing:
847         res_missing[inst.name] = []
848       res_missing[inst.name].append(key)
849
850     return result
851
852
853 class LURenameCluster(LogicalUnit):
854   """Rename the cluster.
855
856   """
857   HPATH = "cluster-rename"
858   HTYPE = constants.HTYPE_CLUSTER
859   _OP_REQP = ["name"]
860   REQ_WSSTORE = True
861
862   def BuildHooksEnv(self):
863     """Build hooks env.
864
865     """
866     env = {
867       "OP_TARGET": self.sstore.GetClusterName(),
868       "NEW_NAME": self.op.name,
869       }
870     mn = self.sstore.GetMasterNode()
871     return env, [mn], [mn]
872
873   def CheckPrereq(self):
874     """Verify that the passed name is a valid one.
875
876     """
877     hostname = utils.HostInfo(self.op.name)
878
879     new_name = hostname.name
880     self.ip = new_ip = hostname.ip
881     old_name = self.sstore.GetClusterName()
882     old_ip = self.sstore.GetMasterIP()
883     if new_name == old_name and new_ip == old_ip:
884       raise errors.OpPrereqError("Neither the name nor the IP address of the"
885                                  " cluster has changed")
886     if new_ip != old_ip:
887       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
888         raise errors.OpPrereqError("The given cluster IP address (%s) is"
889                                    " reachable on the network. Aborting." %
890                                    new_ip)
891
892     self.op.name = new_name
893
894   def Exec(self, feedback_fn):
895     """Rename the cluster.
896
897     """
898     clustername = self.op.name
899     ip = self.ip
900     ss = self.sstore
901
902     # shutdown the master IP
903     master = ss.GetMasterNode()
904     if not rpc.call_node_stop_master(master):
905       raise errors.OpExecError("Could not disable the master role")
906
907     try:
908       # modify the sstore
909       ss.SetKey(ss.SS_MASTER_IP, ip)
910       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
911
912       # Distribute updated ss config to all nodes
913       myself = self.cfg.GetNodeInfo(master)
914       dist_nodes = self.cfg.GetNodeList()
915       if myself.name in dist_nodes:
916         dist_nodes.remove(myself.name)
917
918       logger.Debug("Copying updated ssconf data to all nodes")
919       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
920         fname = ss.KeyToFilename(keyname)
921         result = rpc.call_upload_file(dist_nodes, fname)
922         for to_node in dist_nodes:
923           if not result[to_node]:
924             logger.Error("copy of file %s to node %s failed" %
925                          (fname, to_node))
926     finally:
927       if not rpc.call_node_start_master(master):
928         logger.Error("Could not re-enable the master role on the master,"
929                      " please restart manually.")
930
931
932 def _RecursiveCheckIfLVMBased(disk):
933   """Check if the given disk or its children are lvm-based.
934
935   Args:
936     disk: ganeti.objects.Disk object
937
938   Returns:
939     boolean indicating whether a LD_LV dev_type was found or not
940
941   """
942   if disk.children:
943     for chdisk in disk.children:
944       if _RecursiveCheckIfLVMBased(chdisk):
945         return True
946   return disk.dev_type == constants.LD_LV
947
948
949 class LUSetClusterParams(LogicalUnit):
950   """Change the parameters of the cluster.
951
952   """
953   HPATH = "cluster-modify"
954   HTYPE = constants.HTYPE_CLUSTER
955   _OP_REQP = []
956
957   def BuildHooksEnv(self):
958     """Build hooks env.
959
960     """
961     env = {
962       "OP_TARGET": self.sstore.GetClusterName(),
963       "NEW_VG_NAME": self.op.vg_name,
964       }
965     mn = self.sstore.GetMasterNode()
966     return env, [mn], [mn]
967
968   def CheckPrereq(self):
969     """Check prerequisites.
970
971     This checks whether the given params don't conflict and
972     if the given volume group is valid.
973
974     """
975     if not self.op.vg_name:
976       instances = [self.cfg.GetInstanceInfo(name)
977                    for name in self.cfg.GetInstanceList()]
978       for inst in instances:
979         for disk in inst.disks:
980           if _RecursiveCheckIfLVMBased(disk):
981             raise errors.OpPrereqError("Cannot disable lvm storage while"
982                                        " lvm-based instances exist")
983
984     # if vg_name not None, checks given volume group on all nodes
985     if self.op.vg_name:
986       node_list = self.cfg.GetNodeList()
987       vglist = rpc.call_vg_list(node_list)
988       for node in node_list:
989         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
990                                               constants.MIN_VG_SIZE)
991         if vgstatus:
992           raise errors.OpPrereqError("Error on node '%s': %s" %
993                                      (node, vgstatus))
994
995   def Exec(self, feedback_fn):
996     """Change the parameters of the cluster.
997
998     """
999     if self.op.vg_name != self.cfg.GetVGName():
1000       self.cfg.SetVGName(self.op.vg_name)
1001     else:
1002       feedback_fn("Cluster LVM configuration already in desired"
1003                   " state, not changing")
1004
1005
1006 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1007   """Sleep and poll for an instance's disk to sync.
1008
1009   """
1010   if not instance.disks:
1011     return True
1012
1013   if not oneshot:
1014     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1015
1016   node = instance.primary_node
1017
1018   for dev in instance.disks:
1019     cfgw.SetDiskID(dev, node)
1020
1021   retries = 0
1022   while True:
1023     max_time = 0
1024     done = True
1025     cumul_degraded = False
1026     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1027     if not rstats:
1028       proc.LogWarning("Can't get any data from node %s" % node)
1029       retries += 1
1030       if retries >= 10:
1031         raise errors.RemoteError("Can't contact node %s for mirror data,"
1032                                  " aborting." % node)
1033       time.sleep(6)
1034       continue
1035     retries = 0
1036     for i in range(len(rstats)):
1037       mstat = rstats[i]
1038       if mstat is None:
1039         proc.LogWarning("Can't compute data for node %s/%s" %
1040                         (node, instance.disks[i].iv_name))
1041         continue
1042       # we ignore the ldisk parameter
1043       perc_done, est_time, is_degraded, _ = mstat
1044       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1045       if perc_done is not None:
1046         done = False
1047         if est_time is not None:
1048           rem_time = "%d estimated seconds remaining" % est_time
1049           max_time = est_time
1050         else:
1051           rem_time = "no time estimate"
1052         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1053                      (instance.disks[i].iv_name, perc_done, rem_time))
1054     if done or oneshot:
1055       break
1056
1057     if unlock:
1058       #utils.Unlock('cmd')
1059       pass
1060     try:
1061       time.sleep(min(60, max_time))
1062     finally:
1063       if unlock:
1064         #utils.Lock('cmd')
1065         pass
1066
1067   if done:
1068     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1069   return not cumul_degraded
1070
1071
1072 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1073   """Check that mirrors are not degraded.
1074
1075   The ldisk parameter, if True, will change the test from the
1076   is_degraded attribute (which represents overall non-ok status for
1077   the device(s)) to the ldisk (representing the local storage status).
1078
1079   """
1080   cfgw.SetDiskID(dev, node)
1081   if ldisk:
1082     idx = 6
1083   else:
1084     idx = 5
1085
1086   result = True
1087   if on_primary or dev.AssembleOnSecondary():
1088     rstats = rpc.call_blockdev_find(node, dev)
1089     if not rstats:
1090       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1091       result = False
1092     else:
1093       result = result and (not rstats[idx])
1094   if dev.children:
1095     for child in dev.children:
1096       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1097
1098   return result
1099
1100
1101 class LUDiagnoseOS(NoHooksLU):
1102   """Logical unit for OS diagnose/query.
1103
1104   """
1105   _OP_REQP = ["output_fields", "names"]
1106
1107   def CheckPrereq(self):
1108     """Check prerequisites.
1109
1110     This always succeeds, since this is a pure query LU.
1111
1112     """
1113     if self.op.names:
1114       raise errors.OpPrereqError("Selective OS query not supported")
1115
1116     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1117     _CheckOutputFields(static=[],
1118                        dynamic=self.dynamic_fields,
1119                        selected=self.op.output_fields)
1120
1121   @staticmethod
1122   def _DiagnoseByOS(node_list, rlist):
1123     """Remaps a per-node return list into an a per-os per-node dictionary
1124
1125       Args:
1126         node_list: a list with the names of all nodes
1127         rlist: a map with node names as keys and OS objects as values
1128
1129       Returns:
1130         map: a map with osnames as keys and as value another map, with
1131              nodes as
1132              keys and list of OS objects as values
1133              e.g. {"debian-etch": {"node1": [<object>,...],
1134                                    "node2": [<object>,]}
1135                   }
1136
1137     """
1138     all_os = {}
1139     for node_name, nr in rlist.iteritems():
1140       if not nr:
1141         continue
1142       for os_obj in nr:
1143         if os_obj.name not in all_os:
1144           # build a list of nodes for this os containing empty lists
1145           # for each node in node_list
1146           all_os[os_obj.name] = {}
1147           for nname in node_list:
1148             all_os[os_obj.name][nname] = []
1149         all_os[os_obj.name][node_name].append(os_obj)
1150     return all_os
1151
1152   def Exec(self, feedback_fn):
1153     """Compute the list of OSes.
1154
1155     """
1156     node_list = self.cfg.GetNodeList()
1157     node_data = rpc.call_os_diagnose(node_list)
1158     if node_data == False:
1159       raise errors.OpExecError("Can't gather the list of OSes")
1160     pol = self._DiagnoseByOS(node_list, node_data)
1161     output = []
1162     for os_name, os_data in pol.iteritems():
1163       row = []
1164       for field in self.op.output_fields:
1165         if field == "name":
1166           val = os_name
1167         elif field == "valid":
1168           val = utils.all([osl and osl[0] for osl in os_data.values()])
1169         elif field == "node_status":
1170           val = {}
1171           for node_name, nos_list in os_data.iteritems():
1172             val[node_name] = [(v.status, v.path) for v in nos_list]
1173         else:
1174           raise errors.ParameterError(field)
1175         row.append(val)
1176       output.append(row)
1177
1178     return output
1179
1180
1181 class LURemoveNode(LogicalUnit):
1182   """Logical unit for removing a node.
1183
1184   """
1185   HPATH = "node-remove"
1186   HTYPE = constants.HTYPE_NODE
1187   _OP_REQP = ["node_name"]
1188
1189   def BuildHooksEnv(self):
1190     """Build hooks env.
1191
1192     This doesn't run on the target node in the pre phase as a failed
1193     node would then be impossible to remove.
1194
1195     """
1196     env = {
1197       "OP_TARGET": self.op.node_name,
1198       "NODE_NAME": self.op.node_name,
1199       }
1200     all_nodes = self.cfg.GetNodeList()
1201     all_nodes.remove(self.op.node_name)
1202     return env, all_nodes, all_nodes
1203
1204   def CheckPrereq(self):
1205     """Check prerequisites.
1206
1207     This checks:
1208      - the node exists in the configuration
1209      - it does not have primary or secondary instances
1210      - it's not the master
1211
1212     Any errors are signalled by raising errors.OpPrereqError.
1213
1214     """
1215     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1216     if node is None:
1217       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1218
1219     instance_list = self.cfg.GetInstanceList()
1220
1221     masternode = self.sstore.GetMasterNode()
1222     if node.name == masternode:
1223       raise errors.OpPrereqError("Node is the master node,"
1224                                  " you need to failover first.")
1225
1226     for instance_name in instance_list:
1227       instance = self.cfg.GetInstanceInfo(instance_name)
1228       if node.name == instance.primary_node:
1229         raise errors.OpPrereqError("Instance %s still running on the node,"
1230                                    " please remove first." % instance_name)
1231       if node.name in instance.secondary_nodes:
1232         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1233                                    " please remove first." % instance_name)
1234     self.op.node_name = node.name
1235     self.node = node
1236
1237   def Exec(self, feedback_fn):
1238     """Removes the node from the cluster.
1239
1240     """
1241     node = self.node
1242     logger.Info("stopping the node daemon and removing configs from node %s" %
1243                 node.name)
1244
1245     rpc.call_node_leave_cluster(node.name)
1246
1247     logger.Info("Removing node %s from config" % node.name)
1248
1249     self.cfg.RemoveNode(node.name)
1250     # Remove the node from the Ganeti Lock Manager
1251     self.context.glm.remove(locking.LEVEL_NODE, node.name)
1252
1253     utils.RemoveHostFromEtcHosts(node.name)
1254
1255
1256 class LUQueryNodes(NoHooksLU):
1257   """Logical unit for querying nodes.
1258
1259   """
1260   _OP_REQP = ["output_fields", "names"]
1261
1262   def CheckPrereq(self):
1263     """Check prerequisites.
1264
1265     This checks that the fields required are valid output fields.
1266
1267     """
1268     self.dynamic_fields = frozenset([
1269       "dtotal", "dfree",
1270       "mtotal", "mnode", "mfree",
1271       "bootid",
1272       "ctotal",
1273       ])
1274
1275     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1276                                "pinst_list", "sinst_list",
1277                                "pip", "sip", "tags"],
1278                        dynamic=self.dynamic_fields,
1279                        selected=self.op.output_fields)
1280
1281     self.wanted = _GetWantedNodes(self, self.op.names)
1282
1283   def Exec(self, feedback_fn):
1284     """Computes the list of nodes and their attributes.
1285
1286     """
1287     nodenames = self.wanted
1288     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1289
1290     # begin data gathering
1291
1292     if self.dynamic_fields.intersection(self.op.output_fields):
1293       live_data = {}
1294       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1295       for name in nodenames:
1296         nodeinfo = node_data.get(name, None)
1297         if nodeinfo:
1298           live_data[name] = {
1299             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1300             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1301             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1302             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1303             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1304             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1305             "bootid": nodeinfo['bootid'],
1306             }
1307         else:
1308           live_data[name] = {}
1309     else:
1310       live_data = dict.fromkeys(nodenames, {})
1311
1312     node_to_primary = dict([(name, set()) for name in nodenames])
1313     node_to_secondary = dict([(name, set()) for name in nodenames])
1314
1315     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1316                              "sinst_cnt", "sinst_list"))
1317     if inst_fields & frozenset(self.op.output_fields):
1318       instancelist = self.cfg.GetInstanceList()
1319
1320       for instance_name in instancelist:
1321         inst = self.cfg.GetInstanceInfo(instance_name)
1322         if inst.primary_node in node_to_primary:
1323           node_to_primary[inst.primary_node].add(inst.name)
1324         for secnode in inst.secondary_nodes:
1325           if secnode in node_to_secondary:
1326             node_to_secondary[secnode].add(inst.name)
1327
1328     # end data gathering
1329
1330     output = []
1331     for node in nodelist:
1332       node_output = []
1333       for field in self.op.output_fields:
1334         if field == "name":
1335           val = node.name
1336         elif field == "pinst_list":
1337           val = list(node_to_primary[node.name])
1338         elif field == "sinst_list":
1339           val = list(node_to_secondary[node.name])
1340         elif field == "pinst_cnt":
1341           val = len(node_to_primary[node.name])
1342         elif field == "sinst_cnt":
1343           val = len(node_to_secondary[node.name])
1344         elif field == "pip":
1345           val = node.primary_ip
1346         elif field == "sip":
1347           val = node.secondary_ip
1348         elif field == "tags":
1349           val = list(node.GetTags())
1350         elif field in self.dynamic_fields:
1351           val = live_data[node.name].get(field, None)
1352         else:
1353           raise errors.ParameterError(field)
1354         node_output.append(val)
1355       output.append(node_output)
1356
1357     return output
1358
1359
1360 class LUQueryNodeVolumes(NoHooksLU):
1361   """Logical unit for getting volumes on node(s).
1362
1363   """
1364   _OP_REQP = ["nodes", "output_fields"]
1365
1366   def CheckPrereq(self):
1367     """Check prerequisites.
1368
1369     This checks that the fields required are valid output fields.
1370
1371     """
1372     self.nodes = _GetWantedNodes(self, self.op.nodes)
1373
1374     _CheckOutputFields(static=["node"],
1375                        dynamic=["phys", "vg", "name", "size", "instance"],
1376                        selected=self.op.output_fields)
1377
1378
1379   def Exec(self, feedback_fn):
1380     """Computes the list of nodes and their attributes.
1381
1382     """
1383     nodenames = self.nodes
1384     volumes = rpc.call_node_volumes(nodenames)
1385
1386     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1387              in self.cfg.GetInstanceList()]
1388
1389     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1390
1391     output = []
1392     for node in nodenames:
1393       if node not in volumes or not volumes[node]:
1394         continue
1395
1396       node_vols = volumes[node][:]
1397       node_vols.sort(key=lambda vol: vol['dev'])
1398
1399       for vol in node_vols:
1400         node_output = []
1401         for field in self.op.output_fields:
1402           if field == "node":
1403             val = node
1404           elif field == "phys":
1405             val = vol['dev']
1406           elif field == "vg":
1407             val = vol['vg']
1408           elif field == "name":
1409             val = vol['name']
1410           elif field == "size":
1411             val = int(float(vol['size']))
1412           elif field == "instance":
1413             for inst in ilist:
1414               if node not in lv_by_node[inst]:
1415                 continue
1416               if vol['name'] in lv_by_node[inst][node]:
1417                 val = inst.name
1418                 break
1419             else:
1420               val = '-'
1421           else:
1422             raise errors.ParameterError(field)
1423           node_output.append(str(val))
1424
1425         output.append(node_output)
1426
1427     return output
1428
1429
1430 class LUAddNode(LogicalUnit):
1431   """Logical unit for adding node to the cluster.
1432
1433   """
1434   HPATH = "node-add"
1435   HTYPE = constants.HTYPE_NODE
1436   _OP_REQP = ["node_name"]
1437
1438   def BuildHooksEnv(self):
1439     """Build hooks env.
1440
1441     This will run on all nodes before, and on all nodes + the new node after.
1442
1443     """
1444     env = {
1445       "OP_TARGET": self.op.node_name,
1446       "NODE_NAME": self.op.node_name,
1447       "NODE_PIP": self.op.primary_ip,
1448       "NODE_SIP": self.op.secondary_ip,
1449       }
1450     nodes_0 = self.cfg.GetNodeList()
1451     nodes_1 = nodes_0 + [self.op.node_name, ]
1452     return env, nodes_0, nodes_1
1453
1454   def CheckPrereq(self):
1455     """Check prerequisites.
1456
1457     This checks:
1458      - the new node is not already in the config
1459      - it is resolvable
1460      - its parameters (single/dual homed) matches the cluster
1461
1462     Any errors are signalled by raising errors.OpPrereqError.
1463
1464     """
1465     node_name = self.op.node_name
1466     cfg = self.cfg
1467
1468     dns_data = utils.HostInfo(node_name)
1469
1470     node = dns_data.name
1471     primary_ip = self.op.primary_ip = dns_data.ip
1472     secondary_ip = getattr(self.op, "secondary_ip", None)
1473     if secondary_ip is None:
1474       secondary_ip = primary_ip
1475     if not utils.IsValidIP(secondary_ip):
1476       raise errors.OpPrereqError("Invalid secondary IP given")
1477     self.op.secondary_ip = secondary_ip
1478
1479     node_list = cfg.GetNodeList()
1480     if not self.op.readd and node in node_list:
1481       raise errors.OpPrereqError("Node %s is already in the configuration" %
1482                                  node)
1483     elif self.op.readd and node not in node_list:
1484       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1485
1486     for existing_node_name in node_list:
1487       existing_node = cfg.GetNodeInfo(existing_node_name)
1488
1489       if self.op.readd and node == existing_node_name:
1490         if (existing_node.primary_ip != primary_ip or
1491             existing_node.secondary_ip != secondary_ip):
1492           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1493                                      " address configuration as before")
1494         continue
1495
1496       if (existing_node.primary_ip == primary_ip or
1497           existing_node.secondary_ip == primary_ip or
1498           existing_node.primary_ip == secondary_ip or
1499           existing_node.secondary_ip == secondary_ip):
1500         raise errors.OpPrereqError("New node ip address(es) conflict with"
1501                                    " existing node %s" % existing_node.name)
1502
1503     # check that the type of the node (single versus dual homed) is the
1504     # same as for the master
1505     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1506     master_singlehomed = myself.secondary_ip == myself.primary_ip
1507     newbie_singlehomed = secondary_ip == primary_ip
1508     if master_singlehomed != newbie_singlehomed:
1509       if master_singlehomed:
1510         raise errors.OpPrereqError("The master has no private ip but the"
1511                                    " new node has one")
1512       else:
1513         raise errors.OpPrereqError("The master has a private ip but the"
1514                                    " new node doesn't have one")
1515
1516     # checks reachablity
1517     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1518       raise errors.OpPrereqError("Node not reachable by ping")
1519
1520     if not newbie_singlehomed:
1521       # check reachability from my secondary ip to newbie's secondary ip
1522       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1523                            source=myself.secondary_ip):
1524         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1525                                    " based ping to noded port")
1526
1527     self.new_node = objects.Node(name=node,
1528                                  primary_ip=primary_ip,
1529                                  secondary_ip=secondary_ip)
1530
1531   def Exec(self, feedback_fn):
1532     """Adds the new node to the cluster.
1533
1534     """
1535     new_node = self.new_node
1536     node = new_node.name
1537
1538     # check connectivity
1539     result = rpc.call_version([node])[node]
1540     if result:
1541       if constants.PROTOCOL_VERSION == result:
1542         logger.Info("communication to node %s fine, sw version %s match" %
1543                     (node, result))
1544       else:
1545         raise errors.OpExecError("Version mismatch master version %s,"
1546                                  " node version %s" %
1547                                  (constants.PROTOCOL_VERSION, result))
1548     else:
1549       raise errors.OpExecError("Cannot get version from the new node")
1550
1551     # setup ssh on node
1552     logger.Info("copy ssh key to node %s" % node)
1553     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1554     keyarray = []
1555     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1556                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1557                 priv_key, pub_key]
1558
1559     for i in keyfiles:
1560       f = open(i, 'r')
1561       try:
1562         keyarray.append(f.read())
1563       finally:
1564         f.close()
1565
1566     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1567                                keyarray[3], keyarray[4], keyarray[5])
1568
1569     if not result:
1570       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1571
1572     # Add node to our /etc/hosts, and add key to known_hosts
1573     utils.AddHostToEtcHosts(new_node.name)
1574
1575     if new_node.secondary_ip != new_node.primary_ip:
1576       if not rpc.call_node_tcp_ping(new_node.name,
1577                                     constants.LOCALHOST_IP_ADDRESS,
1578                                     new_node.secondary_ip,
1579                                     constants.DEFAULT_NODED_PORT,
1580                                     10, False):
1581         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1582                                  " you gave (%s). Please fix and re-run this"
1583                                  " command." % new_node.secondary_ip)
1584
1585     node_verify_list = [self.sstore.GetMasterNode()]
1586     node_verify_param = {
1587       'nodelist': [node],
1588       # TODO: do a node-net-test as well?
1589     }
1590
1591     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1592     for verifier in node_verify_list:
1593       if not result[verifier]:
1594         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1595                                  " for remote verification" % verifier)
1596       if result[verifier]['nodelist']:
1597         for failed in result[verifier]['nodelist']:
1598           feedback_fn("ssh/hostname verification failed %s -> %s" %
1599                       (verifier, result[verifier]['nodelist'][failed]))
1600         raise errors.OpExecError("ssh/hostname verification failed.")
1601
1602     # Distribute updated /etc/hosts and known_hosts to all nodes,
1603     # including the node just added
1604     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605     dist_nodes = self.cfg.GetNodeList()
1606     if not self.op.readd:
1607       dist_nodes.append(node)
1608     if myself.name in dist_nodes:
1609       dist_nodes.remove(myself.name)
1610
1611     logger.Debug("Copying hosts and known_hosts to all nodes")
1612     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1613       result = rpc.call_upload_file(dist_nodes, fname)
1614       for to_node in dist_nodes:
1615         if not result[to_node]:
1616           logger.Error("copy of file %s to node %s failed" %
1617                        (fname, to_node))
1618
1619     to_copy = self.sstore.GetFileList()
1620     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1621       to_copy.append(constants.VNC_PASSWORD_FILE)
1622     for fname in to_copy:
1623       result = rpc.call_upload_file([node], fname)
1624       if not result[node]:
1625         logger.Error("could not copy file %s to node %s" % (fname, node))
1626
1627     if not self.op.readd:
1628       logger.Info("adding node %s to cluster.conf" % node)
1629       self.cfg.AddNode(new_node)
1630       # Add the new node to the Ganeti Lock Manager
1631       self.context.glm.add(locking.LEVEL_NODE, node)
1632
1633
1634 class LUMasterFailover(LogicalUnit):
1635   """Failover the master node to the current node.
1636
1637   This is a special LU in that it must run on a non-master node.
1638
1639   """
1640   HPATH = "master-failover"
1641   HTYPE = constants.HTYPE_CLUSTER
1642   REQ_MASTER = False
1643   REQ_WSSTORE = True
1644   _OP_REQP = []
1645
1646   def BuildHooksEnv(self):
1647     """Build hooks env.
1648
1649     This will run on the new master only in the pre phase, and on all
1650     the nodes in the post phase.
1651
1652     """
1653     env = {
1654       "OP_TARGET": self.new_master,
1655       "NEW_MASTER": self.new_master,
1656       "OLD_MASTER": self.old_master,
1657       }
1658     return env, [self.new_master], self.cfg.GetNodeList()
1659
1660   def CheckPrereq(self):
1661     """Check prerequisites.
1662
1663     This checks that we are not already the master.
1664
1665     """
1666     self.new_master = utils.HostInfo().name
1667     self.old_master = self.sstore.GetMasterNode()
1668
1669     if self.old_master == self.new_master:
1670       raise errors.OpPrereqError("This commands must be run on the node"
1671                                  " where you want the new master to be."
1672                                  " %s is already the master" %
1673                                  self.old_master)
1674
1675   def Exec(self, feedback_fn):
1676     """Failover the master node.
1677
1678     This command, when run on a non-master node, will cause the current
1679     master to cease being master, and the non-master to become new
1680     master.
1681
1682     """
1683     #TODO: do not rely on gethostname returning the FQDN
1684     logger.Info("setting master to %s, old master: %s" %
1685                 (self.new_master, self.old_master))
1686
1687     if not rpc.call_node_stop_master(self.old_master):
1688       logger.Error("could disable the master role on the old master"
1689                    " %s, please disable manually" % self.old_master)
1690
1691     ss = self.sstore
1692     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1693     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1694                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1695       logger.Error("could not distribute the new simple store master file"
1696                    " to the other nodes, please check.")
1697
1698     if not rpc.call_node_start_master(self.new_master):
1699       logger.Error("could not start the master role on the new master"
1700                    " %s, please check" % self.new_master)
1701       feedback_fn("Error in activating the master IP on the new master,"
1702                   " please fix manually.")
1703
1704
1705
1706 class LUQueryClusterInfo(NoHooksLU):
1707   """Query cluster configuration.
1708
1709   """
1710   _OP_REQP = []
1711   REQ_MASTER = False
1712
1713   def CheckPrereq(self):
1714     """No prerequsites needed for this LU.
1715
1716     """
1717     pass
1718
1719   def Exec(self, feedback_fn):
1720     """Return cluster config.
1721
1722     """
1723     result = {
1724       "name": self.sstore.GetClusterName(),
1725       "software_version": constants.RELEASE_VERSION,
1726       "protocol_version": constants.PROTOCOL_VERSION,
1727       "config_version": constants.CONFIG_VERSION,
1728       "os_api_version": constants.OS_API_VERSION,
1729       "export_version": constants.EXPORT_VERSION,
1730       "master": self.sstore.GetMasterNode(),
1731       "architecture": (platform.architecture()[0], platform.machine()),
1732       "hypervisor_type": self.sstore.GetHypervisorType(),
1733       }
1734
1735     return result
1736
1737
1738 class LUDumpClusterConfig(NoHooksLU):
1739   """Return a text-representation of the cluster-config.
1740
1741   """
1742   _OP_REQP = []
1743
1744   def CheckPrereq(self):
1745     """No prerequisites.
1746
1747     """
1748     pass
1749
1750   def Exec(self, feedback_fn):
1751     """Dump a representation of the cluster config to the standard output.
1752
1753     """
1754     return self.cfg.DumpConfig()
1755
1756
1757 class LUActivateInstanceDisks(NoHooksLU):
1758   """Bring up an instance's disks.
1759
1760   """
1761   _OP_REQP = ["instance_name"]
1762
1763   def CheckPrereq(self):
1764     """Check prerequisites.
1765
1766     This checks that the instance is in the cluster.
1767
1768     """
1769     instance = self.cfg.GetInstanceInfo(
1770       self.cfg.ExpandInstanceName(self.op.instance_name))
1771     if instance is None:
1772       raise errors.OpPrereqError("Instance '%s' not known" %
1773                                  self.op.instance_name)
1774     self.instance = instance
1775
1776
1777   def Exec(self, feedback_fn):
1778     """Activate the disks.
1779
1780     """
1781     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1782     if not disks_ok:
1783       raise errors.OpExecError("Cannot activate block devices")
1784
1785     return disks_info
1786
1787
1788 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1789   """Prepare the block devices for an instance.
1790
1791   This sets up the block devices on all nodes.
1792
1793   Args:
1794     instance: a ganeti.objects.Instance object
1795     ignore_secondaries: if true, errors on secondary nodes won't result
1796                         in an error return from the function
1797
1798   Returns:
1799     false if the operation failed
1800     list of (host, instance_visible_name, node_visible_name) if the operation
1801          suceeded with the mapping from node devices to instance devices
1802   """
1803   device_info = []
1804   disks_ok = True
1805   iname = instance.name
1806   # With the two passes mechanism we try to reduce the window of
1807   # opportunity for the race condition of switching DRBD to primary
1808   # before handshaking occured, but we do not eliminate it
1809
1810   # The proper fix would be to wait (with some limits) until the
1811   # connection has been made and drbd transitions from WFConnection
1812   # into any other network-connected state (Connected, SyncTarget,
1813   # SyncSource, etc.)
1814
1815   # 1st pass, assemble on all nodes in secondary mode
1816   for inst_disk in instance.disks:
1817     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1818       cfg.SetDiskID(node_disk, node)
1819       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1820       if not result:
1821         logger.Error("could not prepare block device %s on node %s"
1822                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1823         if not ignore_secondaries:
1824           disks_ok = False
1825
1826   # FIXME: race condition on drbd migration to primary
1827
1828   # 2nd pass, do only the primary node
1829   for inst_disk in instance.disks:
1830     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1831       if node != instance.primary_node:
1832         continue
1833       cfg.SetDiskID(node_disk, node)
1834       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1835       if not result:
1836         logger.Error("could not prepare block device %s on node %s"
1837                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1838         disks_ok = False
1839     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1840
1841   # leave the disks configured for the primary node
1842   # this is a workaround that would be fixed better by
1843   # improving the logical/physical id handling
1844   for disk in instance.disks:
1845     cfg.SetDiskID(disk, instance.primary_node)
1846
1847   return disks_ok, device_info
1848
1849
1850 def _StartInstanceDisks(cfg, instance, force):
1851   """Start the disks of an instance.
1852
1853   """
1854   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1855                                            ignore_secondaries=force)
1856   if not disks_ok:
1857     _ShutdownInstanceDisks(instance, cfg)
1858     if force is not None and not force:
1859       logger.Error("If the message above refers to a secondary node,"
1860                    " you can retry the operation using '--force'.")
1861     raise errors.OpExecError("Disk consistency error")
1862
1863
1864 class LUDeactivateInstanceDisks(NoHooksLU):
1865   """Shutdown an instance's disks.
1866
1867   """
1868   _OP_REQP = ["instance_name"]
1869
1870   def CheckPrereq(self):
1871     """Check prerequisites.
1872
1873     This checks that the instance is in the cluster.
1874
1875     """
1876     instance = self.cfg.GetInstanceInfo(
1877       self.cfg.ExpandInstanceName(self.op.instance_name))
1878     if instance is None:
1879       raise errors.OpPrereqError("Instance '%s' not known" %
1880                                  self.op.instance_name)
1881     self.instance = instance
1882
1883   def Exec(self, feedback_fn):
1884     """Deactivate the disks
1885
1886     """
1887     instance = self.instance
1888     ins_l = rpc.call_instance_list([instance.primary_node])
1889     ins_l = ins_l[instance.primary_node]
1890     if not type(ins_l) is list:
1891       raise errors.OpExecError("Can't contact node '%s'" %
1892                                instance.primary_node)
1893
1894     if self.instance.name in ins_l:
1895       raise errors.OpExecError("Instance is running, can't shutdown"
1896                                " block devices.")
1897
1898     _ShutdownInstanceDisks(instance, self.cfg)
1899
1900
1901 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1902   """Shutdown block devices of an instance.
1903
1904   This does the shutdown on all nodes of the instance.
1905
1906   If the ignore_primary is false, errors on the primary node are
1907   ignored.
1908
1909   """
1910   result = True
1911   for disk in instance.disks:
1912     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1913       cfg.SetDiskID(top_disk, node)
1914       if not rpc.call_blockdev_shutdown(node, top_disk):
1915         logger.Error("could not shutdown block device %s on node %s" %
1916                      (disk.iv_name, node))
1917         if not ignore_primary or node != instance.primary_node:
1918           result = False
1919   return result
1920
1921
1922 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1923   """Checks if a node has enough free memory.
1924
1925   This function check if a given node has the needed amount of free
1926   memory. In case the node has less memory or we cannot get the
1927   information from the node, this function raise an OpPrereqError
1928   exception.
1929
1930   Args:
1931     - cfg: a ConfigWriter instance
1932     - node: the node name
1933     - reason: string to use in the error message
1934     - requested: the amount of memory in MiB
1935
1936   """
1937   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1938   if not nodeinfo or not isinstance(nodeinfo, dict):
1939     raise errors.OpPrereqError("Could not contact node %s for resource"
1940                              " information" % (node,))
1941
1942   free_mem = nodeinfo[node].get('memory_free')
1943   if not isinstance(free_mem, int):
1944     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1945                              " was '%s'" % (node, free_mem))
1946   if requested > free_mem:
1947     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1948                              " needed %s MiB, available %s MiB" %
1949                              (node, reason, requested, free_mem))
1950
1951
1952 class LUStartupInstance(LogicalUnit):
1953   """Starts an instance.
1954
1955   """
1956   HPATH = "instance-start"
1957   HTYPE = constants.HTYPE_INSTANCE
1958   _OP_REQP = ["instance_name", "force"]
1959
1960   def BuildHooksEnv(self):
1961     """Build hooks env.
1962
1963     This runs on master, primary and secondary nodes of the instance.
1964
1965     """
1966     env = {
1967       "FORCE": self.op.force,
1968       }
1969     env.update(_BuildInstanceHookEnvByObject(self.instance))
1970     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1971           list(self.instance.secondary_nodes))
1972     return env, nl, nl
1973
1974   def CheckPrereq(self):
1975     """Check prerequisites.
1976
1977     This checks that the instance is in the cluster.
1978
1979     """
1980     instance = self.cfg.GetInstanceInfo(
1981       self.cfg.ExpandInstanceName(self.op.instance_name))
1982     if instance is None:
1983       raise errors.OpPrereqError("Instance '%s' not known" %
1984                                  self.op.instance_name)
1985
1986     # check bridges existance
1987     _CheckInstanceBridgesExist(instance)
1988
1989     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1990                          "starting instance %s" % instance.name,
1991                          instance.memory)
1992
1993     self.instance = instance
1994     self.op.instance_name = instance.name
1995
1996   def Exec(self, feedback_fn):
1997     """Start the instance.
1998
1999     """
2000     instance = self.instance
2001     force = self.op.force
2002     extra_args = getattr(self.op, "extra_args", "")
2003
2004     self.cfg.MarkInstanceUp(instance.name)
2005
2006     node_current = instance.primary_node
2007
2008     _StartInstanceDisks(self.cfg, instance, force)
2009
2010     if not rpc.call_instance_start(node_current, instance, extra_args):
2011       _ShutdownInstanceDisks(instance, self.cfg)
2012       raise errors.OpExecError("Could not start instance")
2013
2014
2015 class LURebootInstance(LogicalUnit):
2016   """Reboot an instance.
2017
2018   """
2019   HPATH = "instance-reboot"
2020   HTYPE = constants.HTYPE_INSTANCE
2021   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2022
2023   def BuildHooksEnv(self):
2024     """Build hooks env.
2025
2026     This runs on master, primary and secondary nodes of the instance.
2027
2028     """
2029     env = {
2030       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2031       }
2032     env.update(_BuildInstanceHookEnvByObject(self.instance))
2033     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2034           list(self.instance.secondary_nodes))
2035     return env, nl, nl
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks that the instance is in the cluster.
2041
2042     """
2043     instance = self.cfg.GetInstanceInfo(
2044       self.cfg.ExpandInstanceName(self.op.instance_name))
2045     if instance is None:
2046       raise errors.OpPrereqError("Instance '%s' not known" %
2047                                  self.op.instance_name)
2048
2049     # check bridges existance
2050     _CheckInstanceBridgesExist(instance)
2051
2052     self.instance = instance
2053     self.op.instance_name = instance.name
2054
2055   def Exec(self, feedback_fn):
2056     """Reboot the instance.
2057
2058     """
2059     instance = self.instance
2060     ignore_secondaries = self.op.ignore_secondaries
2061     reboot_type = self.op.reboot_type
2062     extra_args = getattr(self.op, "extra_args", "")
2063
2064     node_current = instance.primary_node
2065
2066     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2067                            constants.INSTANCE_REBOOT_HARD,
2068                            constants.INSTANCE_REBOOT_FULL]:
2069       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2070                                   (constants.INSTANCE_REBOOT_SOFT,
2071                                    constants.INSTANCE_REBOOT_HARD,
2072                                    constants.INSTANCE_REBOOT_FULL))
2073
2074     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2075                        constants.INSTANCE_REBOOT_HARD]:
2076       if not rpc.call_instance_reboot(node_current, instance,
2077                                       reboot_type, extra_args):
2078         raise errors.OpExecError("Could not reboot instance")
2079     else:
2080       if not rpc.call_instance_shutdown(node_current, instance):
2081         raise errors.OpExecError("could not shutdown instance for full reboot")
2082       _ShutdownInstanceDisks(instance, self.cfg)
2083       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2084       if not rpc.call_instance_start(node_current, instance, extra_args):
2085         _ShutdownInstanceDisks(instance, self.cfg)
2086         raise errors.OpExecError("Could not start instance for full reboot")
2087
2088     self.cfg.MarkInstanceUp(instance.name)
2089
2090
2091 class LUShutdownInstance(LogicalUnit):
2092   """Shutdown an instance.
2093
2094   """
2095   HPATH = "instance-stop"
2096   HTYPE = constants.HTYPE_INSTANCE
2097   _OP_REQP = ["instance_name"]
2098
2099   def BuildHooksEnv(self):
2100     """Build hooks env.
2101
2102     This runs on master, primary and secondary nodes of the instance.
2103
2104     """
2105     env = _BuildInstanceHookEnvByObject(self.instance)
2106     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2107           list(self.instance.secondary_nodes))
2108     return env, nl, nl
2109
2110   def CheckPrereq(self):
2111     """Check prerequisites.
2112
2113     This checks that the instance is in the cluster.
2114
2115     """
2116     instance = self.cfg.GetInstanceInfo(
2117       self.cfg.ExpandInstanceName(self.op.instance_name))
2118     if instance is None:
2119       raise errors.OpPrereqError("Instance '%s' not known" %
2120                                  self.op.instance_name)
2121     self.instance = instance
2122
2123   def Exec(self, feedback_fn):
2124     """Shutdown the instance.
2125
2126     """
2127     instance = self.instance
2128     node_current = instance.primary_node
2129     self.cfg.MarkInstanceDown(instance.name)
2130     if not rpc.call_instance_shutdown(node_current, instance):
2131       logger.Error("could not shutdown instance")
2132
2133     _ShutdownInstanceDisks(instance, self.cfg)
2134
2135
2136 class LUReinstallInstance(LogicalUnit):
2137   """Reinstall an instance.
2138
2139   """
2140   HPATH = "instance-reinstall"
2141   HTYPE = constants.HTYPE_INSTANCE
2142   _OP_REQP = ["instance_name"]
2143
2144   def BuildHooksEnv(self):
2145     """Build hooks env.
2146
2147     This runs on master, primary and secondary nodes of the instance.
2148
2149     """
2150     env = _BuildInstanceHookEnvByObject(self.instance)
2151     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2152           list(self.instance.secondary_nodes))
2153     return env, nl, nl
2154
2155   def CheckPrereq(self):
2156     """Check prerequisites.
2157
2158     This checks that the instance is in the cluster and is not running.
2159
2160     """
2161     instance = self.cfg.GetInstanceInfo(
2162       self.cfg.ExpandInstanceName(self.op.instance_name))
2163     if instance is None:
2164       raise errors.OpPrereqError("Instance '%s' not known" %
2165                                  self.op.instance_name)
2166     if instance.disk_template == constants.DT_DISKLESS:
2167       raise errors.OpPrereqError("Instance '%s' has no disks" %
2168                                  self.op.instance_name)
2169     if instance.status != "down":
2170       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2171                                  self.op.instance_name)
2172     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2173     if remote_info:
2174       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2175                                  (self.op.instance_name,
2176                                   instance.primary_node))
2177
2178     self.op.os_type = getattr(self.op, "os_type", None)
2179     if self.op.os_type is not None:
2180       # OS verification
2181       pnode = self.cfg.GetNodeInfo(
2182         self.cfg.ExpandNodeName(instance.primary_node))
2183       if pnode is None:
2184         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2185                                    self.op.pnode)
2186       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2187       if not os_obj:
2188         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2189                                    " primary node"  % self.op.os_type)
2190
2191     self.instance = instance
2192
2193   def Exec(self, feedback_fn):
2194     """Reinstall the instance.
2195
2196     """
2197     inst = self.instance
2198
2199     if self.op.os_type is not None:
2200       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2201       inst.os = self.op.os_type
2202       self.cfg.AddInstance(inst)
2203
2204     _StartInstanceDisks(self.cfg, inst, None)
2205     try:
2206       feedback_fn("Running the instance OS create scripts...")
2207       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2208         raise errors.OpExecError("Could not install OS for instance %s"
2209                                  " on node %s" %
2210                                  (inst.name, inst.primary_node))
2211     finally:
2212       _ShutdownInstanceDisks(inst, self.cfg)
2213
2214
2215 class LURenameInstance(LogicalUnit):
2216   """Rename an instance.
2217
2218   """
2219   HPATH = "instance-rename"
2220   HTYPE = constants.HTYPE_INSTANCE
2221   _OP_REQP = ["instance_name", "new_name"]
2222
2223   def BuildHooksEnv(self):
2224     """Build hooks env.
2225
2226     This runs on master, primary and secondary nodes of the instance.
2227
2228     """
2229     env = _BuildInstanceHookEnvByObject(self.instance)
2230     env["INSTANCE_NEW_NAME"] = self.op.new_name
2231     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2232           list(self.instance.secondary_nodes))
2233     return env, nl, nl
2234
2235   def CheckPrereq(self):
2236     """Check prerequisites.
2237
2238     This checks that the instance is in the cluster and is not running.
2239
2240     """
2241     instance = self.cfg.GetInstanceInfo(
2242       self.cfg.ExpandInstanceName(self.op.instance_name))
2243     if instance is None:
2244       raise errors.OpPrereqError("Instance '%s' not known" %
2245                                  self.op.instance_name)
2246     if instance.status != "down":
2247       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2248                                  self.op.instance_name)
2249     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2250     if remote_info:
2251       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2252                                  (self.op.instance_name,
2253                                   instance.primary_node))
2254     self.instance = instance
2255
2256     # new name verification
2257     name_info = utils.HostInfo(self.op.new_name)
2258
2259     self.op.new_name = new_name = name_info.name
2260     instance_list = self.cfg.GetInstanceList()
2261     if new_name in instance_list:
2262       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2263                                  new_name)
2264
2265     if not getattr(self.op, "ignore_ip", False):
2266       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2267         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2268                                    (name_info.ip, new_name))
2269
2270
2271   def Exec(self, feedback_fn):
2272     """Reinstall the instance.
2273
2274     """
2275     inst = self.instance
2276     old_name = inst.name
2277
2278     if inst.disk_template == constants.DT_FILE:
2279       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2280
2281     self.cfg.RenameInstance(inst.name, self.op.new_name)
2282
2283     # re-read the instance from the configuration after rename
2284     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2285
2286     if inst.disk_template == constants.DT_FILE:
2287       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2288       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2289                                                 old_file_storage_dir,
2290                                                 new_file_storage_dir)
2291
2292       if not result:
2293         raise errors.OpExecError("Could not connect to node '%s' to rename"
2294                                  " directory '%s' to '%s' (but the instance"
2295                                  " has been renamed in Ganeti)" % (
2296                                  inst.primary_node, old_file_storage_dir,
2297                                  new_file_storage_dir))
2298
2299       if not result[0]:
2300         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2301                                  " (but the instance has been renamed in"
2302                                  " Ganeti)" % (old_file_storage_dir,
2303                                                new_file_storage_dir))
2304
2305     _StartInstanceDisks(self.cfg, inst, None)
2306     try:
2307       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2308                                           "sda", "sdb"):
2309         msg = ("Could run OS rename script for instance %s on node %s (but the"
2310                " instance has been renamed in Ganeti)" %
2311                (inst.name, inst.primary_node))
2312         logger.Error(msg)
2313     finally:
2314       _ShutdownInstanceDisks(inst, self.cfg)
2315
2316
2317 class LURemoveInstance(LogicalUnit):
2318   """Remove an instance.
2319
2320   """
2321   HPATH = "instance-remove"
2322   HTYPE = constants.HTYPE_INSTANCE
2323   _OP_REQP = ["instance_name", "ignore_failures"]
2324
2325   def BuildHooksEnv(self):
2326     """Build hooks env.
2327
2328     This runs on master, primary and secondary nodes of the instance.
2329
2330     """
2331     env = _BuildInstanceHookEnvByObject(self.instance)
2332     nl = [self.sstore.GetMasterNode()]
2333     return env, nl, nl
2334
2335   def CheckPrereq(self):
2336     """Check prerequisites.
2337
2338     This checks that the instance is in the cluster.
2339
2340     """
2341     instance = self.cfg.GetInstanceInfo(
2342       self.cfg.ExpandInstanceName(self.op.instance_name))
2343     if instance is None:
2344       raise errors.OpPrereqError("Instance '%s' not known" %
2345                                  self.op.instance_name)
2346     self.instance = instance
2347
2348   def Exec(self, feedback_fn):
2349     """Remove the instance.
2350
2351     """
2352     instance = self.instance
2353     logger.Info("shutting down instance %s on node %s" %
2354                 (instance.name, instance.primary_node))
2355
2356     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2357       if self.op.ignore_failures:
2358         feedback_fn("Warning: can't shutdown instance")
2359       else:
2360         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2361                                  (instance.name, instance.primary_node))
2362
2363     logger.Info("removing block devices for instance %s" % instance.name)
2364
2365     if not _RemoveDisks(instance, self.cfg):
2366       if self.op.ignore_failures:
2367         feedback_fn("Warning: can't remove instance's disks")
2368       else:
2369         raise errors.OpExecError("Can't remove instance's disks")
2370
2371     logger.Info("removing instance %s out of cluster config" % instance.name)
2372
2373     self.cfg.RemoveInstance(instance.name)
2374     # Remove the new instance from the Ganeti Lock Manager
2375     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2376
2377
2378 class LUQueryInstances(NoHooksLU):
2379   """Logical unit for querying instances.
2380
2381   """
2382   _OP_REQP = ["output_fields", "names"]
2383
2384   def CheckPrereq(self):
2385     """Check prerequisites.
2386
2387     This checks that the fields required are valid output fields.
2388
2389     """
2390     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2391     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2392                                "admin_state", "admin_ram",
2393                                "disk_template", "ip", "mac", "bridge",
2394                                "sda_size", "sdb_size", "vcpus", "tags"],
2395                        dynamic=self.dynamic_fields,
2396                        selected=self.op.output_fields)
2397
2398     self.wanted = _GetWantedInstances(self, self.op.names)
2399
2400   def Exec(self, feedback_fn):
2401     """Computes the list of nodes and their attributes.
2402
2403     """
2404     instance_names = self.wanted
2405     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2406                      in instance_names]
2407
2408     # begin data gathering
2409
2410     nodes = frozenset([inst.primary_node for inst in instance_list])
2411
2412     bad_nodes = []
2413     if self.dynamic_fields.intersection(self.op.output_fields):
2414       live_data = {}
2415       node_data = rpc.call_all_instances_info(nodes)
2416       for name in nodes:
2417         result = node_data[name]
2418         if result:
2419           live_data.update(result)
2420         elif result == False:
2421           bad_nodes.append(name)
2422         # else no instance is alive
2423     else:
2424       live_data = dict([(name, {}) for name in instance_names])
2425
2426     # end data gathering
2427
2428     output = []
2429     for instance in instance_list:
2430       iout = []
2431       for field in self.op.output_fields:
2432         if field == "name":
2433           val = instance.name
2434         elif field == "os":
2435           val = instance.os
2436         elif field == "pnode":
2437           val = instance.primary_node
2438         elif field == "snodes":
2439           val = list(instance.secondary_nodes)
2440         elif field == "admin_state":
2441           val = (instance.status != "down")
2442         elif field == "oper_state":
2443           if instance.primary_node in bad_nodes:
2444             val = None
2445           else:
2446             val = bool(live_data.get(instance.name))
2447         elif field == "status":
2448           if instance.primary_node in bad_nodes:
2449             val = "ERROR_nodedown"
2450           else:
2451             running = bool(live_data.get(instance.name))
2452             if running:
2453               if instance.status != "down":
2454                 val = "running"
2455               else:
2456                 val = "ERROR_up"
2457             else:
2458               if instance.status != "down":
2459                 val = "ERROR_down"
2460               else:
2461                 val = "ADMIN_down"
2462         elif field == "admin_ram":
2463           val = instance.memory
2464         elif field == "oper_ram":
2465           if instance.primary_node in bad_nodes:
2466             val = None
2467           elif instance.name in live_data:
2468             val = live_data[instance.name].get("memory", "?")
2469           else:
2470             val = "-"
2471         elif field == "disk_template":
2472           val = instance.disk_template
2473         elif field == "ip":
2474           val = instance.nics[0].ip
2475         elif field == "bridge":
2476           val = instance.nics[0].bridge
2477         elif field == "mac":
2478           val = instance.nics[0].mac
2479         elif field == "sda_size" or field == "sdb_size":
2480           disk = instance.FindDisk(field[:3])
2481           if disk is None:
2482             val = None
2483           else:
2484             val = disk.size
2485         elif field == "vcpus":
2486           val = instance.vcpus
2487         elif field == "tags":
2488           val = list(instance.GetTags())
2489         else:
2490           raise errors.ParameterError(field)
2491         iout.append(val)
2492       output.append(iout)
2493
2494     return output
2495
2496
2497 class LUFailoverInstance(LogicalUnit):
2498   """Failover an instance.
2499
2500   """
2501   HPATH = "instance-failover"
2502   HTYPE = constants.HTYPE_INSTANCE
2503   _OP_REQP = ["instance_name", "ignore_consistency"]
2504
2505   def BuildHooksEnv(self):
2506     """Build hooks env.
2507
2508     This runs on master, primary and secondary nodes of the instance.
2509
2510     """
2511     env = {
2512       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2513       }
2514     env.update(_BuildInstanceHookEnvByObject(self.instance))
2515     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2516     return env, nl, nl
2517
2518   def CheckPrereq(self):
2519     """Check prerequisites.
2520
2521     This checks that the instance is in the cluster.
2522
2523     """
2524     instance = self.cfg.GetInstanceInfo(
2525       self.cfg.ExpandInstanceName(self.op.instance_name))
2526     if instance is None:
2527       raise errors.OpPrereqError("Instance '%s' not known" %
2528                                  self.op.instance_name)
2529
2530     if instance.disk_template not in constants.DTS_NET_MIRROR:
2531       raise errors.OpPrereqError("Instance's disk layout is not"
2532                                  " network mirrored, cannot failover.")
2533
2534     secondary_nodes = instance.secondary_nodes
2535     if not secondary_nodes:
2536       raise errors.ProgrammerError("no secondary node but using "
2537                                    "a mirrored disk template")
2538
2539     target_node = secondary_nodes[0]
2540     # check memory requirements on the secondary node
2541     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2542                          instance.name, instance.memory)
2543
2544     # check bridge existance
2545     brlist = [nic.bridge for nic in instance.nics]
2546     if not rpc.call_bridges_exist(target_node, brlist):
2547       raise errors.OpPrereqError("One or more target bridges %s does not"
2548                                  " exist on destination node '%s'" %
2549                                  (brlist, target_node))
2550
2551     self.instance = instance
2552
2553   def Exec(self, feedback_fn):
2554     """Failover an instance.
2555
2556     The failover is done by shutting it down on its present node and
2557     starting it on the secondary.
2558
2559     """
2560     instance = self.instance
2561
2562     source_node = instance.primary_node
2563     target_node = instance.secondary_nodes[0]
2564
2565     feedback_fn("* checking disk consistency between source and target")
2566     for dev in instance.disks:
2567       # for drbd, these are drbd over lvm
2568       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2569         if instance.status == "up" and not self.op.ignore_consistency:
2570           raise errors.OpExecError("Disk %s is degraded on target node,"
2571                                    " aborting failover." % dev.iv_name)
2572
2573     feedback_fn("* shutting down instance on source node")
2574     logger.Info("Shutting down instance %s on node %s" %
2575                 (instance.name, source_node))
2576
2577     if not rpc.call_instance_shutdown(source_node, instance):
2578       if self.op.ignore_consistency:
2579         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2580                      " anyway. Please make sure node %s is down"  %
2581                      (instance.name, source_node, source_node))
2582       else:
2583         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2584                                  (instance.name, source_node))
2585
2586     feedback_fn("* deactivating the instance's disks on source node")
2587     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2588       raise errors.OpExecError("Can't shut down the instance's disks.")
2589
2590     instance.primary_node = target_node
2591     # distribute new instance config to the other nodes
2592     self.cfg.Update(instance)
2593
2594     # Only start the instance if it's marked as up
2595     if instance.status == "up":
2596       feedback_fn("* activating the instance's disks on target node")
2597       logger.Info("Starting instance %s on node %s" %
2598                   (instance.name, target_node))
2599
2600       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2601                                                ignore_secondaries=True)
2602       if not disks_ok:
2603         _ShutdownInstanceDisks(instance, self.cfg)
2604         raise errors.OpExecError("Can't activate the instance's disks")
2605
2606       feedback_fn("* starting the instance on the target node")
2607       if not rpc.call_instance_start(target_node, instance, None):
2608         _ShutdownInstanceDisks(instance, self.cfg)
2609         raise errors.OpExecError("Could not start instance %s on node %s." %
2610                                  (instance.name, target_node))
2611
2612
2613 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2614   """Create a tree of block devices on the primary node.
2615
2616   This always creates all devices.
2617
2618   """
2619   if device.children:
2620     for child in device.children:
2621       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2622         return False
2623
2624   cfg.SetDiskID(device, node)
2625   new_id = rpc.call_blockdev_create(node, device, device.size,
2626                                     instance.name, True, info)
2627   if not new_id:
2628     return False
2629   if device.physical_id is None:
2630     device.physical_id = new_id
2631   return True
2632
2633
2634 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2635   """Create a tree of block devices on a secondary node.
2636
2637   If this device type has to be created on secondaries, create it and
2638   all its children.
2639
2640   If not, just recurse to children keeping the same 'force' value.
2641
2642   """
2643   if device.CreateOnSecondary():
2644     force = True
2645   if device.children:
2646     for child in device.children:
2647       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2648                                         child, force, info):
2649         return False
2650
2651   if not force:
2652     return True
2653   cfg.SetDiskID(device, node)
2654   new_id = rpc.call_blockdev_create(node, device, device.size,
2655                                     instance.name, False, info)
2656   if not new_id:
2657     return False
2658   if device.physical_id is None:
2659     device.physical_id = new_id
2660   return True
2661
2662
2663 def _GenerateUniqueNames(cfg, exts):
2664   """Generate a suitable LV name.
2665
2666   This will generate a logical volume name for the given instance.
2667
2668   """
2669   results = []
2670   for val in exts:
2671     new_id = cfg.GenerateUniqueID()
2672     results.append("%s%s" % (new_id, val))
2673   return results
2674
2675
2676 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2677   """Generate a drbd8 device complete with its children.
2678
2679   """
2680   port = cfg.AllocatePort()
2681   vgname = cfg.GetVGName()
2682   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2683                           logical_id=(vgname, names[0]))
2684   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2685                           logical_id=(vgname, names[1]))
2686   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2687                           logical_id = (primary, secondary, port),
2688                           children = [dev_data, dev_meta],
2689                           iv_name=iv_name)
2690   return drbd_dev
2691
2692
2693 def _GenerateDiskTemplate(cfg, template_name,
2694                           instance_name, primary_node,
2695                           secondary_nodes, disk_sz, swap_sz,
2696                           file_storage_dir, file_driver):
2697   """Generate the entire disk layout for a given template type.
2698
2699   """
2700   #TODO: compute space requirements
2701
2702   vgname = cfg.GetVGName()
2703   if template_name == constants.DT_DISKLESS:
2704     disks = []
2705   elif template_name == constants.DT_PLAIN:
2706     if len(secondary_nodes) != 0:
2707       raise errors.ProgrammerError("Wrong template configuration")
2708
2709     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2710     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2711                            logical_id=(vgname, names[0]),
2712                            iv_name = "sda")
2713     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714                            logical_id=(vgname, names[1]),
2715                            iv_name = "sdb")
2716     disks = [sda_dev, sdb_dev]
2717   elif template_name == constants.DT_DRBD8:
2718     if len(secondary_nodes) != 1:
2719       raise errors.ProgrammerError("Wrong template configuration")
2720     remote_node = secondary_nodes[0]
2721     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2722                                        ".sdb_data", ".sdb_meta"])
2723     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724                                          disk_sz, names[0:2], "sda")
2725     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2726                                          swap_sz, names[2:4], "sdb")
2727     disks = [drbd_sda_dev, drbd_sdb_dev]
2728   elif template_name == constants.DT_FILE:
2729     if len(secondary_nodes) != 0:
2730       raise errors.ProgrammerError("Wrong template configuration")
2731
2732     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2733                                 iv_name="sda", logical_id=(file_driver,
2734                                 "%s/sda" % file_storage_dir))
2735     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2736                                 iv_name="sdb", logical_id=(file_driver,
2737                                 "%s/sdb" % file_storage_dir))
2738     disks = [file_sda_dev, file_sdb_dev]
2739   else:
2740     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2741   return disks
2742
2743
2744 def _GetInstanceInfoText(instance):
2745   """Compute that text that should be added to the disk's metadata.
2746
2747   """
2748   return "originstname+%s" % instance.name
2749
2750
2751 def _CreateDisks(cfg, instance):
2752   """Create all disks for an instance.
2753
2754   This abstracts away some work from AddInstance.
2755
2756   Args:
2757     instance: the instance object
2758
2759   Returns:
2760     True or False showing the success of the creation process
2761
2762   """
2763   info = _GetInstanceInfoText(instance)
2764
2765   if instance.disk_template == constants.DT_FILE:
2766     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2767     result = rpc.call_file_storage_dir_create(instance.primary_node,
2768                                               file_storage_dir)
2769
2770     if not result:
2771       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2772       return False
2773
2774     if not result[0]:
2775       logger.Error("failed to create directory '%s'" % file_storage_dir)
2776       return False
2777
2778   for device in instance.disks:
2779     logger.Info("creating volume %s for instance %s" %
2780                 (device.iv_name, instance.name))
2781     #HARDCODE
2782     for secondary_node in instance.secondary_nodes:
2783       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2784                                         device, False, info):
2785         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2786                      (device.iv_name, device, secondary_node))
2787         return False
2788     #HARDCODE
2789     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2790                                     instance, device, info):
2791       logger.Error("failed to create volume %s on primary!" %
2792                    device.iv_name)
2793       return False
2794
2795   return True
2796
2797
2798 def _RemoveDisks(instance, cfg):
2799   """Remove all disks for an instance.
2800
2801   This abstracts away some work from `AddInstance()` and
2802   `RemoveInstance()`. Note that in case some of the devices couldn't
2803   be removed, the removal will continue with the other ones (compare
2804   with `_CreateDisks()`).
2805
2806   Args:
2807     instance: the instance object
2808
2809   Returns:
2810     True or False showing the success of the removal proces
2811
2812   """
2813   logger.Info("removing block devices for instance %s" % instance.name)
2814
2815   result = True
2816   for device in instance.disks:
2817     for node, disk in device.ComputeNodeTree(instance.primary_node):
2818       cfg.SetDiskID(disk, node)
2819       if not rpc.call_blockdev_remove(node, disk):
2820         logger.Error("could not remove block device %s on node %s,"
2821                      " continuing anyway" %
2822                      (device.iv_name, node))
2823         result = False
2824
2825   if instance.disk_template == constants.DT_FILE:
2826     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2827     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2828                                             file_storage_dir):
2829       logger.Error("could not remove directory '%s'" % file_storage_dir)
2830       result = False
2831
2832   return result
2833
2834
2835 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2836   """Compute disk size requirements in the volume group
2837
2838   This is currently hard-coded for the two-drive layout.
2839
2840   """
2841   # Required free disk space as a function of disk and swap space
2842   req_size_dict = {
2843     constants.DT_DISKLESS: None,
2844     constants.DT_PLAIN: disk_size + swap_size,
2845     # 256 MB are added for drbd metadata, 128MB for each drbd device
2846     constants.DT_DRBD8: disk_size + swap_size + 256,
2847     constants.DT_FILE: None,
2848   }
2849
2850   if disk_template not in req_size_dict:
2851     raise errors.ProgrammerError("Disk template '%s' size requirement"
2852                                  " is unknown" %  disk_template)
2853
2854   return req_size_dict[disk_template]
2855
2856
2857 class LUCreateInstance(LogicalUnit):
2858   """Create an instance.
2859
2860   """
2861   HPATH = "instance-add"
2862   HTYPE = constants.HTYPE_INSTANCE
2863   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2864               "disk_template", "swap_size", "mode", "start", "vcpus",
2865               "wait_for_sync", "ip_check", "mac"]
2866
2867   def _RunAllocator(self):
2868     """Run the allocator based on input opcode.
2869
2870     """
2871     disks = [{"size": self.op.disk_size, "mode": "w"},
2872              {"size": self.op.swap_size, "mode": "w"}]
2873     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2874              "bridge": self.op.bridge}]
2875     ial = IAllocator(self.cfg, self.sstore,
2876                      mode=constants.IALLOCATOR_MODE_ALLOC,
2877                      name=self.op.instance_name,
2878                      disk_template=self.op.disk_template,
2879                      tags=[],
2880                      os=self.op.os_type,
2881                      vcpus=self.op.vcpus,
2882                      mem_size=self.op.mem_size,
2883                      disks=disks,
2884                      nics=nics,
2885                      )
2886
2887     ial.Run(self.op.iallocator)
2888
2889     if not ial.success:
2890       raise errors.OpPrereqError("Can't compute nodes using"
2891                                  " iallocator '%s': %s" % (self.op.iallocator,
2892                                                            ial.info))
2893     if len(ial.nodes) != ial.required_nodes:
2894       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2895                                  " of nodes (%s), required %s" %
2896                                  (len(ial.nodes), ial.required_nodes))
2897     self.op.pnode = ial.nodes[0]
2898     logger.ToStdout("Selected nodes for the instance: %s" %
2899                     (", ".join(ial.nodes),))
2900     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2901                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2902     if ial.required_nodes == 2:
2903       self.op.snode = ial.nodes[1]
2904
2905   def BuildHooksEnv(self):
2906     """Build hooks env.
2907
2908     This runs on master, primary and secondary nodes of the instance.
2909
2910     """
2911     env = {
2912       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2913       "INSTANCE_DISK_SIZE": self.op.disk_size,
2914       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2915       "INSTANCE_ADD_MODE": self.op.mode,
2916       }
2917     if self.op.mode == constants.INSTANCE_IMPORT:
2918       env["INSTANCE_SRC_NODE"] = self.op.src_node
2919       env["INSTANCE_SRC_PATH"] = self.op.src_path
2920       env["INSTANCE_SRC_IMAGE"] = self.src_image
2921
2922     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2923       primary_node=self.op.pnode,
2924       secondary_nodes=self.secondaries,
2925       status=self.instance_status,
2926       os_type=self.op.os_type,
2927       memory=self.op.mem_size,
2928       vcpus=self.op.vcpus,
2929       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2930     ))
2931
2932     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2933           self.secondaries)
2934     return env, nl, nl
2935
2936
2937   def CheckPrereq(self):
2938     """Check prerequisites.
2939
2940     """
2941     # set optional parameters to none if they don't exist
2942     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2943                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2944                  "vnc_bind_address"]:
2945       if not hasattr(self.op, attr):
2946         setattr(self.op, attr, None)
2947
2948     if self.op.mode not in (constants.INSTANCE_CREATE,
2949                             constants.INSTANCE_IMPORT):
2950       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2951                                  self.op.mode)
2952
2953     if (not self.cfg.GetVGName() and
2954         self.op.disk_template not in constants.DTS_NOT_LVM):
2955       raise errors.OpPrereqError("Cluster does not support lvm-based"
2956                                  " instances")
2957
2958     if self.op.mode == constants.INSTANCE_IMPORT:
2959       src_node = getattr(self.op, "src_node", None)
2960       src_path = getattr(self.op, "src_path", None)
2961       if src_node is None or src_path is None:
2962         raise errors.OpPrereqError("Importing an instance requires source"
2963                                    " node and path options")
2964       src_node_full = self.cfg.ExpandNodeName(src_node)
2965       if src_node_full is None:
2966         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2967       self.op.src_node = src_node = src_node_full
2968
2969       if not os.path.isabs(src_path):
2970         raise errors.OpPrereqError("The source path must be absolute")
2971
2972       export_info = rpc.call_export_info(src_node, src_path)
2973
2974       if not export_info:
2975         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2976
2977       if not export_info.has_section(constants.INISECT_EXP):
2978         raise errors.ProgrammerError("Corrupted export config")
2979
2980       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2981       if (int(ei_version) != constants.EXPORT_VERSION):
2982         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2983                                    (ei_version, constants.EXPORT_VERSION))
2984
2985       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2986         raise errors.OpPrereqError("Can't import instance with more than"
2987                                    " one data disk")
2988
2989       # FIXME: are the old os-es, disk sizes, etc. useful?
2990       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2991       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2992                                                          'disk0_dump'))
2993       self.src_image = diskimage
2994     else: # INSTANCE_CREATE
2995       if getattr(self.op, "os_type", None) is None:
2996         raise errors.OpPrereqError("No guest OS specified")
2997
2998     #### instance parameters check
2999
3000     # disk template and mirror node verification
3001     if self.op.disk_template not in constants.DISK_TEMPLATES:
3002       raise errors.OpPrereqError("Invalid disk template name")
3003
3004     # instance name verification
3005     hostname1 = utils.HostInfo(self.op.instance_name)
3006
3007     self.op.instance_name = instance_name = hostname1.name
3008     instance_list = self.cfg.GetInstanceList()
3009     if instance_name in instance_list:
3010       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3011                                  instance_name)
3012
3013     # ip validity checks
3014     ip = getattr(self.op, "ip", None)
3015     if ip is None or ip.lower() == "none":
3016       inst_ip = None
3017     elif ip.lower() == "auto":
3018       inst_ip = hostname1.ip
3019     else:
3020       if not utils.IsValidIP(ip):
3021         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3022                                    " like a valid IP" % ip)
3023       inst_ip = ip
3024     self.inst_ip = self.op.ip = inst_ip
3025
3026     if self.op.start and not self.op.ip_check:
3027       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3028                                  " adding an instance in start mode")
3029
3030     if self.op.ip_check:
3031       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3032         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3033                                    (hostname1.ip, instance_name))
3034
3035     # MAC address verification
3036     if self.op.mac != "auto":
3037       if not utils.IsValidMac(self.op.mac.lower()):
3038         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3039                                    self.op.mac)
3040
3041     # bridge verification
3042     bridge = getattr(self.op, "bridge", None)
3043     if bridge is None:
3044       self.op.bridge = self.cfg.GetDefBridge()
3045     else:
3046       self.op.bridge = bridge
3047
3048     # boot order verification
3049     if self.op.hvm_boot_order is not None:
3050       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3051         raise errors.OpPrereqError("invalid boot order specified,"
3052                                    " must be one or more of [acdn]")
3053     # file storage checks
3054     if (self.op.file_driver and
3055         not self.op.file_driver in constants.FILE_DRIVER):
3056       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3057                                  self.op.file_driver)
3058
3059     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3060       raise errors.OpPrereqError("File storage directory not a relative"
3061                                  " path")
3062     #### allocator run
3063
3064     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3065       raise errors.OpPrereqError("One and only one of iallocator and primary"
3066                                  " node must be given")
3067
3068     if self.op.iallocator is not None:
3069       self._RunAllocator()
3070
3071     #### node related checks
3072
3073     # check primary node
3074     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3075     if pnode is None:
3076       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3077                                  self.op.pnode)
3078     self.op.pnode = pnode.name
3079     self.pnode = pnode
3080     self.secondaries = []
3081
3082     # mirror node verification
3083     if self.op.disk_template in constants.DTS_NET_MIRROR:
3084       if getattr(self.op, "snode", None) is None:
3085         raise errors.OpPrereqError("The networked disk templates need"
3086                                    " a mirror node")
3087
3088       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3089       if snode_name is None:
3090         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3091                                    self.op.snode)
3092       elif snode_name == pnode.name:
3093         raise errors.OpPrereqError("The secondary node cannot be"
3094                                    " the primary node.")
3095       self.secondaries.append(snode_name)
3096
3097     req_size = _ComputeDiskSize(self.op.disk_template,
3098                                 self.op.disk_size, self.op.swap_size)
3099
3100     # Check lv size requirements
3101     if req_size is not None:
3102       nodenames = [pnode.name] + self.secondaries
3103       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3104       for node in nodenames:
3105         info = nodeinfo.get(node, None)
3106         if not info:
3107           raise errors.OpPrereqError("Cannot get current information"
3108                                      " from node '%s'" % node)
3109         vg_free = info.get('vg_free', None)
3110         if not isinstance(vg_free, int):
3111           raise errors.OpPrereqError("Can't compute free disk space on"
3112                                      " node %s" % node)
3113         if req_size > info['vg_free']:
3114           raise errors.OpPrereqError("Not enough disk space on target node %s."
3115                                      " %d MB available, %d MB required" %
3116                                      (node, info['vg_free'], req_size))
3117
3118     # os verification
3119     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3120     if not os_obj:
3121       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3122                                  " primary node"  % self.op.os_type)
3123
3124     if self.op.kernel_path == constants.VALUE_NONE:
3125       raise errors.OpPrereqError("Can't set instance kernel to none")
3126
3127
3128     # bridge check on primary node
3129     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3130       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3131                                  " destination node '%s'" %
3132                                  (self.op.bridge, pnode.name))
3133
3134     # memory check on primary node
3135     if self.op.start:
3136       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3137                            "creating instance %s" % self.op.instance_name,
3138                            self.op.mem_size)
3139
3140     # hvm_cdrom_image_path verification
3141     if self.op.hvm_cdrom_image_path is not None:
3142       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3143         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3144                                    " be an absolute path or None, not %s" %
3145                                    self.op.hvm_cdrom_image_path)
3146       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3147         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3148                                    " regular file or a symlink pointing to"
3149                                    " an existing regular file, not %s" %
3150                                    self.op.hvm_cdrom_image_path)
3151
3152     # vnc_bind_address verification
3153     if self.op.vnc_bind_address is not None:
3154       if not utils.IsValidIP(self.op.vnc_bind_address):
3155         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3156                                    " like a valid IP address" %
3157                                    self.op.vnc_bind_address)
3158
3159     if self.op.start:
3160       self.instance_status = 'up'
3161     else:
3162       self.instance_status = 'down'
3163
3164   def Exec(self, feedback_fn):
3165     """Create and add the instance to the cluster.
3166
3167     """
3168     instance = self.op.instance_name
3169     pnode_name = self.pnode.name
3170
3171     if self.op.mac == "auto":
3172       mac_address = self.cfg.GenerateMAC()
3173     else:
3174       mac_address = self.op.mac
3175
3176     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3177     if self.inst_ip is not None:
3178       nic.ip = self.inst_ip
3179
3180     ht_kind = self.sstore.GetHypervisorType()
3181     if ht_kind in constants.HTS_REQ_PORT:
3182       network_port = self.cfg.AllocatePort()
3183     else:
3184       network_port = None
3185
3186     if self.op.vnc_bind_address is None:
3187       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3188
3189     # this is needed because os.path.join does not accept None arguments
3190     if self.op.file_storage_dir is None:
3191       string_file_storage_dir = ""
3192     else:
3193       string_file_storage_dir = self.op.file_storage_dir
3194
3195     # build the full file storage dir path
3196     file_storage_dir = os.path.normpath(os.path.join(
3197                                         self.sstore.GetFileStorageDir(),
3198                                         string_file_storage_dir, instance))
3199
3200
3201     disks = _GenerateDiskTemplate(self.cfg,
3202                                   self.op.disk_template,
3203                                   instance, pnode_name,
3204                                   self.secondaries, self.op.disk_size,
3205                                   self.op.swap_size,
3206                                   file_storage_dir,
3207                                   self.op.file_driver)
3208
3209     iobj = objects.Instance(name=instance, os=self.op.os_type,
3210                             primary_node=pnode_name,
3211                             memory=self.op.mem_size,
3212                             vcpus=self.op.vcpus,
3213                             nics=[nic], disks=disks,
3214                             disk_template=self.op.disk_template,
3215                             status=self.instance_status,
3216                             network_port=network_port,
3217                             kernel_path=self.op.kernel_path,
3218                             initrd_path=self.op.initrd_path,
3219                             hvm_boot_order=self.op.hvm_boot_order,
3220                             hvm_acpi=self.op.hvm_acpi,
3221                             hvm_pae=self.op.hvm_pae,
3222                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3223                             vnc_bind_address=self.op.vnc_bind_address,
3224                             )
3225
3226     feedback_fn("* creating instance disks...")
3227     if not _CreateDisks(self.cfg, iobj):
3228       _RemoveDisks(iobj, self.cfg)
3229       raise errors.OpExecError("Device creation failed, reverting...")
3230
3231     feedback_fn("adding instance %s to cluster config" % instance)
3232
3233     self.cfg.AddInstance(iobj)
3234     # Add the new instance to the Ganeti Lock Manager
3235     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3236
3237     if self.op.wait_for_sync:
3238       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3239     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3240       # make sure the disks are not degraded (still sync-ing is ok)
3241       time.sleep(15)
3242       feedback_fn("* checking mirrors status")
3243       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3244     else:
3245       disk_abort = False
3246
3247     if disk_abort:
3248       _RemoveDisks(iobj, self.cfg)
3249       self.cfg.RemoveInstance(iobj.name)
3250       # Remove the new instance from the Ganeti Lock Manager
3251       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3252       raise errors.OpExecError("There are some degraded disks for"
3253                                " this instance")
3254
3255     feedback_fn("creating os for instance %s on node %s" %
3256                 (instance, pnode_name))
3257
3258     if iobj.disk_template != constants.DT_DISKLESS:
3259       if self.op.mode == constants.INSTANCE_CREATE:
3260         feedback_fn("* running the instance OS create scripts...")
3261         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3262           raise errors.OpExecError("could not add os for instance %s"
3263                                    " on node %s" %
3264                                    (instance, pnode_name))
3265
3266       elif self.op.mode == constants.INSTANCE_IMPORT:
3267         feedback_fn("* running the instance OS import scripts...")
3268         src_node = self.op.src_node
3269         src_image = self.src_image
3270         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3271                                                 src_node, src_image):
3272           raise errors.OpExecError("Could not import os for instance"
3273                                    " %s on node %s" %
3274                                    (instance, pnode_name))
3275       else:
3276         # also checked in the prereq part
3277         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3278                                      % self.op.mode)
3279
3280     if self.op.start:
3281       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3282       feedback_fn("* starting instance...")
3283       if not rpc.call_instance_start(pnode_name, iobj, None):
3284         raise errors.OpExecError("Could not start instance")
3285
3286
3287 class LUConnectConsole(NoHooksLU):
3288   """Connect to an instance's console.
3289
3290   This is somewhat special in that it returns the command line that
3291   you need to run on the master node in order to connect to the
3292   console.
3293
3294   """
3295   _OP_REQP = ["instance_name"]
3296
3297   def CheckPrereq(self):
3298     """Check prerequisites.
3299
3300     This checks that the instance is in the cluster.
3301
3302     """
3303     instance = self.cfg.GetInstanceInfo(
3304       self.cfg.ExpandInstanceName(self.op.instance_name))
3305     if instance is None:
3306       raise errors.OpPrereqError("Instance '%s' not known" %
3307                                  self.op.instance_name)
3308     self.instance = instance
3309
3310   def Exec(self, feedback_fn):
3311     """Connect to the console of an instance
3312
3313     """
3314     instance = self.instance
3315     node = instance.primary_node
3316
3317     node_insts = rpc.call_instance_list([node])[node]
3318     if node_insts is False:
3319       raise errors.OpExecError("Can't connect to node %s." % node)
3320
3321     if instance.name not in node_insts:
3322       raise errors.OpExecError("Instance %s is not running." % instance.name)
3323
3324     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3325
3326     hyper = hypervisor.GetHypervisor()
3327     console_cmd = hyper.GetShellCommandForConsole(instance)
3328
3329     # build ssh cmdline
3330     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3331
3332
3333 class LUReplaceDisks(LogicalUnit):
3334   """Replace the disks of an instance.
3335
3336   """
3337   HPATH = "mirrors-replace"
3338   HTYPE = constants.HTYPE_INSTANCE
3339   _OP_REQP = ["instance_name", "mode", "disks"]
3340
3341   def _RunAllocator(self):
3342     """Compute a new secondary node using an IAllocator.
3343
3344     """
3345     ial = IAllocator(self.cfg, self.sstore,
3346                      mode=constants.IALLOCATOR_MODE_RELOC,
3347                      name=self.op.instance_name,
3348                      relocate_from=[self.sec_node])
3349
3350     ial.Run(self.op.iallocator)
3351
3352     if not ial.success:
3353       raise errors.OpPrereqError("Can't compute nodes using"
3354                                  " iallocator '%s': %s" % (self.op.iallocator,
3355                                                            ial.info))
3356     if len(ial.nodes) != ial.required_nodes:
3357       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3358                                  " of nodes (%s), required %s" %
3359                                  (len(ial.nodes), ial.required_nodes))
3360     self.op.remote_node = ial.nodes[0]
3361     logger.ToStdout("Selected new secondary for the instance: %s" %
3362                     self.op.remote_node)
3363
3364   def BuildHooksEnv(self):
3365     """Build hooks env.
3366
3367     This runs on the master, the primary and all the secondaries.
3368
3369     """
3370     env = {
3371       "MODE": self.op.mode,
3372       "NEW_SECONDARY": self.op.remote_node,
3373       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3374       }
3375     env.update(_BuildInstanceHookEnvByObject(self.instance))
3376     nl = [
3377       self.sstore.GetMasterNode(),
3378       self.instance.primary_node,
3379       ]
3380     if self.op.remote_node is not None:
3381       nl.append(self.op.remote_node)
3382     return env, nl, nl
3383
3384   def CheckPrereq(self):
3385     """Check prerequisites.
3386
3387     This checks that the instance is in the cluster.
3388
3389     """
3390     if not hasattr(self.op, "remote_node"):
3391       self.op.remote_node = None
3392
3393     instance = self.cfg.GetInstanceInfo(
3394       self.cfg.ExpandInstanceName(self.op.instance_name))
3395     if instance is None:
3396       raise errors.OpPrereqError("Instance '%s' not known" %
3397                                  self.op.instance_name)
3398     self.instance = instance
3399     self.op.instance_name = instance.name
3400
3401     if instance.disk_template not in constants.DTS_NET_MIRROR:
3402       raise errors.OpPrereqError("Instance's disk layout is not"
3403                                  " network mirrored.")
3404
3405     if len(instance.secondary_nodes) != 1:
3406       raise errors.OpPrereqError("The instance has a strange layout,"
3407                                  " expected one secondary but found %d" %
3408                                  len(instance.secondary_nodes))
3409
3410     self.sec_node = instance.secondary_nodes[0]
3411
3412     ia_name = getattr(self.op, "iallocator", None)
3413     if ia_name is not None:
3414       if self.op.remote_node is not None:
3415         raise errors.OpPrereqError("Give either the iallocator or the new"
3416                                    " secondary, not both")
3417       self.op.remote_node = self._RunAllocator()
3418
3419     remote_node = self.op.remote_node
3420     if remote_node is not None:
3421       remote_node = self.cfg.ExpandNodeName(remote_node)
3422       if remote_node is None:
3423         raise errors.OpPrereqError("Node '%s' not known" %
3424                                    self.op.remote_node)
3425       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3426     else:
3427       self.remote_node_info = None
3428     if remote_node == instance.primary_node:
3429       raise errors.OpPrereqError("The specified node is the primary node of"
3430                                  " the instance.")
3431     elif remote_node == self.sec_node:
3432       if self.op.mode == constants.REPLACE_DISK_SEC:
3433         # this is for DRBD8, where we can't execute the same mode of
3434         # replacement as for drbd7 (no different port allocated)
3435         raise errors.OpPrereqError("Same secondary given, cannot execute"
3436                                    " replacement")
3437     if instance.disk_template == constants.DT_DRBD8:
3438       if (self.op.mode == constants.REPLACE_DISK_ALL and
3439           remote_node is not None):
3440         # switch to replace secondary mode
3441         self.op.mode = constants.REPLACE_DISK_SEC
3442
3443       if self.op.mode == constants.REPLACE_DISK_ALL:
3444         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3445                                    " secondary disk replacement, not"
3446                                    " both at once")
3447       elif self.op.mode == constants.REPLACE_DISK_PRI:
3448         if remote_node is not None:
3449           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3450                                      " the secondary while doing a primary"
3451                                      " node disk replacement")
3452         self.tgt_node = instance.primary_node
3453         self.oth_node = instance.secondary_nodes[0]
3454       elif self.op.mode == constants.REPLACE_DISK_SEC:
3455         self.new_node = remote_node # this can be None, in which case
3456                                     # we don't change the secondary
3457         self.tgt_node = instance.secondary_nodes[0]
3458         self.oth_node = instance.primary_node
3459       else:
3460         raise errors.ProgrammerError("Unhandled disk replace mode")
3461
3462     for name in self.op.disks:
3463       if instance.FindDisk(name) is None:
3464         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3465                                    (name, instance.name))
3466     self.op.remote_node = remote_node
3467
3468   def _ExecD8DiskOnly(self, feedback_fn):
3469     """Replace a disk on the primary or secondary for dbrd8.
3470
3471     The algorithm for replace is quite complicated:
3472       - for each disk to be replaced:
3473         - create new LVs on the target node with unique names
3474         - detach old LVs from the drbd device
3475         - rename old LVs to name_replaced.<time_t>
3476         - rename new LVs to old LVs
3477         - attach the new LVs (with the old names now) to the drbd device
3478       - wait for sync across all devices
3479       - for each modified disk:
3480         - remove old LVs (which have the name name_replaces.<time_t>)
3481
3482     Failures are not very well handled.
3483
3484     """
3485     steps_total = 6
3486     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3487     instance = self.instance
3488     iv_names = {}
3489     vgname = self.cfg.GetVGName()
3490     # start of work
3491     cfg = self.cfg
3492     tgt_node = self.tgt_node
3493     oth_node = self.oth_node
3494
3495     # Step: check device activation
3496     self.proc.LogStep(1, steps_total, "check device existence")
3497     info("checking volume groups")
3498     my_vg = cfg.GetVGName()
3499     results = rpc.call_vg_list([oth_node, tgt_node])
3500     if not results:
3501       raise errors.OpExecError("Can't list volume groups on the nodes")
3502     for node in oth_node, tgt_node:
3503       res = results.get(node, False)
3504       if not res or my_vg not in res:
3505         raise errors.OpExecError("Volume group '%s' not found on %s" %
3506                                  (my_vg, node))
3507     for dev in instance.disks:
3508       if not dev.iv_name in self.op.disks:
3509         continue
3510       for node in tgt_node, oth_node:
3511         info("checking %s on %s" % (dev.iv_name, node))
3512         cfg.SetDiskID(dev, node)
3513         if not rpc.call_blockdev_find(node, dev):
3514           raise errors.OpExecError("Can't find device %s on node %s" %
3515                                    (dev.iv_name, node))
3516
3517     # Step: check other node consistency
3518     self.proc.LogStep(2, steps_total, "check peer consistency")
3519     for dev in instance.disks:
3520       if not dev.iv_name in self.op.disks:
3521         continue
3522       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3523       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3524                                    oth_node==instance.primary_node):
3525         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3526                                  " to replace disks on this node (%s)" %
3527                                  (oth_node, tgt_node))
3528
3529     # Step: create new storage
3530     self.proc.LogStep(3, steps_total, "allocate new storage")
3531     for dev in instance.disks:
3532       if not dev.iv_name in self.op.disks:
3533         continue
3534       size = dev.size
3535       cfg.SetDiskID(dev, tgt_node)
3536       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3537       names = _GenerateUniqueNames(cfg, lv_names)
3538       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3539                              logical_id=(vgname, names[0]))
3540       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3541                              logical_id=(vgname, names[1]))
3542       new_lvs = [lv_data, lv_meta]
3543       old_lvs = dev.children
3544       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3545       info("creating new local storage on %s for %s" %
3546            (tgt_node, dev.iv_name))
3547       # since we *always* want to create this LV, we use the
3548       # _Create...OnPrimary (which forces the creation), even if we
3549       # are talking about the secondary node
3550       for new_lv in new_lvs:
3551         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3552                                         _GetInstanceInfoText(instance)):
3553           raise errors.OpExecError("Failed to create new LV named '%s' on"
3554                                    " node '%s'" %
3555                                    (new_lv.logical_id[1], tgt_node))
3556
3557     # Step: for each lv, detach+rename*2+attach
3558     self.proc.LogStep(4, steps_total, "change drbd configuration")
3559     for dev, old_lvs, new_lvs in iv_names.itervalues():
3560       info("detaching %s drbd from local storage" % dev.iv_name)
3561       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3562         raise errors.OpExecError("Can't detach drbd from local storage on node"
3563                                  " %s for device %s" % (tgt_node, dev.iv_name))
3564       #dev.children = []
3565       #cfg.Update(instance)
3566
3567       # ok, we created the new LVs, so now we know we have the needed
3568       # storage; as such, we proceed on the target node to rename
3569       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3570       # using the assumption that logical_id == physical_id (which in
3571       # turn is the unique_id on that node)
3572
3573       # FIXME(iustin): use a better name for the replaced LVs
3574       temp_suffix = int(time.time())
3575       ren_fn = lambda d, suff: (d.physical_id[0],
3576                                 d.physical_id[1] + "_replaced-%s" % suff)
3577       # build the rename list based on what LVs exist on the node
3578       rlist = []
3579       for to_ren in old_lvs:
3580         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3581         if find_res is not None: # device exists
3582           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3583
3584       info("renaming the old LVs on the target node")
3585       if not rpc.call_blockdev_rename(tgt_node, rlist):
3586         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3587       # now we rename the new LVs to the old LVs
3588       info("renaming the new LVs on the target node")
3589       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3590       if not rpc.call_blockdev_rename(tgt_node, rlist):
3591         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3592
3593       for old, new in zip(old_lvs, new_lvs):
3594         new.logical_id = old.logical_id
3595         cfg.SetDiskID(new, tgt_node)
3596
3597       for disk in old_lvs:
3598         disk.logical_id = ren_fn(disk, temp_suffix)
3599         cfg.SetDiskID(disk, tgt_node)
3600
3601       # now that the new lvs have the old name, we can add them to the device
3602       info("adding new mirror component on %s" % tgt_node)
3603       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3604         for new_lv in new_lvs:
3605           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3606             warning("Can't rollback device %s", hint="manually cleanup unused"
3607                     " logical volumes")
3608         raise errors.OpExecError("Can't add local storage to drbd")
3609
3610       dev.children = new_lvs
3611       cfg.Update(instance)
3612
3613     # Step: wait for sync
3614
3615     # this can fail as the old devices are degraded and _WaitForSync
3616     # does a combined result over all disks, so we don't check its
3617     # return value
3618     self.proc.LogStep(5, steps_total, "sync devices")
3619     _WaitForSync(cfg, instance, self.proc, unlock=True)
3620
3621     # so check manually all the devices
3622     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3623       cfg.SetDiskID(dev, instance.primary_node)
3624       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3625       if is_degr:
3626         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3627
3628     # Step: remove old storage
3629     self.proc.LogStep(6, steps_total, "removing old storage")
3630     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3631       info("remove logical volumes for %s" % name)
3632       for lv in old_lvs:
3633         cfg.SetDiskID(lv, tgt_node)
3634         if not rpc.call_blockdev_remove(tgt_node, lv):
3635           warning("Can't remove old LV", hint="manually remove unused LVs")
3636           continue
3637
3638   def _ExecD8Secondary(self, feedback_fn):
3639     """Replace the secondary node for drbd8.
3640
3641     The algorithm for replace is quite complicated:
3642       - for all disks of the instance:
3643         - create new LVs on the new node with same names
3644         - shutdown the drbd device on the old secondary
3645         - disconnect the drbd network on the primary
3646         - create the drbd device on the new secondary
3647         - network attach the drbd on the primary, using an artifice:
3648           the drbd code for Attach() will connect to the network if it
3649           finds a device which is connected to the good local disks but
3650           not network enabled
3651       - wait for sync across all devices
3652       - remove all disks from the old secondary
3653
3654     Failures are not very well handled.
3655
3656     """
3657     steps_total = 6
3658     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3659     instance = self.instance
3660     iv_names = {}
3661     vgname = self.cfg.GetVGName()
3662     # start of work
3663     cfg = self.cfg
3664     old_node = self.tgt_node
3665     new_node = self.new_node
3666     pri_node = instance.primary_node
3667
3668     # Step: check device activation
3669     self.proc.LogStep(1, steps_total, "check device existence")
3670     info("checking volume groups")
3671     my_vg = cfg.GetVGName()
3672     results = rpc.call_vg_list([pri_node, new_node])
3673     if not results:
3674       raise errors.OpExecError("Can't list volume groups on the nodes")
3675     for node in pri_node, new_node:
3676       res = results.get(node, False)
3677       if not res or my_vg not in res:
3678         raise errors.OpExecError("Volume group '%s' not found on %s" %
3679                                  (my_vg, node))
3680     for dev in instance.disks:
3681       if not dev.iv_name in self.op.disks:
3682         continue
3683       info("checking %s on %s" % (dev.iv_name, pri_node))
3684       cfg.SetDiskID(dev, pri_node)
3685       if not rpc.call_blockdev_find(pri_node, dev):
3686         raise errors.OpExecError("Can't find device %s on node %s" %
3687                                  (dev.iv_name, pri_node))
3688
3689     # Step: check other node consistency
3690     self.proc.LogStep(2, steps_total, "check peer consistency")
3691     for dev in instance.disks:
3692       if not dev.iv_name in self.op.disks:
3693         continue
3694       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3695       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3696         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3697                                  " unsafe to replace the secondary" %
3698                                  pri_node)
3699
3700     # Step: create new storage
3701     self.proc.LogStep(3, steps_total, "allocate new storage")
3702     for dev in instance.disks:
3703       size = dev.size
3704       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3705       # since we *always* want to create this LV, we use the
3706       # _Create...OnPrimary (which forces the creation), even if we
3707       # are talking about the secondary node
3708       for new_lv in dev.children:
3709         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3710                                         _GetInstanceInfoText(instance)):
3711           raise errors.OpExecError("Failed to create new LV named '%s' on"
3712                                    " node '%s'" %
3713                                    (new_lv.logical_id[1], new_node))
3714
3715       iv_names[dev.iv_name] = (dev, dev.children)
3716
3717     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3718     for dev in instance.disks:
3719       size = dev.size
3720       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3721       # create new devices on new_node
3722       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3723                               logical_id=(pri_node, new_node,
3724                                           dev.logical_id[2]),
3725                               children=dev.children)
3726       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3727                                         new_drbd, False,
3728                                       _GetInstanceInfoText(instance)):
3729         raise errors.OpExecError("Failed to create new DRBD on"
3730                                  " node '%s'" % new_node)
3731
3732     for dev in instance.disks:
3733       # we have new devices, shutdown the drbd on the old secondary
3734       info("shutting down drbd for %s on old node" % dev.iv_name)
3735       cfg.SetDiskID(dev, old_node)
3736       if not rpc.call_blockdev_shutdown(old_node, dev):
3737         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3738                 hint="Please cleanup this device manually as soon as possible")
3739
3740     info("detaching primary drbds from the network (=> standalone)")
3741     done = 0
3742     for dev in instance.disks:
3743       cfg.SetDiskID(dev, pri_node)
3744       # set the physical (unique in bdev terms) id to None, meaning
3745       # detach from network
3746       dev.physical_id = (None,) * len(dev.physical_id)
3747       # and 'find' the device, which will 'fix' it to match the
3748       # standalone state
3749       if rpc.call_blockdev_find(pri_node, dev):
3750         done += 1
3751       else:
3752         warning("Failed to detach drbd %s from network, unusual case" %
3753                 dev.iv_name)
3754
3755     if not done:
3756       # no detaches succeeded (very unlikely)
3757       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3758
3759     # if we managed to detach at least one, we update all the disks of
3760     # the instance to point to the new secondary
3761     info("updating instance configuration")
3762     for dev in instance.disks:
3763       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3764       cfg.SetDiskID(dev, pri_node)
3765     cfg.Update(instance)
3766
3767     # and now perform the drbd attach
3768     info("attaching primary drbds to new secondary (standalone => connected)")
3769     failures = []
3770     for dev in instance.disks:
3771       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3772       # since the attach is smart, it's enough to 'find' the device,
3773       # it will automatically activate the network, if the physical_id
3774       # is correct
3775       cfg.SetDiskID(dev, pri_node)
3776       if not rpc.call_blockdev_find(pri_node, dev):
3777         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3778                 "please do a gnt-instance info to see the status of disks")
3779
3780     # this can fail as the old devices are degraded and _WaitForSync
3781     # does a combined result over all disks, so we don't check its
3782     # return value
3783     self.proc.LogStep(5, steps_total, "sync devices")
3784     _WaitForSync(cfg, instance, self.proc, unlock=True)
3785
3786     # so check manually all the devices
3787     for name, (dev, old_lvs) in iv_names.iteritems():
3788       cfg.SetDiskID(dev, pri_node)
3789       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3790       if is_degr:
3791         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3792
3793     self.proc.LogStep(6, steps_total, "removing old storage")
3794     for name, (dev, old_lvs) in iv_names.iteritems():
3795       info("remove logical volumes for %s" % name)
3796       for lv in old_lvs:
3797         cfg.SetDiskID(lv, old_node)
3798         if not rpc.call_blockdev_remove(old_node, lv):
3799           warning("Can't remove LV on old secondary",
3800                   hint="Cleanup stale volumes by hand")
3801
3802   def Exec(self, feedback_fn):
3803     """Execute disk replacement.
3804
3805     This dispatches the disk replacement to the appropriate handler.
3806
3807     """
3808     instance = self.instance
3809
3810     # Activate the instance disks if we're replacing them on a down instance
3811     if instance.status == "down":
3812       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3813       self.proc.ChainOpCode(op)
3814
3815     if instance.disk_template == constants.DT_DRBD8:
3816       if self.op.remote_node is None:
3817         fn = self._ExecD8DiskOnly
3818       else:
3819         fn = self._ExecD8Secondary
3820     else:
3821       raise errors.ProgrammerError("Unhandled disk replacement case")
3822
3823     ret = fn(feedback_fn)
3824
3825     # Deactivate the instance disks if we're replacing them on a down instance
3826     if instance.status == "down":
3827       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3828       self.proc.ChainOpCode(op)
3829
3830     return ret
3831
3832
3833 class LUGrowDisk(LogicalUnit):
3834   """Grow a disk of an instance.
3835
3836   """
3837   HPATH = "disk-grow"
3838   HTYPE = constants.HTYPE_INSTANCE
3839   _OP_REQP = ["instance_name", "disk", "amount"]
3840
3841   def BuildHooksEnv(self):
3842     """Build hooks env.
3843
3844     This runs on the master, the primary and all the secondaries.
3845
3846     """
3847     env = {
3848       "DISK": self.op.disk,
3849       "AMOUNT": self.op.amount,
3850       }
3851     env.update(_BuildInstanceHookEnvByObject(self.instance))
3852     nl = [
3853       self.sstore.GetMasterNode(),
3854       self.instance.primary_node,
3855       ]
3856     return env, nl, nl
3857
3858   def CheckPrereq(self):
3859     """Check prerequisites.
3860
3861     This checks that the instance is in the cluster.
3862
3863     """
3864     instance = self.cfg.GetInstanceInfo(
3865       self.cfg.ExpandInstanceName(self.op.instance_name))
3866     if instance is None:
3867       raise errors.OpPrereqError("Instance '%s' not known" %
3868                                  self.op.instance_name)
3869     self.instance = instance
3870     self.op.instance_name = instance.name
3871
3872     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3873       raise errors.OpPrereqError("Instance's disk layout does not support"
3874                                  " growing.")
3875
3876     if instance.FindDisk(self.op.disk) is None:
3877       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3878                                  (self.op.disk, instance.name))
3879
3880     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3881     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3882     for node in nodenames:
3883       info = nodeinfo.get(node, None)
3884       if not info:
3885         raise errors.OpPrereqError("Cannot get current information"
3886                                    " from node '%s'" % node)
3887       vg_free = info.get('vg_free', None)
3888       if not isinstance(vg_free, int):
3889         raise errors.OpPrereqError("Can't compute free disk space on"
3890                                    " node %s" % node)
3891       if self.op.amount > info['vg_free']:
3892         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3893                                    " %d MiB available, %d MiB required" %
3894                                    (node, info['vg_free'], self.op.amount))
3895
3896   def Exec(self, feedback_fn):
3897     """Execute disk grow.
3898
3899     """
3900     instance = self.instance
3901     disk = instance.FindDisk(self.op.disk)
3902     for node in (instance.secondary_nodes + (instance.primary_node,)):
3903       self.cfg.SetDiskID(disk, node)
3904       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3905       if not result or not isinstance(result, tuple) or len(result) != 2:
3906         raise errors.OpExecError("grow request failed to node %s" % node)
3907       elif not result[0]:
3908         raise errors.OpExecError("grow request failed to node %s: %s" %
3909                                  (node, result[1]))
3910     disk.RecordGrow(self.op.amount)
3911     self.cfg.Update(instance)
3912     return
3913
3914
3915 class LUQueryInstanceData(NoHooksLU):
3916   """Query runtime instance data.
3917
3918   """
3919   _OP_REQP = ["instances"]
3920
3921   def CheckPrereq(self):
3922     """Check prerequisites.
3923
3924     This only checks the optional instance list against the existing names.
3925
3926     """
3927     if not isinstance(self.op.instances, list):
3928       raise errors.OpPrereqError("Invalid argument type 'instances'")
3929     if self.op.instances:
3930       self.wanted_instances = []
3931       names = self.op.instances
3932       for name in names:
3933         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3934         if instance is None:
3935           raise errors.OpPrereqError("No such instance name '%s'" % name)
3936         self.wanted_instances.append(instance)
3937     else:
3938       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3939                                in self.cfg.GetInstanceList()]
3940     return
3941
3942
3943   def _ComputeDiskStatus(self, instance, snode, dev):
3944     """Compute block device status.
3945
3946     """
3947     self.cfg.SetDiskID(dev, instance.primary_node)
3948     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3949     if dev.dev_type in constants.LDS_DRBD:
3950       # we change the snode then (otherwise we use the one passed in)
3951       if dev.logical_id[0] == instance.primary_node:
3952         snode = dev.logical_id[1]
3953       else:
3954         snode = dev.logical_id[0]
3955
3956     if snode:
3957       self.cfg.SetDiskID(dev, snode)
3958       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3959     else:
3960       dev_sstatus = None
3961
3962     if dev.children:
3963       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3964                       for child in dev.children]
3965     else:
3966       dev_children = []
3967
3968     data = {
3969       "iv_name": dev.iv_name,
3970       "dev_type": dev.dev_type,
3971       "logical_id": dev.logical_id,
3972       "physical_id": dev.physical_id,
3973       "pstatus": dev_pstatus,
3974       "sstatus": dev_sstatus,
3975       "children": dev_children,
3976       }
3977
3978     return data
3979
3980   def Exec(self, feedback_fn):
3981     """Gather and return data"""
3982     result = {}
3983     for instance in self.wanted_instances:
3984       remote_info = rpc.call_instance_info(instance.primary_node,
3985                                                 instance.name)
3986       if remote_info and "state" in remote_info:
3987         remote_state = "up"
3988       else:
3989         remote_state = "down"
3990       if instance.status == "down":
3991         config_state = "down"
3992       else:
3993         config_state = "up"
3994
3995       disks = [self._ComputeDiskStatus(instance, None, device)
3996                for device in instance.disks]
3997
3998       idict = {
3999         "name": instance.name,
4000         "config_state": config_state,
4001         "run_state": remote_state,
4002         "pnode": instance.primary_node,
4003         "snodes": instance.secondary_nodes,
4004         "os": instance.os,
4005         "memory": instance.memory,
4006         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4007         "disks": disks,
4008         "vcpus": instance.vcpus,
4009         }
4010
4011       htkind = self.sstore.GetHypervisorType()
4012       if htkind == constants.HT_XEN_PVM30:
4013         idict["kernel_path"] = instance.kernel_path
4014         idict["initrd_path"] = instance.initrd_path
4015
4016       if htkind == constants.HT_XEN_HVM31:
4017         idict["hvm_boot_order"] = instance.hvm_boot_order
4018         idict["hvm_acpi"] = instance.hvm_acpi
4019         idict["hvm_pae"] = instance.hvm_pae
4020         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4021
4022       if htkind in constants.HTS_REQ_PORT:
4023         idict["vnc_bind_address"] = instance.vnc_bind_address
4024         idict["network_port"] = instance.network_port
4025
4026       result[instance.name] = idict
4027
4028     return result
4029
4030
4031 class LUSetInstanceParams(LogicalUnit):
4032   """Modifies an instances's parameters.
4033
4034   """
4035   HPATH = "instance-modify"
4036   HTYPE = constants.HTYPE_INSTANCE
4037   _OP_REQP = ["instance_name"]
4038
4039   def BuildHooksEnv(self):
4040     """Build hooks env.
4041
4042     This runs on the master, primary and secondaries.
4043
4044     """
4045     args = dict()
4046     if self.mem:
4047       args['memory'] = self.mem
4048     if self.vcpus:
4049       args['vcpus'] = self.vcpus
4050     if self.do_ip or self.do_bridge or self.mac:
4051       if self.do_ip:
4052         ip = self.ip
4053       else:
4054         ip = self.instance.nics[0].ip
4055       if self.bridge:
4056         bridge = self.bridge
4057       else:
4058         bridge = self.instance.nics[0].bridge
4059       if self.mac:
4060         mac = self.mac
4061       else:
4062         mac = self.instance.nics[0].mac
4063       args['nics'] = [(ip, bridge, mac)]
4064     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4065     nl = [self.sstore.GetMasterNode(),
4066           self.instance.primary_node] + list(self.instance.secondary_nodes)
4067     return env, nl, nl
4068
4069   def CheckPrereq(self):
4070     """Check prerequisites.
4071
4072     This only checks the instance list against the existing names.
4073
4074     """
4075     self.mem = getattr(self.op, "mem", None)
4076     self.vcpus = getattr(self.op, "vcpus", None)
4077     self.ip = getattr(self.op, "ip", None)
4078     self.mac = getattr(self.op, "mac", None)
4079     self.bridge = getattr(self.op, "bridge", None)
4080     self.kernel_path = getattr(self.op, "kernel_path", None)
4081     self.initrd_path = getattr(self.op, "initrd_path", None)
4082     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4083     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4084     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4085     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4086     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4087     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4088                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4089                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4090                  self.vnc_bind_address]
4091     if all_parms.count(None) == len(all_parms):
4092       raise errors.OpPrereqError("No changes submitted")
4093     if self.mem is not None:
4094       try:
4095         self.mem = int(self.mem)
4096       except ValueError, err:
4097         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4098     if self.vcpus is not None:
4099       try:
4100         self.vcpus = int(self.vcpus)
4101       except ValueError, err:
4102         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4103     if self.ip is not None:
4104       self.do_ip = True
4105       if self.ip.lower() == "none":
4106         self.ip = None
4107       else:
4108         if not utils.IsValidIP(self.ip):
4109           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4110     else:
4111       self.do_ip = False
4112     self.do_bridge = (self.bridge is not None)
4113     if self.mac is not None:
4114       if self.cfg.IsMacInUse(self.mac):
4115         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4116                                    self.mac)
4117       if not utils.IsValidMac(self.mac):
4118         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4119
4120     if self.kernel_path is not None:
4121       self.do_kernel_path = True
4122       if self.kernel_path == constants.VALUE_NONE:
4123         raise errors.OpPrereqError("Can't set instance to no kernel")
4124
4125       if self.kernel_path != constants.VALUE_DEFAULT:
4126         if not os.path.isabs(self.kernel_path):
4127           raise errors.OpPrereqError("The kernel path must be an absolute"
4128                                     " filename")
4129     else:
4130       self.do_kernel_path = False
4131
4132     if self.initrd_path is not None:
4133       self.do_initrd_path = True
4134       if self.initrd_path not in (constants.VALUE_NONE,
4135                                   constants.VALUE_DEFAULT):
4136         if not os.path.isabs(self.initrd_path):
4137           raise errors.OpPrereqError("The initrd path must be an absolute"
4138                                     " filename")
4139     else:
4140       self.do_initrd_path = False
4141
4142     # boot order verification
4143     if self.hvm_boot_order is not None:
4144       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4145         if len(self.hvm_boot_order.strip("acdn")) != 0:
4146           raise errors.OpPrereqError("invalid boot order specified,"
4147                                      " must be one or more of [acdn]"
4148                                      " or 'default'")
4149
4150     # hvm_cdrom_image_path verification
4151     if self.op.hvm_cdrom_image_path is not None:
4152       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4153         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4154                                    " be an absolute path or None, not %s" %
4155                                    self.op.hvm_cdrom_image_path)
4156       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4157         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4158                                    " regular file or a symlink pointing to"
4159                                    " an existing regular file, not %s" %
4160                                    self.op.hvm_cdrom_image_path)
4161
4162     # vnc_bind_address verification
4163     if self.op.vnc_bind_address is not None:
4164       if not utils.IsValidIP(self.op.vnc_bind_address):
4165         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4166                                    " like a valid IP address" %
4167                                    self.op.vnc_bind_address)
4168
4169     instance = self.cfg.GetInstanceInfo(
4170       self.cfg.ExpandInstanceName(self.op.instance_name))
4171     if instance is None:
4172       raise errors.OpPrereqError("No such instance name '%s'" %
4173                                  self.op.instance_name)
4174     self.op.instance_name = instance.name
4175     self.instance = instance
4176     return
4177
4178   def Exec(self, feedback_fn):
4179     """Modifies an instance.
4180
4181     All parameters take effect only at the next restart of the instance.
4182     """
4183     result = []
4184     instance = self.instance
4185     if self.mem:
4186       instance.memory = self.mem
4187       result.append(("mem", self.mem))
4188     if self.vcpus:
4189       instance.vcpus = self.vcpus
4190       result.append(("vcpus",  self.vcpus))
4191     if self.do_ip:
4192       instance.nics[0].ip = self.ip
4193       result.append(("ip", self.ip))
4194     if self.bridge:
4195       instance.nics[0].bridge = self.bridge
4196       result.append(("bridge", self.bridge))
4197     if self.mac:
4198       instance.nics[0].mac = self.mac
4199       result.append(("mac", self.mac))
4200     if self.do_kernel_path:
4201       instance.kernel_path = self.kernel_path
4202       result.append(("kernel_path", self.kernel_path))
4203     if self.do_initrd_path:
4204       instance.initrd_path = self.initrd_path
4205       result.append(("initrd_path", self.initrd_path))
4206     if self.hvm_boot_order:
4207       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4208         instance.hvm_boot_order = None
4209       else:
4210         instance.hvm_boot_order = self.hvm_boot_order
4211       result.append(("hvm_boot_order", self.hvm_boot_order))
4212     if self.hvm_acpi:
4213       instance.hvm_acpi = self.hvm_acpi
4214       result.append(("hvm_acpi", self.hvm_acpi))
4215     if self.hvm_pae:
4216       instance.hvm_pae = self.hvm_pae
4217       result.append(("hvm_pae", self.hvm_pae))
4218     if self.hvm_cdrom_image_path:
4219       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4220       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4221     if self.vnc_bind_address:
4222       instance.vnc_bind_address = self.vnc_bind_address
4223       result.append(("vnc_bind_address", self.vnc_bind_address))
4224
4225     self.cfg.AddInstance(instance)
4226
4227     return result
4228
4229
4230 class LUQueryExports(NoHooksLU):
4231   """Query the exports list
4232
4233   """
4234   _OP_REQP = []
4235
4236   def CheckPrereq(self):
4237     """Check that the nodelist contains only existing nodes.
4238
4239     """
4240     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4241
4242   def Exec(self, feedback_fn):
4243     """Compute the list of all the exported system images.
4244
4245     Returns:
4246       a dictionary with the structure node->(export-list)
4247       where export-list is a list of the instances exported on
4248       that node.
4249
4250     """
4251     return rpc.call_export_list(self.nodes)
4252
4253
4254 class LUExportInstance(LogicalUnit):
4255   """Export an instance to an image in the cluster.
4256
4257   """
4258   HPATH = "instance-export"
4259   HTYPE = constants.HTYPE_INSTANCE
4260   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4261
4262   def BuildHooksEnv(self):
4263     """Build hooks env.
4264
4265     This will run on the master, primary node and target node.
4266
4267     """
4268     env = {
4269       "EXPORT_NODE": self.op.target_node,
4270       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4271       }
4272     env.update(_BuildInstanceHookEnvByObject(self.instance))
4273     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4274           self.op.target_node]
4275     return env, nl, nl
4276
4277   def CheckPrereq(self):
4278     """Check prerequisites.
4279
4280     This checks that the instance and node names are valid.
4281
4282     """
4283     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4284     self.instance = self.cfg.GetInstanceInfo(instance_name)
4285     if self.instance is None:
4286       raise errors.OpPrereqError("Instance '%s' not found" %
4287                                  self.op.instance_name)
4288
4289     # node verification
4290     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4291     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4292
4293     if self.dst_node is None:
4294       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4295                                  self.op.target_node)
4296     self.op.target_node = self.dst_node.name
4297
4298     # instance disk type verification
4299     for disk in self.instance.disks:
4300       if disk.dev_type == constants.LD_FILE:
4301         raise errors.OpPrereqError("Export not supported for instances with"
4302                                    " file-based disks")
4303
4304   def Exec(self, feedback_fn):
4305     """Export an instance to an image in the cluster.
4306
4307     """
4308     instance = self.instance
4309     dst_node = self.dst_node
4310     src_node = instance.primary_node
4311     if self.op.shutdown:
4312       # shutdown the instance, but not the disks
4313       if not rpc.call_instance_shutdown(src_node, instance):
4314          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4315                                   (instance.name, src_node))
4316
4317     vgname = self.cfg.GetVGName()
4318
4319     snap_disks = []
4320
4321     try:
4322       for disk in instance.disks:
4323         if disk.iv_name == "sda":
4324           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4325           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4326
4327           if not new_dev_name:
4328             logger.Error("could not snapshot block device %s on node %s" %
4329                          (disk.logical_id[1], src_node))
4330           else:
4331             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4332                                       logical_id=(vgname, new_dev_name),
4333                                       physical_id=(vgname, new_dev_name),
4334                                       iv_name=disk.iv_name)
4335             snap_disks.append(new_dev)
4336
4337     finally:
4338       if self.op.shutdown and instance.status == "up":
4339         if not rpc.call_instance_start(src_node, instance, None):
4340           _ShutdownInstanceDisks(instance, self.cfg)
4341           raise errors.OpExecError("Could not start instance")
4342
4343     # TODO: check for size
4344
4345     for dev in snap_disks:
4346       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4347         logger.Error("could not export block device %s from node %s to node %s"
4348                      % (dev.logical_id[1], src_node, dst_node.name))
4349       if not rpc.call_blockdev_remove(src_node, dev):
4350         logger.Error("could not remove snapshot block device %s from node %s" %
4351                      (dev.logical_id[1], src_node))
4352
4353     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4354       logger.Error("could not finalize export for instance %s on node %s" %
4355                    (instance.name, dst_node.name))
4356
4357     nodelist = self.cfg.GetNodeList()
4358     nodelist.remove(dst_node.name)
4359
4360     # on one-node clusters nodelist will be empty after the removal
4361     # if we proceed the backup would be removed because OpQueryExports
4362     # substitutes an empty list with the full cluster node list.
4363     if nodelist:
4364       op = opcodes.OpQueryExports(nodes=nodelist)
4365       exportlist = self.proc.ChainOpCode(op)
4366       for node in exportlist:
4367         if instance.name in exportlist[node]:
4368           if not rpc.call_export_remove(node, instance.name):
4369             logger.Error("could not remove older export for instance %s"
4370                          " on node %s" % (instance.name, node))
4371
4372
4373 class LURemoveExport(NoHooksLU):
4374   """Remove exports related to the named instance.
4375
4376   """
4377   _OP_REQP = ["instance_name"]
4378
4379   def CheckPrereq(self):
4380     """Check prerequisites.
4381     """
4382     pass
4383
4384   def Exec(self, feedback_fn):
4385     """Remove any export.
4386
4387     """
4388     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4389     # If the instance was not found we'll try with the name that was passed in.
4390     # This will only work if it was an FQDN, though.
4391     fqdn_warn = False
4392     if not instance_name:
4393       fqdn_warn = True
4394       instance_name = self.op.instance_name
4395
4396     op = opcodes.OpQueryExports(nodes=[])
4397     exportlist = self.proc.ChainOpCode(op)
4398     found = False
4399     for node in exportlist:
4400       if instance_name in exportlist[node]:
4401         found = True
4402         if not rpc.call_export_remove(node, instance_name):
4403           logger.Error("could not remove export for instance %s"
4404                        " on node %s" % (instance_name, node))
4405
4406     if fqdn_warn and not found:
4407       feedback_fn("Export not found. If trying to remove an export belonging"
4408                   " to a deleted instance please use its Fully Qualified"
4409                   " Domain Name.")
4410
4411
4412 class TagsLU(NoHooksLU):
4413   """Generic tags LU.
4414
4415   This is an abstract class which is the parent of all the other tags LUs.
4416
4417   """
4418   def CheckPrereq(self):
4419     """Check prerequisites.
4420
4421     """
4422     if self.op.kind == constants.TAG_CLUSTER:
4423       self.target = self.cfg.GetClusterInfo()
4424     elif self.op.kind == constants.TAG_NODE:
4425       name = self.cfg.ExpandNodeName(self.op.name)
4426       if name is None:
4427         raise errors.OpPrereqError("Invalid node name (%s)" %
4428                                    (self.op.name,))
4429       self.op.name = name
4430       self.target = self.cfg.GetNodeInfo(name)
4431     elif self.op.kind == constants.TAG_INSTANCE:
4432       name = self.cfg.ExpandInstanceName(self.op.name)
4433       if name is None:
4434         raise errors.OpPrereqError("Invalid instance name (%s)" %
4435                                    (self.op.name,))
4436       self.op.name = name
4437       self.target = self.cfg.GetInstanceInfo(name)
4438     else:
4439       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4440                                  str(self.op.kind))
4441
4442
4443 class LUGetTags(TagsLU):
4444   """Returns the tags of a given object.
4445
4446   """
4447   _OP_REQP = ["kind", "name"]
4448
4449   def Exec(self, feedback_fn):
4450     """Returns the tag list.
4451
4452     """
4453     return self.target.GetTags()
4454
4455
4456 class LUSearchTags(NoHooksLU):
4457   """Searches the tags for a given pattern.
4458
4459   """
4460   _OP_REQP = ["pattern"]
4461
4462   def CheckPrereq(self):
4463     """Check prerequisites.
4464
4465     This checks the pattern passed for validity by compiling it.
4466
4467     """
4468     try:
4469       self.re = re.compile(self.op.pattern)
4470     except re.error, err:
4471       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4472                                  (self.op.pattern, err))
4473
4474   def Exec(self, feedback_fn):
4475     """Returns the tag list.
4476
4477     """
4478     cfg = self.cfg
4479     tgts = [("/cluster", cfg.GetClusterInfo())]
4480     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4481     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4482     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4483     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4484     results = []
4485     for path, target in tgts:
4486       for tag in target.GetTags():
4487         if self.re.search(tag):
4488           results.append((path, tag))
4489     return results
4490
4491
4492 class LUAddTags(TagsLU):
4493   """Sets a tag on a given object.
4494
4495   """
4496   _OP_REQP = ["kind", "name", "tags"]
4497
4498   def CheckPrereq(self):
4499     """Check prerequisites.
4500
4501     This checks the type and length of the tag name and value.
4502
4503     """
4504     TagsLU.CheckPrereq(self)
4505     for tag in self.op.tags:
4506       objects.TaggableObject.ValidateTag(tag)
4507
4508   def Exec(self, feedback_fn):
4509     """Sets the tag.
4510
4511     """
4512     try:
4513       for tag in self.op.tags:
4514         self.target.AddTag(tag)
4515     except errors.TagError, err:
4516       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4517     try:
4518       self.cfg.Update(self.target)
4519     except errors.ConfigurationError:
4520       raise errors.OpRetryError("There has been a modification to the"
4521                                 " config file and the operation has been"
4522                                 " aborted. Please retry.")
4523
4524
4525 class LUDelTags(TagsLU):
4526   """Delete a list of tags from a given object.
4527
4528   """
4529   _OP_REQP = ["kind", "name", "tags"]
4530
4531   def CheckPrereq(self):
4532     """Check prerequisites.
4533
4534     This checks that we have the given tag.
4535
4536     """
4537     TagsLU.CheckPrereq(self)
4538     for tag in self.op.tags:
4539       objects.TaggableObject.ValidateTag(tag)
4540     del_tags = frozenset(self.op.tags)
4541     cur_tags = self.target.GetTags()
4542     if not del_tags <= cur_tags:
4543       diff_tags = del_tags - cur_tags
4544       diff_names = ["'%s'" % tag for tag in diff_tags]
4545       diff_names.sort()
4546       raise errors.OpPrereqError("Tag(s) %s not found" %
4547                                  (",".join(diff_names)))
4548
4549   def Exec(self, feedback_fn):
4550     """Remove the tag from the object.
4551
4552     """
4553     for tag in self.op.tags:
4554       self.target.RemoveTag(tag)
4555     try:
4556       self.cfg.Update(self.target)
4557     except errors.ConfigurationError:
4558       raise errors.OpRetryError("There has been a modification to the"
4559                                 " config file and the operation has been"
4560                                 " aborted. Please retry.")
4561
4562 class LUTestDelay(NoHooksLU):
4563   """Sleep for a specified amount of time.
4564
4565   This LU sleeps on the master and/or nodes for a specified amount of
4566   time.
4567
4568   """
4569   _OP_REQP = ["duration", "on_master", "on_nodes"]
4570
4571   def CheckPrereq(self):
4572     """Check prerequisites.
4573
4574     This checks that we have a good list of nodes and/or the duration
4575     is valid.
4576
4577     """
4578
4579     if self.op.on_nodes:
4580       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4581
4582   def Exec(self, feedback_fn):
4583     """Do the actual sleep.
4584
4585     """
4586     if self.op.on_master:
4587       if not utils.TestDelay(self.op.duration):
4588         raise errors.OpExecError("Error during master delay test")
4589     if self.op.on_nodes:
4590       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4591       if not result:
4592         raise errors.OpExecError("Complete failure from rpc call")
4593       for node, node_result in result.items():
4594         if not node_result:
4595           raise errors.OpExecError("Failure during rpc call to node %s,"
4596                                    " result: %s" % (node, node_result))
4597
4598
4599 class IAllocator(object):
4600   """IAllocator framework.
4601
4602   An IAllocator instance has three sets of attributes:
4603     - cfg/sstore that are needed to query the cluster
4604     - input data (all members of the _KEYS class attribute are required)
4605     - four buffer attributes (in|out_data|text), that represent the
4606       input (to the external script) in text and data structure format,
4607       and the output from it, again in two formats
4608     - the result variables from the script (success, info, nodes) for
4609       easy usage
4610
4611   """
4612   _ALLO_KEYS = [
4613     "mem_size", "disks", "disk_template",
4614     "os", "tags", "nics", "vcpus",
4615     ]
4616   _RELO_KEYS = [
4617     "relocate_from",
4618     ]
4619
4620   def __init__(self, cfg, sstore, mode, name, **kwargs):
4621     self.cfg = cfg
4622     self.sstore = sstore
4623     # init buffer variables
4624     self.in_text = self.out_text = self.in_data = self.out_data = None
4625     # init all input fields so that pylint is happy
4626     self.mode = mode
4627     self.name = name
4628     self.mem_size = self.disks = self.disk_template = None
4629     self.os = self.tags = self.nics = self.vcpus = None
4630     self.relocate_from = None
4631     # computed fields
4632     self.required_nodes = None
4633     # init result fields
4634     self.success = self.info = self.nodes = None
4635     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4636       keyset = self._ALLO_KEYS
4637     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4638       keyset = self._RELO_KEYS
4639     else:
4640       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4641                                    " IAllocator" % self.mode)
4642     for key in kwargs:
4643       if key not in keyset:
4644         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4645                                      " IAllocator" % key)
4646       setattr(self, key, kwargs[key])
4647     for key in keyset:
4648       if key not in kwargs:
4649         raise errors.ProgrammerError("Missing input parameter '%s' to"
4650                                      " IAllocator" % key)
4651     self._BuildInputData()
4652
4653   def _ComputeClusterData(self):
4654     """Compute the generic allocator input data.
4655
4656     This is the data that is independent of the actual operation.
4657
4658     """
4659     cfg = self.cfg
4660     # cluster data
4661     data = {
4662       "version": 1,
4663       "cluster_name": self.sstore.GetClusterName(),
4664       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4665       "hypervisor_type": self.sstore.GetHypervisorType(),
4666       # we don't have job IDs
4667       }
4668
4669     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4670
4671     # node data
4672     node_results = {}
4673     node_list = cfg.GetNodeList()
4674     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4675     for nname in node_list:
4676       ninfo = cfg.GetNodeInfo(nname)
4677       if nname not in node_data or not isinstance(node_data[nname], dict):
4678         raise errors.OpExecError("Can't get data for node %s" % nname)
4679       remote_info = node_data[nname]
4680       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4681                    'vg_size', 'vg_free', 'cpu_total']:
4682         if attr not in remote_info:
4683           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4684                                    (nname, attr))
4685         try:
4686           remote_info[attr] = int(remote_info[attr])
4687         except ValueError, err:
4688           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4689                                    " %s" % (nname, attr, str(err)))
4690       # compute memory used by primary instances
4691       i_p_mem = i_p_up_mem = 0
4692       for iinfo in i_list:
4693         if iinfo.primary_node == nname:
4694           i_p_mem += iinfo.memory
4695           if iinfo.status == "up":
4696             i_p_up_mem += iinfo.memory
4697
4698       # compute memory used by instances
4699       pnr = {
4700         "tags": list(ninfo.GetTags()),
4701         "total_memory": remote_info['memory_total'],
4702         "reserved_memory": remote_info['memory_dom0'],
4703         "free_memory": remote_info['memory_free'],
4704         "i_pri_memory": i_p_mem,
4705         "i_pri_up_memory": i_p_up_mem,
4706         "total_disk": remote_info['vg_size'],
4707         "free_disk": remote_info['vg_free'],
4708         "primary_ip": ninfo.primary_ip,
4709         "secondary_ip": ninfo.secondary_ip,
4710         "total_cpus": remote_info['cpu_total'],
4711         }
4712       node_results[nname] = pnr
4713     data["nodes"] = node_results
4714
4715     # instance data
4716     instance_data = {}
4717     for iinfo in i_list:
4718       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4719                   for n in iinfo.nics]
4720       pir = {
4721         "tags": list(iinfo.GetTags()),
4722         "should_run": iinfo.status == "up",
4723         "vcpus": iinfo.vcpus,
4724         "memory": iinfo.memory,
4725         "os": iinfo.os,
4726         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4727         "nics": nic_data,
4728         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4729         "disk_template": iinfo.disk_template,
4730         }
4731       instance_data[iinfo.name] = pir
4732
4733     data["instances"] = instance_data
4734
4735     self.in_data = data
4736
4737   def _AddNewInstance(self):
4738     """Add new instance data to allocator structure.
4739
4740     This in combination with _AllocatorGetClusterData will create the
4741     correct structure needed as input for the allocator.
4742
4743     The checks for the completeness of the opcode must have already been
4744     done.
4745
4746     """
4747     data = self.in_data
4748     if len(self.disks) != 2:
4749       raise errors.OpExecError("Only two-disk configurations supported")
4750
4751     disk_space = _ComputeDiskSize(self.disk_template,
4752                                   self.disks[0]["size"], self.disks[1]["size"])
4753
4754     if self.disk_template in constants.DTS_NET_MIRROR:
4755       self.required_nodes = 2
4756     else:
4757       self.required_nodes = 1
4758     request = {
4759       "type": "allocate",
4760       "name": self.name,
4761       "disk_template": self.disk_template,
4762       "tags": self.tags,
4763       "os": self.os,
4764       "vcpus": self.vcpus,
4765       "memory": self.mem_size,
4766       "disks": self.disks,
4767       "disk_space_total": disk_space,
4768       "nics": self.nics,
4769       "required_nodes": self.required_nodes,
4770       }
4771     data["request"] = request
4772
4773   def _AddRelocateInstance(self):
4774     """Add relocate instance data to allocator structure.
4775
4776     This in combination with _IAllocatorGetClusterData will create the
4777     correct structure needed as input for the allocator.
4778
4779     The checks for the completeness of the opcode must have already been
4780     done.
4781
4782     """
4783     instance = self.cfg.GetInstanceInfo(self.name)
4784     if instance is None:
4785       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4786                                    " IAllocator" % self.name)
4787
4788     if instance.disk_template not in constants.DTS_NET_MIRROR:
4789       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4790
4791     if len(instance.secondary_nodes) != 1:
4792       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4793
4794     self.required_nodes = 1
4795
4796     disk_space = _ComputeDiskSize(instance.disk_template,
4797                                   instance.disks[0].size,
4798                                   instance.disks[1].size)
4799
4800     request = {
4801       "type": "relocate",
4802       "name": self.name,
4803       "disk_space_total": disk_space,
4804       "required_nodes": self.required_nodes,
4805       "relocate_from": self.relocate_from,
4806       }
4807     self.in_data["request"] = request
4808
4809   def _BuildInputData(self):
4810     """Build input data structures.
4811
4812     """
4813     self._ComputeClusterData()
4814
4815     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4816       self._AddNewInstance()
4817     else:
4818       self._AddRelocateInstance()
4819
4820     self.in_text = serializer.Dump(self.in_data)
4821
4822   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4823     """Run an instance allocator and return the results.
4824
4825     """
4826     data = self.in_text
4827
4828     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4829
4830     if not isinstance(result, tuple) or len(result) != 4:
4831       raise errors.OpExecError("Invalid result from master iallocator runner")
4832
4833     rcode, stdout, stderr, fail = result
4834
4835     if rcode == constants.IARUN_NOTFOUND:
4836       raise errors.OpExecError("Can't find allocator '%s'" % name)
4837     elif rcode == constants.IARUN_FAILURE:
4838         raise errors.OpExecError("Instance allocator call failed: %s,"
4839                                  " output: %s" %
4840                                  (fail, stdout+stderr))
4841     self.out_text = stdout
4842     if validate:
4843       self._ValidateResult()
4844
4845   def _ValidateResult(self):
4846     """Process the allocator results.
4847
4848     This will process and if successful save the result in
4849     self.out_data and the other parameters.
4850
4851     """
4852     try:
4853       rdict = serializer.Load(self.out_text)
4854     except Exception, err:
4855       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4856
4857     if not isinstance(rdict, dict):
4858       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4859
4860     for key in "success", "info", "nodes":
4861       if key not in rdict:
4862         raise errors.OpExecError("Can't parse iallocator results:"
4863                                  " missing key '%s'" % key)
4864       setattr(self, key, rdict[key])
4865
4866     if not isinstance(rdict["nodes"], list):
4867       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4868                                " is not a list")
4869     self.out_data = rdict
4870
4871
4872 class LUTestAllocator(NoHooksLU):
4873   """Run allocator tests.
4874
4875   This LU runs the allocator tests
4876
4877   """
4878   _OP_REQP = ["direction", "mode", "name"]
4879
4880   def CheckPrereq(self):
4881     """Check prerequisites.
4882
4883     This checks the opcode parameters depending on the director and mode test.
4884
4885     """
4886     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4887       for attr in ["name", "mem_size", "disks", "disk_template",
4888                    "os", "tags", "nics", "vcpus"]:
4889         if not hasattr(self.op, attr):
4890           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4891                                      attr)
4892       iname = self.cfg.ExpandInstanceName(self.op.name)
4893       if iname is not None:
4894         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4895                                    iname)
4896       if not isinstance(self.op.nics, list):
4897         raise errors.OpPrereqError("Invalid parameter 'nics'")
4898       for row in self.op.nics:
4899         if (not isinstance(row, dict) or
4900             "mac" not in row or
4901             "ip" not in row or
4902             "bridge" not in row):
4903           raise errors.OpPrereqError("Invalid contents of the"
4904                                      " 'nics' parameter")
4905       if not isinstance(self.op.disks, list):
4906         raise errors.OpPrereqError("Invalid parameter 'disks'")
4907       if len(self.op.disks) != 2:
4908         raise errors.OpPrereqError("Only two-disk configurations supported")
4909       for row in self.op.disks:
4910         if (not isinstance(row, dict) or
4911             "size" not in row or
4912             not isinstance(row["size"], int) or
4913             "mode" not in row or
4914             row["mode"] not in ['r', 'w']):
4915           raise errors.OpPrereqError("Invalid contents of the"
4916                                      " 'disks' parameter")
4917     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4918       if not hasattr(self.op, "name"):
4919         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4920       fname = self.cfg.ExpandInstanceName(self.op.name)
4921       if fname is None:
4922         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4923                                    self.op.name)
4924       self.op.name = fname
4925       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4926     else:
4927       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4928                                  self.op.mode)
4929
4930     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4931       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4932         raise errors.OpPrereqError("Missing allocator name")
4933     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4934       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4935                                  self.op.direction)
4936
4937   def Exec(self, feedback_fn):
4938     """Run the allocator test.
4939
4940     """
4941     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4942       ial = IAllocator(self.cfg, self.sstore,
4943                        mode=self.op.mode,
4944                        name=self.op.name,
4945                        mem_size=self.op.mem_size,
4946                        disks=self.op.disks,
4947                        disk_template=self.op.disk_template,
4948                        os=self.op.os,
4949                        tags=self.op.tags,
4950                        nics=self.op.nics,
4951                        vcpus=self.op.vcpus,
4952                        )
4953     else:
4954       ial = IAllocator(self.cfg, self.sstore,
4955                        mode=self.op.mode,
4956                        name=self.op.name,
4957                        relocate_from=list(self.relocate_from),
4958                        )
4959
4960     if self.op.direction == constants.IALLOCATOR_DIR_IN:
4961       result = ial.in_text
4962     else:
4963       ial.Run(self.op.allocator, validate=False)
4964       result = ial.out_text
4965     return result