Abstract more strings values into constants
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "OP_TARGET": name,
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   while True:
390     rawline = f.readline()
391     logger.Debug('read %s' % (repr(rawline),))
392
393     if not rawline:
394       # End of file
395       break
396
397     line = rawline.split('\n')[0]
398
399     parts = line.split(' ')
400     fields = parts[0].split(',')
401     key = parts[2]
402
403     haveall = True
404     havesome = False
405     for spec in [ ip, fullnode ]:
406       if spec not in fields:
407         haveall = False
408       if spec in fields:
409         havesome = True
410
411     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
412     if haveall and key == pubkey:
413       inthere = True
414       save_lines.append(rawline)
415       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
416       continue
417
418     if havesome and (not haveall or key != pubkey):
419       removed = True
420       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
421       continue
422
423     save_lines.append(rawline)
424
425   if not inthere:
426     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
427     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
428
429   if removed:
430     save_lines = save_lines + add_lines
431
432     # Write a new file and replace old.
433     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
434                                    constants.DATA_DIR)
435     newfile = os.fdopen(fd, 'w')
436     try:
437       newfile.write(''.join(save_lines))
438     finally:
439       newfile.close()
440     logger.Debug("Wrote new known_hosts.")
441     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
442
443   elif add_lines:
444     # Simply appending a new line will do the trick.
445     f.seek(0, 2)
446     for add in add_lines:
447       f.write(add)
448
449   f.close()
450
451
452 def _HasValidVG(vglist, vgname):
453   """Checks if the volume group list is valid.
454
455   A non-None return value means there's an error, and the return value
456   is the error message.
457
458   """
459   vgsize = vglist.get(vgname, None)
460   if vgsize is None:
461     return "volume group '%s' missing" % vgname
462   elif vgsize < 20480:
463     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
464             (vgname, vgsize))
465   return None
466
467
468 def _InitSSHSetup(node):
469   """Setup the SSH configuration for the cluster.
470
471
472   This generates a dsa keypair for root, adds the pub key to the
473   permitted hosts and adds the hostkey to its own known hosts.
474
475   Args:
476     node: the name of this host as a fqdn
477
478   """
479   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
480
481   for name in priv_key, pub_key:
482     if os.path.exists(name):
483       utils.CreateBackup(name)
484     utils.RemoveFile(name)
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", priv_key,
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open(pub_key, 'r')
494   try:
495     utils.AddAuthorizedKey(auth_keys, f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 def _CheckInstanceBridgesExist(instance):
532   """Check that the brigdes needed by an instance exist.
533
534   """
535   # check bridges existance
536   brlist = [nic.bridge for nic in instance.nics]
537   if not rpc.call_bridges_exist(instance.primary_node, brlist):
538     raise errors.OpPrereqError("one or more target bridges %s does not"
539                                " exist on destination node '%s'" %
540                                (brlist, instance.primary_node))
541
542
543 class LUInitCluster(LogicalUnit):
544   """Initialise the cluster.
545
546   """
547   HPATH = "cluster-init"
548   HTYPE = constants.HTYPE_CLUSTER
549   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
550               "def_bridge", "master_netdev"]
551   REQ_CLUSTER = False
552
553   def BuildHooksEnv(self):
554     """Build hooks env.
555
556     Notes: Since we don't require a cluster, we must manually add
557     ourselves in the post-run node list.
558
559     """
560     env = {"OP_TARGET": self.op.cluster_name}
561     return env, [], [self.hostname.name]
562
563   def CheckPrereq(self):
564     """Verify that the passed name is a valid one.
565
566     """
567     if config.ConfigWriter.IsCluster():
568       raise errors.OpPrereqError("Cluster is already initialised")
569
570     self.hostname = hostname = utils.HostInfo()
571
572     if hostname.ip.startswith("127."):
573       raise errors.OpPrereqError("This host's IP resolves to the private"
574                                  " range (%s). Please fix DNS or /etc/hosts." %
575                                  (hostname.ip,))
576
577     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
578
579     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
580                          constants.DEFAULT_NODED_PORT):
581       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
582                                  " to %s,\nbut this ip address does not"
583                                  " belong to this host."
584                                  " Aborting." % hostname.ip)
585
586     secondary_ip = getattr(self.op, "secondary_ip", None)
587     if secondary_ip and not utils.IsValidIP(secondary_ip):
588       raise errors.OpPrereqError("Invalid secondary ip given")
589     if (secondary_ip and
590         secondary_ip != hostname.ip and
591         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
592                            constants.DEFAULT_NODED_PORT))):
593       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
594                                  "but it does not belong to this host." %
595                                  secondary_ip)
596     self.secondary_ip = secondary_ip
597
598     # checks presence of the volume group given
599     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
600
601     if vgstatus:
602       raise errors.OpPrereqError("Error: %s" % vgstatus)
603
604     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
605                     self.op.mac_prefix):
606       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
607                                  self.op.mac_prefix)
608
609     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
610       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
611                                  self.op.hypervisor_type)
612
613     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
614     if result.failed:
615       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
616                                  (self.op.master_netdev,
617                                   result.output.strip()))
618
619   def Exec(self, feedback_fn):
620     """Initialize the cluster.
621
622     """
623     clustername = self.clustername
624     hostname = self.hostname
625
626     # set up the simple store
627     self.sstore = ss = ssconf.SimpleStore()
628     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633
634     # set up the inter-node password and certificate
635     _InitGanetiServerSetup(ss)
636
637     # start the master ip
638     rpc.call_node_start_master(hostname.name)
639
640     # set up ssh config and /etc/hosts
641     f = open(constants.SSH_HOST_RSA_PUB, 'r')
642     try:
643       sshline = f.read()
644     finally:
645       f.close()
646     sshkey = sshline.split(" ")[1]
647
648     _UpdateEtcHosts(hostname.name, hostname.ip)
649
650     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651
652     _InitSSHSetup(hostname.name)
653
654     # init of cluster config file
655     self.cfg = cfgw = config.ConfigWriter()
656     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657                     sshkey, self.op.mac_prefix,
658                     self.op.vg_name, self.op.def_bridge)
659
660
661 class LUDestroyCluster(NoHooksLU):
662   """Logical unit for destroying the cluster.
663
664   """
665   _OP_REQP = []
666
667   def CheckPrereq(self):
668     """Check prerequisites.
669
670     This checks whether the cluster is empty.
671
672     Any errors are signalled by raising errors.OpPrereqError.
673
674     """
675     master = self.sstore.GetMasterNode()
676
677     nodelist = self.cfg.GetNodeList()
678     if len(nodelist) != 1 or nodelist[0] != master:
679       raise errors.OpPrereqError("There are still %d node(s) in"
680                                  " this cluster." % (len(nodelist) - 1))
681     instancelist = self.cfg.GetInstanceList()
682     if instancelist:
683       raise errors.OpPrereqError("There are still %d instance(s) in"
684                                  " this cluster." % len(instancelist))
685
686   def Exec(self, feedback_fn):
687     """Destroys the cluster.
688
689     """
690     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691     utils.CreateBackup(priv_key)
692     utils.CreateBackup(pub_key)
693     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694
695
696 class LUVerifyCluster(NoHooksLU):
697   """Verifies the cluster status.
698
699   """
700   _OP_REQP = []
701
702   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703                   remote_version, feedback_fn):
704     """Run multiple tests against a node.
705
706     Test list:
707       - compares ganeti version
708       - checks vg existance and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     Args:
713       node: name of the node to check
714       file_list: required list of files
715       local_cksum: dictionary of local files and their checksums
716
717     """
718     # compares ganeti version
719     local_version = constants.PROTOCOL_VERSION
720     if not remote_version:
721       feedback_fn(" - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version:
725       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726                       (local_version, node, remote_version))
727       return True
728
729     # checks vg existance and size > 20G
730
731     bad = False
732     if not vglist:
733       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734                       (node,))
735       bad = True
736     else:
737       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738       if vgstatus:
739         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740         bad = True
741
742     # checks config file checksum
743     # checks ssh to any
744
745     if 'filelist' not in node_result:
746       bad = True
747       feedback_fn("  - ERROR: node hasn't returned file checksum data")
748     else:
749       remote_cksum = node_result['filelist']
750       for file_name in file_list:
751         if file_name not in remote_cksum:
752           bad = True
753           feedback_fn("  - ERROR: file '%s' missing" % file_name)
754         elif remote_cksum[file_name] != local_cksum[file_name]:
755           bad = True
756           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757
758     if 'nodelist' not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761     else:
762       if node_result['nodelist']:
763         bad = True
764         for node in node_result['nodelist']:
765           feedback_fn("  - ERROR: communication with node '%s': %s" %
766                           (node, node_result['nodelist'][node]))
767     hyp_result = node_result.get('hypervisor', None)
768     if hyp_result is not None:
769       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770     return bad
771
772   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773     """Verify an instance.
774
775     This function checks to see if the required block devices are
776     available on the instance's node.
777
778     """
779     bad = False
780
781     instancelist = self.cfg.GetInstanceList()
782     if not instance in instancelist:
783       feedback_fn("  - ERROR: instance %s not in instance list %s" %
784                       (instance, instancelist))
785       bad = True
786
787     instanceconfig = self.cfg.GetInstanceInfo(instance)
788     node_current = instanceconfig.primary_node
789
790     node_vol_should = {}
791     instanceconfig.MapLVsByNode(node_vol_should)
792
793     for node in node_vol_should:
794       for volume in node_vol_should[node]:
795         if node not in node_vol_is or volume not in node_vol_is[node]:
796           feedback_fn("  - ERROR: volume %s missing on node %s" %
797                           (volume, node))
798           bad = True
799
800     if not instanceconfig.status == 'down':
801       if not instance in node_instance[node_current]:
802         feedback_fn("  - ERROR: instance %s not running on node %s" %
803                         (instance, node_current))
804         bad = True
805
806     for node in node_instance:
807       if (not node == node_current):
808         if instance in node_instance[node]:
809           feedback_fn("  - ERROR: instance %s should not run on node %s" %
810                           (instance, node))
811           bad = True
812
813     return bad
814
815   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816     """Verify if there are any unknown volumes in the cluster.
817
818     The .os, .swap and backup volumes are ignored. All other volumes are
819     reported as unknown.
820
821     """
822     bad = False
823
824     for node in node_vol_is:
825       for volume in node_vol_is[node]:
826         if node not in node_vol_should or volume not in node_vol_should[node]:
827           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828                       (volume, node))
829           bad = True
830     return bad
831
832   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833     """Verify the list of running instances.
834
835     This checks what instances are running but unknown to the cluster.
836
837     """
838     bad = False
839     for node in node_instance:
840       for runninginstance in node_instance[node]:
841         if runninginstance not in instancelist:
842           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843                           (runninginstance, node))
844           bad = True
845     return bad
846
847   def CheckPrereq(self):
848     """Check prerequisites.
849
850     This has no prerequisites.
851
852     """
853     pass
854
855   def Exec(self, feedback_fn):
856     """Verify integrity of cluster, performing various test on nodes.
857
858     """
859     bad = False
860     feedback_fn("* Verifying global settings")
861     self.cfg.VerifyConfig()
862
863     master = self.sstore.GetMasterNode()
864     vg_name = self.cfg.GetVGName()
865     nodelist = utils.NiceSort(self.cfg.GetNodeList())
866     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
867     node_volume = {}
868     node_instance = {}
869
870     # FIXME: verify OS list
871     # do local checksums
872     file_names = list(self.sstore.GetFileList())
873     file_names.append(constants.SSL_CERT_FILE)
874     file_names.append(constants.CLUSTER_CONF_FILE)
875     local_checksums = utils.FingerprintFiles(file_names)
876
877     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
878     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
879     all_instanceinfo = rpc.call_instance_list(nodelist)
880     all_vglist = rpc.call_vg_list(nodelist)
881     node_verify_param = {
882       'filelist': file_names,
883       'nodelist': nodelist,
884       'hypervisor': None,
885       }
886     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
887     all_rversion = rpc.call_version(nodelist)
888
889     for node in nodelist:
890       feedback_fn("* Verifying node %s" % node)
891       result = self._VerifyNode(node, file_names, local_checksums,
892                                 all_vglist[node], all_nvinfo[node],
893                                 all_rversion[node], feedback_fn)
894       bad = bad or result
895
896       # node_volume
897       volumeinfo = all_volumeinfo[node]
898
899       if type(volumeinfo) != dict:
900         feedback_fn("  - ERROR: connection to %s failed" % (node,))
901         bad = True
902         continue
903
904       node_volume[node] = volumeinfo
905
906       # node_instance
907       nodeinstance = all_instanceinfo[node]
908       if type(nodeinstance) != list:
909         feedback_fn("  - ERROR: connection to %s failed" % (node,))
910         bad = True
911         continue
912
913       node_instance[node] = nodeinstance
914
915     node_vol_should = {}
916
917     for instance in instancelist:
918       feedback_fn("* Verifying instance %s" % instance)
919       result =  self._VerifyInstance(instance, node_volume, node_instance,
920                                      feedback_fn)
921       bad = bad or result
922
923       inst_config = self.cfg.GetInstanceInfo(instance)
924
925       inst_config.MapLVsByNode(node_vol_should)
926
927     feedback_fn("* Verifying orphan volumes")
928     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
929                                        feedback_fn)
930     bad = bad or result
931
932     feedback_fn("* Verifying remaining instances")
933     result = self._VerifyOrphanInstances(instancelist, node_instance,
934                                          feedback_fn)
935     bad = bad or result
936
937     return int(bad)
938
939
940 class LURenameCluster(LogicalUnit):
941   """Rename the cluster.
942
943   """
944   HPATH = "cluster-rename"
945   HTYPE = constants.HTYPE_CLUSTER
946   _OP_REQP = ["name"]
947
948   def BuildHooksEnv(self):
949     """Build hooks env.
950
951     """
952     env = {
953       "OP_TARGET": self.op.sstore.GetClusterName(),
954       "NEW_NAME": self.op.name,
955       }
956     mn = self.sstore.GetMasterNode()
957     return env, [mn], [mn]
958
959   def CheckPrereq(self):
960     """Verify that the passed name is a valid one.
961
962     """
963     hostname = utils.HostInfo(self.op.name)
964
965     new_name = hostname.name
966     self.ip = new_ip = hostname.ip
967     old_name = self.sstore.GetClusterName()
968     old_ip = self.sstore.GetMasterIP()
969     if new_name == old_name and new_ip == old_ip:
970       raise errors.OpPrereqError("Neither the name nor the IP address of the"
971                                  " cluster has changed")
972     if new_ip != old_ip:
973       result = utils.RunCmd(["fping", "-q", new_ip])
974       if not result.failed:
975         raise errors.OpPrereqError("The given cluster IP address (%s) is"
976                                    " reachable on the network. Aborting." %
977                                    new_ip)
978
979     self.op.name = new_name
980
981   def Exec(self, feedback_fn):
982     """Rename the cluster.
983
984     """
985     clustername = self.op.name
986     ip = self.ip
987     ss = self.sstore
988
989     # shutdown the master IP
990     master = ss.GetMasterNode()
991     if not rpc.call_node_stop_master(master):
992       raise errors.OpExecError("Could not disable the master role")
993
994     try:
995       # modify the sstore
996       ss.SetKey(ss.SS_MASTER_IP, ip)
997       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998
999       # Distribute updated ss config to all nodes
1000       myself = self.cfg.GetNodeInfo(master)
1001       dist_nodes = self.cfg.GetNodeList()
1002       if myself.name in dist_nodes:
1003         dist_nodes.remove(myself.name)
1004
1005       logger.Debug("Copying updated ssconf data to all nodes")
1006       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007         fname = ss.KeyToFilename(keyname)
1008         result = rpc.call_upload_file(dist_nodes, fname)
1009         for to_node in dist_nodes:
1010           if not result[to_node]:
1011             logger.Error("copy of file %s to node %s failed" %
1012                          (fname, to_node))
1013     finally:
1014       if not rpc.call_node_start_master(master):
1015         logger.Error("Could not re-enable the master role on the master,\n"
1016                      "please restart manually.")
1017
1018
1019 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1020   """Sleep and poll for an instance's disk to sync.
1021
1022   """
1023   if not instance.disks:
1024     return True
1025
1026   if not oneshot:
1027     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1028
1029   node = instance.primary_node
1030
1031   for dev in instance.disks:
1032     cfgw.SetDiskID(dev, node)
1033
1034   retries = 0
1035   while True:
1036     max_time = 0
1037     done = True
1038     cumul_degraded = False
1039     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040     if not rstats:
1041       logger.ToStderr("Can't get any data from node %s" % node)
1042       retries += 1
1043       if retries >= 10:
1044         raise errors.RemoteError("Can't contact node %s for mirror data,"
1045                                  " aborting." % node)
1046       time.sleep(6)
1047       continue
1048     retries = 0
1049     for i in range(len(rstats)):
1050       mstat = rstats[i]
1051       if mstat is None:
1052         logger.ToStderr("Can't compute data for node %s/%s" %
1053                         (node, instance.disks[i].iv_name))
1054         continue
1055       perc_done, est_time, is_degraded = mstat
1056       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1057       if perc_done is not None:
1058         done = False
1059         if est_time is not None:
1060           rem_time = "%d estimated seconds remaining" % est_time
1061           max_time = est_time
1062         else:
1063           rem_time = "no time estimate"
1064         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1065                         (instance.disks[i].iv_name, perc_done, rem_time))
1066     if done or oneshot:
1067       break
1068
1069     if unlock:
1070       utils.Unlock('cmd')
1071     try:
1072       time.sleep(min(60, max_time))
1073     finally:
1074       if unlock:
1075         utils.Lock('cmd')
1076
1077   if done:
1078     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1079   return not cumul_degraded
1080
1081
1082 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1083   """Check that mirrors are not degraded.
1084
1085   """
1086   cfgw.SetDiskID(dev, node)
1087
1088   result = True
1089   if on_primary or dev.AssembleOnSecondary():
1090     rstats = rpc.call_blockdev_find(node, dev)
1091     if not rstats:
1092       logger.ToStderr("Can't get any data from node %s" % node)
1093       result = False
1094     else:
1095       result = result and (not rstats[5])
1096   if dev.children:
1097     for child in dev.children:
1098       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1099
1100   return result
1101
1102
1103 class LUDiagnoseOS(NoHooksLU):
1104   """Logical unit for OS diagnose/query.
1105
1106   """
1107   _OP_REQP = []
1108
1109   def CheckPrereq(self):
1110     """Check prerequisites.
1111
1112     This always succeeds, since this is a pure query LU.
1113
1114     """
1115     return
1116
1117   def Exec(self, feedback_fn):
1118     """Compute the list of OSes.
1119
1120     """
1121     node_list = self.cfg.GetNodeList()
1122     node_data = rpc.call_os_diagnose(node_list)
1123     if node_data == False:
1124       raise errors.OpExecError("Can't gather the list of OSes")
1125     return node_data
1126
1127
1128 class LURemoveNode(LogicalUnit):
1129   """Logical unit for removing a node.
1130
1131   """
1132   HPATH = "node-remove"
1133   HTYPE = constants.HTYPE_NODE
1134   _OP_REQP = ["node_name"]
1135
1136   def BuildHooksEnv(self):
1137     """Build hooks env.
1138
1139     This doesn't run on the target node in the pre phase as a failed
1140     node would not allows itself to run.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.op.node_name,
1145       "NODE_NAME": self.op.node_name,
1146       }
1147     all_nodes = self.cfg.GetNodeList()
1148     all_nodes.remove(self.op.node_name)
1149     return env, all_nodes, all_nodes
1150
1151   def CheckPrereq(self):
1152     """Check prerequisites.
1153
1154     This checks:
1155      - the node exists in the configuration
1156      - it does not have primary or secondary instances
1157      - it's not the master
1158
1159     Any errors are signalled by raising errors.OpPrereqError.
1160
1161     """
1162     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1163     if node is None:
1164       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1165
1166     instance_list = self.cfg.GetInstanceList()
1167
1168     masternode = self.sstore.GetMasterNode()
1169     if node.name == masternode:
1170       raise errors.OpPrereqError("Node is the master node,"
1171                                  " you need to failover first.")
1172
1173     for instance_name in instance_list:
1174       instance = self.cfg.GetInstanceInfo(instance_name)
1175       if node.name == instance.primary_node:
1176         raise errors.OpPrereqError("Instance %s still running on the node,"
1177                                    " please remove first." % instance_name)
1178       if node.name in instance.secondary_nodes:
1179         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1180                                    " please remove first." % instance_name)
1181     self.op.node_name = node.name
1182     self.node = node
1183
1184   def Exec(self, feedback_fn):
1185     """Removes the node from the cluster.
1186
1187     """
1188     node = self.node
1189     logger.Info("stopping the node daemon and removing configs from node %s" %
1190                 node.name)
1191
1192     rpc.call_node_leave_cluster(node.name)
1193
1194     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1195
1196     logger.Info("Removing node %s from config" % node.name)
1197
1198     self.cfg.RemoveNode(node.name)
1199
1200
1201 class LUQueryNodes(NoHooksLU):
1202   """Logical unit for querying nodes.
1203
1204   """
1205   _OP_REQP = ["output_fields", "names"]
1206
1207   def CheckPrereq(self):
1208     """Check prerequisites.
1209
1210     This checks that the fields required are valid output fields.
1211
1212     """
1213     self.dynamic_fields = frozenset(["dtotal", "dfree",
1214                                      "mtotal", "mnode", "mfree",
1215                                      "bootid"])
1216
1217     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1218                                "pinst_list", "sinst_list",
1219                                "pip", "sip"],
1220                        dynamic=self.dynamic_fields,
1221                        selected=self.op.output_fields)
1222
1223     self.wanted = _GetWantedNodes(self, self.op.names)
1224
1225   def Exec(self, feedback_fn):
1226     """Computes the list of nodes and their attributes.
1227
1228     """
1229     nodenames = self.wanted
1230     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1231
1232     # begin data gathering
1233
1234     if self.dynamic_fields.intersection(self.op.output_fields):
1235       live_data = {}
1236       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1237       for name in nodenames:
1238         nodeinfo = node_data.get(name, None)
1239         if nodeinfo:
1240           live_data[name] = {
1241             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1242             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1243             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1244             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1245             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1246             "bootid": nodeinfo['bootid'],
1247             }
1248         else:
1249           live_data[name] = {}
1250     else:
1251       live_data = dict.fromkeys(nodenames, {})
1252
1253     node_to_primary = dict([(name, set()) for name in nodenames])
1254     node_to_secondary = dict([(name, set()) for name in nodenames])
1255
1256     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1257                              "sinst_cnt", "sinst_list"))
1258     if inst_fields & frozenset(self.op.output_fields):
1259       instancelist = self.cfg.GetInstanceList()
1260
1261       for instance_name in instancelist:
1262         inst = self.cfg.GetInstanceInfo(instance_name)
1263         if inst.primary_node in node_to_primary:
1264           node_to_primary[inst.primary_node].add(inst.name)
1265         for secnode in inst.secondary_nodes:
1266           if secnode in node_to_secondary:
1267             node_to_secondary[secnode].add(inst.name)
1268
1269     # end data gathering
1270
1271     output = []
1272     for node in nodelist:
1273       node_output = []
1274       for field in self.op.output_fields:
1275         if field == "name":
1276           val = node.name
1277         elif field == "pinst_list":
1278           val = list(node_to_primary[node.name])
1279         elif field == "sinst_list":
1280           val = list(node_to_secondary[node.name])
1281         elif field == "pinst_cnt":
1282           val = len(node_to_primary[node.name])
1283         elif field == "sinst_cnt":
1284           val = len(node_to_secondary[node.name])
1285         elif field == "pip":
1286           val = node.primary_ip
1287         elif field == "sip":
1288           val = node.secondary_ip
1289         elif field in self.dynamic_fields:
1290           val = live_data[node.name].get(field, None)
1291         else:
1292           raise errors.ParameterError(field)
1293         node_output.append(val)
1294       output.append(node_output)
1295
1296     return output
1297
1298
1299 class LUQueryNodeVolumes(NoHooksLU):
1300   """Logical unit for getting volumes on node(s).
1301
1302   """
1303   _OP_REQP = ["nodes", "output_fields"]
1304
1305   def CheckPrereq(self):
1306     """Check prerequisites.
1307
1308     This checks that the fields required are valid output fields.
1309
1310     """
1311     self.nodes = _GetWantedNodes(self, self.op.nodes)
1312
1313     _CheckOutputFields(static=["node"],
1314                        dynamic=["phys", "vg", "name", "size", "instance"],
1315                        selected=self.op.output_fields)
1316
1317
1318   def Exec(self, feedback_fn):
1319     """Computes the list of nodes and their attributes.
1320
1321     """
1322     nodenames = self.nodes
1323     volumes = rpc.call_node_volumes(nodenames)
1324
1325     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1326              in self.cfg.GetInstanceList()]
1327
1328     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1329
1330     output = []
1331     for node in nodenames:
1332       if node not in volumes or not volumes[node]:
1333         continue
1334
1335       node_vols = volumes[node][:]
1336       node_vols.sort(key=lambda vol: vol['dev'])
1337
1338       for vol in node_vols:
1339         node_output = []
1340         for field in self.op.output_fields:
1341           if field == "node":
1342             val = node
1343           elif field == "phys":
1344             val = vol['dev']
1345           elif field == "vg":
1346             val = vol['vg']
1347           elif field == "name":
1348             val = vol['name']
1349           elif field == "size":
1350             val = int(float(vol['size']))
1351           elif field == "instance":
1352             for inst in ilist:
1353               if node not in lv_by_node[inst]:
1354                 continue
1355               if vol['name'] in lv_by_node[inst][node]:
1356                 val = inst.name
1357                 break
1358             else:
1359               val = '-'
1360           else:
1361             raise errors.ParameterError(field)
1362           node_output.append(str(val))
1363
1364         output.append(node_output)
1365
1366     return output
1367
1368
1369 class LUAddNode(LogicalUnit):
1370   """Logical unit for adding node to the cluster.
1371
1372   """
1373   HPATH = "node-add"
1374   HTYPE = constants.HTYPE_NODE
1375   _OP_REQP = ["node_name"]
1376
1377   def BuildHooksEnv(self):
1378     """Build hooks env.
1379
1380     This will run on all nodes before, and on all nodes + the new node after.
1381
1382     """
1383     env = {
1384       "OP_TARGET": self.op.node_name,
1385       "NODE_NAME": self.op.node_name,
1386       "NODE_PIP": self.op.primary_ip,
1387       "NODE_SIP": self.op.secondary_ip,
1388       }
1389     nodes_0 = self.cfg.GetNodeList()
1390     nodes_1 = nodes_0 + [self.op.node_name, ]
1391     return env, nodes_0, nodes_1
1392
1393   def CheckPrereq(self):
1394     """Check prerequisites.
1395
1396     This checks:
1397      - the new node is not already in the config
1398      - it is resolvable
1399      - its parameters (single/dual homed) matches the cluster
1400
1401     Any errors are signalled by raising errors.OpPrereqError.
1402
1403     """
1404     node_name = self.op.node_name
1405     cfg = self.cfg
1406
1407     dns_data = utils.HostInfo(node_name)
1408
1409     node = dns_data.name
1410     primary_ip = self.op.primary_ip = dns_data.ip
1411     secondary_ip = getattr(self.op, "secondary_ip", None)
1412     if secondary_ip is None:
1413       secondary_ip = primary_ip
1414     if not utils.IsValidIP(secondary_ip):
1415       raise errors.OpPrereqError("Invalid secondary IP given")
1416     self.op.secondary_ip = secondary_ip
1417     node_list = cfg.GetNodeList()
1418     if node in node_list:
1419       raise errors.OpPrereqError("Node %s is already in the configuration"
1420                                  % node)
1421
1422     for existing_node_name in node_list:
1423       existing_node = cfg.GetNodeInfo(existing_node_name)
1424       if (existing_node.primary_ip == primary_ip or
1425           existing_node.secondary_ip == primary_ip or
1426           existing_node.primary_ip == secondary_ip or
1427           existing_node.secondary_ip == secondary_ip):
1428         raise errors.OpPrereqError("New node ip address(es) conflict with"
1429                                    " existing node %s" % existing_node.name)
1430
1431     # check that the type of the node (single versus dual homed) is the
1432     # same as for the master
1433     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1434     master_singlehomed = myself.secondary_ip == myself.primary_ip
1435     newbie_singlehomed = secondary_ip == primary_ip
1436     if master_singlehomed != newbie_singlehomed:
1437       if master_singlehomed:
1438         raise errors.OpPrereqError("The master has no private ip but the"
1439                                    " new node has one")
1440       else:
1441         raise errors.OpPrereqError("The master has a private ip but the"
1442                                    " new node doesn't have one")
1443
1444     # checks reachablity
1445     if not utils.TcpPing(utils.HostInfo().name,
1446                          primary_ip,
1447                          constants.DEFAULT_NODED_PORT):
1448       raise errors.OpPrereqError("Node not reachable by ping")
1449
1450     if not newbie_singlehomed:
1451       # check reachability from my secondary ip to newbie's secondary ip
1452       if not utils.TcpPing(myself.secondary_ip,
1453                            secondary_ip,
1454                            constants.DEFAULT_NODED_PORT):
1455         raise errors.OpPrereqError(
1456           "Node secondary ip not reachable by TCP based ping to noded port")
1457
1458     self.new_node = objects.Node(name=node,
1459                                  primary_ip=primary_ip,
1460                                  secondary_ip=secondary_ip)
1461
1462   def Exec(self, feedback_fn):
1463     """Adds the new node to the cluster.
1464
1465     """
1466     new_node = self.new_node
1467     node = new_node.name
1468
1469     # set up inter-node password and certificate and restarts the node daemon
1470     gntpass = self.sstore.GetNodeDaemonPassword()
1471     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1472       raise errors.OpExecError("ganeti password corruption detected")
1473     f = open(constants.SSL_CERT_FILE)
1474     try:
1475       gntpem = f.read(8192)
1476     finally:
1477       f.close()
1478     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1479     # so we use this to detect an invalid certificate; as long as the
1480     # cert doesn't contain this, the here-document will be correctly
1481     # parsed by the shell sequence below
1482     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1483       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1484     if not gntpem.endswith("\n"):
1485       raise errors.OpExecError("PEM must end with newline")
1486     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1487
1488     # and then connect with ssh to set password and start ganeti-noded
1489     # note that all the below variables are sanitized at this point,
1490     # either by being constants or by the checks above
1491     ss = self.sstore
1492     mycommand = ("umask 077 && "
1493                  "echo '%s' > '%s' && "
1494                  "cat > '%s' << '!EOF.' && \n"
1495                  "%s!EOF.\n%s restart" %
1496                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1497                   constants.SSL_CERT_FILE, gntpem,
1498                   constants.NODE_INITD_SCRIPT))
1499
1500     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1501     if result.failed:
1502       raise errors.OpExecError("Remote command on node %s, error: %s,"
1503                                " output: %s" %
1504                                (node, result.fail_reason, result.output))
1505
1506     # check connectivity
1507     time.sleep(4)
1508
1509     result = rpc.call_version([node])[node]
1510     if result:
1511       if constants.PROTOCOL_VERSION == result:
1512         logger.Info("communication to node %s fine, sw version %s match" %
1513                     (node, result))
1514       else:
1515         raise errors.OpExecError("Version mismatch master version %s,"
1516                                  " node version %s" %
1517                                  (constants.PROTOCOL_VERSION, result))
1518     else:
1519       raise errors.OpExecError("Cannot get version from the new node")
1520
1521     # setup ssh on node
1522     logger.Info("copy ssh key to node %s" % node)
1523     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1524     keyarray = []
1525     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1526                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1527                 priv_key, pub_key]
1528
1529     for i in keyfiles:
1530       f = open(i, 'r')
1531       try:
1532         keyarray.append(f.read())
1533       finally:
1534         f.close()
1535
1536     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1537                                keyarray[3], keyarray[4], keyarray[5])
1538
1539     if not result:
1540       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1541
1542     # Add node to our /etc/hosts, and add key to known_hosts
1543     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1544     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1545                       self.cfg.GetHostKey())
1546
1547     if new_node.secondary_ip != new_node.primary_ip:
1548       if not rpc.call_node_tcp_ping(new_node.name,
1549                                     constants.LOCALHOST_IP_ADDRESS,
1550                                     new_node.secondary_ip,
1551                                     constants.DEFAULT_NODED_PORT,
1552                                     10, False):
1553         raise errors.OpExecError("Node claims it doesn't have the"
1554                                  " secondary ip you gave (%s).\n"
1555                                  "Please fix and re-run this command." %
1556                                  new_node.secondary_ip)
1557
1558     success, msg = ssh.VerifyNodeHostname(node)
1559     if not success:
1560       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1561                                " than the one the resolver gives: %s.\n"
1562                                "Please fix and re-run this command." %
1563                                (node, msg))
1564
1565     # Distribute updated /etc/hosts and known_hosts to all nodes,
1566     # including the node just added
1567     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1568     dist_nodes = self.cfg.GetNodeList() + [node]
1569     if myself.name in dist_nodes:
1570       dist_nodes.remove(myself.name)
1571
1572     logger.Debug("Copying hosts and known_hosts to all nodes")
1573     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1574       result = rpc.call_upload_file(dist_nodes, fname)
1575       for to_node in dist_nodes:
1576         if not result[to_node]:
1577           logger.Error("copy of file %s to node %s failed" %
1578                        (fname, to_node))
1579
1580     to_copy = ss.GetFileList()
1581     for fname in to_copy:
1582       if not ssh.CopyFileToNode(node, fname):
1583         logger.Error("could not copy file %s to node %s" % (fname, node))
1584
1585     logger.Info("adding node %s to cluster.conf" % node)
1586     self.cfg.AddNode(new_node)
1587
1588
1589 class LUMasterFailover(LogicalUnit):
1590   """Failover the master node to the current node.
1591
1592   This is a special LU in that it must run on a non-master node.
1593
1594   """
1595   HPATH = "master-failover"
1596   HTYPE = constants.HTYPE_CLUSTER
1597   REQ_MASTER = False
1598   _OP_REQP = []
1599
1600   def BuildHooksEnv(self):
1601     """Build hooks env.
1602
1603     This will run on the new master only in the pre phase, and on all
1604     the nodes in the post phase.
1605
1606     """
1607     env = {
1608       "OP_TARGET": self.new_master,
1609       "NEW_MASTER": self.new_master,
1610       "OLD_MASTER": self.old_master,
1611       }
1612     return env, [self.new_master], self.cfg.GetNodeList()
1613
1614   def CheckPrereq(self):
1615     """Check prerequisites.
1616
1617     This checks that we are not already the master.
1618
1619     """
1620     self.new_master = utils.HostInfo().name
1621     self.old_master = self.sstore.GetMasterNode()
1622
1623     if self.old_master == self.new_master:
1624       raise errors.OpPrereqError("This commands must be run on the node"
1625                                  " where you want the new master to be.\n"
1626                                  "%s is already the master" %
1627                                  self.old_master)
1628
1629   def Exec(self, feedback_fn):
1630     """Failover the master node.
1631
1632     This command, when run on a non-master node, will cause the current
1633     master to cease being master, and the non-master to become new
1634     master.
1635
1636     """
1637     #TODO: do not rely on gethostname returning the FQDN
1638     logger.Info("setting master to %s, old master: %s" %
1639                 (self.new_master, self.old_master))
1640
1641     if not rpc.call_node_stop_master(self.old_master):
1642       logger.Error("could disable the master role on the old master"
1643                    " %s, please disable manually" % self.old_master)
1644
1645     ss = self.sstore
1646     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1647     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1648                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1649       logger.Error("could not distribute the new simple store master file"
1650                    " to the other nodes, please check.")
1651
1652     if not rpc.call_node_start_master(self.new_master):
1653       logger.Error("could not start the master role on the new master"
1654                    " %s, please check" % self.new_master)
1655       feedback_fn("Error in activating the master IP on the new master,\n"
1656                   "please fix manually.")
1657
1658
1659
1660 class LUQueryClusterInfo(NoHooksLU):
1661   """Query cluster configuration.
1662
1663   """
1664   _OP_REQP = []
1665   REQ_MASTER = False
1666
1667   def CheckPrereq(self):
1668     """No prerequsites needed for this LU.
1669
1670     """
1671     pass
1672
1673   def Exec(self, feedback_fn):
1674     """Return cluster config.
1675
1676     """
1677     result = {
1678       "name": self.sstore.GetClusterName(),
1679       "software_version": constants.RELEASE_VERSION,
1680       "protocol_version": constants.PROTOCOL_VERSION,
1681       "config_version": constants.CONFIG_VERSION,
1682       "os_api_version": constants.OS_API_VERSION,
1683       "export_version": constants.EXPORT_VERSION,
1684       "master": self.sstore.GetMasterNode(),
1685       "architecture": (platform.architecture()[0], platform.machine()),
1686       }
1687
1688     return result
1689
1690
1691 class LUClusterCopyFile(NoHooksLU):
1692   """Copy file to cluster.
1693
1694   """
1695   _OP_REQP = ["nodes", "filename"]
1696
1697   def CheckPrereq(self):
1698     """Check prerequisites.
1699
1700     It should check that the named file exists and that the given list
1701     of nodes is valid.
1702
1703     """
1704     if not os.path.exists(self.op.filename):
1705       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1706
1707     self.nodes = _GetWantedNodes(self, self.op.nodes)
1708
1709   def Exec(self, feedback_fn):
1710     """Copy a file from master to some nodes.
1711
1712     Args:
1713       opts - class with options as members
1714       args - list containing a single element, the file name
1715     Opts used:
1716       nodes - list containing the name of target nodes; if empty, all nodes
1717
1718     """
1719     filename = self.op.filename
1720
1721     myname = utils.HostInfo().name
1722
1723     for node in self.nodes:
1724       if node == myname:
1725         continue
1726       if not ssh.CopyFileToNode(node, filename):
1727         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1728
1729
1730 class LUDumpClusterConfig(NoHooksLU):
1731   """Return a text-representation of the cluster-config.
1732
1733   """
1734   _OP_REQP = []
1735
1736   def CheckPrereq(self):
1737     """No prerequisites.
1738
1739     """
1740     pass
1741
1742   def Exec(self, feedback_fn):
1743     """Dump a representation of the cluster config to the standard output.
1744
1745     """
1746     return self.cfg.DumpConfig()
1747
1748
1749 class LURunClusterCommand(NoHooksLU):
1750   """Run a command on some nodes.
1751
1752   """
1753   _OP_REQP = ["command", "nodes"]
1754
1755   def CheckPrereq(self):
1756     """Check prerequisites.
1757
1758     It checks that the given list of nodes is valid.
1759
1760     """
1761     self.nodes = _GetWantedNodes(self, self.op.nodes)
1762
1763   def Exec(self, feedback_fn):
1764     """Run a command on some nodes.
1765
1766     """
1767     data = []
1768     for node in self.nodes:
1769       result = ssh.SSHCall(node, "root", self.op.command)
1770       data.append((node, result.output, result.exit_code))
1771
1772     return data
1773
1774
1775 class LUActivateInstanceDisks(NoHooksLU):
1776   """Bring up an instance's disks.
1777
1778   """
1779   _OP_REQP = ["instance_name"]
1780
1781   def CheckPrereq(self):
1782     """Check prerequisites.
1783
1784     This checks that the instance is in the cluster.
1785
1786     """
1787     instance = self.cfg.GetInstanceInfo(
1788       self.cfg.ExpandInstanceName(self.op.instance_name))
1789     if instance is None:
1790       raise errors.OpPrereqError("Instance '%s' not known" %
1791                                  self.op.instance_name)
1792     self.instance = instance
1793
1794
1795   def Exec(self, feedback_fn):
1796     """Activate the disks.
1797
1798     """
1799     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1800     if not disks_ok:
1801       raise errors.OpExecError("Cannot activate block devices")
1802
1803     return disks_info
1804
1805
1806 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1807   """Prepare the block devices for an instance.
1808
1809   This sets up the block devices on all nodes.
1810
1811   Args:
1812     instance: a ganeti.objects.Instance object
1813     ignore_secondaries: if true, errors on secondary nodes won't result
1814                         in an error return from the function
1815
1816   Returns:
1817     false if the operation failed
1818     list of (host, instance_visible_name, node_visible_name) if the operation
1819          suceeded with the mapping from node devices to instance devices
1820   """
1821   device_info = []
1822   disks_ok = True
1823   for inst_disk in instance.disks:
1824     master_result = None
1825     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1826       cfg.SetDiskID(node_disk, node)
1827       is_primary = node == instance.primary_node
1828       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1829       if not result:
1830         logger.Error("could not prepare block device %s on node %s (is_pri"
1831                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1832         if is_primary or not ignore_secondaries:
1833           disks_ok = False
1834       if is_primary:
1835         master_result = result
1836     device_info.append((instance.primary_node, inst_disk.iv_name,
1837                         master_result))
1838
1839   return disks_ok, device_info
1840
1841
1842 def _StartInstanceDisks(cfg, instance, force):
1843   """Start the disks of an instance.
1844
1845   """
1846   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1847                                            ignore_secondaries=force)
1848   if not disks_ok:
1849     _ShutdownInstanceDisks(instance, cfg)
1850     if force is not None and not force:
1851       logger.Error("If the message above refers to a secondary node,"
1852                    " you can retry the operation using '--force'.")
1853     raise errors.OpExecError("Disk consistency error")
1854
1855
1856 class LUDeactivateInstanceDisks(NoHooksLU):
1857   """Shutdown an instance's disks.
1858
1859   """
1860   _OP_REQP = ["instance_name"]
1861
1862   def CheckPrereq(self):
1863     """Check prerequisites.
1864
1865     This checks that the instance is in the cluster.
1866
1867     """
1868     instance = self.cfg.GetInstanceInfo(
1869       self.cfg.ExpandInstanceName(self.op.instance_name))
1870     if instance is None:
1871       raise errors.OpPrereqError("Instance '%s' not known" %
1872                                  self.op.instance_name)
1873     self.instance = instance
1874
1875   def Exec(self, feedback_fn):
1876     """Deactivate the disks
1877
1878     """
1879     instance = self.instance
1880     ins_l = rpc.call_instance_list([instance.primary_node])
1881     ins_l = ins_l[instance.primary_node]
1882     if not type(ins_l) is list:
1883       raise errors.OpExecError("Can't contact node '%s'" %
1884                                instance.primary_node)
1885
1886     if self.instance.name in ins_l:
1887       raise errors.OpExecError("Instance is running, can't shutdown"
1888                                " block devices.")
1889
1890     _ShutdownInstanceDisks(instance, self.cfg)
1891
1892
1893 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1894   """Shutdown block devices of an instance.
1895
1896   This does the shutdown on all nodes of the instance.
1897
1898   If the ignore_primary is false, errors on the primary node are
1899   ignored.
1900
1901   """
1902   result = True
1903   for disk in instance.disks:
1904     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1905       cfg.SetDiskID(top_disk, node)
1906       if not rpc.call_blockdev_shutdown(node, top_disk):
1907         logger.Error("could not shutdown block device %s on node %s" %
1908                      (disk.iv_name, node))
1909         if not ignore_primary or node != instance.primary_node:
1910           result = False
1911   return result
1912
1913
1914 class LUStartupInstance(LogicalUnit):
1915   """Starts an instance.
1916
1917   """
1918   HPATH = "instance-start"
1919   HTYPE = constants.HTYPE_INSTANCE
1920   _OP_REQP = ["instance_name", "force"]
1921
1922   def BuildHooksEnv(self):
1923     """Build hooks env.
1924
1925     This runs on master, primary and secondary nodes of the instance.
1926
1927     """
1928     env = {
1929       "FORCE": self.op.force,
1930       }
1931     env.update(_BuildInstanceHookEnvByObject(self.instance))
1932     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1933           list(self.instance.secondary_nodes))
1934     return env, nl, nl
1935
1936   def CheckPrereq(self):
1937     """Check prerequisites.
1938
1939     This checks that the instance is in the cluster.
1940
1941     """
1942     instance = self.cfg.GetInstanceInfo(
1943       self.cfg.ExpandInstanceName(self.op.instance_name))
1944     if instance is None:
1945       raise errors.OpPrereqError("Instance '%s' not known" %
1946                                  self.op.instance_name)
1947
1948     # check bridges existance
1949     _CheckInstanceBridgesExist(instance)
1950
1951     self.instance = instance
1952     self.op.instance_name = instance.name
1953
1954   def Exec(self, feedback_fn):
1955     """Start the instance.
1956
1957     """
1958     instance = self.instance
1959     force = self.op.force
1960     extra_args = getattr(self.op, "extra_args", "")
1961
1962     node_current = instance.primary_node
1963
1964     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1965     if not nodeinfo:
1966       raise errors.OpExecError("Could not contact node %s for infos" %
1967                                (node_current))
1968
1969     freememory = nodeinfo[node_current]['memory_free']
1970     memory = instance.memory
1971     if memory > freememory:
1972       raise errors.OpExecError("Not enough memory to start instance"
1973                                " %s on node %s"
1974                                " needed %s MiB, available %s MiB" %
1975                                (instance.name, node_current, memory,
1976                                 freememory))
1977
1978     _StartInstanceDisks(self.cfg, instance, force)
1979
1980     if not rpc.call_instance_start(node_current, instance, extra_args):
1981       _ShutdownInstanceDisks(instance, self.cfg)
1982       raise errors.OpExecError("Could not start instance")
1983
1984     self.cfg.MarkInstanceUp(instance.name)
1985
1986
1987 class LURebootInstance(LogicalUnit):
1988   """Reboot an instance.
1989
1990   """
1991   HPATH = "instance-reboot"
1992   HTYPE = constants.HTYPE_INSTANCE
1993   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1994
1995   def BuildHooksEnv(self):
1996     """Build hooks env.
1997
1998     This runs on master, primary and secondary nodes of the instance.
1999
2000     """
2001     env = {
2002       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2003       }
2004     env.update(_BuildInstanceHookEnvByObject(self.instance))
2005     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2006           list(self.instance.secondary_nodes))
2007     return env, nl, nl
2008
2009   def CheckPrereq(self):
2010     """Check prerequisites.
2011
2012     This checks that the instance is in the cluster.
2013
2014     """
2015     instance = self.cfg.GetInstanceInfo(
2016       self.cfg.ExpandInstanceName(self.op.instance_name))
2017     if instance is None:
2018       raise errors.OpPrereqError("Instance '%s' not known" %
2019                                  self.op.instance_name)
2020
2021     # check bridges existance
2022     _CheckInstanceBridgesExist(instance)
2023
2024     self.instance = instance
2025     self.op.instance_name = instance.name
2026
2027   def Exec(self, feedback_fn):
2028     """Reboot the instance.
2029
2030     """
2031     instance = self.instance
2032     ignore_secondaries = self.op.ignore_secondaries
2033     reboot_type = self.op.reboot_type
2034     extra_args = getattr(self.op, "extra_args", "")
2035
2036     node_current = instance.primary_node
2037
2038     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2039                            constants.INSTANCE_REBOOT_HARD,
2040                            constants.INSTANCE_REBOOT_FULL]:
2041       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2042                                   (constants.INSTANCE_REBOOT_SOFT,
2043                                    constants.INSTANCE_REBOOT_HARD,
2044                                    constants.INSTANCE_REBOOT_FULL))
2045
2046     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2047                        constants.INSTANCE_REBOOT_HARD]:
2048       if not rpc.call_instance_reboot(node_current, instance,
2049                                       reboot_type, extra_args):
2050         raise errors.OpExecError("Could not reboot instance")
2051     else:
2052       if not rpc.call_instance_shutdown(node_current, instance):
2053         raise errors.OpExecError("could not shutdown instance for full reboot")
2054       _ShutdownInstanceDisks(instance, self.cfg)
2055       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2056       if not rpc.call_instance_start(node_current, instance, extra_args):
2057         _ShutdownInstanceDisks(instance, self.cfg)
2058         raise errors.OpExecError("Could not start instance for full reboot")
2059
2060     self.cfg.MarkInstanceUp(instance.name)
2061
2062
2063 class LUShutdownInstance(LogicalUnit):
2064   """Shutdown an instance.
2065
2066   """
2067   HPATH = "instance-stop"
2068   HTYPE = constants.HTYPE_INSTANCE
2069   _OP_REQP = ["instance_name"]
2070
2071   def BuildHooksEnv(self):
2072     """Build hooks env.
2073
2074     This runs on master, primary and secondary nodes of the instance.
2075
2076     """
2077     env = _BuildInstanceHookEnvByObject(self.instance)
2078     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2079           list(self.instance.secondary_nodes))
2080     return env, nl, nl
2081
2082   def CheckPrereq(self):
2083     """Check prerequisites.
2084
2085     This checks that the instance is in the cluster.
2086
2087     """
2088     instance = self.cfg.GetInstanceInfo(
2089       self.cfg.ExpandInstanceName(self.op.instance_name))
2090     if instance is None:
2091       raise errors.OpPrereqError("Instance '%s' not known" %
2092                                  self.op.instance_name)
2093     self.instance = instance
2094
2095   def Exec(self, feedback_fn):
2096     """Shutdown the instance.
2097
2098     """
2099     instance = self.instance
2100     node_current = instance.primary_node
2101     if not rpc.call_instance_shutdown(node_current, instance):
2102       logger.Error("could not shutdown instance")
2103
2104     self.cfg.MarkInstanceDown(instance.name)
2105     _ShutdownInstanceDisks(instance, self.cfg)
2106
2107
2108 class LUReinstallInstance(LogicalUnit):
2109   """Reinstall an instance.
2110
2111   """
2112   HPATH = "instance-reinstall"
2113   HTYPE = constants.HTYPE_INSTANCE
2114   _OP_REQP = ["instance_name"]
2115
2116   def BuildHooksEnv(self):
2117     """Build hooks env.
2118
2119     This runs on master, primary and secondary nodes of the instance.
2120
2121     """
2122     env = _BuildInstanceHookEnvByObject(self.instance)
2123     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2124           list(self.instance.secondary_nodes))
2125     return env, nl, nl
2126
2127   def CheckPrereq(self):
2128     """Check prerequisites.
2129
2130     This checks that the instance is in the cluster and is not running.
2131
2132     """
2133     instance = self.cfg.GetInstanceInfo(
2134       self.cfg.ExpandInstanceName(self.op.instance_name))
2135     if instance is None:
2136       raise errors.OpPrereqError("Instance '%s' not known" %
2137                                  self.op.instance_name)
2138     if instance.disk_template == constants.DT_DISKLESS:
2139       raise errors.OpPrereqError("Instance '%s' has no disks" %
2140                                  self.op.instance_name)
2141     if instance.status != "down":
2142       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2143                                  self.op.instance_name)
2144     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2145     if remote_info:
2146       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2147                                  (self.op.instance_name,
2148                                   instance.primary_node))
2149
2150     self.op.os_type = getattr(self.op, "os_type", None)
2151     if self.op.os_type is not None:
2152       # OS verification
2153       pnode = self.cfg.GetNodeInfo(
2154         self.cfg.ExpandNodeName(instance.primary_node))
2155       if pnode is None:
2156         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2157                                    self.op.pnode)
2158       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2159       if not isinstance(os_obj, objects.OS):
2160         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2161                                    " primary node"  % self.op.os_type)
2162
2163     self.instance = instance
2164
2165   def Exec(self, feedback_fn):
2166     """Reinstall the instance.
2167
2168     """
2169     inst = self.instance
2170
2171     if self.op.os_type is not None:
2172       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2173       inst.os = self.op.os_type
2174       self.cfg.AddInstance(inst)
2175
2176     _StartInstanceDisks(self.cfg, inst, None)
2177     try:
2178       feedback_fn("Running the instance OS create scripts...")
2179       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2180         raise errors.OpExecError("Could not install OS for instance %s "
2181                                  "on node %s" %
2182                                  (inst.name, inst.primary_node))
2183     finally:
2184       _ShutdownInstanceDisks(inst, self.cfg)
2185
2186
2187 class LURenameInstance(LogicalUnit):
2188   """Rename an instance.
2189
2190   """
2191   HPATH = "instance-rename"
2192   HTYPE = constants.HTYPE_INSTANCE
2193   _OP_REQP = ["instance_name", "new_name"]
2194
2195   def BuildHooksEnv(self):
2196     """Build hooks env.
2197
2198     This runs on master, primary and secondary nodes of the instance.
2199
2200     """
2201     env = _BuildInstanceHookEnvByObject(self.instance)
2202     env["INSTANCE_NEW_NAME"] = self.op.new_name
2203     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2204           list(self.instance.secondary_nodes))
2205     return env, nl, nl
2206
2207   def CheckPrereq(self):
2208     """Check prerequisites.
2209
2210     This checks that the instance is in the cluster and is not running.
2211
2212     """
2213     instance = self.cfg.GetInstanceInfo(
2214       self.cfg.ExpandInstanceName(self.op.instance_name))
2215     if instance is None:
2216       raise errors.OpPrereqError("Instance '%s' not known" %
2217                                  self.op.instance_name)
2218     if instance.status != "down":
2219       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2220                                  self.op.instance_name)
2221     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2222     if remote_info:
2223       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2224                                  (self.op.instance_name,
2225                                   instance.primary_node))
2226     self.instance = instance
2227
2228     # new name verification
2229     name_info = utils.HostInfo(self.op.new_name)
2230
2231     self.op.new_name = new_name = name_info.name
2232     if not getattr(self.op, "ignore_ip", False):
2233       command = ["fping", "-q", name_info.ip]
2234       result = utils.RunCmd(command)
2235       if not result.failed:
2236         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2237                                    (name_info.ip, new_name))
2238
2239
2240   def Exec(self, feedback_fn):
2241     """Reinstall the instance.
2242
2243     """
2244     inst = self.instance
2245     old_name = inst.name
2246
2247     self.cfg.RenameInstance(inst.name, self.op.new_name)
2248
2249     # re-read the instance from the configuration after rename
2250     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2251
2252     _StartInstanceDisks(self.cfg, inst, None)
2253     try:
2254       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2255                                           "sda", "sdb"):
2256         msg = ("Could run OS rename script for instance %s\n"
2257                "on node %s\n"
2258                "(but the instance has been renamed in Ganeti)" %
2259                (inst.name, inst.primary_node))
2260         logger.Error(msg)
2261     finally:
2262       _ShutdownInstanceDisks(inst, self.cfg)
2263
2264
2265 class LURemoveInstance(LogicalUnit):
2266   """Remove an instance.
2267
2268   """
2269   HPATH = "instance-remove"
2270   HTYPE = constants.HTYPE_INSTANCE
2271   _OP_REQP = ["instance_name"]
2272
2273   def BuildHooksEnv(self):
2274     """Build hooks env.
2275
2276     This runs on master, primary and secondary nodes of the instance.
2277
2278     """
2279     env = _BuildInstanceHookEnvByObject(self.instance)
2280     nl = [self.sstore.GetMasterNode()]
2281     return env, nl, nl
2282
2283   def CheckPrereq(self):
2284     """Check prerequisites.
2285
2286     This checks that the instance is in the cluster.
2287
2288     """
2289     instance = self.cfg.GetInstanceInfo(
2290       self.cfg.ExpandInstanceName(self.op.instance_name))
2291     if instance is None:
2292       raise errors.OpPrereqError("Instance '%s' not known" %
2293                                  self.op.instance_name)
2294     self.instance = instance
2295
2296   def Exec(self, feedback_fn):
2297     """Remove the instance.
2298
2299     """
2300     instance = self.instance
2301     logger.Info("shutting down instance %s on node %s" %
2302                 (instance.name, instance.primary_node))
2303
2304     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2305       if self.op.ignore_failures:
2306         feedback_fn("Warning: can't shutdown instance")
2307       else:
2308         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2309                                  (instance.name, instance.primary_node))
2310
2311     logger.Info("removing block devices for instance %s" % instance.name)
2312
2313     if not _RemoveDisks(instance, self.cfg):
2314       if self.op.ignore_failures:
2315         feedback_fn("Warning: can't remove instance's disks")
2316       else:
2317         raise errors.OpExecError("Can't remove instance's disks")
2318
2319     logger.Info("removing instance %s out of cluster config" % instance.name)
2320
2321     self.cfg.RemoveInstance(instance.name)
2322
2323
2324 class LUQueryInstances(NoHooksLU):
2325   """Logical unit for querying instances.
2326
2327   """
2328   _OP_REQP = ["output_fields", "names"]
2329
2330   def CheckPrereq(self):
2331     """Check prerequisites.
2332
2333     This checks that the fields required are valid output fields.
2334
2335     """
2336     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2337     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2338                                "admin_state", "admin_ram",
2339                                "disk_template", "ip", "mac", "bridge",
2340                                "sda_size", "sdb_size"],
2341                        dynamic=self.dynamic_fields,
2342                        selected=self.op.output_fields)
2343
2344     self.wanted = _GetWantedInstances(self, self.op.names)
2345
2346   def Exec(self, feedback_fn):
2347     """Computes the list of nodes and their attributes.
2348
2349     """
2350     instance_names = self.wanted
2351     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2352                      in instance_names]
2353
2354     # begin data gathering
2355
2356     nodes = frozenset([inst.primary_node for inst in instance_list])
2357
2358     bad_nodes = []
2359     if self.dynamic_fields.intersection(self.op.output_fields):
2360       live_data = {}
2361       node_data = rpc.call_all_instances_info(nodes)
2362       for name in nodes:
2363         result = node_data[name]
2364         if result:
2365           live_data.update(result)
2366         elif result == False:
2367           bad_nodes.append(name)
2368         # else no instance is alive
2369     else:
2370       live_data = dict([(name, {}) for name in instance_names])
2371
2372     # end data gathering
2373
2374     output = []
2375     for instance in instance_list:
2376       iout = []
2377       for field in self.op.output_fields:
2378         if field == "name":
2379           val = instance.name
2380         elif field == "os":
2381           val = instance.os
2382         elif field == "pnode":
2383           val = instance.primary_node
2384         elif field == "snodes":
2385           val = list(instance.secondary_nodes)
2386         elif field == "admin_state":
2387           val = (instance.status != "down")
2388         elif field == "oper_state":
2389           if instance.primary_node in bad_nodes:
2390             val = None
2391           else:
2392             val = bool(live_data.get(instance.name))
2393         elif field == "admin_ram":
2394           val = instance.memory
2395         elif field == "oper_ram":
2396           if instance.primary_node in bad_nodes:
2397             val = None
2398           elif instance.name in live_data:
2399             val = live_data[instance.name].get("memory", "?")
2400           else:
2401             val = "-"
2402         elif field == "disk_template":
2403           val = instance.disk_template
2404         elif field == "ip":
2405           val = instance.nics[0].ip
2406         elif field == "bridge":
2407           val = instance.nics[0].bridge
2408         elif field == "mac":
2409           val = instance.nics[0].mac
2410         elif field == "sda_size" or field == "sdb_size":
2411           disk = instance.FindDisk(field[:3])
2412           if disk is None:
2413             val = None
2414           else:
2415             val = disk.size
2416         else:
2417           raise errors.ParameterError(field)
2418         iout.append(val)
2419       output.append(iout)
2420
2421     return output
2422
2423
2424 class LUFailoverInstance(LogicalUnit):
2425   """Failover an instance.
2426
2427   """
2428   HPATH = "instance-failover"
2429   HTYPE = constants.HTYPE_INSTANCE
2430   _OP_REQP = ["instance_name", "ignore_consistency"]
2431
2432   def BuildHooksEnv(self):
2433     """Build hooks env.
2434
2435     This runs on master, primary and secondary nodes of the instance.
2436
2437     """
2438     env = {
2439       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2440       }
2441     env.update(_BuildInstanceHookEnvByObject(self.instance))
2442     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2443     return env, nl, nl
2444
2445   def CheckPrereq(self):
2446     """Check prerequisites.
2447
2448     This checks that the instance is in the cluster.
2449
2450     """
2451     instance = self.cfg.GetInstanceInfo(
2452       self.cfg.ExpandInstanceName(self.op.instance_name))
2453     if instance is None:
2454       raise errors.OpPrereqError("Instance '%s' not known" %
2455                                  self.op.instance_name)
2456
2457     if instance.disk_template != constants.DT_REMOTE_RAID1:
2458       raise errors.OpPrereqError("Instance's disk layout is not"
2459                                  " remote_raid1.")
2460
2461     secondary_nodes = instance.secondary_nodes
2462     if not secondary_nodes:
2463       raise errors.ProgrammerError("no secondary node but using "
2464                                    "DT_REMOTE_RAID1 template")
2465
2466     # check memory requirements on the secondary node
2467     target_node = secondary_nodes[0]
2468     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2469     info = nodeinfo.get(target_node, None)
2470     if not info:
2471       raise errors.OpPrereqError("Cannot get current information"
2472                                  " from node '%s'" % nodeinfo)
2473     if instance.memory > info['memory_free']:
2474       raise errors.OpPrereqError("Not enough memory on target node %s."
2475                                  " %d MB available, %d MB required" %
2476                                  (target_node, info['memory_free'],
2477                                   instance.memory))
2478
2479     # check bridge existance
2480     brlist = [nic.bridge for nic in instance.nics]
2481     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2482       raise errors.OpPrereqError("One or more target bridges %s does not"
2483                                  " exist on destination node '%s'" %
2484                                  (brlist, instance.primary_node))
2485
2486     self.instance = instance
2487
2488   def Exec(self, feedback_fn):
2489     """Failover an instance.
2490
2491     The failover is done by shutting it down on its present node and
2492     starting it on the secondary.
2493
2494     """
2495     instance = self.instance
2496
2497     source_node = instance.primary_node
2498     target_node = instance.secondary_nodes[0]
2499
2500     feedback_fn("* checking disk consistency between source and target")
2501     for dev in instance.disks:
2502       # for remote_raid1, these are md over drbd
2503       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2504         if not self.op.ignore_consistency:
2505           raise errors.OpExecError("Disk %s is degraded on target node,"
2506                                    " aborting failover." % dev.iv_name)
2507
2508     feedback_fn("* checking target node resource availability")
2509     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2510
2511     if not nodeinfo:
2512       raise errors.OpExecError("Could not contact target node %s." %
2513                                target_node)
2514
2515     free_memory = int(nodeinfo[target_node]['memory_free'])
2516     memory = instance.memory
2517     if memory > free_memory:
2518       raise errors.OpExecError("Not enough memory to create instance %s on"
2519                                " node %s. needed %s MiB, available %s MiB" %
2520                                (instance.name, target_node, memory,
2521                                 free_memory))
2522
2523     feedback_fn("* shutting down instance on source node")
2524     logger.Info("Shutting down instance %s on node %s" %
2525                 (instance.name, source_node))
2526
2527     if not rpc.call_instance_shutdown(source_node, instance):
2528       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2529                    " anyway. Please make sure node %s is down"  %
2530                    (instance.name, source_node, source_node))
2531
2532     feedback_fn("* deactivating the instance's disks on source node")
2533     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2534       raise errors.OpExecError("Can't shut down the instance's disks.")
2535
2536     instance.primary_node = target_node
2537     # distribute new instance config to the other nodes
2538     self.cfg.AddInstance(instance)
2539
2540     feedback_fn("* activating the instance's disks on target node")
2541     logger.Info("Starting instance %s on node %s" %
2542                 (instance.name, target_node))
2543
2544     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2545                                              ignore_secondaries=True)
2546     if not disks_ok:
2547       _ShutdownInstanceDisks(instance, self.cfg)
2548       raise errors.OpExecError("Can't activate the instance's disks")
2549
2550     feedback_fn("* starting the instance on the target node")
2551     if not rpc.call_instance_start(target_node, instance, None):
2552       _ShutdownInstanceDisks(instance, self.cfg)
2553       raise errors.OpExecError("Could not start instance %s on node %s." %
2554                                (instance.name, target_node))
2555
2556
2557 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2558   """Create a tree of block devices on the primary node.
2559
2560   This always creates all devices.
2561
2562   """
2563   if device.children:
2564     for child in device.children:
2565       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2566         return False
2567
2568   cfg.SetDiskID(device, node)
2569   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2570   if not new_id:
2571     return False
2572   if device.physical_id is None:
2573     device.physical_id = new_id
2574   return True
2575
2576
2577 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2578   """Create a tree of block devices on a secondary node.
2579
2580   If this device type has to be created on secondaries, create it and
2581   all its children.
2582
2583   If not, just recurse to children keeping the same 'force' value.
2584
2585   """
2586   if device.CreateOnSecondary():
2587     force = True
2588   if device.children:
2589     for child in device.children:
2590       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2591         return False
2592
2593   if not force:
2594     return True
2595   cfg.SetDiskID(device, node)
2596   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2597   if not new_id:
2598     return False
2599   if device.physical_id is None:
2600     device.physical_id = new_id
2601   return True
2602
2603
2604 def _GenerateUniqueNames(cfg, exts):
2605   """Generate a suitable LV name.
2606
2607   This will generate a logical volume name for the given instance.
2608
2609   """
2610   results = []
2611   for val in exts:
2612     new_id = cfg.GenerateUniqueID()
2613     results.append("%s%s" % (new_id, val))
2614   return results
2615
2616
2617 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2618   """Generate a drbd device complete with its children.
2619
2620   """
2621   port = cfg.AllocatePort()
2622   vgname = cfg.GetVGName()
2623   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2624                           logical_id=(vgname, names[0]))
2625   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2626                           logical_id=(vgname, names[1]))
2627   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2628                           logical_id = (primary, secondary, port),
2629                           children = [dev_data, dev_meta])
2630   return drbd_dev
2631
2632
2633 def _GenerateDiskTemplate(cfg, template_name,
2634                           instance_name, primary_node,
2635                           secondary_nodes, disk_sz, swap_sz):
2636   """Generate the entire disk layout for a given template type.
2637
2638   """
2639   #TODO: compute space requirements
2640
2641   vgname = cfg.GetVGName()
2642   if template_name == "diskless":
2643     disks = []
2644   elif template_name == "plain":
2645     if len(secondary_nodes) != 0:
2646       raise errors.ProgrammerError("Wrong template configuration")
2647
2648     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2649     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2650                            logical_id=(vgname, names[0]),
2651                            iv_name = "sda")
2652     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2653                            logical_id=(vgname, names[1]),
2654                            iv_name = "sdb")
2655     disks = [sda_dev, sdb_dev]
2656   elif template_name == "local_raid1":
2657     if len(secondary_nodes) != 0:
2658       raise errors.ProgrammerError("Wrong template configuration")
2659
2660
2661     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2662                                        ".sdb_m1", ".sdb_m2"])
2663     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2664                               logical_id=(vgname, names[0]))
2665     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2666                               logical_id=(vgname, names[1]))
2667     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2668                               size=disk_sz,
2669                               children = [sda_dev_m1, sda_dev_m2])
2670     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2671                               logical_id=(vgname, names[2]))
2672     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2673                               logical_id=(vgname, names[3]))
2674     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2675                               size=swap_sz,
2676                               children = [sdb_dev_m1, sdb_dev_m2])
2677     disks = [md_sda_dev, md_sdb_dev]
2678   elif template_name == constants.DT_REMOTE_RAID1:
2679     if len(secondary_nodes) != 1:
2680       raise errors.ProgrammerError("Wrong template configuration")
2681     remote_node = secondary_nodes[0]
2682     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2683                                        ".sdb_data", ".sdb_meta"])
2684     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2685                                          disk_sz, names[0:2])
2686     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2687                               children = [drbd_sda_dev], size=disk_sz)
2688     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2689                                          swap_sz, names[2:4])
2690     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2691                               children = [drbd_sdb_dev], size=swap_sz)
2692     disks = [md_sda_dev, md_sdb_dev]
2693   else:
2694     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2695   return disks
2696
2697
2698 def _GetInstanceInfoText(instance):
2699   """Compute that text that should be added to the disk's metadata.
2700
2701   """
2702   return "originstname+%s" % instance.name
2703
2704
2705 def _CreateDisks(cfg, instance):
2706   """Create all disks for an instance.
2707
2708   This abstracts away some work from AddInstance.
2709
2710   Args:
2711     instance: the instance object
2712
2713   Returns:
2714     True or False showing the success of the creation process
2715
2716   """
2717   info = _GetInstanceInfoText(instance)
2718
2719   for device in instance.disks:
2720     logger.Info("creating volume %s for instance %s" %
2721               (device.iv_name, instance.name))
2722     #HARDCODE
2723     for secondary_node in instance.secondary_nodes:
2724       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2725                                         info):
2726         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2727                      (device.iv_name, device, secondary_node))
2728         return False
2729     #HARDCODE
2730     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2731       logger.Error("failed to create volume %s on primary!" %
2732                    device.iv_name)
2733       return False
2734   return True
2735
2736
2737 def _RemoveDisks(instance, cfg):
2738   """Remove all disks for an instance.
2739
2740   This abstracts away some work from `AddInstance()` and
2741   `RemoveInstance()`. Note that in case some of the devices couldn't
2742   be removed, the removal will continue with the other ones (compare
2743   with `_CreateDisks()`).
2744
2745   Args:
2746     instance: the instance object
2747
2748   Returns:
2749     True or False showing the success of the removal proces
2750
2751   """
2752   logger.Info("removing block devices for instance %s" % instance.name)
2753
2754   result = True
2755   for device in instance.disks:
2756     for node, disk in device.ComputeNodeTree(instance.primary_node):
2757       cfg.SetDiskID(disk, node)
2758       if not rpc.call_blockdev_remove(node, disk):
2759         logger.Error("could not remove block device %s on node %s,"
2760                      " continuing anyway" %
2761                      (device.iv_name, node))
2762         result = False
2763   return result
2764
2765
2766 class LUCreateInstance(LogicalUnit):
2767   """Create an instance.
2768
2769   """
2770   HPATH = "instance-add"
2771   HTYPE = constants.HTYPE_INSTANCE
2772   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2773               "disk_template", "swap_size", "mode", "start", "vcpus",
2774               "wait_for_sync", "ip_check"]
2775
2776   def BuildHooksEnv(self):
2777     """Build hooks env.
2778
2779     This runs on master, primary and secondary nodes of the instance.
2780
2781     """
2782     env = {
2783       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2784       "INSTANCE_DISK_SIZE": self.op.disk_size,
2785       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2786       "INSTANCE_ADD_MODE": self.op.mode,
2787       }
2788     if self.op.mode == constants.INSTANCE_IMPORT:
2789       env["INSTANCE_SRC_NODE"] = self.op.src_node
2790       env["INSTANCE_SRC_PATH"] = self.op.src_path
2791       env["INSTANCE_SRC_IMAGE"] = self.src_image
2792
2793     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2794       primary_node=self.op.pnode,
2795       secondary_nodes=self.secondaries,
2796       status=self.instance_status,
2797       os_type=self.op.os_type,
2798       memory=self.op.mem_size,
2799       vcpus=self.op.vcpus,
2800       nics=[(self.inst_ip, self.op.bridge)],
2801     ))
2802
2803     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2804           self.secondaries)
2805     return env, nl, nl
2806
2807
2808   def CheckPrereq(self):
2809     """Check prerequisites.
2810
2811     """
2812     if self.op.mode not in (constants.INSTANCE_CREATE,
2813                             constants.INSTANCE_IMPORT):
2814       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2815                                  self.op.mode)
2816
2817     if self.op.mode == constants.INSTANCE_IMPORT:
2818       src_node = getattr(self.op, "src_node", None)
2819       src_path = getattr(self.op, "src_path", None)
2820       if src_node is None or src_path is None:
2821         raise errors.OpPrereqError("Importing an instance requires source"
2822                                    " node and path options")
2823       src_node_full = self.cfg.ExpandNodeName(src_node)
2824       if src_node_full is None:
2825         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2826       self.op.src_node = src_node = src_node_full
2827
2828       if not os.path.isabs(src_path):
2829         raise errors.OpPrereqError("The source path must be absolute")
2830
2831       export_info = rpc.call_export_info(src_node, src_path)
2832
2833       if not export_info:
2834         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2835
2836       if not export_info.has_section(constants.INISECT_EXP):
2837         raise errors.ProgrammerError("Corrupted export config")
2838
2839       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2840       if (int(ei_version) != constants.EXPORT_VERSION):
2841         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2842                                    (ei_version, constants.EXPORT_VERSION))
2843
2844       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2845         raise errors.OpPrereqError("Can't import instance with more than"
2846                                    " one data disk")
2847
2848       # FIXME: are the old os-es, disk sizes, etc. useful?
2849       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2850       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2851                                                          'disk0_dump'))
2852       self.src_image = diskimage
2853     else: # INSTANCE_CREATE
2854       if getattr(self.op, "os_type", None) is None:
2855         raise errors.OpPrereqError("No guest OS specified")
2856
2857     # check primary node
2858     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2859     if pnode is None:
2860       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2861                                  self.op.pnode)
2862     self.op.pnode = pnode.name
2863     self.pnode = pnode
2864     self.secondaries = []
2865     # disk template and mirror node verification
2866     if self.op.disk_template not in constants.DISK_TEMPLATES:
2867       raise errors.OpPrereqError("Invalid disk template name")
2868
2869     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2870       if getattr(self.op, "snode", None) is None:
2871         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2872                                    " a mirror node")
2873
2874       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2875       if snode_name is None:
2876         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2877                                    self.op.snode)
2878       elif snode_name == pnode.name:
2879         raise errors.OpPrereqError("The secondary node cannot be"
2880                                    " the primary node.")
2881       self.secondaries.append(snode_name)
2882
2883     # Check lv size requirements
2884     nodenames = [pnode.name] + self.secondaries
2885     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2886
2887     # Required free disk space as a function of disk and swap space
2888     req_size_dict = {
2889       constants.DT_DISKLESS: 0,
2890       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2891       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2892       # 256 MB are added for drbd metadata, 128MB for each drbd device
2893       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2894     }
2895
2896     if self.op.disk_template not in req_size_dict:
2897       raise errors.ProgrammerError("Disk template '%s' size requirement"
2898                                    " is unknown" %  self.op.disk_template)
2899
2900     req_size = req_size_dict[self.op.disk_template]
2901
2902     for node in nodenames:
2903       info = nodeinfo.get(node, None)
2904       if not info:
2905         raise errors.OpPrereqError("Cannot get current information"
2906                                    " from node '%s'" % nodeinfo)
2907       if req_size > info['vg_free']:
2908         raise errors.OpPrereqError("Not enough disk space on target node %s."
2909                                    " %d MB available, %d MB required" %
2910                                    (node, info['vg_free'], req_size))
2911
2912     # os verification
2913     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2914     if not isinstance(os_obj, objects.OS):
2915       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2916                                  " primary node"  % self.op.os_type)
2917
2918     # instance verification
2919     hostname1 = utils.HostInfo(self.op.instance_name)
2920
2921     self.op.instance_name = instance_name = hostname1.name
2922     instance_list = self.cfg.GetInstanceList()
2923     if instance_name in instance_list:
2924       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2925                                  instance_name)
2926
2927     ip = getattr(self.op, "ip", None)
2928     if ip is None or ip.lower() == "none":
2929       inst_ip = None
2930     elif ip.lower() == "auto":
2931       inst_ip = hostname1.ip
2932     else:
2933       if not utils.IsValidIP(ip):
2934         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2935                                    " like a valid IP" % ip)
2936       inst_ip = ip
2937     self.inst_ip = inst_ip
2938
2939     if self.op.start and not self.op.ip_check:
2940       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2941                                  " adding an instance in start mode")
2942
2943     if self.op.ip_check:
2944       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2945                        constants.DEFAULT_NODED_PORT):
2946         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2947                                    (hostname1.ip, instance_name))
2948
2949     # bridge verification
2950     bridge = getattr(self.op, "bridge", None)
2951     if bridge is None:
2952       self.op.bridge = self.cfg.GetDefBridge()
2953     else:
2954       self.op.bridge = bridge
2955
2956     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2957       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2958                                  " destination node '%s'" %
2959                                  (self.op.bridge, pnode.name))
2960
2961     if self.op.start:
2962       self.instance_status = 'up'
2963     else:
2964       self.instance_status = 'down'
2965
2966   def Exec(self, feedback_fn):
2967     """Create and add the instance to the cluster.
2968
2969     """
2970     instance = self.op.instance_name
2971     pnode_name = self.pnode.name
2972
2973     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2974     if self.inst_ip is not None:
2975       nic.ip = self.inst_ip
2976
2977     disks = _GenerateDiskTemplate(self.cfg,
2978                                   self.op.disk_template,
2979                                   instance, pnode_name,
2980                                   self.secondaries, self.op.disk_size,
2981                                   self.op.swap_size)
2982
2983     iobj = objects.Instance(name=instance, os=self.op.os_type,
2984                             primary_node=pnode_name,
2985                             memory=self.op.mem_size,
2986                             vcpus=self.op.vcpus,
2987                             nics=[nic], disks=disks,
2988                             disk_template=self.op.disk_template,
2989                             status=self.instance_status,
2990                             )
2991
2992     feedback_fn("* creating instance disks...")
2993     if not _CreateDisks(self.cfg, iobj):
2994       _RemoveDisks(iobj, self.cfg)
2995       raise errors.OpExecError("Device creation failed, reverting...")
2996
2997     feedback_fn("adding instance %s to cluster config" % instance)
2998
2999     self.cfg.AddInstance(iobj)
3000
3001     if self.op.wait_for_sync:
3002       disk_abort = not _WaitForSync(self.cfg, iobj)
3003     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
3004       # make sure the disks are not degraded (still sync-ing is ok)
3005       time.sleep(15)
3006       feedback_fn("* checking mirrors status")
3007       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3008     else:
3009       disk_abort = False
3010
3011     if disk_abort:
3012       _RemoveDisks(iobj, self.cfg)
3013       self.cfg.RemoveInstance(iobj.name)
3014       raise errors.OpExecError("There are some degraded disks for"
3015                                " this instance")
3016
3017     feedback_fn("creating os for instance %s on node %s" %
3018                 (instance, pnode_name))
3019
3020     if iobj.disk_template != constants.DT_DISKLESS:
3021       if self.op.mode == constants.INSTANCE_CREATE:
3022         feedback_fn("* running the instance OS create scripts...")
3023         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3024           raise errors.OpExecError("could not add os for instance %s"
3025                                    " on node %s" %
3026                                    (instance, pnode_name))
3027
3028       elif self.op.mode == constants.INSTANCE_IMPORT:
3029         feedback_fn("* running the instance OS import scripts...")
3030         src_node = self.op.src_node
3031         src_image = self.src_image
3032         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3033                                                 src_node, src_image):
3034           raise errors.OpExecError("Could not import os for instance"
3035                                    " %s on node %s" %
3036                                    (instance, pnode_name))
3037       else:
3038         # also checked in the prereq part
3039         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3040                                      % self.op.mode)
3041
3042     if self.op.start:
3043       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3044       feedback_fn("* starting instance...")
3045       if not rpc.call_instance_start(pnode_name, iobj, None):
3046         raise errors.OpExecError("Could not start instance")
3047
3048
3049 class LUConnectConsole(NoHooksLU):
3050   """Connect to an instance's console.
3051
3052   This is somewhat special in that it returns the command line that
3053   you need to run on the master node in order to connect to the
3054   console.
3055
3056   """
3057   _OP_REQP = ["instance_name"]
3058
3059   def CheckPrereq(self):
3060     """Check prerequisites.
3061
3062     This checks that the instance is in the cluster.
3063
3064     """
3065     instance = self.cfg.GetInstanceInfo(
3066       self.cfg.ExpandInstanceName(self.op.instance_name))
3067     if instance is None:
3068       raise errors.OpPrereqError("Instance '%s' not known" %
3069                                  self.op.instance_name)
3070     self.instance = instance
3071
3072   def Exec(self, feedback_fn):
3073     """Connect to the console of an instance
3074
3075     """
3076     instance = self.instance
3077     node = instance.primary_node
3078
3079     node_insts = rpc.call_instance_list([node])[node]
3080     if node_insts is False:
3081       raise errors.OpExecError("Can't connect to node %s." % node)
3082
3083     if instance.name not in node_insts:
3084       raise errors.OpExecError("Instance %s is not running." % instance.name)
3085
3086     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3087
3088     hyper = hypervisor.GetHypervisor()
3089     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3090     # build ssh cmdline
3091     argv = ["ssh", "-q", "-t"]
3092     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3093     argv.extend(ssh.BATCH_MODE_OPTS)
3094     argv.append(node)
3095     argv.append(console_cmd)
3096     return "ssh", argv
3097
3098
3099 class LUAddMDDRBDComponent(LogicalUnit):
3100   """Adda new mirror member to an instance's disk.
3101
3102   """
3103   HPATH = "mirror-add"
3104   HTYPE = constants.HTYPE_INSTANCE
3105   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3106
3107   def BuildHooksEnv(self):
3108     """Build hooks env.
3109
3110     This runs on the master, the primary and all the secondaries.
3111
3112     """
3113     env = {
3114       "NEW_SECONDARY": self.op.remote_node,
3115       "DISK_NAME": self.op.disk_name,
3116       }
3117     env.update(_BuildInstanceHookEnvByObject(self.instance))
3118     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3119           self.op.remote_node,] + list(self.instance.secondary_nodes)
3120     return env, nl, nl
3121
3122   def CheckPrereq(self):
3123     """Check prerequisites.
3124
3125     This checks that the instance is in the cluster.
3126
3127     """
3128     instance = self.cfg.GetInstanceInfo(
3129       self.cfg.ExpandInstanceName(self.op.instance_name))
3130     if instance is None:
3131       raise errors.OpPrereqError("Instance '%s' not known" %
3132                                  self.op.instance_name)
3133     self.instance = instance
3134
3135     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3136     if remote_node is None:
3137       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3138     self.remote_node = remote_node
3139
3140     if remote_node == instance.primary_node:
3141       raise errors.OpPrereqError("The specified node is the primary node of"
3142                                  " the instance.")
3143
3144     if instance.disk_template != constants.DT_REMOTE_RAID1:
3145       raise errors.OpPrereqError("Instance's disk layout is not"
3146                                  " remote_raid1.")
3147     for disk in instance.disks:
3148       if disk.iv_name == self.op.disk_name:
3149         break
3150     else:
3151       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3152                                  " instance." % self.op.disk_name)
3153     if len(disk.children) > 1:
3154       raise errors.OpPrereqError("The device already has two slave"
3155                                  " devices.\n"
3156                                  "This would create a 3-disk raid1"
3157                                  " which we don't allow.")
3158     self.disk = disk
3159
3160   def Exec(self, feedback_fn):
3161     """Add the mirror component
3162
3163     """
3164     disk = self.disk
3165     instance = self.instance
3166
3167     remote_node = self.remote_node
3168     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3169     names = _GenerateUniqueNames(self.cfg, lv_names)
3170     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3171                                      remote_node, disk.size, names)
3172
3173     logger.Info("adding new mirror component on secondary")
3174     #HARDCODE
3175     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3176                                       _GetInstanceInfoText(instance)):
3177       raise errors.OpExecError("Failed to create new component on secondary"
3178                                " node %s" % remote_node)
3179
3180     logger.Info("adding new mirror component on primary")
3181     #HARDCODE
3182     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3183                                     _GetInstanceInfoText(instance)):
3184       # remove secondary dev
3185       self.cfg.SetDiskID(new_drbd, remote_node)
3186       rpc.call_blockdev_remove(remote_node, new_drbd)
3187       raise errors.OpExecError("Failed to create volume on primary")
3188
3189     # the device exists now
3190     # call the primary node to add the mirror to md
3191     logger.Info("adding new mirror component to md")
3192     if not rpc.call_blockdev_addchild(instance.primary_node,
3193                                            disk, new_drbd):
3194       logger.Error("Can't add mirror compoment to md!")
3195       self.cfg.SetDiskID(new_drbd, remote_node)
3196       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3197         logger.Error("Can't rollback on secondary")
3198       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3199       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3200         logger.Error("Can't rollback on primary")
3201       raise errors.OpExecError("Can't add mirror component to md array")
3202
3203     disk.children.append(new_drbd)
3204
3205     self.cfg.AddInstance(instance)
3206
3207     _WaitForSync(self.cfg, instance)
3208
3209     return 0
3210
3211
3212 class LURemoveMDDRBDComponent(LogicalUnit):
3213   """Remove a component from a remote_raid1 disk.
3214
3215   """
3216   HPATH = "mirror-remove"
3217   HTYPE = constants.HTYPE_INSTANCE
3218   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3219
3220   def BuildHooksEnv(self):
3221     """Build hooks env.
3222
3223     This runs on the master, the primary and all the secondaries.
3224
3225     """
3226     env = {
3227       "DISK_NAME": self.op.disk_name,
3228       "DISK_ID": self.op.disk_id,
3229       "OLD_SECONDARY": self.old_secondary,
3230       }
3231     env.update(_BuildInstanceHookEnvByObject(self.instance))
3232     nl = [self.sstore.GetMasterNode(),
3233           self.instance.primary_node] + list(self.instance.secondary_nodes)
3234     return env, nl, nl
3235
3236   def CheckPrereq(self):
3237     """Check prerequisites.
3238
3239     This checks that the instance is in the cluster.
3240
3241     """
3242     instance = self.cfg.GetInstanceInfo(
3243       self.cfg.ExpandInstanceName(self.op.instance_name))
3244     if instance is None:
3245       raise errors.OpPrereqError("Instance '%s' not known" %
3246                                  self.op.instance_name)
3247     self.instance = instance
3248
3249     if instance.disk_template != constants.DT_REMOTE_RAID1:
3250       raise errors.OpPrereqError("Instance's disk layout is not"
3251                                  " remote_raid1.")
3252     for disk in instance.disks:
3253       if disk.iv_name == self.op.disk_name:
3254         break
3255     else:
3256       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3257                                  " instance." % self.op.disk_name)
3258     for child in disk.children:
3259       if (child.dev_type == constants.LD_DRBD7 and
3260           child.logical_id[2] == self.op.disk_id):
3261         break
3262     else:
3263       raise errors.OpPrereqError("Can't find the device with this port.")
3264
3265     if len(disk.children) < 2:
3266       raise errors.OpPrereqError("Cannot remove the last component from"
3267                                  " a mirror.")
3268     self.disk = disk
3269     self.child = child
3270     if self.child.logical_id[0] == instance.primary_node:
3271       oid = 1
3272     else:
3273       oid = 0
3274     self.old_secondary = self.child.logical_id[oid]
3275
3276   def Exec(self, feedback_fn):
3277     """Remove the mirror component
3278
3279     """
3280     instance = self.instance
3281     disk = self.disk
3282     child = self.child
3283     logger.Info("remove mirror component")
3284     self.cfg.SetDiskID(disk, instance.primary_node)
3285     if not rpc.call_blockdev_removechild(instance.primary_node,
3286                                               disk, child):
3287       raise errors.OpExecError("Can't remove child from mirror.")
3288
3289     for node in child.logical_id[:2]:
3290       self.cfg.SetDiskID(child, node)
3291       if not rpc.call_blockdev_remove(node, child):
3292         logger.Error("Warning: failed to remove device from node %s,"
3293                      " continuing operation." % node)
3294
3295     disk.children.remove(child)
3296     self.cfg.AddInstance(instance)
3297
3298
3299 class LUReplaceDisks(LogicalUnit):
3300   """Replace the disks of an instance.
3301
3302   """
3303   HPATH = "mirrors-replace"
3304   HTYPE = constants.HTYPE_INSTANCE
3305   _OP_REQP = ["instance_name"]
3306
3307   def BuildHooksEnv(self):
3308     """Build hooks env.
3309
3310     This runs on the master, the primary and all the secondaries.
3311
3312     """
3313     env = {
3314       "NEW_SECONDARY": self.op.remote_node,
3315       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3316       }
3317     env.update(_BuildInstanceHookEnvByObject(self.instance))
3318     nl = [self.sstore.GetMasterNode(),
3319           self.instance.primary_node] + list(self.instance.secondary_nodes)
3320     return env, nl, nl
3321
3322   def CheckPrereq(self):
3323     """Check prerequisites.
3324
3325     This checks that the instance is in the cluster.
3326
3327     """
3328     instance = self.cfg.GetInstanceInfo(
3329       self.cfg.ExpandInstanceName(self.op.instance_name))
3330     if instance is None:
3331       raise errors.OpPrereqError("Instance '%s' not known" %
3332                                  self.op.instance_name)
3333     self.instance = instance
3334
3335     if instance.disk_template != constants.DT_REMOTE_RAID1:
3336       raise errors.OpPrereqError("Instance's disk layout is not"
3337                                  " remote_raid1.")
3338
3339     if len(instance.secondary_nodes) != 1:
3340       raise errors.OpPrereqError("The instance has a strange layout,"
3341                                  " expected one secondary but found %d" %
3342                                  len(instance.secondary_nodes))
3343
3344     remote_node = getattr(self.op, "remote_node", None)
3345     if remote_node is None:
3346       remote_node = instance.secondary_nodes[0]
3347     else:
3348       remote_node = self.cfg.ExpandNodeName(remote_node)
3349       if remote_node is None:
3350         raise errors.OpPrereqError("Node '%s' not known" %
3351                                    self.op.remote_node)
3352     if remote_node == instance.primary_node:
3353       raise errors.OpPrereqError("The specified node is the primary node of"
3354                                  " the instance.")
3355     self.op.remote_node = remote_node
3356
3357   def Exec(self, feedback_fn):
3358     """Replace the disks of an instance.
3359
3360     """
3361     instance = self.instance
3362     iv_names = {}
3363     # start of work
3364     remote_node = self.op.remote_node
3365     cfg = self.cfg
3366     for dev in instance.disks:
3367       size = dev.size
3368       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3369       names = _GenerateUniqueNames(cfg, lv_names)
3370       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3371                                        remote_node, size, names)
3372       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3373       logger.Info("adding new mirror component on secondary for %s" %
3374                   dev.iv_name)
3375       #HARDCODE
3376       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3377                                         _GetInstanceInfoText(instance)):
3378         raise errors.OpExecError("Failed to create new component on"
3379                                  " secondary node %s\n"
3380                                  "Full abort, cleanup manually!" %
3381                                  remote_node)
3382
3383       logger.Info("adding new mirror component on primary")
3384       #HARDCODE
3385       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3386                                       _GetInstanceInfoText(instance)):
3387         # remove secondary dev
3388         cfg.SetDiskID(new_drbd, remote_node)
3389         rpc.call_blockdev_remove(remote_node, new_drbd)
3390         raise errors.OpExecError("Failed to create volume on primary!\n"
3391                                  "Full abort, cleanup manually!!")
3392
3393       # the device exists now
3394       # call the primary node to add the mirror to md
3395       logger.Info("adding new mirror component to md")
3396       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3397                                         new_drbd):
3398         logger.Error("Can't add mirror compoment to md!")
3399         cfg.SetDiskID(new_drbd, remote_node)
3400         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3401           logger.Error("Can't rollback on secondary")
3402         cfg.SetDiskID(new_drbd, instance.primary_node)
3403         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3404           logger.Error("Can't rollback on primary")
3405         raise errors.OpExecError("Full abort, cleanup manually!!")
3406
3407       dev.children.append(new_drbd)
3408       cfg.AddInstance(instance)
3409
3410     # this can fail as the old devices are degraded and _WaitForSync
3411     # does a combined result over all disks, so we don't check its
3412     # return value
3413     _WaitForSync(cfg, instance, unlock=True)
3414
3415     # so check manually all the devices
3416     for name in iv_names:
3417       dev, child, new_drbd = iv_names[name]
3418       cfg.SetDiskID(dev, instance.primary_node)
3419       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3420       if is_degr:
3421         raise errors.OpExecError("MD device %s is degraded!" % name)
3422       cfg.SetDiskID(new_drbd, instance.primary_node)
3423       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3424       if is_degr:
3425         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3426
3427     for name in iv_names:
3428       dev, child, new_drbd = iv_names[name]
3429       logger.Info("remove mirror %s component" % name)
3430       cfg.SetDiskID(dev, instance.primary_node)
3431       if not rpc.call_blockdev_removechild(instance.primary_node,
3432                                                 dev, child):
3433         logger.Error("Can't remove child from mirror, aborting"
3434                      " *this device cleanup*.\nYou need to cleanup manually!!")
3435         continue
3436
3437       for node in child.logical_id[:2]:
3438         logger.Info("remove child device on %s" % node)
3439         cfg.SetDiskID(child, node)
3440         if not rpc.call_blockdev_remove(node, child):
3441           logger.Error("Warning: failed to remove device from node %s,"
3442                        " continuing operation." % node)
3443
3444       dev.children.remove(child)
3445
3446       cfg.AddInstance(instance)
3447
3448
3449 class LUQueryInstanceData(NoHooksLU):
3450   """Query runtime instance data.
3451
3452   """
3453   _OP_REQP = ["instances"]
3454
3455   def CheckPrereq(self):
3456     """Check prerequisites.
3457
3458     This only checks the optional instance list against the existing names.
3459
3460     """
3461     if not isinstance(self.op.instances, list):
3462       raise errors.OpPrereqError("Invalid argument type 'instances'")
3463     if self.op.instances:
3464       self.wanted_instances = []
3465       names = self.op.instances
3466       for name in names:
3467         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3468         if instance is None:
3469           raise errors.OpPrereqError("No such instance name '%s'" % name)
3470       self.wanted_instances.append(instance)
3471     else:
3472       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3473                                in self.cfg.GetInstanceList()]
3474     return
3475
3476
3477   def _ComputeDiskStatus(self, instance, snode, dev):
3478     """Compute block device status.
3479
3480     """
3481     self.cfg.SetDiskID(dev, instance.primary_node)
3482     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3483     if dev.dev_type == constants.LD_DRBD7:
3484       # we change the snode then (otherwise we use the one passed in)
3485       if dev.logical_id[0] == instance.primary_node:
3486         snode = dev.logical_id[1]
3487       else:
3488         snode = dev.logical_id[0]
3489
3490     if snode:
3491       self.cfg.SetDiskID(dev, snode)
3492       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3493     else:
3494       dev_sstatus = None
3495
3496     if dev.children:
3497       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3498                       for child in dev.children]
3499     else:
3500       dev_children = []
3501
3502     data = {
3503       "iv_name": dev.iv_name,
3504       "dev_type": dev.dev_type,
3505       "logical_id": dev.logical_id,
3506       "physical_id": dev.physical_id,
3507       "pstatus": dev_pstatus,
3508       "sstatus": dev_sstatus,
3509       "children": dev_children,
3510       }
3511
3512     return data
3513
3514   def Exec(self, feedback_fn):
3515     """Gather and return data"""
3516     result = {}
3517     for instance in self.wanted_instances:
3518       remote_info = rpc.call_instance_info(instance.primary_node,
3519                                                 instance.name)
3520       if remote_info and "state" in remote_info:
3521         remote_state = "up"
3522       else:
3523         remote_state = "down"
3524       if instance.status == "down":
3525         config_state = "down"
3526       else:
3527         config_state = "up"
3528
3529       disks = [self._ComputeDiskStatus(instance, None, device)
3530                for device in instance.disks]
3531
3532       idict = {
3533         "name": instance.name,
3534         "config_state": config_state,
3535         "run_state": remote_state,
3536         "pnode": instance.primary_node,
3537         "snodes": instance.secondary_nodes,
3538         "os": instance.os,
3539         "memory": instance.memory,
3540         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3541         "disks": disks,
3542         "vcpus": instance.vcpus,
3543         }
3544
3545       result[instance.name] = idict
3546
3547     return result
3548
3549
3550 class LUSetInstanceParms(LogicalUnit):
3551   """Modifies an instances's parameters.
3552
3553   """
3554   HPATH = "instance-modify"
3555   HTYPE = constants.HTYPE_INSTANCE
3556   _OP_REQP = ["instance_name"]
3557
3558   def BuildHooksEnv(self):
3559     """Build hooks env.
3560
3561     This runs on the master, primary and secondaries.
3562
3563     """
3564     args = dict()
3565     if self.mem:
3566       args['memory'] = self.mem
3567     if self.vcpus:
3568       args['vcpus'] = self.vcpus
3569     if self.do_ip or self.do_bridge:
3570       if self.do_ip:
3571         ip = self.ip
3572       else:
3573         ip = self.instance.nics[0].ip
3574       if self.bridge:
3575         bridge = self.bridge
3576       else:
3577         bridge = self.instance.nics[0].bridge
3578       args['nics'] = [(ip, bridge)]
3579     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3580     nl = [self.sstore.GetMasterNode(),
3581           self.instance.primary_node] + list(self.instance.secondary_nodes)
3582     return env, nl, nl
3583
3584   def CheckPrereq(self):
3585     """Check prerequisites.
3586
3587     This only checks the instance list against the existing names.
3588
3589     """
3590     self.mem = getattr(self.op, "mem", None)
3591     self.vcpus = getattr(self.op, "vcpus", None)
3592     self.ip = getattr(self.op, "ip", None)
3593     self.bridge = getattr(self.op, "bridge", None)
3594     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3595       raise errors.OpPrereqError("No changes submitted")
3596     if self.mem is not None:
3597       try:
3598         self.mem = int(self.mem)
3599       except ValueError, err:
3600         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3601     if self.vcpus is not None:
3602       try:
3603         self.vcpus = int(self.vcpus)
3604       except ValueError, err:
3605         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3606     if self.ip is not None:
3607       self.do_ip = True
3608       if self.ip.lower() == "none":
3609         self.ip = None
3610       else:
3611         if not utils.IsValidIP(self.ip):
3612           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3613     else:
3614       self.do_ip = False
3615     self.do_bridge = (self.bridge is not None)
3616
3617     instance = self.cfg.GetInstanceInfo(
3618       self.cfg.ExpandInstanceName(self.op.instance_name))
3619     if instance is None:
3620       raise errors.OpPrereqError("No such instance name '%s'" %
3621                                  self.op.instance_name)
3622     self.op.instance_name = instance.name
3623     self.instance = instance
3624     return
3625
3626   def Exec(self, feedback_fn):
3627     """Modifies an instance.
3628
3629     All parameters take effect only at the next restart of the instance.
3630     """
3631     result = []
3632     instance = self.instance
3633     if self.mem:
3634       instance.memory = self.mem
3635       result.append(("mem", self.mem))
3636     if self.vcpus:
3637       instance.vcpus = self.vcpus
3638       result.append(("vcpus",  self.vcpus))
3639     if self.do_ip:
3640       instance.nics[0].ip = self.ip
3641       result.append(("ip", self.ip))
3642     if self.bridge:
3643       instance.nics[0].bridge = self.bridge
3644       result.append(("bridge", self.bridge))
3645
3646     self.cfg.AddInstance(instance)
3647
3648     return result
3649
3650
3651 class LUQueryExports(NoHooksLU):
3652   """Query the exports list
3653
3654   """
3655   _OP_REQP = []
3656
3657   def CheckPrereq(self):
3658     """Check that the nodelist contains only existing nodes.
3659
3660     """
3661     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3662
3663   def Exec(self, feedback_fn):
3664     """Compute the list of all the exported system images.
3665
3666     Returns:
3667       a dictionary with the structure node->(export-list)
3668       where export-list is a list of the instances exported on
3669       that node.
3670
3671     """
3672     return rpc.call_export_list(self.nodes)
3673
3674
3675 class LUExportInstance(LogicalUnit):
3676   """Export an instance to an image in the cluster.
3677
3678   """
3679   HPATH = "instance-export"
3680   HTYPE = constants.HTYPE_INSTANCE
3681   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3682
3683   def BuildHooksEnv(self):
3684     """Build hooks env.
3685
3686     This will run on the master, primary node and target node.
3687
3688     """
3689     env = {
3690       "EXPORT_NODE": self.op.target_node,
3691       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3692       }
3693     env.update(_BuildInstanceHookEnvByObject(self.instance))
3694     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3695           self.op.target_node]
3696     return env, nl, nl
3697
3698   def CheckPrereq(self):
3699     """Check prerequisites.
3700
3701     This checks that the instance name is a valid one.
3702
3703     """
3704     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3705     self.instance = self.cfg.GetInstanceInfo(instance_name)
3706     if self.instance is None:
3707       raise errors.OpPrereqError("Instance '%s' not found" %
3708                                  self.op.instance_name)
3709
3710     # node verification
3711     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3712     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3713
3714     if self.dst_node is None:
3715       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3716                                  self.op.target_node)
3717     self.op.target_node = self.dst_node.name
3718
3719   def Exec(self, feedback_fn):
3720     """Export an instance to an image in the cluster.
3721
3722     """
3723     instance = self.instance
3724     dst_node = self.dst_node
3725     src_node = instance.primary_node
3726     # shutdown the instance, unless requested not to do so
3727     if self.op.shutdown:
3728       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3729       self.processor.ChainOpCode(op, feedback_fn)
3730
3731     vgname = self.cfg.GetVGName()
3732
3733     snap_disks = []
3734
3735     try:
3736       for disk in instance.disks:
3737         if disk.iv_name == "sda":
3738           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3739           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3740
3741           if not new_dev_name:
3742             logger.Error("could not snapshot block device %s on node %s" %
3743                          (disk.logical_id[1], src_node))
3744           else:
3745             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
3746                                       logical_id=(vgname, new_dev_name),
3747                                       physical_id=(vgname, new_dev_name),
3748                                       iv_name=disk.iv_name)
3749             snap_disks.append(new_dev)
3750
3751     finally:
3752       if self.op.shutdown:
3753         op = opcodes.OpStartupInstance(instance_name=instance.name,
3754                                        force=False)
3755         self.processor.ChainOpCode(op, feedback_fn)
3756
3757     # TODO: check for size
3758
3759     for dev in snap_disks:
3760       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3761                                            instance):
3762         logger.Error("could not export block device %s from node"
3763                      " %s to node %s" %
3764                      (dev.logical_id[1], src_node, dst_node.name))
3765       if not rpc.call_blockdev_remove(src_node, dev):
3766         logger.Error("could not remove snapshot block device %s from"
3767                      " node %s" % (dev.logical_id[1], src_node))
3768
3769     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3770       logger.Error("could not finalize export for instance %s on node %s" %
3771                    (instance.name, dst_node.name))
3772
3773     nodelist = self.cfg.GetNodeList()
3774     nodelist.remove(dst_node.name)
3775
3776     # on one-node clusters nodelist will be empty after the removal
3777     # if we proceed the backup would be removed because OpQueryExports
3778     # substitutes an empty list with the full cluster node list.
3779     if nodelist:
3780       op = opcodes.OpQueryExports(nodes=nodelist)
3781       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3782       for node in exportlist:
3783         if instance.name in exportlist[node]:
3784           if not rpc.call_export_remove(node, instance.name):
3785             logger.Error("could not remove older export for instance %s"
3786                          " on node %s" % (instance.name, node))
3787
3788
3789 class TagsLU(NoHooksLU):
3790   """Generic tags LU.
3791
3792   This is an abstract class which is the parent of all the other tags LUs.
3793
3794   """
3795   def CheckPrereq(self):
3796     """Check prerequisites.
3797
3798     """
3799     if self.op.kind == constants.TAG_CLUSTER:
3800       self.target = self.cfg.GetClusterInfo()
3801     elif self.op.kind == constants.TAG_NODE:
3802       name = self.cfg.ExpandNodeName(self.op.name)
3803       if name is None:
3804         raise errors.OpPrereqError("Invalid node name (%s)" %
3805                                    (self.op.name,))
3806       self.op.name = name
3807       self.target = self.cfg.GetNodeInfo(name)
3808     elif self.op.kind == constants.TAG_INSTANCE:
3809       name = self.cfg.ExpandInstanceName(self.op.name)
3810       if name is None:
3811         raise errors.OpPrereqError("Invalid instance name (%s)" %
3812                                    (self.op.name,))
3813       self.op.name = name
3814       self.target = self.cfg.GetInstanceInfo(name)
3815     else:
3816       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3817                                  str(self.op.kind))
3818
3819
3820 class LUGetTags(TagsLU):
3821   """Returns the tags of a given object.
3822
3823   """
3824   _OP_REQP = ["kind", "name"]
3825
3826   def Exec(self, feedback_fn):
3827     """Returns the tag list.
3828
3829     """
3830     return self.target.GetTags()
3831
3832
3833 class LUAddTags(TagsLU):
3834   """Sets a tag on a given object.
3835
3836   """
3837   _OP_REQP = ["kind", "name", "tags"]
3838
3839   def CheckPrereq(self):
3840     """Check prerequisites.
3841
3842     This checks the type and length of the tag name and value.
3843
3844     """
3845     TagsLU.CheckPrereq(self)
3846     for tag in self.op.tags:
3847       objects.TaggableObject.ValidateTag(tag)
3848
3849   def Exec(self, feedback_fn):
3850     """Sets the tag.
3851
3852     """
3853     try:
3854       for tag in self.op.tags:
3855         self.target.AddTag(tag)
3856     except errors.TagError, err:
3857       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3858     try:
3859       self.cfg.Update(self.target)
3860     except errors.ConfigurationError:
3861       raise errors.OpRetryError("There has been a modification to the"
3862                                 " config file and the operation has been"
3863                                 " aborted. Please retry.")
3864
3865
3866 class LUDelTags(TagsLU):
3867   """Delete a list of tags from a given object.
3868
3869   """
3870   _OP_REQP = ["kind", "name", "tags"]
3871
3872   def CheckPrereq(self):
3873     """Check prerequisites.
3874
3875     This checks that we have the given tag.
3876
3877     """
3878     TagsLU.CheckPrereq(self)
3879     for tag in self.op.tags:
3880       objects.TaggableObject.ValidateTag(tag)
3881     del_tags = frozenset(self.op.tags)
3882     cur_tags = self.target.GetTags()
3883     if not del_tags <= cur_tags:
3884       diff_tags = del_tags - cur_tags
3885       diff_names = ["'%s'" % tag for tag in diff_tags]
3886       diff_names.sort()
3887       raise errors.OpPrereqError("Tag(s) %s not found" %
3888                                  (",".join(diff_names)))
3889
3890   def Exec(self, feedback_fn):
3891     """Remove the tag from the object.
3892
3893     """
3894     for tag in self.op.tags:
3895       self.target.RemoveTag(tag)
3896     try:
3897       self.cfg.Update(self.target)
3898     except errors.ConfigurationError:
3899       raise errors.OpRetryError("There has been a modification to the"
3900                                 " config file and the operation has been"
3901                                 " aborted. Please retry.")