Move string formatting out of LUQueryInstances
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
188   return wanted
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instances.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.GetInstanceInfo(lu.cfg.ExpandInstanceName(name))
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = [lu.cfg.GetInstanceInfo(name)
212               for name in lu.cfg.GetInstanceList()]
213   return wanted
214
215
216 def _CheckOutputFields(static, dynamic, selected):
217   """Checks whether all selected fields are valid.
218
219   Args:
220     static: Static fields
221     dynamic: Dynamic fields
222
223   """
224   static_fields = frozenset(static)
225   dynamic_fields = frozenset(dynamic)
226
227   all_fields = static_fields | dynamic_fields
228
229   if not all_fields.issuperset(selected):
230     raise errors.OpPrereqError("Unknown output fields selected: %s"
231                                % ",".join(frozenset(selected).
232                                           difference(all_fields)))
233
234
235 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
236                           memory, vcpus, nics):
237   """Builds instance related env variables for hooks from single variables.
238
239   Args:
240     secondary_nodes: List of secondary nodes as strings
241   """
242   env = {
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   while True:
390     rawline = f.readline()
391     logger.Debug('read %s' % (repr(rawline),))
392
393     if not rawline:
394       # End of file
395       break
396
397     line = rawline.split('\n')[0]
398
399     parts = line.split(' ')
400     fields = parts[0].split(',')
401     key = parts[2]
402
403     haveall = True
404     havesome = False
405     for spec in [ ip, fullnode ]:
406       if spec not in fields:
407         haveall = False
408       if spec in fields:
409         havesome = True
410
411     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
412     if haveall and key == pubkey:
413       inthere = True
414       save_lines.append(rawline)
415       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
416       continue
417
418     if havesome and (not haveall or key != pubkey):
419       removed = True
420       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
421       continue
422
423     save_lines.append(rawline)
424
425   if not inthere:
426     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
427     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
428
429   if removed:
430     save_lines = save_lines + add_lines
431
432     # Write a new file and replace old.
433     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
434                                    constants.DATA_DIR)
435     newfile = os.fdopen(fd, 'w')
436     try:
437       newfile.write(''.join(save_lines))
438     finally:
439       newfile.close()
440     logger.Debug("Wrote new known_hosts.")
441     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
442
443   elif add_lines:
444     # Simply appending a new line will do the trick.
445     f.seek(0, 2)
446     for add in add_lines:
447       f.write(add)
448
449   f.close()
450
451
452 def _HasValidVG(vglist, vgname):
453   """Checks if the volume group list is valid.
454
455   A non-None return value means there's an error, and the return value
456   is the error message.
457
458   """
459   vgsize = vglist.get(vgname, None)
460   if vgsize is None:
461     return "volume group '%s' missing" % vgname
462   elif vgsize < 20480:
463     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
464             (vgname, vgsize))
465   return None
466
467
468 def _InitSSHSetup(node):
469   """Setup the SSH configuration for the cluster.
470
471
472   This generates a dsa keypair for root, adds the pub key to the
473   permitted hosts and adds the hostkey to its own known hosts.
474
475   Args:
476     node: the name of this host as a fqdn
477
478   """
479   if os.path.exists('/root/.ssh/id_dsa'):
480     utils.CreateBackup('/root/.ssh/id_dsa')
481   if os.path.exists('/root/.ssh/id_dsa.pub'):
482     utils.CreateBackup('/root/.ssh/id_dsa.pub')
483
484   utils.RemoveFile('/root/.ssh/id_dsa')
485   utils.RemoveFile('/root/.ssh/id_dsa.pub')
486
487   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
488                          "-f", "/root/.ssh/id_dsa",
489                          "-q", "-N", ""])
490   if result.failed:
491     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
492                              result.output)
493
494   f = open('/root/.ssh/id_dsa.pub', 'r')
495   try:
496     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
497   finally:
498     f.close()
499
500
501 def _InitGanetiServerSetup(ss):
502   """Setup the necessary configuration for the initial node daemon.
503
504   This creates the nodepass file containing the shared password for
505   the cluster and also generates the SSL certificate.
506
507   """
508   # Create pseudo random password
509   randpass = sha.new(os.urandom(64)).hexdigest()
510   # and write it into sstore
511   ss.SetKey(ss.SS_NODED_PASS, randpass)
512
513   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
514                          "-days", str(365*5), "-nodes", "-x509",
515                          "-keyout", constants.SSL_CERT_FILE,
516                          "-out", constants.SSL_CERT_FILE, "-batch"])
517   if result.failed:
518     raise errors.OpExecError("could not generate server ssl cert, command"
519                              " %s had exitcode %s and error message %s" %
520                              (result.cmd, result.exit_code, result.output))
521
522   os.chmod(constants.SSL_CERT_FILE, 0400)
523
524   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
525
526   if result.failed:
527     raise errors.OpExecError("Could not start the node daemon, command %s"
528                              " had exitcode %s and error %s" %
529                              (result.cmd, result.exit_code, result.output))
530
531
532 class LUInitCluster(LogicalUnit):
533   """Initialise the cluster.
534
535   """
536   HPATH = "cluster-init"
537   HTYPE = constants.HTYPE_CLUSTER
538   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
539               "def_bridge", "master_netdev"]
540   REQ_CLUSTER = False
541
542   def BuildHooksEnv(self):
543     """Build hooks env.
544
545     Notes: Since we don't require a cluster, we must manually add
546     ourselves in the post-run node list.
547
548     """
549     env = {
550       "CLUSTER": self.op.cluster_name,
551       "MASTER": self.hostname['hostname_full'],
552       }
553     return env, [], [self.hostname['hostname_full']]
554
555   def CheckPrereq(self):
556     """Verify that the passed name is a valid one.
557
558     """
559     if config.ConfigWriter.IsCluster():
560       raise errors.OpPrereqError("Cluster is already initialised")
561
562     hostname_local = socket.gethostname()
563     self.hostname = hostname = utils.LookupHostname(hostname_local)
564     if not hostname:
565       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
566                                  hostname_local)
567
568     if hostname["hostname_full"] != hostname_local:
569       raise errors.OpPrereqError("My own hostname (%s) does not match the"
570                                  " resolver (%s): probably not using FQDN"
571                                  " for hostname." %
572                                  (hostname_local, hostname["hostname_full"]))
573
574     if hostname["ip"].startswith("127."):
575       raise errors.OpPrereqError("This host's IP resolves to the private"
576                                  " range (%s). Please fix DNS or /etc/hosts." %
577                                  (hostname["ip"],))
578
579     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
580     if not clustername:
581       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
582                                  % self.op.cluster_name)
583
584     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
585     if result.failed:
586       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
587                                  " to %s,\nbut this ip address does not"
588                                  " belong to this host."
589                                  " Aborting." % hostname['ip'])
590
591     secondary_ip = getattr(self.op, "secondary_ip", None)
592     if secondary_ip and not utils.IsValidIP(secondary_ip):
593       raise errors.OpPrereqError("Invalid secondary ip given")
594     if secondary_ip and secondary_ip != hostname['ip']:
595       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
596       if result.failed:
597         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
598                                    "but it does not belong to this host." %
599                                    secondary_ip)
600     self.secondary_ip = secondary_ip
601
602     # checks presence of the volume group given
603     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
604
605     if vgstatus:
606       raise errors.OpPrereqError("Error: %s" % vgstatus)
607
608     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
609                     self.op.mac_prefix):
610       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
611                                  self.op.mac_prefix)
612
613     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
614       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
615                                  self.op.hypervisor_type)
616
617     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
618     if result.failed:
619       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
620                                  (self.op.master_netdev,
621                                   result.output.strip()))
622
623   def Exec(self, feedback_fn):
624     """Initialize the cluster.
625
626     """
627     clustername = self.clustername
628     hostname = self.hostname
629
630     # set up the simple store
631     ss = ssconf.SimpleStore()
632     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
633     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
634     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
635     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
636     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
637
638     # set up the inter-node password and certificate
639     _InitGanetiServerSetup(ss)
640
641     # start the master ip
642     rpc.call_node_start_master(hostname['hostname_full'])
643
644     # set up ssh config and /etc/hosts
645     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
646     try:
647       sshline = f.read()
648     finally:
649       f.close()
650     sshkey = sshline.split(" ")[1]
651
652     _UpdateEtcHosts(hostname['hostname_full'],
653                     hostname['ip'],
654                     )
655
656     _UpdateKnownHosts(hostname['hostname_full'],
657                       hostname['ip'],
658                       sshkey,
659                       )
660
661     _InitSSHSetup(hostname['hostname'])
662
663     # init of cluster config file
664     cfgw = config.ConfigWriter()
665     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
666                     sshkey, self.op.mac_prefix,
667                     self.op.vg_name, self.op.def_bridge)
668
669
670 class LUDestroyCluster(NoHooksLU):
671   """Logical unit for destroying the cluster.
672
673   """
674   _OP_REQP = []
675
676   def CheckPrereq(self):
677     """Check prerequisites.
678
679     This checks whether the cluster is empty.
680
681     Any errors are signalled by raising errors.OpPrereqError.
682
683     """
684     master = self.sstore.GetMasterNode()
685
686     nodelist = self.cfg.GetNodeList()
687     if len(nodelist) != 1 or nodelist[0] != master:
688       raise errors.OpPrereqError("There are still %d node(s) in"
689                                  " this cluster." % (len(nodelist) - 1))
690     instancelist = self.cfg.GetInstanceList()
691     if instancelist:
692       raise errors.OpPrereqError("There are still %d instance(s) in"
693                                  " this cluster." % len(instancelist))
694
695   def Exec(self, feedback_fn):
696     """Destroys the cluster.
697
698     """
699     utils.CreateBackup('/root/.ssh/id_dsa')
700     utils.CreateBackup('/root/.ssh/id_dsa.pub')
701     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
702
703
704 class LUVerifyCluster(NoHooksLU):
705   """Verifies the cluster status.
706
707   """
708   _OP_REQP = []
709
710   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
711                   remote_version, feedback_fn):
712     """Run multiple tests against a node.
713
714     Test list:
715       - compares ganeti version
716       - checks vg existance and size > 20G
717       - checks config file checksum
718       - checks ssh to other nodes
719
720     Args:
721       node: name of the node to check
722       file_list: required list of files
723       local_cksum: dictionary of local files and their checksums
724
725     """
726     # compares ganeti version
727     local_version = constants.PROTOCOL_VERSION
728     if not remote_version:
729       feedback_fn(" - ERROR: connection to %s failed" % (node))
730       return True
731
732     if local_version != remote_version:
733       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
734                       (local_version, node, remote_version))
735       return True
736
737     # checks vg existance and size > 20G
738
739     bad = False
740     if not vglist:
741       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
742                       (node,))
743       bad = True
744     else:
745       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
746       if vgstatus:
747         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
748         bad = True
749
750     # checks config file checksum
751     # checks ssh to any
752
753     if 'filelist' not in node_result:
754       bad = True
755       feedback_fn("  - ERROR: node hasn't returned file checksum data")
756     else:
757       remote_cksum = node_result['filelist']
758       for file_name in file_list:
759         if file_name not in remote_cksum:
760           bad = True
761           feedback_fn("  - ERROR: file '%s' missing" % file_name)
762         elif remote_cksum[file_name] != local_cksum[file_name]:
763           bad = True
764           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
765
766     if 'nodelist' not in node_result:
767       bad = True
768       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
769     else:
770       if node_result['nodelist']:
771         bad = True
772         for node in node_result['nodelist']:
773           feedback_fn("  - ERROR: communication with node '%s': %s" %
774                           (node, node_result['nodelist'][node]))
775     hyp_result = node_result.get('hypervisor', None)
776     if hyp_result is not None:
777       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
778     return bad
779
780   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
781     """Verify an instance.
782
783     This function checks to see if the required block devices are
784     available on the instance's node.
785
786     """
787     bad = False
788
789     instancelist = self.cfg.GetInstanceList()
790     if not instance in instancelist:
791       feedback_fn("  - ERROR: instance %s not in instance list %s" %
792                       (instance, instancelist))
793       bad = True
794
795     instanceconfig = self.cfg.GetInstanceInfo(instance)
796     node_current = instanceconfig.primary_node
797
798     node_vol_should = {}
799     instanceconfig.MapLVsByNode(node_vol_should)
800
801     for node in node_vol_should:
802       for volume in node_vol_should[node]:
803         if node not in node_vol_is or volume not in node_vol_is[node]:
804           feedback_fn("  - ERROR: volume %s missing on node %s" %
805                           (volume, node))
806           bad = True
807
808     if not instanceconfig.status == 'down':
809       if not instance in node_instance[node_current]:
810         feedback_fn("  - ERROR: instance %s not running on node %s" %
811                         (instance, node_current))
812         bad = True
813
814     for node in node_instance:
815       if (not node == node_current):
816         if instance in node_instance[node]:
817           feedback_fn("  - ERROR: instance %s should not run on node %s" %
818                           (instance, node))
819           bad = True
820
821     return not bad
822
823   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
824     """Verify if there are any unknown volumes in the cluster.
825
826     The .os, .swap and backup volumes are ignored. All other volumes are
827     reported as unknown.
828
829     """
830     bad = False
831
832     for node in node_vol_is:
833       for volume in node_vol_is[node]:
834         if node not in node_vol_should or volume not in node_vol_should[node]:
835           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
836                       (volume, node))
837           bad = True
838     return bad
839
840   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
841     """Verify the list of running instances.
842
843     This checks what instances are running but unknown to the cluster.
844
845     """
846     bad = False
847     for node in node_instance:
848       for runninginstance in node_instance[node]:
849         if runninginstance not in instancelist:
850           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
851                           (runninginstance, node))
852           bad = True
853     return bad
854
855   def CheckPrereq(self):
856     """Check prerequisites.
857
858     This has no prerequisites.
859
860     """
861     pass
862
863   def Exec(self, feedback_fn):
864     """Verify integrity of cluster, performing various test on nodes.
865
866     """
867     bad = False
868     feedback_fn("* Verifying global settings")
869     self.cfg.VerifyConfig()
870
871     master = self.sstore.GetMasterNode()
872     vg_name = self.cfg.GetVGName()
873     nodelist = utils.NiceSort(self.cfg.GetNodeList())
874     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
875     node_volume = {}
876     node_instance = {}
877
878     # FIXME: verify OS list
879     # do local checksums
880     file_names = list(self.sstore.GetFileList())
881     file_names.append(constants.SSL_CERT_FILE)
882     file_names.append(constants.CLUSTER_CONF_FILE)
883     local_checksums = utils.FingerprintFiles(file_names)
884
885     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
886     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
887     all_instanceinfo = rpc.call_instance_list(nodelist)
888     all_vglist = rpc.call_vg_list(nodelist)
889     node_verify_param = {
890       'filelist': file_names,
891       'nodelist': nodelist,
892       'hypervisor': None,
893       }
894     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
895     all_rversion = rpc.call_version(nodelist)
896
897     for node in nodelist:
898       feedback_fn("* Verifying node %s" % node)
899       result = self._VerifyNode(node, file_names, local_checksums,
900                                 all_vglist[node], all_nvinfo[node],
901                                 all_rversion[node], feedback_fn)
902       bad = bad or result
903
904       # node_volume
905       volumeinfo = all_volumeinfo[node]
906
907       if type(volumeinfo) != dict:
908         feedback_fn("  - ERROR: connection to %s failed" % (node,))
909         bad = True
910         continue
911
912       node_volume[node] = volumeinfo
913
914       # node_instance
915       nodeinstance = all_instanceinfo[node]
916       if type(nodeinstance) != list:
917         feedback_fn("  - ERROR: connection to %s failed" % (node,))
918         bad = True
919         continue
920
921       node_instance[node] = nodeinstance
922
923     node_vol_should = {}
924
925     for instance in instancelist:
926       feedback_fn("* Verifying instance %s" % instance)
927       result =  self._VerifyInstance(instance, node_volume, node_instance,
928                                      feedback_fn)
929       bad = bad or result
930
931       inst_config = self.cfg.GetInstanceInfo(instance)
932
933       inst_config.MapLVsByNode(node_vol_should)
934
935     feedback_fn("* Verifying orphan volumes")
936     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
937                                        feedback_fn)
938     bad = bad or result
939
940     feedback_fn("* Verifying remaining instances")
941     result = self._VerifyOrphanInstances(instancelist, node_instance,
942                                          feedback_fn)
943     bad = bad or result
944
945     return int(bad)
946
947
948 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
949   """Sleep and poll for an instance's disk to sync.
950
951   """
952   if not instance.disks:
953     return True
954
955   if not oneshot:
956     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
957
958   node = instance.primary_node
959
960   for dev in instance.disks:
961     cfgw.SetDiskID(dev, node)
962
963   retries = 0
964   while True:
965     max_time = 0
966     done = True
967     cumul_degraded = False
968     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
969     if not rstats:
970       logger.ToStderr("Can't get any data from node %s" % node)
971       retries += 1
972       if retries >= 10:
973         raise errors.RemoteError("Can't contact node %s for mirror data,"
974                                  " aborting." % node)
975       time.sleep(6)
976       continue
977     retries = 0
978     for i in range(len(rstats)):
979       mstat = rstats[i]
980       if mstat is None:
981         logger.ToStderr("Can't compute data for node %s/%s" %
982                         (node, instance.disks[i].iv_name))
983         continue
984       perc_done, est_time, is_degraded = mstat
985       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
986       if perc_done is not None:
987         done = False
988         if est_time is not None:
989           rem_time = "%d estimated seconds remaining" % est_time
990           max_time = est_time
991         else:
992           rem_time = "no time estimate"
993         logger.ToStdout("- device %s: %5.2f%% done, %s" %
994                         (instance.disks[i].iv_name, perc_done, rem_time))
995     if done or oneshot:
996       break
997
998     if unlock:
999       utils.Unlock('cmd')
1000     try:
1001       time.sleep(min(60, max_time))
1002     finally:
1003       if unlock:
1004         utils.Lock('cmd')
1005
1006   if done:
1007     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1008   return not cumul_degraded
1009
1010
1011 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1012   """Check that mirrors are not degraded.
1013
1014   """
1015   cfgw.SetDiskID(dev, node)
1016
1017   result = True
1018   if on_primary or dev.AssembleOnSecondary():
1019     rstats = rpc.call_blockdev_find(node, dev)
1020     if not rstats:
1021       logger.ToStderr("Can't get any data from node %s" % node)
1022       result = False
1023     else:
1024       result = result and (not rstats[5])
1025   if dev.children:
1026     for child in dev.children:
1027       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1028
1029   return result
1030
1031
1032 class LUDiagnoseOS(NoHooksLU):
1033   """Logical unit for OS diagnose/query.
1034
1035   """
1036   _OP_REQP = []
1037
1038   def CheckPrereq(self):
1039     """Check prerequisites.
1040
1041     This always succeeds, since this is a pure query LU.
1042
1043     """
1044     return
1045
1046   def Exec(self, feedback_fn):
1047     """Compute the list of OSes.
1048
1049     """
1050     node_list = self.cfg.GetNodeList()
1051     node_data = rpc.call_os_diagnose(node_list)
1052     if node_data == False:
1053       raise errors.OpExecError("Can't gather the list of OSes")
1054     return node_data
1055
1056
1057 class LURemoveNode(LogicalUnit):
1058   """Logical unit for removing a node.
1059
1060   """
1061   HPATH = "node-remove"
1062   HTYPE = constants.HTYPE_NODE
1063   _OP_REQP = ["node_name"]
1064
1065   def BuildHooksEnv(self):
1066     """Build hooks env.
1067
1068     This doesn't run on the target node in the pre phase as a failed
1069     node would not allows itself to run.
1070
1071     """
1072     env = {
1073       "NODE_NAME": self.op.node_name,
1074       }
1075     all_nodes = self.cfg.GetNodeList()
1076     all_nodes.remove(self.op.node_name)
1077     return env, all_nodes, all_nodes
1078
1079   def CheckPrereq(self):
1080     """Check prerequisites.
1081
1082     This checks:
1083      - the node exists in the configuration
1084      - it does not have primary or secondary instances
1085      - it's not the master
1086
1087     Any errors are signalled by raising errors.OpPrereqError.
1088
1089     """
1090     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1091     if node is None:
1092       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1093
1094     instance_list = self.cfg.GetInstanceList()
1095
1096     masternode = self.sstore.GetMasterNode()
1097     if node.name == masternode:
1098       raise errors.OpPrereqError("Node is the master node,"
1099                                  " you need to failover first.")
1100
1101     for instance_name in instance_list:
1102       instance = self.cfg.GetInstanceInfo(instance_name)
1103       if node.name == instance.primary_node:
1104         raise errors.OpPrereqError("Instance %s still running on the node,"
1105                                    " please remove first." % instance_name)
1106       if node.name in instance.secondary_nodes:
1107         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1108                                    " please remove first." % instance_name)
1109     self.op.node_name = node.name
1110     self.node = node
1111
1112   def Exec(self, feedback_fn):
1113     """Removes the node from the cluster.
1114
1115     """
1116     node = self.node
1117     logger.Info("stopping the node daemon and removing configs from node %s" %
1118                 node.name)
1119
1120     rpc.call_node_leave_cluster(node.name)
1121
1122     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1123
1124     logger.Info("Removing node %s from config" % node.name)
1125
1126     self.cfg.RemoveNode(node.name)
1127
1128
1129 class LUQueryNodes(NoHooksLU):
1130   """Logical unit for querying nodes.
1131
1132   """
1133   _OP_REQP = ["output_fields"]
1134
1135   def CheckPrereq(self):
1136     """Check prerequisites.
1137
1138     This checks that the fields required are valid output fields.
1139
1140     """
1141     self.dynamic_fields = frozenset(["dtotal", "dfree",
1142                                      "mtotal", "mnode", "mfree"])
1143
1144     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1145                        dynamic=self.dynamic_fields,
1146                        selected=self.op.output_fields)
1147
1148
1149   def Exec(self, feedback_fn):
1150     """Computes the list of nodes and their attributes.
1151
1152     """
1153     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1154     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1155
1156
1157     # begin data gathering
1158
1159     if self.dynamic_fields.intersection(self.op.output_fields):
1160       live_data = {}
1161       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1162       for name in nodenames:
1163         nodeinfo = node_data.get(name, None)
1164         if nodeinfo:
1165           live_data[name] = {
1166             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1167             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1168             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1169             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1170             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1171             }
1172         else:
1173           live_data[name] = {}
1174     else:
1175       live_data = dict.fromkeys(nodenames, {})
1176
1177     node_to_primary = dict.fromkeys(nodenames, 0)
1178     node_to_secondary = dict.fromkeys(nodenames, 0)
1179
1180     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1181       instancelist = self.cfg.GetInstanceList()
1182
1183       for instance in instancelist:
1184         instanceinfo = self.cfg.GetInstanceInfo(instance)
1185         node_to_primary[instanceinfo.primary_node] += 1
1186         for secnode in instanceinfo.secondary_nodes:
1187           node_to_secondary[secnode] += 1
1188
1189     # end data gathering
1190
1191     output = []
1192     for node in nodelist:
1193       node_output = []
1194       for field in self.op.output_fields:
1195         if field == "name":
1196           val = node.name
1197         elif field == "pinst":
1198           val = node_to_primary[node.name]
1199         elif field == "sinst":
1200           val = node_to_secondary[node.name]
1201         elif field == "pip":
1202           val = node.primary_ip
1203         elif field == "sip":
1204           val = node.secondary_ip
1205         elif field in self.dynamic_fields:
1206           val = live_data[node.name].get(field, "?")
1207         else:
1208           raise errors.ParameterError(field)
1209         val = str(val)
1210         node_output.append(val)
1211       output.append(node_output)
1212
1213     return output
1214
1215
1216 class LUQueryNodeVolumes(NoHooksLU):
1217   """Logical unit for getting volumes on node(s).
1218
1219   """
1220   _OP_REQP = ["nodes", "output_fields"]
1221
1222   def CheckPrereq(self):
1223     """Check prerequisites.
1224
1225     This checks that the fields required are valid output fields.
1226
1227     """
1228     self.nodes = _GetWantedNodes(self, self.op.nodes)
1229
1230     _CheckOutputFields(static=["node"],
1231                        dynamic=["phys", "vg", "name", "size", "instance"],
1232                        selected=self.op.output_fields)
1233
1234
1235   def Exec(self, feedback_fn):
1236     """Computes the list of nodes and their attributes.
1237
1238     """
1239     nodenames = utils.NiceSort([node.name for node in self.nodes])
1240     volumes = rpc.call_node_volumes(nodenames)
1241
1242     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1243              in self.cfg.GetInstanceList()]
1244
1245     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1246
1247     output = []
1248     for node in nodenames:
1249       if node not in volumes or not volumes[node]:
1250         continue
1251
1252       node_vols = volumes[node][:]
1253       node_vols.sort(key=lambda vol: vol['dev'])
1254
1255       for vol in node_vols:
1256         node_output = []
1257         for field in self.op.output_fields:
1258           if field == "node":
1259             val = node
1260           elif field == "phys":
1261             val = vol['dev']
1262           elif field == "vg":
1263             val = vol['vg']
1264           elif field == "name":
1265             val = vol['name']
1266           elif field == "size":
1267             val = int(float(vol['size']))
1268           elif field == "instance":
1269             for inst in ilist:
1270               if node not in lv_by_node[inst]:
1271                 continue
1272               if vol['name'] in lv_by_node[inst][node]:
1273                 val = inst.name
1274                 break
1275             else:
1276               val = '-'
1277           else:
1278             raise errors.ParameterError(field)
1279           node_output.append(str(val))
1280
1281         output.append(node_output)
1282
1283     return output
1284
1285
1286 class LUAddNode(LogicalUnit):
1287   """Logical unit for adding node to the cluster.
1288
1289   """
1290   HPATH = "node-add"
1291   HTYPE = constants.HTYPE_NODE
1292   _OP_REQP = ["node_name"]
1293
1294   def BuildHooksEnv(self):
1295     """Build hooks env.
1296
1297     This will run on all nodes before, and on all nodes + the new node after.
1298
1299     """
1300     env = {
1301       "NODE_NAME": self.op.node_name,
1302       "NODE_PIP": self.op.primary_ip,
1303       "NODE_SIP": self.op.secondary_ip,
1304       }
1305     nodes_0 = self.cfg.GetNodeList()
1306     nodes_1 = nodes_0 + [self.op.node_name, ]
1307     return env, nodes_0, nodes_1
1308
1309   def CheckPrereq(self):
1310     """Check prerequisites.
1311
1312     This checks:
1313      - the new node is not already in the config
1314      - it is resolvable
1315      - its parameters (single/dual homed) matches the cluster
1316
1317     Any errors are signalled by raising errors.OpPrereqError.
1318
1319     """
1320     node_name = self.op.node_name
1321     cfg = self.cfg
1322
1323     dns_data = utils.LookupHostname(node_name)
1324     if not dns_data:
1325       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1326
1327     node = dns_data['hostname']
1328     primary_ip = self.op.primary_ip = dns_data['ip']
1329     secondary_ip = getattr(self.op, "secondary_ip", None)
1330     if secondary_ip is None:
1331       secondary_ip = primary_ip
1332     if not utils.IsValidIP(secondary_ip):
1333       raise errors.OpPrereqError("Invalid secondary IP given")
1334     self.op.secondary_ip = secondary_ip
1335     node_list = cfg.GetNodeList()
1336     if node in node_list:
1337       raise errors.OpPrereqError("Node %s is already in the configuration"
1338                                  % node)
1339
1340     for existing_node_name in node_list:
1341       existing_node = cfg.GetNodeInfo(existing_node_name)
1342       if (existing_node.primary_ip == primary_ip or
1343           existing_node.secondary_ip == primary_ip or
1344           existing_node.primary_ip == secondary_ip or
1345           existing_node.secondary_ip == secondary_ip):
1346         raise errors.OpPrereqError("New node ip address(es) conflict with"
1347                                    " existing node %s" % existing_node.name)
1348
1349     # check that the type of the node (single versus dual homed) is the
1350     # same as for the master
1351     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1352     master_singlehomed = myself.secondary_ip == myself.primary_ip
1353     newbie_singlehomed = secondary_ip == primary_ip
1354     if master_singlehomed != newbie_singlehomed:
1355       if master_singlehomed:
1356         raise errors.OpPrereqError("The master has no private ip but the"
1357                                    " new node has one")
1358       else:
1359         raise errors.OpPrereqError("The master has a private ip but the"
1360                                    " new node doesn't have one")
1361
1362     # checks reachablity
1363     command = ["fping", "-q", primary_ip]
1364     result = utils.RunCmd(command)
1365     if result.failed:
1366       raise errors.OpPrereqError("Node not reachable by ping")
1367
1368     if not newbie_singlehomed:
1369       # check reachability from my secondary ip to newbie's secondary ip
1370       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1371       result = utils.RunCmd(command)
1372       if result.failed:
1373         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1374
1375     self.new_node = objects.Node(name=node,
1376                                  primary_ip=primary_ip,
1377                                  secondary_ip=secondary_ip)
1378
1379   def Exec(self, feedback_fn):
1380     """Adds the new node to the cluster.
1381
1382     """
1383     new_node = self.new_node
1384     node = new_node.name
1385
1386     # set up inter-node password and certificate and restarts the node daemon
1387     gntpass = self.sstore.GetNodeDaemonPassword()
1388     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1389       raise errors.OpExecError("ganeti password corruption detected")
1390     f = open(constants.SSL_CERT_FILE)
1391     try:
1392       gntpem = f.read(8192)
1393     finally:
1394       f.close()
1395     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1396     # so we use this to detect an invalid certificate; as long as the
1397     # cert doesn't contain this, the here-document will be correctly
1398     # parsed by the shell sequence below
1399     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1400       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1401     if not gntpem.endswith("\n"):
1402       raise errors.OpExecError("PEM must end with newline")
1403     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1404
1405     # and then connect with ssh to set password and start ganeti-noded
1406     # note that all the below variables are sanitized at this point,
1407     # either by being constants or by the checks above
1408     ss = self.sstore
1409     mycommand = ("umask 077 && "
1410                  "echo '%s' > '%s' && "
1411                  "cat > '%s' << '!EOF.' && \n"
1412                  "%s!EOF.\n%s restart" %
1413                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1414                   constants.SSL_CERT_FILE, gntpem,
1415                   constants.NODE_INITD_SCRIPT))
1416
1417     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1418     if result.failed:
1419       raise errors.OpExecError("Remote command on node %s, error: %s,"
1420                                " output: %s" %
1421                                (node, result.fail_reason, result.output))
1422
1423     # check connectivity
1424     time.sleep(4)
1425
1426     result = rpc.call_version([node])[node]
1427     if result:
1428       if constants.PROTOCOL_VERSION == result:
1429         logger.Info("communication to node %s fine, sw version %s match" %
1430                     (node, result))
1431       else:
1432         raise errors.OpExecError("Version mismatch master version %s,"
1433                                  " node version %s" %
1434                                  (constants.PROTOCOL_VERSION, result))
1435     else:
1436       raise errors.OpExecError("Cannot get version from the new node")
1437
1438     # setup ssh on node
1439     logger.Info("copy ssh key to node %s" % node)
1440     keyarray = []
1441     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1442                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1443                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1444
1445     for i in keyfiles:
1446       f = open(i, 'r')
1447       try:
1448         keyarray.append(f.read())
1449       finally:
1450         f.close()
1451
1452     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1453                                keyarray[3], keyarray[4], keyarray[5])
1454
1455     if not result:
1456       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1457
1458     # Add node to our /etc/hosts, and add key to known_hosts
1459     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1460     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1461                       self.cfg.GetHostKey())
1462
1463     if new_node.secondary_ip != new_node.primary_ip:
1464       result = ssh.SSHCall(node, "root",
1465                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1466       if result.failed:
1467         raise errors.OpExecError("Node claims it doesn't have the"
1468                                  " secondary ip you gave (%s).\n"
1469                                  "Please fix and re-run this command." %
1470                                  new_node.secondary_ip)
1471
1472     success, msg = ssh.VerifyNodeHostname(node)
1473     if not success:
1474       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1475                                " than the one the resolver gives: %s.\n"
1476                                "Please fix and re-run this command." %
1477                                (node, msg))
1478
1479     # Distribute updated /etc/hosts and known_hosts to all nodes,
1480     # including the node just added
1481     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1482     dist_nodes = self.cfg.GetNodeList() + [node]
1483     if myself.name in dist_nodes:
1484       dist_nodes.remove(myself.name)
1485
1486     logger.Debug("Copying hosts and known_hosts to all nodes")
1487     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1488       result = rpc.call_upload_file(dist_nodes, fname)
1489       for to_node in dist_nodes:
1490         if not result[to_node]:
1491           logger.Error("copy of file %s to node %s failed" %
1492                        (fname, to_node))
1493
1494     to_copy = ss.GetFileList()
1495     for fname in to_copy:
1496       if not ssh.CopyFileToNode(node, fname):
1497         logger.Error("could not copy file %s to node %s" % (fname, node))
1498
1499     logger.Info("adding node %s to cluster.conf" % node)
1500     self.cfg.AddNode(new_node)
1501
1502
1503 class LUMasterFailover(LogicalUnit):
1504   """Failover the master node to the current node.
1505
1506   This is a special LU in that it must run on a non-master node.
1507
1508   """
1509   HPATH = "master-failover"
1510   HTYPE = constants.HTYPE_CLUSTER
1511   REQ_MASTER = False
1512   _OP_REQP = []
1513
1514   def BuildHooksEnv(self):
1515     """Build hooks env.
1516
1517     This will run on the new master only in the pre phase, and on all
1518     the nodes in the post phase.
1519
1520     """
1521     env = {
1522       "NEW_MASTER": self.new_master,
1523       "OLD_MASTER": self.old_master,
1524       }
1525     return env, [self.new_master], self.cfg.GetNodeList()
1526
1527   def CheckPrereq(self):
1528     """Check prerequisites.
1529
1530     This checks that we are not already the master.
1531
1532     """
1533     self.new_master = socket.gethostname()
1534
1535     self.old_master = self.sstore.GetMasterNode()
1536
1537     if self.old_master == self.new_master:
1538       raise errors.OpPrereqError("This commands must be run on the node"
1539                                  " where you want the new master to be.\n"
1540                                  "%s is already the master" %
1541                                  self.old_master)
1542
1543   def Exec(self, feedback_fn):
1544     """Failover the master node.
1545
1546     This command, when run on a non-master node, will cause the current
1547     master to cease being master, and the non-master to become new
1548     master.
1549
1550     """
1551     #TODO: do not rely on gethostname returning the FQDN
1552     logger.Info("setting master to %s, old master: %s" %
1553                 (self.new_master, self.old_master))
1554
1555     if not rpc.call_node_stop_master(self.old_master):
1556       logger.Error("could disable the master role on the old master"
1557                    " %s, please disable manually" % self.old_master)
1558
1559     ss = self.sstore
1560     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1561     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1562                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1563       logger.Error("could not distribute the new simple store master file"
1564                    " to the other nodes, please check.")
1565
1566     if not rpc.call_node_start_master(self.new_master):
1567       logger.Error("could not start the master role on the new master"
1568                    " %s, please check" % self.new_master)
1569       feedback_fn("Error in activating the master IP on the new master,\n"
1570                   "please fix manually.")
1571
1572
1573
1574 class LUQueryClusterInfo(NoHooksLU):
1575   """Query cluster configuration.
1576
1577   """
1578   _OP_REQP = []
1579   REQ_MASTER = False
1580
1581   def CheckPrereq(self):
1582     """No prerequsites needed for this LU.
1583
1584     """
1585     pass
1586
1587   def Exec(self, feedback_fn):
1588     """Return cluster config.
1589
1590     """
1591     result = {
1592       "name": self.sstore.GetClusterName(),
1593       "software_version": constants.RELEASE_VERSION,
1594       "protocol_version": constants.PROTOCOL_VERSION,
1595       "config_version": constants.CONFIG_VERSION,
1596       "os_api_version": constants.OS_API_VERSION,
1597       "export_version": constants.EXPORT_VERSION,
1598       "master": self.sstore.GetMasterNode(),
1599       "architecture": (platform.architecture()[0], platform.machine()),
1600       }
1601
1602     return result
1603
1604
1605 class LUClusterCopyFile(NoHooksLU):
1606   """Copy file to cluster.
1607
1608   """
1609   _OP_REQP = ["nodes", "filename"]
1610
1611   def CheckPrereq(self):
1612     """Check prerequisites.
1613
1614     It should check that the named file exists and that the given list
1615     of nodes is valid.
1616
1617     """
1618     if not os.path.exists(self.op.filename):
1619       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1620
1621     self.nodes = _GetWantedNodes(self, self.op.nodes)
1622
1623   def Exec(self, feedback_fn):
1624     """Copy a file from master to some nodes.
1625
1626     Args:
1627       opts - class with options as members
1628       args - list containing a single element, the file name
1629     Opts used:
1630       nodes - list containing the name of target nodes; if empty, all nodes
1631
1632     """
1633     filename = self.op.filename
1634
1635     myname = socket.gethostname()
1636
1637     for node in [node.name for node in self.nodes]:
1638       if node == myname:
1639         continue
1640       if not ssh.CopyFileToNode(node, filename):
1641         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1642
1643
1644 class LUDumpClusterConfig(NoHooksLU):
1645   """Return a text-representation of the cluster-config.
1646
1647   """
1648   _OP_REQP = []
1649
1650   def CheckPrereq(self):
1651     """No prerequisites.
1652
1653     """
1654     pass
1655
1656   def Exec(self, feedback_fn):
1657     """Dump a representation of the cluster config to the standard output.
1658
1659     """
1660     return self.cfg.DumpConfig()
1661
1662
1663 class LURunClusterCommand(NoHooksLU):
1664   """Run a command on some nodes.
1665
1666   """
1667   _OP_REQP = ["command", "nodes"]
1668
1669   def CheckPrereq(self):
1670     """Check prerequisites.
1671
1672     It checks that the given list of nodes is valid.
1673
1674     """
1675     self.nodes = _GetWantedNodes(self, self.op.nodes)
1676
1677   def Exec(self, feedback_fn):
1678     """Run a command on some nodes.
1679
1680     """
1681     data = []
1682     for node in self.nodes:
1683       result = ssh.SSHCall(node.name, "root", self.op.command)
1684       data.append((node.name, result.output, result.exit_code))
1685
1686     return data
1687
1688
1689 class LUActivateInstanceDisks(NoHooksLU):
1690   """Bring up an instance's disks.
1691
1692   """
1693   _OP_REQP = ["instance_name"]
1694
1695   def CheckPrereq(self):
1696     """Check prerequisites.
1697
1698     This checks that the instance is in the cluster.
1699
1700     """
1701     instance = self.cfg.GetInstanceInfo(
1702       self.cfg.ExpandInstanceName(self.op.instance_name))
1703     if instance is None:
1704       raise errors.OpPrereqError("Instance '%s' not known" %
1705                                  self.op.instance_name)
1706     self.instance = instance
1707
1708
1709   def Exec(self, feedback_fn):
1710     """Activate the disks.
1711
1712     """
1713     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1714     if not disks_ok:
1715       raise errors.OpExecError("Cannot activate block devices")
1716
1717     return disks_info
1718
1719
1720 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1721   """Prepare the block devices for an instance.
1722
1723   This sets up the block devices on all nodes.
1724
1725   Args:
1726     instance: a ganeti.objects.Instance object
1727     ignore_secondaries: if true, errors on secondary nodes won't result
1728                         in an error return from the function
1729
1730   Returns:
1731     false if the operation failed
1732     list of (host, instance_visible_name, node_visible_name) if the operation
1733          suceeded with the mapping from node devices to instance devices
1734   """
1735   device_info = []
1736   disks_ok = True
1737   for inst_disk in instance.disks:
1738     master_result = None
1739     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1740       cfg.SetDiskID(node_disk, node)
1741       is_primary = node == instance.primary_node
1742       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1743       if not result:
1744         logger.Error("could not prepare block device %s on node %s (is_pri"
1745                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1746         if is_primary or not ignore_secondaries:
1747           disks_ok = False
1748       if is_primary:
1749         master_result = result
1750     device_info.append((instance.primary_node, inst_disk.iv_name,
1751                         master_result))
1752
1753   return disks_ok, device_info
1754
1755
1756 def _StartInstanceDisks(cfg, instance, force):
1757   """Start the disks of an instance.
1758
1759   """
1760   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1761                                            ignore_secondaries=force)
1762   if not disks_ok:
1763     _ShutdownInstanceDisks(instance, cfg)
1764     if force is not None and not force:
1765       logger.Error("If the message above refers to a secondary node,"
1766                    " you can retry the operation using '--force'.")
1767     raise errors.OpExecError("Disk consistency error")
1768
1769
1770 class LUDeactivateInstanceDisks(NoHooksLU):
1771   """Shutdown an instance's disks.
1772
1773   """
1774   _OP_REQP = ["instance_name"]
1775
1776   def CheckPrereq(self):
1777     """Check prerequisites.
1778
1779     This checks that the instance is in the cluster.
1780
1781     """
1782     instance = self.cfg.GetInstanceInfo(
1783       self.cfg.ExpandInstanceName(self.op.instance_name))
1784     if instance is None:
1785       raise errors.OpPrereqError("Instance '%s' not known" %
1786                                  self.op.instance_name)
1787     self.instance = instance
1788
1789   def Exec(self, feedback_fn):
1790     """Deactivate the disks
1791
1792     """
1793     instance = self.instance
1794     ins_l = rpc.call_instance_list([instance.primary_node])
1795     ins_l = ins_l[instance.primary_node]
1796     if not type(ins_l) is list:
1797       raise errors.OpExecError("Can't contact node '%s'" %
1798                                instance.primary_node)
1799
1800     if self.instance.name in ins_l:
1801       raise errors.OpExecError("Instance is running, can't shutdown"
1802                                " block devices.")
1803
1804     _ShutdownInstanceDisks(instance, self.cfg)
1805
1806
1807 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1808   """Shutdown block devices of an instance.
1809
1810   This does the shutdown on all nodes of the instance.
1811
1812   If the ignore_primary is false, errors on the primary node are
1813   ignored.
1814
1815   """
1816   result = True
1817   for disk in instance.disks:
1818     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1819       cfg.SetDiskID(top_disk, node)
1820       if not rpc.call_blockdev_shutdown(node, top_disk):
1821         logger.Error("could not shutdown block device %s on node %s" %
1822                      (disk.iv_name, node))
1823         if not ignore_primary or node != instance.primary_node:
1824           result = False
1825   return result
1826
1827
1828 class LUStartupInstance(LogicalUnit):
1829   """Starts an instance.
1830
1831   """
1832   HPATH = "instance-start"
1833   HTYPE = constants.HTYPE_INSTANCE
1834   _OP_REQP = ["instance_name", "force"]
1835
1836   def BuildHooksEnv(self):
1837     """Build hooks env.
1838
1839     This runs on master, primary and secondary nodes of the instance.
1840
1841     """
1842     env = {
1843       "FORCE": self.op.force,
1844       }
1845     env.update(_BuildInstanceHookEnvByObject(self.instance))
1846     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1847           list(self.instance.secondary_nodes))
1848     return env, nl, nl
1849
1850   def CheckPrereq(self):
1851     """Check prerequisites.
1852
1853     This checks that the instance is in the cluster.
1854
1855     """
1856     instance = self.cfg.GetInstanceInfo(
1857       self.cfg.ExpandInstanceName(self.op.instance_name))
1858     if instance is None:
1859       raise errors.OpPrereqError("Instance '%s' not known" %
1860                                  self.op.instance_name)
1861
1862     # check bridges existance
1863     brlist = [nic.bridge for nic in instance.nics]
1864     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1865       raise errors.OpPrereqError("one or more target bridges %s does not"
1866                                  " exist on destination node '%s'" %
1867                                  (brlist, instance.primary_node))
1868
1869     self.instance = instance
1870     self.op.instance_name = instance.name
1871
1872   def Exec(self, feedback_fn):
1873     """Start the instance.
1874
1875     """
1876     instance = self.instance
1877     force = self.op.force
1878     extra_args = getattr(self.op, "extra_args", "")
1879
1880     node_current = instance.primary_node
1881
1882     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1883     if not nodeinfo:
1884       raise errors.OpExecError("Could not contact node %s for infos" %
1885                                (node_current))
1886
1887     freememory = nodeinfo[node_current]['memory_free']
1888     memory = instance.memory
1889     if memory > freememory:
1890       raise errors.OpExecError("Not enough memory to start instance"
1891                                " %s on node %s"
1892                                " needed %s MiB, available %s MiB" %
1893                                (instance.name, node_current, memory,
1894                                 freememory))
1895
1896     _StartInstanceDisks(self.cfg, instance, force)
1897
1898     if not rpc.call_instance_start(node_current, instance, extra_args):
1899       _ShutdownInstanceDisks(instance, self.cfg)
1900       raise errors.OpExecError("Could not start instance")
1901
1902     self.cfg.MarkInstanceUp(instance.name)
1903
1904
1905 class LUShutdownInstance(LogicalUnit):
1906   """Shutdown an instance.
1907
1908   """
1909   HPATH = "instance-stop"
1910   HTYPE = constants.HTYPE_INSTANCE
1911   _OP_REQP = ["instance_name"]
1912
1913   def BuildHooksEnv(self):
1914     """Build hooks env.
1915
1916     This runs on master, primary and secondary nodes of the instance.
1917
1918     """
1919     env = _BuildInstanceHookEnvByObject(self.instance)
1920     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1921           list(self.instance.secondary_nodes))
1922     return env, nl, nl
1923
1924   def CheckPrereq(self):
1925     """Check prerequisites.
1926
1927     This checks that the instance is in the cluster.
1928
1929     """
1930     instance = self.cfg.GetInstanceInfo(
1931       self.cfg.ExpandInstanceName(self.op.instance_name))
1932     if instance is None:
1933       raise errors.OpPrereqError("Instance '%s' not known" %
1934                                  self.op.instance_name)
1935     self.instance = instance
1936
1937   def Exec(self, feedback_fn):
1938     """Shutdown the instance.
1939
1940     """
1941     instance = self.instance
1942     node_current = instance.primary_node
1943     if not rpc.call_instance_shutdown(node_current, instance):
1944       logger.Error("could not shutdown instance")
1945
1946     self.cfg.MarkInstanceDown(instance.name)
1947     _ShutdownInstanceDisks(instance, self.cfg)
1948
1949
1950 class LUReinstallInstance(LogicalUnit):
1951   """Reinstall an instance.
1952
1953   """
1954   HPATH = "instance-reinstall"
1955   HTYPE = constants.HTYPE_INSTANCE
1956   _OP_REQP = ["instance_name"]
1957
1958   def BuildHooksEnv(self):
1959     """Build hooks env.
1960
1961     This runs on master, primary and secondary nodes of the instance.
1962
1963     """
1964     env = _BuildInstanceHookEnvByObject(self.instance)
1965     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1966           list(self.instance.secondary_nodes))
1967     return env, nl, nl
1968
1969   def CheckPrereq(self):
1970     """Check prerequisites.
1971
1972     This checks that the instance is in the cluster and is not running.
1973
1974     """
1975     instance = self.cfg.GetInstanceInfo(
1976       self.cfg.ExpandInstanceName(self.op.instance_name))
1977     if instance is None:
1978       raise errors.OpPrereqError("Instance '%s' not known" %
1979                                  self.op.instance_name)
1980     if instance.disk_template == constants.DT_DISKLESS:
1981       raise errors.OpPrereqError("Instance '%s' has no disks" %
1982                                  self.op.instance_name)
1983     if instance.status != "down":
1984       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1985                                  self.op.instance_name)
1986     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1987     if remote_info:
1988       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1989                                  (self.op.instance_name,
1990                                   instance.primary_node))
1991
1992     self.op.os_type = getattr(self.op, "os_type", None)
1993     if self.op.os_type is not None:
1994       # OS verification
1995       pnode = self.cfg.GetNodeInfo(
1996         self.cfg.ExpandNodeName(instance.primary_node))
1997       if pnode is None:
1998         raise errors.OpPrereqError("Primary node '%s' is unknown" %
1999                                    self.op.pnode)
2000       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2001       if not isinstance(os_obj, objects.OS):
2002         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2003                                    " primary node"  % self.op.os_type)
2004
2005     self.instance = instance
2006
2007   def Exec(self, feedback_fn):
2008     """Reinstall the instance.
2009
2010     """
2011     inst = self.instance
2012
2013     if self.op.os_type is not None:
2014       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2015       inst.os = self.op.os_type
2016       self.cfg.AddInstance(inst)
2017
2018     _StartInstanceDisks(self.cfg, inst, None)
2019     try:
2020       feedback_fn("Running the instance OS create scripts...")
2021       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2022         raise errors.OpExecError("Could not install OS for instance %s "
2023                                  "on node %s" %
2024                                  (inst.name, inst.primary_node))
2025     finally:
2026       _ShutdownInstanceDisks(inst, self.cfg)
2027
2028
2029 class LURemoveInstance(LogicalUnit):
2030   """Remove an instance.
2031
2032   """
2033   HPATH = "instance-remove"
2034   HTYPE = constants.HTYPE_INSTANCE
2035   _OP_REQP = ["instance_name"]
2036
2037   def BuildHooksEnv(self):
2038     """Build hooks env.
2039
2040     This runs on master, primary and secondary nodes of the instance.
2041
2042     """
2043     env = _BuildInstanceHookEnvByObject(self.instance)
2044     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2045           list(self.instance.secondary_nodes))
2046     return env, nl, nl
2047
2048   def CheckPrereq(self):
2049     """Check prerequisites.
2050
2051     This checks that the instance is in the cluster.
2052
2053     """
2054     instance = self.cfg.GetInstanceInfo(
2055       self.cfg.ExpandInstanceName(self.op.instance_name))
2056     if instance is None:
2057       raise errors.OpPrereqError("Instance '%s' not known" %
2058                                  self.op.instance_name)
2059     self.instance = instance
2060
2061   def Exec(self, feedback_fn):
2062     """Remove the instance.
2063
2064     """
2065     instance = self.instance
2066     logger.Info("shutting down instance %s on node %s" %
2067                 (instance.name, instance.primary_node))
2068
2069     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2070       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2071                                (instance.name, instance.primary_node))
2072
2073     logger.Info("removing block devices for instance %s" % instance.name)
2074
2075     _RemoveDisks(instance, self.cfg)
2076
2077     logger.Info("removing instance %s out of cluster config" % instance.name)
2078
2079     self.cfg.RemoveInstance(instance.name)
2080
2081
2082 class LUQueryInstances(NoHooksLU):
2083   """Logical unit for querying instances.
2084
2085   """
2086   _OP_REQP = ["output_fields"]
2087
2088   def CheckPrereq(self):
2089     """Check prerequisites.
2090
2091     This checks that the fields required are valid output fields.
2092
2093     """
2094     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2095     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2096                                "admin_state", "admin_ram",
2097                                "disk_template", "ip", "mac", "bridge",
2098                                "sda_size", "sdb_size"],
2099                        dynamic=self.dynamic_fields,
2100                        selected=self.op.output_fields)
2101
2102   def Exec(self, feedback_fn):
2103     """Computes the list of nodes and their attributes.
2104
2105     """
2106     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2107     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2108                      in instance_names]
2109
2110     # begin data gathering
2111
2112     nodes = frozenset([inst.primary_node for inst in instance_list])
2113
2114     bad_nodes = []
2115     if self.dynamic_fields.intersection(self.op.output_fields):
2116       live_data = {}
2117       node_data = rpc.call_all_instances_info(nodes)
2118       for name in nodes:
2119         result = node_data[name]
2120         if result:
2121           live_data.update(result)
2122         elif result == False:
2123           bad_nodes.append(name)
2124         # else no instance is alive
2125     else:
2126       live_data = dict([(name, {}) for name in instance_names])
2127
2128     # end data gathering
2129
2130     output = []
2131     for instance in instance_list:
2132       iout = []
2133       for field in self.op.output_fields:
2134         if field == "name":
2135           val = instance.name
2136         elif field == "os":
2137           val = instance.os
2138         elif field == "pnode":
2139           val = instance.primary_node
2140         elif field == "snodes":
2141           val = list(instance.secondary_nodes)
2142         elif field == "admin_state":
2143           val = (instance.status != "down")
2144         elif field == "oper_state":
2145           if instance.primary_node in bad_nodes:
2146             val = None
2147           else:
2148             val = bool(live_data.get(instance.name))
2149         elif field == "admin_ram":
2150           val = instance.memory
2151         elif field == "oper_ram":
2152           if instance.primary_node in bad_nodes:
2153             val = None
2154           elif instance.name in live_data:
2155             val = live_data[instance.name].get("memory", "?")
2156           else:
2157             val = "-"
2158         elif field == "disk_template":
2159           val = instance.disk_template
2160         elif field == "ip":
2161           val = instance.nics[0].ip
2162         elif field == "bridge":
2163           val = instance.nics[0].bridge
2164         elif field == "mac":
2165           val = instance.nics[0].mac
2166         elif field == "sda_size" or field == "sdb_size":
2167           disk = instance.FindDisk(field[:3])
2168           if disk is None:
2169             val = None
2170           else:
2171             val = disk.size
2172         else:
2173           raise errors.ParameterError(field)
2174         iout.append(val)
2175       output.append(iout)
2176
2177     return output
2178
2179
2180 class LUFailoverInstance(LogicalUnit):
2181   """Failover an instance.
2182
2183   """
2184   HPATH = "instance-failover"
2185   HTYPE = constants.HTYPE_INSTANCE
2186   _OP_REQP = ["instance_name", "ignore_consistency"]
2187
2188   def BuildHooksEnv(self):
2189     """Build hooks env.
2190
2191     This runs on master, primary and secondary nodes of the instance.
2192
2193     """
2194     env = {
2195       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2196       }
2197     env.update(_BuildInstanceHookEnvByObject(self.instance))
2198     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2199     return env, nl, nl
2200
2201   def CheckPrereq(self):
2202     """Check prerequisites.
2203
2204     This checks that the instance is in the cluster.
2205
2206     """
2207     instance = self.cfg.GetInstanceInfo(
2208       self.cfg.ExpandInstanceName(self.op.instance_name))
2209     if instance is None:
2210       raise errors.OpPrereqError("Instance '%s' not known" %
2211                                  self.op.instance_name)
2212
2213     if instance.disk_template != constants.DT_REMOTE_RAID1:
2214       raise errors.OpPrereqError("Instance's disk layout is not"
2215                                  " remote_raid1.")
2216
2217     secondary_nodes = instance.secondary_nodes
2218     if not secondary_nodes:
2219       raise errors.ProgrammerError("no secondary node but using "
2220                                    "DT_REMOTE_RAID1 template")
2221
2222     # check memory requirements on the secondary node
2223     target_node = secondary_nodes[0]
2224     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2225     info = nodeinfo.get(target_node, None)
2226     if not info:
2227       raise errors.OpPrereqError("Cannot get current information"
2228                                  " from node '%s'" % nodeinfo)
2229     if instance.memory > info['memory_free']:
2230       raise errors.OpPrereqError("Not enough memory on target node %s."
2231                                  " %d MB available, %d MB required" %
2232                                  (target_node, info['memory_free'],
2233                                   instance.memory))
2234
2235     # check bridge existance
2236     brlist = [nic.bridge for nic in instance.nics]
2237     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2238       raise errors.OpPrereqError("One or more target bridges %s does not"
2239                                  " exist on destination node '%s'" %
2240                                  (brlist, instance.primary_node))
2241
2242     self.instance = instance
2243
2244   def Exec(self, feedback_fn):
2245     """Failover an instance.
2246
2247     The failover is done by shutting it down on its present node and
2248     starting it on the secondary.
2249
2250     """
2251     instance = self.instance
2252
2253     source_node = instance.primary_node
2254     target_node = instance.secondary_nodes[0]
2255
2256     feedback_fn("* checking disk consistency between source and target")
2257     for dev in instance.disks:
2258       # for remote_raid1, these are md over drbd
2259       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2260         if not self.op.ignore_consistency:
2261           raise errors.OpExecError("Disk %s is degraded on target node,"
2262                                    " aborting failover." % dev.iv_name)
2263
2264     feedback_fn("* checking target node resource availability")
2265     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2266
2267     if not nodeinfo:
2268       raise errors.OpExecError("Could not contact target node %s." %
2269                                target_node)
2270
2271     free_memory = int(nodeinfo[target_node]['memory_free'])
2272     memory = instance.memory
2273     if memory > free_memory:
2274       raise errors.OpExecError("Not enough memory to create instance %s on"
2275                                " node %s. needed %s MiB, available %s MiB" %
2276                                (instance.name, target_node, memory,
2277                                 free_memory))
2278
2279     feedback_fn("* shutting down instance on source node")
2280     logger.Info("Shutting down instance %s on node %s" %
2281                 (instance.name, source_node))
2282
2283     if not rpc.call_instance_shutdown(source_node, instance):
2284       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2285                    " anyway. Please make sure node %s is down"  %
2286                    (instance.name, source_node, source_node))
2287
2288     feedback_fn("* deactivating the instance's disks on source node")
2289     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2290       raise errors.OpExecError("Can't shut down the instance's disks.")
2291
2292     instance.primary_node = target_node
2293     # distribute new instance config to the other nodes
2294     self.cfg.AddInstance(instance)
2295
2296     feedback_fn("* activating the instance's disks on target node")
2297     logger.Info("Starting instance %s on node %s" %
2298                 (instance.name, target_node))
2299
2300     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2301                                              ignore_secondaries=True)
2302     if not disks_ok:
2303       _ShutdownInstanceDisks(instance, self.cfg)
2304       raise errors.OpExecError("Can't activate the instance's disks")
2305
2306     feedback_fn("* starting the instance on the target node")
2307     if not rpc.call_instance_start(target_node, instance, None):
2308       _ShutdownInstanceDisks(instance, self.cfg)
2309       raise errors.OpExecError("Could not start instance %s on node %s." %
2310                                (instance.name, target_node))
2311
2312
2313 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2314   """Create a tree of block devices on the primary node.
2315
2316   This always creates all devices.
2317
2318   """
2319   if device.children:
2320     for child in device.children:
2321       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2322         return False
2323
2324   cfg.SetDiskID(device, node)
2325   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2326   if not new_id:
2327     return False
2328   if device.physical_id is None:
2329     device.physical_id = new_id
2330   return True
2331
2332
2333 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2334   """Create a tree of block devices on a secondary node.
2335
2336   If this device type has to be created on secondaries, create it and
2337   all its children.
2338
2339   If not, just recurse to children keeping the same 'force' value.
2340
2341   """
2342   if device.CreateOnSecondary():
2343     force = True
2344   if device.children:
2345     for child in device.children:
2346       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2347         return False
2348
2349   if not force:
2350     return True
2351   cfg.SetDiskID(device, node)
2352   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2353   if not new_id:
2354     return False
2355   if device.physical_id is None:
2356     device.physical_id = new_id
2357   return True
2358
2359
2360 def _GenerateUniqueNames(cfg, exts):
2361   """Generate a suitable LV name.
2362
2363   This will generate a logical volume name for the given instance.
2364
2365   """
2366   results = []
2367   for val in exts:
2368     new_id = cfg.GenerateUniqueID()
2369     results.append("%s%s" % (new_id, val))
2370   return results
2371
2372
2373 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2374   """Generate a drbd device complete with its children.
2375
2376   """
2377   port = cfg.AllocatePort()
2378   vgname = cfg.GetVGName()
2379   dev_data = objects.Disk(dev_type="lvm", size=size,
2380                           logical_id=(vgname, names[0]))
2381   dev_meta = objects.Disk(dev_type="lvm", size=128,
2382                           logical_id=(vgname, names[1]))
2383   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2384                           logical_id = (primary, secondary, port),
2385                           children = [dev_data, dev_meta])
2386   return drbd_dev
2387
2388
2389 def _GenerateDiskTemplate(cfg, template_name,
2390                           instance_name, primary_node,
2391                           secondary_nodes, disk_sz, swap_sz):
2392   """Generate the entire disk layout for a given template type.
2393
2394   """
2395   #TODO: compute space requirements
2396
2397   vgname = cfg.GetVGName()
2398   if template_name == "diskless":
2399     disks = []
2400   elif template_name == "plain":
2401     if len(secondary_nodes) != 0:
2402       raise errors.ProgrammerError("Wrong template configuration")
2403
2404     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2405     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2406                            logical_id=(vgname, names[0]),
2407                            iv_name = "sda")
2408     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2409                            logical_id=(vgname, names[1]),
2410                            iv_name = "sdb")
2411     disks = [sda_dev, sdb_dev]
2412   elif template_name == "local_raid1":
2413     if len(secondary_nodes) != 0:
2414       raise errors.ProgrammerError("Wrong template configuration")
2415
2416
2417     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2418                                        ".sdb_m1", ".sdb_m2"])
2419     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2420                               logical_id=(vgname, names[0]))
2421     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2422                               logical_id=(vgname, names[1]))
2423     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2424                               size=disk_sz,
2425                               children = [sda_dev_m1, sda_dev_m2])
2426     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2427                               logical_id=(vgname, names[2]))
2428     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2429                               logical_id=(vgname, names[3]))
2430     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2431                               size=swap_sz,
2432                               children = [sdb_dev_m1, sdb_dev_m2])
2433     disks = [md_sda_dev, md_sdb_dev]
2434   elif template_name == constants.DT_REMOTE_RAID1:
2435     if len(secondary_nodes) != 1:
2436       raise errors.ProgrammerError("Wrong template configuration")
2437     remote_node = secondary_nodes[0]
2438     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2439                                        ".sdb_data", ".sdb_meta"])
2440     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2441                                          disk_sz, names[0:2])
2442     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2443                               children = [drbd_sda_dev], size=disk_sz)
2444     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2445                                          swap_sz, names[2:4])
2446     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2447                               children = [drbd_sdb_dev], size=swap_sz)
2448     disks = [md_sda_dev, md_sdb_dev]
2449   else:
2450     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2451   return disks
2452
2453
2454 def _GetInstanceInfoText(instance):
2455   """Compute that text that should be added to the disk's metadata.
2456
2457   """
2458   return "originstname+%s" % instance.name
2459
2460
2461 def _CreateDisks(cfg, instance):
2462   """Create all disks for an instance.
2463
2464   This abstracts away some work from AddInstance.
2465
2466   Args:
2467     instance: the instance object
2468
2469   Returns:
2470     True or False showing the success of the creation process
2471
2472   """
2473   info = _GetInstanceInfoText(instance)
2474
2475   for device in instance.disks:
2476     logger.Info("creating volume %s for instance %s" %
2477               (device.iv_name, instance.name))
2478     #HARDCODE
2479     for secondary_node in instance.secondary_nodes:
2480       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2481                                         info):
2482         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2483                      (device.iv_name, device, secondary_node))
2484         return False
2485     #HARDCODE
2486     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2487       logger.Error("failed to create volume %s on primary!" %
2488                    device.iv_name)
2489       return False
2490   return True
2491
2492
2493 def _RemoveDisks(instance, cfg):
2494   """Remove all disks for an instance.
2495
2496   This abstracts away some work from `AddInstance()` and
2497   `RemoveInstance()`. Note that in case some of the devices couldn't
2498   be remove, the removal will continue with the other ones (compare
2499   with `_CreateDisks()`).
2500
2501   Args:
2502     instance: the instance object
2503
2504   Returns:
2505     True or False showing the success of the removal proces
2506
2507   """
2508   logger.Info("removing block devices for instance %s" % instance.name)
2509
2510   result = True
2511   for device in instance.disks:
2512     for node, disk in device.ComputeNodeTree(instance.primary_node):
2513       cfg.SetDiskID(disk, node)
2514       if not rpc.call_blockdev_remove(node, disk):
2515         logger.Error("could not remove block device %s on node %s,"
2516                      " continuing anyway" %
2517                      (device.iv_name, node))
2518         result = False
2519   return result
2520
2521
2522 class LUCreateInstance(LogicalUnit):
2523   """Create an instance.
2524
2525   """
2526   HPATH = "instance-add"
2527   HTYPE = constants.HTYPE_INSTANCE
2528   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2529               "disk_template", "swap_size", "mode", "start", "vcpus",
2530               "wait_for_sync"]
2531
2532   def BuildHooksEnv(self):
2533     """Build hooks env.
2534
2535     This runs on master, primary and secondary nodes of the instance.
2536
2537     """
2538     env = {
2539       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2540       "INSTANCE_DISK_SIZE": self.op.disk_size,
2541       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2542       "INSTANCE_ADD_MODE": self.op.mode,
2543       }
2544     if self.op.mode == constants.INSTANCE_IMPORT:
2545       env["INSTANCE_SRC_NODE"] = self.op.src_node
2546       env["INSTANCE_SRC_PATH"] = self.op.src_path
2547       env["INSTANCE_SRC_IMAGE"] = self.src_image
2548
2549     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2550       primary_node=self.op.pnode,
2551       secondary_nodes=self.secondaries,
2552       status=self.instance_status,
2553       os_type=self.op.os_type,
2554       memory=self.op.mem_size,
2555       vcpus=self.op.vcpus,
2556       nics=[(self.inst_ip, self.op.bridge)],
2557     ))
2558
2559     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2560           self.secondaries)
2561     return env, nl, nl
2562
2563
2564   def CheckPrereq(self):
2565     """Check prerequisites.
2566
2567     """
2568     if self.op.mode not in (constants.INSTANCE_CREATE,
2569                             constants.INSTANCE_IMPORT):
2570       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2571                                  self.op.mode)
2572
2573     if self.op.mode == constants.INSTANCE_IMPORT:
2574       src_node = getattr(self.op, "src_node", None)
2575       src_path = getattr(self.op, "src_path", None)
2576       if src_node is None or src_path is None:
2577         raise errors.OpPrereqError("Importing an instance requires source"
2578                                    " node and path options")
2579       src_node_full = self.cfg.ExpandNodeName(src_node)
2580       if src_node_full is None:
2581         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2582       self.op.src_node = src_node = src_node_full
2583
2584       if not os.path.isabs(src_path):
2585         raise errors.OpPrereqError("The source path must be absolute")
2586
2587       export_info = rpc.call_export_info(src_node, src_path)
2588
2589       if not export_info:
2590         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2591
2592       if not export_info.has_section(constants.INISECT_EXP):
2593         raise errors.ProgrammerError("Corrupted export config")
2594
2595       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2596       if (int(ei_version) != constants.EXPORT_VERSION):
2597         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2598                                    (ei_version, constants.EXPORT_VERSION))
2599
2600       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2601         raise errors.OpPrereqError("Can't import instance with more than"
2602                                    " one data disk")
2603
2604       # FIXME: are the old os-es, disk sizes, etc. useful?
2605       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2606       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2607                                                          'disk0_dump'))
2608       self.src_image = diskimage
2609     else: # INSTANCE_CREATE
2610       if getattr(self.op, "os_type", None) is None:
2611         raise errors.OpPrereqError("No guest OS specified")
2612
2613     # check primary node
2614     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2615     if pnode is None:
2616       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2617                                  self.op.pnode)
2618     self.op.pnode = pnode.name
2619     self.pnode = pnode
2620     self.secondaries = []
2621     # disk template and mirror node verification
2622     if self.op.disk_template not in constants.DISK_TEMPLATES:
2623       raise errors.OpPrereqError("Invalid disk template name")
2624
2625     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2626       if getattr(self.op, "snode", None) is None:
2627         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2628                                    " a mirror node")
2629
2630       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2631       if snode_name is None:
2632         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2633                                    self.op.snode)
2634       elif snode_name == pnode.name:
2635         raise errors.OpPrereqError("The secondary node cannot be"
2636                                    " the primary node.")
2637       self.secondaries.append(snode_name)
2638
2639     # Check lv size requirements
2640     nodenames = [pnode.name] + self.secondaries
2641     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2642
2643     # Required free disk space as a function of disk and swap space
2644     req_size_dict = {
2645       constants.DT_DISKLESS: 0,
2646       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2647       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2648       # 256 MB are added for drbd metadata, 128MB for each drbd device
2649       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2650     }
2651
2652     if self.op.disk_template not in req_size_dict:
2653       raise errors.ProgrammerError("Disk template '%s' size requirement"
2654                                    " is unknown" %  self.op.disk_template)
2655
2656     req_size = req_size_dict[self.op.disk_template]
2657
2658     for node in nodenames:
2659       info = nodeinfo.get(node, None)
2660       if not info:
2661         raise errors.OpPrereqError("Cannot get current information"
2662                                    " from node '%s'" % nodeinfo)
2663       if req_size > info['vg_free']:
2664         raise errors.OpPrereqError("Not enough disk space on target node %s."
2665                                    " %d MB available, %d MB required" %
2666                                    (node, info['vg_free'], req_size))
2667
2668     # os verification
2669     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2670     if not isinstance(os_obj, objects.OS):
2671       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2672                                  " primary node"  % self.op.os_type)
2673
2674     # instance verification
2675     hostname1 = utils.LookupHostname(self.op.instance_name)
2676     if not hostname1:
2677       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2678                                  self.op.instance_name)
2679
2680     self.op.instance_name = instance_name = hostname1['hostname']
2681     instance_list = self.cfg.GetInstanceList()
2682     if instance_name in instance_list:
2683       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2684                                  instance_name)
2685
2686     ip = getattr(self.op, "ip", None)
2687     if ip is None or ip.lower() == "none":
2688       inst_ip = None
2689     elif ip.lower() == "auto":
2690       inst_ip = hostname1['ip']
2691     else:
2692       if not utils.IsValidIP(ip):
2693         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2694                                    " like a valid IP" % ip)
2695       inst_ip = ip
2696     self.inst_ip = inst_ip
2697
2698     command = ["fping", "-q", hostname1['ip']]
2699     result = utils.RunCmd(command)
2700     if not result.failed:
2701       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2702                                  (hostname1['ip'], instance_name))
2703
2704     # bridge verification
2705     bridge = getattr(self.op, "bridge", None)
2706     if bridge is None:
2707       self.op.bridge = self.cfg.GetDefBridge()
2708     else:
2709       self.op.bridge = bridge
2710
2711     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2712       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2713                                  " destination node '%s'" %
2714                                  (self.op.bridge, pnode.name))
2715
2716     if self.op.start:
2717       self.instance_status = 'up'
2718     else:
2719       self.instance_status = 'down'
2720
2721   def Exec(self, feedback_fn):
2722     """Create and add the instance to the cluster.
2723
2724     """
2725     instance = self.op.instance_name
2726     pnode_name = self.pnode.name
2727
2728     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2729     if self.inst_ip is not None:
2730       nic.ip = self.inst_ip
2731
2732     disks = _GenerateDiskTemplate(self.cfg,
2733                                   self.op.disk_template,
2734                                   instance, pnode_name,
2735                                   self.secondaries, self.op.disk_size,
2736                                   self.op.swap_size)
2737
2738     iobj = objects.Instance(name=instance, os=self.op.os_type,
2739                             primary_node=pnode_name,
2740                             memory=self.op.mem_size,
2741                             vcpus=self.op.vcpus,
2742                             nics=[nic], disks=disks,
2743                             disk_template=self.op.disk_template,
2744                             status=self.instance_status,
2745                             )
2746
2747     feedback_fn("* creating instance disks...")
2748     if not _CreateDisks(self.cfg, iobj):
2749       _RemoveDisks(iobj, self.cfg)
2750       raise errors.OpExecError("Device creation failed, reverting...")
2751
2752     feedback_fn("adding instance %s to cluster config" % instance)
2753
2754     self.cfg.AddInstance(iobj)
2755
2756     if self.op.wait_for_sync:
2757       disk_abort = not _WaitForSync(self.cfg, iobj)
2758     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2759       # make sure the disks are not degraded (still sync-ing is ok)
2760       time.sleep(15)
2761       feedback_fn("* checking mirrors status")
2762       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2763     else:
2764       disk_abort = False
2765
2766     if disk_abort:
2767       _RemoveDisks(iobj, self.cfg)
2768       self.cfg.RemoveInstance(iobj.name)
2769       raise errors.OpExecError("There are some degraded disks for"
2770                                " this instance")
2771
2772     feedback_fn("creating os for instance %s on node %s" %
2773                 (instance, pnode_name))
2774
2775     if iobj.disk_template != constants.DT_DISKLESS:
2776       if self.op.mode == constants.INSTANCE_CREATE:
2777         feedback_fn("* running the instance OS create scripts...")
2778         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2779           raise errors.OpExecError("could not add os for instance %s"
2780                                    " on node %s" %
2781                                    (instance, pnode_name))
2782
2783       elif self.op.mode == constants.INSTANCE_IMPORT:
2784         feedback_fn("* running the instance OS import scripts...")
2785         src_node = self.op.src_node
2786         src_image = self.src_image
2787         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2788                                                 src_node, src_image):
2789           raise errors.OpExecError("Could not import os for instance"
2790                                    " %s on node %s" %
2791                                    (instance, pnode_name))
2792       else:
2793         # also checked in the prereq part
2794         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2795                                      % self.op.mode)
2796
2797     if self.op.start:
2798       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2799       feedback_fn("* starting instance...")
2800       if not rpc.call_instance_start(pnode_name, iobj, None):
2801         raise errors.OpExecError("Could not start instance")
2802
2803
2804 class LUConnectConsole(NoHooksLU):
2805   """Connect to an instance's console.
2806
2807   This is somewhat special in that it returns the command line that
2808   you need to run on the master node in order to connect to the
2809   console.
2810
2811   """
2812   _OP_REQP = ["instance_name"]
2813
2814   def CheckPrereq(self):
2815     """Check prerequisites.
2816
2817     This checks that the instance is in the cluster.
2818
2819     """
2820     instance = self.cfg.GetInstanceInfo(
2821       self.cfg.ExpandInstanceName(self.op.instance_name))
2822     if instance is None:
2823       raise errors.OpPrereqError("Instance '%s' not known" %
2824                                  self.op.instance_name)
2825     self.instance = instance
2826
2827   def Exec(self, feedback_fn):
2828     """Connect to the console of an instance
2829
2830     """
2831     instance = self.instance
2832     node = instance.primary_node
2833
2834     node_insts = rpc.call_instance_list([node])[node]
2835     if node_insts is False:
2836       raise errors.OpExecError("Can't connect to node %s." % node)
2837
2838     if instance.name not in node_insts:
2839       raise errors.OpExecError("Instance %s is not running." % instance.name)
2840
2841     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2842
2843     hyper = hypervisor.GetHypervisor()
2844     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2845     # build ssh cmdline
2846     argv = ["ssh", "-q", "-t"]
2847     argv.extend(ssh.KNOWN_HOSTS_OPTS)
2848     argv.extend(ssh.BATCH_MODE_OPTS)
2849     argv.append(node)
2850     argv.append(console_cmd)
2851     return "ssh", argv
2852
2853
2854 class LUAddMDDRBDComponent(LogicalUnit):
2855   """Adda new mirror member to an instance's disk.
2856
2857   """
2858   HPATH = "mirror-add"
2859   HTYPE = constants.HTYPE_INSTANCE
2860   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2861
2862   def BuildHooksEnv(self):
2863     """Build hooks env.
2864
2865     This runs on the master, the primary and all the secondaries.
2866
2867     """
2868     env = {
2869       "NEW_SECONDARY": self.op.remote_node,
2870       "DISK_NAME": self.op.disk_name,
2871       }
2872     env.update(_BuildInstanceHookEnvByObject(self.instance))
2873     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2874           self.op.remote_node,] + list(self.instance.secondary_nodes)
2875     return env, nl, nl
2876
2877   def CheckPrereq(self):
2878     """Check prerequisites.
2879
2880     This checks that the instance is in the cluster.
2881
2882     """
2883     instance = self.cfg.GetInstanceInfo(
2884       self.cfg.ExpandInstanceName(self.op.instance_name))
2885     if instance is None:
2886       raise errors.OpPrereqError("Instance '%s' not known" %
2887                                  self.op.instance_name)
2888     self.instance = instance
2889
2890     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2891     if remote_node is None:
2892       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2893     self.remote_node = remote_node
2894
2895     if remote_node == instance.primary_node:
2896       raise errors.OpPrereqError("The specified node is the primary node of"
2897                                  " the instance.")
2898
2899     if instance.disk_template != constants.DT_REMOTE_RAID1:
2900       raise errors.OpPrereqError("Instance's disk layout is not"
2901                                  " remote_raid1.")
2902     for disk in instance.disks:
2903       if disk.iv_name == self.op.disk_name:
2904         break
2905     else:
2906       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2907                                  " instance." % self.op.disk_name)
2908     if len(disk.children) > 1:
2909       raise errors.OpPrereqError("The device already has two slave"
2910                                  " devices.\n"
2911                                  "This would create a 3-disk raid1"
2912                                  " which we don't allow.")
2913     self.disk = disk
2914
2915   def Exec(self, feedback_fn):
2916     """Add the mirror component
2917
2918     """
2919     disk = self.disk
2920     instance = self.instance
2921
2922     remote_node = self.remote_node
2923     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2924     names = _GenerateUniqueNames(self.cfg, lv_names)
2925     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2926                                      remote_node, disk.size, names)
2927
2928     logger.Info("adding new mirror component on secondary")
2929     #HARDCODE
2930     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2931                                       _GetInstanceInfoText(instance)):
2932       raise errors.OpExecError("Failed to create new component on secondary"
2933                                " node %s" % remote_node)
2934
2935     logger.Info("adding new mirror component on primary")
2936     #HARDCODE
2937     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2938                                     _GetInstanceInfoText(instance)):
2939       # remove secondary dev
2940       self.cfg.SetDiskID(new_drbd, remote_node)
2941       rpc.call_blockdev_remove(remote_node, new_drbd)
2942       raise errors.OpExecError("Failed to create volume on primary")
2943
2944     # the device exists now
2945     # call the primary node to add the mirror to md
2946     logger.Info("adding new mirror component to md")
2947     if not rpc.call_blockdev_addchild(instance.primary_node,
2948                                            disk, new_drbd):
2949       logger.Error("Can't add mirror compoment to md!")
2950       self.cfg.SetDiskID(new_drbd, remote_node)
2951       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2952         logger.Error("Can't rollback on secondary")
2953       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2954       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2955         logger.Error("Can't rollback on primary")
2956       raise errors.OpExecError("Can't add mirror component to md array")
2957
2958     disk.children.append(new_drbd)
2959
2960     self.cfg.AddInstance(instance)
2961
2962     _WaitForSync(self.cfg, instance)
2963
2964     return 0
2965
2966
2967 class LURemoveMDDRBDComponent(LogicalUnit):
2968   """Remove a component from a remote_raid1 disk.
2969
2970   """
2971   HPATH = "mirror-remove"
2972   HTYPE = constants.HTYPE_INSTANCE
2973   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2974
2975   def BuildHooksEnv(self):
2976     """Build hooks env.
2977
2978     This runs on the master, the primary and all the secondaries.
2979
2980     """
2981     env = {
2982       "DISK_NAME": self.op.disk_name,
2983       "DISK_ID": self.op.disk_id,
2984       "OLD_SECONDARY": self.old_secondary,
2985       }
2986     env.update(_BuildInstanceHookEnvByObject(self.instance))
2987     nl = [self.sstore.GetMasterNode(),
2988           self.instance.primary_node] + list(self.instance.secondary_nodes)
2989     return env, nl, nl
2990
2991   def CheckPrereq(self):
2992     """Check prerequisites.
2993
2994     This checks that the instance is in the cluster.
2995
2996     """
2997     instance = self.cfg.GetInstanceInfo(
2998       self.cfg.ExpandInstanceName(self.op.instance_name))
2999     if instance is None:
3000       raise errors.OpPrereqError("Instance '%s' not known" %
3001                                  self.op.instance_name)
3002     self.instance = instance
3003
3004     if instance.disk_template != constants.DT_REMOTE_RAID1:
3005       raise errors.OpPrereqError("Instance's disk layout is not"
3006                                  " remote_raid1.")
3007     for disk in instance.disks:
3008       if disk.iv_name == self.op.disk_name:
3009         break
3010     else:
3011       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3012                                  " instance." % self.op.disk_name)
3013     for child in disk.children:
3014       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3015         break
3016     else:
3017       raise errors.OpPrereqError("Can't find the device with this port.")
3018
3019     if len(disk.children) < 2:
3020       raise errors.OpPrereqError("Cannot remove the last component from"
3021                                  " a mirror.")
3022     self.disk = disk
3023     self.child = child
3024     if self.child.logical_id[0] == instance.primary_node:
3025       oid = 1
3026     else:
3027       oid = 0
3028     self.old_secondary = self.child.logical_id[oid]
3029
3030   def Exec(self, feedback_fn):
3031     """Remove the mirror component
3032
3033     """
3034     instance = self.instance
3035     disk = self.disk
3036     child = self.child
3037     logger.Info("remove mirror component")
3038     self.cfg.SetDiskID(disk, instance.primary_node)
3039     if not rpc.call_blockdev_removechild(instance.primary_node,
3040                                               disk, child):
3041       raise errors.OpExecError("Can't remove child from mirror.")
3042
3043     for node in child.logical_id[:2]:
3044       self.cfg.SetDiskID(child, node)
3045       if not rpc.call_blockdev_remove(node, child):
3046         logger.Error("Warning: failed to remove device from node %s,"
3047                      " continuing operation." % node)
3048
3049     disk.children.remove(child)
3050     self.cfg.AddInstance(instance)
3051
3052
3053 class LUReplaceDisks(LogicalUnit):
3054   """Replace the disks of an instance.
3055
3056   """
3057   HPATH = "mirrors-replace"
3058   HTYPE = constants.HTYPE_INSTANCE
3059   _OP_REQP = ["instance_name"]
3060
3061   def BuildHooksEnv(self):
3062     """Build hooks env.
3063
3064     This runs on the master, the primary and all the secondaries.
3065
3066     """
3067     env = {
3068       "NEW_SECONDARY": self.op.remote_node,
3069       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3070       }
3071     env.update(_BuildInstanceHookEnvByObject(self.instance))
3072     nl = [self.sstore.GetMasterNode(),
3073           self.instance.primary_node] + list(self.instance.secondary_nodes)
3074     return env, nl, nl
3075
3076   def CheckPrereq(self):
3077     """Check prerequisites.
3078
3079     This checks that the instance is in the cluster.
3080
3081     """
3082     instance = self.cfg.GetInstanceInfo(
3083       self.cfg.ExpandInstanceName(self.op.instance_name))
3084     if instance is None:
3085       raise errors.OpPrereqError("Instance '%s' not known" %
3086                                  self.op.instance_name)
3087     self.instance = instance
3088
3089     if instance.disk_template != constants.DT_REMOTE_RAID1:
3090       raise errors.OpPrereqError("Instance's disk layout is not"
3091                                  " remote_raid1.")
3092
3093     if len(instance.secondary_nodes) != 1:
3094       raise errors.OpPrereqError("The instance has a strange layout,"
3095                                  " expected one secondary but found %d" %
3096                                  len(instance.secondary_nodes))
3097
3098     remote_node = getattr(self.op, "remote_node", None)
3099     if remote_node is None:
3100       remote_node = instance.secondary_nodes[0]
3101     else:
3102       remote_node = self.cfg.ExpandNodeName(remote_node)
3103       if remote_node is None:
3104         raise errors.OpPrereqError("Node '%s' not known" %
3105                                    self.op.remote_node)
3106     if remote_node == instance.primary_node:
3107       raise errors.OpPrereqError("The specified node is the primary node of"
3108                                  " the instance.")
3109     self.op.remote_node = remote_node
3110
3111   def Exec(self, feedback_fn):
3112     """Replace the disks of an instance.
3113
3114     """
3115     instance = self.instance
3116     iv_names = {}
3117     # start of work
3118     remote_node = self.op.remote_node
3119     cfg = self.cfg
3120     for dev in instance.disks:
3121       size = dev.size
3122       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3123       names = _GenerateUniqueNames(cfg, lv_names)
3124       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3125                                        remote_node, size, names)
3126       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3127       logger.Info("adding new mirror component on secondary for %s" %
3128                   dev.iv_name)
3129       #HARDCODE
3130       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3131                                         _GetInstanceInfoText(instance)):
3132         raise errors.OpExecError("Failed to create new component on"
3133                                  " secondary node %s\n"
3134                                  "Full abort, cleanup manually!" %
3135                                  remote_node)
3136
3137       logger.Info("adding new mirror component on primary")
3138       #HARDCODE
3139       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3140                                       _GetInstanceInfoText(instance)):
3141         # remove secondary dev
3142         cfg.SetDiskID(new_drbd, remote_node)
3143         rpc.call_blockdev_remove(remote_node, new_drbd)
3144         raise errors.OpExecError("Failed to create volume on primary!\n"
3145                                  "Full abort, cleanup manually!!")
3146
3147       # the device exists now
3148       # call the primary node to add the mirror to md
3149       logger.Info("adding new mirror component to md")
3150       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3151                                         new_drbd):
3152         logger.Error("Can't add mirror compoment to md!")
3153         cfg.SetDiskID(new_drbd, remote_node)
3154         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3155           logger.Error("Can't rollback on secondary")
3156         cfg.SetDiskID(new_drbd, instance.primary_node)
3157         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3158           logger.Error("Can't rollback on primary")
3159         raise errors.OpExecError("Full abort, cleanup manually!!")
3160
3161       dev.children.append(new_drbd)
3162       cfg.AddInstance(instance)
3163
3164     # this can fail as the old devices are degraded and _WaitForSync
3165     # does a combined result over all disks, so we don't check its
3166     # return value
3167     _WaitForSync(cfg, instance, unlock=True)
3168
3169     # so check manually all the devices
3170     for name in iv_names:
3171       dev, child, new_drbd = iv_names[name]
3172       cfg.SetDiskID(dev, instance.primary_node)
3173       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3174       if is_degr:
3175         raise errors.OpExecError("MD device %s is degraded!" % name)
3176       cfg.SetDiskID(new_drbd, instance.primary_node)
3177       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3178       if is_degr:
3179         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3180
3181     for name in iv_names:
3182       dev, child, new_drbd = iv_names[name]
3183       logger.Info("remove mirror %s component" % name)
3184       cfg.SetDiskID(dev, instance.primary_node)
3185       if not rpc.call_blockdev_removechild(instance.primary_node,
3186                                                 dev, child):
3187         logger.Error("Can't remove child from mirror, aborting"
3188                      " *this device cleanup*.\nYou need to cleanup manually!!")
3189         continue
3190
3191       for node in child.logical_id[:2]:
3192         logger.Info("remove child device on %s" % node)
3193         cfg.SetDiskID(child, node)
3194         if not rpc.call_blockdev_remove(node, child):
3195           logger.Error("Warning: failed to remove device from node %s,"
3196                        " continuing operation." % node)
3197
3198       dev.children.remove(child)
3199
3200       cfg.AddInstance(instance)
3201
3202
3203 class LUQueryInstanceData(NoHooksLU):
3204   """Query runtime instance data.
3205
3206   """
3207   _OP_REQP = ["instances"]
3208
3209   def CheckPrereq(self):
3210     """Check prerequisites.
3211
3212     This only checks the optional instance list against the existing names.
3213
3214     """
3215     if not isinstance(self.op.instances, list):
3216       raise errors.OpPrereqError("Invalid argument type 'instances'")
3217     if self.op.instances:
3218       self.wanted_instances = []
3219       names = self.op.instances
3220       for name in names:
3221         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3222         if instance is None:
3223           raise errors.OpPrereqError("No such instance name '%s'" % name)
3224       self.wanted_instances.append(instance)
3225     else:
3226       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3227                                in self.cfg.GetInstanceList()]
3228     return
3229
3230
3231   def _ComputeDiskStatus(self, instance, snode, dev):
3232     """Compute block device status.
3233
3234     """
3235     self.cfg.SetDiskID(dev, instance.primary_node)
3236     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3237     if dev.dev_type == "drbd":
3238       # we change the snode then (otherwise we use the one passed in)
3239       if dev.logical_id[0] == instance.primary_node:
3240         snode = dev.logical_id[1]
3241       else:
3242         snode = dev.logical_id[0]
3243
3244     if snode:
3245       self.cfg.SetDiskID(dev, snode)
3246       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3247     else:
3248       dev_sstatus = None
3249
3250     if dev.children:
3251       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3252                       for child in dev.children]
3253     else:
3254       dev_children = []
3255
3256     data = {
3257       "iv_name": dev.iv_name,
3258       "dev_type": dev.dev_type,
3259       "logical_id": dev.logical_id,
3260       "physical_id": dev.physical_id,
3261       "pstatus": dev_pstatus,
3262       "sstatus": dev_sstatus,
3263       "children": dev_children,
3264       }
3265
3266     return data
3267
3268   def Exec(self, feedback_fn):
3269     """Gather and return data"""
3270     result = {}
3271     for instance in self.wanted_instances:
3272       remote_info = rpc.call_instance_info(instance.primary_node,
3273                                                 instance.name)
3274       if remote_info and "state" in remote_info:
3275         remote_state = "up"
3276       else:
3277         remote_state = "down"
3278       if instance.status == "down":
3279         config_state = "down"
3280       else:
3281         config_state = "up"
3282
3283       disks = [self._ComputeDiskStatus(instance, None, device)
3284                for device in instance.disks]
3285
3286       idict = {
3287         "name": instance.name,
3288         "config_state": config_state,
3289         "run_state": remote_state,
3290         "pnode": instance.primary_node,
3291         "snodes": instance.secondary_nodes,
3292         "os": instance.os,
3293         "memory": instance.memory,
3294         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3295         "disks": disks,
3296         }
3297
3298       result[instance.name] = idict
3299
3300     return result
3301
3302
3303 class LUQueryNodeData(NoHooksLU):
3304   """Logical unit for querying node data.
3305
3306   """
3307   _OP_REQP = ["nodes"]
3308
3309   def CheckPrereq(self):
3310     """Check prerequisites.
3311
3312     This only checks the optional node list against the existing names.
3313
3314     """
3315     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3316
3317   def Exec(self, feedback_fn):
3318     """Compute and return the list of nodes.
3319
3320     """
3321     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3322              in self.cfg.GetInstanceList()]
3323     result = []
3324     for node in self.wanted_nodes:
3325       result.append((node.name, node.primary_ip, node.secondary_ip,
3326                      [inst.name for inst in ilist
3327                       if inst.primary_node == node.name],
3328                      [inst.name for inst in ilist
3329                       if node.name in inst.secondary_nodes],
3330                      ))
3331     return result
3332
3333
3334 class LUSetInstanceParms(LogicalUnit):
3335   """Modifies an instances's parameters.
3336
3337   """
3338   HPATH = "instance-modify"
3339   HTYPE = constants.HTYPE_INSTANCE
3340   _OP_REQP = ["instance_name"]
3341
3342   def BuildHooksEnv(self):
3343     """Build hooks env.
3344
3345     This runs on the master, primary and secondaries.
3346
3347     """
3348     args = dict()
3349     if self.mem:
3350       args['memory'] = self.mem
3351     if self.vcpus:
3352       args['vcpus'] = self.vcpus
3353     if self.do_ip or self.do_bridge:
3354       if self.do_ip:
3355         ip = self.ip
3356       else:
3357         ip = self.instance.nics[0].ip
3358       if self.bridge:
3359         bridge = self.bridge
3360       else:
3361         bridge = self.instance.nics[0].bridge
3362       args['nics'] = [(ip, bridge)]
3363     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3364     nl = [self.sstore.GetMasterNode(),
3365           self.instance.primary_node] + list(self.instance.secondary_nodes)
3366     return env, nl, nl
3367
3368   def CheckPrereq(self):
3369     """Check prerequisites.
3370
3371     This only checks the instance list against the existing names.
3372
3373     """
3374     self.mem = getattr(self.op, "mem", None)
3375     self.vcpus = getattr(self.op, "vcpus", None)
3376     self.ip = getattr(self.op, "ip", None)
3377     self.bridge = getattr(self.op, "bridge", None)
3378     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3379       raise errors.OpPrereqError("No changes submitted")
3380     if self.mem is not None:
3381       try:
3382         self.mem = int(self.mem)
3383       except ValueError, err:
3384         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3385     if self.vcpus is not None:
3386       try:
3387         self.vcpus = int(self.vcpus)
3388       except ValueError, err:
3389         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3390     if self.ip is not None:
3391       self.do_ip = True
3392       if self.ip.lower() == "none":
3393         self.ip = None
3394       else:
3395         if not utils.IsValidIP(self.ip):
3396           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3397     else:
3398       self.do_ip = False
3399     self.do_bridge = (self.bridge is not None)
3400
3401     instance = self.cfg.GetInstanceInfo(
3402       self.cfg.ExpandInstanceName(self.op.instance_name))
3403     if instance is None:
3404       raise errors.OpPrereqError("No such instance name '%s'" %
3405                                  self.op.instance_name)
3406     self.op.instance_name = instance.name
3407     self.instance = instance
3408     return
3409
3410   def Exec(self, feedback_fn):
3411     """Modifies an instance.
3412
3413     All parameters take effect only at the next restart of the instance.
3414     """
3415     result = []
3416     instance = self.instance
3417     if self.mem:
3418       instance.memory = self.mem
3419       result.append(("mem", self.mem))
3420     if self.vcpus:
3421       instance.vcpus = self.vcpus
3422       result.append(("vcpus",  self.vcpus))
3423     if self.do_ip:
3424       instance.nics[0].ip = self.ip
3425       result.append(("ip", self.ip))
3426     if self.bridge:
3427       instance.nics[0].bridge = self.bridge
3428       result.append(("bridge", self.bridge))
3429
3430     self.cfg.AddInstance(instance)
3431
3432     return result
3433
3434
3435 class LUQueryExports(NoHooksLU):
3436   """Query the exports list
3437
3438   """
3439   _OP_REQP = []
3440
3441   def CheckPrereq(self):
3442     """Check that the nodelist contains only existing nodes.
3443
3444     """
3445     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3446
3447   def Exec(self, feedback_fn):
3448     """Compute the list of all the exported system images.
3449
3450     Returns:
3451       a dictionary with the structure node->(export-list)
3452       where export-list is a list of the instances exported on
3453       that node.
3454
3455     """
3456     return rpc.call_export_list([node.name for node in self.nodes])
3457
3458
3459 class LUExportInstance(LogicalUnit):
3460   """Export an instance to an image in the cluster.
3461
3462   """
3463   HPATH = "instance-export"
3464   HTYPE = constants.HTYPE_INSTANCE
3465   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3466
3467   def BuildHooksEnv(self):
3468     """Build hooks env.
3469
3470     This will run on the master, primary node and target node.
3471
3472     """
3473     env = {
3474       "EXPORT_NODE": self.op.target_node,
3475       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3476       }
3477     env.update(_BuildInstanceHookEnvByObject(self.instance))
3478     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3479           self.op.target_node]
3480     return env, nl, nl
3481
3482   def CheckPrereq(self):
3483     """Check prerequisites.
3484
3485     This checks that the instance name is a valid one.
3486
3487     """
3488     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3489     self.instance = self.cfg.GetInstanceInfo(instance_name)
3490     if self.instance is None:
3491       raise errors.OpPrereqError("Instance '%s' not found" %
3492                                  self.op.instance_name)
3493
3494     # node verification
3495     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3496     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3497
3498     if self.dst_node is None:
3499       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3500                                  self.op.target_node)
3501     self.op.target_node = self.dst_node.name
3502
3503   def Exec(self, feedback_fn):
3504     """Export an instance to an image in the cluster.
3505
3506     """
3507     instance = self.instance
3508     dst_node = self.dst_node
3509     src_node = instance.primary_node
3510     # shutdown the instance, unless requested not to do so
3511     if self.op.shutdown:
3512       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3513       self.processor.ChainOpCode(op, feedback_fn)
3514
3515     vgname = self.cfg.GetVGName()
3516
3517     snap_disks = []
3518
3519     try:
3520       for disk in instance.disks:
3521         if disk.iv_name == "sda":
3522           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3523           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3524
3525           if not new_dev_name:
3526             logger.Error("could not snapshot block device %s on node %s" %
3527                          (disk.logical_id[1], src_node))
3528           else:
3529             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3530                                       logical_id=(vgname, new_dev_name),
3531                                       physical_id=(vgname, new_dev_name),
3532                                       iv_name=disk.iv_name)
3533             snap_disks.append(new_dev)
3534
3535     finally:
3536       if self.op.shutdown:
3537         op = opcodes.OpStartupInstance(instance_name=instance.name,
3538                                        force=False)
3539         self.processor.ChainOpCode(op, feedback_fn)
3540
3541     # TODO: check for size
3542
3543     for dev in snap_disks:
3544       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3545                                            instance):
3546         logger.Error("could not export block device %s from node"
3547                      " %s to node %s" %
3548                      (dev.logical_id[1], src_node, dst_node.name))
3549       if not rpc.call_blockdev_remove(src_node, dev):
3550         logger.Error("could not remove snapshot block device %s from"
3551                      " node %s" % (dev.logical_id[1], src_node))
3552
3553     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3554       logger.Error("could not finalize export for instance %s on node %s" %
3555                    (instance.name, dst_node.name))
3556
3557     nodelist = self.cfg.GetNodeList()
3558     nodelist.remove(dst_node.name)
3559
3560     # on one-node clusters nodelist will be empty after the removal
3561     # if we proceed the backup would be removed because OpQueryExports
3562     # substitutes an empty list with the full cluster node list.
3563     if nodelist:
3564       op = opcodes.OpQueryExports(nodes=nodelist)
3565       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3566       for node in exportlist:
3567         if instance.name in exportlist[node]:
3568           if not rpc.call_export_remove(node, instance.name):
3569             logger.Error("could not remove older export for instance %s"
3570                          " on node %s" % (instance.name, node))
3571
3572
3573 class TagsLU(NoHooksLU):
3574   """Generic tags LU.
3575
3576   This is an abstract class which is the parent of all the other tags LUs.
3577
3578   """
3579   def CheckPrereq(self):
3580     """Check prerequisites.
3581
3582     """
3583     if self.op.kind == constants.TAG_CLUSTER:
3584       self.target = self.cfg.GetClusterInfo()
3585     elif self.op.kind == constants.TAG_NODE:
3586       name = self.cfg.ExpandNodeName(self.op.name)
3587       if name is None:
3588         raise errors.OpPrereqError("Invalid node name (%s)" %
3589                                    (self.op.name,))
3590       self.op.name = name
3591       self.target = self.cfg.GetNodeInfo(name)
3592     elif self.op.kind == constants.TAG_INSTANCE:
3593       name = self.cfg.ExpandInstanceName(name)
3594       if name is None:
3595         raise errors.OpPrereqError("Invalid instance name (%s)" %
3596                                    (self.op.name,))
3597       self.op.name = name
3598       self.target = self.cfg.GetInstanceInfo(name)
3599     else:
3600       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3601                                  str(self.op.kind))
3602
3603
3604 class LUGetTags(TagsLU):
3605   """Returns the tags of a given object.
3606
3607   """
3608   _OP_REQP = ["kind", "name"]
3609
3610   def Exec(self, feedback_fn):
3611     """Returns the tag list.
3612
3613     """
3614     return self.target.GetTags()
3615
3616
3617 class LUAddTag(TagsLU):
3618   """Sets a tag on a given object.
3619
3620   """
3621   _OP_REQP = ["kind", "name", "tag"]
3622
3623   def CheckPrereq(self):
3624     """Check prerequisites.
3625
3626     This checks the type and length of the tag name and value.
3627
3628     """
3629     TagsLU.CheckPrereq(self)
3630     objects.TaggableObject.ValidateTag(self.op.tag)
3631
3632   def Exec(self, feedback_fn):
3633     """Sets the tag.
3634
3635     """
3636     try:
3637       self.target.AddTag(self.op.tag)
3638     except errors.TagError, err:
3639       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3640     try:
3641       self.cfg.Update(self.target)
3642     except errors.ConfigurationError:
3643       raise errors.OpRetryError("There has been a modification to the"
3644                                 " config file and the operation has been"
3645                                 " aborted. Please retry.")
3646
3647
3648 class LUDelTag(TagsLU):
3649   """Delete a tag from a given object.
3650
3651   """
3652   _OP_REQP = ["kind", "name", "tag"]
3653
3654   def CheckPrereq(self):
3655     """Check prerequisites.
3656
3657     This checks that we have the given tag.
3658
3659     """
3660     TagsLU.CheckPrereq(self)
3661     objects.TaggableObject.ValidateTag(self.op.tag)
3662     if self.op.tag not in self.target.GetTags():
3663       raise errors.OpPrereqError("Tag not found")
3664
3665   def Exec(self, feedback_fn):
3666     """Remove the tag from the object.
3667
3668     """
3669     self.target.RemoveTag(self.op.tag)
3670     try:
3671       self.cfg.Update(self.target)
3672     except errors.ConfigurationError:
3673       raise errors.OpRetryError("There has been a modification to the"
3674                                 " config file and the operation has been"
3675                                 " aborted. Please retry.")