Change JobStorage to work with ids not filenames
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_MASTER: the LU needs to run on the master node
60         REQ_WSSTORE: the LU needs a writable SimpleStore
61         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62
63   Note that all commands require root permissions.
64
65   """
66   HPATH = None
67   HTYPE = None
68   _OP_REQP = []
69   REQ_MASTER = True
70   REQ_WSSTORE = False
71   REQ_BGL = True
72
73   def __init__(self, processor, op, context, sstore):
74     """Constructor for LogicalUnit.
75
76     This needs to be overriden in derived classes in order to check op
77     validity.
78
79     """
80     self.proc = processor
81     self.op = op
82     self.cfg = context.cfg
83     self.sstore = sstore
84     self.context = context
85     self.needed_locks = None
86     self.__ssh = None
87
88     for attr_name in self._OP_REQP:
89       attr_val = getattr(op, attr_name, None)
90       if attr_val is None:
91         raise errors.OpPrereqError("Required parameter '%s' missing" %
92                                    attr_name)
93
94     if not self.cfg.IsCluster():
95       raise errors.OpPrereqError("Cluster not initialized yet,"
96                                  " use 'gnt-cluster init' first.")
97     if self.REQ_MASTER:
98       master = sstore.GetMasterNode()
99       if master != utils.HostInfo().name:
100         raise errors.OpPrereqError("Commands must be run on the master"
101                                    " node %s" % master)
102
103   def __GetSSH(self):
104     """Returns the SshRunner object
105
106     """
107     if not self.__ssh:
108       self.__ssh = ssh.SshRunner(self.sstore)
109     return self.__ssh
110
111   ssh = property(fget=__GetSSH)
112
113   def ExpandNames(self):
114     """Expand names for this LU.
115
116     This method is called before starting to execute the opcode, and it should
117     update all the parameters of the opcode to their canonical form (e.g. a
118     short node name must be fully expanded after this method has successfully
119     completed). This way locking, hooks, logging, ecc. can work correctly.
120
121     LUs which implement this method must also populate the self.needed_locks
122     member, as a dict with lock levels as keys, and a list of needed lock names
123     as values. Rules:
124       - Use an empty dict if you don't need any lock
125       - If you don't need any lock at a particular level omit that level
126       - Don't put anything for the BGL level
127       - If you want all locks at a level use None as a value
128         (this reflects what LockSet does, and will be replaced before
129         CheckPrereq with the full list of nodes that have been locked)
130
131     Examples:
132     # Acquire all nodes and one instance
133     self.needed_locks = {
134       locking.LEVEL_NODE: None,
135       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
136     }
137     # Acquire just two nodes
138     self.needed_locks = {
139       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
140     }
141     # Acquire no locks
142     self.needed_locks = {} # No, you can't leave it to the default value None
143
144     """
145     # The implementation of this method is mandatory only if the new LU is
146     # concurrent, so that old LUs don't need to be changed all at the same
147     # time.
148     if self.REQ_BGL:
149       self.needed_locks = {} # Exclusive LUs don't need locks.
150     else:
151       raise NotImplementedError
152
153   def CheckPrereq(self):
154     """Check prerequisites for this LU.
155
156     This method should check that the prerequisites for the execution
157     of this LU are fulfilled. It can do internode communication, but
158     it should be idempotent - no cluster or system changes are
159     allowed.
160
161     The method should raise errors.OpPrereqError in case something is
162     not fulfilled. Its return value is ignored.
163
164     This method should also update all the parameters of the opcode to
165     their canonical form if it hasn't been done by ExpandNames before.
166
167     """
168     raise NotImplementedError
169
170   def Exec(self, feedback_fn):
171     """Execute the LU.
172
173     This method should implement the actual work. It should raise
174     errors.OpExecError for failures that are somewhat dealt with in
175     code, or expected.
176
177     """
178     raise NotImplementedError
179
180   def BuildHooksEnv(self):
181     """Build hooks environment for this LU.
182
183     This method should return a three-node tuple consisting of: a dict
184     containing the environment that will be used for running the
185     specific hook for this LU, a list of node names on which the hook
186     should run before the execution, and a list of node names on which
187     the hook should run after the execution.
188
189     The keys of the dict must not have 'GANETI_' prefixed as this will
190     be handled in the hooks runner. Also note additional keys will be
191     added by the hooks runner. If the LU doesn't define any
192     environment, an empty dict (and not None) should be returned.
193
194     No nodes should be returned as an empty list (and not None).
195
196     Note that if the HPATH for a LU class is None, this function will
197     not be called.
198
199     """
200     raise NotImplementedError
201
202   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
203     """Notify the LU about the results of its hooks.
204
205     This method is called every time a hooks phase is executed, and notifies
206     the Logical Unit about the hooks' result. The LU can then use it to alter
207     its result based on the hooks.  By default the method does nothing and the
208     previous result is passed back unchanged but any LU can define it if it
209     wants to use the local cluster hook-scripts somehow.
210
211     Args:
212       phase: the hooks phase that has just been run
213       hooks_results: the results of the multi-node hooks rpc call
214       feedback_fn: function to send feedback back to the caller
215       lu_result: the previous result this LU had, or None in the PRE phase.
216
217     """
218     return lu_result
219
220
221 class NoHooksLU(LogicalUnit):
222   """Simple LU which runs no hooks.
223
224   This LU is intended as a parent for other LogicalUnits which will
225   run no hooks, in order to reduce duplicate code.
226
227   """
228   HPATH = None
229   HTYPE = None
230
231
232 def _GetWantedNodes(lu, nodes):
233   """Returns list of checked and expanded node names.
234
235   Args:
236     nodes: List of nodes (strings) or None for all
237
238   """
239   if not isinstance(nodes, list):
240     raise errors.OpPrereqError("Invalid argument type 'nodes'")
241
242   if nodes:
243     wanted = []
244
245     for name in nodes:
246       node = lu.cfg.ExpandNodeName(name)
247       if node is None:
248         raise errors.OpPrereqError("No such node name '%s'" % name)
249       wanted.append(node)
250
251   else:
252     wanted = lu.cfg.GetNodeList()
253   return utils.NiceSort(wanted)
254
255
256 def _GetWantedInstances(lu, instances):
257   """Returns list of checked and expanded instance names.
258
259   Args:
260     instances: List of instances (strings) or None for all
261
262   """
263   if not isinstance(instances, list):
264     raise errors.OpPrereqError("Invalid argument type 'instances'")
265
266   if instances:
267     wanted = []
268
269     for name in instances:
270       instance = lu.cfg.ExpandInstanceName(name)
271       if instance is None:
272         raise errors.OpPrereqError("No such instance name '%s'" % name)
273       wanted.append(instance)
274
275   else:
276     wanted = lu.cfg.GetInstanceList()
277   return utils.NiceSort(wanted)
278
279
280 def _CheckOutputFields(static, dynamic, selected):
281   """Checks whether all selected fields are valid.
282
283   Args:
284     static: Static fields
285     dynamic: Dynamic fields
286
287   """
288   static_fields = frozenset(static)
289   dynamic_fields = frozenset(dynamic)
290
291   all_fields = static_fields | dynamic_fields
292
293   if not all_fields.issuperset(selected):
294     raise errors.OpPrereqError("Unknown output fields selected: %s"
295                                % ",".join(frozenset(selected).
296                                           difference(all_fields)))
297
298
299 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
300                           memory, vcpus, nics):
301   """Builds instance related env variables for hooks from single variables.
302
303   Args:
304     secondary_nodes: List of secondary nodes as strings
305   """
306   env = {
307     "OP_TARGET": name,
308     "INSTANCE_NAME": name,
309     "INSTANCE_PRIMARY": primary_node,
310     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
311     "INSTANCE_OS_TYPE": os_type,
312     "INSTANCE_STATUS": status,
313     "INSTANCE_MEMORY": memory,
314     "INSTANCE_VCPUS": vcpus,
315   }
316
317   if nics:
318     nic_count = len(nics)
319     for idx, (ip, bridge, mac) in enumerate(nics):
320       if ip is None:
321         ip = ""
322       env["INSTANCE_NIC%d_IP" % idx] = ip
323       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
324       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
325   else:
326     nic_count = 0
327
328   env["INSTANCE_NIC_COUNT"] = nic_count
329
330   return env
331
332
333 def _BuildInstanceHookEnvByObject(instance, override=None):
334   """Builds instance related env variables for hooks from an object.
335
336   Args:
337     instance: objects.Instance object of instance
338     override: dict of values to override
339   """
340   args = {
341     'name': instance.name,
342     'primary_node': instance.primary_node,
343     'secondary_nodes': instance.secondary_nodes,
344     'os_type': instance.os,
345     'status': instance.os,
346     'memory': instance.memory,
347     'vcpus': instance.vcpus,
348     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
349   }
350   if override:
351     args.update(override)
352   return _BuildInstanceHookEnv(**args)
353
354
355 def _CheckInstanceBridgesExist(instance):
356   """Check that the brigdes needed by an instance exist.
357
358   """
359   # check bridges existance
360   brlist = [nic.bridge for nic in instance.nics]
361   if not rpc.call_bridges_exist(instance.primary_node, brlist):
362     raise errors.OpPrereqError("one or more target bridges %s does not"
363                                " exist on destination node '%s'" %
364                                (brlist, instance.primary_node))
365
366
367 class LUDestroyCluster(NoHooksLU):
368   """Logical unit for destroying the cluster.
369
370   """
371   _OP_REQP = []
372
373   def CheckPrereq(self):
374     """Check prerequisites.
375
376     This checks whether the cluster is empty.
377
378     Any errors are signalled by raising errors.OpPrereqError.
379
380     """
381     master = self.sstore.GetMasterNode()
382
383     nodelist = self.cfg.GetNodeList()
384     if len(nodelist) != 1 or nodelist[0] != master:
385       raise errors.OpPrereqError("There are still %d node(s) in"
386                                  " this cluster." % (len(nodelist) - 1))
387     instancelist = self.cfg.GetInstanceList()
388     if instancelist:
389       raise errors.OpPrereqError("There are still %d instance(s) in"
390                                  " this cluster." % len(instancelist))
391
392   def Exec(self, feedback_fn):
393     """Destroys the cluster.
394
395     """
396     master = self.sstore.GetMasterNode()
397     if not rpc.call_node_stop_master(master):
398       raise errors.OpExecError("Could not disable the master role")
399     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
400     utils.CreateBackup(priv_key)
401     utils.CreateBackup(pub_key)
402     rpc.call_node_leave_cluster(master)
403
404
405 class LUVerifyCluster(LogicalUnit):
406   """Verifies the cluster status.
407
408   """
409   HPATH = "cluster-verify"
410   HTYPE = constants.HTYPE_CLUSTER
411   _OP_REQP = ["skip_checks"]
412
413   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
414                   remote_version, feedback_fn):
415     """Run multiple tests against a node.
416
417     Test list:
418       - compares ganeti version
419       - checks vg existance and size > 20G
420       - checks config file checksum
421       - checks ssh to other nodes
422
423     Args:
424       node: name of the node to check
425       file_list: required list of files
426       local_cksum: dictionary of local files and their checksums
427
428     """
429     # compares ganeti version
430     local_version = constants.PROTOCOL_VERSION
431     if not remote_version:
432       feedback_fn("  - ERROR: connection to %s failed" % (node))
433       return True
434
435     if local_version != remote_version:
436       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
437                       (local_version, node, remote_version))
438       return True
439
440     # checks vg existance and size > 20G
441
442     bad = False
443     if not vglist:
444       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
445                       (node,))
446       bad = True
447     else:
448       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
449                                             constants.MIN_VG_SIZE)
450       if vgstatus:
451         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
452         bad = True
453
454     # checks config file checksum
455     # checks ssh to any
456
457     if 'filelist' not in node_result:
458       bad = True
459       feedback_fn("  - ERROR: node hasn't returned file checksum data")
460     else:
461       remote_cksum = node_result['filelist']
462       for file_name in file_list:
463         if file_name not in remote_cksum:
464           bad = True
465           feedback_fn("  - ERROR: file '%s' missing" % file_name)
466         elif remote_cksum[file_name] != local_cksum[file_name]:
467           bad = True
468           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
469
470     if 'nodelist' not in node_result:
471       bad = True
472       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
473     else:
474       if node_result['nodelist']:
475         bad = True
476         for node in node_result['nodelist']:
477           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
478                           (node, node_result['nodelist'][node]))
479     if 'node-net-test' not in node_result:
480       bad = True
481       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
482     else:
483       if node_result['node-net-test']:
484         bad = True
485         nlist = utils.NiceSort(node_result['node-net-test'].keys())
486         for node in nlist:
487           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
488                           (node, node_result['node-net-test'][node]))
489
490     hyp_result = node_result.get('hypervisor', None)
491     if hyp_result is not None:
492       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
493     return bad
494
495   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
496                       node_instance, feedback_fn):
497     """Verify an instance.
498
499     This function checks to see if the required block devices are
500     available on the instance's node.
501
502     """
503     bad = False
504
505     node_current = instanceconfig.primary_node
506
507     node_vol_should = {}
508     instanceconfig.MapLVsByNode(node_vol_should)
509
510     for node in node_vol_should:
511       for volume in node_vol_should[node]:
512         if node not in node_vol_is or volume not in node_vol_is[node]:
513           feedback_fn("  - ERROR: volume %s missing on node %s" %
514                           (volume, node))
515           bad = True
516
517     if not instanceconfig.status == 'down':
518       if (node_current not in node_instance or
519           not instance in node_instance[node_current]):
520         feedback_fn("  - ERROR: instance %s not running on node %s" %
521                         (instance, node_current))
522         bad = True
523
524     for node in node_instance:
525       if (not node == node_current):
526         if instance in node_instance[node]:
527           feedback_fn("  - ERROR: instance %s should not run on node %s" %
528                           (instance, node))
529           bad = True
530
531     return bad
532
533   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
534     """Verify if there are any unknown volumes in the cluster.
535
536     The .os, .swap and backup volumes are ignored. All other volumes are
537     reported as unknown.
538
539     """
540     bad = False
541
542     for node in node_vol_is:
543       for volume in node_vol_is[node]:
544         if node not in node_vol_should or volume not in node_vol_should[node]:
545           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
546                       (volume, node))
547           bad = True
548     return bad
549
550   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
551     """Verify the list of running instances.
552
553     This checks what instances are running but unknown to the cluster.
554
555     """
556     bad = False
557     for node in node_instance:
558       for runninginstance in node_instance[node]:
559         if runninginstance not in instancelist:
560           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
561                           (runninginstance, node))
562           bad = True
563     return bad
564
565   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
566     """Verify N+1 Memory Resilience.
567
568     Check that if one single node dies we can still start all the instances it
569     was primary for.
570
571     """
572     bad = False
573
574     for node, nodeinfo in node_info.iteritems():
575       # This code checks that every node which is now listed as secondary has
576       # enough memory to host all instances it is supposed to should a single
577       # other node in the cluster fail.
578       # FIXME: not ready for failover to an arbitrary node
579       # FIXME: does not support file-backed instances
580       # WARNING: we currently take into account down instances as well as up
581       # ones, considering that even if they're down someone might want to start
582       # them even in the event of a node failure.
583       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
584         needed_mem = 0
585         for instance in instances:
586           needed_mem += instance_cfg[instance].memory
587         if nodeinfo['mfree'] < needed_mem:
588           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
589                       " failovers should node %s fail" % (node, prinode))
590           bad = True
591     return bad
592
593   def CheckPrereq(self):
594     """Check prerequisites.
595
596     Transform the list of checks we're going to skip into a set and check that
597     all its members are valid.
598
599     """
600     self.skip_set = frozenset(self.op.skip_checks)
601     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
602       raise errors.OpPrereqError("Invalid checks to be skipped specified")
603
604   def BuildHooksEnv(self):
605     """Build hooks env.
606
607     Cluster-Verify hooks just rone in the post phase and their failure makes
608     the output be logged in the verify output and the verification to fail.
609
610     """
611     all_nodes = self.cfg.GetNodeList()
612     # TODO: populate the environment with useful information for verify hooks
613     env = {}
614     return env, [], all_nodes
615
616   def Exec(self, feedback_fn):
617     """Verify integrity of cluster, performing various test on nodes.
618
619     """
620     bad = False
621     feedback_fn("* Verifying global settings")
622     for msg in self.cfg.VerifyConfig():
623       feedback_fn("  - ERROR: %s" % msg)
624
625     vg_name = self.cfg.GetVGName()
626     nodelist = utils.NiceSort(self.cfg.GetNodeList())
627     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
628     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
629     i_non_redundant = [] # Non redundant instances
630     node_volume = {}
631     node_instance = {}
632     node_info = {}
633     instance_cfg = {}
634
635     # FIXME: verify OS list
636     # do local checksums
637     file_names = list(self.sstore.GetFileList())
638     file_names.append(constants.SSL_CERT_FILE)
639     file_names.append(constants.CLUSTER_CONF_FILE)
640     local_checksums = utils.FingerprintFiles(file_names)
641
642     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
643     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
644     all_instanceinfo = rpc.call_instance_list(nodelist)
645     all_vglist = rpc.call_vg_list(nodelist)
646     node_verify_param = {
647       'filelist': file_names,
648       'nodelist': nodelist,
649       'hypervisor': None,
650       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
651                         for node in nodeinfo]
652       }
653     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
654     all_rversion = rpc.call_version(nodelist)
655     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
656
657     for node in nodelist:
658       feedback_fn("* Verifying node %s" % node)
659       result = self._VerifyNode(node, file_names, local_checksums,
660                                 all_vglist[node], all_nvinfo[node],
661                                 all_rversion[node], feedback_fn)
662       bad = bad or result
663
664       # node_volume
665       volumeinfo = all_volumeinfo[node]
666
667       if isinstance(volumeinfo, basestring):
668         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
669                     (node, volumeinfo[-400:].encode('string_escape')))
670         bad = True
671         node_volume[node] = {}
672       elif not isinstance(volumeinfo, dict):
673         feedback_fn("  - ERROR: connection to %s failed" % (node,))
674         bad = True
675         continue
676       else:
677         node_volume[node] = volumeinfo
678
679       # node_instance
680       nodeinstance = all_instanceinfo[node]
681       if type(nodeinstance) != list:
682         feedback_fn("  - ERROR: connection to %s failed" % (node,))
683         bad = True
684         continue
685
686       node_instance[node] = nodeinstance
687
688       # node_info
689       nodeinfo = all_ninfo[node]
690       if not isinstance(nodeinfo, dict):
691         feedback_fn("  - ERROR: connection to %s failed" % (node,))
692         bad = True
693         continue
694
695       try:
696         node_info[node] = {
697           "mfree": int(nodeinfo['memory_free']),
698           "dfree": int(nodeinfo['vg_free']),
699           "pinst": [],
700           "sinst": [],
701           # dictionary holding all instances this node is secondary for,
702           # grouped by their primary node. Each key is a cluster node, and each
703           # value is a list of instances which have the key as primary and the
704           # current node as secondary.  this is handy to calculate N+1 memory
705           # availability if you can only failover from a primary to its
706           # secondary.
707           "sinst-by-pnode": {},
708         }
709       except ValueError:
710         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
711         bad = True
712         continue
713
714     node_vol_should = {}
715
716     for instance in instancelist:
717       feedback_fn("* Verifying instance %s" % instance)
718       inst_config = self.cfg.GetInstanceInfo(instance)
719       result =  self._VerifyInstance(instance, inst_config, node_volume,
720                                      node_instance, feedback_fn)
721       bad = bad or result
722
723       inst_config.MapLVsByNode(node_vol_should)
724
725       instance_cfg[instance] = inst_config
726
727       pnode = inst_config.primary_node
728       if pnode in node_info:
729         node_info[pnode]['pinst'].append(instance)
730       else:
731         feedback_fn("  - ERROR: instance %s, connection to primary node"
732                     " %s failed" % (instance, pnode))
733         bad = True
734
735       # If the instance is non-redundant we cannot survive losing its primary
736       # node, so we are not N+1 compliant. On the other hand we have no disk
737       # templates with more than one secondary so that situation is not well
738       # supported either.
739       # FIXME: does not support file-backed instances
740       if len(inst_config.secondary_nodes) == 0:
741         i_non_redundant.append(instance)
742       elif len(inst_config.secondary_nodes) > 1:
743         feedback_fn("  - WARNING: multiple secondaries for instance %s"
744                     % instance)
745
746       for snode in inst_config.secondary_nodes:
747         if snode in node_info:
748           node_info[snode]['sinst'].append(instance)
749           if pnode not in node_info[snode]['sinst-by-pnode']:
750             node_info[snode]['sinst-by-pnode'][pnode] = []
751           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
752         else:
753           feedback_fn("  - ERROR: instance %s, connection to secondary node"
754                       " %s failed" % (instance, snode))
755
756     feedback_fn("* Verifying orphan volumes")
757     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
758                                        feedback_fn)
759     bad = bad or result
760
761     feedback_fn("* Verifying remaining instances")
762     result = self._VerifyOrphanInstances(instancelist, node_instance,
763                                          feedback_fn)
764     bad = bad or result
765
766     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
767       feedback_fn("* Verifying N+1 Memory redundancy")
768       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
769       bad = bad or result
770
771     feedback_fn("* Other Notes")
772     if i_non_redundant:
773       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
774                   % len(i_non_redundant))
775
776     return int(bad)
777
778   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
779     """Analize the post-hooks' result, handle it, and send some
780     nicely-formatted feedback back to the user.
781
782     Args:
783       phase: the hooks phase that has just been run
784       hooks_results: the results of the multi-node hooks rpc call
785       feedback_fn: function to send feedback back to the caller
786       lu_result: previous Exec result
787
788     """
789     # We only really run POST phase hooks, and are only interested in their results
790     if phase == constants.HOOKS_PHASE_POST:
791       # Used to change hooks' output to proper indentation
792       indent_re = re.compile('^', re.M)
793       feedback_fn("* Hooks Results")
794       if not hooks_results:
795         feedback_fn("  - ERROR: general communication failure")
796         lu_result = 1
797       else:
798         for node_name in hooks_results:
799           show_node_header = True
800           res = hooks_results[node_name]
801           if res is False or not isinstance(res, list):
802             feedback_fn("    Communication failure")
803             lu_result = 1
804             continue
805           for script, hkr, output in res:
806             if hkr == constants.HKR_FAIL:
807               # The node header is only shown once, if there are
808               # failing hooks on that node
809               if show_node_header:
810                 feedback_fn("  Node %s:" % node_name)
811                 show_node_header = False
812               feedback_fn("    ERROR: Script %s failed, output:" % script)
813               output = indent_re.sub('      ', output)
814               feedback_fn("%s" % output)
815               lu_result = 1
816
817       return lu_result
818
819
820 class LUVerifyDisks(NoHooksLU):
821   """Verifies the cluster disks status.
822
823   """
824   _OP_REQP = []
825
826   def CheckPrereq(self):
827     """Check prerequisites.
828
829     This has no prerequisites.
830
831     """
832     pass
833
834   def Exec(self, feedback_fn):
835     """Verify integrity of cluster disks.
836
837     """
838     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
839
840     vg_name = self.cfg.GetVGName()
841     nodes = utils.NiceSort(self.cfg.GetNodeList())
842     instances = [self.cfg.GetInstanceInfo(name)
843                  for name in self.cfg.GetInstanceList()]
844
845     nv_dict = {}
846     for inst in instances:
847       inst_lvs = {}
848       if (inst.status != "up" or
849           inst.disk_template not in constants.DTS_NET_MIRROR):
850         continue
851       inst.MapLVsByNode(inst_lvs)
852       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
853       for node, vol_list in inst_lvs.iteritems():
854         for vol in vol_list:
855           nv_dict[(node, vol)] = inst
856
857     if not nv_dict:
858       return result
859
860     node_lvs = rpc.call_volume_list(nodes, vg_name)
861
862     to_act = set()
863     for node in nodes:
864       # node_volume
865       lvs = node_lvs[node]
866
867       if isinstance(lvs, basestring):
868         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
869         res_nlvm[node] = lvs
870       elif not isinstance(lvs, dict):
871         logger.Info("connection to node %s failed or invalid data returned" %
872                     (node,))
873         res_nodes.append(node)
874         continue
875
876       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
877         inst = nv_dict.pop((node, lv_name), None)
878         if (not lv_online and inst is not None
879             and inst.name not in res_instances):
880           res_instances.append(inst.name)
881
882     # any leftover items in nv_dict are missing LVs, let's arrange the
883     # data better
884     for key, inst in nv_dict.iteritems():
885       if inst.name not in res_missing:
886         res_missing[inst.name] = []
887       res_missing[inst.name].append(key)
888
889     return result
890
891
892 class LURenameCluster(LogicalUnit):
893   """Rename the cluster.
894
895   """
896   HPATH = "cluster-rename"
897   HTYPE = constants.HTYPE_CLUSTER
898   _OP_REQP = ["name"]
899   REQ_WSSTORE = True
900
901   def BuildHooksEnv(self):
902     """Build hooks env.
903
904     """
905     env = {
906       "OP_TARGET": self.sstore.GetClusterName(),
907       "NEW_NAME": self.op.name,
908       }
909     mn = self.sstore.GetMasterNode()
910     return env, [mn], [mn]
911
912   def CheckPrereq(self):
913     """Verify that the passed name is a valid one.
914
915     """
916     hostname = utils.HostInfo(self.op.name)
917
918     new_name = hostname.name
919     self.ip = new_ip = hostname.ip
920     old_name = self.sstore.GetClusterName()
921     old_ip = self.sstore.GetMasterIP()
922     if new_name == old_name and new_ip == old_ip:
923       raise errors.OpPrereqError("Neither the name nor the IP address of the"
924                                  " cluster has changed")
925     if new_ip != old_ip:
926       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
927         raise errors.OpPrereqError("The given cluster IP address (%s) is"
928                                    " reachable on the network. Aborting." %
929                                    new_ip)
930
931     self.op.name = new_name
932
933   def Exec(self, feedback_fn):
934     """Rename the cluster.
935
936     """
937     clustername = self.op.name
938     ip = self.ip
939     ss = self.sstore
940
941     # shutdown the master IP
942     master = ss.GetMasterNode()
943     if not rpc.call_node_stop_master(master):
944       raise errors.OpExecError("Could not disable the master role")
945
946     try:
947       # modify the sstore
948       ss.SetKey(ss.SS_MASTER_IP, ip)
949       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
950
951       # Distribute updated ss config to all nodes
952       myself = self.cfg.GetNodeInfo(master)
953       dist_nodes = self.cfg.GetNodeList()
954       if myself.name in dist_nodes:
955         dist_nodes.remove(myself.name)
956
957       logger.Debug("Copying updated ssconf data to all nodes")
958       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
959         fname = ss.KeyToFilename(keyname)
960         result = rpc.call_upload_file(dist_nodes, fname)
961         for to_node in dist_nodes:
962           if not result[to_node]:
963             logger.Error("copy of file %s to node %s failed" %
964                          (fname, to_node))
965     finally:
966       if not rpc.call_node_start_master(master):
967         logger.Error("Could not re-enable the master role on the master,"
968                      " please restart manually.")
969
970
971 def _RecursiveCheckIfLVMBased(disk):
972   """Check if the given disk or its children are lvm-based.
973
974   Args:
975     disk: ganeti.objects.Disk object
976
977   Returns:
978     boolean indicating whether a LD_LV dev_type was found or not
979
980   """
981   if disk.children:
982     for chdisk in disk.children:
983       if _RecursiveCheckIfLVMBased(chdisk):
984         return True
985   return disk.dev_type == constants.LD_LV
986
987
988 class LUSetClusterParams(LogicalUnit):
989   """Change the parameters of the cluster.
990
991   """
992   HPATH = "cluster-modify"
993   HTYPE = constants.HTYPE_CLUSTER
994   _OP_REQP = []
995
996   def BuildHooksEnv(self):
997     """Build hooks env.
998
999     """
1000     env = {
1001       "OP_TARGET": self.sstore.GetClusterName(),
1002       "NEW_VG_NAME": self.op.vg_name,
1003       }
1004     mn = self.sstore.GetMasterNode()
1005     return env, [mn], [mn]
1006
1007   def CheckPrereq(self):
1008     """Check prerequisites.
1009
1010     This checks whether the given params don't conflict and
1011     if the given volume group is valid.
1012
1013     """
1014     if not self.op.vg_name:
1015       instances = [self.cfg.GetInstanceInfo(name)
1016                    for name in self.cfg.GetInstanceList()]
1017       for inst in instances:
1018         for disk in inst.disks:
1019           if _RecursiveCheckIfLVMBased(disk):
1020             raise errors.OpPrereqError("Cannot disable lvm storage while"
1021                                        " lvm-based instances exist")
1022
1023     # if vg_name not None, checks given volume group on all nodes
1024     if self.op.vg_name:
1025       node_list = self.cfg.GetNodeList()
1026       vglist = rpc.call_vg_list(node_list)
1027       for node in node_list:
1028         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1029                                               constants.MIN_VG_SIZE)
1030         if vgstatus:
1031           raise errors.OpPrereqError("Error on node '%s': %s" %
1032                                      (node, vgstatus))
1033
1034   def Exec(self, feedback_fn):
1035     """Change the parameters of the cluster.
1036
1037     """
1038     if self.op.vg_name != self.cfg.GetVGName():
1039       self.cfg.SetVGName(self.op.vg_name)
1040     else:
1041       feedback_fn("Cluster LVM configuration already in desired"
1042                   " state, not changing")
1043
1044
1045 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1046   """Sleep and poll for an instance's disk to sync.
1047
1048   """
1049   if not instance.disks:
1050     return True
1051
1052   if not oneshot:
1053     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1054
1055   node = instance.primary_node
1056
1057   for dev in instance.disks:
1058     cfgw.SetDiskID(dev, node)
1059
1060   retries = 0
1061   while True:
1062     max_time = 0
1063     done = True
1064     cumul_degraded = False
1065     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1066     if not rstats:
1067       proc.LogWarning("Can't get any data from node %s" % node)
1068       retries += 1
1069       if retries >= 10:
1070         raise errors.RemoteError("Can't contact node %s for mirror data,"
1071                                  " aborting." % node)
1072       time.sleep(6)
1073       continue
1074     retries = 0
1075     for i in range(len(rstats)):
1076       mstat = rstats[i]
1077       if mstat is None:
1078         proc.LogWarning("Can't compute data for node %s/%s" %
1079                         (node, instance.disks[i].iv_name))
1080         continue
1081       # we ignore the ldisk parameter
1082       perc_done, est_time, is_degraded, _ = mstat
1083       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1084       if perc_done is not None:
1085         done = False
1086         if est_time is not None:
1087           rem_time = "%d estimated seconds remaining" % est_time
1088           max_time = est_time
1089         else:
1090           rem_time = "no time estimate"
1091         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1092                      (instance.disks[i].iv_name, perc_done, rem_time))
1093     if done or oneshot:
1094       break
1095
1096     time.sleep(min(60, max_time))
1097
1098   if done:
1099     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1100   return not cumul_degraded
1101
1102
1103 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1104   """Check that mirrors are not degraded.
1105
1106   The ldisk parameter, if True, will change the test from the
1107   is_degraded attribute (which represents overall non-ok status for
1108   the device(s)) to the ldisk (representing the local storage status).
1109
1110   """
1111   cfgw.SetDiskID(dev, node)
1112   if ldisk:
1113     idx = 6
1114   else:
1115     idx = 5
1116
1117   result = True
1118   if on_primary or dev.AssembleOnSecondary():
1119     rstats = rpc.call_blockdev_find(node, dev)
1120     if not rstats:
1121       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1122       result = False
1123     else:
1124       result = result and (not rstats[idx])
1125   if dev.children:
1126     for child in dev.children:
1127       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1128
1129   return result
1130
1131
1132 class LUDiagnoseOS(NoHooksLU):
1133   """Logical unit for OS diagnose/query.
1134
1135   """
1136   _OP_REQP = ["output_fields", "names"]
1137
1138   def CheckPrereq(self):
1139     """Check prerequisites.
1140
1141     This always succeeds, since this is a pure query LU.
1142
1143     """
1144     if self.op.names:
1145       raise errors.OpPrereqError("Selective OS query not supported")
1146
1147     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1148     _CheckOutputFields(static=[],
1149                        dynamic=self.dynamic_fields,
1150                        selected=self.op.output_fields)
1151
1152   @staticmethod
1153   def _DiagnoseByOS(node_list, rlist):
1154     """Remaps a per-node return list into an a per-os per-node dictionary
1155
1156       Args:
1157         node_list: a list with the names of all nodes
1158         rlist: a map with node names as keys and OS objects as values
1159
1160       Returns:
1161         map: a map with osnames as keys and as value another map, with
1162              nodes as
1163              keys and list of OS objects as values
1164              e.g. {"debian-etch": {"node1": [<object>,...],
1165                                    "node2": [<object>,]}
1166                   }
1167
1168     """
1169     all_os = {}
1170     for node_name, nr in rlist.iteritems():
1171       if not nr:
1172         continue
1173       for os_obj in nr:
1174         if os_obj.name not in all_os:
1175           # build a list of nodes for this os containing empty lists
1176           # for each node in node_list
1177           all_os[os_obj.name] = {}
1178           for nname in node_list:
1179             all_os[os_obj.name][nname] = []
1180         all_os[os_obj.name][node_name].append(os_obj)
1181     return all_os
1182
1183   def Exec(self, feedback_fn):
1184     """Compute the list of OSes.
1185
1186     """
1187     node_list = self.cfg.GetNodeList()
1188     node_data = rpc.call_os_diagnose(node_list)
1189     if node_data == False:
1190       raise errors.OpExecError("Can't gather the list of OSes")
1191     pol = self._DiagnoseByOS(node_list, node_data)
1192     output = []
1193     for os_name, os_data in pol.iteritems():
1194       row = []
1195       for field in self.op.output_fields:
1196         if field == "name":
1197           val = os_name
1198         elif field == "valid":
1199           val = utils.all([osl and osl[0] for osl in os_data.values()])
1200         elif field == "node_status":
1201           val = {}
1202           for node_name, nos_list in os_data.iteritems():
1203             val[node_name] = [(v.status, v.path) for v in nos_list]
1204         else:
1205           raise errors.ParameterError(field)
1206         row.append(val)
1207       output.append(row)
1208
1209     return output
1210
1211
1212 class LURemoveNode(LogicalUnit):
1213   """Logical unit for removing a node.
1214
1215   """
1216   HPATH = "node-remove"
1217   HTYPE = constants.HTYPE_NODE
1218   _OP_REQP = ["node_name"]
1219
1220   def BuildHooksEnv(self):
1221     """Build hooks env.
1222
1223     This doesn't run on the target node in the pre phase as a failed
1224     node would then be impossible to remove.
1225
1226     """
1227     env = {
1228       "OP_TARGET": self.op.node_name,
1229       "NODE_NAME": self.op.node_name,
1230       }
1231     all_nodes = self.cfg.GetNodeList()
1232     all_nodes.remove(self.op.node_name)
1233     return env, all_nodes, all_nodes
1234
1235   def CheckPrereq(self):
1236     """Check prerequisites.
1237
1238     This checks:
1239      - the node exists in the configuration
1240      - it does not have primary or secondary instances
1241      - it's not the master
1242
1243     Any errors are signalled by raising errors.OpPrereqError.
1244
1245     """
1246     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1247     if node is None:
1248       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1249
1250     instance_list = self.cfg.GetInstanceList()
1251
1252     masternode = self.sstore.GetMasterNode()
1253     if node.name == masternode:
1254       raise errors.OpPrereqError("Node is the master node,"
1255                                  " you need to failover first.")
1256
1257     for instance_name in instance_list:
1258       instance = self.cfg.GetInstanceInfo(instance_name)
1259       if node.name == instance.primary_node:
1260         raise errors.OpPrereqError("Instance %s still running on the node,"
1261                                    " please remove first." % instance_name)
1262       if node.name in instance.secondary_nodes:
1263         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1264                                    " please remove first." % instance_name)
1265     self.op.node_name = node.name
1266     self.node = node
1267
1268   def Exec(self, feedback_fn):
1269     """Removes the node from the cluster.
1270
1271     """
1272     node = self.node
1273     logger.Info("stopping the node daemon and removing configs from node %s" %
1274                 node.name)
1275
1276     rpc.call_node_leave_cluster(node.name)
1277
1278     logger.Info("Removing node %s from config" % node.name)
1279
1280     self.cfg.RemoveNode(node.name)
1281     # Remove the node from the Ganeti Lock Manager
1282     self.context.glm.remove(locking.LEVEL_NODE, node.name)
1283
1284     utils.RemoveHostFromEtcHosts(node.name)
1285
1286
1287 class LUQueryNodes(NoHooksLU):
1288   """Logical unit for querying nodes.
1289
1290   """
1291   _OP_REQP = ["output_fields", "names"]
1292
1293   def CheckPrereq(self):
1294     """Check prerequisites.
1295
1296     This checks that the fields required are valid output fields.
1297
1298     """
1299     self.dynamic_fields = frozenset([
1300       "dtotal", "dfree",
1301       "mtotal", "mnode", "mfree",
1302       "bootid",
1303       "ctotal",
1304       ])
1305
1306     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1307                                "pinst_list", "sinst_list",
1308                                "pip", "sip", "tags"],
1309                        dynamic=self.dynamic_fields,
1310                        selected=self.op.output_fields)
1311
1312     self.wanted = _GetWantedNodes(self, self.op.names)
1313
1314   def Exec(self, feedback_fn):
1315     """Computes the list of nodes and their attributes.
1316
1317     """
1318     nodenames = self.wanted
1319     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1320
1321     # begin data gathering
1322
1323     if self.dynamic_fields.intersection(self.op.output_fields):
1324       live_data = {}
1325       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1326       for name in nodenames:
1327         nodeinfo = node_data.get(name, None)
1328         if nodeinfo:
1329           live_data[name] = {
1330             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1331             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1332             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1333             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1334             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1335             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1336             "bootid": nodeinfo['bootid'],
1337             }
1338         else:
1339           live_data[name] = {}
1340     else:
1341       live_data = dict.fromkeys(nodenames, {})
1342
1343     node_to_primary = dict([(name, set()) for name in nodenames])
1344     node_to_secondary = dict([(name, set()) for name in nodenames])
1345
1346     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1347                              "sinst_cnt", "sinst_list"))
1348     if inst_fields & frozenset(self.op.output_fields):
1349       instancelist = self.cfg.GetInstanceList()
1350
1351       for instance_name in instancelist:
1352         inst = self.cfg.GetInstanceInfo(instance_name)
1353         if inst.primary_node in node_to_primary:
1354           node_to_primary[inst.primary_node].add(inst.name)
1355         for secnode in inst.secondary_nodes:
1356           if secnode in node_to_secondary:
1357             node_to_secondary[secnode].add(inst.name)
1358
1359     # end data gathering
1360
1361     output = []
1362     for node in nodelist:
1363       node_output = []
1364       for field in self.op.output_fields:
1365         if field == "name":
1366           val = node.name
1367         elif field == "pinst_list":
1368           val = list(node_to_primary[node.name])
1369         elif field == "sinst_list":
1370           val = list(node_to_secondary[node.name])
1371         elif field == "pinst_cnt":
1372           val = len(node_to_primary[node.name])
1373         elif field == "sinst_cnt":
1374           val = len(node_to_secondary[node.name])
1375         elif field == "pip":
1376           val = node.primary_ip
1377         elif field == "sip":
1378           val = node.secondary_ip
1379         elif field == "tags":
1380           val = list(node.GetTags())
1381         elif field in self.dynamic_fields:
1382           val = live_data[node.name].get(field, None)
1383         else:
1384           raise errors.ParameterError(field)
1385         node_output.append(val)
1386       output.append(node_output)
1387
1388     return output
1389
1390
1391 class LUQueryNodeVolumes(NoHooksLU):
1392   """Logical unit for getting volumes on node(s).
1393
1394   """
1395   _OP_REQP = ["nodes", "output_fields"]
1396
1397   def CheckPrereq(self):
1398     """Check prerequisites.
1399
1400     This checks that the fields required are valid output fields.
1401
1402     """
1403     self.nodes = _GetWantedNodes(self, self.op.nodes)
1404
1405     _CheckOutputFields(static=["node"],
1406                        dynamic=["phys", "vg", "name", "size", "instance"],
1407                        selected=self.op.output_fields)
1408
1409
1410   def Exec(self, feedback_fn):
1411     """Computes the list of nodes and their attributes.
1412
1413     """
1414     nodenames = self.nodes
1415     volumes = rpc.call_node_volumes(nodenames)
1416
1417     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1418              in self.cfg.GetInstanceList()]
1419
1420     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1421
1422     output = []
1423     for node in nodenames:
1424       if node not in volumes or not volumes[node]:
1425         continue
1426
1427       node_vols = volumes[node][:]
1428       node_vols.sort(key=lambda vol: vol['dev'])
1429
1430       for vol in node_vols:
1431         node_output = []
1432         for field in self.op.output_fields:
1433           if field == "node":
1434             val = node
1435           elif field == "phys":
1436             val = vol['dev']
1437           elif field == "vg":
1438             val = vol['vg']
1439           elif field == "name":
1440             val = vol['name']
1441           elif field == "size":
1442             val = int(float(vol['size']))
1443           elif field == "instance":
1444             for inst in ilist:
1445               if node not in lv_by_node[inst]:
1446                 continue
1447               if vol['name'] in lv_by_node[inst][node]:
1448                 val = inst.name
1449                 break
1450             else:
1451               val = '-'
1452           else:
1453             raise errors.ParameterError(field)
1454           node_output.append(str(val))
1455
1456         output.append(node_output)
1457
1458     return output
1459
1460
1461 class LUAddNode(LogicalUnit):
1462   """Logical unit for adding node to the cluster.
1463
1464   """
1465   HPATH = "node-add"
1466   HTYPE = constants.HTYPE_NODE
1467   _OP_REQP = ["node_name"]
1468
1469   def BuildHooksEnv(self):
1470     """Build hooks env.
1471
1472     This will run on all nodes before, and on all nodes + the new node after.
1473
1474     """
1475     env = {
1476       "OP_TARGET": self.op.node_name,
1477       "NODE_NAME": self.op.node_name,
1478       "NODE_PIP": self.op.primary_ip,
1479       "NODE_SIP": self.op.secondary_ip,
1480       }
1481     nodes_0 = self.cfg.GetNodeList()
1482     nodes_1 = nodes_0 + [self.op.node_name, ]
1483     return env, nodes_0, nodes_1
1484
1485   def CheckPrereq(self):
1486     """Check prerequisites.
1487
1488     This checks:
1489      - the new node is not already in the config
1490      - it is resolvable
1491      - its parameters (single/dual homed) matches the cluster
1492
1493     Any errors are signalled by raising errors.OpPrereqError.
1494
1495     """
1496     node_name = self.op.node_name
1497     cfg = self.cfg
1498
1499     dns_data = utils.HostInfo(node_name)
1500
1501     node = dns_data.name
1502     primary_ip = self.op.primary_ip = dns_data.ip
1503     secondary_ip = getattr(self.op, "secondary_ip", None)
1504     if secondary_ip is None:
1505       secondary_ip = primary_ip
1506     if not utils.IsValidIP(secondary_ip):
1507       raise errors.OpPrereqError("Invalid secondary IP given")
1508     self.op.secondary_ip = secondary_ip
1509
1510     node_list = cfg.GetNodeList()
1511     if not self.op.readd and node in node_list:
1512       raise errors.OpPrereqError("Node %s is already in the configuration" %
1513                                  node)
1514     elif self.op.readd and node not in node_list:
1515       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1516
1517     for existing_node_name in node_list:
1518       existing_node = cfg.GetNodeInfo(existing_node_name)
1519
1520       if self.op.readd and node == existing_node_name:
1521         if (existing_node.primary_ip != primary_ip or
1522             existing_node.secondary_ip != secondary_ip):
1523           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1524                                      " address configuration as before")
1525         continue
1526
1527       if (existing_node.primary_ip == primary_ip or
1528           existing_node.secondary_ip == primary_ip or
1529           existing_node.primary_ip == secondary_ip or
1530           existing_node.secondary_ip == secondary_ip):
1531         raise errors.OpPrereqError("New node ip address(es) conflict with"
1532                                    " existing node %s" % existing_node.name)
1533
1534     # check that the type of the node (single versus dual homed) is the
1535     # same as for the master
1536     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1537     master_singlehomed = myself.secondary_ip == myself.primary_ip
1538     newbie_singlehomed = secondary_ip == primary_ip
1539     if master_singlehomed != newbie_singlehomed:
1540       if master_singlehomed:
1541         raise errors.OpPrereqError("The master has no private ip but the"
1542                                    " new node has one")
1543       else:
1544         raise errors.OpPrereqError("The master has a private ip but the"
1545                                    " new node doesn't have one")
1546
1547     # checks reachablity
1548     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1549       raise errors.OpPrereqError("Node not reachable by ping")
1550
1551     if not newbie_singlehomed:
1552       # check reachability from my secondary ip to newbie's secondary ip
1553       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1554                            source=myself.secondary_ip):
1555         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1556                                    " based ping to noded port")
1557
1558     self.new_node = objects.Node(name=node,
1559                                  primary_ip=primary_ip,
1560                                  secondary_ip=secondary_ip)
1561
1562   def Exec(self, feedback_fn):
1563     """Adds the new node to the cluster.
1564
1565     """
1566     new_node = self.new_node
1567     node = new_node.name
1568
1569     # check connectivity
1570     result = rpc.call_version([node])[node]
1571     if result:
1572       if constants.PROTOCOL_VERSION == result:
1573         logger.Info("communication to node %s fine, sw version %s match" %
1574                     (node, result))
1575       else:
1576         raise errors.OpExecError("Version mismatch master version %s,"
1577                                  " node version %s" %
1578                                  (constants.PROTOCOL_VERSION, result))
1579     else:
1580       raise errors.OpExecError("Cannot get version from the new node")
1581
1582     # setup ssh on node
1583     logger.Info("copy ssh key to node %s" % node)
1584     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1585     keyarray = []
1586     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1587                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1588                 priv_key, pub_key]
1589
1590     for i in keyfiles:
1591       f = open(i, 'r')
1592       try:
1593         keyarray.append(f.read())
1594       finally:
1595         f.close()
1596
1597     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1598                                keyarray[3], keyarray[4], keyarray[5])
1599
1600     if not result:
1601       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1602
1603     # Add node to our /etc/hosts, and add key to known_hosts
1604     utils.AddHostToEtcHosts(new_node.name)
1605
1606     if new_node.secondary_ip != new_node.primary_ip:
1607       if not rpc.call_node_tcp_ping(new_node.name,
1608                                     constants.LOCALHOST_IP_ADDRESS,
1609                                     new_node.secondary_ip,
1610                                     constants.DEFAULT_NODED_PORT,
1611                                     10, False):
1612         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1613                                  " you gave (%s). Please fix and re-run this"
1614                                  " command." % new_node.secondary_ip)
1615
1616     node_verify_list = [self.sstore.GetMasterNode()]
1617     node_verify_param = {
1618       'nodelist': [node],
1619       # TODO: do a node-net-test as well?
1620     }
1621
1622     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1623     for verifier in node_verify_list:
1624       if not result[verifier]:
1625         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1626                                  " for remote verification" % verifier)
1627       if result[verifier]['nodelist']:
1628         for failed in result[verifier]['nodelist']:
1629           feedback_fn("ssh/hostname verification failed %s -> %s" %
1630                       (verifier, result[verifier]['nodelist'][failed]))
1631         raise errors.OpExecError("ssh/hostname verification failed.")
1632
1633     # Distribute updated /etc/hosts and known_hosts to all nodes,
1634     # including the node just added
1635     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1636     dist_nodes = self.cfg.GetNodeList()
1637     if not self.op.readd:
1638       dist_nodes.append(node)
1639     if myself.name in dist_nodes:
1640       dist_nodes.remove(myself.name)
1641
1642     logger.Debug("Copying hosts and known_hosts to all nodes")
1643     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1644       result = rpc.call_upload_file(dist_nodes, fname)
1645       for to_node in dist_nodes:
1646         if not result[to_node]:
1647           logger.Error("copy of file %s to node %s failed" %
1648                        (fname, to_node))
1649
1650     to_copy = self.sstore.GetFileList()
1651     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1652       to_copy.append(constants.VNC_PASSWORD_FILE)
1653     for fname in to_copy:
1654       result = rpc.call_upload_file([node], fname)
1655       if not result[node]:
1656         logger.Error("could not copy file %s to node %s" % (fname, node))
1657
1658     if not self.op.readd:
1659       logger.Info("adding node %s to cluster.conf" % node)
1660       self.cfg.AddNode(new_node)
1661       # Add the new node to the Ganeti Lock Manager
1662       self.context.glm.add(locking.LEVEL_NODE, node)
1663
1664
1665 class LUMasterFailover(LogicalUnit):
1666   """Failover the master node to the current node.
1667
1668   This is a special LU in that it must run on a non-master node.
1669
1670   """
1671   HPATH = "master-failover"
1672   HTYPE = constants.HTYPE_CLUSTER
1673   REQ_MASTER = False
1674   REQ_WSSTORE = True
1675   _OP_REQP = []
1676
1677   def BuildHooksEnv(self):
1678     """Build hooks env.
1679
1680     This will run on the new master only in the pre phase, and on all
1681     the nodes in the post phase.
1682
1683     """
1684     env = {
1685       "OP_TARGET": self.new_master,
1686       "NEW_MASTER": self.new_master,
1687       "OLD_MASTER": self.old_master,
1688       }
1689     return env, [self.new_master], self.cfg.GetNodeList()
1690
1691   def CheckPrereq(self):
1692     """Check prerequisites.
1693
1694     This checks that we are not already the master.
1695
1696     """
1697     self.new_master = utils.HostInfo().name
1698     self.old_master = self.sstore.GetMasterNode()
1699
1700     if self.old_master == self.new_master:
1701       raise errors.OpPrereqError("This commands must be run on the node"
1702                                  " where you want the new master to be."
1703                                  " %s is already the master" %
1704                                  self.old_master)
1705
1706   def Exec(self, feedback_fn):
1707     """Failover the master node.
1708
1709     This command, when run on a non-master node, will cause the current
1710     master to cease being master, and the non-master to become new
1711     master.
1712
1713     """
1714     #TODO: do not rely on gethostname returning the FQDN
1715     logger.Info("setting master to %s, old master: %s" %
1716                 (self.new_master, self.old_master))
1717
1718     if not rpc.call_node_stop_master(self.old_master):
1719       logger.Error("could disable the master role on the old master"
1720                    " %s, please disable manually" % self.old_master)
1721
1722     ss = self.sstore
1723     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1724     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1725                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1726       logger.Error("could not distribute the new simple store master file"
1727                    " to the other nodes, please check.")
1728
1729     if not rpc.call_node_start_master(self.new_master):
1730       logger.Error("could not start the master role on the new master"
1731                    " %s, please check" % self.new_master)
1732       feedback_fn("Error in activating the master IP on the new master,"
1733                   " please fix manually.")
1734
1735
1736
1737 class LUQueryClusterInfo(NoHooksLU):
1738   """Query cluster configuration.
1739
1740   """
1741   _OP_REQP = []
1742   REQ_MASTER = False
1743
1744   def CheckPrereq(self):
1745     """No prerequsites needed for this LU.
1746
1747     """
1748     pass
1749
1750   def Exec(self, feedback_fn):
1751     """Return cluster config.
1752
1753     """
1754     result = {
1755       "name": self.sstore.GetClusterName(),
1756       "software_version": constants.RELEASE_VERSION,
1757       "protocol_version": constants.PROTOCOL_VERSION,
1758       "config_version": constants.CONFIG_VERSION,
1759       "os_api_version": constants.OS_API_VERSION,
1760       "export_version": constants.EXPORT_VERSION,
1761       "master": self.sstore.GetMasterNode(),
1762       "architecture": (platform.architecture()[0], platform.machine()),
1763       "hypervisor_type": self.sstore.GetHypervisorType(),
1764       }
1765
1766     return result
1767
1768
1769 class LUDumpClusterConfig(NoHooksLU):
1770   """Return a text-representation of the cluster-config.
1771
1772   """
1773   _OP_REQP = []
1774
1775   def CheckPrereq(self):
1776     """No prerequisites.
1777
1778     """
1779     pass
1780
1781   def Exec(self, feedback_fn):
1782     """Dump a representation of the cluster config to the standard output.
1783
1784     """
1785     return self.cfg.DumpConfig()
1786
1787
1788 class LUActivateInstanceDisks(NoHooksLU):
1789   """Bring up an instance's disks.
1790
1791   """
1792   _OP_REQP = ["instance_name"]
1793
1794   def CheckPrereq(self):
1795     """Check prerequisites.
1796
1797     This checks that the instance is in the cluster.
1798
1799     """
1800     instance = self.cfg.GetInstanceInfo(
1801       self.cfg.ExpandInstanceName(self.op.instance_name))
1802     if instance is None:
1803       raise errors.OpPrereqError("Instance '%s' not known" %
1804                                  self.op.instance_name)
1805     self.instance = instance
1806
1807
1808   def Exec(self, feedback_fn):
1809     """Activate the disks.
1810
1811     """
1812     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1813     if not disks_ok:
1814       raise errors.OpExecError("Cannot activate block devices")
1815
1816     return disks_info
1817
1818
1819 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1820   """Prepare the block devices for an instance.
1821
1822   This sets up the block devices on all nodes.
1823
1824   Args:
1825     instance: a ganeti.objects.Instance object
1826     ignore_secondaries: if true, errors on secondary nodes won't result
1827                         in an error return from the function
1828
1829   Returns:
1830     false if the operation failed
1831     list of (host, instance_visible_name, node_visible_name) if the operation
1832          suceeded with the mapping from node devices to instance devices
1833   """
1834   device_info = []
1835   disks_ok = True
1836   iname = instance.name
1837   # With the two passes mechanism we try to reduce the window of
1838   # opportunity for the race condition of switching DRBD to primary
1839   # before handshaking occured, but we do not eliminate it
1840
1841   # The proper fix would be to wait (with some limits) until the
1842   # connection has been made and drbd transitions from WFConnection
1843   # into any other network-connected state (Connected, SyncTarget,
1844   # SyncSource, etc.)
1845
1846   # 1st pass, assemble on all nodes in secondary mode
1847   for inst_disk in instance.disks:
1848     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1849       cfg.SetDiskID(node_disk, node)
1850       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1851       if not result:
1852         logger.Error("could not prepare block device %s on node %s"
1853                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1854         if not ignore_secondaries:
1855           disks_ok = False
1856
1857   # FIXME: race condition on drbd migration to primary
1858
1859   # 2nd pass, do only the primary node
1860   for inst_disk in instance.disks:
1861     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1862       if node != instance.primary_node:
1863         continue
1864       cfg.SetDiskID(node_disk, node)
1865       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1866       if not result:
1867         logger.Error("could not prepare block device %s on node %s"
1868                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1869         disks_ok = False
1870     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1871
1872   # leave the disks configured for the primary node
1873   # this is a workaround that would be fixed better by
1874   # improving the logical/physical id handling
1875   for disk in instance.disks:
1876     cfg.SetDiskID(disk, instance.primary_node)
1877
1878   return disks_ok, device_info
1879
1880
1881 def _StartInstanceDisks(cfg, instance, force):
1882   """Start the disks of an instance.
1883
1884   """
1885   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1886                                            ignore_secondaries=force)
1887   if not disks_ok:
1888     _ShutdownInstanceDisks(instance, cfg)
1889     if force is not None and not force:
1890       logger.Error("If the message above refers to a secondary node,"
1891                    " you can retry the operation using '--force'.")
1892     raise errors.OpExecError("Disk consistency error")
1893
1894
1895 class LUDeactivateInstanceDisks(NoHooksLU):
1896   """Shutdown an instance's disks.
1897
1898   """
1899   _OP_REQP = ["instance_name"]
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     This checks that the instance is in the cluster.
1905
1906     """
1907     instance = self.cfg.GetInstanceInfo(
1908       self.cfg.ExpandInstanceName(self.op.instance_name))
1909     if instance is None:
1910       raise errors.OpPrereqError("Instance '%s' not known" %
1911                                  self.op.instance_name)
1912     self.instance = instance
1913
1914   def Exec(self, feedback_fn):
1915     """Deactivate the disks
1916
1917     """
1918     instance = self.instance
1919     ins_l = rpc.call_instance_list([instance.primary_node])
1920     ins_l = ins_l[instance.primary_node]
1921     if not type(ins_l) is list:
1922       raise errors.OpExecError("Can't contact node '%s'" %
1923                                instance.primary_node)
1924
1925     if self.instance.name in ins_l:
1926       raise errors.OpExecError("Instance is running, can't shutdown"
1927                                " block devices.")
1928
1929     _ShutdownInstanceDisks(instance, self.cfg)
1930
1931
1932 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1933   """Shutdown block devices of an instance.
1934
1935   This does the shutdown on all nodes of the instance.
1936
1937   If the ignore_primary is false, errors on the primary node are
1938   ignored.
1939
1940   """
1941   result = True
1942   for disk in instance.disks:
1943     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1944       cfg.SetDiskID(top_disk, node)
1945       if not rpc.call_blockdev_shutdown(node, top_disk):
1946         logger.Error("could not shutdown block device %s on node %s" %
1947                      (disk.iv_name, node))
1948         if not ignore_primary or node != instance.primary_node:
1949           result = False
1950   return result
1951
1952
1953 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1954   """Checks if a node has enough free memory.
1955
1956   This function check if a given node has the needed amount of free
1957   memory. In case the node has less memory or we cannot get the
1958   information from the node, this function raise an OpPrereqError
1959   exception.
1960
1961   Args:
1962     - cfg: a ConfigWriter instance
1963     - node: the node name
1964     - reason: string to use in the error message
1965     - requested: the amount of memory in MiB
1966
1967   """
1968   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1969   if not nodeinfo or not isinstance(nodeinfo, dict):
1970     raise errors.OpPrereqError("Could not contact node %s for resource"
1971                              " information" % (node,))
1972
1973   free_mem = nodeinfo[node].get('memory_free')
1974   if not isinstance(free_mem, int):
1975     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1976                              " was '%s'" % (node, free_mem))
1977   if requested > free_mem:
1978     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1979                              " needed %s MiB, available %s MiB" %
1980                              (node, reason, requested, free_mem))
1981
1982
1983 class LUStartupInstance(LogicalUnit):
1984   """Starts an instance.
1985
1986   """
1987   HPATH = "instance-start"
1988   HTYPE = constants.HTYPE_INSTANCE
1989   _OP_REQP = ["instance_name", "force"]
1990
1991   def BuildHooksEnv(self):
1992     """Build hooks env.
1993
1994     This runs on master, primary and secondary nodes of the instance.
1995
1996     """
1997     env = {
1998       "FORCE": self.op.force,
1999       }
2000     env.update(_BuildInstanceHookEnvByObject(self.instance))
2001     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2002           list(self.instance.secondary_nodes))
2003     return env, nl, nl
2004
2005   def CheckPrereq(self):
2006     """Check prerequisites.
2007
2008     This checks that the instance is in the cluster.
2009
2010     """
2011     instance = self.cfg.GetInstanceInfo(
2012       self.cfg.ExpandInstanceName(self.op.instance_name))
2013     if instance is None:
2014       raise errors.OpPrereqError("Instance '%s' not known" %
2015                                  self.op.instance_name)
2016
2017     # check bridges existance
2018     _CheckInstanceBridgesExist(instance)
2019
2020     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2021                          "starting instance %s" % instance.name,
2022                          instance.memory)
2023
2024     self.instance = instance
2025     self.op.instance_name = instance.name
2026
2027   def Exec(self, feedback_fn):
2028     """Start the instance.
2029
2030     """
2031     instance = self.instance
2032     force = self.op.force
2033     extra_args = getattr(self.op, "extra_args", "")
2034
2035     self.cfg.MarkInstanceUp(instance.name)
2036
2037     node_current = instance.primary_node
2038
2039     _StartInstanceDisks(self.cfg, instance, force)
2040
2041     if not rpc.call_instance_start(node_current, instance, extra_args):
2042       _ShutdownInstanceDisks(instance, self.cfg)
2043       raise errors.OpExecError("Could not start instance")
2044
2045
2046 class LURebootInstance(LogicalUnit):
2047   """Reboot an instance.
2048
2049   """
2050   HPATH = "instance-reboot"
2051   HTYPE = constants.HTYPE_INSTANCE
2052   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2053
2054   def BuildHooksEnv(self):
2055     """Build hooks env.
2056
2057     This runs on master, primary and secondary nodes of the instance.
2058
2059     """
2060     env = {
2061       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2062       }
2063     env.update(_BuildInstanceHookEnvByObject(self.instance))
2064     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2065           list(self.instance.secondary_nodes))
2066     return env, nl, nl
2067
2068   def CheckPrereq(self):
2069     """Check prerequisites.
2070
2071     This checks that the instance is in the cluster.
2072
2073     """
2074     instance = self.cfg.GetInstanceInfo(
2075       self.cfg.ExpandInstanceName(self.op.instance_name))
2076     if instance is None:
2077       raise errors.OpPrereqError("Instance '%s' not known" %
2078                                  self.op.instance_name)
2079
2080     # check bridges existance
2081     _CheckInstanceBridgesExist(instance)
2082
2083     self.instance = instance
2084     self.op.instance_name = instance.name
2085
2086   def Exec(self, feedback_fn):
2087     """Reboot the instance.
2088
2089     """
2090     instance = self.instance
2091     ignore_secondaries = self.op.ignore_secondaries
2092     reboot_type = self.op.reboot_type
2093     extra_args = getattr(self.op, "extra_args", "")
2094
2095     node_current = instance.primary_node
2096
2097     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2098                            constants.INSTANCE_REBOOT_HARD,
2099                            constants.INSTANCE_REBOOT_FULL]:
2100       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2101                                   (constants.INSTANCE_REBOOT_SOFT,
2102                                    constants.INSTANCE_REBOOT_HARD,
2103                                    constants.INSTANCE_REBOOT_FULL))
2104
2105     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2106                        constants.INSTANCE_REBOOT_HARD]:
2107       if not rpc.call_instance_reboot(node_current, instance,
2108                                       reboot_type, extra_args):
2109         raise errors.OpExecError("Could not reboot instance")
2110     else:
2111       if not rpc.call_instance_shutdown(node_current, instance):
2112         raise errors.OpExecError("could not shutdown instance for full reboot")
2113       _ShutdownInstanceDisks(instance, self.cfg)
2114       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2115       if not rpc.call_instance_start(node_current, instance, extra_args):
2116         _ShutdownInstanceDisks(instance, self.cfg)
2117         raise errors.OpExecError("Could not start instance for full reboot")
2118
2119     self.cfg.MarkInstanceUp(instance.name)
2120
2121
2122 class LUShutdownInstance(LogicalUnit):
2123   """Shutdown an instance.
2124
2125   """
2126   HPATH = "instance-stop"
2127   HTYPE = constants.HTYPE_INSTANCE
2128   _OP_REQP = ["instance_name"]
2129
2130   def BuildHooksEnv(self):
2131     """Build hooks env.
2132
2133     This runs on master, primary and secondary nodes of the instance.
2134
2135     """
2136     env = _BuildInstanceHookEnvByObject(self.instance)
2137     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2138           list(self.instance.secondary_nodes))
2139     return env, nl, nl
2140
2141   def CheckPrereq(self):
2142     """Check prerequisites.
2143
2144     This checks that the instance is in the cluster.
2145
2146     """
2147     instance = self.cfg.GetInstanceInfo(
2148       self.cfg.ExpandInstanceName(self.op.instance_name))
2149     if instance is None:
2150       raise errors.OpPrereqError("Instance '%s' not known" %
2151                                  self.op.instance_name)
2152     self.instance = instance
2153
2154   def Exec(self, feedback_fn):
2155     """Shutdown the instance.
2156
2157     """
2158     instance = self.instance
2159     node_current = instance.primary_node
2160     self.cfg.MarkInstanceDown(instance.name)
2161     if not rpc.call_instance_shutdown(node_current, instance):
2162       logger.Error("could not shutdown instance")
2163
2164     _ShutdownInstanceDisks(instance, self.cfg)
2165
2166
2167 class LUReinstallInstance(LogicalUnit):
2168   """Reinstall an instance.
2169
2170   """
2171   HPATH = "instance-reinstall"
2172   HTYPE = constants.HTYPE_INSTANCE
2173   _OP_REQP = ["instance_name"]
2174
2175   def BuildHooksEnv(self):
2176     """Build hooks env.
2177
2178     This runs on master, primary and secondary nodes of the instance.
2179
2180     """
2181     env = _BuildInstanceHookEnvByObject(self.instance)
2182     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2183           list(self.instance.secondary_nodes))
2184     return env, nl, nl
2185
2186   def CheckPrereq(self):
2187     """Check prerequisites.
2188
2189     This checks that the instance is in the cluster and is not running.
2190
2191     """
2192     instance = self.cfg.GetInstanceInfo(
2193       self.cfg.ExpandInstanceName(self.op.instance_name))
2194     if instance is None:
2195       raise errors.OpPrereqError("Instance '%s' not known" %
2196                                  self.op.instance_name)
2197     if instance.disk_template == constants.DT_DISKLESS:
2198       raise errors.OpPrereqError("Instance '%s' has no disks" %
2199                                  self.op.instance_name)
2200     if instance.status != "down":
2201       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2202                                  self.op.instance_name)
2203     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2204     if remote_info:
2205       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2206                                  (self.op.instance_name,
2207                                   instance.primary_node))
2208
2209     self.op.os_type = getattr(self.op, "os_type", None)
2210     if self.op.os_type is not None:
2211       # OS verification
2212       pnode = self.cfg.GetNodeInfo(
2213         self.cfg.ExpandNodeName(instance.primary_node))
2214       if pnode is None:
2215         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2216                                    self.op.pnode)
2217       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2218       if not os_obj:
2219         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2220                                    " primary node"  % self.op.os_type)
2221
2222     self.instance = instance
2223
2224   def Exec(self, feedback_fn):
2225     """Reinstall the instance.
2226
2227     """
2228     inst = self.instance
2229
2230     if self.op.os_type is not None:
2231       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2232       inst.os = self.op.os_type
2233       self.cfg.AddInstance(inst)
2234
2235     _StartInstanceDisks(self.cfg, inst, None)
2236     try:
2237       feedback_fn("Running the instance OS create scripts...")
2238       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2239         raise errors.OpExecError("Could not install OS for instance %s"
2240                                  " on node %s" %
2241                                  (inst.name, inst.primary_node))
2242     finally:
2243       _ShutdownInstanceDisks(inst, self.cfg)
2244
2245
2246 class LURenameInstance(LogicalUnit):
2247   """Rename an instance.
2248
2249   """
2250   HPATH = "instance-rename"
2251   HTYPE = constants.HTYPE_INSTANCE
2252   _OP_REQP = ["instance_name", "new_name"]
2253
2254   def BuildHooksEnv(self):
2255     """Build hooks env.
2256
2257     This runs on master, primary and secondary nodes of the instance.
2258
2259     """
2260     env = _BuildInstanceHookEnvByObject(self.instance)
2261     env["INSTANCE_NEW_NAME"] = self.op.new_name
2262     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2263           list(self.instance.secondary_nodes))
2264     return env, nl, nl
2265
2266   def CheckPrereq(self):
2267     """Check prerequisites.
2268
2269     This checks that the instance is in the cluster and is not running.
2270
2271     """
2272     instance = self.cfg.GetInstanceInfo(
2273       self.cfg.ExpandInstanceName(self.op.instance_name))
2274     if instance is None:
2275       raise errors.OpPrereqError("Instance '%s' not known" %
2276                                  self.op.instance_name)
2277     if instance.status != "down":
2278       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2279                                  self.op.instance_name)
2280     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2281     if remote_info:
2282       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2283                                  (self.op.instance_name,
2284                                   instance.primary_node))
2285     self.instance = instance
2286
2287     # new name verification
2288     name_info = utils.HostInfo(self.op.new_name)
2289
2290     self.op.new_name = new_name = name_info.name
2291     instance_list = self.cfg.GetInstanceList()
2292     if new_name in instance_list:
2293       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2294                                  new_name)
2295
2296     if not getattr(self.op, "ignore_ip", False):
2297       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2298         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2299                                    (name_info.ip, new_name))
2300
2301
2302   def Exec(self, feedback_fn):
2303     """Reinstall the instance.
2304
2305     """
2306     inst = self.instance
2307     old_name = inst.name
2308
2309     if inst.disk_template == constants.DT_FILE:
2310       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2311
2312     self.cfg.RenameInstance(inst.name, self.op.new_name)
2313
2314     # re-read the instance from the configuration after rename
2315     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2316
2317     if inst.disk_template == constants.DT_FILE:
2318       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2319       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2320                                                 old_file_storage_dir,
2321                                                 new_file_storage_dir)
2322
2323       if not result:
2324         raise errors.OpExecError("Could not connect to node '%s' to rename"
2325                                  " directory '%s' to '%s' (but the instance"
2326                                  " has been renamed in Ganeti)" % (
2327                                  inst.primary_node, old_file_storage_dir,
2328                                  new_file_storage_dir))
2329
2330       if not result[0]:
2331         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2332                                  " (but the instance has been renamed in"
2333                                  " Ganeti)" % (old_file_storage_dir,
2334                                                new_file_storage_dir))
2335
2336     _StartInstanceDisks(self.cfg, inst, None)
2337     try:
2338       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2339                                           "sda", "sdb"):
2340         msg = ("Could run OS rename script for instance %s on node %s (but the"
2341                " instance has been renamed in Ganeti)" %
2342                (inst.name, inst.primary_node))
2343         logger.Error(msg)
2344     finally:
2345       _ShutdownInstanceDisks(inst, self.cfg)
2346
2347
2348 class LURemoveInstance(LogicalUnit):
2349   """Remove an instance.
2350
2351   """
2352   HPATH = "instance-remove"
2353   HTYPE = constants.HTYPE_INSTANCE
2354   _OP_REQP = ["instance_name", "ignore_failures"]
2355
2356   def BuildHooksEnv(self):
2357     """Build hooks env.
2358
2359     This runs on master, primary and secondary nodes of the instance.
2360
2361     """
2362     env = _BuildInstanceHookEnvByObject(self.instance)
2363     nl = [self.sstore.GetMasterNode()]
2364     return env, nl, nl
2365
2366   def CheckPrereq(self):
2367     """Check prerequisites.
2368
2369     This checks that the instance is in the cluster.
2370
2371     """
2372     instance = self.cfg.GetInstanceInfo(
2373       self.cfg.ExpandInstanceName(self.op.instance_name))
2374     if instance is None:
2375       raise errors.OpPrereqError("Instance '%s' not known" %
2376                                  self.op.instance_name)
2377     self.instance = instance
2378
2379   def Exec(self, feedback_fn):
2380     """Remove the instance.
2381
2382     """
2383     instance = self.instance
2384     logger.Info("shutting down instance %s on node %s" %
2385                 (instance.name, instance.primary_node))
2386
2387     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2388       if self.op.ignore_failures:
2389         feedback_fn("Warning: can't shutdown instance")
2390       else:
2391         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2392                                  (instance.name, instance.primary_node))
2393
2394     logger.Info("removing block devices for instance %s" % instance.name)
2395
2396     if not _RemoveDisks(instance, self.cfg):
2397       if self.op.ignore_failures:
2398         feedback_fn("Warning: can't remove instance's disks")
2399       else:
2400         raise errors.OpExecError("Can't remove instance's disks")
2401
2402     logger.Info("removing instance %s out of cluster config" % instance.name)
2403
2404     self.cfg.RemoveInstance(instance.name)
2405     # Remove the new instance from the Ganeti Lock Manager
2406     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2407
2408
2409 class LUQueryInstances(NoHooksLU):
2410   """Logical unit for querying instances.
2411
2412   """
2413   _OP_REQP = ["output_fields", "names"]
2414
2415   def CheckPrereq(self):
2416     """Check prerequisites.
2417
2418     This checks that the fields required are valid output fields.
2419
2420     """
2421     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2422     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2423                                "admin_state", "admin_ram",
2424                                "disk_template", "ip", "mac", "bridge",
2425                                "sda_size", "sdb_size", "vcpus", "tags"],
2426                        dynamic=self.dynamic_fields,
2427                        selected=self.op.output_fields)
2428
2429     self.wanted = _GetWantedInstances(self, self.op.names)
2430
2431   def Exec(self, feedback_fn):
2432     """Computes the list of nodes and their attributes.
2433
2434     """
2435     instance_names = self.wanted
2436     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2437                      in instance_names]
2438
2439     # begin data gathering
2440
2441     nodes = frozenset([inst.primary_node for inst in instance_list])
2442
2443     bad_nodes = []
2444     if self.dynamic_fields.intersection(self.op.output_fields):
2445       live_data = {}
2446       node_data = rpc.call_all_instances_info(nodes)
2447       for name in nodes:
2448         result = node_data[name]
2449         if result:
2450           live_data.update(result)
2451         elif result == False:
2452           bad_nodes.append(name)
2453         # else no instance is alive
2454     else:
2455       live_data = dict([(name, {}) for name in instance_names])
2456
2457     # end data gathering
2458
2459     output = []
2460     for instance in instance_list:
2461       iout = []
2462       for field in self.op.output_fields:
2463         if field == "name":
2464           val = instance.name
2465         elif field == "os":
2466           val = instance.os
2467         elif field == "pnode":
2468           val = instance.primary_node
2469         elif field == "snodes":
2470           val = list(instance.secondary_nodes)
2471         elif field == "admin_state":
2472           val = (instance.status != "down")
2473         elif field == "oper_state":
2474           if instance.primary_node in bad_nodes:
2475             val = None
2476           else:
2477             val = bool(live_data.get(instance.name))
2478         elif field == "status":
2479           if instance.primary_node in bad_nodes:
2480             val = "ERROR_nodedown"
2481           else:
2482             running = bool(live_data.get(instance.name))
2483             if running:
2484               if instance.status != "down":
2485                 val = "running"
2486               else:
2487                 val = "ERROR_up"
2488             else:
2489               if instance.status != "down":
2490                 val = "ERROR_down"
2491               else:
2492                 val = "ADMIN_down"
2493         elif field == "admin_ram":
2494           val = instance.memory
2495         elif field == "oper_ram":
2496           if instance.primary_node in bad_nodes:
2497             val = None
2498           elif instance.name in live_data:
2499             val = live_data[instance.name].get("memory", "?")
2500           else:
2501             val = "-"
2502         elif field == "disk_template":
2503           val = instance.disk_template
2504         elif field == "ip":
2505           val = instance.nics[0].ip
2506         elif field == "bridge":
2507           val = instance.nics[0].bridge
2508         elif field == "mac":
2509           val = instance.nics[0].mac
2510         elif field == "sda_size" or field == "sdb_size":
2511           disk = instance.FindDisk(field[:3])
2512           if disk is None:
2513             val = None
2514           else:
2515             val = disk.size
2516         elif field == "vcpus":
2517           val = instance.vcpus
2518         elif field == "tags":
2519           val = list(instance.GetTags())
2520         else:
2521           raise errors.ParameterError(field)
2522         iout.append(val)
2523       output.append(iout)
2524
2525     return output
2526
2527
2528 class LUFailoverInstance(LogicalUnit):
2529   """Failover an instance.
2530
2531   """
2532   HPATH = "instance-failover"
2533   HTYPE = constants.HTYPE_INSTANCE
2534   _OP_REQP = ["instance_name", "ignore_consistency"]
2535
2536   def BuildHooksEnv(self):
2537     """Build hooks env.
2538
2539     This runs on master, primary and secondary nodes of the instance.
2540
2541     """
2542     env = {
2543       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2544       }
2545     env.update(_BuildInstanceHookEnvByObject(self.instance))
2546     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2547     return env, nl, nl
2548
2549   def CheckPrereq(self):
2550     """Check prerequisites.
2551
2552     This checks that the instance is in the cluster.
2553
2554     """
2555     instance = self.cfg.GetInstanceInfo(
2556       self.cfg.ExpandInstanceName(self.op.instance_name))
2557     if instance is None:
2558       raise errors.OpPrereqError("Instance '%s' not known" %
2559                                  self.op.instance_name)
2560
2561     if instance.disk_template not in constants.DTS_NET_MIRROR:
2562       raise errors.OpPrereqError("Instance's disk layout is not"
2563                                  " network mirrored, cannot failover.")
2564
2565     secondary_nodes = instance.secondary_nodes
2566     if not secondary_nodes:
2567       raise errors.ProgrammerError("no secondary node but using "
2568                                    "a mirrored disk template")
2569
2570     target_node = secondary_nodes[0]
2571     # check memory requirements on the secondary node
2572     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2573                          instance.name, instance.memory)
2574
2575     # check bridge existance
2576     brlist = [nic.bridge for nic in instance.nics]
2577     if not rpc.call_bridges_exist(target_node, brlist):
2578       raise errors.OpPrereqError("One or more target bridges %s does not"
2579                                  " exist on destination node '%s'" %
2580                                  (brlist, target_node))
2581
2582     self.instance = instance
2583
2584   def Exec(self, feedback_fn):
2585     """Failover an instance.
2586
2587     The failover is done by shutting it down on its present node and
2588     starting it on the secondary.
2589
2590     """
2591     instance = self.instance
2592
2593     source_node = instance.primary_node
2594     target_node = instance.secondary_nodes[0]
2595
2596     feedback_fn("* checking disk consistency between source and target")
2597     for dev in instance.disks:
2598       # for drbd, these are drbd over lvm
2599       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2600         if instance.status == "up" and not self.op.ignore_consistency:
2601           raise errors.OpExecError("Disk %s is degraded on target node,"
2602                                    " aborting failover." % dev.iv_name)
2603
2604     feedback_fn("* shutting down instance on source node")
2605     logger.Info("Shutting down instance %s on node %s" %
2606                 (instance.name, source_node))
2607
2608     if not rpc.call_instance_shutdown(source_node, instance):
2609       if self.op.ignore_consistency:
2610         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2611                      " anyway. Please make sure node %s is down"  %
2612                      (instance.name, source_node, source_node))
2613       else:
2614         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2615                                  (instance.name, source_node))
2616
2617     feedback_fn("* deactivating the instance's disks on source node")
2618     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2619       raise errors.OpExecError("Can't shut down the instance's disks.")
2620
2621     instance.primary_node = target_node
2622     # distribute new instance config to the other nodes
2623     self.cfg.Update(instance)
2624
2625     # Only start the instance if it's marked as up
2626     if instance.status == "up":
2627       feedback_fn("* activating the instance's disks on target node")
2628       logger.Info("Starting instance %s on node %s" %
2629                   (instance.name, target_node))
2630
2631       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2632                                                ignore_secondaries=True)
2633       if not disks_ok:
2634         _ShutdownInstanceDisks(instance, self.cfg)
2635         raise errors.OpExecError("Can't activate the instance's disks")
2636
2637       feedback_fn("* starting the instance on the target node")
2638       if not rpc.call_instance_start(target_node, instance, None):
2639         _ShutdownInstanceDisks(instance, self.cfg)
2640         raise errors.OpExecError("Could not start instance %s on node %s." %
2641                                  (instance.name, target_node))
2642
2643
2644 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2645   """Create a tree of block devices on the primary node.
2646
2647   This always creates all devices.
2648
2649   """
2650   if device.children:
2651     for child in device.children:
2652       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2653         return False
2654
2655   cfg.SetDiskID(device, node)
2656   new_id = rpc.call_blockdev_create(node, device, device.size,
2657                                     instance.name, True, info)
2658   if not new_id:
2659     return False
2660   if device.physical_id is None:
2661     device.physical_id = new_id
2662   return True
2663
2664
2665 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2666   """Create a tree of block devices on a secondary node.
2667
2668   If this device type has to be created on secondaries, create it and
2669   all its children.
2670
2671   If not, just recurse to children keeping the same 'force' value.
2672
2673   """
2674   if device.CreateOnSecondary():
2675     force = True
2676   if device.children:
2677     for child in device.children:
2678       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2679                                         child, force, info):
2680         return False
2681
2682   if not force:
2683     return True
2684   cfg.SetDiskID(device, node)
2685   new_id = rpc.call_blockdev_create(node, device, device.size,
2686                                     instance.name, False, info)
2687   if not new_id:
2688     return False
2689   if device.physical_id is None:
2690     device.physical_id = new_id
2691   return True
2692
2693
2694 def _GenerateUniqueNames(cfg, exts):
2695   """Generate a suitable LV name.
2696
2697   This will generate a logical volume name for the given instance.
2698
2699   """
2700   results = []
2701   for val in exts:
2702     new_id = cfg.GenerateUniqueID()
2703     results.append("%s%s" % (new_id, val))
2704   return results
2705
2706
2707 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2708   """Generate a drbd8 device complete with its children.
2709
2710   """
2711   port = cfg.AllocatePort()
2712   vgname = cfg.GetVGName()
2713   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2714                           logical_id=(vgname, names[0]))
2715   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2716                           logical_id=(vgname, names[1]))
2717   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2718                           logical_id = (primary, secondary, port),
2719                           children = [dev_data, dev_meta],
2720                           iv_name=iv_name)
2721   return drbd_dev
2722
2723
2724 def _GenerateDiskTemplate(cfg, template_name,
2725                           instance_name, primary_node,
2726                           secondary_nodes, disk_sz, swap_sz,
2727                           file_storage_dir, file_driver):
2728   """Generate the entire disk layout for a given template type.
2729
2730   """
2731   #TODO: compute space requirements
2732
2733   vgname = cfg.GetVGName()
2734   if template_name == constants.DT_DISKLESS:
2735     disks = []
2736   elif template_name == constants.DT_PLAIN:
2737     if len(secondary_nodes) != 0:
2738       raise errors.ProgrammerError("Wrong template configuration")
2739
2740     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2741     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2742                            logical_id=(vgname, names[0]),
2743                            iv_name = "sda")
2744     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2745                            logical_id=(vgname, names[1]),
2746                            iv_name = "sdb")
2747     disks = [sda_dev, sdb_dev]
2748   elif template_name == constants.DT_DRBD8:
2749     if len(secondary_nodes) != 1:
2750       raise errors.ProgrammerError("Wrong template configuration")
2751     remote_node = secondary_nodes[0]
2752     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2753                                        ".sdb_data", ".sdb_meta"])
2754     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2755                                          disk_sz, names[0:2], "sda")
2756     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2757                                          swap_sz, names[2:4], "sdb")
2758     disks = [drbd_sda_dev, drbd_sdb_dev]
2759   elif template_name == constants.DT_FILE:
2760     if len(secondary_nodes) != 0:
2761       raise errors.ProgrammerError("Wrong template configuration")
2762
2763     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2764                                 iv_name="sda", logical_id=(file_driver,
2765                                 "%s/sda" % file_storage_dir))
2766     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2767                                 iv_name="sdb", logical_id=(file_driver,
2768                                 "%s/sdb" % file_storage_dir))
2769     disks = [file_sda_dev, file_sdb_dev]
2770   else:
2771     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2772   return disks
2773
2774
2775 def _GetInstanceInfoText(instance):
2776   """Compute that text that should be added to the disk's metadata.
2777
2778   """
2779   return "originstname+%s" % instance.name
2780
2781
2782 def _CreateDisks(cfg, instance):
2783   """Create all disks for an instance.
2784
2785   This abstracts away some work from AddInstance.
2786
2787   Args:
2788     instance: the instance object
2789
2790   Returns:
2791     True or False showing the success of the creation process
2792
2793   """
2794   info = _GetInstanceInfoText(instance)
2795
2796   if instance.disk_template == constants.DT_FILE:
2797     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2798     result = rpc.call_file_storage_dir_create(instance.primary_node,
2799                                               file_storage_dir)
2800
2801     if not result:
2802       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2803       return False
2804
2805     if not result[0]:
2806       logger.Error("failed to create directory '%s'" % file_storage_dir)
2807       return False
2808
2809   for device in instance.disks:
2810     logger.Info("creating volume %s for instance %s" %
2811                 (device.iv_name, instance.name))
2812     #HARDCODE
2813     for secondary_node in instance.secondary_nodes:
2814       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2815                                         device, False, info):
2816         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2817                      (device.iv_name, device, secondary_node))
2818         return False
2819     #HARDCODE
2820     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2821                                     instance, device, info):
2822       logger.Error("failed to create volume %s on primary!" %
2823                    device.iv_name)
2824       return False
2825
2826   return True
2827
2828
2829 def _RemoveDisks(instance, cfg):
2830   """Remove all disks for an instance.
2831
2832   This abstracts away some work from `AddInstance()` and
2833   `RemoveInstance()`. Note that in case some of the devices couldn't
2834   be removed, the removal will continue with the other ones (compare
2835   with `_CreateDisks()`).
2836
2837   Args:
2838     instance: the instance object
2839
2840   Returns:
2841     True or False showing the success of the removal proces
2842
2843   """
2844   logger.Info("removing block devices for instance %s" % instance.name)
2845
2846   result = True
2847   for device in instance.disks:
2848     for node, disk in device.ComputeNodeTree(instance.primary_node):
2849       cfg.SetDiskID(disk, node)
2850       if not rpc.call_blockdev_remove(node, disk):
2851         logger.Error("could not remove block device %s on node %s,"
2852                      " continuing anyway" %
2853                      (device.iv_name, node))
2854         result = False
2855
2856   if instance.disk_template == constants.DT_FILE:
2857     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2858     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2859                                             file_storage_dir):
2860       logger.Error("could not remove directory '%s'" % file_storage_dir)
2861       result = False
2862
2863   return result
2864
2865
2866 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2867   """Compute disk size requirements in the volume group
2868
2869   This is currently hard-coded for the two-drive layout.
2870
2871   """
2872   # Required free disk space as a function of disk and swap space
2873   req_size_dict = {
2874     constants.DT_DISKLESS: None,
2875     constants.DT_PLAIN: disk_size + swap_size,
2876     # 256 MB are added for drbd metadata, 128MB for each drbd device
2877     constants.DT_DRBD8: disk_size + swap_size + 256,
2878     constants.DT_FILE: None,
2879   }
2880
2881   if disk_template not in req_size_dict:
2882     raise errors.ProgrammerError("Disk template '%s' size requirement"
2883                                  " is unknown" %  disk_template)
2884
2885   return req_size_dict[disk_template]
2886
2887
2888 class LUCreateInstance(LogicalUnit):
2889   """Create an instance.
2890
2891   """
2892   HPATH = "instance-add"
2893   HTYPE = constants.HTYPE_INSTANCE
2894   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2895               "disk_template", "swap_size", "mode", "start", "vcpus",
2896               "wait_for_sync", "ip_check", "mac"]
2897
2898   def _RunAllocator(self):
2899     """Run the allocator based on input opcode.
2900
2901     """
2902     disks = [{"size": self.op.disk_size, "mode": "w"},
2903              {"size": self.op.swap_size, "mode": "w"}]
2904     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2905              "bridge": self.op.bridge}]
2906     ial = IAllocator(self.cfg, self.sstore,
2907                      mode=constants.IALLOCATOR_MODE_ALLOC,
2908                      name=self.op.instance_name,
2909                      disk_template=self.op.disk_template,
2910                      tags=[],
2911                      os=self.op.os_type,
2912                      vcpus=self.op.vcpus,
2913                      mem_size=self.op.mem_size,
2914                      disks=disks,
2915                      nics=nics,
2916                      )
2917
2918     ial.Run(self.op.iallocator)
2919
2920     if not ial.success:
2921       raise errors.OpPrereqError("Can't compute nodes using"
2922                                  " iallocator '%s': %s" % (self.op.iallocator,
2923                                                            ial.info))
2924     if len(ial.nodes) != ial.required_nodes:
2925       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2926                                  " of nodes (%s), required %s" %
2927                                  (len(ial.nodes), ial.required_nodes))
2928     self.op.pnode = ial.nodes[0]
2929     logger.ToStdout("Selected nodes for the instance: %s" %
2930                     (", ".join(ial.nodes),))
2931     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2932                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2933     if ial.required_nodes == 2:
2934       self.op.snode = ial.nodes[1]
2935
2936   def BuildHooksEnv(self):
2937     """Build hooks env.
2938
2939     This runs on master, primary and secondary nodes of the instance.
2940
2941     """
2942     env = {
2943       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2944       "INSTANCE_DISK_SIZE": self.op.disk_size,
2945       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2946       "INSTANCE_ADD_MODE": self.op.mode,
2947       }
2948     if self.op.mode == constants.INSTANCE_IMPORT:
2949       env["INSTANCE_SRC_NODE"] = self.op.src_node
2950       env["INSTANCE_SRC_PATH"] = self.op.src_path
2951       env["INSTANCE_SRC_IMAGE"] = self.src_image
2952
2953     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2954       primary_node=self.op.pnode,
2955       secondary_nodes=self.secondaries,
2956       status=self.instance_status,
2957       os_type=self.op.os_type,
2958       memory=self.op.mem_size,
2959       vcpus=self.op.vcpus,
2960       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2961     ))
2962
2963     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2964           self.secondaries)
2965     return env, nl, nl
2966
2967
2968   def CheckPrereq(self):
2969     """Check prerequisites.
2970
2971     """
2972     # set optional parameters to none if they don't exist
2973     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2974                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2975                  "vnc_bind_address"]:
2976       if not hasattr(self.op, attr):
2977         setattr(self.op, attr, None)
2978
2979     if self.op.mode not in (constants.INSTANCE_CREATE,
2980                             constants.INSTANCE_IMPORT):
2981       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2982                                  self.op.mode)
2983
2984     if (not self.cfg.GetVGName() and
2985         self.op.disk_template not in constants.DTS_NOT_LVM):
2986       raise errors.OpPrereqError("Cluster does not support lvm-based"
2987                                  " instances")
2988
2989     if self.op.mode == constants.INSTANCE_IMPORT:
2990       src_node = getattr(self.op, "src_node", None)
2991       src_path = getattr(self.op, "src_path", None)
2992       if src_node is None or src_path is None:
2993         raise errors.OpPrereqError("Importing an instance requires source"
2994                                    " node and path options")
2995       src_node_full = self.cfg.ExpandNodeName(src_node)
2996       if src_node_full is None:
2997         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2998       self.op.src_node = src_node = src_node_full
2999
3000       if not os.path.isabs(src_path):
3001         raise errors.OpPrereqError("The source path must be absolute")
3002
3003       export_info = rpc.call_export_info(src_node, src_path)
3004
3005       if not export_info:
3006         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3007
3008       if not export_info.has_section(constants.INISECT_EXP):
3009         raise errors.ProgrammerError("Corrupted export config")
3010
3011       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3012       if (int(ei_version) != constants.EXPORT_VERSION):
3013         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3014                                    (ei_version, constants.EXPORT_VERSION))
3015
3016       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3017         raise errors.OpPrereqError("Can't import instance with more than"
3018                                    " one data disk")
3019
3020       # FIXME: are the old os-es, disk sizes, etc. useful?
3021       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3022       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3023                                                          'disk0_dump'))
3024       self.src_image = diskimage
3025     else: # INSTANCE_CREATE
3026       if getattr(self.op, "os_type", None) is None:
3027         raise errors.OpPrereqError("No guest OS specified")
3028
3029     #### instance parameters check
3030
3031     # disk template and mirror node verification
3032     if self.op.disk_template not in constants.DISK_TEMPLATES:
3033       raise errors.OpPrereqError("Invalid disk template name")
3034
3035     # instance name verification
3036     hostname1 = utils.HostInfo(self.op.instance_name)
3037
3038     self.op.instance_name = instance_name = hostname1.name
3039     instance_list = self.cfg.GetInstanceList()
3040     if instance_name in instance_list:
3041       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3042                                  instance_name)
3043
3044     # ip validity checks
3045     ip = getattr(self.op, "ip", None)
3046     if ip is None or ip.lower() == "none":
3047       inst_ip = None
3048     elif ip.lower() == "auto":
3049       inst_ip = hostname1.ip
3050     else:
3051       if not utils.IsValidIP(ip):
3052         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3053                                    " like a valid IP" % ip)
3054       inst_ip = ip
3055     self.inst_ip = self.op.ip = inst_ip
3056
3057     if self.op.start and not self.op.ip_check:
3058       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3059                                  " adding an instance in start mode")
3060
3061     if self.op.ip_check:
3062       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3063         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3064                                    (hostname1.ip, instance_name))
3065
3066     # MAC address verification
3067     if self.op.mac != "auto":
3068       if not utils.IsValidMac(self.op.mac.lower()):
3069         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3070                                    self.op.mac)
3071
3072     # bridge verification
3073     bridge = getattr(self.op, "bridge", None)
3074     if bridge is None:
3075       self.op.bridge = self.cfg.GetDefBridge()
3076     else:
3077       self.op.bridge = bridge
3078
3079     # boot order verification
3080     if self.op.hvm_boot_order is not None:
3081       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3082         raise errors.OpPrereqError("invalid boot order specified,"
3083                                    " must be one or more of [acdn]")
3084     # file storage checks
3085     if (self.op.file_driver and
3086         not self.op.file_driver in constants.FILE_DRIVER):
3087       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3088                                  self.op.file_driver)
3089
3090     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3091       raise errors.OpPrereqError("File storage directory not a relative"
3092                                  " path")
3093     #### allocator run
3094
3095     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3096       raise errors.OpPrereqError("One and only one of iallocator and primary"
3097                                  " node must be given")
3098
3099     if self.op.iallocator is not None:
3100       self._RunAllocator()
3101
3102     #### node related checks
3103
3104     # check primary node
3105     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3106     if pnode is None:
3107       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3108                                  self.op.pnode)
3109     self.op.pnode = pnode.name
3110     self.pnode = pnode
3111     self.secondaries = []
3112
3113     # mirror node verification
3114     if self.op.disk_template in constants.DTS_NET_MIRROR:
3115       if getattr(self.op, "snode", None) is None:
3116         raise errors.OpPrereqError("The networked disk templates need"
3117                                    " a mirror node")
3118
3119       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3120       if snode_name is None:
3121         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3122                                    self.op.snode)
3123       elif snode_name == pnode.name:
3124         raise errors.OpPrereqError("The secondary node cannot be"
3125                                    " the primary node.")
3126       self.secondaries.append(snode_name)
3127
3128     req_size = _ComputeDiskSize(self.op.disk_template,
3129                                 self.op.disk_size, self.op.swap_size)
3130
3131     # Check lv size requirements
3132     if req_size is not None:
3133       nodenames = [pnode.name] + self.secondaries
3134       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3135       for node in nodenames:
3136         info = nodeinfo.get(node, None)
3137         if not info:
3138           raise errors.OpPrereqError("Cannot get current information"
3139                                      " from node '%s'" % node)
3140         vg_free = info.get('vg_free', None)
3141         if not isinstance(vg_free, int):
3142           raise errors.OpPrereqError("Can't compute free disk space on"
3143                                      " node %s" % node)
3144         if req_size > info['vg_free']:
3145           raise errors.OpPrereqError("Not enough disk space on target node %s."
3146                                      " %d MB available, %d MB required" %
3147                                      (node, info['vg_free'], req_size))
3148
3149     # os verification
3150     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3151     if not os_obj:
3152       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3153                                  " primary node"  % self.op.os_type)
3154
3155     if self.op.kernel_path == constants.VALUE_NONE:
3156       raise errors.OpPrereqError("Can't set instance kernel to none")
3157
3158
3159     # bridge check on primary node
3160     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3161       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3162                                  " destination node '%s'" %
3163                                  (self.op.bridge, pnode.name))
3164
3165     # memory check on primary node
3166     if self.op.start:
3167       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3168                            "creating instance %s" % self.op.instance_name,
3169                            self.op.mem_size)
3170
3171     # hvm_cdrom_image_path verification
3172     if self.op.hvm_cdrom_image_path is not None:
3173       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3174         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3175                                    " be an absolute path or None, not %s" %
3176                                    self.op.hvm_cdrom_image_path)
3177       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3178         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3179                                    " regular file or a symlink pointing to"
3180                                    " an existing regular file, not %s" %
3181                                    self.op.hvm_cdrom_image_path)
3182
3183     # vnc_bind_address verification
3184     if self.op.vnc_bind_address is not None:
3185       if not utils.IsValidIP(self.op.vnc_bind_address):
3186         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3187                                    " like a valid IP address" %
3188                                    self.op.vnc_bind_address)
3189
3190     if self.op.start:
3191       self.instance_status = 'up'
3192     else:
3193       self.instance_status = 'down'
3194
3195   def Exec(self, feedback_fn):
3196     """Create and add the instance to the cluster.
3197
3198     """
3199     instance = self.op.instance_name
3200     pnode_name = self.pnode.name
3201
3202     if self.op.mac == "auto":
3203       mac_address = self.cfg.GenerateMAC()
3204     else:
3205       mac_address = self.op.mac
3206
3207     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3208     if self.inst_ip is not None:
3209       nic.ip = self.inst_ip
3210
3211     ht_kind = self.sstore.GetHypervisorType()
3212     if ht_kind in constants.HTS_REQ_PORT:
3213       network_port = self.cfg.AllocatePort()
3214     else:
3215       network_port = None
3216
3217     if self.op.vnc_bind_address is None:
3218       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3219
3220     # this is needed because os.path.join does not accept None arguments
3221     if self.op.file_storage_dir is None:
3222       string_file_storage_dir = ""
3223     else:
3224       string_file_storage_dir = self.op.file_storage_dir
3225
3226     # build the full file storage dir path
3227     file_storage_dir = os.path.normpath(os.path.join(
3228                                         self.sstore.GetFileStorageDir(),
3229                                         string_file_storage_dir, instance))
3230
3231
3232     disks = _GenerateDiskTemplate(self.cfg,
3233                                   self.op.disk_template,
3234                                   instance, pnode_name,
3235                                   self.secondaries, self.op.disk_size,
3236                                   self.op.swap_size,
3237                                   file_storage_dir,
3238                                   self.op.file_driver)
3239
3240     iobj = objects.Instance(name=instance, os=self.op.os_type,
3241                             primary_node=pnode_name,
3242                             memory=self.op.mem_size,
3243                             vcpus=self.op.vcpus,
3244                             nics=[nic], disks=disks,
3245                             disk_template=self.op.disk_template,
3246                             status=self.instance_status,
3247                             network_port=network_port,
3248                             kernel_path=self.op.kernel_path,
3249                             initrd_path=self.op.initrd_path,
3250                             hvm_boot_order=self.op.hvm_boot_order,
3251                             hvm_acpi=self.op.hvm_acpi,
3252                             hvm_pae=self.op.hvm_pae,
3253                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3254                             vnc_bind_address=self.op.vnc_bind_address,
3255                             )
3256
3257     feedback_fn("* creating instance disks...")
3258     if not _CreateDisks(self.cfg, iobj):
3259       _RemoveDisks(iobj, self.cfg)
3260       raise errors.OpExecError("Device creation failed, reverting...")
3261
3262     feedback_fn("adding instance %s to cluster config" % instance)
3263
3264     self.cfg.AddInstance(iobj)
3265     # Add the new instance to the Ganeti Lock Manager
3266     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3267
3268     if self.op.wait_for_sync:
3269       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3270     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3271       # make sure the disks are not degraded (still sync-ing is ok)
3272       time.sleep(15)
3273       feedback_fn("* checking mirrors status")
3274       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3275     else:
3276       disk_abort = False
3277
3278     if disk_abort:
3279       _RemoveDisks(iobj, self.cfg)
3280       self.cfg.RemoveInstance(iobj.name)
3281       # Remove the new instance from the Ganeti Lock Manager
3282       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3283       raise errors.OpExecError("There are some degraded disks for"
3284                                " this instance")
3285
3286     feedback_fn("creating os for instance %s on node %s" %
3287                 (instance, pnode_name))
3288
3289     if iobj.disk_template != constants.DT_DISKLESS:
3290       if self.op.mode == constants.INSTANCE_CREATE:
3291         feedback_fn("* running the instance OS create scripts...")
3292         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3293           raise errors.OpExecError("could not add os for instance %s"
3294                                    " on node %s" %
3295                                    (instance, pnode_name))
3296
3297       elif self.op.mode == constants.INSTANCE_IMPORT:
3298         feedback_fn("* running the instance OS import scripts...")
3299         src_node = self.op.src_node
3300         src_image = self.src_image
3301         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3302                                                 src_node, src_image):
3303           raise errors.OpExecError("Could not import os for instance"
3304                                    " %s on node %s" %
3305                                    (instance, pnode_name))
3306       else:
3307         # also checked in the prereq part
3308         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3309                                      % self.op.mode)
3310
3311     if self.op.start:
3312       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3313       feedback_fn("* starting instance...")
3314       if not rpc.call_instance_start(pnode_name, iobj, None):
3315         raise errors.OpExecError("Could not start instance")
3316
3317
3318 class LUConnectConsole(NoHooksLU):
3319   """Connect to an instance's console.
3320
3321   This is somewhat special in that it returns the command line that
3322   you need to run on the master node in order to connect to the
3323   console.
3324
3325   """
3326   _OP_REQP = ["instance_name"]
3327
3328   def CheckPrereq(self):
3329     """Check prerequisites.
3330
3331     This checks that the instance is in the cluster.
3332
3333     """
3334     instance = self.cfg.GetInstanceInfo(
3335       self.cfg.ExpandInstanceName(self.op.instance_name))
3336     if instance is None:
3337       raise errors.OpPrereqError("Instance '%s' not known" %
3338                                  self.op.instance_name)
3339     self.instance = instance
3340
3341   def Exec(self, feedback_fn):
3342     """Connect to the console of an instance
3343
3344     """
3345     instance = self.instance
3346     node = instance.primary_node
3347
3348     node_insts = rpc.call_instance_list([node])[node]
3349     if node_insts is False:
3350       raise errors.OpExecError("Can't connect to node %s." % node)
3351
3352     if instance.name not in node_insts:
3353       raise errors.OpExecError("Instance %s is not running." % instance.name)
3354
3355     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3356
3357     hyper = hypervisor.GetHypervisor()
3358     console_cmd = hyper.GetShellCommandForConsole(instance)
3359
3360     # build ssh cmdline
3361     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3362
3363
3364 class LUReplaceDisks(LogicalUnit):
3365   """Replace the disks of an instance.
3366
3367   """
3368   HPATH = "mirrors-replace"
3369   HTYPE = constants.HTYPE_INSTANCE
3370   _OP_REQP = ["instance_name", "mode", "disks"]
3371
3372   def _RunAllocator(self):
3373     """Compute a new secondary node using an IAllocator.
3374
3375     """
3376     ial = IAllocator(self.cfg, self.sstore,
3377                      mode=constants.IALLOCATOR_MODE_RELOC,
3378                      name=self.op.instance_name,
3379                      relocate_from=[self.sec_node])
3380
3381     ial.Run(self.op.iallocator)
3382
3383     if not ial.success:
3384       raise errors.OpPrereqError("Can't compute nodes using"
3385                                  " iallocator '%s': %s" % (self.op.iallocator,
3386                                                            ial.info))
3387     if len(ial.nodes) != ial.required_nodes:
3388       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3389                                  " of nodes (%s), required %s" %
3390                                  (len(ial.nodes), ial.required_nodes))
3391     self.op.remote_node = ial.nodes[0]
3392     logger.ToStdout("Selected new secondary for the instance: %s" %
3393                     self.op.remote_node)
3394
3395   def BuildHooksEnv(self):
3396     """Build hooks env.
3397
3398     This runs on the master, the primary and all the secondaries.
3399
3400     """
3401     env = {
3402       "MODE": self.op.mode,
3403       "NEW_SECONDARY": self.op.remote_node,
3404       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3405       }
3406     env.update(_BuildInstanceHookEnvByObject(self.instance))
3407     nl = [
3408       self.sstore.GetMasterNode(),
3409       self.instance.primary_node,
3410       ]
3411     if self.op.remote_node is not None:
3412       nl.append(self.op.remote_node)
3413     return env, nl, nl
3414
3415   def CheckPrereq(self):
3416     """Check prerequisites.
3417
3418     This checks that the instance is in the cluster.
3419
3420     """
3421     if not hasattr(self.op, "remote_node"):
3422       self.op.remote_node = None
3423
3424     instance = self.cfg.GetInstanceInfo(
3425       self.cfg.ExpandInstanceName(self.op.instance_name))
3426     if instance is None:
3427       raise errors.OpPrereqError("Instance '%s' not known" %
3428                                  self.op.instance_name)
3429     self.instance = instance
3430     self.op.instance_name = instance.name
3431
3432     if instance.disk_template not in constants.DTS_NET_MIRROR:
3433       raise errors.OpPrereqError("Instance's disk layout is not"
3434                                  " network mirrored.")
3435
3436     if len(instance.secondary_nodes) != 1:
3437       raise errors.OpPrereqError("The instance has a strange layout,"
3438                                  " expected one secondary but found %d" %
3439                                  len(instance.secondary_nodes))
3440
3441     self.sec_node = instance.secondary_nodes[0]
3442
3443     ia_name = getattr(self.op, "iallocator", None)
3444     if ia_name is not None:
3445       if self.op.remote_node is not None:
3446         raise errors.OpPrereqError("Give either the iallocator or the new"
3447                                    " secondary, not both")
3448       self.op.remote_node = self._RunAllocator()
3449
3450     remote_node = self.op.remote_node
3451     if remote_node is not None:
3452       remote_node = self.cfg.ExpandNodeName(remote_node)
3453       if remote_node is None:
3454         raise errors.OpPrereqError("Node '%s' not known" %
3455                                    self.op.remote_node)
3456       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3457     else:
3458       self.remote_node_info = None
3459     if remote_node == instance.primary_node:
3460       raise errors.OpPrereqError("The specified node is the primary node of"
3461                                  " the instance.")
3462     elif remote_node == self.sec_node:
3463       if self.op.mode == constants.REPLACE_DISK_SEC:
3464         # this is for DRBD8, where we can't execute the same mode of
3465         # replacement as for drbd7 (no different port allocated)
3466         raise errors.OpPrereqError("Same secondary given, cannot execute"
3467                                    " replacement")
3468     if instance.disk_template == constants.DT_DRBD8:
3469       if (self.op.mode == constants.REPLACE_DISK_ALL and
3470           remote_node is not None):
3471         # switch to replace secondary mode
3472         self.op.mode = constants.REPLACE_DISK_SEC
3473
3474       if self.op.mode == constants.REPLACE_DISK_ALL:
3475         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3476                                    " secondary disk replacement, not"
3477                                    " both at once")
3478       elif self.op.mode == constants.REPLACE_DISK_PRI:
3479         if remote_node is not None:
3480           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3481                                      " the secondary while doing a primary"
3482                                      " node disk replacement")
3483         self.tgt_node = instance.primary_node
3484         self.oth_node = instance.secondary_nodes[0]
3485       elif self.op.mode == constants.REPLACE_DISK_SEC:
3486         self.new_node = remote_node # this can be None, in which case
3487                                     # we don't change the secondary
3488         self.tgt_node = instance.secondary_nodes[0]
3489         self.oth_node = instance.primary_node
3490       else:
3491         raise errors.ProgrammerError("Unhandled disk replace mode")
3492
3493     for name in self.op.disks:
3494       if instance.FindDisk(name) is None:
3495         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3496                                    (name, instance.name))
3497     self.op.remote_node = remote_node
3498
3499   def _ExecD8DiskOnly(self, feedback_fn):
3500     """Replace a disk on the primary or secondary for dbrd8.
3501
3502     The algorithm for replace is quite complicated:
3503       - for each disk to be replaced:
3504         - create new LVs on the target node with unique names
3505         - detach old LVs from the drbd device
3506         - rename old LVs to name_replaced.<time_t>
3507         - rename new LVs to old LVs
3508         - attach the new LVs (with the old names now) to the drbd device
3509       - wait for sync across all devices
3510       - for each modified disk:
3511         - remove old LVs (which have the name name_replaces.<time_t>)
3512
3513     Failures are not very well handled.
3514
3515     """
3516     steps_total = 6
3517     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3518     instance = self.instance
3519     iv_names = {}
3520     vgname = self.cfg.GetVGName()
3521     # start of work
3522     cfg = self.cfg
3523     tgt_node = self.tgt_node
3524     oth_node = self.oth_node
3525
3526     # Step: check device activation
3527     self.proc.LogStep(1, steps_total, "check device existence")
3528     info("checking volume groups")
3529     my_vg = cfg.GetVGName()
3530     results = rpc.call_vg_list([oth_node, tgt_node])
3531     if not results:
3532       raise errors.OpExecError("Can't list volume groups on the nodes")
3533     for node in oth_node, tgt_node:
3534       res = results.get(node, False)
3535       if not res or my_vg not in res:
3536         raise errors.OpExecError("Volume group '%s' not found on %s" %
3537                                  (my_vg, node))
3538     for dev in instance.disks:
3539       if not dev.iv_name in self.op.disks:
3540         continue
3541       for node in tgt_node, oth_node:
3542         info("checking %s on %s" % (dev.iv_name, node))
3543         cfg.SetDiskID(dev, node)
3544         if not rpc.call_blockdev_find(node, dev):
3545           raise errors.OpExecError("Can't find device %s on node %s" %
3546                                    (dev.iv_name, node))
3547
3548     # Step: check other node consistency
3549     self.proc.LogStep(2, steps_total, "check peer consistency")
3550     for dev in instance.disks:
3551       if not dev.iv_name in self.op.disks:
3552         continue
3553       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3554       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3555                                    oth_node==instance.primary_node):
3556         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3557                                  " to replace disks on this node (%s)" %
3558                                  (oth_node, tgt_node))
3559
3560     # Step: create new storage
3561     self.proc.LogStep(3, steps_total, "allocate new storage")
3562     for dev in instance.disks:
3563       if not dev.iv_name in self.op.disks:
3564         continue
3565       size = dev.size
3566       cfg.SetDiskID(dev, tgt_node)
3567       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3568       names = _GenerateUniqueNames(cfg, lv_names)
3569       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3570                              logical_id=(vgname, names[0]))
3571       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3572                              logical_id=(vgname, names[1]))
3573       new_lvs = [lv_data, lv_meta]
3574       old_lvs = dev.children
3575       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3576       info("creating new local storage on %s for %s" %
3577            (tgt_node, dev.iv_name))
3578       # since we *always* want to create this LV, we use the
3579       # _Create...OnPrimary (which forces the creation), even if we
3580       # are talking about the secondary node
3581       for new_lv in new_lvs:
3582         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3583                                         _GetInstanceInfoText(instance)):
3584           raise errors.OpExecError("Failed to create new LV named '%s' on"
3585                                    " node '%s'" %
3586                                    (new_lv.logical_id[1], tgt_node))
3587
3588     # Step: for each lv, detach+rename*2+attach
3589     self.proc.LogStep(4, steps_total, "change drbd configuration")
3590     for dev, old_lvs, new_lvs in iv_names.itervalues():
3591       info("detaching %s drbd from local storage" % dev.iv_name)
3592       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3593         raise errors.OpExecError("Can't detach drbd from local storage on node"
3594                                  " %s for device %s" % (tgt_node, dev.iv_name))
3595       #dev.children = []
3596       #cfg.Update(instance)
3597
3598       # ok, we created the new LVs, so now we know we have the needed
3599       # storage; as such, we proceed on the target node to rename
3600       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3601       # using the assumption that logical_id == physical_id (which in
3602       # turn is the unique_id on that node)
3603
3604       # FIXME(iustin): use a better name for the replaced LVs
3605       temp_suffix = int(time.time())
3606       ren_fn = lambda d, suff: (d.physical_id[0],
3607                                 d.physical_id[1] + "_replaced-%s" % suff)
3608       # build the rename list based on what LVs exist on the node
3609       rlist = []
3610       for to_ren in old_lvs:
3611         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3612         if find_res is not None: # device exists
3613           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3614
3615       info("renaming the old LVs on the target node")
3616       if not rpc.call_blockdev_rename(tgt_node, rlist):
3617         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3618       # now we rename the new LVs to the old LVs
3619       info("renaming the new LVs on the target node")
3620       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3621       if not rpc.call_blockdev_rename(tgt_node, rlist):
3622         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3623
3624       for old, new in zip(old_lvs, new_lvs):
3625         new.logical_id = old.logical_id
3626         cfg.SetDiskID(new, tgt_node)
3627
3628       for disk in old_lvs:
3629         disk.logical_id = ren_fn(disk, temp_suffix)
3630         cfg.SetDiskID(disk, tgt_node)
3631
3632       # now that the new lvs have the old name, we can add them to the device
3633       info("adding new mirror component on %s" % tgt_node)
3634       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3635         for new_lv in new_lvs:
3636           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3637             warning("Can't rollback device %s", hint="manually cleanup unused"
3638                     " logical volumes")
3639         raise errors.OpExecError("Can't add local storage to drbd")
3640
3641       dev.children = new_lvs
3642       cfg.Update(instance)
3643
3644     # Step: wait for sync
3645
3646     # this can fail as the old devices are degraded and _WaitForSync
3647     # does a combined result over all disks, so we don't check its
3648     # return value
3649     self.proc.LogStep(5, steps_total, "sync devices")
3650     _WaitForSync(cfg, instance, self.proc, unlock=True)
3651
3652     # so check manually all the devices
3653     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3654       cfg.SetDiskID(dev, instance.primary_node)
3655       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3656       if is_degr:
3657         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3658
3659     # Step: remove old storage
3660     self.proc.LogStep(6, steps_total, "removing old storage")
3661     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3662       info("remove logical volumes for %s" % name)
3663       for lv in old_lvs:
3664         cfg.SetDiskID(lv, tgt_node)
3665         if not rpc.call_blockdev_remove(tgt_node, lv):
3666           warning("Can't remove old LV", hint="manually remove unused LVs")
3667           continue
3668
3669   def _ExecD8Secondary(self, feedback_fn):
3670     """Replace the secondary node for drbd8.
3671
3672     The algorithm for replace is quite complicated:
3673       - for all disks of the instance:
3674         - create new LVs on the new node with same names
3675         - shutdown the drbd device on the old secondary
3676         - disconnect the drbd network on the primary
3677         - create the drbd device on the new secondary
3678         - network attach the drbd on the primary, using an artifice:
3679           the drbd code for Attach() will connect to the network if it
3680           finds a device which is connected to the good local disks but
3681           not network enabled
3682       - wait for sync across all devices
3683       - remove all disks from the old secondary
3684
3685     Failures are not very well handled.
3686
3687     """
3688     steps_total = 6
3689     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3690     instance = self.instance
3691     iv_names = {}
3692     vgname = self.cfg.GetVGName()
3693     # start of work
3694     cfg = self.cfg
3695     old_node = self.tgt_node
3696     new_node = self.new_node
3697     pri_node = instance.primary_node
3698
3699     # Step: check device activation
3700     self.proc.LogStep(1, steps_total, "check device existence")
3701     info("checking volume groups")
3702     my_vg = cfg.GetVGName()
3703     results = rpc.call_vg_list([pri_node, new_node])
3704     if not results:
3705       raise errors.OpExecError("Can't list volume groups on the nodes")
3706     for node in pri_node, new_node:
3707       res = results.get(node, False)
3708       if not res or my_vg not in res:
3709         raise errors.OpExecError("Volume group '%s' not found on %s" %
3710                                  (my_vg, node))
3711     for dev in instance.disks:
3712       if not dev.iv_name in self.op.disks:
3713         continue
3714       info("checking %s on %s" % (dev.iv_name, pri_node))
3715       cfg.SetDiskID(dev, pri_node)
3716       if not rpc.call_blockdev_find(pri_node, dev):
3717         raise errors.OpExecError("Can't find device %s on node %s" %
3718                                  (dev.iv_name, pri_node))
3719
3720     # Step: check other node consistency
3721     self.proc.LogStep(2, steps_total, "check peer consistency")
3722     for dev in instance.disks:
3723       if not dev.iv_name in self.op.disks:
3724         continue
3725       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3726       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3727         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3728                                  " unsafe to replace the secondary" %
3729                                  pri_node)
3730
3731     # Step: create new storage
3732     self.proc.LogStep(3, steps_total, "allocate new storage")
3733     for dev in instance.disks:
3734       size = dev.size
3735       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3736       # since we *always* want to create this LV, we use the
3737       # _Create...OnPrimary (which forces the creation), even if we
3738       # are talking about the secondary node
3739       for new_lv in dev.children:
3740         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3741                                         _GetInstanceInfoText(instance)):
3742           raise errors.OpExecError("Failed to create new LV named '%s' on"
3743                                    " node '%s'" %
3744                                    (new_lv.logical_id[1], new_node))
3745
3746       iv_names[dev.iv_name] = (dev, dev.children)
3747
3748     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3749     for dev in instance.disks:
3750       size = dev.size
3751       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3752       # create new devices on new_node
3753       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3754                               logical_id=(pri_node, new_node,
3755                                           dev.logical_id[2]),
3756                               children=dev.children)
3757       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3758                                         new_drbd, False,
3759                                       _GetInstanceInfoText(instance)):
3760         raise errors.OpExecError("Failed to create new DRBD on"
3761                                  " node '%s'" % new_node)
3762
3763     for dev in instance.disks:
3764       # we have new devices, shutdown the drbd on the old secondary
3765       info("shutting down drbd for %s on old node" % dev.iv_name)
3766       cfg.SetDiskID(dev, old_node)
3767       if not rpc.call_blockdev_shutdown(old_node, dev):
3768         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3769                 hint="Please cleanup this device manually as soon as possible")
3770
3771     info("detaching primary drbds from the network (=> standalone)")
3772     done = 0
3773     for dev in instance.disks:
3774       cfg.SetDiskID(dev, pri_node)
3775       # set the physical (unique in bdev terms) id to None, meaning
3776       # detach from network
3777       dev.physical_id = (None,) * len(dev.physical_id)
3778       # and 'find' the device, which will 'fix' it to match the
3779       # standalone state
3780       if rpc.call_blockdev_find(pri_node, dev):
3781         done += 1
3782       else:
3783         warning("Failed to detach drbd %s from network, unusual case" %
3784                 dev.iv_name)
3785
3786     if not done:
3787       # no detaches succeeded (very unlikely)
3788       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3789
3790     # if we managed to detach at least one, we update all the disks of
3791     # the instance to point to the new secondary
3792     info("updating instance configuration")
3793     for dev in instance.disks:
3794       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3795       cfg.SetDiskID(dev, pri_node)
3796     cfg.Update(instance)
3797
3798     # and now perform the drbd attach
3799     info("attaching primary drbds to new secondary (standalone => connected)")
3800     failures = []
3801     for dev in instance.disks:
3802       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3803       # since the attach is smart, it's enough to 'find' the device,
3804       # it will automatically activate the network, if the physical_id
3805       # is correct
3806       cfg.SetDiskID(dev, pri_node)
3807       if not rpc.call_blockdev_find(pri_node, dev):
3808         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3809                 "please do a gnt-instance info to see the status of disks")
3810
3811     # this can fail as the old devices are degraded and _WaitForSync
3812     # does a combined result over all disks, so we don't check its
3813     # return value
3814     self.proc.LogStep(5, steps_total, "sync devices")
3815     _WaitForSync(cfg, instance, self.proc, unlock=True)
3816
3817     # so check manually all the devices
3818     for name, (dev, old_lvs) in iv_names.iteritems():
3819       cfg.SetDiskID(dev, pri_node)
3820       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3821       if is_degr:
3822         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3823
3824     self.proc.LogStep(6, steps_total, "removing old storage")
3825     for name, (dev, old_lvs) in iv_names.iteritems():
3826       info("remove logical volumes for %s" % name)
3827       for lv in old_lvs:
3828         cfg.SetDiskID(lv, old_node)
3829         if not rpc.call_blockdev_remove(old_node, lv):
3830           warning("Can't remove LV on old secondary",
3831                   hint="Cleanup stale volumes by hand")
3832
3833   def Exec(self, feedback_fn):
3834     """Execute disk replacement.
3835
3836     This dispatches the disk replacement to the appropriate handler.
3837
3838     """
3839     instance = self.instance
3840
3841     # Activate the instance disks if we're replacing them on a down instance
3842     if instance.status == "down":
3843       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3844       self.proc.ChainOpCode(op)
3845
3846     if instance.disk_template == constants.DT_DRBD8:
3847       if self.op.remote_node is None:
3848         fn = self._ExecD8DiskOnly
3849       else:
3850         fn = self._ExecD8Secondary
3851     else:
3852       raise errors.ProgrammerError("Unhandled disk replacement case")
3853
3854     ret = fn(feedback_fn)
3855
3856     # Deactivate the instance disks if we're replacing them on a down instance
3857     if instance.status == "down":
3858       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3859       self.proc.ChainOpCode(op)
3860
3861     return ret
3862
3863
3864 class LUGrowDisk(LogicalUnit):
3865   """Grow a disk of an instance.
3866
3867   """
3868   HPATH = "disk-grow"
3869   HTYPE = constants.HTYPE_INSTANCE
3870   _OP_REQP = ["instance_name", "disk", "amount"]
3871
3872   def BuildHooksEnv(self):
3873     """Build hooks env.
3874
3875     This runs on the master, the primary and all the secondaries.
3876
3877     """
3878     env = {
3879       "DISK": self.op.disk,
3880       "AMOUNT": self.op.amount,
3881       }
3882     env.update(_BuildInstanceHookEnvByObject(self.instance))
3883     nl = [
3884       self.sstore.GetMasterNode(),
3885       self.instance.primary_node,
3886       ]
3887     return env, nl, nl
3888
3889   def CheckPrereq(self):
3890     """Check prerequisites.
3891
3892     This checks that the instance is in the cluster.
3893
3894     """
3895     instance = self.cfg.GetInstanceInfo(
3896       self.cfg.ExpandInstanceName(self.op.instance_name))
3897     if instance is None:
3898       raise errors.OpPrereqError("Instance '%s' not known" %
3899                                  self.op.instance_name)
3900     self.instance = instance
3901     self.op.instance_name = instance.name
3902
3903     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3904       raise errors.OpPrereqError("Instance's disk layout does not support"
3905                                  " growing.")
3906
3907     if instance.FindDisk(self.op.disk) is None:
3908       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3909                                  (self.op.disk, instance.name))
3910
3911     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3912     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3913     for node in nodenames:
3914       info = nodeinfo.get(node, None)
3915       if not info:
3916         raise errors.OpPrereqError("Cannot get current information"
3917                                    " from node '%s'" % node)
3918       vg_free = info.get('vg_free', None)
3919       if not isinstance(vg_free, int):
3920         raise errors.OpPrereqError("Can't compute free disk space on"
3921                                    " node %s" % node)
3922       if self.op.amount > info['vg_free']:
3923         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3924                                    " %d MiB available, %d MiB required" %
3925                                    (node, info['vg_free'], self.op.amount))
3926
3927   def Exec(self, feedback_fn):
3928     """Execute disk grow.
3929
3930     """
3931     instance = self.instance
3932     disk = instance.FindDisk(self.op.disk)
3933     for node in (instance.secondary_nodes + (instance.primary_node,)):
3934       self.cfg.SetDiskID(disk, node)
3935       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3936       if not result or not isinstance(result, tuple) or len(result) != 2:
3937         raise errors.OpExecError("grow request failed to node %s" % node)
3938       elif not result[0]:
3939         raise errors.OpExecError("grow request failed to node %s: %s" %
3940                                  (node, result[1]))
3941     disk.RecordGrow(self.op.amount)
3942     self.cfg.Update(instance)
3943     return
3944
3945
3946 class LUQueryInstanceData(NoHooksLU):
3947   """Query runtime instance data.
3948
3949   """
3950   _OP_REQP = ["instances"]
3951
3952   def CheckPrereq(self):
3953     """Check prerequisites.
3954
3955     This only checks the optional instance list against the existing names.
3956
3957     """
3958     if not isinstance(self.op.instances, list):
3959       raise errors.OpPrereqError("Invalid argument type 'instances'")
3960     if self.op.instances:
3961       self.wanted_instances = []
3962       names = self.op.instances
3963       for name in names:
3964         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3965         if instance is None:
3966           raise errors.OpPrereqError("No such instance name '%s'" % name)
3967         self.wanted_instances.append(instance)
3968     else:
3969       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3970                                in self.cfg.GetInstanceList()]
3971     return
3972
3973
3974   def _ComputeDiskStatus(self, instance, snode, dev):
3975     """Compute block device status.
3976
3977     """
3978     self.cfg.SetDiskID(dev, instance.primary_node)
3979     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3980     if dev.dev_type in constants.LDS_DRBD:
3981       # we change the snode then (otherwise we use the one passed in)
3982       if dev.logical_id[0] == instance.primary_node:
3983         snode = dev.logical_id[1]
3984       else:
3985         snode = dev.logical_id[0]
3986
3987     if snode:
3988       self.cfg.SetDiskID(dev, snode)
3989       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3990     else:
3991       dev_sstatus = None
3992
3993     if dev.children:
3994       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3995                       for child in dev.children]
3996     else:
3997       dev_children = []
3998
3999     data = {
4000       "iv_name": dev.iv_name,
4001       "dev_type": dev.dev_type,
4002       "logical_id": dev.logical_id,
4003       "physical_id": dev.physical_id,
4004       "pstatus": dev_pstatus,
4005       "sstatus": dev_sstatus,
4006       "children": dev_children,
4007       }
4008
4009     return data
4010
4011   def Exec(self, feedback_fn):
4012     """Gather and return data"""
4013     result = {}
4014     for instance in self.wanted_instances:
4015       remote_info = rpc.call_instance_info(instance.primary_node,
4016                                                 instance.name)
4017       if remote_info and "state" in remote_info:
4018         remote_state = "up"
4019       else:
4020         remote_state = "down"
4021       if instance.status == "down":
4022         config_state = "down"
4023       else:
4024         config_state = "up"
4025
4026       disks = [self._ComputeDiskStatus(instance, None, device)
4027                for device in instance.disks]
4028
4029       idict = {
4030         "name": instance.name,
4031         "config_state": config_state,
4032         "run_state": remote_state,
4033         "pnode": instance.primary_node,
4034         "snodes": instance.secondary_nodes,
4035         "os": instance.os,
4036         "memory": instance.memory,
4037         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4038         "disks": disks,
4039         "vcpus": instance.vcpus,
4040         }
4041
4042       htkind = self.sstore.GetHypervisorType()
4043       if htkind == constants.HT_XEN_PVM30:
4044         idict["kernel_path"] = instance.kernel_path
4045         idict["initrd_path"] = instance.initrd_path
4046
4047       if htkind == constants.HT_XEN_HVM31:
4048         idict["hvm_boot_order"] = instance.hvm_boot_order
4049         idict["hvm_acpi"] = instance.hvm_acpi
4050         idict["hvm_pae"] = instance.hvm_pae
4051         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4052
4053       if htkind in constants.HTS_REQ_PORT:
4054         idict["vnc_bind_address"] = instance.vnc_bind_address
4055         idict["network_port"] = instance.network_port
4056
4057       result[instance.name] = idict
4058
4059     return result
4060
4061
4062 class LUSetInstanceParams(LogicalUnit):
4063   """Modifies an instances's parameters.
4064
4065   """
4066   HPATH = "instance-modify"
4067   HTYPE = constants.HTYPE_INSTANCE
4068   _OP_REQP = ["instance_name"]
4069
4070   def BuildHooksEnv(self):
4071     """Build hooks env.
4072
4073     This runs on the master, primary and secondaries.
4074
4075     """
4076     args = dict()
4077     if self.mem:
4078       args['memory'] = self.mem
4079     if self.vcpus:
4080       args['vcpus'] = self.vcpus
4081     if self.do_ip or self.do_bridge or self.mac:
4082       if self.do_ip:
4083         ip = self.ip
4084       else:
4085         ip = self.instance.nics[0].ip
4086       if self.bridge:
4087         bridge = self.bridge
4088       else:
4089         bridge = self.instance.nics[0].bridge
4090       if self.mac:
4091         mac = self.mac
4092       else:
4093         mac = self.instance.nics[0].mac
4094       args['nics'] = [(ip, bridge, mac)]
4095     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4096     nl = [self.sstore.GetMasterNode(),
4097           self.instance.primary_node] + list(self.instance.secondary_nodes)
4098     return env, nl, nl
4099
4100   def CheckPrereq(self):
4101     """Check prerequisites.
4102
4103     This only checks the instance list against the existing names.
4104
4105     """
4106     self.mem = getattr(self.op, "mem", None)
4107     self.vcpus = getattr(self.op, "vcpus", None)
4108     self.ip = getattr(self.op, "ip", None)
4109     self.mac = getattr(self.op, "mac", None)
4110     self.bridge = getattr(self.op, "bridge", None)
4111     self.kernel_path = getattr(self.op, "kernel_path", None)
4112     self.initrd_path = getattr(self.op, "initrd_path", None)
4113     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4114     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4115     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4116     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4117     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4118     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4119                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4120                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4121                  self.vnc_bind_address]
4122     if all_parms.count(None) == len(all_parms):
4123       raise errors.OpPrereqError("No changes submitted")
4124     if self.mem is not None:
4125       try:
4126         self.mem = int(self.mem)
4127       except ValueError, err:
4128         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4129     if self.vcpus is not None:
4130       try:
4131         self.vcpus = int(self.vcpus)
4132       except ValueError, err:
4133         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4134     if self.ip is not None:
4135       self.do_ip = True
4136       if self.ip.lower() == "none":
4137         self.ip = None
4138       else:
4139         if not utils.IsValidIP(self.ip):
4140           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4141     else:
4142       self.do_ip = False
4143     self.do_bridge = (self.bridge is not None)
4144     if self.mac is not None:
4145       if self.cfg.IsMacInUse(self.mac):
4146         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4147                                    self.mac)
4148       if not utils.IsValidMac(self.mac):
4149         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4150
4151     if self.kernel_path is not None:
4152       self.do_kernel_path = True
4153       if self.kernel_path == constants.VALUE_NONE:
4154         raise errors.OpPrereqError("Can't set instance to no kernel")
4155
4156       if self.kernel_path != constants.VALUE_DEFAULT:
4157         if not os.path.isabs(self.kernel_path):
4158           raise errors.OpPrereqError("The kernel path must be an absolute"
4159                                     " filename")
4160     else:
4161       self.do_kernel_path = False
4162
4163     if self.initrd_path is not None:
4164       self.do_initrd_path = True
4165       if self.initrd_path not in (constants.VALUE_NONE,
4166                                   constants.VALUE_DEFAULT):
4167         if not os.path.isabs(self.initrd_path):
4168           raise errors.OpPrereqError("The initrd path must be an absolute"
4169                                     " filename")
4170     else:
4171       self.do_initrd_path = False
4172
4173     # boot order verification
4174     if self.hvm_boot_order is not None:
4175       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4176         if len(self.hvm_boot_order.strip("acdn")) != 0:
4177           raise errors.OpPrereqError("invalid boot order specified,"
4178                                      " must be one or more of [acdn]"
4179                                      " or 'default'")
4180
4181     # hvm_cdrom_image_path verification
4182     if self.op.hvm_cdrom_image_path is not None:
4183       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4184         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4185                                    " be an absolute path or None, not %s" %
4186                                    self.op.hvm_cdrom_image_path)
4187       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4188         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4189                                    " regular file or a symlink pointing to"
4190                                    " an existing regular file, not %s" %
4191                                    self.op.hvm_cdrom_image_path)
4192
4193     # vnc_bind_address verification
4194     if self.op.vnc_bind_address is not None:
4195       if not utils.IsValidIP(self.op.vnc_bind_address):
4196         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4197                                    " like a valid IP address" %
4198                                    self.op.vnc_bind_address)
4199
4200     instance = self.cfg.GetInstanceInfo(
4201       self.cfg.ExpandInstanceName(self.op.instance_name))
4202     if instance is None:
4203       raise errors.OpPrereqError("No such instance name '%s'" %
4204                                  self.op.instance_name)
4205     self.op.instance_name = instance.name
4206     self.instance = instance
4207     return
4208
4209   def Exec(self, feedback_fn):
4210     """Modifies an instance.
4211
4212     All parameters take effect only at the next restart of the instance.
4213     """
4214     result = []
4215     instance = self.instance
4216     if self.mem:
4217       instance.memory = self.mem
4218       result.append(("mem", self.mem))
4219     if self.vcpus:
4220       instance.vcpus = self.vcpus
4221       result.append(("vcpus",  self.vcpus))
4222     if self.do_ip:
4223       instance.nics[0].ip = self.ip
4224       result.append(("ip", self.ip))
4225     if self.bridge:
4226       instance.nics[0].bridge = self.bridge
4227       result.append(("bridge", self.bridge))
4228     if self.mac:
4229       instance.nics[0].mac = self.mac
4230       result.append(("mac", self.mac))
4231     if self.do_kernel_path:
4232       instance.kernel_path = self.kernel_path
4233       result.append(("kernel_path", self.kernel_path))
4234     if self.do_initrd_path:
4235       instance.initrd_path = self.initrd_path
4236       result.append(("initrd_path", self.initrd_path))
4237     if self.hvm_boot_order:
4238       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4239         instance.hvm_boot_order = None
4240       else:
4241         instance.hvm_boot_order = self.hvm_boot_order
4242       result.append(("hvm_boot_order", self.hvm_boot_order))
4243     if self.hvm_acpi:
4244       instance.hvm_acpi = self.hvm_acpi
4245       result.append(("hvm_acpi", self.hvm_acpi))
4246     if self.hvm_pae:
4247       instance.hvm_pae = self.hvm_pae
4248       result.append(("hvm_pae", self.hvm_pae))
4249     if self.hvm_cdrom_image_path:
4250       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4251       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4252     if self.vnc_bind_address:
4253       instance.vnc_bind_address = self.vnc_bind_address
4254       result.append(("vnc_bind_address", self.vnc_bind_address))
4255
4256     self.cfg.AddInstance(instance)
4257
4258     return result
4259
4260
4261 class LUQueryExports(NoHooksLU):
4262   """Query the exports list
4263
4264   """
4265   _OP_REQP = []
4266
4267   def CheckPrereq(self):
4268     """Check that the nodelist contains only existing nodes.
4269
4270     """
4271     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4272
4273   def Exec(self, feedback_fn):
4274     """Compute the list of all the exported system images.
4275
4276     Returns:
4277       a dictionary with the structure node->(export-list)
4278       where export-list is a list of the instances exported on
4279       that node.
4280
4281     """
4282     return rpc.call_export_list(self.nodes)
4283
4284
4285 class LUExportInstance(LogicalUnit):
4286   """Export an instance to an image in the cluster.
4287
4288   """
4289   HPATH = "instance-export"
4290   HTYPE = constants.HTYPE_INSTANCE
4291   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4292
4293   def BuildHooksEnv(self):
4294     """Build hooks env.
4295
4296     This will run on the master, primary node and target node.
4297
4298     """
4299     env = {
4300       "EXPORT_NODE": self.op.target_node,
4301       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4302       }
4303     env.update(_BuildInstanceHookEnvByObject(self.instance))
4304     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4305           self.op.target_node]
4306     return env, nl, nl
4307
4308   def CheckPrereq(self):
4309     """Check prerequisites.
4310
4311     This checks that the instance and node names are valid.
4312
4313     """
4314     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4315     self.instance = self.cfg.GetInstanceInfo(instance_name)
4316     if self.instance is None:
4317       raise errors.OpPrereqError("Instance '%s' not found" %
4318                                  self.op.instance_name)
4319
4320     # node verification
4321     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4322     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4323
4324     if self.dst_node is None:
4325       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4326                                  self.op.target_node)
4327     self.op.target_node = self.dst_node.name
4328
4329     # instance disk type verification
4330     for disk in self.instance.disks:
4331       if disk.dev_type == constants.LD_FILE:
4332         raise errors.OpPrereqError("Export not supported for instances with"
4333                                    " file-based disks")
4334
4335   def Exec(self, feedback_fn):
4336     """Export an instance to an image in the cluster.
4337
4338     """
4339     instance = self.instance
4340     dst_node = self.dst_node
4341     src_node = instance.primary_node
4342     if self.op.shutdown:
4343       # shutdown the instance, but not the disks
4344       if not rpc.call_instance_shutdown(src_node, instance):
4345          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4346                                   (instance.name, src_node))
4347
4348     vgname = self.cfg.GetVGName()
4349
4350     snap_disks = []
4351
4352     try:
4353       for disk in instance.disks:
4354         if disk.iv_name == "sda":
4355           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4356           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4357
4358           if not new_dev_name:
4359             logger.Error("could not snapshot block device %s on node %s" %
4360                          (disk.logical_id[1], src_node))
4361           else:
4362             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4363                                       logical_id=(vgname, new_dev_name),
4364                                       physical_id=(vgname, new_dev_name),
4365                                       iv_name=disk.iv_name)
4366             snap_disks.append(new_dev)
4367
4368     finally:
4369       if self.op.shutdown and instance.status == "up":
4370         if not rpc.call_instance_start(src_node, instance, None):
4371           _ShutdownInstanceDisks(instance, self.cfg)
4372           raise errors.OpExecError("Could not start instance")
4373
4374     # TODO: check for size
4375
4376     for dev in snap_disks:
4377       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4378         logger.Error("could not export block device %s from node %s to node %s"
4379                      % (dev.logical_id[1], src_node, dst_node.name))
4380       if not rpc.call_blockdev_remove(src_node, dev):
4381         logger.Error("could not remove snapshot block device %s from node %s" %
4382                      (dev.logical_id[1], src_node))
4383
4384     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4385       logger.Error("could not finalize export for instance %s on node %s" %
4386                    (instance.name, dst_node.name))
4387
4388     nodelist = self.cfg.GetNodeList()
4389     nodelist.remove(dst_node.name)
4390
4391     # on one-node clusters nodelist will be empty after the removal
4392     # if we proceed the backup would be removed because OpQueryExports
4393     # substitutes an empty list with the full cluster node list.
4394     if nodelist:
4395       op = opcodes.OpQueryExports(nodes=nodelist)
4396       exportlist = self.proc.ChainOpCode(op)
4397       for node in exportlist:
4398         if instance.name in exportlist[node]:
4399           if not rpc.call_export_remove(node, instance.name):
4400             logger.Error("could not remove older export for instance %s"
4401                          " on node %s" % (instance.name, node))
4402
4403
4404 class LURemoveExport(NoHooksLU):
4405   """Remove exports related to the named instance.
4406
4407   """
4408   _OP_REQP = ["instance_name"]
4409
4410   def CheckPrereq(self):
4411     """Check prerequisites.
4412     """
4413     pass
4414
4415   def Exec(self, feedback_fn):
4416     """Remove any export.
4417
4418     """
4419     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4420     # If the instance was not found we'll try with the name that was passed in.
4421     # This will only work if it was an FQDN, though.
4422     fqdn_warn = False
4423     if not instance_name:
4424       fqdn_warn = True
4425       instance_name = self.op.instance_name
4426
4427     op = opcodes.OpQueryExports(nodes=[])
4428     exportlist = self.proc.ChainOpCode(op)
4429     found = False
4430     for node in exportlist:
4431       if instance_name in exportlist[node]:
4432         found = True
4433         if not rpc.call_export_remove(node, instance_name):
4434           logger.Error("could not remove export for instance %s"
4435                        " on node %s" % (instance_name, node))
4436
4437     if fqdn_warn and not found:
4438       feedback_fn("Export not found. If trying to remove an export belonging"
4439                   " to a deleted instance please use its Fully Qualified"
4440                   " Domain Name.")
4441
4442
4443 class TagsLU(NoHooksLU):
4444   """Generic tags LU.
4445
4446   This is an abstract class which is the parent of all the other tags LUs.
4447
4448   """
4449   def CheckPrereq(self):
4450     """Check prerequisites.
4451
4452     """
4453     if self.op.kind == constants.TAG_CLUSTER:
4454       self.target = self.cfg.GetClusterInfo()
4455     elif self.op.kind == constants.TAG_NODE:
4456       name = self.cfg.ExpandNodeName(self.op.name)
4457       if name is None:
4458         raise errors.OpPrereqError("Invalid node name (%s)" %
4459                                    (self.op.name,))
4460       self.op.name = name
4461       self.target = self.cfg.GetNodeInfo(name)
4462     elif self.op.kind == constants.TAG_INSTANCE:
4463       name = self.cfg.ExpandInstanceName(self.op.name)
4464       if name is None:
4465         raise errors.OpPrereqError("Invalid instance name (%s)" %
4466                                    (self.op.name,))
4467       self.op.name = name
4468       self.target = self.cfg.GetInstanceInfo(name)
4469     else:
4470       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4471                                  str(self.op.kind))
4472
4473
4474 class LUGetTags(TagsLU):
4475   """Returns the tags of a given object.
4476
4477   """
4478   _OP_REQP = ["kind", "name"]
4479
4480   def Exec(self, feedback_fn):
4481     """Returns the tag list.
4482
4483     """
4484     return self.target.GetTags()
4485
4486
4487 class LUSearchTags(NoHooksLU):
4488   """Searches the tags for a given pattern.
4489
4490   """
4491   _OP_REQP = ["pattern"]
4492
4493   def CheckPrereq(self):
4494     """Check prerequisites.
4495
4496     This checks the pattern passed for validity by compiling it.
4497
4498     """
4499     try:
4500       self.re = re.compile(self.op.pattern)
4501     except re.error, err:
4502       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4503                                  (self.op.pattern, err))
4504
4505   def Exec(self, feedback_fn):
4506     """Returns the tag list.
4507
4508     """
4509     cfg = self.cfg
4510     tgts = [("/cluster", cfg.GetClusterInfo())]
4511     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4512     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4513     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4514     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4515     results = []
4516     for path, target in tgts:
4517       for tag in target.GetTags():
4518         if self.re.search(tag):
4519           results.append((path, tag))
4520     return results
4521
4522
4523 class LUAddTags(TagsLU):
4524   """Sets a tag on a given object.
4525
4526   """
4527   _OP_REQP = ["kind", "name", "tags"]
4528
4529   def CheckPrereq(self):
4530     """Check prerequisites.
4531
4532     This checks the type and length of the tag name and value.
4533
4534     """
4535     TagsLU.CheckPrereq(self)
4536     for tag in self.op.tags:
4537       objects.TaggableObject.ValidateTag(tag)
4538
4539   def Exec(self, feedback_fn):
4540     """Sets the tag.
4541
4542     """
4543     try:
4544       for tag in self.op.tags:
4545         self.target.AddTag(tag)
4546     except errors.TagError, err:
4547       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4548     try:
4549       self.cfg.Update(self.target)
4550     except errors.ConfigurationError:
4551       raise errors.OpRetryError("There has been a modification to the"
4552                                 " config file and the operation has been"
4553                                 " aborted. Please retry.")
4554
4555
4556 class LUDelTags(TagsLU):
4557   """Delete a list of tags from a given object.
4558
4559   """
4560   _OP_REQP = ["kind", "name", "tags"]
4561
4562   def CheckPrereq(self):
4563     """Check prerequisites.
4564
4565     This checks that we have the given tag.
4566
4567     """
4568     TagsLU.CheckPrereq(self)
4569     for tag in self.op.tags:
4570       objects.TaggableObject.ValidateTag(tag)
4571     del_tags = frozenset(self.op.tags)
4572     cur_tags = self.target.GetTags()
4573     if not del_tags <= cur_tags:
4574       diff_tags = del_tags - cur_tags
4575       diff_names = ["'%s'" % tag for tag in diff_tags]
4576       diff_names.sort()
4577       raise errors.OpPrereqError("Tag(s) %s not found" %
4578                                  (",".join(diff_names)))
4579
4580   def Exec(self, feedback_fn):
4581     """Remove the tag from the object.
4582
4583     """
4584     for tag in self.op.tags:
4585       self.target.RemoveTag(tag)
4586     try:
4587       self.cfg.Update(self.target)
4588     except errors.ConfigurationError:
4589       raise errors.OpRetryError("There has been a modification to the"
4590                                 " config file and the operation has been"
4591                                 " aborted. Please retry.")
4592
4593 class LUTestDelay(NoHooksLU):
4594   """Sleep for a specified amount of time.
4595
4596   This LU sleeps on the master and/or nodes for a specified amount of
4597   time.
4598
4599   """
4600   _OP_REQP = ["duration", "on_master", "on_nodes"]
4601   REQ_BGL = False
4602
4603   def ExpandNames(self):
4604     """Expand names and set required locks.
4605
4606     This expands the node list, if any.
4607
4608     """
4609     self.needed_locks = {}
4610     if self.op.on_nodes:
4611       # _GetWantedNodes can be used here, but is not always appropriate to use
4612       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4613       # more information.
4614       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4615       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4616
4617   def CheckPrereq(self):
4618     """Check prerequisites.
4619
4620     """
4621
4622   def Exec(self, feedback_fn):
4623     """Do the actual sleep.
4624
4625     """
4626     if self.op.on_master:
4627       if not utils.TestDelay(self.op.duration):
4628         raise errors.OpExecError("Error during master delay test")
4629     if self.op.on_nodes:
4630       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4631       if not result:
4632         raise errors.OpExecError("Complete failure from rpc call")
4633       for node, node_result in result.items():
4634         if not node_result:
4635           raise errors.OpExecError("Failure during rpc call to node %s,"
4636                                    " result: %s" % (node, node_result))
4637
4638
4639 class IAllocator(object):
4640   """IAllocator framework.
4641
4642   An IAllocator instance has three sets of attributes:
4643     - cfg/sstore that are needed to query the cluster
4644     - input data (all members of the _KEYS class attribute are required)
4645     - four buffer attributes (in|out_data|text), that represent the
4646       input (to the external script) in text and data structure format,
4647       and the output from it, again in two formats
4648     - the result variables from the script (success, info, nodes) for
4649       easy usage
4650
4651   """
4652   _ALLO_KEYS = [
4653     "mem_size", "disks", "disk_template",
4654     "os", "tags", "nics", "vcpus",
4655     ]
4656   _RELO_KEYS = [
4657     "relocate_from",
4658     ]
4659
4660   def __init__(self, cfg, sstore, mode, name, **kwargs):
4661     self.cfg = cfg
4662     self.sstore = sstore
4663     # init buffer variables
4664     self.in_text = self.out_text = self.in_data = self.out_data = None
4665     # init all input fields so that pylint is happy
4666     self.mode = mode
4667     self.name = name
4668     self.mem_size = self.disks = self.disk_template = None
4669     self.os = self.tags = self.nics = self.vcpus = None
4670     self.relocate_from = None
4671     # computed fields
4672     self.required_nodes = None
4673     # init result fields
4674     self.success = self.info = self.nodes = None
4675     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4676       keyset = self._ALLO_KEYS
4677     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4678       keyset = self._RELO_KEYS
4679     else:
4680       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4681                                    " IAllocator" % self.mode)
4682     for key in kwargs:
4683       if key not in keyset:
4684         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4685                                      " IAllocator" % key)
4686       setattr(self, key, kwargs[key])
4687     for key in keyset:
4688       if key not in kwargs:
4689         raise errors.ProgrammerError("Missing input parameter '%s' to"
4690                                      " IAllocator" % key)
4691     self._BuildInputData()
4692
4693   def _ComputeClusterData(self):
4694     """Compute the generic allocator input data.
4695
4696     This is the data that is independent of the actual operation.
4697
4698     """
4699     cfg = self.cfg
4700     # cluster data
4701     data = {
4702       "version": 1,
4703       "cluster_name": self.sstore.GetClusterName(),
4704       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4705       "hypervisor_type": self.sstore.GetHypervisorType(),
4706       # we don't have job IDs
4707       }
4708
4709     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4710
4711     # node data
4712     node_results = {}
4713     node_list = cfg.GetNodeList()
4714     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4715     for nname in node_list:
4716       ninfo = cfg.GetNodeInfo(nname)
4717       if nname not in node_data or not isinstance(node_data[nname], dict):
4718         raise errors.OpExecError("Can't get data for node %s" % nname)
4719       remote_info = node_data[nname]
4720       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4721                    'vg_size', 'vg_free', 'cpu_total']:
4722         if attr not in remote_info:
4723           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4724                                    (nname, attr))
4725         try:
4726           remote_info[attr] = int(remote_info[attr])
4727         except ValueError, err:
4728           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4729                                    " %s" % (nname, attr, str(err)))
4730       # compute memory used by primary instances
4731       i_p_mem = i_p_up_mem = 0
4732       for iinfo in i_list:
4733         if iinfo.primary_node == nname:
4734           i_p_mem += iinfo.memory
4735           if iinfo.status == "up":
4736             i_p_up_mem += iinfo.memory
4737
4738       # compute memory used by instances
4739       pnr = {
4740         "tags": list(ninfo.GetTags()),
4741         "total_memory": remote_info['memory_total'],
4742         "reserved_memory": remote_info['memory_dom0'],
4743         "free_memory": remote_info['memory_free'],
4744         "i_pri_memory": i_p_mem,
4745         "i_pri_up_memory": i_p_up_mem,
4746         "total_disk": remote_info['vg_size'],
4747         "free_disk": remote_info['vg_free'],
4748         "primary_ip": ninfo.primary_ip,
4749         "secondary_ip": ninfo.secondary_ip,
4750         "total_cpus": remote_info['cpu_total'],
4751         }
4752       node_results[nname] = pnr
4753     data["nodes"] = node_results
4754
4755     # instance data
4756     instance_data = {}
4757     for iinfo in i_list:
4758       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4759                   for n in iinfo.nics]
4760       pir = {
4761         "tags": list(iinfo.GetTags()),
4762         "should_run": iinfo.status == "up",
4763         "vcpus": iinfo.vcpus,
4764         "memory": iinfo.memory,
4765         "os": iinfo.os,
4766         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4767         "nics": nic_data,
4768         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4769         "disk_template": iinfo.disk_template,
4770         }
4771       instance_data[iinfo.name] = pir
4772
4773     data["instances"] = instance_data
4774
4775     self.in_data = data
4776
4777   def _AddNewInstance(self):
4778     """Add new instance data to allocator structure.
4779
4780     This in combination with _AllocatorGetClusterData will create the
4781     correct structure needed as input for the allocator.
4782
4783     The checks for the completeness of the opcode must have already been
4784     done.
4785
4786     """
4787     data = self.in_data
4788     if len(self.disks) != 2:
4789       raise errors.OpExecError("Only two-disk configurations supported")
4790
4791     disk_space = _ComputeDiskSize(self.disk_template,
4792                                   self.disks[0]["size"], self.disks[1]["size"])
4793
4794     if self.disk_template in constants.DTS_NET_MIRROR:
4795       self.required_nodes = 2
4796     else:
4797       self.required_nodes = 1
4798     request = {
4799       "type": "allocate",
4800       "name": self.name,
4801       "disk_template": self.disk_template,
4802       "tags": self.tags,
4803       "os": self.os,
4804       "vcpus": self.vcpus,
4805       "memory": self.mem_size,
4806       "disks": self.disks,
4807       "disk_space_total": disk_space,
4808       "nics": self.nics,
4809       "required_nodes": self.required_nodes,
4810       }
4811     data["request"] = request
4812
4813   def _AddRelocateInstance(self):
4814     """Add relocate instance data to allocator structure.
4815
4816     This in combination with _IAllocatorGetClusterData will create the
4817     correct structure needed as input for the allocator.
4818
4819     The checks for the completeness of the opcode must have already been
4820     done.
4821
4822     """
4823     instance = self.cfg.GetInstanceInfo(self.name)
4824     if instance is None:
4825       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4826                                    " IAllocator" % self.name)
4827
4828     if instance.disk_template not in constants.DTS_NET_MIRROR:
4829       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4830
4831     if len(instance.secondary_nodes) != 1:
4832       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4833
4834     self.required_nodes = 1
4835
4836     disk_space = _ComputeDiskSize(instance.disk_template,
4837                                   instance.disks[0].size,
4838                                   instance.disks[1].size)
4839
4840     request = {
4841       "type": "relocate",
4842       "name": self.name,
4843       "disk_space_total": disk_space,
4844       "required_nodes": self.required_nodes,
4845       "relocate_from": self.relocate_from,
4846       }
4847     self.in_data["request"] = request
4848
4849   def _BuildInputData(self):
4850     """Build input data structures.
4851
4852     """
4853     self._ComputeClusterData()
4854
4855     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4856       self._AddNewInstance()
4857     else:
4858       self._AddRelocateInstance()
4859
4860     self.in_text = serializer.Dump(self.in_data)
4861
4862   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4863     """Run an instance allocator and return the results.
4864
4865     """
4866     data = self.in_text
4867
4868     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4869
4870     if not isinstance(result, tuple) or len(result) != 4:
4871       raise errors.OpExecError("Invalid result from master iallocator runner")
4872
4873     rcode, stdout, stderr, fail = result
4874
4875     if rcode == constants.IARUN_NOTFOUND:
4876       raise errors.OpExecError("Can't find allocator '%s'" % name)
4877     elif rcode == constants.IARUN_FAILURE:
4878         raise errors.OpExecError("Instance allocator call failed: %s,"
4879                                  " output: %s" %
4880                                  (fail, stdout+stderr))
4881     self.out_text = stdout
4882     if validate:
4883       self._ValidateResult()
4884
4885   def _ValidateResult(self):
4886     """Process the allocator results.
4887
4888     This will process and if successful save the result in
4889     self.out_data and the other parameters.
4890
4891     """
4892     try:
4893       rdict = serializer.Load(self.out_text)
4894     except Exception, err:
4895       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4896
4897     if not isinstance(rdict, dict):
4898       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4899
4900     for key in "success", "info", "nodes":
4901       if key not in rdict:
4902         raise errors.OpExecError("Can't parse iallocator results:"
4903                                  " missing key '%s'" % key)
4904       setattr(self, key, rdict[key])
4905
4906     if not isinstance(rdict["nodes"], list):
4907       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4908                                " is not a list")
4909     self.out_data = rdict
4910
4911
4912 class LUTestAllocator(NoHooksLU):
4913   """Run allocator tests.
4914
4915   This LU runs the allocator tests
4916
4917   """
4918   _OP_REQP = ["direction", "mode", "name"]
4919
4920   def CheckPrereq(self):
4921     """Check prerequisites.
4922
4923     This checks the opcode parameters depending on the director and mode test.
4924
4925     """
4926     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4927       for attr in ["name", "mem_size", "disks", "disk_template",
4928                    "os", "tags", "nics", "vcpus"]:
4929         if not hasattr(self.op, attr):
4930           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4931                                      attr)
4932       iname = self.cfg.ExpandInstanceName(self.op.name)
4933       if iname is not None:
4934         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4935                                    iname)
4936       if not isinstance(self.op.nics, list):
4937         raise errors.OpPrereqError("Invalid parameter 'nics'")
4938       for row in self.op.nics:
4939         if (not isinstance(row, dict) or
4940             "mac" not in row or
4941             "ip" not in row or
4942             "bridge" not in row):
4943           raise errors.OpPrereqError("Invalid contents of the"
4944                                      " 'nics' parameter")
4945       if not isinstance(self.op.disks, list):
4946         raise errors.OpPrereqError("Invalid parameter 'disks'")
4947       if len(self.op.disks) != 2:
4948         raise errors.OpPrereqError("Only two-disk configurations supported")
4949       for row in self.op.disks:
4950         if (not isinstance(row, dict) or
4951             "size" not in row or
4952             not isinstance(row["size"], int) or
4953             "mode" not in row or
4954             row["mode"] not in ['r', 'w']):
4955           raise errors.OpPrereqError("Invalid contents of the"
4956                                      " 'disks' parameter")
4957     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4958       if not hasattr(self.op, "name"):
4959         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4960       fname = self.cfg.ExpandInstanceName(self.op.name)
4961       if fname is None:
4962         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4963                                    self.op.name)
4964       self.op.name = fname
4965       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4966     else:
4967       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4968                                  self.op.mode)
4969
4970     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4971       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4972         raise errors.OpPrereqError("Missing allocator name")
4973     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4974       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4975                                  self.op.direction)
4976
4977   def Exec(self, feedback_fn):
4978     """Run the allocator test.
4979
4980     """
4981     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4982       ial = IAllocator(self.cfg, self.sstore,
4983                        mode=self.op.mode,
4984                        name=self.op.name,
4985                        mem_size=self.op.mem_size,
4986                        disks=self.op.disks,
4987                        disk_template=self.op.disk_template,
4988                        os=self.op.os,
4989                        tags=self.op.tags,
4990                        nics=self.op.nics,
4991                        vcpus=self.op.vcpus,
4992                        )
4993     else:
4994       ial = IAllocator(self.cfg, self.sstore,
4995                        mode=self.op.mode,
4996                        name=self.op.name,
4997                        relocate_from=list(self.relocate_from),
4998                        )
4999
5000     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5001       result = ial.in_text
5002     else:
5003       ial.Run(self.op.allocator, validate=False)
5004       result = ial.out_text
5005     return result