377670385f99d05d4b83ba974e066e3b5ed8af80
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144
145 class NoHooksLU(LogicalUnit):
146   """Simple LU which runs no hooks.
147
148   This LU is intended as a parent for other LogicalUnits which will
149   run no hooks, in order to reduce duplicate code.
150
151   """
152   HPATH = None
153   HTYPE = None
154
155   def BuildHooksEnv(self):
156     """Build hooks env.
157
158     This is a no-op, since we don't run hooks.
159
160     """
161     return {}, [], []
162
163
164 def _AddHostToEtcHosts(hostname):
165   """Wrapper around utils.SetEtcHostsEntry.
166
167   """
168   hi = utils.HostInfo(name=hostname)
169   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
170
171
172 def _RemoveHostFromEtcHosts(hostname):
173   """Wrapper around utils.RemoveEtcHostsEntry.
174
175   """
176   hi = utils.HostInfo(name=hostname)
177   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
178   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
179
180
181 def _GetWantedNodes(lu, nodes):
182   """Returns list of checked and expanded node names.
183
184   Args:
185     nodes: List of nodes (strings) or None for all
186
187   """
188   if not isinstance(nodes, list):
189     raise errors.OpPrereqError("Invalid argument type 'nodes'")
190
191   if nodes:
192     wanted = []
193
194     for name in nodes:
195       node = lu.cfg.ExpandNodeName(name)
196       if node is None:
197         raise errors.OpPrereqError("No such node name '%s'" % name)
198       wanted.append(node)
199
200   else:
201     wanted = lu.cfg.GetNodeList()
202   return utils.NiceSort(wanted)
203
204
205 def _GetWantedInstances(lu, instances):
206   """Returns list of checked and expanded instance names.
207
208   Args:
209     instances: List of instances (strings) or None for all
210
211   """
212   if not isinstance(instances, list):
213     raise errors.OpPrereqError("Invalid argument type 'instances'")
214
215   if instances:
216     wanted = []
217
218     for name in instances:
219       instance = lu.cfg.ExpandInstanceName(name)
220       if instance is None:
221         raise errors.OpPrereqError("No such instance name '%s'" % name)
222       wanted.append(instance)
223
224   else:
225     wanted = lu.cfg.GetInstanceList()
226   return utils.NiceSort(wanted)
227
228
229 def _CheckOutputFields(static, dynamic, selected):
230   """Checks whether all selected fields are valid.
231
232   Args:
233     static: Static fields
234     dynamic: Dynamic fields
235
236   """
237   static_fields = frozenset(static)
238   dynamic_fields = frozenset(dynamic)
239
240   all_fields = static_fields | dynamic_fields
241
242   if not all_fields.issuperset(selected):
243     raise errors.OpPrereqError("Unknown output fields selected: %s"
244                                % ",".join(frozenset(selected).
245                                           difference(all_fields)))
246
247
248 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
249                           memory, vcpus, nics):
250   """Builds instance related env variables for hooks from single variables.
251
252   Args:
253     secondary_nodes: List of secondary nodes as strings
254   """
255   env = {
256     "OP_TARGET": name,
257     "INSTANCE_NAME": name,
258     "INSTANCE_PRIMARY": primary_node,
259     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
260     "INSTANCE_OS_TYPE": os_type,
261     "INSTANCE_STATUS": status,
262     "INSTANCE_MEMORY": memory,
263     "INSTANCE_VCPUS": vcpus,
264   }
265
266   if nics:
267     nic_count = len(nics)
268     for idx, (ip, bridge, mac) in enumerate(nics):
269       if ip is None:
270         ip = ""
271       env["INSTANCE_NIC%d_IP" % idx] = ip
272       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
273       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
274   else:
275     nic_count = 0
276
277   env["INSTANCE_NIC_COUNT"] = nic_count
278
279   return env
280
281
282 def _BuildInstanceHookEnvByObject(instance, override=None):
283   """Builds instance related env variables for hooks from an object.
284
285   Args:
286     instance: objects.Instance object of instance
287     override: dict of values to override
288   """
289   args = {
290     'name': instance.name,
291     'primary_node': instance.primary_node,
292     'secondary_nodes': instance.secondary_nodes,
293     'os_type': instance.os,
294     'status': instance.os,
295     'memory': instance.memory,
296     'vcpus': instance.vcpus,
297     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
298   }
299   if override:
300     args.update(override)
301   return _BuildInstanceHookEnv(**args)
302
303
304 def _UpdateKnownHosts(fullnode, ip, pubkey):
305   """Ensure a node has a correct known_hosts entry.
306
307   Args:
308     fullnode - Fully qualified domain name of host. (str)
309     ip       - IPv4 address of host (str)
310     pubkey   - the public key of the cluster
311
312   """
313   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
314     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
315   else:
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
317
318   inthere = False
319
320   save_lines = []
321   add_lines = []
322   removed = False
323
324   for rawline in f:
325     logger.Debug('read %s' % (repr(rawline),))
326
327     parts = rawline.rstrip('\r\n').split()
328
329     # Ignore unwanted lines
330     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
331       fields = parts[0].split(',')
332       key = parts[2]
333
334       haveall = True
335       havesome = False
336       for spec in [ ip, fullnode ]:
337         if spec not in fields:
338           haveall = False
339         if spec in fields:
340           havesome = True
341
342       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
343       if haveall and key == pubkey:
344         inthere = True
345         save_lines.append(rawline)
346         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
347         continue
348
349       if havesome and (not haveall or key != pubkey):
350         removed = True
351         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
352         continue
353
354     save_lines.append(rawline)
355
356   if not inthere:
357     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
358     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
359
360   if removed:
361     save_lines = save_lines + add_lines
362
363     # Write a new file and replace old.
364     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
365                                    constants.DATA_DIR)
366     newfile = os.fdopen(fd, 'w')
367     try:
368       newfile.write(''.join(save_lines))
369     finally:
370       newfile.close()
371     logger.Debug("Wrote new known_hosts.")
372     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
373
374   elif add_lines:
375     # Simply appending a new line will do the trick.
376     f.seek(0, 2)
377     for add in add_lines:
378       f.write(add)
379
380   f.close()
381
382
383 def _HasValidVG(vglist, vgname):
384   """Checks if the volume group list is valid.
385
386   A non-None return value means there's an error, and the return value
387   is the error message.
388
389   """
390   vgsize = vglist.get(vgname, None)
391   if vgsize is None:
392     return "volume group '%s' missing" % vgname
393   elif vgsize < 20480:
394     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
395             (vgname, vgsize))
396   return None
397
398
399 def _InitSSHSetup(node):
400   """Setup the SSH configuration for the cluster.
401
402
403   This generates a dsa keypair for root, adds the pub key to the
404   permitted hosts and adds the hostkey to its own known hosts.
405
406   Args:
407     node: the name of this host as a fqdn
408
409   """
410   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
411
412   for name in priv_key, pub_key:
413     if os.path.exists(name):
414       utils.CreateBackup(name)
415     utils.RemoveFile(name)
416
417   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
418                          "-f", priv_key,
419                          "-q", "-N", ""])
420   if result.failed:
421     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
422                              result.output)
423
424   f = open(pub_key, 'r')
425   try:
426     utils.AddAuthorizedKey(auth_keys, f.read(8192))
427   finally:
428     f.close()
429
430
431 def _InitGanetiServerSetup(ss):
432   """Setup the necessary configuration for the initial node daemon.
433
434   This creates the nodepass file containing the shared password for
435   the cluster and also generates the SSL certificate.
436
437   """
438   # Create pseudo random password
439   randpass = sha.new(os.urandom(64)).hexdigest()
440   # and write it into sstore
441   ss.SetKey(ss.SS_NODED_PASS, randpass)
442
443   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
444                          "-days", str(365*5), "-nodes", "-x509",
445                          "-keyout", constants.SSL_CERT_FILE,
446                          "-out", constants.SSL_CERT_FILE, "-batch"])
447   if result.failed:
448     raise errors.OpExecError("could not generate server ssl cert, command"
449                              " %s had exitcode %s and error message %s" %
450                              (result.cmd, result.exit_code, result.output))
451
452   os.chmod(constants.SSL_CERT_FILE, 0400)
453
454   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
455
456   if result.failed:
457     raise errors.OpExecError("Could not start the node daemon, command %s"
458                              " had exitcode %s and error %s" %
459                              (result.cmd, result.exit_code, result.output))
460
461
462 def _CheckInstanceBridgesExist(instance):
463   """Check that the brigdes needed by an instance exist.
464
465   """
466   # check bridges existance
467   brlist = [nic.bridge for nic in instance.nics]
468   if not rpc.call_bridges_exist(instance.primary_node, brlist):
469     raise errors.OpPrereqError("one or more target bridges %s does not"
470                                " exist on destination node '%s'" %
471                                (brlist, instance.primary_node))
472
473
474 class LUInitCluster(LogicalUnit):
475   """Initialise the cluster.
476
477   """
478   HPATH = "cluster-init"
479   HTYPE = constants.HTYPE_CLUSTER
480   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
481               "def_bridge", "master_netdev"]
482   REQ_CLUSTER = False
483
484   def BuildHooksEnv(self):
485     """Build hooks env.
486
487     Notes: Since we don't require a cluster, we must manually add
488     ourselves in the post-run node list.
489
490     """
491     env = {"OP_TARGET": self.op.cluster_name}
492     return env, [], [self.hostname.name]
493
494   def CheckPrereq(self):
495     """Verify that the passed name is a valid one.
496
497     """
498     if config.ConfigWriter.IsCluster():
499       raise errors.OpPrereqError("Cluster is already initialised")
500
501     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
502       if not os.path.exists(constants.VNC_PASSWORD_FILE):
503         raise errors.OpPrereqError("Please prepare the cluster VNC"
504                                    "password file %s" %
505                                    constants.VNC_PASSWORD_FILE)
506
507     self.hostname = hostname = utils.HostInfo()
508
509     if hostname.ip.startswith("127."):
510       raise errors.OpPrereqError("This host's IP resolves to the private"
511                                  " range (%s). Please fix DNS or %s." %
512                                  (hostname.ip, constants.ETC_HOSTS))
513
514     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
515                          source=constants.LOCALHOST_IP_ADDRESS):
516       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
517                                  " to %s,\nbut this ip address does not"
518                                  " belong to this host."
519                                  " Aborting." % hostname.ip)
520
521     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
522
523     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
524                      timeout=5):
525       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
526
527     secondary_ip = getattr(self.op, "secondary_ip", None)
528     if secondary_ip and not utils.IsValidIP(secondary_ip):
529       raise errors.OpPrereqError("Invalid secondary ip given")
530     if (secondary_ip and
531         secondary_ip != hostname.ip and
532         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
533                            source=constants.LOCALHOST_IP_ADDRESS))):
534       raise errors.OpPrereqError("You gave %s as secondary IP,"
535                                  " but it does not belong to this host." %
536                                  secondary_ip)
537     self.secondary_ip = secondary_ip
538
539     # checks presence of the volume group given
540     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
541
542     if vgstatus:
543       raise errors.OpPrereqError("Error: %s" % vgstatus)
544
545     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
546                     self.op.mac_prefix):
547       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
548                                  self.op.mac_prefix)
549
550     if self.op.hypervisor_type not in constants.HYPER_TYPES:
551       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
552                                  self.op.hypervisor_type)
553
554     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
555     if result.failed:
556       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
557                                  (self.op.master_netdev,
558                                   result.output.strip()))
559
560     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
561             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
562       raise errors.OpPrereqError("Init.d script '%s' missing or not"
563                                  " executable." % constants.NODE_INITD_SCRIPT)
564
565   def Exec(self, feedback_fn):
566     """Initialize the cluster.
567
568     """
569     clustername = self.clustername
570     hostname = self.hostname
571
572     # set up the simple store
573     self.sstore = ss = ssconf.SimpleStore()
574     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
575     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
576     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
577     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
578     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
579
580     # set up the inter-node password and certificate
581     _InitGanetiServerSetup(ss)
582
583     # start the master ip
584     rpc.call_node_start_master(hostname.name)
585
586     # set up ssh config and /etc/hosts
587     f = open(constants.SSH_HOST_RSA_PUB, 'r')
588     try:
589       sshline = f.read()
590     finally:
591       f.close()
592     sshkey = sshline.split(" ")[1]
593
594     _AddHostToEtcHosts(hostname.name)
595
596     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
597
598     _InitSSHSetup(hostname.name)
599
600     # init of cluster config file
601     self.cfg = cfgw = config.ConfigWriter()
602     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
603                     sshkey, self.op.mac_prefix,
604                     self.op.vg_name, self.op.def_bridge)
605
606
607 class LUDestroyCluster(NoHooksLU):
608   """Logical unit for destroying the cluster.
609
610   """
611   _OP_REQP = []
612
613   def CheckPrereq(self):
614     """Check prerequisites.
615
616     This checks whether the cluster is empty.
617
618     Any errors are signalled by raising errors.OpPrereqError.
619
620     """
621     master = self.sstore.GetMasterNode()
622
623     nodelist = self.cfg.GetNodeList()
624     if len(nodelist) != 1 or nodelist[0] != master:
625       raise errors.OpPrereqError("There are still %d node(s) in"
626                                  " this cluster." % (len(nodelist) - 1))
627     instancelist = self.cfg.GetInstanceList()
628     if instancelist:
629       raise errors.OpPrereqError("There are still %d instance(s) in"
630                                  " this cluster." % len(instancelist))
631
632   def Exec(self, feedback_fn):
633     """Destroys the cluster.
634
635     """
636     master = self.sstore.GetMasterNode()
637     if not rpc.call_node_stop_master(master):
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     rpc.call_node_leave_cluster(master)
643
644
645 class LUVerifyCluster(NoHooksLU):
646   """Verifies the cluster status.
647
648   """
649   _OP_REQP = ["skip_checks"]
650
651   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
652                   remote_version, feedback_fn):
653     """Run multiple tests against a node.
654
655     Test list:
656       - compares ganeti version
657       - checks vg existance and size > 20G
658       - checks config file checksum
659       - checks ssh to other nodes
660
661     Args:
662       node: name of the node to check
663       file_list: required list of files
664       local_cksum: dictionary of local files and their checksums
665
666     """
667     # compares ganeti version
668     local_version = constants.PROTOCOL_VERSION
669     if not remote_version:
670       feedback_fn("  - ERROR: connection to %s failed" % (node))
671       return True
672
673     if local_version != remote_version:
674       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
675                       (local_version, node, remote_version))
676       return True
677
678     # checks vg existance and size > 20G
679
680     bad = False
681     if not vglist:
682       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
683                       (node,))
684       bad = True
685     else:
686       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
687       if vgstatus:
688         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
689         bad = True
690
691     # checks config file checksum
692     # checks ssh to any
693
694     if 'filelist' not in node_result:
695       bad = True
696       feedback_fn("  - ERROR: node hasn't returned file checksum data")
697     else:
698       remote_cksum = node_result['filelist']
699       for file_name in file_list:
700         if file_name not in remote_cksum:
701           bad = True
702           feedback_fn("  - ERROR: file '%s' missing" % file_name)
703         elif remote_cksum[file_name] != local_cksum[file_name]:
704           bad = True
705           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
706
707     if 'nodelist' not in node_result:
708       bad = True
709       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
710     else:
711       if node_result['nodelist']:
712         bad = True
713         for node in node_result['nodelist']:
714           feedback_fn("  - ERROR: communication with node '%s': %s" %
715                           (node, node_result['nodelist'][node]))
716     hyp_result = node_result.get('hypervisor', None)
717     if hyp_result is not None:
718       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
719     return bad
720
721   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
722                       node_instance, feedback_fn):
723     """Verify an instance.
724
725     This function checks to see if the required block devices are
726     available on the instance's node.
727
728     """
729     bad = False
730
731     node_current = instanceconfig.primary_node
732
733     node_vol_should = {}
734     instanceconfig.MapLVsByNode(node_vol_should)
735
736     for node in node_vol_should:
737       for volume in node_vol_should[node]:
738         if node not in node_vol_is or volume not in node_vol_is[node]:
739           feedback_fn("  - ERROR: volume %s missing on node %s" %
740                           (volume, node))
741           bad = True
742
743     if not instanceconfig.status == 'down':
744       if (node_current not in node_instance or
745           not instance in node_instance[node_current]):
746         feedback_fn("  - ERROR: instance %s not running on node %s" %
747                         (instance, node_current))
748         bad = True
749
750     for node in node_instance:
751       if (not node == node_current):
752         if instance in node_instance[node]:
753           feedback_fn("  - ERROR: instance %s should not run on node %s" %
754                           (instance, node))
755           bad = True
756
757     return bad
758
759   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760     """Verify if there are any unknown volumes in the cluster.
761
762     The .os, .swap and backup volumes are ignored. All other volumes are
763     reported as unknown.
764
765     """
766     bad = False
767
768     for node in node_vol_is:
769       for volume in node_vol_is[node]:
770         if node not in node_vol_should or volume not in node_vol_should[node]:
771           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
772                       (volume, node))
773           bad = True
774     return bad
775
776   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777     """Verify the list of running instances.
778
779     This checks what instances are running but unknown to the cluster.
780
781     """
782     bad = False
783     for node in node_instance:
784       for runninginstance in node_instance[node]:
785         if runninginstance not in instancelist:
786           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
787                           (runninginstance, node))
788           bad = True
789     return bad
790
791   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
792     """Verify N+1 Memory Resilience.
793
794     Check that if one single node dies we can still start all the instances it
795     was primary for.
796
797     """
798     bad = False
799
800     for node, nodeinfo in node_info.iteritems():
801       # This code checks that every node which is now listed as secondary has
802       # enough memory to host all instances it is supposed to should a single
803       # other node in the cluster fail.
804       # FIXME: not ready for failover to an arbitrary node
805       # FIXME: does not support file-backed instances
806       # WARNING: we currently take into account down instances as well as up
807       # ones, considering that even if they're down someone might want to start
808       # them even in the event of a node failure.
809       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
810         needed_mem = 0
811         for instance in instances:
812           needed_mem += instance_cfg[instance].memory
813         if nodeinfo['mfree'] < needed_mem:
814           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
815                       " failovers should node %s fail" % (node, prinode))
816           bad = True
817     return bad
818
819   def CheckPrereq(self):
820     """Check prerequisites.
821
822     Transform the list of checks we're going to skip into a set and check that
823     all its members are valid.
824
825     """
826     self.skip_set = frozenset(self.op.skip_checks)
827     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
828       raise errors.OpPrereqError("Invalid checks to be skipped specified")
829
830   def Exec(self, feedback_fn):
831     """Verify integrity of cluster, performing various test on nodes.
832
833     """
834     bad = False
835     feedback_fn("* Verifying global settings")
836     for msg in self.cfg.VerifyConfig():
837       feedback_fn("  - ERROR: %s" % msg)
838
839     vg_name = self.cfg.GetVGName()
840     nodelist = utils.NiceSort(self.cfg.GetNodeList())
841     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
842     i_non_redundant = [] # Non redundant instances
843     node_volume = {}
844     node_instance = {}
845     node_info = {}
846     instance_cfg = {}
847
848     # FIXME: verify OS list
849     # do local checksums
850     file_names = list(self.sstore.GetFileList())
851     file_names.append(constants.SSL_CERT_FILE)
852     file_names.append(constants.CLUSTER_CONF_FILE)
853     local_checksums = utils.FingerprintFiles(file_names)
854
855     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
856     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
857     all_instanceinfo = rpc.call_instance_list(nodelist)
858     all_vglist = rpc.call_vg_list(nodelist)
859     node_verify_param = {
860       'filelist': file_names,
861       'nodelist': nodelist,
862       'hypervisor': None,
863       }
864     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
865     all_rversion = rpc.call_version(nodelist)
866     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
867
868     for node in nodelist:
869       feedback_fn("* Verifying node %s" % node)
870       result = self._VerifyNode(node, file_names, local_checksums,
871                                 all_vglist[node], all_nvinfo[node],
872                                 all_rversion[node], feedback_fn)
873       bad = bad or result
874
875       # node_volume
876       volumeinfo = all_volumeinfo[node]
877
878       if isinstance(volumeinfo, basestring):
879         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
880                     (node, volumeinfo[-400:].encode('string_escape')))
881         bad = True
882         node_volume[node] = {}
883       elif not isinstance(volumeinfo, dict):
884         feedback_fn("  - ERROR: connection to %s failed" % (node,))
885         bad = True
886         continue
887       else:
888         node_volume[node] = volumeinfo
889
890       # node_instance
891       nodeinstance = all_instanceinfo[node]
892       if type(nodeinstance) != list:
893         feedback_fn("  - ERROR: connection to %s failed" % (node,))
894         bad = True
895         continue
896
897       node_instance[node] = nodeinstance
898
899       # node_info
900       nodeinfo = all_ninfo[node]
901       if not isinstance(nodeinfo, dict):
902         feedback_fn("  - ERROR: connection to %s failed" % (node,))
903         bad = True
904         continue
905
906       try:
907         node_info[node] = {
908           "mfree": int(nodeinfo['memory_free']),
909           "dfree": int(nodeinfo['vg_free']),
910           "pinst": [],
911           "sinst": [],
912           # dictionary holding all instances this node is secondary for,
913           # grouped by their primary node. Each key is a cluster node, and each
914           # value is a list of instances which have the key as primary and the
915           # current node as secondary.  this is handy to calculate N+1 memory
916           # availability if you can only failover from a primary to its
917           # secondary.
918           "sinst-by-pnode": {},
919         }
920       except ValueError:
921         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
922         bad = True
923         continue
924
925     node_vol_should = {}
926
927     for instance in instancelist:
928       feedback_fn("* Verifying instance %s" % instance)
929       inst_config = self.cfg.GetInstanceInfo(instance)
930       result =  self._VerifyInstance(instance, inst_config, node_volume,
931                                      node_instance, feedback_fn)
932       bad = bad or result
933
934       inst_config.MapLVsByNode(node_vol_should)
935
936       instance_cfg[instance] = inst_config
937
938       pnode = inst_config.primary_node
939       if pnode in node_info:
940         node_info[pnode]['pinst'].append(instance)
941       else:
942         feedback_fn("  - ERROR: instance %s, connection to primary node"
943                     " %s failed" % (instance, pnode))
944         bad = True
945
946       # If the instance is non-redundant we cannot survive losing its primary
947       # node, so we are not N+1 compliant. On the other hand we have no disk
948       # templates with more than one secondary so that situation is not well
949       # supported either.
950       # FIXME: does not support file-backed instances
951       if len(inst_config.secondary_nodes) == 0:
952         i_non_redundant.append(instance)
953       elif len(inst_config.secondary_nodes) > 1:
954         feedback_fn("  - WARNING: multiple secondaries for instance %s"
955                     % instance)
956
957       for snode in inst_config.secondary_nodes:
958         if snode in node_info:
959           node_info[snode]['sinst'].append(instance)
960           if pnode not in node_info[snode]['sinst-by-pnode']:
961             node_info[snode]['sinst-by-pnode'][pnode] = []
962           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
963         else:
964           feedback_fn("  - ERROR: instance %s, connection to secondary node"
965                       " %s failed" % (instance, snode))
966
967     feedback_fn("* Verifying orphan volumes")
968     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
969                                        feedback_fn)
970     bad = bad or result
971
972     feedback_fn("* Verifying remaining instances")
973     result = self._VerifyOrphanInstances(instancelist, node_instance,
974                                          feedback_fn)
975     bad = bad or result
976
977     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
978       feedback_fn("* Verifying N+1 Memory redundancy")
979       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
980       bad = bad or result
981
982     feedback_fn("* Other Notes")
983     if i_non_redundant:
984       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
985                   % len(i_non_redundant))
986
987     return int(bad)
988
989
990 class LUVerifyDisks(NoHooksLU):
991   """Verifies the cluster disks status.
992
993   """
994   _OP_REQP = []
995
996   def CheckPrereq(self):
997     """Check prerequisites.
998
999     This has no prerequisites.
1000
1001     """
1002     pass
1003
1004   def Exec(self, feedback_fn):
1005     """Verify integrity of cluster disks.
1006
1007     """
1008     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1009
1010     vg_name = self.cfg.GetVGName()
1011     nodes = utils.NiceSort(self.cfg.GetNodeList())
1012     instances = [self.cfg.GetInstanceInfo(name)
1013                  for name in self.cfg.GetInstanceList()]
1014
1015     nv_dict = {}
1016     for inst in instances:
1017       inst_lvs = {}
1018       if (inst.status != "up" or
1019           inst.disk_template not in constants.DTS_NET_MIRROR):
1020         continue
1021       inst.MapLVsByNode(inst_lvs)
1022       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1023       for node, vol_list in inst_lvs.iteritems():
1024         for vol in vol_list:
1025           nv_dict[(node, vol)] = inst
1026
1027     if not nv_dict:
1028       return result
1029
1030     node_lvs = rpc.call_volume_list(nodes, vg_name)
1031
1032     to_act = set()
1033     for node in nodes:
1034       # node_volume
1035       lvs = node_lvs[node]
1036
1037       if isinstance(lvs, basestring):
1038         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1039         res_nlvm[node] = lvs
1040       elif not isinstance(lvs, dict):
1041         logger.Info("connection to node %s failed or invalid data returned" %
1042                     (node,))
1043         res_nodes.append(node)
1044         continue
1045
1046       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1047         inst = nv_dict.pop((node, lv_name), None)
1048         if (not lv_online and inst is not None
1049             and inst.name not in res_instances):
1050           res_instances.append(inst.name)
1051
1052     # any leftover items in nv_dict are missing LVs, let's arrange the
1053     # data better
1054     for key, inst in nv_dict.iteritems():
1055       if inst.name not in res_missing:
1056         res_missing[inst.name] = []
1057       res_missing[inst.name].append(key)
1058
1059     return result
1060
1061
1062 class LURenameCluster(LogicalUnit):
1063   """Rename the cluster.
1064
1065   """
1066   HPATH = "cluster-rename"
1067   HTYPE = constants.HTYPE_CLUSTER
1068   _OP_REQP = ["name"]
1069
1070   def BuildHooksEnv(self):
1071     """Build hooks env.
1072
1073     """
1074     env = {
1075       "OP_TARGET": self.sstore.GetClusterName(),
1076       "NEW_NAME": self.op.name,
1077       }
1078     mn = self.sstore.GetMasterNode()
1079     return env, [mn], [mn]
1080
1081   def CheckPrereq(self):
1082     """Verify that the passed name is a valid one.
1083
1084     """
1085     hostname = utils.HostInfo(self.op.name)
1086
1087     new_name = hostname.name
1088     self.ip = new_ip = hostname.ip
1089     old_name = self.sstore.GetClusterName()
1090     old_ip = self.sstore.GetMasterIP()
1091     if new_name == old_name and new_ip == old_ip:
1092       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1093                                  " cluster has changed")
1094     if new_ip != old_ip:
1095       result = utils.RunCmd(["fping", "-q", new_ip])
1096       if not result.failed:
1097         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1098                                    " reachable on the network. Aborting." %
1099                                    new_ip)
1100
1101     self.op.name = new_name
1102
1103   def Exec(self, feedback_fn):
1104     """Rename the cluster.
1105
1106     """
1107     clustername = self.op.name
1108     ip = self.ip
1109     ss = self.sstore
1110
1111     # shutdown the master IP
1112     master = ss.GetMasterNode()
1113     if not rpc.call_node_stop_master(master):
1114       raise errors.OpExecError("Could not disable the master role")
1115
1116     try:
1117       # modify the sstore
1118       ss.SetKey(ss.SS_MASTER_IP, ip)
1119       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1120
1121       # Distribute updated ss config to all nodes
1122       myself = self.cfg.GetNodeInfo(master)
1123       dist_nodes = self.cfg.GetNodeList()
1124       if myself.name in dist_nodes:
1125         dist_nodes.remove(myself.name)
1126
1127       logger.Debug("Copying updated ssconf data to all nodes")
1128       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1129         fname = ss.KeyToFilename(keyname)
1130         result = rpc.call_upload_file(dist_nodes, fname)
1131         for to_node in dist_nodes:
1132           if not result[to_node]:
1133             logger.Error("copy of file %s to node %s failed" %
1134                          (fname, to_node))
1135     finally:
1136       if not rpc.call_node_start_master(master):
1137         logger.Error("Could not re-enable the master role on the master,"
1138                      " please restart manually.")
1139
1140
1141 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1142   """Sleep and poll for an instance's disk to sync.
1143
1144   """
1145   if not instance.disks:
1146     return True
1147
1148   if not oneshot:
1149     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1150
1151   node = instance.primary_node
1152
1153   for dev in instance.disks:
1154     cfgw.SetDiskID(dev, node)
1155
1156   retries = 0
1157   while True:
1158     max_time = 0
1159     done = True
1160     cumul_degraded = False
1161     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1162     if not rstats:
1163       proc.LogWarning("Can't get any data from node %s" % node)
1164       retries += 1
1165       if retries >= 10:
1166         raise errors.RemoteError("Can't contact node %s for mirror data,"
1167                                  " aborting." % node)
1168       time.sleep(6)
1169       continue
1170     retries = 0
1171     for i in range(len(rstats)):
1172       mstat = rstats[i]
1173       if mstat is None:
1174         proc.LogWarning("Can't compute data for node %s/%s" %
1175                         (node, instance.disks[i].iv_name))
1176         continue
1177       # we ignore the ldisk parameter
1178       perc_done, est_time, is_degraded, _ = mstat
1179       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1180       if perc_done is not None:
1181         done = False
1182         if est_time is not None:
1183           rem_time = "%d estimated seconds remaining" % est_time
1184           max_time = est_time
1185         else:
1186           rem_time = "no time estimate"
1187         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1188                      (instance.disks[i].iv_name, perc_done, rem_time))
1189     if done or oneshot:
1190       break
1191
1192     if unlock:
1193       utils.Unlock('cmd')
1194     try:
1195       time.sleep(min(60, max_time))
1196     finally:
1197       if unlock:
1198         utils.Lock('cmd')
1199
1200   if done:
1201     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1202   return not cumul_degraded
1203
1204
1205 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1206   """Check that mirrors are not degraded.
1207
1208   The ldisk parameter, if True, will change the test from the
1209   is_degraded attribute (which represents overall non-ok status for
1210   the device(s)) to the ldisk (representing the local storage status).
1211
1212   """
1213   cfgw.SetDiskID(dev, node)
1214   if ldisk:
1215     idx = 6
1216   else:
1217     idx = 5
1218
1219   result = True
1220   if on_primary or dev.AssembleOnSecondary():
1221     rstats = rpc.call_blockdev_find(node, dev)
1222     if not rstats:
1223       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1224       result = False
1225     else:
1226       result = result and (not rstats[idx])
1227   if dev.children:
1228     for child in dev.children:
1229       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1230
1231   return result
1232
1233
1234 class LUDiagnoseOS(NoHooksLU):
1235   """Logical unit for OS diagnose/query.
1236
1237   """
1238   _OP_REQP = ["output_fields", "names"]
1239
1240   def CheckPrereq(self):
1241     """Check prerequisites.
1242
1243     This always succeeds, since this is a pure query LU.
1244
1245     """
1246     if self.op.names:
1247       raise errors.OpPrereqError("Selective OS query not supported")
1248
1249     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1250     _CheckOutputFields(static=[],
1251                        dynamic=self.dynamic_fields,
1252                        selected=self.op.output_fields)
1253
1254   @staticmethod
1255   def _DiagnoseByOS(node_list, rlist):
1256     """Remaps a per-node return list into an a per-os per-node dictionary
1257
1258       Args:
1259         node_list: a list with the names of all nodes
1260         rlist: a map with node names as keys and OS objects as values
1261
1262       Returns:
1263         map: a map with osnames as keys and as value another map, with
1264              nodes as
1265              keys and list of OS objects as values
1266              e.g. {"debian-etch": {"node1": [<object>,...],
1267                                    "node2": [<object>,]}
1268                   }
1269
1270     """
1271     all_os = {}
1272     for node_name, nr in rlist.iteritems():
1273       if not nr:
1274         continue
1275       for os in nr:
1276         if os.name not in all_os:
1277           # build a list of nodes for this os containing empty lists
1278           # for each node in node_list
1279           all_os[os.name] = {}
1280           for nname in node_list:
1281             all_os[os.name][nname] = []
1282         all_os[os.name][node_name].append(os)
1283     return all_os
1284
1285   def Exec(self, feedback_fn):
1286     """Compute the list of OSes.
1287
1288     """
1289     node_list = self.cfg.GetNodeList()
1290     node_data = rpc.call_os_diagnose(node_list)
1291     if node_data == False:
1292       raise errors.OpExecError("Can't gather the list of OSes")
1293     pol = self._DiagnoseByOS(node_list, node_data)
1294     output = []
1295     for os_name, os_data in pol.iteritems():
1296       row = []
1297       for field in self.op.output_fields:
1298         if field == "name":
1299           val = os_name
1300         elif field == "valid":
1301           val = utils.all([osl and osl[0] for osl in os_data.values()])
1302         elif field == "node_status":
1303           val = {}
1304           for node_name, nos_list in os_data.iteritems():
1305             val[node_name] = [(v.status, v.path) for v in nos_list]
1306         else:
1307           raise errors.ParameterError(field)
1308         row.append(val)
1309       output.append(row)
1310
1311     return output
1312
1313
1314 class LURemoveNode(LogicalUnit):
1315   """Logical unit for removing a node.
1316
1317   """
1318   HPATH = "node-remove"
1319   HTYPE = constants.HTYPE_NODE
1320   _OP_REQP = ["node_name"]
1321
1322   def BuildHooksEnv(self):
1323     """Build hooks env.
1324
1325     This doesn't run on the target node in the pre phase as a failed
1326     node would not allows itself to run.
1327
1328     """
1329     env = {
1330       "OP_TARGET": self.op.node_name,
1331       "NODE_NAME": self.op.node_name,
1332       }
1333     all_nodes = self.cfg.GetNodeList()
1334     all_nodes.remove(self.op.node_name)
1335     return env, all_nodes, all_nodes
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     This checks:
1341      - the node exists in the configuration
1342      - it does not have primary or secondary instances
1343      - it's not the master
1344
1345     Any errors are signalled by raising errors.OpPrereqError.
1346
1347     """
1348     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1349     if node is None:
1350       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1351
1352     instance_list = self.cfg.GetInstanceList()
1353
1354     masternode = self.sstore.GetMasterNode()
1355     if node.name == masternode:
1356       raise errors.OpPrereqError("Node is the master node,"
1357                                  " you need to failover first.")
1358
1359     for instance_name in instance_list:
1360       instance = self.cfg.GetInstanceInfo(instance_name)
1361       if node.name == instance.primary_node:
1362         raise errors.OpPrereqError("Instance %s still running on the node,"
1363                                    " please remove first." % instance_name)
1364       if node.name in instance.secondary_nodes:
1365         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1366                                    " please remove first." % instance_name)
1367     self.op.node_name = node.name
1368     self.node = node
1369
1370   def Exec(self, feedback_fn):
1371     """Removes the node from the cluster.
1372
1373     """
1374     node = self.node
1375     logger.Info("stopping the node daemon and removing configs from node %s" %
1376                 node.name)
1377
1378     rpc.call_node_leave_cluster(node.name)
1379
1380     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1381
1382     logger.Info("Removing node %s from config" % node.name)
1383
1384     self.cfg.RemoveNode(node.name)
1385
1386     _RemoveHostFromEtcHosts(node.name)
1387
1388
1389 class LUQueryNodes(NoHooksLU):
1390   """Logical unit for querying nodes.
1391
1392   """
1393   _OP_REQP = ["output_fields", "names"]
1394
1395   def CheckPrereq(self):
1396     """Check prerequisites.
1397
1398     This checks that the fields required are valid output fields.
1399
1400     """
1401     self.dynamic_fields = frozenset(["dtotal", "dfree",
1402                                      "mtotal", "mnode", "mfree",
1403                                      "bootid"])
1404
1405     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1406                                "pinst_list", "sinst_list",
1407                                "pip", "sip"],
1408                        dynamic=self.dynamic_fields,
1409                        selected=self.op.output_fields)
1410
1411     self.wanted = _GetWantedNodes(self, self.op.names)
1412
1413   def Exec(self, feedback_fn):
1414     """Computes the list of nodes and their attributes.
1415
1416     """
1417     nodenames = self.wanted
1418     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1419
1420     # begin data gathering
1421
1422     if self.dynamic_fields.intersection(self.op.output_fields):
1423       live_data = {}
1424       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1425       for name in nodenames:
1426         nodeinfo = node_data.get(name, None)
1427         if nodeinfo:
1428           live_data[name] = {
1429             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1430             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1431             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1432             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1433             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1434             "bootid": nodeinfo['bootid'],
1435             }
1436         else:
1437           live_data[name] = {}
1438     else:
1439       live_data = dict.fromkeys(nodenames, {})
1440
1441     node_to_primary = dict([(name, set()) for name in nodenames])
1442     node_to_secondary = dict([(name, set()) for name in nodenames])
1443
1444     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445                              "sinst_cnt", "sinst_list"))
1446     if inst_fields & frozenset(self.op.output_fields):
1447       instancelist = self.cfg.GetInstanceList()
1448
1449       for instance_name in instancelist:
1450         inst = self.cfg.GetInstanceInfo(instance_name)
1451         if inst.primary_node in node_to_primary:
1452           node_to_primary[inst.primary_node].add(inst.name)
1453         for secnode in inst.secondary_nodes:
1454           if secnode in node_to_secondary:
1455             node_to_secondary[secnode].add(inst.name)
1456
1457     # end data gathering
1458
1459     output = []
1460     for node in nodelist:
1461       node_output = []
1462       for field in self.op.output_fields:
1463         if field == "name":
1464           val = node.name
1465         elif field == "pinst_list":
1466           val = list(node_to_primary[node.name])
1467         elif field == "sinst_list":
1468           val = list(node_to_secondary[node.name])
1469         elif field == "pinst_cnt":
1470           val = len(node_to_primary[node.name])
1471         elif field == "sinst_cnt":
1472           val = len(node_to_secondary[node.name])
1473         elif field == "pip":
1474           val = node.primary_ip
1475         elif field == "sip":
1476           val = node.secondary_ip
1477         elif field in self.dynamic_fields:
1478           val = live_data[node.name].get(field, None)
1479         else:
1480           raise errors.ParameterError(field)
1481         node_output.append(val)
1482       output.append(node_output)
1483
1484     return output
1485
1486
1487 class LUQueryNodeVolumes(NoHooksLU):
1488   """Logical unit for getting volumes on node(s).
1489
1490   """
1491   _OP_REQP = ["nodes", "output_fields"]
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     This checks that the fields required are valid output fields.
1497
1498     """
1499     self.nodes = _GetWantedNodes(self, self.op.nodes)
1500
1501     _CheckOutputFields(static=["node"],
1502                        dynamic=["phys", "vg", "name", "size", "instance"],
1503                        selected=self.op.output_fields)
1504
1505
1506   def Exec(self, feedback_fn):
1507     """Computes the list of nodes and their attributes.
1508
1509     """
1510     nodenames = self.nodes
1511     volumes = rpc.call_node_volumes(nodenames)
1512
1513     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1514              in self.cfg.GetInstanceList()]
1515
1516     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1517
1518     output = []
1519     for node in nodenames:
1520       if node not in volumes or not volumes[node]:
1521         continue
1522
1523       node_vols = volumes[node][:]
1524       node_vols.sort(key=lambda vol: vol['dev'])
1525
1526       for vol in node_vols:
1527         node_output = []
1528         for field in self.op.output_fields:
1529           if field == "node":
1530             val = node
1531           elif field == "phys":
1532             val = vol['dev']
1533           elif field == "vg":
1534             val = vol['vg']
1535           elif field == "name":
1536             val = vol['name']
1537           elif field == "size":
1538             val = int(float(vol['size']))
1539           elif field == "instance":
1540             for inst in ilist:
1541               if node not in lv_by_node[inst]:
1542                 continue
1543               if vol['name'] in lv_by_node[inst][node]:
1544                 val = inst.name
1545                 break
1546             else:
1547               val = '-'
1548           else:
1549             raise errors.ParameterError(field)
1550           node_output.append(str(val))
1551
1552         output.append(node_output)
1553
1554     return output
1555
1556
1557 class LUAddNode(LogicalUnit):
1558   """Logical unit for adding node to the cluster.
1559
1560   """
1561   HPATH = "node-add"
1562   HTYPE = constants.HTYPE_NODE
1563   _OP_REQP = ["node_name"]
1564
1565   def BuildHooksEnv(self):
1566     """Build hooks env.
1567
1568     This will run on all nodes before, and on all nodes + the new node after.
1569
1570     """
1571     env = {
1572       "OP_TARGET": self.op.node_name,
1573       "NODE_NAME": self.op.node_name,
1574       "NODE_PIP": self.op.primary_ip,
1575       "NODE_SIP": self.op.secondary_ip,
1576       }
1577     nodes_0 = self.cfg.GetNodeList()
1578     nodes_1 = nodes_0 + [self.op.node_name, ]
1579     return env, nodes_0, nodes_1
1580
1581   def CheckPrereq(self):
1582     """Check prerequisites.
1583
1584     This checks:
1585      - the new node is not already in the config
1586      - it is resolvable
1587      - its parameters (single/dual homed) matches the cluster
1588
1589     Any errors are signalled by raising errors.OpPrereqError.
1590
1591     """
1592     node_name = self.op.node_name
1593     cfg = self.cfg
1594
1595     dns_data = utils.HostInfo(node_name)
1596
1597     node = dns_data.name
1598     primary_ip = self.op.primary_ip = dns_data.ip
1599     secondary_ip = getattr(self.op, "secondary_ip", None)
1600     if secondary_ip is None:
1601       secondary_ip = primary_ip
1602     if not utils.IsValidIP(secondary_ip):
1603       raise errors.OpPrereqError("Invalid secondary IP given")
1604     self.op.secondary_ip = secondary_ip
1605     node_list = cfg.GetNodeList()
1606     if node in node_list:
1607       raise errors.OpPrereqError("Node %s is already in the configuration"
1608                                  % node)
1609
1610     for existing_node_name in node_list:
1611       existing_node = cfg.GetNodeInfo(existing_node_name)
1612       if (existing_node.primary_ip == primary_ip or
1613           existing_node.secondary_ip == primary_ip or
1614           existing_node.primary_ip == secondary_ip or
1615           existing_node.secondary_ip == secondary_ip):
1616         raise errors.OpPrereqError("New node ip address(es) conflict with"
1617                                    " existing node %s" % existing_node.name)
1618
1619     # check that the type of the node (single versus dual homed) is the
1620     # same as for the master
1621     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1622     master_singlehomed = myself.secondary_ip == myself.primary_ip
1623     newbie_singlehomed = secondary_ip == primary_ip
1624     if master_singlehomed != newbie_singlehomed:
1625       if master_singlehomed:
1626         raise errors.OpPrereqError("The master has no private ip but the"
1627                                    " new node has one")
1628       else:
1629         raise errors.OpPrereqError("The master has a private ip but the"
1630                                    " new node doesn't have one")
1631
1632     # checks reachablity
1633     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1634       raise errors.OpPrereqError("Node not reachable by ping")
1635
1636     if not newbie_singlehomed:
1637       # check reachability from my secondary ip to newbie's secondary ip
1638       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1639                            source=myself.secondary_ip):
1640         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1641                                    " based ping to noded port")
1642
1643     self.new_node = objects.Node(name=node,
1644                                  primary_ip=primary_ip,
1645                                  secondary_ip=secondary_ip)
1646
1647     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1648       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1649         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1650                                    constants.VNC_PASSWORD_FILE)
1651
1652   def Exec(self, feedback_fn):
1653     """Adds the new node to the cluster.
1654
1655     """
1656     new_node = self.new_node
1657     node = new_node.name
1658
1659     # set up inter-node password and certificate and restarts the node daemon
1660     gntpass = self.sstore.GetNodeDaemonPassword()
1661     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1662       raise errors.OpExecError("ganeti password corruption detected")
1663     f = open(constants.SSL_CERT_FILE)
1664     try:
1665       gntpem = f.read(8192)
1666     finally:
1667       f.close()
1668     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1669     # so we use this to detect an invalid certificate; as long as the
1670     # cert doesn't contain this, the here-document will be correctly
1671     # parsed by the shell sequence below
1672     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1673       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1674     if not gntpem.endswith("\n"):
1675       raise errors.OpExecError("PEM must end with newline")
1676     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1677
1678     # and then connect with ssh to set password and start ganeti-noded
1679     # note that all the below variables are sanitized at this point,
1680     # either by being constants or by the checks above
1681     ss = self.sstore
1682     mycommand = ("umask 077 && "
1683                  "echo '%s' > '%s' && "
1684                  "cat > '%s' << '!EOF.' && \n"
1685                  "%s!EOF.\n%s restart" %
1686                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1687                   constants.SSL_CERT_FILE, gntpem,
1688                   constants.NODE_INITD_SCRIPT))
1689
1690     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1691     if result.failed:
1692       raise errors.OpExecError("Remote command on node %s, error: %s,"
1693                                " output: %s" %
1694                                (node, result.fail_reason, result.output))
1695
1696     # check connectivity
1697     time.sleep(4)
1698
1699     result = rpc.call_version([node])[node]
1700     if result:
1701       if constants.PROTOCOL_VERSION == result:
1702         logger.Info("communication to node %s fine, sw version %s match" %
1703                     (node, result))
1704       else:
1705         raise errors.OpExecError("Version mismatch master version %s,"
1706                                  " node version %s" %
1707                                  (constants.PROTOCOL_VERSION, result))
1708     else:
1709       raise errors.OpExecError("Cannot get version from the new node")
1710
1711     # setup ssh on node
1712     logger.Info("copy ssh key to node %s" % node)
1713     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1714     keyarray = []
1715     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1716                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1717                 priv_key, pub_key]
1718
1719     for i in keyfiles:
1720       f = open(i, 'r')
1721       try:
1722         keyarray.append(f.read())
1723       finally:
1724         f.close()
1725
1726     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1727                                keyarray[3], keyarray[4], keyarray[5])
1728
1729     if not result:
1730       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1731
1732     # Add node to our /etc/hosts, and add key to known_hosts
1733     _AddHostToEtcHosts(new_node.name)
1734
1735     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1736                       self.cfg.GetHostKey())
1737
1738     if new_node.secondary_ip != new_node.primary_ip:
1739       if not rpc.call_node_tcp_ping(new_node.name,
1740                                     constants.LOCALHOST_IP_ADDRESS,
1741                                     new_node.secondary_ip,
1742                                     constants.DEFAULT_NODED_PORT,
1743                                     10, False):
1744         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1745                                  " you gave (%s). Please fix and re-run this"
1746                                  " command." % new_node.secondary_ip)
1747
1748     success, msg = ssh.VerifyNodeHostname(node)
1749     if not success:
1750       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1751                                " than the one the resolver gives: %s."
1752                                " Please fix and re-run this command." %
1753                                (node, msg))
1754
1755     # Distribute updated /etc/hosts and known_hosts to all nodes,
1756     # including the node just added
1757     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1758     dist_nodes = self.cfg.GetNodeList() + [node]
1759     if myself.name in dist_nodes:
1760       dist_nodes.remove(myself.name)
1761
1762     logger.Debug("Copying hosts and known_hosts to all nodes")
1763     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1764       result = rpc.call_upload_file(dist_nodes, fname)
1765       for to_node in dist_nodes:
1766         if not result[to_node]:
1767           logger.Error("copy of file %s to node %s failed" %
1768                        (fname, to_node))
1769
1770     to_copy = ss.GetFileList()
1771     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1772       to_copy.append(constants.VNC_PASSWORD_FILE)
1773     for fname in to_copy:
1774       if not ssh.CopyFileToNode(node, fname):
1775         logger.Error("could not copy file %s to node %s" % (fname, node))
1776
1777     logger.Info("adding node %s to cluster.conf" % node)
1778     self.cfg.AddNode(new_node)
1779
1780
1781 class LUMasterFailover(LogicalUnit):
1782   """Failover the master node to the current node.
1783
1784   This is a special LU in that it must run on a non-master node.
1785
1786   """
1787   HPATH = "master-failover"
1788   HTYPE = constants.HTYPE_CLUSTER
1789   REQ_MASTER = False
1790   _OP_REQP = []
1791
1792   def BuildHooksEnv(self):
1793     """Build hooks env.
1794
1795     This will run on the new master only in the pre phase, and on all
1796     the nodes in the post phase.
1797
1798     """
1799     env = {
1800       "OP_TARGET": self.new_master,
1801       "NEW_MASTER": self.new_master,
1802       "OLD_MASTER": self.old_master,
1803       }
1804     return env, [self.new_master], self.cfg.GetNodeList()
1805
1806   def CheckPrereq(self):
1807     """Check prerequisites.
1808
1809     This checks that we are not already the master.
1810
1811     """
1812     self.new_master = utils.HostInfo().name
1813     self.old_master = self.sstore.GetMasterNode()
1814
1815     if self.old_master == self.new_master:
1816       raise errors.OpPrereqError("This commands must be run on the node"
1817                                  " where you want the new master to be."
1818                                  " %s is already the master" %
1819                                  self.old_master)
1820
1821   def Exec(self, feedback_fn):
1822     """Failover the master node.
1823
1824     This command, when run on a non-master node, will cause the current
1825     master to cease being master, and the non-master to become new
1826     master.
1827
1828     """
1829     #TODO: do not rely on gethostname returning the FQDN
1830     logger.Info("setting master to %s, old master: %s" %
1831                 (self.new_master, self.old_master))
1832
1833     if not rpc.call_node_stop_master(self.old_master):
1834       logger.Error("could disable the master role on the old master"
1835                    " %s, please disable manually" % self.old_master)
1836
1837     ss = self.sstore
1838     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1839     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1840                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1841       logger.Error("could not distribute the new simple store master file"
1842                    " to the other nodes, please check.")
1843
1844     if not rpc.call_node_start_master(self.new_master):
1845       logger.Error("could not start the master role on the new master"
1846                    " %s, please check" % self.new_master)
1847       feedback_fn("Error in activating the master IP on the new master,"
1848                   " please fix manually.")
1849
1850
1851
1852 class LUQueryClusterInfo(NoHooksLU):
1853   """Query cluster configuration.
1854
1855   """
1856   _OP_REQP = []
1857   REQ_MASTER = False
1858
1859   def CheckPrereq(self):
1860     """No prerequsites needed for this LU.
1861
1862     """
1863     pass
1864
1865   def Exec(self, feedback_fn):
1866     """Return cluster config.
1867
1868     """
1869     result = {
1870       "name": self.sstore.GetClusterName(),
1871       "software_version": constants.RELEASE_VERSION,
1872       "protocol_version": constants.PROTOCOL_VERSION,
1873       "config_version": constants.CONFIG_VERSION,
1874       "os_api_version": constants.OS_API_VERSION,
1875       "export_version": constants.EXPORT_VERSION,
1876       "master": self.sstore.GetMasterNode(),
1877       "architecture": (platform.architecture()[0], platform.machine()),
1878       }
1879
1880     return result
1881
1882
1883 class LUClusterCopyFile(NoHooksLU):
1884   """Copy file to cluster.
1885
1886   """
1887   _OP_REQP = ["nodes", "filename"]
1888
1889   def CheckPrereq(self):
1890     """Check prerequisites.
1891
1892     It should check that the named file exists and that the given list
1893     of nodes is valid.
1894
1895     """
1896     if not os.path.exists(self.op.filename):
1897       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1898
1899     self.nodes = _GetWantedNodes(self, self.op.nodes)
1900
1901   def Exec(self, feedback_fn):
1902     """Copy a file from master to some nodes.
1903
1904     Args:
1905       opts - class with options as members
1906       args - list containing a single element, the file name
1907     Opts used:
1908       nodes - list containing the name of target nodes; if empty, all nodes
1909
1910     """
1911     filename = self.op.filename
1912
1913     myname = utils.HostInfo().name
1914
1915     for node in self.nodes:
1916       if node == myname:
1917         continue
1918       if not ssh.CopyFileToNode(node, filename):
1919         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1920
1921
1922 class LUDumpClusterConfig(NoHooksLU):
1923   """Return a text-representation of the cluster-config.
1924
1925   """
1926   _OP_REQP = []
1927
1928   def CheckPrereq(self):
1929     """No prerequisites.
1930
1931     """
1932     pass
1933
1934   def Exec(self, feedback_fn):
1935     """Dump a representation of the cluster config to the standard output.
1936
1937     """
1938     return self.cfg.DumpConfig()
1939
1940
1941 class LURunClusterCommand(NoHooksLU):
1942   """Run a command on some nodes.
1943
1944   """
1945   _OP_REQP = ["command", "nodes"]
1946
1947   def CheckPrereq(self):
1948     """Check prerequisites.
1949
1950     It checks that the given list of nodes is valid.
1951
1952     """
1953     self.nodes = _GetWantedNodes(self, self.op.nodes)
1954
1955   def Exec(self, feedback_fn):
1956     """Run a command on some nodes.
1957
1958     """
1959     # put the master at the end of the nodes list
1960     master_node = self.sstore.GetMasterNode()
1961     if master_node in self.nodes:
1962       self.nodes.remove(master_node)
1963       self.nodes.append(master_node)
1964
1965     data = []
1966     for node in self.nodes:
1967       result = ssh.SSHCall(node, "root", self.op.command)
1968       data.append((node, result.output, result.exit_code))
1969
1970     return data
1971
1972
1973 class LUActivateInstanceDisks(NoHooksLU):
1974   """Bring up an instance's disks.
1975
1976   """
1977   _OP_REQP = ["instance_name"]
1978
1979   def CheckPrereq(self):
1980     """Check prerequisites.
1981
1982     This checks that the instance is in the cluster.
1983
1984     """
1985     instance = self.cfg.GetInstanceInfo(
1986       self.cfg.ExpandInstanceName(self.op.instance_name))
1987     if instance is None:
1988       raise errors.OpPrereqError("Instance '%s' not known" %
1989                                  self.op.instance_name)
1990     self.instance = instance
1991
1992
1993   def Exec(self, feedback_fn):
1994     """Activate the disks.
1995
1996     """
1997     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1998     if not disks_ok:
1999       raise errors.OpExecError("Cannot activate block devices")
2000
2001     return disks_info
2002
2003
2004 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2005   """Prepare the block devices for an instance.
2006
2007   This sets up the block devices on all nodes.
2008
2009   Args:
2010     instance: a ganeti.objects.Instance object
2011     ignore_secondaries: if true, errors on secondary nodes won't result
2012                         in an error return from the function
2013
2014   Returns:
2015     false if the operation failed
2016     list of (host, instance_visible_name, node_visible_name) if the operation
2017          suceeded with the mapping from node devices to instance devices
2018   """
2019   device_info = []
2020   disks_ok = True
2021   iname = instance.name
2022   # With the two passes mechanism we try to reduce the window of
2023   # opportunity for the race condition of switching DRBD to primary
2024   # before handshaking occured, but we do not eliminate it
2025
2026   # The proper fix would be to wait (with some limits) until the
2027   # connection has been made and drbd transitions from WFConnection
2028   # into any other network-connected state (Connected, SyncTarget,
2029   # SyncSource, etc.)
2030
2031   # 1st pass, assemble on all nodes in secondary mode
2032   for inst_disk in instance.disks:
2033     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2034       cfg.SetDiskID(node_disk, node)
2035       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2036       if not result:
2037         logger.Error("could not prepare block device %s on node %s"
2038                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2039         if not ignore_secondaries:
2040           disks_ok = False
2041
2042   # FIXME: race condition on drbd migration to primary
2043
2044   # 2nd pass, do only the primary node
2045   for inst_disk in instance.disks:
2046     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2047       if node != instance.primary_node:
2048         continue
2049       cfg.SetDiskID(node_disk, node)
2050       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2051       if not result:
2052         logger.Error("could not prepare block device %s on node %s"
2053                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2054         disks_ok = False
2055     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2056
2057   # leave the disks configured for the primary node
2058   # this is a workaround that would be fixed better by
2059   # improving the logical/physical id handling
2060   for disk in instance.disks:
2061     cfg.SetDiskID(disk, instance.primary_node)
2062
2063   return disks_ok, device_info
2064
2065
2066 def _StartInstanceDisks(cfg, instance, force):
2067   """Start the disks of an instance.
2068
2069   """
2070   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2071                                            ignore_secondaries=force)
2072   if not disks_ok:
2073     _ShutdownInstanceDisks(instance, cfg)
2074     if force is not None and not force:
2075       logger.Error("If the message above refers to a secondary node,"
2076                    " you can retry the operation using '--force'.")
2077     raise errors.OpExecError("Disk consistency error")
2078
2079
2080 class LUDeactivateInstanceDisks(NoHooksLU):
2081   """Shutdown an instance's disks.
2082
2083   """
2084   _OP_REQP = ["instance_name"]
2085
2086   def CheckPrereq(self):
2087     """Check prerequisites.
2088
2089     This checks that the instance is in the cluster.
2090
2091     """
2092     instance = self.cfg.GetInstanceInfo(
2093       self.cfg.ExpandInstanceName(self.op.instance_name))
2094     if instance is None:
2095       raise errors.OpPrereqError("Instance '%s' not known" %
2096                                  self.op.instance_name)
2097     self.instance = instance
2098
2099   def Exec(self, feedback_fn):
2100     """Deactivate the disks
2101
2102     """
2103     instance = self.instance
2104     ins_l = rpc.call_instance_list([instance.primary_node])
2105     ins_l = ins_l[instance.primary_node]
2106     if not type(ins_l) is list:
2107       raise errors.OpExecError("Can't contact node '%s'" %
2108                                instance.primary_node)
2109
2110     if self.instance.name in ins_l:
2111       raise errors.OpExecError("Instance is running, can't shutdown"
2112                                " block devices.")
2113
2114     _ShutdownInstanceDisks(instance, self.cfg)
2115
2116
2117 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2118   """Shutdown block devices of an instance.
2119
2120   This does the shutdown on all nodes of the instance.
2121
2122   If the ignore_primary is false, errors on the primary node are
2123   ignored.
2124
2125   """
2126   result = True
2127   for disk in instance.disks:
2128     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2129       cfg.SetDiskID(top_disk, node)
2130       if not rpc.call_blockdev_shutdown(node, top_disk):
2131         logger.Error("could not shutdown block device %s on node %s" %
2132                      (disk.iv_name, node))
2133         if not ignore_primary or node != instance.primary_node:
2134           result = False
2135   return result
2136
2137
2138 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2139   """Checks if a node has enough free memory.
2140
2141   This function check if a given node has the needed amount of free
2142   memory. In case the node has less memory or we cannot get the
2143   information from the node, this function raise an OpPrereqError
2144   exception.
2145
2146   Args:
2147     - cfg: a ConfigWriter instance
2148     - node: the node name
2149     - reason: string to use in the error message
2150     - requested: the amount of memory in MiB
2151
2152   """
2153   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2154   if not nodeinfo or not isinstance(nodeinfo, dict):
2155     raise errors.OpPrereqError("Could not contact node %s for resource"
2156                              " information" % (node,))
2157
2158   free_mem = nodeinfo[node].get('memory_free')
2159   if not isinstance(free_mem, int):
2160     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2161                              " was '%s'" % (node, free_mem))
2162   if requested > free_mem:
2163     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2164                              " needed %s MiB, available %s MiB" %
2165                              (node, reason, requested, free_mem))
2166
2167
2168 class LUStartupInstance(LogicalUnit):
2169   """Starts an instance.
2170
2171   """
2172   HPATH = "instance-start"
2173   HTYPE = constants.HTYPE_INSTANCE
2174   _OP_REQP = ["instance_name", "force"]
2175
2176   def BuildHooksEnv(self):
2177     """Build hooks env.
2178
2179     This runs on master, primary and secondary nodes of the instance.
2180
2181     """
2182     env = {
2183       "FORCE": self.op.force,
2184       }
2185     env.update(_BuildInstanceHookEnvByObject(self.instance))
2186     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2187           list(self.instance.secondary_nodes))
2188     return env, nl, nl
2189
2190   def CheckPrereq(self):
2191     """Check prerequisites.
2192
2193     This checks that the instance is in the cluster.
2194
2195     """
2196     instance = self.cfg.GetInstanceInfo(
2197       self.cfg.ExpandInstanceName(self.op.instance_name))
2198     if instance is None:
2199       raise errors.OpPrereqError("Instance '%s' not known" %
2200                                  self.op.instance_name)
2201
2202     # check bridges existance
2203     _CheckInstanceBridgesExist(instance)
2204
2205     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2206                          "starting instance %s" % instance.name,
2207                          instance.memory)
2208
2209     self.instance = instance
2210     self.op.instance_name = instance.name
2211
2212   def Exec(self, feedback_fn):
2213     """Start the instance.
2214
2215     """
2216     instance = self.instance
2217     force = self.op.force
2218     extra_args = getattr(self.op, "extra_args", "")
2219
2220     self.cfg.MarkInstanceUp(instance.name)
2221
2222     node_current = instance.primary_node
2223
2224     _StartInstanceDisks(self.cfg, instance, force)
2225
2226     if not rpc.call_instance_start(node_current, instance, extra_args):
2227       _ShutdownInstanceDisks(instance, self.cfg)
2228       raise errors.OpExecError("Could not start instance")
2229
2230
2231 class LURebootInstance(LogicalUnit):
2232   """Reboot an instance.
2233
2234   """
2235   HPATH = "instance-reboot"
2236   HTYPE = constants.HTYPE_INSTANCE
2237   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2238
2239   def BuildHooksEnv(self):
2240     """Build hooks env.
2241
2242     This runs on master, primary and secondary nodes of the instance.
2243
2244     """
2245     env = {
2246       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2247       }
2248     env.update(_BuildInstanceHookEnvByObject(self.instance))
2249     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2250           list(self.instance.secondary_nodes))
2251     return env, nl, nl
2252
2253   def CheckPrereq(self):
2254     """Check prerequisites.
2255
2256     This checks that the instance is in the cluster.
2257
2258     """
2259     instance = self.cfg.GetInstanceInfo(
2260       self.cfg.ExpandInstanceName(self.op.instance_name))
2261     if instance is None:
2262       raise errors.OpPrereqError("Instance '%s' not known" %
2263                                  self.op.instance_name)
2264
2265     # check bridges existance
2266     _CheckInstanceBridgesExist(instance)
2267
2268     self.instance = instance
2269     self.op.instance_name = instance.name
2270
2271   def Exec(self, feedback_fn):
2272     """Reboot the instance.
2273
2274     """
2275     instance = self.instance
2276     ignore_secondaries = self.op.ignore_secondaries
2277     reboot_type = self.op.reboot_type
2278     extra_args = getattr(self.op, "extra_args", "")
2279
2280     node_current = instance.primary_node
2281
2282     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2283                            constants.INSTANCE_REBOOT_HARD,
2284                            constants.INSTANCE_REBOOT_FULL]:
2285       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2286                                   (constants.INSTANCE_REBOOT_SOFT,
2287                                    constants.INSTANCE_REBOOT_HARD,
2288                                    constants.INSTANCE_REBOOT_FULL))
2289
2290     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2291                        constants.INSTANCE_REBOOT_HARD]:
2292       if not rpc.call_instance_reboot(node_current, instance,
2293                                       reboot_type, extra_args):
2294         raise errors.OpExecError("Could not reboot instance")
2295     else:
2296       if not rpc.call_instance_shutdown(node_current, instance):
2297         raise errors.OpExecError("could not shutdown instance for full reboot")
2298       _ShutdownInstanceDisks(instance, self.cfg)
2299       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2300       if not rpc.call_instance_start(node_current, instance, extra_args):
2301         _ShutdownInstanceDisks(instance, self.cfg)
2302         raise errors.OpExecError("Could not start instance for full reboot")
2303
2304     self.cfg.MarkInstanceUp(instance.name)
2305
2306
2307 class LUShutdownInstance(LogicalUnit):
2308   """Shutdown an instance.
2309
2310   """
2311   HPATH = "instance-stop"
2312   HTYPE = constants.HTYPE_INSTANCE
2313   _OP_REQP = ["instance_name"]
2314
2315   def BuildHooksEnv(self):
2316     """Build hooks env.
2317
2318     This runs on master, primary and secondary nodes of the instance.
2319
2320     """
2321     env = _BuildInstanceHookEnvByObject(self.instance)
2322     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2323           list(self.instance.secondary_nodes))
2324     return env, nl, nl
2325
2326   def CheckPrereq(self):
2327     """Check prerequisites.
2328
2329     This checks that the instance is in the cluster.
2330
2331     """
2332     instance = self.cfg.GetInstanceInfo(
2333       self.cfg.ExpandInstanceName(self.op.instance_name))
2334     if instance is None:
2335       raise errors.OpPrereqError("Instance '%s' not known" %
2336                                  self.op.instance_name)
2337     self.instance = instance
2338
2339   def Exec(self, feedback_fn):
2340     """Shutdown the instance.
2341
2342     """
2343     instance = self.instance
2344     node_current = instance.primary_node
2345     self.cfg.MarkInstanceDown(instance.name)
2346     if not rpc.call_instance_shutdown(node_current, instance):
2347       logger.Error("could not shutdown instance")
2348
2349     _ShutdownInstanceDisks(instance, self.cfg)
2350
2351
2352 class LUReinstallInstance(LogicalUnit):
2353   """Reinstall an instance.
2354
2355   """
2356   HPATH = "instance-reinstall"
2357   HTYPE = constants.HTYPE_INSTANCE
2358   _OP_REQP = ["instance_name"]
2359
2360   def BuildHooksEnv(self):
2361     """Build hooks env.
2362
2363     This runs on master, primary and secondary nodes of the instance.
2364
2365     """
2366     env = _BuildInstanceHookEnvByObject(self.instance)
2367     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2368           list(self.instance.secondary_nodes))
2369     return env, nl, nl
2370
2371   def CheckPrereq(self):
2372     """Check prerequisites.
2373
2374     This checks that the instance is in the cluster and is not running.
2375
2376     """
2377     instance = self.cfg.GetInstanceInfo(
2378       self.cfg.ExpandInstanceName(self.op.instance_name))
2379     if instance is None:
2380       raise errors.OpPrereqError("Instance '%s' not known" %
2381                                  self.op.instance_name)
2382     if instance.disk_template == constants.DT_DISKLESS:
2383       raise errors.OpPrereqError("Instance '%s' has no disks" %
2384                                  self.op.instance_name)
2385     if instance.status != "down":
2386       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2387                                  self.op.instance_name)
2388     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2389     if remote_info:
2390       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2391                                  (self.op.instance_name,
2392                                   instance.primary_node))
2393
2394     self.op.os_type = getattr(self.op, "os_type", None)
2395     if self.op.os_type is not None:
2396       # OS verification
2397       pnode = self.cfg.GetNodeInfo(
2398         self.cfg.ExpandNodeName(instance.primary_node))
2399       if pnode is None:
2400         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2401                                    self.op.pnode)
2402       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2403       if not os_obj:
2404         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2405                                    " primary node"  % self.op.os_type)
2406
2407     self.instance = instance
2408
2409   def Exec(self, feedback_fn):
2410     """Reinstall the instance.
2411
2412     """
2413     inst = self.instance
2414
2415     if self.op.os_type is not None:
2416       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2417       inst.os = self.op.os_type
2418       self.cfg.AddInstance(inst)
2419
2420     _StartInstanceDisks(self.cfg, inst, None)
2421     try:
2422       feedback_fn("Running the instance OS create scripts...")
2423       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2424         raise errors.OpExecError("Could not install OS for instance %s"
2425                                  " on node %s" %
2426                                  (inst.name, inst.primary_node))
2427     finally:
2428       _ShutdownInstanceDisks(inst, self.cfg)
2429
2430
2431 class LURenameInstance(LogicalUnit):
2432   """Rename an instance.
2433
2434   """
2435   HPATH = "instance-rename"
2436   HTYPE = constants.HTYPE_INSTANCE
2437   _OP_REQP = ["instance_name", "new_name"]
2438
2439   def BuildHooksEnv(self):
2440     """Build hooks env.
2441
2442     This runs on master, primary and secondary nodes of the instance.
2443
2444     """
2445     env = _BuildInstanceHookEnvByObject(self.instance)
2446     env["INSTANCE_NEW_NAME"] = self.op.new_name
2447     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2448           list(self.instance.secondary_nodes))
2449     return env, nl, nl
2450
2451   def CheckPrereq(self):
2452     """Check prerequisites.
2453
2454     This checks that the instance is in the cluster and is not running.
2455
2456     """
2457     instance = self.cfg.GetInstanceInfo(
2458       self.cfg.ExpandInstanceName(self.op.instance_name))
2459     if instance is None:
2460       raise errors.OpPrereqError("Instance '%s' not known" %
2461                                  self.op.instance_name)
2462     if instance.status != "down":
2463       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2464                                  self.op.instance_name)
2465     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2466     if remote_info:
2467       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2468                                  (self.op.instance_name,
2469                                   instance.primary_node))
2470     self.instance = instance
2471
2472     # new name verification
2473     name_info = utils.HostInfo(self.op.new_name)
2474
2475     self.op.new_name = new_name = name_info.name
2476     instance_list = self.cfg.GetInstanceList()
2477     if new_name in instance_list:
2478       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2479                                  new_name)
2480
2481     if not getattr(self.op, "ignore_ip", False):
2482       command = ["fping", "-q", name_info.ip]
2483       result = utils.RunCmd(command)
2484       if not result.failed:
2485         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2486                                    (name_info.ip, new_name))
2487
2488
2489   def Exec(self, feedback_fn):
2490     """Reinstall the instance.
2491
2492     """
2493     inst = self.instance
2494     old_name = inst.name
2495
2496     self.cfg.RenameInstance(inst.name, self.op.new_name)
2497
2498     # re-read the instance from the configuration after rename
2499     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2500
2501     _StartInstanceDisks(self.cfg, inst, None)
2502     try:
2503       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2504                                           "sda", "sdb"):
2505         msg = ("Could run OS rename script for instance %s on node %s (but the"
2506                " instance has been renamed in Ganeti)" %
2507                (inst.name, inst.primary_node))
2508         logger.Error(msg)
2509     finally:
2510       _ShutdownInstanceDisks(inst, self.cfg)
2511
2512
2513 class LURemoveInstance(LogicalUnit):
2514   """Remove an instance.
2515
2516   """
2517   HPATH = "instance-remove"
2518   HTYPE = constants.HTYPE_INSTANCE
2519   _OP_REQP = ["instance_name"]
2520
2521   def BuildHooksEnv(self):
2522     """Build hooks env.
2523
2524     This runs on master, primary and secondary nodes of the instance.
2525
2526     """
2527     env = _BuildInstanceHookEnvByObject(self.instance)
2528     nl = [self.sstore.GetMasterNode()]
2529     return env, nl, nl
2530
2531   def CheckPrereq(self):
2532     """Check prerequisites.
2533
2534     This checks that the instance is in the cluster.
2535
2536     """
2537     instance = self.cfg.GetInstanceInfo(
2538       self.cfg.ExpandInstanceName(self.op.instance_name))
2539     if instance is None:
2540       raise errors.OpPrereqError("Instance '%s' not known" %
2541                                  self.op.instance_name)
2542     self.instance = instance
2543
2544   def Exec(self, feedback_fn):
2545     """Remove the instance.
2546
2547     """
2548     instance = self.instance
2549     logger.Info("shutting down instance %s on node %s" %
2550                 (instance.name, instance.primary_node))
2551
2552     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2553       if self.op.ignore_failures:
2554         feedback_fn("Warning: can't shutdown instance")
2555       else:
2556         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2557                                  (instance.name, instance.primary_node))
2558
2559     logger.Info("removing block devices for instance %s" % instance.name)
2560
2561     if not _RemoveDisks(instance, self.cfg):
2562       if self.op.ignore_failures:
2563         feedback_fn("Warning: can't remove instance's disks")
2564       else:
2565         raise errors.OpExecError("Can't remove instance's disks")
2566
2567     logger.Info("removing instance %s out of cluster config" % instance.name)
2568
2569     self.cfg.RemoveInstance(instance.name)
2570
2571
2572 class LUQueryInstances(NoHooksLU):
2573   """Logical unit for querying instances.
2574
2575   """
2576   _OP_REQP = ["output_fields", "names"]
2577
2578   def CheckPrereq(self):
2579     """Check prerequisites.
2580
2581     This checks that the fields required are valid output fields.
2582
2583     """
2584     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2585     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2586                                "admin_state", "admin_ram",
2587                                "disk_template", "ip", "mac", "bridge",
2588                                "sda_size", "sdb_size", "vcpus"],
2589                        dynamic=self.dynamic_fields,
2590                        selected=self.op.output_fields)
2591
2592     self.wanted = _GetWantedInstances(self, self.op.names)
2593
2594   def Exec(self, feedback_fn):
2595     """Computes the list of nodes and their attributes.
2596
2597     """
2598     instance_names = self.wanted
2599     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2600                      in instance_names]
2601
2602     # begin data gathering
2603
2604     nodes = frozenset([inst.primary_node for inst in instance_list])
2605
2606     bad_nodes = []
2607     if self.dynamic_fields.intersection(self.op.output_fields):
2608       live_data = {}
2609       node_data = rpc.call_all_instances_info(nodes)
2610       for name in nodes:
2611         result = node_data[name]
2612         if result:
2613           live_data.update(result)
2614         elif result == False:
2615           bad_nodes.append(name)
2616         # else no instance is alive
2617     else:
2618       live_data = dict([(name, {}) for name in instance_names])
2619
2620     # end data gathering
2621
2622     output = []
2623     for instance in instance_list:
2624       iout = []
2625       for field in self.op.output_fields:
2626         if field == "name":
2627           val = instance.name
2628         elif field == "os":
2629           val = instance.os
2630         elif field == "pnode":
2631           val = instance.primary_node
2632         elif field == "snodes":
2633           val = list(instance.secondary_nodes)
2634         elif field == "admin_state":
2635           val = (instance.status != "down")
2636         elif field == "oper_state":
2637           if instance.primary_node in bad_nodes:
2638             val = None
2639           else:
2640             val = bool(live_data.get(instance.name))
2641         elif field == "status":
2642           if instance.primary_node in bad_nodes:
2643             val = "ERROR_nodedown"
2644           else:
2645             running = bool(live_data.get(instance.name))
2646             if running:
2647               if instance.status != "down":
2648                 val = "running"
2649               else:
2650                 val = "ERROR_up"
2651             else:
2652               if instance.status != "down":
2653                 val = "ERROR_down"
2654               else:
2655                 val = "ADMIN_down"
2656         elif field == "admin_ram":
2657           val = instance.memory
2658         elif field == "oper_ram":
2659           if instance.primary_node in bad_nodes:
2660             val = None
2661           elif instance.name in live_data:
2662             val = live_data[instance.name].get("memory", "?")
2663           else:
2664             val = "-"
2665         elif field == "disk_template":
2666           val = instance.disk_template
2667         elif field == "ip":
2668           val = instance.nics[0].ip
2669         elif field == "bridge":
2670           val = instance.nics[0].bridge
2671         elif field == "mac":
2672           val = instance.nics[0].mac
2673         elif field == "sda_size" or field == "sdb_size":
2674           disk = instance.FindDisk(field[:3])
2675           if disk is None:
2676             val = None
2677           else:
2678             val = disk.size
2679         elif field == "vcpus":
2680           val = instance.vcpus
2681         else:
2682           raise errors.ParameterError(field)
2683         iout.append(val)
2684       output.append(iout)
2685
2686     return output
2687
2688
2689 class LUFailoverInstance(LogicalUnit):
2690   """Failover an instance.
2691
2692   """
2693   HPATH = "instance-failover"
2694   HTYPE = constants.HTYPE_INSTANCE
2695   _OP_REQP = ["instance_name", "ignore_consistency"]
2696
2697   def BuildHooksEnv(self):
2698     """Build hooks env.
2699
2700     This runs on master, primary and secondary nodes of the instance.
2701
2702     """
2703     env = {
2704       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2705       }
2706     env.update(_BuildInstanceHookEnvByObject(self.instance))
2707     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2708     return env, nl, nl
2709
2710   def CheckPrereq(self):
2711     """Check prerequisites.
2712
2713     This checks that the instance is in the cluster.
2714
2715     """
2716     instance = self.cfg.GetInstanceInfo(
2717       self.cfg.ExpandInstanceName(self.op.instance_name))
2718     if instance is None:
2719       raise errors.OpPrereqError("Instance '%s' not known" %
2720                                  self.op.instance_name)
2721
2722     if instance.disk_template not in constants.DTS_NET_MIRROR:
2723       raise errors.OpPrereqError("Instance's disk layout is not"
2724                                  " network mirrored, cannot failover.")
2725
2726     secondary_nodes = instance.secondary_nodes
2727     if not secondary_nodes:
2728       raise errors.ProgrammerError("no secondary node but using "
2729                                    "DT_REMOTE_RAID1 template")
2730
2731     target_node = secondary_nodes[0]
2732     # check memory requirements on the secondary node
2733     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2734                          instance.name, instance.memory)
2735
2736     # check bridge existance
2737     brlist = [nic.bridge for nic in instance.nics]
2738     if not rpc.call_bridges_exist(target_node, brlist):
2739       raise errors.OpPrereqError("One or more target bridges %s does not"
2740                                  " exist on destination node '%s'" %
2741                                  (brlist, target_node))
2742
2743     self.instance = instance
2744
2745   def Exec(self, feedback_fn):
2746     """Failover an instance.
2747
2748     The failover is done by shutting it down on its present node and
2749     starting it on the secondary.
2750
2751     """
2752     instance = self.instance
2753
2754     source_node = instance.primary_node
2755     target_node = instance.secondary_nodes[0]
2756
2757     feedback_fn("* checking disk consistency between source and target")
2758     for dev in instance.disks:
2759       # for remote_raid1, these are md over drbd
2760       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2761         if instance.status == "up" and not self.op.ignore_consistency:
2762           raise errors.OpExecError("Disk %s is degraded on target node,"
2763                                    " aborting failover." % dev.iv_name)
2764
2765     feedback_fn("* shutting down instance on source node")
2766     logger.Info("Shutting down instance %s on node %s" %
2767                 (instance.name, source_node))
2768
2769     if not rpc.call_instance_shutdown(source_node, instance):
2770       if self.op.ignore_consistency:
2771         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2772                      " anyway. Please make sure node %s is down"  %
2773                      (instance.name, source_node, source_node))
2774       else:
2775         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2776                                  (instance.name, source_node))
2777
2778     feedback_fn("* deactivating the instance's disks on source node")
2779     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2780       raise errors.OpExecError("Can't shut down the instance's disks.")
2781
2782     instance.primary_node = target_node
2783     # distribute new instance config to the other nodes
2784     self.cfg.AddInstance(instance)
2785
2786     # Only start the instance if it's marked as up
2787     if instance.status == "up":
2788       feedback_fn("* activating the instance's disks on target node")
2789       logger.Info("Starting instance %s on node %s" %
2790                   (instance.name, target_node))
2791
2792       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2793                                                ignore_secondaries=True)
2794       if not disks_ok:
2795         _ShutdownInstanceDisks(instance, self.cfg)
2796         raise errors.OpExecError("Can't activate the instance's disks")
2797
2798       feedback_fn("* starting the instance on the target node")
2799       if not rpc.call_instance_start(target_node, instance, None):
2800         _ShutdownInstanceDisks(instance, self.cfg)
2801         raise errors.OpExecError("Could not start instance %s on node %s." %
2802                                  (instance.name, target_node))
2803
2804
2805 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2806   """Create a tree of block devices on the primary node.
2807
2808   This always creates all devices.
2809
2810   """
2811   if device.children:
2812     for child in device.children:
2813       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2814         return False
2815
2816   cfg.SetDiskID(device, node)
2817   new_id = rpc.call_blockdev_create(node, device, device.size,
2818                                     instance.name, True, info)
2819   if not new_id:
2820     return False
2821   if device.physical_id is None:
2822     device.physical_id = new_id
2823   return True
2824
2825
2826 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2827   """Create a tree of block devices on a secondary node.
2828
2829   If this device type has to be created on secondaries, create it and
2830   all its children.
2831
2832   If not, just recurse to children keeping the same 'force' value.
2833
2834   """
2835   if device.CreateOnSecondary():
2836     force = True
2837   if device.children:
2838     for child in device.children:
2839       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2840                                         child, force, info):
2841         return False
2842
2843   if not force:
2844     return True
2845   cfg.SetDiskID(device, node)
2846   new_id = rpc.call_blockdev_create(node, device, device.size,
2847                                     instance.name, False, info)
2848   if not new_id:
2849     return False
2850   if device.physical_id is None:
2851     device.physical_id = new_id
2852   return True
2853
2854
2855 def _GenerateUniqueNames(cfg, exts):
2856   """Generate a suitable LV name.
2857
2858   This will generate a logical volume name for the given instance.
2859
2860   """
2861   results = []
2862   for val in exts:
2863     new_id = cfg.GenerateUniqueID()
2864     results.append("%s%s" % (new_id, val))
2865   return results
2866
2867
2868 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2869   """Generate a drbd device complete with its children.
2870
2871   """
2872   port = cfg.AllocatePort()
2873   vgname = cfg.GetVGName()
2874   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2875                           logical_id=(vgname, names[0]))
2876   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2877                           logical_id=(vgname, names[1]))
2878   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2879                           logical_id = (primary, secondary, port),
2880                           children = [dev_data, dev_meta])
2881   return drbd_dev
2882
2883
2884 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2885   """Generate a drbd8 device complete with its children.
2886
2887   """
2888   port = cfg.AllocatePort()
2889   vgname = cfg.GetVGName()
2890   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2891                           logical_id=(vgname, names[0]))
2892   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2893                           logical_id=(vgname, names[1]))
2894   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2895                           logical_id = (primary, secondary, port),
2896                           children = [dev_data, dev_meta],
2897                           iv_name=iv_name)
2898   return drbd_dev
2899
2900 def _GenerateDiskTemplate(cfg, template_name,
2901                           instance_name, primary_node,
2902                           secondary_nodes, disk_sz, swap_sz):
2903   """Generate the entire disk layout for a given template type.
2904
2905   """
2906   #TODO: compute space requirements
2907
2908   vgname = cfg.GetVGName()
2909   if template_name == constants.DT_DISKLESS:
2910     disks = []
2911   elif template_name == constants.DT_PLAIN:
2912     if len(secondary_nodes) != 0:
2913       raise errors.ProgrammerError("Wrong template configuration")
2914
2915     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2916     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2917                            logical_id=(vgname, names[0]),
2918                            iv_name = "sda")
2919     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2920                            logical_id=(vgname, names[1]),
2921                            iv_name = "sdb")
2922     disks = [sda_dev, sdb_dev]
2923   elif template_name == constants.DT_LOCAL_RAID1:
2924     if len(secondary_nodes) != 0:
2925       raise errors.ProgrammerError("Wrong template configuration")
2926
2927
2928     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2929                                        ".sdb_m1", ".sdb_m2"])
2930     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2931                               logical_id=(vgname, names[0]))
2932     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2933                               logical_id=(vgname, names[1]))
2934     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2935                               size=disk_sz,
2936                               children = [sda_dev_m1, sda_dev_m2])
2937     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2938                               logical_id=(vgname, names[2]))
2939     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2940                               logical_id=(vgname, names[3]))
2941     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2942                               size=swap_sz,
2943                               children = [sdb_dev_m1, sdb_dev_m2])
2944     disks = [md_sda_dev, md_sdb_dev]
2945   elif template_name == constants.DT_REMOTE_RAID1:
2946     if len(secondary_nodes) != 1:
2947       raise errors.ProgrammerError("Wrong template configuration")
2948     remote_node = secondary_nodes[0]
2949     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2950                                        ".sdb_data", ".sdb_meta"])
2951     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2952                                          disk_sz, names[0:2])
2953     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2954                               children = [drbd_sda_dev], size=disk_sz)
2955     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2956                                          swap_sz, names[2:4])
2957     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2958                               children = [drbd_sdb_dev], size=swap_sz)
2959     disks = [md_sda_dev, md_sdb_dev]
2960   elif template_name == constants.DT_DRBD8:
2961     if len(secondary_nodes) != 1:
2962       raise errors.ProgrammerError("Wrong template configuration")
2963     remote_node = secondary_nodes[0]
2964     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2965                                        ".sdb_data", ".sdb_meta"])
2966     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2967                                          disk_sz, names[0:2], "sda")
2968     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2969                                          swap_sz, names[2:4], "sdb")
2970     disks = [drbd_sda_dev, drbd_sdb_dev]
2971   else:
2972     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2973   return disks
2974
2975
2976 def _GetInstanceInfoText(instance):
2977   """Compute that text that should be added to the disk's metadata.
2978
2979   """
2980   return "originstname+%s" % instance.name
2981
2982
2983 def _CreateDisks(cfg, instance):
2984   """Create all disks for an instance.
2985
2986   This abstracts away some work from AddInstance.
2987
2988   Args:
2989     instance: the instance object
2990
2991   Returns:
2992     True or False showing the success of the creation process
2993
2994   """
2995   info = _GetInstanceInfoText(instance)
2996
2997   for device in instance.disks:
2998     logger.Info("creating volume %s for instance %s" %
2999               (device.iv_name, instance.name))
3000     #HARDCODE
3001     for secondary_node in instance.secondary_nodes:
3002       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3003                                         device, False, info):
3004         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3005                      (device.iv_name, device, secondary_node))
3006         return False
3007     #HARDCODE
3008     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3009                                     instance, device, info):
3010       logger.Error("failed to create volume %s on primary!" %
3011                    device.iv_name)
3012       return False
3013   return True
3014
3015
3016 def _RemoveDisks(instance, cfg):
3017   """Remove all disks for an instance.
3018
3019   This abstracts away some work from `AddInstance()` and
3020   `RemoveInstance()`. Note that in case some of the devices couldn't
3021   be removed, the removal will continue with the other ones (compare
3022   with `_CreateDisks()`).
3023
3024   Args:
3025     instance: the instance object
3026
3027   Returns:
3028     True or False showing the success of the removal proces
3029
3030   """
3031   logger.Info("removing block devices for instance %s" % instance.name)
3032
3033   result = True
3034   for device in instance.disks:
3035     for node, disk in device.ComputeNodeTree(instance.primary_node):
3036       cfg.SetDiskID(disk, node)
3037       if not rpc.call_blockdev_remove(node, disk):
3038         logger.Error("could not remove block device %s on node %s,"
3039                      " continuing anyway" %
3040                      (device.iv_name, node))
3041         result = False
3042   return result
3043
3044
3045 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3046   """Compute disk size requirements in the volume group
3047
3048   This is currently hard-coded for the two-drive layout.
3049
3050   """
3051   # Required free disk space as a function of disk and swap space
3052   req_size_dict = {
3053     constants.DT_DISKLESS: None,
3054     constants.DT_PLAIN: disk_size + swap_size,
3055     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3056     # 256 MB are added for drbd metadata, 128MB for each drbd device
3057     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3058     constants.DT_DRBD8: disk_size + swap_size + 256,
3059   }
3060
3061   if disk_template not in req_size_dict:
3062     raise errors.ProgrammerError("Disk template '%s' size requirement"
3063                                  " is unknown" %  disk_template)
3064
3065   return req_size_dict[disk_template]
3066
3067
3068 class LUCreateInstance(LogicalUnit):
3069   """Create an instance.
3070
3071   """
3072   HPATH = "instance-add"
3073   HTYPE = constants.HTYPE_INSTANCE
3074   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3075               "disk_template", "swap_size", "mode", "start", "vcpus",
3076               "wait_for_sync", "ip_check", "mac"]
3077
3078   def _RunAllocator(self):
3079     """Run the allocator based on input opcode.
3080
3081     """
3082     disks = [{"size": self.op.disk_size, "mode": "w"},
3083              {"size": self.op.swap_size, "mode": "w"}]
3084     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3085              "bridge": self.op.bridge}]
3086     ial = IAllocator(self.cfg, self.sstore,
3087                      mode=constants.IALLOCATOR_MODE_ALLOC,
3088                      name=self.op.instance_name,
3089                      disk_template=self.op.disk_template,
3090                      tags=[],
3091                      os=self.op.os_type,
3092                      vcpus=self.op.vcpus,
3093                      mem_size=self.op.mem_size,
3094                      disks=disks,
3095                      nics=nics,
3096                      )
3097
3098     ial.Run(self.op.iallocator)
3099
3100     if not ial.success:
3101       raise errors.OpPrereqError("Can't compute nodes using"
3102                                  " iallocator '%s': %s" % (self.op.iallocator,
3103                                                            ial.info))
3104     if len(ial.nodes) != ial.required_nodes:
3105       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3106                                  " of nodes (%s), required %s" %
3107                                  (len(ial.nodes), ial.required_nodes))
3108     self.op.pnode = ial.nodes[0]
3109     logger.ToStdout("Selected nodes for the instance: %s" %
3110                     (", ".join(ial.nodes),))
3111     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3112                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3113     if ial.required_nodes == 2:
3114       self.op.snode = ial.nodes[1]
3115
3116   def BuildHooksEnv(self):
3117     """Build hooks env.
3118
3119     This runs on master, primary and secondary nodes of the instance.
3120
3121     """
3122     env = {
3123       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3124       "INSTANCE_DISK_SIZE": self.op.disk_size,
3125       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3126       "INSTANCE_ADD_MODE": self.op.mode,
3127       }
3128     if self.op.mode == constants.INSTANCE_IMPORT:
3129       env["INSTANCE_SRC_NODE"] = self.op.src_node
3130       env["INSTANCE_SRC_PATH"] = self.op.src_path
3131       env["INSTANCE_SRC_IMAGE"] = self.src_image
3132
3133     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3134       primary_node=self.op.pnode,
3135       secondary_nodes=self.secondaries,
3136       status=self.instance_status,
3137       os_type=self.op.os_type,
3138       memory=self.op.mem_size,
3139       vcpus=self.op.vcpus,
3140       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3141     ))
3142
3143     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3144           self.secondaries)
3145     return env, nl, nl
3146
3147
3148   def CheckPrereq(self):
3149     """Check prerequisites.
3150
3151     """
3152     # set optional parameters to none if they don't exist
3153     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3154                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3155                  "vnc_bind_address"]:
3156       if not hasattr(self.op, attr):
3157         setattr(self.op, attr, None)
3158
3159     if self.op.mode not in (constants.INSTANCE_CREATE,
3160                             constants.INSTANCE_IMPORT):
3161       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3162                                  self.op.mode)
3163
3164     if self.op.mode == constants.INSTANCE_IMPORT:
3165       src_node = getattr(self.op, "src_node", None)
3166       src_path = getattr(self.op, "src_path", None)
3167       if src_node is None or src_path is None:
3168         raise errors.OpPrereqError("Importing an instance requires source"
3169                                    " node and path options")
3170       src_node_full = self.cfg.ExpandNodeName(src_node)
3171       if src_node_full is None:
3172         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3173       self.op.src_node = src_node = src_node_full
3174
3175       if not os.path.isabs(src_path):
3176         raise errors.OpPrereqError("The source path must be absolute")
3177
3178       export_info = rpc.call_export_info(src_node, src_path)
3179
3180       if not export_info:
3181         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3182
3183       if not export_info.has_section(constants.INISECT_EXP):
3184         raise errors.ProgrammerError("Corrupted export config")
3185
3186       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3187       if (int(ei_version) != constants.EXPORT_VERSION):
3188         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3189                                    (ei_version, constants.EXPORT_VERSION))
3190
3191       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3192         raise errors.OpPrereqError("Can't import instance with more than"
3193                                    " one data disk")
3194
3195       # FIXME: are the old os-es, disk sizes, etc. useful?
3196       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3197       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3198                                                          'disk0_dump'))
3199       self.src_image = diskimage
3200     else: # INSTANCE_CREATE
3201       if getattr(self.op, "os_type", None) is None:
3202         raise errors.OpPrereqError("No guest OS specified")
3203
3204     #### instance parameters check
3205
3206     # disk template and mirror node verification
3207     if self.op.disk_template not in constants.DISK_TEMPLATES:
3208       raise errors.OpPrereqError("Invalid disk template name")
3209
3210     # instance name verification
3211     hostname1 = utils.HostInfo(self.op.instance_name)
3212
3213     self.op.instance_name = instance_name = hostname1.name
3214     instance_list = self.cfg.GetInstanceList()
3215     if instance_name in instance_list:
3216       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3217                                  instance_name)
3218
3219     # ip validity checks
3220     ip = getattr(self.op, "ip", None)
3221     if ip is None or ip.lower() == "none":
3222       inst_ip = None
3223     elif ip.lower() == "auto":
3224       inst_ip = hostname1.ip
3225     else:
3226       if not utils.IsValidIP(ip):
3227         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3228                                    " like a valid IP" % ip)
3229       inst_ip = ip
3230     self.inst_ip = self.op.ip = inst_ip
3231
3232     if self.op.start and not self.op.ip_check:
3233       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3234                                  " adding an instance in start mode")
3235
3236     if self.op.ip_check:
3237       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3238         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3239                                    (hostname1.ip, instance_name))
3240
3241     # MAC address verification
3242     if self.op.mac != "auto":
3243       if not utils.IsValidMac(self.op.mac.lower()):
3244         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3245                                    self.op.mac)
3246
3247     # bridge verification
3248     bridge = getattr(self.op, "bridge", None)
3249     if bridge is None:
3250       self.op.bridge = self.cfg.GetDefBridge()
3251     else:
3252       self.op.bridge = bridge
3253
3254     # boot order verification
3255     if self.op.hvm_boot_order is not None:
3256       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3257         raise errors.OpPrereqError("invalid boot order specified,"
3258                                    " must be one or more of [acdn]")
3259     #### allocator run
3260
3261     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3262       raise errors.OpPrereqError("One and only one of iallocator and primary"
3263                                  " node must be given")
3264
3265     if self.op.iallocator is not None:
3266       self._RunAllocator()
3267
3268     #### node related checks
3269
3270     # check primary node
3271     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3272     if pnode is None:
3273       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3274                                  self.op.pnode)
3275     self.op.pnode = pnode.name
3276     self.pnode = pnode
3277     self.secondaries = []
3278
3279     # mirror node verification
3280     if self.op.disk_template in constants.DTS_NET_MIRROR:
3281       if getattr(self.op, "snode", None) is None:
3282         raise errors.OpPrereqError("The networked disk templates need"
3283                                    " a mirror node")
3284
3285       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3286       if snode_name is None:
3287         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3288                                    self.op.snode)
3289       elif snode_name == pnode.name:
3290         raise errors.OpPrereqError("The secondary node cannot be"
3291                                    " the primary node.")
3292       self.secondaries.append(snode_name)
3293
3294     req_size = _ComputeDiskSize(self.op.disk_template,
3295                                 self.op.disk_size, self.op.swap_size)
3296
3297     # Check lv size requirements
3298     if req_size is not None:
3299       nodenames = [pnode.name] + self.secondaries
3300       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3301       for node in nodenames:
3302         info = nodeinfo.get(node, None)
3303         if not info:
3304           raise errors.OpPrereqError("Cannot get current information"
3305                                      " from node '%s'" % nodeinfo)
3306         vg_free = info.get('vg_free', None)
3307         if not isinstance(vg_free, int):
3308           raise errors.OpPrereqError("Can't compute free disk space on"
3309                                      " node %s" % node)
3310         if req_size > info['vg_free']:
3311           raise errors.OpPrereqError("Not enough disk space on target node %s."
3312                                      " %d MB available, %d MB required" %
3313                                      (node, info['vg_free'], req_size))
3314
3315     # os verification
3316     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3317     if not os_obj:
3318       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3319                                  " primary node"  % self.op.os_type)
3320
3321     if self.op.kernel_path == constants.VALUE_NONE:
3322       raise errors.OpPrereqError("Can't set instance kernel to none")
3323
3324
3325     # bridge check on primary node
3326     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3327       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3328                                  " destination node '%s'" %
3329                                  (self.op.bridge, pnode.name))
3330
3331     # hvm_cdrom_image_path verification
3332     if self.op.hvm_cdrom_image_path is not None:
3333       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3334         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3335                                    " be an absolute path or None, not %s" %
3336                                    self.op.hvm_cdrom_image_path)
3337       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3338         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3339                                    " regular file or a symlink pointing to"
3340                                    " an existing regular file, not %s" %
3341                                    self.op.hvm_cdrom_image_path)
3342
3343     # vnc_bind_address verification
3344     if self.op.vnc_bind_address is not None:
3345       if not utils.IsValidIP(self.op.vnc_bind_address):
3346         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3347                                    " like a valid IP address" %
3348                                    self.op.vnc_bind_address)
3349
3350     if self.op.start:
3351       self.instance_status = 'up'
3352     else:
3353       self.instance_status = 'down'
3354
3355   def Exec(self, feedback_fn):
3356     """Create and add the instance to the cluster.
3357
3358     """
3359     instance = self.op.instance_name
3360     pnode_name = self.pnode.name
3361
3362     if self.op.mac == "auto":
3363       mac_address = self.cfg.GenerateMAC()
3364     else:
3365       mac_address = self.op.mac
3366
3367     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3368     if self.inst_ip is not None:
3369       nic.ip = self.inst_ip
3370
3371     ht_kind = self.sstore.GetHypervisorType()
3372     if ht_kind in constants.HTS_REQ_PORT:
3373       network_port = self.cfg.AllocatePort()
3374     else:
3375       network_port = None
3376
3377     if self.op.vnc_bind_address is None:
3378       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3379
3380     disks = _GenerateDiskTemplate(self.cfg,
3381                                   self.op.disk_template,
3382                                   instance, pnode_name,
3383                                   self.secondaries, self.op.disk_size,
3384                                   self.op.swap_size)
3385
3386     iobj = objects.Instance(name=instance, os=self.op.os_type,
3387                             primary_node=pnode_name,
3388                             memory=self.op.mem_size,
3389                             vcpus=self.op.vcpus,
3390                             nics=[nic], disks=disks,
3391                             disk_template=self.op.disk_template,
3392                             status=self.instance_status,
3393                             network_port=network_port,
3394                             kernel_path=self.op.kernel_path,
3395                             initrd_path=self.op.initrd_path,
3396                             hvm_boot_order=self.op.hvm_boot_order,
3397                             hvm_acpi=self.op.hvm_acpi,
3398                             hvm_pae=self.op.hvm_pae,
3399                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3400                             vnc_bind_address=self.op.vnc_bind_address,
3401                             )
3402
3403     feedback_fn("* creating instance disks...")
3404     if not _CreateDisks(self.cfg, iobj):
3405       _RemoveDisks(iobj, self.cfg)
3406       raise errors.OpExecError("Device creation failed, reverting...")
3407
3408     feedback_fn("adding instance %s to cluster config" % instance)
3409
3410     self.cfg.AddInstance(iobj)
3411
3412     if self.op.wait_for_sync:
3413       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3414     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3415       # make sure the disks are not degraded (still sync-ing is ok)
3416       time.sleep(15)
3417       feedback_fn("* checking mirrors status")
3418       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3419     else:
3420       disk_abort = False
3421
3422     if disk_abort:
3423       _RemoveDisks(iobj, self.cfg)
3424       self.cfg.RemoveInstance(iobj.name)
3425       raise errors.OpExecError("There are some degraded disks for"
3426                                " this instance")
3427
3428     feedback_fn("creating os for instance %s on node %s" %
3429                 (instance, pnode_name))
3430
3431     if iobj.disk_template != constants.DT_DISKLESS:
3432       if self.op.mode == constants.INSTANCE_CREATE:
3433         feedback_fn("* running the instance OS create scripts...")
3434         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3435           raise errors.OpExecError("could not add os for instance %s"
3436                                    " on node %s" %
3437                                    (instance, pnode_name))
3438
3439       elif self.op.mode == constants.INSTANCE_IMPORT:
3440         feedback_fn("* running the instance OS import scripts...")
3441         src_node = self.op.src_node
3442         src_image = self.src_image
3443         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3444                                                 src_node, src_image):
3445           raise errors.OpExecError("Could not import os for instance"
3446                                    " %s on node %s" %
3447                                    (instance, pnode_name))
3448       else:
3449         # also checked in the prereq part
3450         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3451                                      % self.op.mode)
3452
3453     if self.op.start:
3454       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3455       feedback_fn("* starting instance...")
3456       if not rpc.call_instance_start(pnode_name, iobj, None):
3457         raise errors.OpExecError("Could not start instance")
3458
3459
3460 class LUConnectConsole(NoHooksLU):
3461   """Connect to an instance's console.
3462
3463   This is somewhat special in that it returns the command line that
3464   you need to run on the master node in order to connect to the
3465   console.
3466
3467   """
3468   _OP_REQP = ["instance_name"]
3469
3470   def CheckPrereq(self):
3471     """Check prerequisites.
3472
3473     This checks that the instance is in the cluster.
3474
3475     """
3476     instance = self.cfg.GetInstanceInfo(
3477       self.cfg.ExpandInstanceName(self.op.instance_name))
3478     if instance is None:
3479       raise errors.OpPrereqError("Instance '%s' not known" %
3480                                  self.op.instance_name)
3481     self.instance = instance
3482
3483   def Exec(self, feedback_fn):
3484     """Connect to the console of an instance
3485
3486     """
3487     instance = self.instance
3488     node = instance.primary_node
3489
3490     node_insts = rpc.call_instance_list([node])[node]
3491     if node_insts is False:
3492       raise errors.OpExecError("Can't connect to node %s." % node)
3493
3494     if instance.name not in node_insts:
3495       raise errors.OpExecError("Instance %s is not running." % instance.name)
3496
3497     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3498
3499     hyper = hypervisor.GetHypervisor()
3500     console_cmd = hyper.GetShellCommandForConsole(instance)
3501     # build ssh cmdline
3502     argv = ["ssh", "-q", "-t"]
3503     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3504     argv.extend(ssh.BATCH_MODE_OPTS)
3505     argv.append(node)
3506     argv.append(console_cmd)
3507     return "ssh", argv
3508
3509
3510 class LUAddMDDRBDComponent(LogicalUnit):
3511   """Adda new mirror member to an instance's disk.
3512
3513   """
3514   HPATH = "mirror-add"
3515   HTYPE = constants.HTYPE_INSTANCE
3516   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3517
3518   def BuildHooksEnv(self):
3519     """Build hooks env.
3520
3521     This runs on the master, the primary and all the secondaries.
3522
3523     """
3524     env = {
3525       "NEW_SECONDARY": self.op.remote_node,
3526       "DISK_NAME": self.op.disk_name,
3527       }
3528     env.update(_BuildInstanceHookEnvByObject(self.instance))
3529     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3530           self.op.remote_node,] + list(self.instance.secondary_nodes)
3531     return env, nl, nl
3532
3533   def CheckPrereq(self):
3534     """Check prerequisites.
3535
3536     This checks that the instance is in the cluster.
3537
3538     """
3539     instance = self.cfg.GetInstanceInfo(
3540       self.cfg.ExpandInstanceName(self.op.instance_name))
3541     if instance is None:
3542       raise errors.OpPrereqError("Instance '%s' not known" %
3543                                  self.op.instance_name)
3544     self.instance = instance
3545
3546     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3547     if remote_node is None:
3548       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3549     self.remote_node = remote_node
3550
3551     if remote_node == instance.primary_node:
3552       raise errors.OpPrereqError("The specified node is the primary node of"
3553                                  " the instance.")
3554
3555     if instance.disk_template != constants.DT_REMOTE_RAID1:
3556       raise errors.OpPrereqError("Instance's disk layout is not"
3557                                  " remote_raid1.")
3558     for disk in instance.disks:
3559       if disk.iv_name == self.op.disk_name:
3560         break
3561     else:
3562       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3563                                  " instance." % self.op.disk_name)
3564     if len(disk.children) > 1:
3565       raise errors.OpPrereqError("The device already has two slave devices."
3566                                  " This would create a 3-disk raid1 which we"
3567                                  " don't allow.")
3568     self.disk = disk
3569
3570   def Exec(self, feedback_fn):
3571     """Add the mirror component
3572
3573     """
3574     disk = self.disk
3575     instance = self.instance
3576
3577     remote_node = self.remote_node
3578     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3579     names = _GenerateUniqueNames(self.cfg, lv_names)
3580     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3581                                      remote_node, disk.size, names)
3582
3583     logger.Info("adding new mirror component on secondary")
3584     #HARDCODE
3585     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3586                                       new_drbd, False,
3587                                       _GetInstanceInfoText(instance)):
3588       raise errors.OpExecError("Failed to create new component on secondary"
3589                                " node %s" % remote_node)
3590
3591     logger.Info("adding new mirror component on primary")
3592     #HARDCODE
3593     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3594                                     instance, new_drbd,
3595                                     _GetInstanceInfoText(instance)):
3596       # remove secondary dev
3597       self.cfg.SetDiskID(new_drbd, remote_node)
3598       rpc.call_blockdev_remove(remote_node, new_drbd)
3599       raise errors.OpExecError("Failed to create volume on primary")
3600
3601     # the device exists now
3602     # call the primary node to add the mirror to md
3603     logger.Info("adding new mirror component to md")
3604     if not rpc.call_blockdev_addchildren(instance.primary_node,
3605                                          disk, [new_drbd]):
3606       logger.Error("Can't add mirror compoment to md!")
3607       self.cfg.SetDiskID(new_drbd, remote_node)
3608       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3609         logger.Error("Can't rollback on secondary")
3610       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3611       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3612         logger.Error("Can't rollback on primary")
3613       raise errors.OpExecError("Can't add mirror component to md array")
3614
3615     disk.children.append(new_drbd)
3616
3617     self.cfg.AddInstance(instance)
3618
3619     _WaitForSync(self.cfg, instance, self.proc)
3620
3621     return 0
3622
3623
3624 class LURemoveMDDRBDComponent(LogicalUnit):
3625   """Remove a component from a remote_raid1 disk.
3626
3627   """
3628   HPATH = "mirror-remove"
3629   HTYPE = constants.HTYPE_INSTANCE
3630   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3631
3632   def BuildHooksEnv(self):
3633     """Build hooks env.
3634
3635     This runs on the master, the primary and all the secondaries.
3636
3637     """
3638     env = {
3639       "DISK_NAME": self.op.disk_name,
3640       "DISK_ID": self.op.disk_id,
3641       "OLD_SECONDARY": self.old_secondary,
3642       }
3643     env.update(_BuildInstanceHookEnvByObject(self.instance))
3644     nl = [self.sstore.GetMasterNode(),
3645           self.instance.primary_node] + list(self.instance.secondary_nodes)
3646     return env, nl, nl
3647
3648   def CheckPrereq(self):
3649     """Check prerequisites.
3650
3651     This checks that the instance is in the cluster.
3652
3653     """
3654     instance = self.cfg.GetInstanceInfo(
3655       self.cfg.ExpandInstanceName(self.op.instance_name))
3656     if instance is None:
3657       raise errors.OpPrereqError("Instance '%s' not known" %
3658                                  self.op.instance_name)
3659     self.instance = instance
3660
3661     if instance.disk_template != constants.DT_REMOTE_RAID1:
3662       raise errors.OpPrereqError("Instance's disk layout is not"
3663                                  " remote_raid1.")
3664     for disk in instance.disks:
3665       if disk.iv_name == self.op.disk_name:
3666         break
3667     else:
3668       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3669                                  " instance." % self.op.disk_name)
3670     for child in disk.children:
3671       if (child.dev_type == constants.LD_DRBD7 and
3672           child.logical_id[2] == self.op.disk_id):
3673         break
3674     else:
3675       raise errors.OpPrereqError("Can't find the device with this port.")
3676
3677     if len(disk.children) < 2:
3678       raise errors.OpPrereqError("Cannot remove the last component from"
3679                                  " a mirror.")
3680     self.disk = disk
3681     self.child = child
3682     if self.child.logical_id[0] == instance.primary_node:
3683       oid = 1
3684     else:
3685       oid = 0
3686     self.old_secondary = self.child.logical_id[oid]
3687
3688   def Exec(self, feedback_fn):
3689     """Remove the mirror component
3690
3691     """
3692     instance = self.instance
3693     disk = self.disk
3694     child = self.child
3695     logger.Info("remove mirror component")
3696     self.cfg.SetDiskID(disk, instance.primary_node)
3697     if not rpc.call_blockdev_removechildren(instance.primary_node,
3698                                             disk, [child]):
3699       raise errors.OpExecError("Can't remove child from mirror.")
3700
3701     for node in child.logical_id[:2]:
3702       self.cfg.SetDiskID(child, node)
3703       if not rpc.call_blockdev_remove(node, child):
3704         logger.Error("Warning: failed to remove device from node %s,"
3705                      " continuing operation." % node)
3706
3707     disk.children.remove(child)
3708     self.cfg.AddInstance(instance)
3709
3710
3711 class LUReplaceDisks(LogicalUnit):
3712   """Replace the disks of an instance.
3713
3714   """
3715   HPATH = "mirrors-replace"
3716   HTYPE = constants.HTYPE_INSTANCE
3717   _OP_REQP = ["instance_name", "mode", "disks"]
3718
3719   def _RunAllocator(self):
3720     """Compute a new secondary node using an IAllocator.
3721
3722     """
3723     ial = IAllocator(self.cfg, self.sstore,
3724                      mode=constants.IALLOCATOR_MODE_RELOC,
3725                      name=self.op.instance_name,
3726                      relocate_from=[self.sec_node])
3727
3728     ial.Run(self.op.iallocator)
3729
3730     if not ial.success:
3731       raise errors.OpPrereqError("Can't compute nodes using"
3732                                  " iallocator '%s': %s" % (self.op.iallocator,
3733                                                            ial.info))
3734     if len(ial.nodes) != ial.required_nodes:
3735       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3736                                  " of nodes (%s), required %s" %
3737                                  (len(ial.nodes), ial.required_nodes))
3738     self.op.remote_node = ial.nodes[0]
3739     logger.ToStdout("Selected new secondary for the instance: %s" %
3740                     self.op.remote_node)
3741
3742   def BuildHooksEnv(self):
3743     """Build hooks env.
3744
3745     This runs on the master, the primary and all the secondaries.
3746
3747     """
3748     env = {
3749       "MODE": self.op.mode,
3750       "NEW_SECONDARY": self.op.remote_node,
3751       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3752       }
3753     env.update(_BuildInstanceHookEnvByObject(self.instance))
3754     nl = [
3755       self.sstore.GetMasterNode(),
3756       self.instance.primary_node,
3757       ]
3758     if self.op.remote_node is not None:
3759       nl.append(self.op.remote_node)
3760     return env, nl, nl
3761
3762   def CheckPrereq(self):
3763     """Check prerequisites.
3764
3765     This checks that the instance is in the cluster.
3766
3767     """
3768     if not hasattr(self.op, "remote_node"):
3769       self.op.remote_node = None
3770
3771     instance = self.cfg.GetInstanceInfo(
3772       self.cfg.ExpandInstanceName(self.op.instance_name))
3773     if instance is None:
3774       raise errors.OpPrereqError("Instance '%s' not known" %
3775                                  self.op.instance_name)
3776     self.instance = instance
3777     self.op.instance_name = instance.name
3778
3779     if instance.disk_template not in constants.DTS_NET_MIRROR:
3780       raise errors.OpPrereqError("Instance's disk layout is not"
3781                                  " network mirrored.")
3782
3783     if len(instance.secondary_nodes) != 1:
3784       raise errors.OpPrereqError("The instance has a strange layout,"
3785                                  " expected one secondary but found %d" %
3786                                  len(instance.secondary_nodes))
3787
3788     self.sec_node = instance.secondary_nodes[0]
3789
3790     ia_name = getattr(self.op, "iallocator", None)
3791     if ia_name is not None:
3792       if self.op.remote_node is not None:
3793         raise errors.OpPrereqError("Give either the iallocator or the new"
3794                                    " secondary, not both")
3795       self.op.remote_node = self._RunAllocator()
3796
3797     remote_node = self.op.remote_node
3798     if remote_node is not None:
3799       remote_node = self.cfg.ExpandNodeName(remote_node)
3800       if remote_node is None:
3801         raise errors.OpPrereqError("Node '%s' not known" %
3802                                    self.op.remote_node)
3803       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3804     else:
3805       self.remote_node_info = None
3806     if remote_node == instance.primary_node:
3807       raise errors.OpPrereqError("The specified node is the primary node of"
3808                                  " the instance.")
3809     elif remote_node == self.sec_node:
3810       if self.op.mode == constants.REPLACE_DISK_SEC:
3811         # this is for DRBD8, where we can't execute the same mode of
3812         # replacement as for drbd7 (no different port allocated)
3813         raise errors.OpPrereqError("Same secondary given, cannot execute"
3814                                    " replacement")
3815       # the user gave the current secondary, switch to
3816       # 'no-replace-secondary' mode for drbd7
3817       remote_node = None
3818     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3819         self.op.mode != constants.REPLACE_DISK_ALL):
3820       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3821                                  " disks replacement, not individual ones")
3822     if instance.disk_template == constants.DT_DRBD8:
3823       if (self.op.mode == constants.REPLACE_DISK_ALL and
3824           remote_node is not None):
3825         # switch to replace secondary mode
3826         self.op.mode = constants.REPLACE_DISK_SEC
3827
3828       if self.op.mode == constants.REPLACE_DISK_ALL:
3829         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3830                                    " secondary disk replacement, not"
3831                                    " both at once")
3832       elif self.op.mode == constants.REPLACE_DISK_PRI:
3833         if remote_node is not None:
3834           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3835                                      " the secondary while doing a primary"
3836                                      " node disk replacement")
3837         self.tgt_node = instance.primary_node
3838         self.oth_node = instance.secondary_nodes[0]
3839       elif self.op.mode == constants.REPLACE_DISK_SEC:
3840         self.new_node = remote_node # this can be None, in which case
3841                                     # we don't change the secondary
3842         self.tgt_node = instance.secondary_nodes[0]
3843         self.oth_node = instance.primary_node
3844       else:
3845         raise errors.ProgrammerError("Unhandled disk replace mode")
3846
3847     for name in self.op.disks:
3848       if instance.FindDisk(name) is None:
3849         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3850                                    (name, instance.name))
3851     self.op.remote_node = remote_node
3852
3853   def _ExecRR1(self, feedback_fn):
3854     """Replace the disks of an instance.
3855
3856     """
3857     instance = self.instance
3858     iv_names = {}
3859     # start of work
3860     if self.op.remote_node is None:
3861       remote_node = self.sec_node
3862     else:
3863       remote_node = self.op.remote_node
3864     cfg = self.cfg
3865     for dev in instance.disks:
3866       size = dev.size
3867       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3868       names = _GenerateUniqueNames(cfg, lv_names)
3869       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3870                                        remote_node, size, names)
3871       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3872       logger.Info("adding new mirror component on secondary for %s" %
3873                   dev.iv_name)
3874       #HARDCODE
3875       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3876                                         new_drbd, False,
3877                                         _GetInstanceInfoText(instance)):
3878         raise errors.OpExecError("Failed to create new component on secondary"
3879                                  " node %s. Full abort, cleanup manually!" %
3880                                  remote_node)
3881
3882       logger.Info("adding new mirror component on primary")
3883       #HARDCODE
3884       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3885                                       instance, new_drbd,
3886                                       _GetInstanceInfoText(instance)):
3887         # remove secondary dev
3888         cfg.SetDiskID(new_drbd, remote_node)
3889         rpc.call_blockdev_remove(remote_node, new_drbd)
3890         raise errors.OpExecError("Failed to create volume on primary!"
3891                                  " Full abort, cleanup manually!!")
3892
3893       # the device exists now
3894       # call the primary node to add the mirror to md
3895       logger.Info("adding new mirror component to md")
3896       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3897                                            [new_drbd]):
3898         logger.Error("Can't add mirror compoment to md!")
3899         cfg.SetDiskID(new_drbd, remote_node)
3900         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3901           logger.Error("Can't rollback on secondary")
3902         cfg.SetDiskID(new_drbd, instance.primary_node)
3903         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3904           logger.Error("Can't rollback on primary")
3905         raise errors.OpExecError("Full abort, cleanup manually!!")
3906
3907       dev.children.append(new_drbd)
3908       cfg.AddInstance(instance)
3909
3910     # this can fail as the old devices are degraded and _WaitForSync
3911     # does a combined result over all disks, so we don't check its
3912     # return value
3913     _WaitForSync(cfg, instance, self.proc, unlock=True)
3914
3915     # so check manually all the devices
3916     for name in iv_names:
3917       dev, child, new_drbd = iv_names[name]
3918       cfg.SetDiskID(dev, instance.primary_node)
3919       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3920       if is_degr:
3921         raise errors.OpExecError("MD device %s is degraded!" % name)
3922       cfg.SetDiskID(new_drbd, instance.primary_node)
3923       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3924       if is_degr:
3925         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3926
3927     for name in iv_names:
3928       dev, child, new_drbd = iv_names[name]
3929       logger.Info("remove mirror %s component" % name)
3930       cfg.SetDiskID(dev, instance.primary_node)
3931       if not rpc.call_blockdev_removechildren(instance.primary_node,
3932                                               dev, [child]):
3933         logger.Error("Can't remove child from mirror, aborting"
3934                      " *this device cleanup*.\nYou need to cleanup manually!!")
3935         continue
3936
3937       for node in child.logical_id[:2]:
3938         logger.Info("remove child device on %s" % node)
3939         cfg.SetDiskID(child, node)
3940         if not rpc.call_blockdev_remove(node, child):
3941           logger.Error("Warning: failed to remove device from node %s,"
3942                        " continuing operation." % node)
3943
3944       dev.children.remove(child)
3945
3946       cfg.AddInstance(instance)
3947
3948   def _ExecD8DiskOnly(self, feedback_fn):
3949     """Replace a disk on the primary or secondary for dbrd8.
3950
3951     The algorithm for replace is quite complicated:
3952       - for each disk to be replaced:
3953         - create new LVs on the target node with unique names
3954         - detach old LVs from the drbd device
3955         - rename old LVs to name_replaced.<time_t>
3956         - rename new LVs to old LVs
3957         - attach the new LVs (with the old names now) to the drbd device
3958       - wait for sync across all devices
3959       - for each modified disk:
3960         - remove old LVs (which have the name name_replaces.<time_t>)
3961
3962     Failures are not very well handled.
3963
3964     """
3965     steps_total = 6
3966     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3967     instance = self.instance
3968     iv_names = {}
3969     vgname = self.cfg.GetVGName()
3970     # start of work
3971     cfg = self.cfg
3972     tgt_node = self.tgt_node
3973     oth_node = self.oth_node
3974
3975     # Step: check device activation
3976     self.proc.LogStep(1, steps_total, "check device existence")
3977     info("checking volume groups")
3978     my_vg = cfg.GetVGName()
3979     results = rpc.call_vg_list([oth_node, tgt_node])
3980     if not results:
3981       raise errors.OpExecError("Can't list volume groups on the nodes")
3982     for node in oth_node, tgt_node:
3983       res = results.get(node, False)
3984       if not res or my_vg not in res:
3985         raise errors.OpExecError("Volume group '%s' not found on %s" %
3986                                  (my_vg, node))
3987     for dev in instance.disks:
3988       if not dev.iv_name in self.op.disks:
3989         continue
3990       for node in tgt_node, oth_node:
3991         info("checking %s on %s" % (dev.iv_name, node))
3992         cfg.SetDiskID(dev, node)
3993         if not rpc.call_blockdev_find(node, dev):
3994           raise errors.OpExecError("Can't find device %s on node %s" %
3995                                    (dev.iv_name, node))
3996
3997     # Step: check other node consistency
3998     self.proc.LogStep(2, steps_total, "check peer consistency")
3999     for dev in instance.disks:
4000       if not dev.iv_name in self.op.disks:
4001         continue
4002       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4003       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4004                                    oth_node==instance.primary_node):
4005         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4006                                  " to replace disks on this node (%s)" %
4007                                  (oth_node, tgt_node))
4008
4009     # Step: create new storage
4010     self.proc.LogStep(3, steps_total, "allocate new storage")
4011     for dev in instance.disks:
4012       if not dev.iv_name in self.op.disks:
4013         continue
4014       size = dev.size
4015       cfg.SetDiskID(dev, tgt_node)
4016       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4017       names = _GenerateUniqueNames(cfg, lv_names)
4018       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4019                              logical_id=(vgname, names[0]))
4020       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4021                              logical_id=(vgname, names[1]))
4022       new_lvs = [lv_data, lv_meta]
4023       old_lvs = dev.children
4024       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4025       info("creating new local storage on %s for %s" %
4026            (tgt_node, dev.iv_name))
4027       # since we *always* want to create this LV, we use the
4028       # _Create...OnPrimary (which forces the creation), even if we
4029       # are talking about the secondary node
4030       for new_lv in new_lvs:
4031         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4032                                         _GetInstanceInfoText(instance)):
4033           raise errors.OpExecError("Failed to create new LV named '%s' on"
4034                                    " node '%s'" %
4035                                    (new_lv.logical_id[1], tgt_node))
4036
4037     # Step: for each lv, detach+rename*2+attach
4038     self.proc.LogStep(4, steps_total, "change drbd configuration")
4039     for dev, old_lvs, new_lvs in iv_names.itervalues():
4040       info("detaching %s drbd from local storage" % dev.iv_name)
4041       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4042         raise errors.OpExecError("Can't detach drbd from local storage on node"
4043                                  " %s for device %s" % (tgt_node, dev.iv_name))
4044       #dev.children = []
4045       #cfg.Update(instance)
4046
4047       # ok, we created the new LVs, so now we know we have the needed
4048       # storage; as such, we proceed on the target node to rename
4049       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4050       # using the assumption that logical_id == physical_id (which in
4051       # turn is the unique_id on that node)
4052
4053       # FIXME(iustin): use a better name for the replaced LVs
4054       temp_suffix = int(time.time())
4055       ren_fn = lambda d, suff: (d.physical_id[0],
4056                                 d.physical_id[1] + "_replaced-%s" % suff)
4057       # build the rename list based on what LVs exist on the node
4058       rlist = []
4059       for to_ren in old_lvs:
4060         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4061         if find_res is not None: # device exists
4062           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4063
4064       info("renaming the old LVs on the target node")
4065       if not rpc.call_blockdev_rename(tgt_node, rlist):
4066         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4067       # now we rename the new LVs to the old LVs
4068       info("renaming the new LVs on the target node")
4069       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4070       if not rpc.call_blockdev_rename(tgt_node, rlist):
4071         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4072
4073       for old, new in zip(old_lvs, new_lvs):
4074         new.logical_id = old.logical_id
4075         cfg.SetDiskID(new, tgt_node)
4076
4077       for disk in old_lvs:
4078         disk.logical_id = ren_fn(disk, temp_suffix)
4079         cfg.SetDiskID(disk, tgt_node)
4080
4081       # now that the new lvs have the old name, we can add them to the device
4082       info("adding new mirror component on %s" % tgt_node)
4083       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4084         for new_lv in new_lvs:
4085           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4086             warning("Can't rollback device %s", hint="manually cleanup unused"
4087                     " logical volumes")
4088         raise errors.OpExecError("Can't add local storage to drbd")
4089
4090       dev.children = new_lvs
4091       cfg.Update(instance)
4092
4093     # Step: wait for sync
4094
4095     # this can fail as the old devices are degraded and _WaitForSync
4096     # does a combined result over all disks, so we don't check its
4097     # return value
4098     self.proc.LogStep(5, steps_total, "sync devices")
4099     _WaitForSync(cfg, instance, self.proc, unlock=True)
4100
4101     # so check manually all the devices
4102     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4103       cfg.SetDiskID(dev, instance.primary_node)
4104       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4105       if is_degr:
4106         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4107
4108     # Step: remove old storage
4109     self.proc.LogStep(6, steps_total, "removing old storage")
4110     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4111       info("remove logical volumes for %s" % name)
4112       for lv in old_lvs:
4113         cfg.SetDiskID(lv, tgt_node)
4114         if not rpc.call_blockdev_remove(tgt_node, lv):
4115           warning("Can't remove old LV", hint="manually remove unused LVs")
4116           continue
4117
4118   def _ExecD8Secondary(self, feedback_fn):
4119     """Replace the secondary node for drbd8.
4120
4121     The algorithm for replace is quite complicated:
4122       - for all disks of the instance:
4123         - create new LVs on the new node with same names
4124         - shutdown the drbd device on the old secondary
4125         - disconnect the drbd network on the primary
4126         - create the drbd device on the new secondary
4127         - network attach the drbd on the primary, using an artifice:
4128           the drbd code for Attach() will connect to the network if it
4129           finds a device which is connected to the good local disks but
4130           not network enabled
4131       - wait for sync across all devices
4132       - remove all disks from the old secondary
4133
4134     Failures are not very well handled.
4135
4136     """
4137     steps_total = 6
4138     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4139     instance = self.instance
4140     iv_names = {}
4141     vgname = self.cfg.GetVGName()
4142     # start of work
4143     cfg = self.cfg
4144     old_node = self.tgt_node
4145     new_node = self.new_node
4146     pri_node = instance.primary_node
4147
4148     # Step: check device activation
4149     self.proc.LogStep(1, steps_total, "check device existence")
4150     info("checking volume groups")
4151     my_vg = cfg.GetVGName()
4152     results = rpc.call_vg_list([pri_node, new_node])
4153     if not results:
4154       raise errors.OpExecError("Can't list volume groups on the nodes")
4155     for node in pri_node, new_node:
4156       res = results.get(node, False)
4157       if not res or my_vg not in res:
4158         raise errors.OpExecError("Volume group '%s' not found on %s" %
4159                                  (my_vg, node))
4160     for dev in instance.disks:
4161       if not dev.iv_name in self.op.disks:
4162         continue
4163       info("checking %s on %s" % (dev.iv_name, pri_node))
4164       cfg.SetDiskID(dev, pri_node)
4165       if not rpc.call_blockdev_find(pri_node, dev):
4166         raise errors.OpExecError("Can't find device %s on node %s" %
4167                                  (dev.iv_name, pri_node))
4168
4169     # Step: check other node consistency
4170     self.proc.LogStep(2, steps_total, "check peer consistency")
4171     for dev in instance.disks:
4172       if not dev.iv_name in self.op.disks:
4173         continue
4174       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4175       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4176         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4177                                  " unsafe to replace the secondary" %
4178                                  pri_node)
4179
4180     # Step: create new storage
4181     self.proc.LogStep(3, steps_total, "allocate new storage")
4182     for dev in instance.disks:
4183       size = dev.size
4184       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4185       # since we *always* want to create this LV, we use the
4186       # _Create...OnPrimary (which forces the creation), even if we
4187       # are talking about the secondary node
4188       for new_lv in dev.children:
4189         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4190                                         _GetInstanceInfoText(instance)):
4191           raise errors.OpExecError("Failed to create new LV named '%s' on"
4192                                    " node '%s'" %
4193                                    (new_lv.logical_id[1], new_node))
4194
4195       iv_names[dev.iv_name] = (dev, dev.children)
4196
4197     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4198     for dev in instance.disks:
4199       size = dev.size
4200       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4201       # create new devices on new_node
4202       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4203                               logical_id=(pri_node, new_node,
4204                                           dev.logical_id[2]),
4205                               children=dev.children)
4206       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4207                                         new_drbd, False,
4208                                       _GetInstanceInfoText(instance)):
4209         raise errors.OpExecError("Failed to create new DRBD on"
4210                                  " node '%s'" % new_node)
4211
4212     for dev in instance.disks:
4213       # we have new devices, shutdown the drbd on the old secondary
4214       info("shutting down drbd for %s on old node" % dev.iv_name)
4215       cfg.SetDiskID(dev, old_node)
4216       if not rpc.call_blockdev_shutdown(old_node, dev):
4217         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4218                 hint="Please cleanup this device manually as soon as possible")
4219
4220     info("detaching primary drbds from the network (=> standalone)")
4221     done = 0
4222     for dev in instance.disks:
4223       cfg.SetDiskID(dev, pri_node)
4224       # set the physical (unique in bdev terms) id to None, meaning
4225       # detach from network
4226       dev.physical_id = (None,) * len(dev.physical_id)
4227       # and 'find' the device, which will 'fix' it to match the
4228       # standalone state
4229       if rpc.call_blockdev_find(pri_node, dev):
4230         done += 1
4231       else:
4232         warning("Failed to detach drbd %s from network, unusual case" %
4233                 dev.iv_name)
4234
4235     if not done:
4236       # no detaches succeeded (very unlikely)
4237       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4238
4239     # if we managed to detach at least one, we update all the disks of
4240     # the instance to point to the new secondary
4241     info("updating instance configuration")
4242     for dev in instance.disks:
4243       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4244       cfg.SetDiskID(dev, pri_node)
4245     cfg.Update(instance)
4246
4247     # and now perform the drbd attach
4248     info("attaching primary drbds to new secondary (standalone => connected)")
4249     failures = []
4250     for dev in instance.disks:
4251       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4252       # since the attach is smart, it's enough to 'find' the device,
4253       # it will automatically activate the network, if the physical_id
4254       # is correct
4255       cfg.SetDiskID(dev, pri_node)
4256       if not rpc.call_blockdev_find(pri_node, dev):
4257         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4258                 "please do a gnt-instance info to see the status of disks")
4259
4260     # this can fail as the old devices are degraded and _WaitForSync
4261     # does a combined result over all disks, so we don't check its
4262     # return value
4263     self.proc.LogStep(5, steps_total, "sync devices")
4264     _WaitForSync(cfg, instance, self.proc, unlock=True)
4265
4266     # so check manually all the devices
4267     for name, (dev, old_lvs) in iv_names.iteritems():
4268       cfg.SetDiskID(dev, pri_node)
4269       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4270       if is_degr:
4271         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4272
4273     self.proc.LogStep(6, steps_total, "removing old storage")
4274     for name, (dev, old_lvs) in iv_names.iteritems():
4275       info("remove logical volumes for %s" % name)
4276       for lv in old_lvs:
4277         cfg.SetDiskID(lv, old_node)
4278         if not rpc.call_blockdev_remove(old_node, lv):
4279           warning("Can't remove LV on old secondary",
4280                   hint="Cleanup stale volumes by hand")
4281
4282   def Exec(self, feedback_fn):
4283     """Execute disk replacement.
4284
4285     This dispatches the disk replacement to the appropriate handler.
4286
4287     """
4288     instance = self.instance
4289     if instance.disk_template == constants.DT_REMOTE_RAID1:
4290       fn = self._ExecRR1
4291     elif instance.disk_template == constants.DT_DRBD8:
4292       if self.op.remote_node is None:
4293         fn = self._ExecD8DiskOnly
4294       else:
4295         fn = self._ExecD8Secondary
4296     else:
4297       raise errors.ProgrammerError("Unhandled disk replacement case")
4298     return fn(feedback_fn)
4299
4300
4301 class LUQueryInstanceData(NoHooksLU):
4302   """Query runtime instance data.
4303
4304   """
4305   _OP_REQP = ["instances"]
4306
4307   def CheckPrereq(self):
4308     """Check prerequisites.
4309
4310     This only checks the optional instance list against the existing names.
4311
4312     """
4313     if not isinstance(self.op.instances, list):
4314       raise errors.OpPrereqError("Invalid argument type 'instances'")
4315     if self.op.instances:
4316       self.wanted_instances = []
4317       names = self.op.instances
4318       for name in names:
4319         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4320         if instance is None:
4321           raise errors.OpPrereqError("No such instance name '%s'" % name)
4322         self.wanted_instances.append(instance)
4323     else:
4324       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4325                                in self.cfg.GetInstanceList()]
4326     return
4327
4328
4329   def _ComputeDiskStatus(self, instance, snode, dev):
4330     """Compute block device status.
4331
4332     """
4333     self.cfg.SetDiskID(dev, instance.primary_node)
4334     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4335     if dev.dev_type in constants.LDS_DRBD:
4336       # we change the snode then (otherwise we use the one passed in)
4337       if dev.logical_id[0] == instance.primary_node:
4338         snode = dev.logical_id[1]
4339       else:
4340         snode = dev.logical_id[0]
4341
4342     if snode:
4343       self.cfg.SetDiskID(dev, snode)
4344       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4345     else:
4346       dev_sstatus = None
4347
4348     if dev.children:
4349       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4350                       for child in dev.children]
4351     else:
4352       dev_children = []
4353
4354     data = {
4355       "iv_name": dev.iv_name,
4356       "dev_type": dev.dev_type,
4357       "logical_id": dev.logical_id,
4358       "physical_id": dev.physical_id,
4359       "pstatus": dev_pstatus,
4360       "sstatus": dev_sstatus,
4361       "children": dev_children,
4362       }
4363
4364     return data
4365
4366   def Exec(self, feedback_fn):
4367     """Gather and return data"""
4368     result = {}
4369     for instance in self.wanted_instances:
4370       remote_info = rpc.call_instance_info(instance.primary_node,
4371                                                 instance.name)
4372       if remote_info and "state" in remote_info:
4373         remote_state = "up"
4374       else:
4375         remote_state = "down"
4376       if instance.status == "down":
4377         config_state = "down"
4378       else:
4379         config_state = "up"
4380
4381       disks = [self._ComputeDiskStatus(instance, None, device)
4382                for device in instance.disks]
4383
4384       idict = {
4385         "name": instance.name,
4386         "config_state": config_state,
4387         "run_state": remote_state,
4388         "pnode": instance.primary_node,
4389         "snodes": instance.secondary_nodes,
4390         "os": instance.os,
4391         "memory": instance.memory,
4392         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4393         "disks": disks,
4394         "vcpus": instance.vcpus,
4395         }
4396
4397       htkind = self.sstore.GetHypervisorType()
4398       if htkind == constants.HT_XEN_PVM30:
4399         idict["kernel_path"] = instance.kernel_path
4400         idict["initrd_path"] = instance.initrd_path
4401
4402       if htkind == constants.HT_XEN_HVM31:
4403         idict["hvm_boot_order"] = instance.hvm_boot_order
4404         idict["hvm_acpi"] = instance.hvm_acpi
4405         idict["hvm_pae"] = instance.hvm_pae
4406         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4407
4408       if htkind in constants.HTS_REQ_PORT:
4409         idict["vnc_bind_address"] = instance.vnc_bind_address
4410         idict["network_port"] = instance.network_port
4411
4412       result[instance.name] = idict
4413
4414     return result
4415
4416
4417 class LUSetInstanceParms(LogicalUnit):
4418   """Modifies an instances's parameters.
4419
4420   """
4421   HPATH = "instance-modify"
4422   HTYPE = constants.HTYPE_INSTANCE
4423   _OP_REQP = ["instance_name"]
4424
4425   def BuildHooksEnv(self):
4426     """Build hooks env.
4427
4428     This runs on the master, primary and secondaries.
4429
4430     """
4431     args = dict()
4432     if self.mem:
4433       args['memory'] = self.mem
4434     if self.vcpus:
4435       args['vcpus'] = self.vcpus
4436     if self.do_ip or self.do_bridge or self.mac:
4437       if self.do_ip:
4438         ip = self.ip
4439       else:
4440         ip = self.instance.nics[0].ip
4441       if self.bridge:
4442         bridge = self.bridge
4443       else:
4444         bridge = self.instance.nics[0].bridge
4445       if self.mac:
4446         mac = self.mac
4447       else:
4448         mac = self.instance.nics[0].mac
4449       args['nics'] = [(ip, bridge, mac)]
4450     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4451     nl = [self.sstore.GetMasterNode(),
4452           self.instance.primary_node] + list(self.instance.secondary_nodes)
4453     return env, nl, nl
4454
4455   def CheckPrereq(self):
4456     """Check prerequisites.
4457
4458     This only checks the instance list against the existing names.
4459
4460     """
4461     self.mem = getattr(self.op, "mem", None)
4462     self.vcpus = getattr(self.op, "vcpus", None)
4463     self.ip = getattr(self.op, "ip", None)
4464     self.mac = getattr(self.op, "mac", None)
4465     self.bridge = getattr(self.op, "bridge", None)
4466     self.kernel_path = getattr(self.op, "kernel_path", None)
4467     self.initrd_path = getattr(self.op, "initrd_path", None)
4468     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4469     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4470     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4471     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4472     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4473     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4474                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4475                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4476                  self.vnc_bind_address]
4477     if all_parms.count(None) == len(all_parms):
4478       raise errors.OpPrereqError("No changes submitted")
4479     if self.mem is not None:
4480       try:
4481         self.mem = int(self.mem)
4482       except ValueError, err:
4483         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4484     if self.vcpus is not None:
4485       try:
4486         self.vcpus = int(self.vcpus)
4487       except ValueError, err:
4488         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4489     if self.ip is not None:
4490       self.do_ip = True
4491       if self.ip.lower() == "none":
4492         self.ip = None
4493       else:
4494         if not utils.IsValidIP(self.ip):
4495           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4496     else:
4497       self.do_ip = False
4498     self.do_bridge = (self.bridge is not None)
4499     if self.mac is not None:
4500       if self.cfg.IsMacInUse(self.mac):
4501         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4502                                    self.mac)
4503       if not utils.IsValidMac(self.mac):
4504         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4505
4506     if self.kernel_path is not None:
4507       self.do_kernel_path = True
4508       if self.kernel_path == constants.VALUE_NONE:
4509         raise errors.OpPrereqError("Can't set instance to no kernel")
4510
4511       if self.kernel_path != constants.VALUE_DEFAULT:
4512         if not os.path.isabs(self.kernel_path):
4513           raise errors.OpPrereqError("The kernel path must be an absolute"
4514                                     " filename")
4515     else:
4516       self.do_kernel_path = False
4517
4518     if self.initrd_path is not None:
4519       self.do_initrd_path = True
4520       if self.initrd_path not in (constants.VALUE_NONE,
4521                                   constants.VALUE_DEFAULT):
4522         if not os.path.isabs(self.initrd_path):
4523           raise errors.OpPrereqError("The initrd path must be an absolute"
4524                                     " filename")
4525     else:
4526       self.do_initrd_path = False
4527
4528     # boot order verification
4529     if self.hvm_boot_order is not None:
4530       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4531         if len(self.hvm_boot_order.strip("acdn")) != 0:
4532           raise errors.OpPrereqError("invalid boot order specified,"
4533                                      " must be one or more of [acdn]"
4534                                      " or 'default'")
4535
4536     # hvm_cdrom_image_path verification
4537     if self.op.hvm_cdrom_image_path is not None:
4538       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4539         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4540                                    " be an absolute path or None, not %s" %
4541                                    self.op.hvm_cdrom_image_path)
4542       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4543         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4544                                    " regular file or a symlink pointing to"
4545                                    " an existing regular file, not %s" %
4546                                    self.op.hvm_cdrom_image_path)
4547
4548     # vnc_bind_address verification
4549     if self.op.vnc_bind_address is not None:
4550       if not utils.IsValidIP(self.op.vnc_bind_address):
4551         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4552                                    " like a valid IP address" %
4553                                    self.op.vnc_bind_address)
4554
4555     instance = self.cfg.GetInstanceInfo(
4556       self.cfg.ExpandInstanceName(self.op.instance_name))
4557     if instance is None:
4558       raise errors.OpPrereqError("No such instance name '%s'" %
4559                                  self.op.instance_name)
4560     self.op.instance_name = instance.name
4561     self.instance = instance
4562     return
4563
4564   def Exec(self, feedback_fn):
4565     """Modifies an instance.
4566
4567     All parameters take effect only at the next restart of the instance.
4568     """
4569     result = []
4570     instance = self.instance
4571     if self.mem:
4572       instance.memory = self.mem
4573       result.append(("mem", self.mem))
4574     if self.vcpus:
4575       instance.vcpus = self.vcpus
4576       result.append(("vcpus",  self.vcpus))
4577     if self.do_ip:
4578       instance.nics[0].ip = self.ip
4579       result.append(("ip", self.ip))
4580     if self.bridge:
4581       instance.nics[0].bridge = self.bridge
4582       result.append(("bridge", self.bridge))
4583     if self.mac:
4584       instance.nics[0].mac = self.mac
4585       result.append(("mac", self.mac))
4586     if self.do_kernel_path:
4587       instance.kernel_path = self.kernel_path
4588       result.append(("kernel_path", self.kernel_path))
4589     if self.do_initrd_path:
4590       instance.initrd_path = self.initrd_path
4591       result.append(("initrd_path", self.initrd_path))
4592     if self.hvm_boot_order:
4593       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4594         instance.hvm_boot_order = None
4595       else:
4596         instance.hvm_boot_order = self.hvm_boot_order
4597       result.append(("hvm_boot_order", self.hvm_boot_order))
4598     if self.hvm_acpi:
4599       result.append(("hvm_acpi", self.hvm_acpi))
4600     if self.hvm_pae:
4601       result.append(("hvm_pae", self.hvm_pae))
4602     if self.hvm_cdrom_image_path:
4603       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4604     if self.vnc_bind_address:
4605       instance.vnc_bind_address = self.vnc_bind_address
4606       result.append(("vnc_bind_address", self.vnc_bind_address))
4607
4608     self.cfg.AddInstance(instance)
4609
4610     return result
4611
4612
4613 class LUQueryExports(NoHooksLU):
4614   """Query the exports list
4615
4616   """
4617   _OP_REQP = []
4618
4619   def CheckPrereq(self):
4620     """Check that the nodelist contains only existing nodes.
4621
4622     """
4623     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4624
4625   def Exec(self, feedback_fn):
4626     """Compute the list of all the exported system images.
4627
4628     Returns:
4629       a dictionary with the structure node->(export-list)
4630       where export-list is a list of the instances exported on
4631       that node.
4632
4633     """
4634     return rpc.call_export_list(self.nodes)
4635
4636
4637 class LUExportInstance(LogicalUnit):
4638   """Export an instance to an image in the cluster.
4639
4640   """
4641   HPATH = "instance-export"
4642   HTYPE = constants.HTYPE_INSTANCE
4643   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4644
4645   def BuildHooksEnv(self):
4646     """Build hooks env.
4647
4648     This will run on the master, primary node and target node.
4649
4650     """
4651     env = {
4652       "EXPORT_NODE": self.op.target_node,
4653       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4654       }
4655     env.update(_BuildInstanceHookEnvByObject(self.instance))
4656     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4657           self.op.target_node]
4658     return env, nl, nl
4659
4660   def CheckPrereq(self):
4661     """Check prerequisites.
4662
4663     This checks that the instance and node names are valid.
4664
4665     """
4666     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4667     self.instance = self.cfg.GetInstanceInfo(instance_name)
4668     if self.instance is None:
4669       raise errors.OpPrereqError("Instance '%s' not found" %
4670                                  self.op.instance_name)
4671
4672     # node verification
4673     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4674     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4675
4676     if self.dst_node is None:
4677       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4678                                  self.op.target_node)
4679     self.op.target_node = self.dst_node.name
4680
4681   def Exec(self, feedback_fn):
4682     """Export an instance to an image in the cluster.
4683
4684     """
4685     instance = self.instance
4686     dst_node = self.dst_node
4687     src_node = instance.primary_node
4688     if self.op.shutdown:
4689       # shutdown the instance, but not the disks
4690       if not rpc.call_instance_shutdown(src_node, instance):
4691         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4692                                  (instance.name, src_node))
4693
4694     vgname = self.cfg.GetVGName()
4695
4696     snap_disks = []
4697
4698     try:
4699       for disk in instance.disks:
4700         if disk.iv_name == "sda":
4701           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4702           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4703
4704           if not new_dev_name:
4705             logger.Error("could not snapshot block device %s on node %s" %
4706                          (disk.logical_id[1], src_node))
4707           else:
4708             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4709                                       logical_id=(vgname, new_dev_name),
4710                                       physical_id=(vgname, new_dev_name),
4711                                       iv_name=disk.iv_name)
4712             snap_disks.append(new_dev)
4713
4714     finally:
4715       if self.op.shutdown and instance.status == "up":
4716         if not rpc.call_instance_start(src_node, instance, None):
4717           _ShutdownInstanceDisks(instance, self.cfg)
4718           raise errors.OpExecError("Could not start instance")
4719
4720     # TODO: check for size
4721
4722     for dev in snap_disks:
4723       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4724                                            instance):
4725         logger.Error("could not export block device %s from node"
4726                      " %s to node %s" %
4727                      (dev.logical_id[1], src_node, dst_node.name))
4728       if not rpc.call_blockdev_remove(src_node, dev):
4729         logger.Error("could not remove snapshot block device %s from"
4730                      " node %s" % (dev.logical_id[1], src_node))
4731
4732     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4733       logger.Error("could not finalize export for instance %s on node %s" %
4734                    (instance.name, dst_node.name))
4735
4736     nodelist = self.cfg.GetNodeList()
4737     nodelist.remove(dst_node.name)
4738
4739     # on one-node clusters nodelist will be empty after the removal
4740     # if we proceed the backup would be removed because OpQueryExports
4741     # substitutes an empty list with the full cluster node list.
4742     if nodelist:
4743       op = opcodes.OpQueryExports(nodes=nodelist)
4744       exportlist = self.proc.ChainOpCode(op)
4745       for node in exportlist:
4746         if instance.name in exportlist[node]:
4747           if not rpc.call_export_remove(node, instance.name):
4748             logger.Error("could not remove older export for instance %s"
4749                          " on node %s" % (instance.name, node))
4750
4751
4752 class LURemoveExport(NoHooksLU):
4753   """Remove exports related to the named instance.
4754
4755   """
4756   _OP_REQP = ["instance_name"]
4757
4758   def CheckPrereq(self):
4759     """Check prerequisites.
4760     """
4761     pass
4762
4763   def Exec(self, feedback_fn):
4764     """Remove any export.
4765
4766     """
4767     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4768     # If the instance was not found we'll try with the name that was passed in.
4769     # This will only work if it was an FQDN, though.
4770     fqdn_warn = False
4771     if not instance_name:
4772       fqdn_warn = True
4773       instance_name = self.op.instance_name
4774
4775     op = opcodes.OpQueryExports(nodes=[])
4776     exportlist = self.proc.ChainOpCode(op)
4777     found = False
4778     for node in exportlist:
4779       if instance_name in exportlist[node]:
4780         found = True
4781         if not rpc.call_export_remove(node, instance_name):
4782           logger.Error("could not remove export for instance %s"
4783                        " on node %s" % (instance_name, node))
4784
4785     if fqdn_warn and not found:
4786       feedback_fn("Export not found. If trying to remove an export belonging"
4787                   " to a deleted instance please use its Fully Qualified"
4788                   " Domain Name.")
4789
4790
4791 class TagsLU(NoHooksLU):
4792   """Generic tags LU.
4793
4794   This is an abstract class which is the parent of all the other tags LUs.
4795
4796   """
4797   def CheckPrereq(self):
4798     """Check prerequisites.
4799
4800     """
4801     if self.op.kind == constants.TAG_CLUSTER:
4802       self.target = self.cfg.GetClusterInfo()
4803     elif self.op.kind == constants.TAG_NODE:
4804       name = self.cfg.ExpandNodeName(self.op.name)
4805       if name is None:
4806         raise errors.OpPrereqError("Invalid node name (%s)" %
4807                                    (self.op.name,))
4808       self.op.name = name
4809       self.target = self.cfg.GetNodeInfo(name)
4810     elif self.op.kind == constants.TAG_INSTANCE:
4811       name = self.cfg.ExpandInstanceName(self.op.name)
4812       if name is None:
4813         raise errors.OpPrereqError("Invalid instance name (%s)" %
4814                                    (self.op.name,))
4815       self.op.name = name
4816       self.target = self.cfg.GetInstanceInfo(name)
4817     else:
4818       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4819                                  str(self.op.kind))
4820
4821
4822 class LUGetTags(TagsLU):
4823   """Returns the tags of a given object.
4824
4825   """
4826   _OP_REQP = ["kind", "name"]
4827
4828   def Exec(self, feedback_fn):
4829     """Returns the tag list.
4830
4831     """
4832     return self.target.GetTags()
4833
4834
4835 class LUSearchTags(NoHooksLU):
4836   """Searches the tags for a given pattern.
4837
4838   """
4839   _OP_REQP = ["pattern"]
4840
4841   def CheckPrereq(self):
4842     """Check prerequisites.
4843
4844     This checks the pattern passed for validity by compiling it.
4845
4846     """
4847     try:
4848       self.re = re.compile(self.op.pattern)
4849     except re.error, err:
4850       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4851                                  (self.op.pattern, err))
4852
4853   def Exec(self, feedback_fn):
4854     """Returns the tag list.
4855
4856     """
4857     cfg = self.cfg
4858     tgts = [("/cluster", cfg.GetClusterInfo())]
4859     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4860     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4861     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4862     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4863     results = []
4864     for path, target in tgts:
4865       for tag in target.GetTags():
4866         if self.re.search(tag):
4867           results.append((path, tag))
4868     return results
4869
4870
4871 class LUAddTags(TagsLU):
4872   """Sets a tag on a given object.
4873
4874   """
4875   _OP_REQP = ["kind", "name", "tags"]
4876
4877   def CheckPrereq(self):
4878     """Check prerequisites.
4879
4880     This checks the type and length of the tag name and value.
4881
4882     """
4883     TagsLU.CheckPrereq(self)
4884     for tag in self.op.tags:
4885       objects.TaggableObject.ValidateTag(tag)
4886
4887   def Exec(self, feedback_fn):
4888     """Sets the tag.
4889
4890     """
4891     try:
4892       for tag in self.op.tags:
4893         self.target.AddTag(tag)
4894     except errors.TagError, err:
4895       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4896     try:
4897       self.cfg.Update(self.target)
4898     except errors.ConfigurationError:
4899       raise errors.OpRetryError("There has been a modification to the"
4900                                 " config file and the operation has been"
4901                                 " aborted. Please retry.")
4902
4903
4904 class LUDelTags(TagsLU):
4905   """Delete a list of tags from a given object.
4906
4907   """
4908   _OP_REQP = ["kind", "name", "tags"]
4909
4910   def CheckPrereq(self):
4911     """Check prerequisites.
4912
4913     This checks that we have the given tag.
4914
4915     """
4916     TagsLU.CheckPrereq(self)
4917     for tag in self.op.tags:
4918       objects.TaggableObject.ValidateTag(tag)
4919     del_tags = frozenset(self.op.tags)
4920     cur_tags = self.target.GetTags()
4921     if not del_tags <= cur_tags:
4922       diff_tags = del_tags - cur_tags
4923       diff_names = ["'%s'" % tag for tag in diff_tags]
4924       diff_names.sort()
4925       raise errors.OpPrereqError("Tag(s) %s not found" %
4926                                  (",".join(diff_names)))
4927
4928   def Exec(self, feedback_fn):
4929     """Remove the tag from the object.
4930
4931     """
4932     for tag in self.op.tags:
4933       self.target.RemoveTag(tag)
4934     try:
4935       self.cfg.Update(self.target)
4936     except errors.ConfigurationError:
4937       raise errors.OpRetryError("There has been a modification to the"
4938                                 " config file and the operation has been"
4939                                 " aborted. Please retry.")
4940
4941 class LUTestDelay(NoHooksLU):
4942   """Sleep for a specified amount of time.
4943
4944   This LU sleeps on the master and/or nodes for a specified amoutn of
4945   time.
4946
4947   """
4948   _OP_REQP = ["duration", "on_master", "on_nodes"]
4949
4950   def CheckPrereq(self):
4951     """Check prerequisites.
4952
4953     This checks that we have a good list of nodes and/or the duration
4954     is valid.
4955
4956     """
4957
4958     if self.op.on_nodes:
4959       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4960
4961   def Exec(self, feedback_fn):
4962     """Do the actual sleep.
4963
4964     """
4965     if self.op.on_master:
4966       if not utils.TestDelay(self.op.duration):
4967         raise errors.OpExecError("Error during master delay test")
4968     if self.op.on_nodes:
4969       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4970       if not result:
4971         raise errors.OpExecError("Complete failure from rpc call")
4972       for node, node_result in result.items():
4973         if not node_result:
4974           raise errors.OpExecError("Failure during rpc call to node %s,"
4975                                    " result: %s" % (node, node_result))
4976
4977
4978 class IAllocator(object):
4979   """IAllocator framework.
4980
4981   An IAllocator instance has three sets of attributes:
4982     - cfg/sstore that are needed to query the cluster
4983     - input data (all members of the _KEYS class attribute are required)
4984     - four buffer attributes (in|out_data|text), that represent the
4985       input (to the external script) in text and data structure format,
4986       and the output from it, again in two formats
4987     - the result variables from the script (success, info, nodes) for
4988       easy usage
4989
4990   """
4991   _ALLO_KEYS = [
4992     "mem_size", "disks", "disk_template",
4993     "os", "tags", "nics", "vcpus",
4994     ]
4995   _RELO_KEYS = [
4996     "relocate_from",
4997     ]
4998
4999   def __init__(self, cfg, sstore, mode, name, **kwargs):
5000     self.cfg = cfg
5001     self.sstore = sstore
5002     # init buffer variables
5003     self.in_text = self.out_text = self.in_data = self.out_data = None
5004     # init all input fields so that pylint is happy
5005     self.mode = mode
5006     self.name = name
5007     self.mem_size = self.disks = self.disk_template = None
5008     self.os = self.tags = self.nics = self.vcpus = None
5009     self.relocate_from = None
5010     # computed fields
5011     self.required_nodes = None
5012     # init result fields
5013     self.success = self.info = self.nodes = None
5014     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5015       keyset = self._ALLO_KEYS
5016     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5017       keyset = self._RELO_KEYS
5018     else:
5019       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5020                                    " IAllocator" % self.mode)
5021     for key in kwargs:
5022       if key not in keyset:
5023         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5024                                      " IAllocator" % key)
5025       setattr(self, key, kwargs[key])
5026     for key in keyset:
5027       if key not in kwargs:
5028         raise errors.ProgrammerError("Missing input parameter '%s' to"
5029                                      " IAllocator" % key)
5030     self._BuildInputData()
5031
5032   def _ComputeClusterData(self):
5033     """Compute the generic allocator input data.
5034
5035     This is the data that is independent of the actual operation.
5036
5037     """
5038     cfg = self.cfg
5039     # cluster data
5040     data = {
5041       "version": 1,
5042       "cluster_name": self.sstore.GetClusterName(),
5043       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5044       "hypervisor_type": self.sstore.GetHypervisorType(),
5045       # we don't have job IDs
5046       }
5047
5048     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5049
5050     # node data
5051     node_results = {}
5052     node_list = cfg.GetNodeList()
5053     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5054     for nname in node_list:
5055       ninfo = cfg.GetNodeInfo(nname)
5056       if nname not in node_data or not isinstance(node_data[nname], dict):
5057         raise errors.OpExecError("Can't get data for node %s" % nname)
5058       remote_info = node_data[nname]
5059       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5060                    'vg_size', 'vg_free']:
5061         if attr not in remote_info:
5062           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5063                                    (nname, attr))
5064         try:
5065           remote_info[attr] = int(remote_info[attr])
5066         except ValueError, err:
5067           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5068                                    " %s" % (nname, attr, str(err)))
5069       # compute memory used by primary instances
5070       i_p_mem = i_p_up_mem = 0
5071       for iinfo in i_list:
5072         if iinfo.primary_node == nname:
5073           i_p_mem += iinfo.memory
5074           if iinfo.status == "up":
5075             i_p_up_mem += iinfo.memory
5076
5077       # compute memory used by instances
5078       pnr = {
5079         "tags": list(ninfo.GetTags()),
5080         "total_memory": remote_info['memory_total'],
5081         "reserved_memory": remote_info['memory_dom0'],
5082         "free_memory": remote_info['memory_free'],
5083         "i_pri_memory": i_p_mem,
5084         "i_pri_up_memory": i_p_up_mem,
5085         "total_disk": remote_info['vg_size'],
5086         "free_disk": remote_info['vg_free'],
5087         "primary_ip": ninfo.primary_ip,
5088         "secondary_ip": ninfo.secondary_ip,
5089         }
5090       node_results[nname] = pnr
5091     data["nodes"] = node_results
5092
5093     # instance data
5094     instance_data = {}
5095     for iinfo in i_list:
5096       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5097                   for n in iinfo.nics]
5098       pir = {
5099         "tags": list(iinfo.GetTags()),
5100         "should_run": iinfo.status == "up",
5101         "vcpus": iinfo.vcpus,
5102         "memory": iinfo.memory,
5103         "os": iinfo.os,
5104         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5105         "nics": nic_data,
5106         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5107         "disk_template": iinfo.disk_template,
5108         }
5109       instance_data[iinfo.name] = pir
5110
5111     data["instances"] = instance_data
5112
5113     self.in_data = data
5114
5115   def _AddNewInstance(self):
5116     """Add new instance data to allocator structure.
5117
5118     This in combination with _AllocatorGetClusterData will create the
5119     correct structure needed as input for the allocator.
5120
5121     The checks for the completeness of the opcode must have already been
5122     done.
5123
5124     """
5125     data = self.in_data
5126     if len(self.disks) != 2:
5127       raise errors.OpExecError("Only two-disk configurations supported")
5128
5129     disk_space = _ComputeDiskSize(self.disk_template,
5130                                   self.disks[0]["size"], self.disks[1]["size"])
5131
5132     if self.disk_template in constants.DTS_NET_MIRROR:
5133       self.required_nodes = 2
5134     else:
5135       self.required_nodes = 1
5136     request = {
5137       "type": "allocate",
5138       "name": self.name,
5139       "disk_template": self.disk_template,
5140       "tags": self.tags,
5141       "os": self.os,
5142       "vcpus": self.vcpus,
5143       "memory": self.mem_size,
5144       "disks": self.disks,
5145       "disk_space_total": disk_space,
5146       "nics": self.nics,
5147       "required_nodes": self.required_nodes,
5148       }
5149     data["request"] = request
5150
5151   def _AddRelocateInstance(self):
5152     """Add relocate instance data to allocator structure.
5153
5154     This in combination with _IAllocatorGetClusterData will create the
5155     correct structure needed as input for the allocator.
5156
5157     The checks for the completeness of the opcode must have already been
5158     done.
5159
5160     """
5161     instance = self.cfg.GetInstanceInfo(self.name)
5162     if instance is None:
5163       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5164                                    " IAllocator" % self.name)
5165
5166     if instance.disk_template not in constants.DTS_NET_MIRROR:
5167       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5168
5169     if len(instance.secondary_nodes) != 1:
5170       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5171
5172     self.required_nodes = 1
5173
5174     disk_space = _ComputeDiskSize(instance.disk_template,
5175                                   instance.disks[0].size,
5176                                   instance.disks[1].size)
5177
5178     request = {
5179       "type": "relocate",
5180       "name": self.name,
5181       "disk_space_total": disk_space,
5182       "required_nodes": self.required_nodes,
5183       "relocate_from": self.relocate_from,
5184       }
5185     self.in_data["request"] = request
5186
5187   def _BuildInputData(self):
5188     """Build input data structures.
5189
5190     """
5191     self._ComputeClusterData()
5192
5193     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5194       self._AddNewInstance()
5195     else:
5196       self._AddRelocateInstance()
5197
5198     self.in_text = serializer.Dump(self.in_data)
5199
5200   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5201     """Run an instance allocator and return the results.
5202
5203     """
5204     data = self.in_text
5205
5206     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5207
5208     if not isinstance(result, tuple) or len(result) != 4:
5209       raise errors.OpExecError("Invalid result from master iallocator runner")
5210
5211     rcode, stdout, stderr, fail = result
5212
5213     if rcode == constants.IARUN_NOTFOUND:
5214       raise errors.OpExecError("Can't find allocator '%s'" % name)
5215     elif rcode == constants.IARUN_FAILURE:
5216         raise errors.OpExecError("Instance allocator call failed: %s,"
5217                                  " output: %s" %
5218                                  (fail, stdout+stderr))
5219     self.out_text = stdout
5220     if validate:
5221       self._ValidateResult()
5222
5223   def _ValidateResult(self):
5224     """Process the allocator results.
5225
5226     This will process and if successful save the result in
5227     self.out_data and the other parameters.
5228
5229     """
5230     try:
5231       rdict = serializer.Load(self.out_text)
5232     except Exception, err:
5233       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5234
5235     if not isinstance(rdict, dict):
5236       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5237
5238     for key in "success", "info", "nodes":
5239       if key not in rdict:
5240         raise errors.OpExecError("Can't parse iallocator results:"
5241                                  " missing key '%s'" % key)
5242       setattr(self, key, rdict[key])
5243
5244     if not isinstance(rdict["nodes"], list):
5245       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5246                                " is not a list")
5247     self.out_data = rdict
5248
5249
5250 class LUTestAllocator(NoHooksLU):
5251   """Run allocator tests.
5252
5253   This LU runs the allocator tests
5254
5255   """
5256   _OP_REQP = ["direction", "mode", "name"]
5257
5258   def CheckPrereq(self):
5259     """Check prerequisites.
5260
5261     This checks the opcode parameters depending on the director and mode test.
5262
5263     """
5264     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5265       for attr in ["name", "mem_size", "disks", "disk_template",
5266                    "os", "tags", "nics", "vcpus"]:
5267         if not hasattr(self.op, attr):
5268           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5269                                      attr)
5270       iname = self.cfg.ExpandInstanceName(self.op.name)
5271       if iname is not None:
5272         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5273                                    iname)
5274       if not isinstance(self.op.nics, list):
5275         raise errors.OpPrereqError("Invalid parameter 'nics'")
5276       for row in self.op.nics:
5277         if (not isinstance(row, dict) or
5278             "mac" not in row or
5279             "ip" not in row or
5280             "bridge" not in row):
5281           raise errors.OpPrereqError("Invalid contents of the"
5282                                      " 'nics' parameter")
5283       if not isinstance(self.op.disks, list):
5284         raise errors.OpPrereqError("Invalid parameter 'disks'")
5285       if len(self.op.disks) != 2:
5286         raise errors.OpPrereqError("Only two-disk configurations supported")
5287       for row in self.op.disks:
5288         if (not isinstance(row, dict) or
5289             "size" not in row or
5290             not isinstance(row["size"], int) or
5291             "mode" not in row or
5292             row["mode"] not in ['r', 'w']):
5293           raise errors.OpPrereqError("Invalid contents of the"
5294                                      " 'disks' parameter")
5295     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5296       if not hasattr(self.op, "name"):
5297         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5298       fname = self.cfg.ExpandInstanceName(self.op.name)
5299       if fname is None:
5300         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5301                                    self.op.name)
5302       self.op.name = fname
5303       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5304     else:
5305       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5306                                  self.op.mode)
5307
5308     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5309       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5310         raise errors.OpPrereqError("Missing allocator name")
5311     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5312       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5313                                  self.op.direction)
5314
5315   def Exec(self, feedback_fn):
5316     """Run the allocator test.
5317
5318     """
5319     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5320       ial = IAllocator(self.cfg, self.sstore,
5321                        mode=self.op.mode,
5322                        name=self.op.name,
5323                        mem_size=self.op.mem_size,
5324                        disks=self.op.disks,
5325                        disk_template=self.op.disk_template,
5326                        os=self.op.os,
5327                        tags=self.op.tags,
5328                        nics=self.op.nics,
5329                        vcpus=self.op.vcpus,
5330                        )
5331     else:
5332       ial = IAllocator(self.cfg, self.sstore,
5333                        mode=self.op.mode,
5334                        name=self.op.name,
5335                        relocate_from=list(self.relocate_from),
5336                        )
5337
5338     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5339       result = ial.in_text
5340     else:
5341       ial.Run(self.op.allocator, validate=False)
5342       result = ial.out_text
5343     return result