Modify two mirror-device related rpc calls
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "OP_TARGET": name,
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   while True:
390     rawline = f.readline()
391     logger.Debug('read %s' % (repr(rawline),))
392
393     if not rawline:
394       # End of file
395       break
396
397     line = rawline.split('\n')[0]
398
399     parts = line.split(' ')
400     fields = parts[0].split(',')
401     key = parts[2]
402
403     haveall = True
404     havesome = False
405     for spec in [ ip, fullnode ]:
406       if spec not in fields:
407         haveall = False
408       if spec in fields:
409         havesome = True
410
411     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
412     if haveall and key == pubkey:
413       inthere = True
414       save_lines.append(rawline)
415       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
416       continue
417
418     if havesome and (not haveall or key != pubkey):
419       removed = True
420       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
421       continue
422
423     save_lines.append(rawline)
424
425   if not inthere:
426     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
427     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
428
429   if removed:
430     save_lines = save_lines + add_lines
431
432     # Write a new file and replace old.
433     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
434                                    constants.DATA_DIR)
435     newfile = os.fdopen(fd, 'w')
436     try:
437       newfile.write(''.join(save_lines))
438     finally:
439       newfile.close()
440     logger.Debug("Wrote new known_hosts.")
441     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
442
443   elif add_lines:
444     # Simply appending a new line will do the trick.
445     f.seek(0, 2)
446     for add in add_lines:
447       f.write(add)
448
449   f.close()
450
451
452 def _HasValidVG(vglist, vgname):
453   """Checks if the volume group list is valid.
454
455   A non-None return value means there's an error, and the return value
456   is the error message.
457
458   """
459   vgsize = vglist.get(vgname, None)
460   if vgsize is None:
461     return "volume group '%s' missing" % vgname
462   elif vgsize < 20480:
463     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
464             (vgname, vgsize))
465   return None
466
467
468 def _InitSSHSetup(node):
469   """Setup the SSH configuration for the cluster.
470
471
472   This generates a dsa keypair for root, adds the pub key to the
473   permitted hosts and adds the hostkey to its own known hosts.
474
475   Args:
476     node: the name of this host as a fqdn
477
478   """
479   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
480
481   for name in priv_key, pub_key:
482     if os.path.exists(name):
483       utils.CreateBackup(name)
484     utils.RemoveFile(name)
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", priv_key,
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open(pub_key, 'r')
494   try:
495     utils.AddAuthorizedKey(auth_keys, f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 def _CheckInstanceBridgesExist(instance):
532   """Check that the brigdes needed by an instance exist.
533
534   """
535   # check bridges existance
536   brlist = [nic.bridge for nic in instance.nics]
537   if not rpc.call_bridges_exist(instance.primary_node, brlist):
538     raise errors.OpPrereqError("one or more target bridges %s does not"
539                                " exist on destination node '%s'" %
540                                (brlist, instance.primary_node))
541
542
543 class LUInitCluster(LogicalUnit):
544   """Initialise the cluster.
545
546   """
547   HPATH = "cluster-init"
548   HTYPE = constants.HTYPE_CLUSTER
549   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
550               "def_bridge", "master_netdev"]
551   REQ_CLUSTER = False
552
553   def BuildHooksEnv(self):
554     """Build hooks env.
555
556     Notes: Since we don't require a cluster, we must manually add
557     ourselves in the post-run node list.
558
559     """
560     env = {"OP_TARGET": self.op.cluster_name}
561     return env, [], [self.hostname.name]
562
563   def CheckPrereq(self):
564     """Verify that the passed name is a valid one.
565
566     """
567     if config.ConfigWriter.IsCluster():
568       raise errors.OpPrereqError("Cluster is already initialised")
569
570     self.hostname = hostname = utils.HostInfo()
571
572     if hostname.ip.startswith("127."):
573       raise errors.OpPrereqError("This host's IP resolves to the private"
574                                  " range (%s). Please fix DNS or /etc/hosts." %
575                                  (hostname.ip,))
576
577     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
578
579     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
580                          constants.DEFAULT_NODED_PORT):
581       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
582                                  " to %s,\nbut this ip address does not"
583                                  " belong to this host."
584                                  " Aborting." % hostname.ip)
585
586     secondary_ip = getattr(self.op, "secondary_ip", None)
587     if secondary_ip and not utils.IsValidIP(secondary_ip):
588       raise errors.OpPrereqError("Invalid secondary ip given")
589     if (secondary_ip and
590         secondary_ip != hostname.ip and
591         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
592                            constants.DEFAULT_NODED_PORT))):
593       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
594                                  "but it does not belong to this host." %
595                                  secondary_ip)
596     self.secondary_ip = secondary_ip
597
598     # checks presence of the volume group given
599     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
600
601     if vgstatus:
602       raise errors.OpPrereqError("Error: %s" % vgstatus)
603
604     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
605                     self.op.mac_prefix):
606       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
607                                  self.op.mac_prefix)
608
609     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
610       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
611                                  self.op.hypervisor_type)
612
613     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
614     if result.failed:
615       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
616                                  (self.op.master_netdev,
617                                   result.output.strip()))
618
619   def Exec(self, feedback_fn):
620     """Initialize the cluster.
621
622     """
623     clustername = self.clustername
624     hostname = self.hostname
625
626     # set up the simple store
627     self.sstore = ss = ssconf.SimpleStore()
628     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633
634     # set up the inter-node password and certificate
635     _InitGanetiServerSetup(ss)
636
637     # start the master ip
638     rpc.call_node_start_master(hostname.name)
639
640     # set up ssh config and /etc/hosts
641     f = open(constants.SSH_HOST_RSA_PUB, 'r')
642     try:
643       sshline = f.read()
644     finally:
645       f.close()
646     sshkey = sshline.split(" ")[1]
647
648     _UpdateEtcHosts(hostname.name, hostname.ip)
649
650     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651
652     _InitSSHSetup(hostname.name)
653
654     # init of cluster config file
655     self.cfg = cfgw = config.ConfigWriter()
656     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657                     sshkey, self.op.mac_prefix,
658                     self.op.vg_name, self.op.def_bridge)
659
660
661 class LUDestroyCluster(NoHooksLU):
662   """Logical unit for destroying the cluster.
663
664   """
665   _OP_REQP = []
666
667   def CheckPrereq(self):
668     """Check prerequisites.
669
670     This checks whether the cluster is empty.
671
672     Any errors are signalled by raising errors.OpPrereqError.
673
674     """
675     master = self.sstore.GetMasterNode()
676
677     nodelist = self.cfg.GetNodeList()
678     if len(nodelist) != 1 or nodelist[0] != master:
679       raise errors.OpPrereqError("There are still %d node(s) in"
680                                  " this cluster." % (len(nodelist) - 1))
681     instancelist = self.cfg.GetInstanceList()
682     if instancelist:
683       raise errors.OpPrereqError("There are still %d instance(s) in"
684                                  " this cluster." % len(instancelist))
685
686   def Exec(self, feedback_fn):
687     """Destroys the cluster.
688
689     """
690     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691     utils.CreateBackup(priv_key)
692     utils.CreateBackup(pub_key)
693     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694
695
696 class LUVerifyCluster(NoHooksLU):
697   """Verifies the cluster status.
698
699   """
700   _OP_REQP = []
701
702   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703                   remote_version, feedback_fn):
704     """Run multiple tests against a node.
705
706     Test list:
707       - compares ganeti version
708       - checks vg existance and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     Args:
713       node: name of the node to check
714       file_list: required list of files
715       local_cksum: dictionary of local files and their checksums
716
717     """
718     # compares ganeti version
719     local_version = constants.PROTOCOL_VERSION
720     if not remote_version:
721       feedback_fn(" - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version:
725       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726                       (local_version, node, remote_version))
727       return True
728
729     # checks vg existance and size > 20G
730
731     bad = False
732     if not vglist:
733       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734                       (node,))
735       bad = True
736     else:
737       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738       if vgstatus:
739         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740         bad = True
741
742     # checks config file checksum
743     # checks ssh to any
744
745     if 'filelist' not in node_result:
746       bad = True
747       feedback_fn("  - ERROR: node hasn't returned file checksum data")
748     else:
749       remote_cksum = node_result['filelist']
750       for file_name in file_list:
751         if file_name not in remote_cksum:
752           bad = True
753           feedback_fn("  - ERROR: file '%s' missing" % file_name)
754         elif remote_cksum[file_name] != local_cksum[file_name]:
755           bad = True
756           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757
758     if 'nodelist' not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761     else:
762       if node_result['nodelist']:
763         bad = True
764         for node in node_result['nodelist']:
765           feedback_fn("  - ERROR: communication with node '%s': %s" %
766                           (node, node_result['nodelist'][node]))
767     hyp_result = node_result.get('hypervisor', None)
768     if hyp_result is not None:
769       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770     return bad
771
772   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773     """Verify an instance.
774
775     This function checks to see if the required block devices are
776     available on the instance's node.
777
778     """
779     bad = False
780
781     instancelist = self.cfg.GetInstanceList()
782     if not instance in instancelist:
783       feedback_fn("  - ERROR: instance %s not in instance list %s" %
784                       (instance, instancelist))
785       bad = True
786
787     instanceconfig = self.cfg.GetInstanceInfo(instance)
788     node_current = instanceconfig.primary_node
789
790     node_vol_should = {}
791     instanceconfig.MapLVsByNode(node_vol_should)
792
793     for node in node_vol_should:
794       for volume in node_vol_should[node]:
795         if node not in node_vol_is or volume not in node_vol_is[node]:
796           feedback_fn("  - ERROR: volume %s missing on node %s" %
797                           (volume, node))
798           bad = True
799
800     if not instanceconfig.status == 'down':
801       if not instance in node_instance[node_current]:
802         feedback_fn("  - ERROR: instance %s not running on node %s" %
803                         (instance, node_current))
804         bad = True
805
806     for node in node_instance:
807       if (not node == node_current):
808         if instance in node_instance[node]:
809           feedback_fn("  - ERROR: instance %s should not run on node %s" %
810                           (instance, node))
811           bad = True
812
813     return bad
814
815   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816     """Verify if there are any unknown volumes in the cluster.
817
818     The .os, .swap and backup volumes are ignored. All other volumes are
819     reported as unknown.
820
821     """
822     bad = False
823
824     for node in node_vol_is:
825       for volume in node_vol_is[node]:
826         if node not in node_vol_should or volume not in node_vol_should[node]:
827           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828                       (volume, node))
829           bad = True
830     return bad
831
832   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833     """Verify the list of running instances.
834
835     This checks what instances are running but unknown to the cluster.
836
837     """
838     bad = False
839     for node in node_instance:
840       for runninginstance in node_instance[node]:
841         if runninginstance not in instancelist:
842           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843                           (runninginstance, node))
844           bad = True
845     return bad
846
847   def CheckPrereq(self):
848     """Check prerequisites.
849
850     This has no prerequisites.
851
852     """
853     pass
854
855   def Exec(self, feedback_fn):
856     """Verify integrity of cluster, performing various test on nodes.
857
858     """
859     bad = False
860     feedback_fn("* Verifying global settings")
861     self.cfg.VerifyConfig()
862
863     master = self.sstore.GetMasterNode()
864     vg_name = self.cfg.GetVGName()
865     nodelist = utils.NiceSort(self.cfg.GetNodeList())
866     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
867     node_volume = {}
868     node_instance = {}
869
870     # FIXME: verify OS list
871     # do local checksums
872     file_names = list(self.sstore.GetFileList())
873     file_names.append(constants.SSL_CERT_FILE)
874     file_names.append(constants.CLUSTER_CONF_FILE)
875     local_checksums = utils.FingerprintFiles(file_names)
876
877     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
878     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
879     all_instanceinfo = rpc.call_instance_list(nodelist)
880     all_vglist = rpc.call_vg_list(nodelist)
881     node_verify_param = {
882       'filelist': file_names,
883       'nodelist': nodelist,
884       'hypervisor': None,
885       }
886     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
887     all_rversion = rpc.call_version(nodelist)
888
889     for node in nodelist:
890       feedback_fn("* Verifying node %s" % node)
891       result = self._VerifyNode(node, file_names, local_checksums,
892                                 all_vglist[node], all_nvinfo[node],
893                                 all_rversion[node], feedback_fn)
894       bad = bad or result
895
896       # node_volume
897       volumeinfo = all_volumeinfo[node]
898
899       if type(volumeinfo) != dict:
900         feedback_fn("  - ERROR: connection to %s failed" % (node,))
901         bad = True
902         continue
903
904       node_volume[node] = volumeinfo
905
906       # node_instance
907       nodeinstance = all_instanceinfo[node]
908       if type(nodeinstance) != list:
909         feedback_fn("  - ERROR: connection to %s failed" % (node,))
910         bad = True
911         continue
912
913       node_instance[node] = nodeinstance
914
915     node_vol_should = {}
916
917     for instance in instancelist:
918       feedback_fn("* Verifying instance %s" % instance)
919       result =  self._VerifyInstance(instance, node_volume, node_instance,
920                                      feedback_fn)
921       bad = bad or result
922
923       inst_config = self.cfg.GetInstanceInfo(instance)
924
925       inst_config.MapLVsByNode(node_vol_should)
926
927     feedback_fn("* Verifying orphan volumes")
928     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
929                                        feedback_fn)
930     bad = bad or result
931
932     feedback_fn("* Verifying remaining instances")
933     result = self._VerifyOrphanInstances(instancelist, node_instance,
934                                          feedback_fn)
935     bad = bad or result
936
937     return int(bad)
938
939
940 class LURenameCluster(LogicalUnit):
941   """Rename the cluster.
942
943   """
944   HPATH = "cluster-rename"
945   HTYPE = constants.HTYPE_CLUSTER
946   _OP_REQP = ["name"]
947
948   def BuildHooksEnv(self):
949     """Build hooks env.
950
951     """
952     env = {
953       "OP_TARGET": self.op.sstore.GetClusterName(),
954       "NEW_NAME": self.op.name,
955       }
956     mn = self.sstore.GetMasterNode()
957     return env, [mn], [mn]
958
959   def CheckPrereq(self):
960     """Verify that the passed name is a valid one.
961
962     """
963     hostname = utils.HostInfo(self.op.name)
964
965     new_name = hostname.name
966     self.ip = new_ip = hostname.ip
967     old_name = self.sstore.GetClusterName()
968     old_ip = self.sstore.GetMasterIP()
969     if new_name == old_name and new_ip == old_ip:
970       raise errors.OpPrereqError("Neither the name nor the IP address of the"
971                                  " cluster has changed")
972     if new_ip != old_ip:
973       result = utils.RunCmd(["fping", "-q", new_ip])
974       if not result.failed:
975         raise errors.OpPrereqError("The given cluster IP address (%s) is"
976                                    " reachable on the network. Aborting." %
977                                    new_ip)
978
979     self.op.name = new_name
980
981   def Exec(self, feedback_fn):
982     """Rename the cluster.
983
984     """
985     clustername = self.op.name
986     ip = self.ip
987     ss = self.sstore
988
989     # shutdown the master IP
990     master = ss.GetMasterNode()
991     if not rpc.call_node_stop_master(master):
992       raise errors.OpExecError("Could not disable the master role")
993
994     try:
995       # modify the sstore
996       ss.SetKey(ss.SS_MASTER_IP, ip)
997       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998
999       # Distribute updated ss config to all nodes
1000       myself = self.cfg.GetNodeInfo(master)
1001       dist_nodes = self.cfg.GetNodeList()
1002       if myself.name in dist_nodes:
1003         dist_nodes.remove(myself.name)
1004
1005       logger.Debug("Copying updated ssconf data to all nodes")
1006       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007         fname = ss.KeyToFilename(keyname)
1008         result = rpc.call_upload_file(dist_nodes, fname)
1009         for to_node in dist_nodes:
1010           if not result[to_node]:
1011             logger.Error("copy of file %s to node %s failed" %
1012                          (fname, to_node))
1013     finally:
1014       if not rpc.call_node_start_master(master):
1015         logger.Error("Could not re-enable the master role on the master,\n"
1016                      "please restart manually.")
1017
1018
1019 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1020   """Sleep and poll for an instance's disk to sync.
1021
1022   """
1023   if not instance.disks:
1024     return True
1025
1026   if not oneshot:
1027     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1028
1029   node = instance.primary_node
1030
1031   for dev in instance.disks:
1032     cfgw.SetDiskID(dev, node)
1033
1034   retries = 0
1035   while True:
1036     max_time = 0
1037     done = True
1038     cumul_degraded = False
1039     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040     if not rstats:
1041       logger.ToStderr("Can't get any data from node %s" % node)
1042       retries += 1
1043       if retries >= 10:
1044         raise errors.RemoteError("Can't contact node %s for mirror data,"
1045                                  " aborting." % node)
1046       time.sleep(6)
1047       continue
1048     retries = 0
1049     for i in range(len(rstats)):
1050       mstat = rstats[i]
1051       if mstat is None:
1052         logger.ToStderr("Can't compute data for node %s/%s" %
1053                         (node, instance.disks[i].iv_name))
1054         continue
1055       perc_done, est_time, is_degraded = mstat
1056       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1057       if perc_done is not None:
1058         done = False
1059         if est_time is not None:
1060           rem_time = "%d estimated seconds remaining" % est_time
1061           max_time = est_time
1062         else:
1063           rem_time = "no time estimate"
1064         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1065                         (instance.disks[i].iv_name, perc_done, rem_time))
1066     if done or oneshot:
1067       break
1068
1069     if unlock:
1070       utils.Unlock('cmd')
1071     try:
1072       time.sleep(min(60, max_time))
1073     finally:
1074       if unlock:
1075         utils.Lock('cmd')
1076
1077   if done:
1078     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1079   return not cumul_degraded
1080
1081
1082 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1083   """Check that mirrors are not degraded.
1084
1085   """
1086   cfgw.SetDiskID(dev, node)
1087
1088   result = True
1089   if on_primary or dev.AssembleOnSecondary():
1090     rstats = rpc.call_blockdev_find(node, dev)
1091     if not rstats:
1092       logger.ToStderr("Can't get any data from node %s" % node)
1093       result = False
1094     else:
1095       result = result and (not rstats[5])
1096   if dev.children:
1097     for child in dev.children:
1098       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1099
1100   return result
1101
1102
1103 class LUDiagnoseOS(NoHooksLU):
1104   """Logical unit for OS diagnose/query.
1105
1106   """
1107   _OP_REQP = []
1108
1109   def CheckPrereq(self):
1110     """Check prerequisites.
1111
1112     This always succeeds, since this is a pure query LU.
1113
1114     """
1115     return
1116
1117   def Exec(self, feedback_fn):
1118     """Compute the list of OSes.
1119
1120     """
1121     node_list = self.cfg.GetNodeList()
1122     node_data = rpc.call_os_diagnose(node_list)
1123     if node_data == False:
1124       raise errors.OpExecError("Can't gather the list of OSes")
1125     return node_data
1126
1127
1128 class LURemoveNode(LogicalUnit):
1129   """Logical unit for removing a node.
1130
1131   """
1132   HPATH = "node-remove"
1133   HTYPE = constants.HTYPE_NODE
1134   _OP_REQP = ["node_name"]
1135
1136   def BuildHooksEnv(self):
1137     """Build hooks env.
1138
1139     This doesn't run on the target node in the pre phase as a failed
1140     node would not allows itself to run.
1141
1142     """
1143     env = {
1144       "OP_TARGET": self.op.node_name,
1145       "NODE_NAME": self.op.node_name,
1146       }
1147     all_nodes = self.cfg.GetNodeList()
1148     all_nodes.remove(self.op.node_name)
1149     return env, all_nodes, all_nodes
1150
1151   def CheckPrereq(self):
1152     """Check prerequisites.
1153
1154     This checks:
1155      - the node exists in the configuration
1156      - it does not have primary or secondary instances
1157      - it's not the master
1158
1159     Any errors are signalled by raising errors.OpPrereqError.
1160
1161     """
1162     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1163     if node is None:
1164       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1165
1166     instance_list = self.cfg.GetInstanceList()
1167
1168     masternode = self.sstore.GetMasterNode()
1169     if node.name == masternode:
1170       raise errors.OpPrereqError("Node is the master node,"
1171                                  " you need to failover first.")
1172
1173     for instance_name in instance_list:
1174       instance = self.cfg.GetInstanceInfo(instance_name)
1175       if node.name == instance.primary_node:
1176         raise errors.OpPrereqError("Instance %s still running on the node,"
1177                                    " please remove first." % instance_name)
1178       if node.name in instance.secondary_nodes:
1179         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1180                                    " please remove first." % instance_name)
1181     self.op.node_name = node.name
1182     self.node = node
1183
1184   def Exec(self, feedback_fn):
1185     """Removes the node from the cluster.
1186
1187     """
1188     node = self.node
1189     logger.Info("stopping the node daemon and removing configs from node %s" %
1190                 node.name)
1191
1192     rpc.call_node_leave_cluster(node.name)
1193
1194     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1195
1196     logger.Info("Removing node %s from config" % node.name)
1197
1198     self.cfg.RemoveNode(node.name)
1199
1200
1201 class LUQueryNodes(NoHooksLU):
1202   """Logical unit for querying nodes.
1203
1204   """
1205   _OP_REQP = ["output_fields", "names"]
1206
1207   def CheckPrereq(self):
1208     """Check prerequisites.
1209
1210     This checks that the fields required are valid output fields.
1211
1212     """
1213     self.dynamic_fields = frozenset(["dtotal", "dfree",
1214                                      "mtotal", "mnode", "mfree",
1215                                      "bootid"])
1216
1217     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1218                                "pinst_list", "sinst_list",
1219                                "pip", "sip"],
1220                        dynamic=self.dynamic_fields,
1221                        selected=self.op.output_fields)
1222
1223     self.wanted = _GetWantedNodes(self, self.op.names)
1224
1225   def Exec(self, feedback_fn):
1226     """Computes the list of nodes and their attributes.
1227
1228     """
1229     nodenames = self.wanted
1230     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1231
1232     # begin data gathering
1233
1234     if self.dynamic_fields.intersection(self.op.output_fields):
1235       live_data = {}
1236       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1237       for name in nodenames:
1238         nodeinfo = node_data.get(name, None)
1239         if nodeinfo:
1240           live_data[name] = {
1241             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1242             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1243             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1244             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1245             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1246             "bootid": nodeinfo['bootid'],
1247             }
1248         else:
1249           live_data[name] = {}
1250     else:
1251       live_data = dict.fromkeys(nodenames, {})
1252
1253     node_to_primary = dict([(name, set()) for name in nodenames])
1254     node_to_secondary = dict([(name, set()) for name in nodenames])
1255
1256     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1257                              "sinst_cnt", "sinst_list"))
1258     if inst_fields & frozenset(self.op.output_fields):
1259       instancelist = self.cfg.GetInstanceList()
1260
1261       for instance_name in instancelist:
1262         inst = self.cfg.GetInstanceInfo(instance_name)
1263         if inst.primary_node in node_to_primary:
1264           node_to_primary[inst.primary_node].add(inst.name)
1265         for secnode in inst.secondary_nodes:
1266           if secnode in node_to_secondary:
1267             node_to_secondary[secnode].add(inst.name)
1268
1269     # end data gathering
1270
1271     output = []
1272     for node in nodelist:
1273       node_output = []
1274       for field in self.op.output_fields:
1275         if field == "name":
1276           val = node.name
1277         elif field == "pinst_list":
1278           val = list(node_to_primary[node.name])
1279         elif field == "sinst_list":
1280           val = list(node_to_secondary[node.name])
1281         elif field == "pinst_cnt":
1282           val = len(node_to_primary[node.name])
1283         elif field == "sinst_cnt":
1284           val = len(node_to_secondary[node.name])
1285         elif field == "pip":
1286           val = node.primary_ip
1287         elif field == "sip":
1288           val = node.secondary_ip
1289         elif field in self.dynamic_fields:
1290           val = live_data[node.name].get(field, None)
1291         else:
1292           raise errors.ParameterError(field)
1293         node_output.append(val)
1294       output.append(node_output)
1295
1296     return output
1297
1298
1299 class LUQueryNodeVolumes(NoHooksLU):
1300   """Logical unit for getting volumes on node(s).
1301
1302   """
1303   _OP_REQP = ["nodes", "output_fields"]
1304
1305   def CheckPrereq(self):
1306     """Check prerequisites.
1307
1308     This checks that the fields required are valid output fields.
1309
1310     """
1311     self.nodes = _GetWantedNodes(self, self.op.nodes)
1312
1313     _CheckOutputFields(static=["node"],
1314                        dynamic=["phys", "vg", "name", "size", "instance"],
1315                        selected=self.op.output_fields)
1316
1317
1318   def Exec(self, feedback_fn):
1319     """Computes the list of nodes and their attributes.
1320
1321     """
1322     nodenames = self.nodes
1323     volumes = rpc.call_node_volumes(nodenames)
1324
1325     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1326              in self.cfg.GetInstanceList()]
1327
1328     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1329
1330     output = []
1331     for node in nodenames:
1332       if node not in volumes or not volumes[node]:
1333         continue
1334
1335       node_vols = volumes[node][:]
1336       node_vols.sort(key=lambda vol: vol['dev'])
1337
1338       for vol in node_vols:
1339         node_output = []
1340         for field in self.op.output_fields:
1341           if field == "node":
1342             val = node
1343           elif field == "phys":
1344             val = vol['dev']
1345           elif field == "vg":
1346             val = vol['vg']
1347           elif field == "name":
1348             val = vol['name']
1349           elif field == "size":
1350             val = int(float(vol['size']))
1351           elif field == "instance":
1352             for inst in ilist:
1353               if node not in lv_by_node[inst]:
1354                 continue
1355               if vol['name'] in lv_by_node[inst][node]:
1356                 val = inst.name
1357                 break
1358             else:
1359               val = '-'
1360           else:
1361             raise errors.ParameterError(field)
1362           node_output.append(str(val))
1363
1364         output.append(node_output)
1365
1366     return output
1367
1368
1369 class LUAddNode(LogicalUnit):
1370   """Logical unit for adding node to the cluster.
1371
1372   """
1373   HPATH = "node-add"
1374   HTYPE = constants.HTYPE_NODE
1375   _OP_REQP = ["node_name"]
1376
1377   def BuildHooksEnv(self):
1378     """Build hooks env.
1379
1380     This will run on all nodes before, and on all nodes + the new node after.
1381
1382     """
1383     env = {
1384       "OP_TARGET": self.op.node_name,
1385       "NODE_NAME": self.op.node_name,
1386       "NODE_PIP": self.op.primary_ip,
1387       "NODE_SIP": self.op.secondary_ip,
1388       }
1389     nodes_0 = self.cfg.GetNodeList()
1390     nodes_1 = nodes_0 + [self.op.node_name, ]
1391     return env, nodes_0, nodes_1
1392
1393   def CheckPrereq(self):
1394     """Check prerequisites.
1395
1396     This checks:
1397      - the new node is not already in the config
1398      - it is resolvable
1399      - its parameters (single/dual homed) matches the cluster
1400
1401     Any errors are signalled by raising errors.OpPrereqError.
1402
1403     """
1404     node_name = self.op.node_name
1405     cfg = self.cfg
1406
1407     dns_data = utils.HostInfo(node_name)
1408
1409     node = dns_data.name
1410     primary_ip = self.op.primary_ip = dns_data.ip
1411     secondary_ip = getattr(self.op, "secondary_ip", None)
1412     if secondary_ip is None:
1413       secondary_ip = primary_ip
1414     if not utils.IsValidIP(secondary_ip):
1415       raise errors.OpPrereqError("Invalid secondary IP given")
1416     self.op.secondary_ip = secondary_ip
1417     node_list = cfg.GetNodeList()
1418     if node in node_list:
1419       raise errors.OpPrereqError("Node %s is already in the configuration"
1420                                  % node)
1421
1422     for existing_node_name in node_list:
1423       existing_node = cfg.GetNodeInfo(existing_node_name)
1424       if (existing_node.primary_ip == primary_ip or
1425           existing_node.secondary_ip == primary_ip or
1426           existing_node.primary_ip == secondary_ip or
1427           existing_node.secondary_ip == secondary_ip):
1428         raise errors.OpPrereqError("New node ip address(es) conflict with"
1429                                    " existing node %s" % existing_node.name)
1430
1431     # check that the type of the node (single versus dual homed) is the
1432     # same as for the master
1433     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1434     master_singlehomed = myself.secondary_ip == myself.primary_ip
1435     newbie_singlehomed = secondary_ip == primary_ip
1436     if master_singlehomed != newbie_singlehomed:
1437       if master_singlehomed:
1438         raise errors.OpPrereqError("The master has no private ip but the"
1439                                    " new node has one")
1440       else:
1441         raise errors.OpPrereqError("The master has a private ip but the"
1442                                    " new node doesn't have one")
1443
1444     # checks reachablity
1445     if not utils.TcpPing(utils.HostInfo().name,
1446                          primary_ip,
1447                          constants.DEFAULT_NODED_PORT):
1448       raise errors.OpPrereqError("Node not reachable by ping")
1449
1450     if not newbie_singlehomed:
1451       # check reachability from my secondary ip to newbie's secondary ip
1452       if not utils.TcpPing(myself.secondary_ip,
1453                            secondary_ip,
1454                            constants.DEFAULT_NODED_PORT):
1455         raise errors.OpPrereqError(
1456           "Node secondary ip not reachable by TCP based ping to noded port")
1457
1458     self.new_node = objects.Node(name=node,
1459                                  primary_ip=primary_ip,
1460                                  secondary_ip=secondary_ip)
1461
1462   def Exec(self, feedback_fn):
1463     """Adds the new node to the cluster.
1464
1465     """
1466     new_node = self.new_node
1467     node = new_node.name
1468
1469     # set up inter-node password and certificate and restarts the node daemon
1470     gntpass = self.sstore.GetNodeDaemonPassword()
1471     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1472       raise errors.OpExecError("ganeti password corruption detected")
1473     f = open(constants.SSL_CERT_FILE)
1474     try:
1475       gntpem = f.read(8192)
1476     finally:
1477       f.close()
1478     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1479     # so we use this to detect an invalid certificate; as long as the
1480     # cert doesn't contain this, the here-document will be correctly
1481     # parsed by the shell sequence below
1482     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1483       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1484     if not gntpem.endswith("\n"):
1485       raise errors.OpExecError("PEM must end with newline")
1486     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1487
1488     # and then connect with ssh to set password and start ganeti-noded
1489     # note that all the below variables are sanitized at this point,
1490     # either by being constants or by the checks above
1491     ss = self.sstore
1492     mycommand = ("umask 077 && "
1493                  "echo '%s' > '%s' && "
1494                  "cat > '%s' << '!EOF.' && \n"
1495                  "%s!EOF.\n%s restart" %
1496                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1497                   constants.SSL_CERT_FILE, gntpem,
1498                   constants.NODE_INITD_SCRIPT))
1499
1500     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1501     if result.failed:
1502       raise errors.OpExecError("Remote command on node %s, error: %s,"
1503                                " output: %s" %
1504                                (node, result.fail_reason, result.output))
1505
1506     # check connectivity
1507     time.sleep(4)
1508
1509     result = rpc.call_version([node])[node]
1510     if result:
1511       if constants.PROTOCOL_VERSION == result:
1512         logger.Info("communication to node %s fine, sw version %s match" %
1513                     (node, result))
1514       else:
1515         raise errors.OpExecError("Version mismatch master version %s,"
1516                                  " node version %s" %
1517                                  (constants.PROTOCOL_VERSION, result))
1518     else:
1519       raise errors.OpExecError("Cannot get version from the new node")
1520
1521     # setup ssh on node
1522     logger.Info("copy ssh key to node %s" % node)
1523     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1524     keyarray = []
1525     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1526                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1527                 priv_key, pub_key]
1528
1529     for i in keyfiles:
1530       f = open(i, 'r')
1531       try:
1532         keyarray.append(f.read())
1533       finally:
1534         f.close()
1535
1536     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1537                                keyarray[3], keyarray[4], keyarray[5])
1538
1539     if not result:
1540       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1541
1542     # Add node to our /etc/hosts, and add key to known_hosts
1543     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1544     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1545                       self.cfg.GetHostKey())
1546
1547     if new_node.secondary_ip != new_node.primary_ip:
1548       if not rpc.call_node_tcp_ping(new_node.name,
1549                                     constants.LOCALHOST_IP_ADDRESS,
1550                                     new_node.secondary_ip,
1551                                     constants.DEFAULT_NODED_PORT,
1552                                     10, False):
1553         raise errors.OpExecError("Node claims it doesn't have the"
1554                                  " secondary ip you gave (%s).\n"
1555                                  "Please fix and re-run this command." %
1556                                  new_node.secondary_ip)
1557
1558     success, msg = ssh.VerifyNodeHostname(node)
1559     if not success:
1560       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1561                                " than the one the resolver gives: %s.\n"
1562                                "Please fix and re-run this command." %
1563                                (node, msg))
1564
1565     # Distribute updated /etc/hosts and known_hosts to all nodes,
1566     # including the node just added
1567     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1568     dist_nodes = self.cfg.GetNodeList() + [node]
1569     if myself.name in dist_nodes:
1570       dist_nodes.remove(myself.name)
1571
1572     logger.Debug("Copying hosts and known_hosts to all nodes")
1573     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1574       result = rpc.call_upload_file(dist_nodes, fname)
1575       for to_node in dist_nodes:
1576         if not result[to_node]:
1577           logger.Error("copy of file %s to node %s failed" %
1578                        (fname, to_node))
1579
1580     to_copy = ss.GetFileList()
1581     for fname in to_copy:
1582       if not ssh.CopyFileToNode(node, fname):
1583         logger.Error("could not copy file %s to node %s" % (fname, node))
1584
1585     logger.Info("adding node %s to cluster.conf" % node)
1586     self.cfg.AddNode(new_node)
1587
1588
1589 class LUMasterFailover(LogicalUnit):
1590   """Failover the master node to the current node.
1591
1592   This is a special LU in that it must run on a non-master node.
1593
1594   """
1595   HPATH = "master-failover"
1596   HTYPE = constants.HTYPE_CLUSTER
1597   REQ_MASTER = False
1598   _OP_REQP = []
1599
1600   def BuildHooksEnv(self):
1601     """Build hooks env.
1602
1603     This will run on the new master only in the pre phase, and on all
1604     the nodes in the post phase.
1605
1606     """
1607     env = {
1608       "OP_TARGET": self.new_master,
1609       "NEW_MASTER": self.new_master,
1610       "OLD_MASTER": self.old_master,
1611       }
1612     return env, [self.new_master], self.cfg.GetNodeList()
1613
1614   def CheckPrereq(self):
1615     """Check prerequisites.
1616
1617     This checks that we are not already the master.
1618
1619     """
1620     self.new_master = utils.HostInfo().name
1621     self.old_master = self.sstore.GetMasterNode()
1622
1623     if self.old_master == self.new_master:
1624       raise errors.OpPrereqError("This commands must be run on the node"
1625                                  " where you want the new master to be.\n"
1626                                  "%s is already the master" %
1627                                  self.old_master)
1628
1629   def Exec(self, feedback_fn):
1630     """Failover the master node.
1631
1632     This command, when run on a non-master node, will cause the current
1633     master to cease being master, and the non-master to become new
1634     master.
1635
1636     """
1637     #TODO: do not rely on gethostname returning the FQDN
1638     logger.Info("setting master to %s, old master: %s" %
1639                 (self.new_master, self.old_master))
1640
1641     if not rpc.call_node_stop_master(self.old_master):
1642       logger.Error("could disable the master role on the old master"
1643                    " %s, please disable manually" % self.old_master)
1644
1645     ss = self.sstore
1646     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1647     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1648                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1649       logger.Error("could not distribute the new simple store master file"
1650                    " to the other nodes, please check.")
1651
1652     if not rpc.call_node_start_master(self.new_master):
1653       logger.Error("could not start the master role on the new master"
1654                    " %s, please check" % self.new_master)
1655       feedback_fn("Error in activating the master IP on the new master,\n"
1656                   "please fix manually.")
1657
1658
1659
1660 class LUQueryClusterInfo(NoHooksLU):
1661   """Query cluster configuration.
1662
1663   """
1664   _OP_REQP = []
1665   REQ_MASTER = False
1666
1667   def CheckPrereq(self):
1668     """No prerequsites needed for this LU.
1669
1670     """
1671     pass
1672
1673   def Exec(self, feedback_fn):
1674     """Return cluster config.
1675
1676     """
1677     result = {
1678       "name": self.sstore.GetClusterName(),
1679       "software_version": constants.RELEASE_VERSION,
1680       "protocol_version": constants.PROTOCOL_VERSION,
1681       "config_version": constants.CONFIG_VERSION,
1682       "os_api_version": constants.OS_API_VERSION,
1683       "export_version": constants.EXPORT_VERSION,
1684       "master": self.sstore.GetMasterNode(),
1685       "architecture": (platform.architecture()[0], platform.machine()),
1686       }
1687
1688     return result
1689
1690
1691 class LUClusterCopyFile(NoHooksLU):
1692   """Copy file to cluster.
1693
1694   """
1695   _OP_REQP = ["nodes", "filename"]
1696
1697   def CheckPrereq(self):
1698     """Check prerequisites.
1699
1700     It should check that the named file exists and that the given list
1701     of nodes is valid.
1702
1703     """
1704     if not os.path.exists(self.op.filename):
1705       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1706
1707     self.nodes = _GetWantedNodes(self, self.op.nodes)
1708
1709   def Exec(self, feedback_fn):
1710     """Copy a file from master to some nodes.
1711
1712     Args:
1713       opts - class with options as members
1714       args - list containing a single element, the file name
1715     Opts used:
1716       nodes - list containing the name of target nodes; if empty, all nodes
1717
1718     """
1719     filename = self.op.filename
1720
1721     myname = utils.HostInfo().name
1722
1723     for node in self.nodes:
1724       if node == myname:
1725         continue
1726       if not ssh.CopyFileToNode(node, filename):
1727         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1728
1729
1730 class LUDumpClusterConfig(NoHooksLU):
1731   """Return a text-representation of the cluster-config.
1732
1733   """
1734   _OP_REQP = []
1735
1736   def CheckPrereq(self):
1737     """No prerequisites.
1738
1739     """
1740     pass
1741
1742   def Exec(self, feedback_fn):
1743     """Dump a representation of the cluster config to the standard output.
1744
1745     """
1746     return self.cfg.DumpConfig()
1747
1748
1749 class LURunClusterCommand(NoHooksLU):
1750   """Run a command on some nodes.
1751
1752   """
1753   _OP_REQP = ["command", "nodes"]
1754
1755   def CheckPrereq(self):
1756     """Check prerequisites.
1757
1758     It checks that the given list of nodes is valid.
1759
1760     """
1761     self.nodes = _GetWantedNodes(self, self.op.nodes)
1762
1763   def Exec(self, feedback_fn):
1764     """Run a command on some nodes.
1765
1766     """
1767     data = []
1768     for node in self.nodes:
1769       result = ssh.SSHCall(node, "root", self.op.command)
1770       data.append((node, result.output, result.exit_code))
1771
1772     return data
1773
1774
1775 class LUActivateInstanceDisks(NoHooksLU):
1776   """Bring up an instance's disks.
1777
1778   """
1779   _OP_REQP = ["instance_name"]
1780
1781   def CheckPrereq(self):
1782     """Check prerequisites.
1783
1784     This checks that the instance is in the cluster.
1785
1786     """
1787     instance = self.cfg.GetInstanceInfo(
1788       self.cfg.ExpandInstanceName(self.op.instance_name))
1789     if instance is None:
1790       raise errors.OpPrereqError("Instance '%s' not known" %
1791                                  self.op.instance_name)
1792     self.instance = instance
1793
1794
1795   def Exec(self, feedback_fn):
1796     """Activate the disks.
1797
1798     """
1799     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1800     if not disks_ok:
1801       raise errors.OpExecError("Cannot activate block devices")
1802
1803     return disks_info
1804
1805
1806 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1807   """Prepare the block devices for an instance.
1808
1809   This sets up the block devices on all nodes.
1810
1811   Args:
1812     instance: a ganeti.objects.Instance object
1813     ignore_secondaries: if true, errors on secondary nodes won't result
1814                         in an error return from the function
1815
1816   Returns:
1817     false if the operation failed
1818     list of (host, instance_visible_name, node_visible_name) if the operation
1819          suceeded with the mapping from node devices to instance devices
1820   """
1821   device_info = []
1822   disks_ok = True
1823   for inst_disk in instance.disks:
1824     master_result = None
1825     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1826       cfg.SetDiskID(node_disk, node)
1827       is_primary = node == instance.primary_node
1828       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1829       if not result:
1830         logger.Error("could not prepare block device %s on node %s (is_pri"
1831                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1832         if is_primary or not ignore_secondaries:
1833           disks_ok = False
1834       if is_primary:
1835         master_result = result
1836     device_info.append((instance.primary_node, inst_disk.iv_name,
1837                         master_result))
1838
1839   # leave the disks configured for the primary node
1840   # this is a workaround that would be fixed better by
1841   # improving the logical/physical id handling
1842   for disk in instance.disks:
1843     cfg.SetDiskID(disk, instance.primary_node)
1844
1845   return disks_ok, device_info
1846
1847
1848 def _StartInstanceDisks(cfg, instance, force):
1849   """Start the disks of an instance.
1850
1851   """
1852   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1853                                            ignore_secondaries=force)
1854   if not disks_ok:
1855     _ShutdownInstanceDisks(instance, cfg)
1856     if force is not None and not force:
1857       logger.Error("If the message above refers to a secondary node,"
1858                    " you can retry the operation using '--force'.")
1859     raise errors.OpExecError("Disk consistency error")
1860
1861
1862 class LUDeactivateInstanceDisks(NoHooksLU):
1863   """Shutdown an instance's disks.
1864
1865   """
1866   _OP_REQP = ["instance_name"]
1867
1868   def CheckPrereq(self):
1869     """Check prerequisites.
1870
1871     This checks that the instance is in the cluster.
1872
1873     """
1874     instance = self.cfg.GetInstanceInfo(
1875       self.cfg.ExpandInstanceName(self.op.instance_name))
1876     if instance is None:
1877       raise errors.OpPrereqError("Instance '%s' not known" %
1878                                  self.op.instance_name)
1879     self.instance = instance
1880
1881   def Exec(self, feedback_fn):
1882     """Deactivate the disks
1883
1884     """
1885     instance = self.instance
1886     ins_l = rpc.call_instance_list([instance.primary_node])
1887     ins_l = ins_l[instance.primary_node]
1888     if not type(ins_l) is list:
1889       raise errors.OpExecError("Can't contact node '%s'" %
1890                                instance.primary_node)
1891
1892     if self.instance.name in ins_l:
1893       raise errors.OpExecError("Instance is running, can't shutdown"
1894                                " block devices.")
1895
1896     _ShutdownInstanceDisks(instance, self.cfg)
1897
1898
1899 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1900   """Shutdown block devices of an instance.
1901
1902   This does the shutdown on all nodes of the instance.
1903
1904   If the ignore_primary is false, errors on the primary node are
1905   ignored.
1906
1907   """
1908   result = True
1909   for disk in instance.disks:
1910     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1911       cfg.SetDiskID(top_disk, node)
1912       if not rpc.call_blockdev_shutdown(node, top_disk):
1913         logger.Error("could not shutdown block device %s on node %s" %
1914                      (disk.iv_name, node))
1915         if not ignore_primary or node != instance.primary_node:
1916           result = False
1917   return result
1918
1919
1920 class LUStartupInstance(LogicalUnit):
1921   """Starts an instance.
1922
1923   """
1924   HPATH = "instance-start"
1925   HTYPE = constants.HTYPE_INSTANCE
1926   _OP_REQP = ["instance_name", "force"]
1927
1928   def BuildHooksEnv(self):
1929     """Build hooks env.
1930
1931     This runs on master, primary and secondary nodes of the instance.
1932
1933     """
1934     env = {
1935       "FORCE": self.op.force,
1936       }
1937     env.update(_BuildInstanceHookEnvByObject(self.instance))
1938     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1939           list(self.instance.secondary_nodes))
1940     return env, nl, nl
1941
1942   def CheckPrereq(self):
1943     """Check prerequisites.
1944
1945     This checks that the instance is in the cluster.
1946
1947     """
1948     instance = self.cfg.GetInstanceInfo(
1949       self.cfg.ExpandInstanceName(self.op.instance_name))
1950     if instance is None:
1951       raise errors.OpPrereqError("Instance '%s' not known" %
1952                                  self.op.instance_name)
1953
1954     # check bridges existance
1955     _CheckInstanceBridgesExist(instance)
1956
1957     self.instance = instance
1958     self.op.instance_name = instance.name
1959
1960   def Exec(self, feedback_fn):
1961     """Start the instance.
1962
1963     """
1964     instance = self.instance
1965     force = self.op.force
1966     extra_args = getattr(self.op, "extra_args", "")
1967
1968     node_current = instance.primary_node
1969
1970     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1971     if not nodeinfo:
1972       raise errors.OpExecError("Could not contact node %s for infos" %
1973                                (node_current))
1974
1975     freememory = nodeinfo[node_current]['memory_free']
1976     memory = instance.memory
1977     if memory > freememory:
1978       raise errors.OpExecError("Not enough memory to start instance"
1979                                " %s on node %s"
1980                                " needed %s MiB, available %s MiB" %
1981                                (instance.name, node_current, memory,
1982                                 freememory))
1983
1984     _StartInstanceDisks(self.cfg, instance, force)
1985
1986     if not rpc.call_instance_start(node_current, instance, extra_args):
1987       _ShutdownInstanceDisks(instance, self.cfg)
1988       raise errors.OpExecError("Could not start instance")
1989
1990     self.cfg.MarkInstanceUp(instance.name)
1991
1992
1993 class LURebootInstance(LogicalUnit):
1994   """Reboot an instance.
1995
1996   """
1997   HPATH = "instance-reboot"
1998   HTYPE = constants.HTYPE_INSTANCE
1999   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2000
2001   def BuildHooksEnv(self):
2002     """Build hooks env.
2003
2004     This runs on master, primary and secondary nodes of the instance.
2005
2006     """
2007     env = {
2008       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2009       }
2010     env.update(_BuildInstanceHookEnvByObject(self.instance))
2011     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2012           list(self.instance.secondary_nodes))
2013     return env, nl, nl
2014
2015   def CheckPrereq(self):
2016     """Check prerequisites.
2017
2018     This checks that the instance is in the cluster.
2019
2020     """
2021     instance = self.cfg.GetInstanceInfo(
2022       self.cfg.ExpandInstanceName(self.op.instance_name))
2023     if instance is None:
2024       raise errors.OpPrereqError("Instance '%s' not known" %
2025                                  self.op.instance_name)
2026
2027     # check bridges existance
2028     _CheckInstanceBridgesExist(instance)
2029
2030     self.instance = instance
2031     self.op.instance_name = instance.name
2032
2033   def Exec(self, feedback_fn):
2034     """Reboot the instance.
2035
2036     """
2037     instance = self.instance
2038     ignore_secondaries = self.op.ignore_secondaries
2039     reboot_type = self.op.reboot_type
2040     extra_args = getattr(self.op, "extra_args", "")
2041
2042     node_current = instance.primary_node
2043
2044     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2045                            constants.INSTANCE_REBOOT_HARD,
2046                            constants.INSTANCE_REBOOT_FULL]:
2047       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2048                                   (constants.INSTANCE_REBOOT_SOFT,
2049                                    constants.INSTANCE_REBOOT_HARD,
2050                                    constants.INSTANCE_REBOOT_FULL))
2051
2052     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2053                        constants.INSTANCE_REBOOT_HARD]:
2054       if not rpc.call_instance_reboot(node_current, instance,
2055                                       reboot_type, extra_args):
2056         raise errors.OpExecError("Could not reboot instance")
2057     else:
2058       if not rpc.call_instance_shutdown(node_current, instance):
2059         raise errors.OpExecError("could not shutdown instance for full reboot")
2060       _ShutdownInstanceDisks(instance, self.cfg)
2061       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2062       if not rpc.call_instance_start(node_current, instance, extra_args):
2063         _ShutdownInstanceDisks(instance, self.cfg)
2064         raise errors.OpExecError("Could not start instance for full reboot")
2065
2066     self.cfg.MarkInstanceUp(instance.name)
2067
2068
2069 class LUShutdownInstance(LogicalUnit):
2070   """Shutdown an instance.
2071
2072   """
2073   HPATH = "instance-stop"
2074   HTYPE = constants.HTYPE_INSTANCE
2075   _OP_REQP = ["instance_name"]
2076
2077   def BuildHooksEnv(self):
2078     """Build hooks env.
2079
2080     This runs on master, primary and secondary nodes of the instance.
2081
2082     """
2083     env = _BuildInstanceHookEnvByObject(self.instance)
2084     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2085           list(self.instance.secondary_nodes))
2086     return env, nl, nl
2087
2088   def CheckPrereq(self):
2089     """Check prerequisites.
2090
2091     This checks that the instance is in the cluster.
2092
2093     """
2094     instance = self.cfg.GetInstanceInfo(
2095       self.cfg.ExpandInstanceName(self.op.instance_name))
2096     if instance is None:
2097       raise errors.OpPrereqError("Instance '%s' not known" %
2098                                  self.op.instance_name)
2099     self.instance = instance
2100
2101   def Exec(self, feedback_fn):
2102     """Shutdown the instance.
2103
2104     """
2105     instance = self.instance
2106     node_current = instance.primary_node
2107     if not rpc.call_instance_shutdown(node_current, instance):
2108       logger.Error("could not shutdown instance")
2109
2110     self.cfg.MarkInstanceDown(instance.name)
2111     _ShutdownInstanceDisks(instance, self.cfg)
2112
2113
2114 class LUReinstallInstance(LogicalUnit):
2115   """Reinstall an instance.
2116
2117   """
2118   HPATH = "instance-reinstall"
2119   HTYPE = constants.HTYPE_INSTANCE
2120   _OP_REQP = ["instance_name"]
2121
2122   def BuildHooksEnv(self):
2123     """Build hooks env.
2124
2125     This runs on master, primary and secondary nodes of the instance.
2126
2127     """
2128     env = _BuildInstanceHookEnvByObject(self.instance)
2129     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130           list(self.instance.secondary_nodes))
2131     return env, nl, nl
2132
2133   def CheckPrereq(self):
2134     """Check prerequisites.
2135
2136     This checks that the instance is in the cluster and is not running.
2137
2138     """
2139     instance = self.cfg.GetInstanceInfo(
2140       self.cfg.ExpandInstanceName(self.op.instance_name))
2141     if instance is None:
2142       raise errors.OpPrereqError("Instance '%s' not known" %
2143                                  self.op.instance_name)
2144     if instance.disk_template == constants.DT_DISKLESS:
2145       raise errors.OpPrereqError("Instance '%s' has no disks" %
2146                                  self.op.instance_name)
2147     if instance.status != "down":
2148       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2149                                  self.op.instance_name)
2150     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2151     if remote_info:
2152       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2153                                  (self.op.instance_name,
2154                                   instance.primary_node))
2155
2156     self.op.os_type = getattr(self.op, "os_type", None)
2157     if self.op.os_type is not None:
2158       # OS verification
2159       pnode = self.cfg.GetNodeInfo(
2160         self.cfg.ExpandNodeName(instance.primary_node))
2161       if pnode is None:
2162         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2163                                    self.op.pnode)
2164       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2165       if not isinstance(os_obj, objects.OS):
2166         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2167                                    " primary node"  % self.op.os_type)
2168
2169     self.instance = instance
2170
2171   def Exec(self, feedback_fn):
2172     """Reinstall the instance.
2173
2174     """
2175     inst = self.instance
2176
2177     if self.op.os_type is not None:
2178       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2179       inst.os = self.op.os_type
2180       self.cfg.AddInstance(inst)
2181
2182     _StartInstanceDisks(self.cfg, inst, None)
2183     try:
2184       feedback_fn("Running the instance OS create scripts...")
2185       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2186         raise errors.OpExecError("Could not install OS for instance %s "
2187                                  "on node %s" %
2188                                  (inst.name, inst.primary_node))
2189     finally:
2190       _ShutdownInstanceDisks(inst, self.cfg)
2191
2192
2193 class LURenameInstance(LogicalUnit):
2194   """Rename an instance.
2195
2196   """
2197   HPATH = "instance-rename"
2198   HTYPE = constants.HTYPE_INSTANCE
2199   _OP_REQP = ["instance_name", "new_name"]
2200
2201   def BuildHooksEnv(self):
2202     """Build hooks env.
2203
2204     This runs on master, primary and secondary nodes of the instance.
2205
2206     """
2207     env = _BuildInstanceHookEnvByObject(self.instance)
2208     env["INSTANCE_NEW_NAME"] = self.op.new_name
2209     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210           list(self.instance.secondary_nodes))
2211     return env, nl, nl
2212
2213   def CheckPrereq(self):
2214     """Check prerequisites.
2215
2216     This checks that the instance is in the cluster and is not running.
2217
2218     """
2219     instance = self.cfg.GetInstanceInfo(
2220       self.cfg.ExpandInstanceName(self.op.instance_name))
2221     if instance is None:
2222       raise errors.OpPrereqError("Instance '%s' not known" %
2223                                  self.op.instance_name)
2224     if instance.status != "down":
2225       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2226                                  self.op.instance_name)
2227     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2228     if remote_info:
2229       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2230                                  (self.op.instance_name,
2231                                   instance.primary_node))
2232     self.instance = instance
2233
2234     # new name verification
2235     name_info = utils.HostInfo(self.op.new_name)
2236
2237     self.op.new_name = new_name = name_info.name
2238     if not getattr(self.op, "ignore_ip", False):
2239       command = ["fping", "-q", name_info.ip]
2240       result = utils.RunCmd(command)
2241       if not result.failed:
2242         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2243                                    (name_info.ip, new_name))
2244
2245
2246   def Exec(self, feedback_fn):
2247     """Reinstall the instance.
2248
2249     """
2250     inst = self.instance
2251     old_name = inst.name
2252
2253     self.cfg.RenameInstance(inst.name, self.op.new_name)
2254
2255     # re-read the instance from the configuration after rename
2256     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2257
2258     _StartInstanceDisks(self.cfg, inst, None)
2259     try:
2260       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2261                                           "sda", "sdb"):
2262         msg = ("Could run OS rename script for instance %s\n"
2263                "on node %s\n"
2264                "(but the instance has been renamed in Ganeti)" %
2265                (inst.name, inst.primary_node))
2266         logger.Error(msg)
2267     finally:
2268       _ShutdownInstanceDisks(inst, self.cfg)
2269
2270
2271 class LURemoveInstance(LogicalUnit):
2272   """Remove an instance.
2273
2274   """
2275   HPATH = "instance-remove"
2276   HTYPE = constants.HTYPE_INSTANCE
2277   _OP_REQP = ["instance_name"]
2278
2279   def BuildHooksEnv(self):
2280     """Build hooks env.
2281
2282     This runs on master, primary and secondary nodes of the instance.
2283
2284     """
2285     env = _BuildInstanceHookEnvByObject(self.instance)
2286     nl = [self.sstore.GetMasterNode()]
2287     return env, nl, nl
2288
2289   def CheckPrereq(self):
2290     """Check prerequisites.
2291
2292     This checks that the instance is in the cluster.
2293
2294     """
2295     instance = self.cfg.GetInstanceInfo(
2296       self.cfg.ExpandInstanceName(self.op.instance_name))
2297     if instance is None:
2298       raise errors.OpPrereqError("Instance '%s' not known" %
2299                                  self.op.instance_name)
2300     self.instance = instance
2301
2302   def Exec(self, feedback_fn):
2303     """Remove the instance.
2304
2305     """
2306     instance = self.instance
2307     logger.Info("shutting down instance %s on node %s" %
2308                 (instance.name, instance.primary_node))
2309
2310     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2311       if self.op.ignore_failures:
2312         feedback_fn("Warning: can't shutdown instance")
2313       else:
2314         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2315                                  (instance.name, instance.primary_node))
2316
2317     logger.Info("removing block devices for instance %s" % instance.name)
2318
2319     if not _RemoveDisks(instance, self.cfg):
2320       if self.op.ignore_failures:
2321         feedback_fn("Warning: can't remove instance's disks")
2322       else:
2323         raise errors.OpExecError("Can't remove instance's disks")
2324
2325     logger.Info("removing instance %s out of cluster config" % instance.name)
2326
2327     self.cfg.RemoveInstance(instance.name)
2328
2329
2330 class LUQueryInstances(NoHooksLU):
2331   """Logical unit for querying instances.
2332
2333   """
2334   _OP_REQP = ["output_fields", "names"]
2335
2336   def CheckPrereq(self):
2337     """Check prerequisites.
2338
2339     This checks that the fields required are valid output fields.
2340
2341     """
2342     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2343     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2344                                "admin_state", "admin_ram",
2345                                "disk_template", "ip", "mac", "bridge",
2346                                "sda_size", "sdb_size"],
2347                        dynamic=self.dynamic_fields,
2348                        selected=self.op.output_fields)
2349
2350     self.wanted = _GetWantedInstances(self, self.op.names)
2351
2352   def Exec(self, feedback_fn):
2353     """Computes the list of nodes and their attributes.
2354
2355     """
2356     instance_names = self.wanted
2357     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2358                      in instance_names]
2359
2360     # begin data gathering
2361
2362     nodes = frozenset([inst.primary_node for inst in instance_list])
2363
2364     bad_nodes = []
2365     if self.dynamic_fields.intersection(self.op.output_fields):
2366       live_data = {}
2367       node_data = rpc.call_all_instances_info(nodes)
2368       for name in nodes:
2369         result = node_data[name]
2370         if result:
2371           live_data.update(result)
2372         elif result == False:
2373           bad_nodes.append(name)
2374         # else no instance is alive
2375     else:
2376       live_data = dict([(name, {}) for name in instance_names])
2377
2378     # end data gathering
2379
2380     output = []
2381     for instance in instance_list:
2382       iout = []
2383       for field in self.op.output_fields:
2384         if field == "name":
2385           val = instance.name
2386         elif field == "os":
2387           val = instance.os
2388         elif field == "pnode":
2389           val = instance.primary_node
2390         elif field == "snodes":
2391           val = list(instance.secondary_nodes)
2392         elif field == "admin_state":
2393           val = (instance.status != "down")
2394         elif field == "oper_state":
2395           if instance.primary_node in bad_nodes:
2396             val = None
2397           else:
2398             val = bool(live_data.get(instance.name))
2399         elif field == "admin_ram":
2400           val = instance.memory
2401         elif field == "oper_ram":
2402           if instance.primary_node in bad_nodes:
2403             val = None
2404           elif instance.name in live_data:
2405             val = live_data[instance.name].get("memory", "?")
2406           else:
2407             val = "-"
2408         elif field == "disk_template":
2409           val = instance.disk_template
2410         elif field == "ip":
2411           val = instance.nics[0].ip
2412         elif field == "bridge":
2413           val = instance.nics[0].bridge
2414         elif field == "mac":
2415           val = instance.nics[0].mac
2416         elif field == "sda_size" or field == "sdb_size":
2417           disk = instance.FindDisk(field[:3])
2418           if disk is None:
2419             val = None
2420           else:
2421             val = disk.size
2422         else:
2423           raise errors.ParameterError(field)
2424         iout.append(val)
2425       output.append(iout)
2426
2427     return output
2428
2429
2430 class LUFailoverInstance(LogicalUnit):
2431   """Failover an instance.
2432
2433   """
2434   HPATH = "instance-failover"
2435   HTYPE = constants.HTYPE_INSTANCE
2436   _OP_REQP = ["instance_name", "ignore_consistency"]
2437
2438   def BuildHooksEnv(self):
2439     """Build hooks env.
2440
2441     This runs on master, primary and secondary nodes of the instance.
2442
2443     """
2444     env = {
2445       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2446       }
2447     env.update(_BuildInstanceHookEnvByObject(self.instance))
2448     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2449     return env, nl, nl
2450
2451   def CheckPrereq(self):
2452     """Check prerequisites.
2453
2454     This checks that the instance is in the cluster.
2455
2456     """
2457     instance = self.cfg.GetInstanceInfo(
2458       self.cfg.ExpandInstanceName(self.op.instance_name))
2459     if instance is None:
2460       raise errors.OpPrereqError("Instance '%s' not known" %
2461                                  self.op.instance_name)
2462
2463     if instance.disk_template not in constants.DTS_NET_MIRROR:
2464       raise errors.OpPrereqError("Instance's disk layout is not"
2465                                  " network mirrored, cannot failover.")
2466
2467     secondary_nodes = instance.secondary_nodes
2468     if not secondary_nodes:
2469       raise errors.ProgrammerError("no secondary node but using "
2470                                    "DT_REMOTE_RAID1 template")
2471
2472     # check memory requirements on the secondary node
2473     target_node = secondary_nodes[0]
2474     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2475     info = nodeinfo.get(target_node, None)
2476     if not info:
2477       raise errors.OpPrereqError("Cannot get current information"
2478                                  " from node '%s'" % nodeinfo)
2479     if instance.memory > info['memory_free']:
2480       raise errors.OpPrereqError("Not enough memory on target node %s."
2481                                  " %d MB available, %d MB required" %
2482                                  (target_node, info['memory_free'],
2483                                   instance.memory))
2484
2485     # check bridge existance
2486     brlist = [nic.bridge for nic in instance.nics]
2487     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2488       raise errors.OpPrereqError("One or more target bridges %s does not"
2489                                  " exist on destination node '%s'" %
2490                                  (brlist, instance.primary_node))
2491
2492     self.instance = instance
2493
2494   def Exec(self, feedback_fn):
2495     """Failover an instance.
2496
2497     The failover is done by shutting it down on its present node and
2498     starting it on the secondary.
2499
2500     """
2501     instance = self.instance
2502
2503     source_node = instance.primary_node
2504     target_node = instance.secondary_nodes[0]
2505
2506     feedback_fn("* checking disk consistency between source and target")
2507     for dev in instance.disks:
2508       # for remote_raid1, these are md over drbd
2509       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2510         if not self.op.ignore_consistency:
2511           raise errors.OpExecError("Disk %s is degraded on target node,"
2512                                    " aborting failover." % dev.iv_name)
2513
2514     feedback_fn("* checking target node resource availability")
2515     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2516
2517     if not nodeinfo:
2518       raise errors.OpExecError("Could not contact target node %s." %
2519                                target_node)
2520
2521     free_memory = int(nodeinfo[target_node]['memory_free'])
2522     memory = instance.memory
2523     if memory > free_memory:
2524       raise errors.OpExecError("Not enough memory to create instance %s on"
2525                                " node %s. needed %s MiB, available %s MiB" %
2526                                (instance.name, target_node, memory,
2527                                 free_memory))
2528
2529     feedback_fn("* shutting down instance on source node")
2530     logger.Info("Shutting down instance %s on node %s" %
2531                 (instance.name, source_node))
2532
2533     if not rpc.call_instance_shutdown(source_node, instance):
2534       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2535                    " anyway. Please make sure node %s is down"  %
2536                    (instance.name, source_node, source_node))
2537
2538     feedback_fn("* deactivating the instance's disks on source node")
2539     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2540       raise errors.OpExecError("Can't shut down the instance's disks.")
2541
2542     instance.primary_node = target_node
2543     # distribute new instance config to the other nodes
2544     self.cfg.AddInstance(instance)
2545
2546     feedback_fn("* activating the instance's disks on target node")
2547     logger.Info("Starting instance %s on node %s" %
2548                 (instance.name, target_node))
2549
2550     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2551                                              ignore_secondaries=True)
2552     if not disks_ok:
2553       _ShutdownInstanceDisks(instance, self.cfg)
2554       raise errors.OpExecError("Can't activate the instance's disks")
2555
2556     feedback_fn("* starting the instance on the target node")
2557     if not rpc.call_instance_start(target_node, instance, None):
2558       _ShutdownInstanceDisks(instance, self.cfg)
2559       raise errors.OpExecError("Could not start instance %s on node %s." %
2560                                (instance.name, target_node))
2561
2562
2563 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2564   """Create a tree of block devices on the primary node.
2565
2566   This always creates all devices.
2567
2568   """
2569   if device.children:
2570     for child in device.children:
2571       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2572         return False
2573
2574   cfg.SetDiskID(device, node)
2575   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2576   if not new_id:
2577     return False
2578   if device.physical_id is None:
2579     device.physical_id = new_id
2580   return True
2581
2582
2583 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2584   """Create a tree of block devices on a secondary node.
2585
2586   If this device type has to be created on secondaries, create it and
2587   all its children.
2588
2589   If not, just recurse to children keeping the same 'force' value.
2590
2591   """
2592   if device.CreateOnSecondary():
2593     force = True
2594   if device.children:
2595     for child in device.children:
2596       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2597         return False
2598
2599   if not force:
2600     return True
2601   cfg.SetDiskID(device, node)
2602   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2603   if not new_id:
2604     return False
2605   if device.physical_id is None:
2606     device.physical_id = new_id
2607   return True
2608
2609
2610 def _GenerateUniqueNames(cfg, exts):
2611   """Generate a suitable LV name.
2612
2613   This will generate a logical volume name for the given instance.
2614
2615   """
2616   results = []
2617   for val in exts:
2618     new_id = cfg.GenerateUniqueID()
2619     results.append("%s%s" % (new_id, val))
2620   return results
2621
2622
2623 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2624   """Generate a drbd device complete with its children.
2625
2626   """
2627   port = cfg.AllocatePort()
2628   vgname = cfg.GetVGName()
2629   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2630                           logical_id=(vgname, names[0]))
2631   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2632                           logical_id=(vgname, names[1]))
2633   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2634                           logical_id = (primary, secondary, port),
2635                           children = [dev_data, dev_meta])
2636   return drbd_dev
2637
2638
2639 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2640   """Generate a drbd8 device complete with its children.
2641
2642   """
2643   port = cfg.AllocatePort()
2644   vgname = cfg.GetVGName()
2645   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2646                           logical_id=(vgname, names[0]))
2647   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2648                           logical_id=(vgname, names[1]))
2649   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2650                           logical_id = (primary, secondary, port),
2651                           children = [dev_data, dev_meta],
2652                           iv_name=iv_name)
2653   return drbd_dev
2654
2655 def _GenerateDiskTemplate(cfg, template_name,
2656                           instance_name, primary_node,
2657                           secondary_nodes, disk_sz, swap_sz):
2658   """Generate the entire disk layout for a given template type.
2659
2660   """
2661   #TODO: compute space requirements
2662
2663   vgname = cfg.GetVGName()
2664   if template_name == "diskless":
2665     disks = []
2666   elif template_name == "plain":
2667     if len(secondary_nodes) != 0:
2668       raise errors.ProgrammerError("Wrong template configuration")
2669
2670     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2671     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2672                            logical_id=(vgname, names[0]),
2673                            iv_name = "sda")
2674     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2675                            logical_id=(vgname, names[1]),
2676                            iv_name = "sdb")
2677     disks = [sda_dev, sdb_dev]
2678   elif template_name == "local_raid1":
2679     if len(secondary_nodes) != 0:
2680       raise errors.ProgrammerError("Wrong template configuration")
2681
2682
2683     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2684                                        ".sdb_m1", ".sdb_m2"])
2685     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2686                               logical_id=(vgname, names[0]))
2687     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688                               logical_id=(vgname, names[1]))
2689     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2690                               size=disk_sz,
2691                               children = [sda_dev_m1, sda_dev_m2])
2692     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2693                               logical_id=(vgname, names[2]))
2694     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2695                               logical_id=(vgname, names[3]))
2696     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2697                               size=swap_sz,
2698                               children = [sdb_dev_m1, sdb_dev_m2])
2699     disks = [md_sda_dev, md_sdb_dev]
2700   elif template_name == constants.DT_REMOTE_RAID1:
2701     if len(secondary_nodes) != 1:
2702       raise errors.ProgrammerError("Wrong template configuration")
2703     remote_node = secondary_nodes[0]
2704     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2705                                        ".sdb_data", ".sdb_meta"])
2706     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2707                                          disk_sz, names[0:2])
2708     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2709                               children = [drbd_sda_dev], size=disk_sz)
2710     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2711                                          swap_sz, names[2:4])
2712     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2713                               children = [drbd_sdb_dev], size=swap_sz)
2714     disks = [md_sda_dev, md_sdb_dev]
2715   elif template_name == constants.DT_DRBD8:
2716     if len(secondary_nodes) != 1:
2717       raise errors.ProgrammerError("Wrong template configuration")
2718     remote_node = secondary_nodes[0]
2719     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2720                                        ".sdb_data", ".sdb_meta"])
2721     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2722                                          disk_sz, names[0:2], "sda")
2723     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724                                          swap_sz, names[2:4], "sdb")
2725     disks = [drbd_sda_dev, drbd_sdb_dev]
2726   else:
2727     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2728   return disks
2729
2730
2731 def _GetInstanceInfoText(instance):
2732   """Compute that text that should be added to the disk's metadata.
2733
2734   """
2735   return "originstname+%s" % instance.name
2736
2737
2738 def _CreateDisks(cfg, instance):
2739   """Create all disks for an instance.
2740
2741   This abstracts away some work from AddInstance.
2742
2743   Args:
2744     instance: the instance object
2745
2746   Returns:
2747     True or False showing the success of the creation process
2748
2749   """
2750   info = _GetInstanceInfoText(instance)
2751
2752   for device in instance.disks:
2753     logger.Info("creating volume %s for instance %s" %
2754               (device.iv_name, instance.name))
2755     #HARDCODE
2756     for secondary_node in instance.secondary_nodes:
2757       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2758                                         info):
2759         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2760                      (device.iv_name, device, secondary_node))
2761         return False
2762     #HARDCODE
2763     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2764       logger.Error("failed to create volume %s on primary!" %
2765                    device.iv_name)
2766       return False
2767   return True
2768
2769
2770 def _RemoveDisks(instance, cfg):
2771   """Remove all disks for an instance.
2772
2773   This abstracts away some work from `AddInstance()` and
2774   `RemoveInstance()`. Note that in case some of the devices couldn't
2775   be removed, the removal will continue with the other ones (compare
2776   with `_CreateDisks()`).
2777
2778   Args:
2779     instance: the instance object
2780
2781   Returns:
2782     True or False showing the success of the removal proces
2783
2784   """
2785   logger.Info("removing block devices for instance %s" % instance.name)
2786
2787   result = True
2788   for device in instance.disks:
2789     for node, disk in device.ComputeNodeTree(instance.primary_node):
2790       cfg.SetDiskID(disk, node)
2791       if not rpc.call_blockdev_remove(node, disk):
2792         logger.Error("could not remove block device %s on node %s,"
2793                      " continuing anyway" %
2794                      (device.iv_name, node))
2795         result = False
2796   return result
2797
2798
2799 class LUCreateInstance(LogicalUnit):
2800   """Create an instance.
2801
2802   """
2803   HPATH = "instance-add"
2804   HTYPE = constants.HTYPE_INSTANCE
2805   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2806               "disk_template", "swap_size", "mode", "start", "vcpus",
2807               "wait_for_sync", "ip_check"]
2808
2809   def BuildHooksEnv(self):
2810     """Build hooks env.
2811
2812     This runs on master, primary and secondary nodes of the instance.
2813
2814     """
2815     env = {
2816       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2817       "INSTANCE_DISK_SIZE": self.op.disk_size,
2818       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2819       "INSTANCE_ADD_MODE": self.op.mode,
2820       }
2821     if self.op.mode == constants.INSTANCE_IMPORT:
2822       env["INSTANCE_SRC_NODE"] = self.op.src_node
2823       env["INSTANCE_SRC_PATH"] = self.op.src_path
2824       env["INSTANCE_SRC_IMAGE"] = self.src_image
2825
2826     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2827       primary_node=self.op.pnode,
2828       secondary_nodes=self.secondaries,
2829       status=self.instance_status,
2830       os_type=self.op.os_type,
2831       memory=self.op.mem_size,
2832       vcpus=self.op.vcpus,
2833       nics=[(self.inst_ip, self.op.bridge)],
2834     ))
2835
2836     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2837           self.secondaries)
2838     return env, nl, nl
2839
2840
2841   def CheckPrereq(self):
2842     """Check prerequisites.
2843
2844     """
2845     if self.op.mode not in (constants.INSTANCE_CREATE,
2846                             constants.INSTANCE_IMPORT):
2847       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2848                                  self.op.mode)
2849
2850     if self.op.mode == constants.INSTANCE_IMPORT:
2851       src_node = getattr(self.op, "src_node", None)
2852       src_path = getattr(self.op, "src_path", None)
2853       if src_node is None or src_path is None:
2854         raise errors.OpPrereqError("Importing an instance requires source"
2855                                    " node and path options")
2856       src_node_full = self.cfg.ExpandNodeName(src_node)
2857       if src_node_full is None:
2858         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2859       self.op.src_node = src_node = src_node_full
2860
2861       if not os.path.isabs(src_path):
2862         raise errors.OpPrereqError("The source path must be absolute")
2863
2864       export_info = rpc.call_export_info(src_node, src_path)
2865
2866       if not export_info:
2867         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2868
2869       if not export_info.has_section(constants.INISECT_EXP):
2870         raise errors.ProgrammerError("Corrupted export config")
2871
2872       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2873       if (int(ei_version) != constants.EXPORT_VERSION):
2874         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2875                                    (ei_version, constants.EXPORT_VERSION))
2876
2877       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2878         raise errors.OpPrereqError("Can't import instance with more than"
2879                                    " one data disk")
2880
2881       # FIXME: are the old os-es, disk sizes, etc. useful?
2882       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2883       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2884                                                          'disk0_dump'))
2885       self.src_image = diskimage
2886     else: # INSTANCE_CREATE
2887       if getattr(self.op, "os_type", None) is None:
2888         raise errors.OpPrereqError("No guest OS specified")
2889
2890     # check primary node
2891     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2892     if pnode is None:
2893       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2894                                  self.op.pnode)
2895     self.op.pnode = pnode.name
2896     self.pnode = pnode
2897     self.secondaries = []
2898     # disk template and mirror node verification
2899     if self.op.disk_template not in constants.DISK_TEMPLATES:
2900       raise errors.OpPrereqError("Invalid disk template name")
2901
2902     if self.op.disk_template in constants.DTS_NET_MIRROR:
2903       if getattr(self.op, "snode", None) is None:
2904         raise errors.OpPrereqError("The networked disk templates need"
2905                                    " a mirror node")
2906
2907       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2908       if snode_name is None:
2909         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2910                                    self.op.snode)
2911       elif snode_name == pnode.name:
2912         raise errors.OpPrereqError("The secondary node cannot be"
2913                                    " the primary node.")
2914       self.secondaries.append(snode_name)
2915
2916     # Check lv size requirements
2917     nodenames = [pnode.name] + self.secondaries
2918     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2919
2920     # Required free disk space as a function of disk and swap space
2921     req_size_dict = {
2922       constants.DT_DISKLESS: 0,
2923       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2924       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2925       # 256 MB are added for drbd metadata, 128MB for each drbd device
2926       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2927       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2928     }
2929
2930     if self.op.disk_template not in req_size_dict:
2931       raise errors.ProgrammerError("Disk template '%s' size requirement"
2932                                    " is unknown" %  self.op.disk_template)
2933
2934     req_size = req_size_dict[self.op.disk_template]
2935
2936     for node in nodenames:
2937       info = nodeinfo.get(node, None)
2938       if not info:
2939         raise errors.OpPrereqError("Cannot get current information"
2940                                    " from node '%s'" % nodeinfo)
2941       if req_size > info['vg_free']:
2942         raise errors.OpPrereqError("Not enough disk space on target node %s."
2943                                    " %d MB available, %d MB required" %
2944                                    (node, info['vg_free'], req_size))
2945
2946     # os verification
2947     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2948     if not isinstance(os_obj, objects.OS):
2949       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2950                                  " primary node"  % self.op.os_type)
2951
2952     # instance verification
2953     hostname1 = utils.HostInfo(self.op.instance_name)
2954
2955     self.op.instance_name = instance_name = hostname1.name
2956     instance_list = self.cfg.GetInstanceList()
2957     if instance_name in instance_list:
2958       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2959                                  instance_name)
2960
2961     ip = getattr(self.op, "ip", None)
2962     if ip is None or ip.lower() == "none":
2963       inst_ip = None
2964     elif ip.lower() == "auto":
2965       inst_ip = hostname1.ip
2966     else:
2967       if not utils.IsValidIP(ip):
2968         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2969                                    " like a valid IP" % ip)
2970       inst_ip = ip
2971     self.inst_ip = inst_ip
2972
2973     if self.op.start and not self.op.ip_check:
2974       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2975                                  " adding an instance in start mode")
2976
2977     if self.op.ip_check:
2978       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2979                        constants.DEFAULT_NODED_PORT):
2980         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2981                                    (hostname1.ip, instance_name))
2982
2983     # bridge verification
2984     bridge = getattr(self.op, "bridge", None)
2985     if bridge is None:
2986       self.op.bridge = self.cfg.GetDefBridge()
2987     else:
2988       self.op.bridge = bridge
2989
2990     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2991       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2992                                  " destination node '%s'" %
2993                                  (self.op.bridge, pnode.name))
2994
2995     if self.op.start:
2996       self.instance_status = 'up'
2997     else:
2998       self.instance_status = 'down'
2999
3000   def Exec(self, feedback_fn):
3001     """Create and add the instance to the cluster.
3002
3003     """
3004     instance = self.op.instance_name
3005     pnode_name = self.pnode.name
3006
3007     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3008     if self.inst_ip is not None:
3009       nic.ip = self.inst_ip
3010
3011     disks = _GenerateDiskTemplate(self.cfg,
3012                                   self.op.disk_template,
3013                                   instance, pnode_name,
3014                                   self.secondaries, self.op.disk_size,
3015                                   self.op.swap_size)
3016
3017     iobj = objects.Instance(name=instance, os=self.op.os_type,
3018                             primary_node=pnode_name,
3019                             memory=self.op.mem_size,
3020                             vcpus=self.op.vcpus,
3021                             nics=[nic], disks=disks,
3022                             disk_template=self.op.disk_template,
3023                             status=self.instance_status,
3024                             )
3025
3026     feedback_fn("* creating instance disks...")
3027     if not _CreateDisks(self.cfg, iobj):
3028       _RemoveDisks(iobj, self.cfg)
3029       raise errors.OpExecError("Device creation failed, reverting...")
3030
3031     feedback_fn("adding instance %s to cluster config" % instance)
3032
3033     self.cfg.AddInstance(iobj)
3034
3035     if self.op.wait_for_sync:
3036       disk_abort = not _WaitForSync(self.cfg, iobj)
3037     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3038       # make sure the disks are not degraded (still sync-ing is ok)
3039       time.sleep(15)
3040       feedback_fn("* checking mirrors status")
3041       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3042     else:
3043       disk_abort = False
3044
3045     if disk_abort:
3046       _RemoveDisks(iobj, self.cfg)
3047       self.cfg.RemoveInstance(iobj.name)
3048       raise errors.OpExecError("There are some degraded disks for"
3049                                " this instance")
3050
3051     feedback_fn("creating os for instance %s on node %s" %
3052                 (instance, pnode_name))
3053
3054     if iobj.disk_template != constants.DT_DISKLESS:
3055       if self.op.mode == constants.INSTANCE_CREATE:
3056         feedback_fn("* running the instance OS create scripts...")
3057         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3058           raise errors.OpExecError("could not add os for instance %s"
3059                                    " on node %s" %
3060                                    (instance, pnode_name))
3061
3062       elif self.op.mode == constants.INSTANCE_IMPORT:
3063         feedback_fn("* running the instance OS import scripts...")
3064         src_node = self.op.src_node
3065         src_image = self.src_image
3066         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3067                                                 src_node, src_image):
3068           raise errors.OpExecError("Could not import os for instance"
3069                                    " %s on node %s" %
3070                                    (instance, pnode_name))
3071       else:
3072         # also checked in the prereq part
3073         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3074                                      % self.op.mode)
3075
3076     if self.op.start:
3077       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3078       feedback_fn("* starting instance...")
3079       if not rpc.call_instance_start(pnode_name, iobj, None):
3080         raise errors.OpExecError("Could not start instance")
3081
3082
3083 class LUConnectConsole(NoHooksLU):
3084   """Connect to an instance's console.
3085
3086   This is somewhat special in that it returns the command line that
3087   you need to run on the master node in order to connect to the
3088   console.
3089
3090   """
3091   _OP_REQP = ["instance_name"]
3092
3093   def CheckPrereq(self):
3094     """Check prerequisites.
3095
3096     This checks that the instance is in the cluster.
3097
3098     """
3099     instance = self.cfg.GetInstanceInfo(
3100       self.cfg.ExpandInstanceName(self.op.instance_name))
3101     if instance is None:
3102       raise errors.OpPrereqError("Instance '%s' not known" %
3103                                  self.op.instance_name)
3104     self.instance = instance
3105
3106   def Exec(self, feedback_fn):
3107     """Connect to the console of an instance
3108
3109     """
3110     instance = self.instance
3111     node = instance.primary_node
3112
3113     node_insts = rpc.call_instance_list([node])[node]
3114     if node_insts is False:
3115       raise errors.OpExecError("Can't connect to node %s." % node)
3116
3117     if instance.name not in node_insts:
3118       raise errors.OpExecError("Instance %s is not running." % instance.name)
3119
3120     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3121
3122     hyper = hypervisor.GetHypervisor()
3123     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3124     # build ssh cmdline
3125     argv = ["ssh", "-q", "-t"]
3126     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3127     argv.extend(ssh.BATCH_MODE_OPTS)
3128     argv.append(node)
3129     argv.append(console_cmd)
3130     return "ssh", argv
3131
3132
3133 class LUAddMDDRBDComponent(LogicalUnit):
3134   """Adda new mirror member to an instance's disk.
3135
3136   """
3137   HPATH = "mirror-add"
3138   HTYPE = constants.HTYPE_INSTANCE
3139   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3140
3141   def BuildHooksEnv(self):
3142     """Build hooks env.
3143
3144     This runs on the master, the primary and all the secondaries.
3145
3146     """
3147     env = {
3148       "NEW_SECONDARY": self.op.remote_node,
3149       "DISK_NAME": self.op.disk_name,
3150       }
3151     env.update(_BuildInstanceHookEnvByObject(self.instance))
3152     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3153           self.op.remote_node,] + list(self.instance.secondary_nodes)
3154     return env, nl, nl
3155
3156   def CheckPrereq(self):
3157     """Check prerequisites.
3158
3159     This checks that the instance is in the cluster.
3160
3161     """
3162     instance = self.cfg.GetInstanceInfo(
3163       self.cfg.ExpandInstanceName(self.op.instance_name))
3164     if instance is None:
3165       raise errors.OpPrereqError("Instance '%s' not known" %
3166                                  self.op.instance_name)
3167     self.instance = instance
3168
3169     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3170     if remote_node is None:
3171       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3172     self.remote_node = remote_node
3173
3174     if remote_node == instance.primary_node:
3175       raise errors.OpPrereqError("The specified node is the primary node of"
3176                                  " the instance.")
3177
3178     if instance.disk_template != constants.DT_REMOTE_RAID1:
3179       raise errors.OpPrereqError("Instance's disk layout is not"
3180                                  " remote_raid1.")
3181     for disk in instance.disks:
3182       if disk.iv_name == self.op.disk_name:
3183         break
3184     else:
3185       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3186                                  " instance." % self.op.disk_name)
3187     if len(disk.children) > 1:
3188       raise errors.OpPrereqError("The device already has two slave"
3189                                  " devices.\n"
3190                                  "This would create a 3-disk raid1"
3191                                  " which we don't allow.")
3192     self.disk = disk
3193
3194   def Exec(self, feedback_fn):
3195     """Add the mirror component
3196
3197     """
3198     disk = self.disk
3199     instance = self.instance
3200
3201     remote_node = self.remote_node
3202     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3203     names = _GenerateUniqueNames(self.cfg, lv_names)
3204     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3205                                      remote_node, disk.size, names)
3206
3207     logger.Info("adding new mirror component on secondary")
3208     #HARDCODE
3209     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3210                                       _GetInstanceInfoText(instance)):
3211       raise errors.OpExecError("Failed to create new component on secondary"
3212                                " node %s" % remote_node)
3213
3214     logger.Info("adding new mirror component on primary")
3215     #HARDCODE
3216     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3217                                     _GetInstanceInfoText(instance)):
3218       # remove secondary dev
3219       self.cfg.SetDiskID(new_drbd, remote_node)
3220       rpc.call_blockdev_remove(remote_node, new_drbd)
3221       raise errors.OpExecError("Failed to create volume on primary")
3222
3223     # the device exists now
3224     # call the primary node to add the mirror to md
3225     logger.Info("adding new mirror component to md")
3226     if not rpc.call_blockdev_addchildren(instance.primary_node,
3227                                          disk, [new_drbd]):
3228       logger.Error("Can't add mirror compoment to md!")
3229       self.cfg.SetDiskID(new_drbd, remote_node)
3230       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3231         logger.Error("Can't rollback on secondary")
3232       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3233       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3234         logger.Error("Can't rollback on primary")
3235       raise errors.OpExecError("Can't add mirror component to md array")
3236
3237     disk.children.append(new_drbd)
3238
3239     self.cfg.AddInstance(instance)
3240
3241     _WaitForSync(self.cfg, instance)
3242
3243     return 0
3244
3245
3246 class LURemoveMDDRBDComponent(LogicalUnit):
3247   """Remove a component from a remote_raid1 disk.
3248
3249   """
3250   HPATH = "mirror-remove"
3251   HTYPE = constants.HTYPE_INSTANCE
3252   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3253
3254   def BuildHooksEnv(self):
3255     """Build hooks env.
3256
3257     This runs on the master, the primary and all the secondaries.
3258
3259     """
3260     env = {
3261       "DISK_NAME": self.op.disk_name,
3262       "DISK_ID": self.op.disk_id,
3263       "OLD_SECONDARY": self.old_secondary,
3264       }
3265     env.update(_BuildInstanceHookEnvByObject(self.instance))
3266     nl = [self.sstore.GetMasterNode(),
3267           self.instance.primary_node] + list(self.instance.secondary_nodes)
3268     return env, nl, nl
3269
3270   def CheckPrereq(self):
3271     """Check prerequisites.
3272
3273     This checks that the instance is in the cluster.
3274
3275     """
3276     instance = self.cfg.GetInstanceInfo(
3277       self.cfg.ExpandInstanceName(self.op.instance_name))
3278     if instance is None:
3279       raise errors.OpPrereqError("Instance '%s' not known" %
3280                                  self.op.instance_name)
3281     self.instance = instance
3282
3283     if instance.disk_template != constants.DT_REMOTE_RAID1:
3284       raise errors.OpPrereqError("Instance's disk layout is not"
3285                                  " remote_raid1.")
3286     for disk in instance.disks:
3287       if disk.iv_name == self.op.disk_name:
3288         break
3289     else:
3290       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3291                                  " instance." % self.op.disk_name)
3292     for child in disk.children:
3293       if (child.dev_type == constants.LD_DRBD7 and
3294           child.logical_id[2] == self.op.disk_id):
3295         break
3296     else:
3297       raise errors.OpPrereqError("Can't find the device with this port.")
3298
3299     if len(disk.children) < 2:
3300       raise errors.OpPrereqError("Cannot remove the last component from"
3301                                  " a mirror.")
3302     self.disk = disk
3303     self.child = child
3304     if self.child.logical_id[0] == instance.primary_node:
3305       oid = 1
3306     else:
3307       oid = 0
3308     self.old_secondary = self.child.logical_id[oid]
3309
3310   def Exec(self, feedback_fn):
3311     """Remove the mirror component
3312
3313     """
3314     instance = self.instance
3315     disk = self.disk
3316     child = self.child
3317     logger.Info("remove mirror component")
3318     self.cfg.SetDiskID(disk, instance.primary_node)
3319     if not rpc.call_blockdev_removechildren(instance.primary_node,
3320                                             disk, [child]):
3321       raise errors.OpExecError("Can't remove child from mirror.")
3322
3323     for node in child.logical_id[:2]:
3324       self.cfg.SetDiskID(child, node)
3325       if not rpc.call_blockdev_remove(node, child):
3326         logger.Error("Warning: failed to remove device from node %s,"
3327                      " continuing operation." % node)
3328
3329     disk.children.remove(child)
3330     self.cfg.AddInstance(instance)
3331
3332
3333 class LUReplaceDisks(LogicalUnit):
3334   """Replace the disks of an instance.
3335
3336   """
3337   HPATH = "mirrors-replace"
3338   HTYPE = constants.HTYPE_INSTANCE
3339   _OP_REQP = ["instance_name"]
3340
3341   def BuildHooksEnv(self):
3342     """Build hooks env.
3343
3344     This runs on the master, the primary and all the secondaries.
3345
3346     """
3347     env = {
3348       "NEW_SECONDARY": self.op.remote_node,
3349       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3350       }
3351     env.update(_BuildInstanceHookEnvByObject(self.instance))
3352     nl = [self.sstore.GetMasterNode(),
3353           self.instance.primary_node] + list(self.instance.secondary_nodes)
3354     return env, nl, nl
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     This checks that the instance is in the cluster.
3360
3361     """
3362     instance = self.cfg.GetInstanceInfo(
3363       self.cfg.ExpandInstanceName(self.op.instance_name))
3364     if instance is None:
3365       raise errors.OpPrereqError("Instance '%s' not known" %
3366                                  self.op.instance_name)
3367     self.instance = instance
3368
3369     if instance.disk_template != constants.DT_REMOTE_RAID1:
3370       raise errors.OpPrereqError("Instance's disk layout is not"
3371                                  " remote_raid1.")
3372
3373     if len(instance.secondary_nodes) != 1:
3374       raise errors.OpPrereqError("The instance has a strange layout,"
3375                                  " expected one secondary but found %d" %
3376                                  len(instance.secondary_nodes))
3377
3378     remote_node = getattr(self.op, "remote_node", None)
3379     if remote_node is None:
3380       remote_node = instance.secondary_nodes[0]
3381     else:
3382       remote_node = self.cfg.ExpandNodeName(remote_node)
3383       if remote_node is None:
3384         raise errors.OpPrereqError("Node '%s' not known" %
3385                                    self.op.remote_node)
3386     if remote_node == instance.primary_node:
3387       raise errors.OpPrereqError("The specified node is the primary node of"
3388                                  " the instance.")
3389     self.op.remote_node = remote_node
3390
3391   def Exec(self, feedback_fn):
3392     """Replace the disks of an instance.
3393
3394     """
3395     instance = self.instance
3396     iv_names = {}
3397     # start of work
3398     remote_node = self.op.remote_node
3399     cfg = self.cfg
3400     for dev in instance.disks:
3401       size = dev.size
3402       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3403       names = _GenerateUniqueNames(cfg, lv_names)
3404       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3405                                        remote_node, size, names)
3406       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3407       logger.Info("adding new mirror component on secondary for %s" %
3408                   dev.iv_name)
3409       #HARDCODE
3410       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3411                                         _GetInstanceInfoText(instance)):
3412         raise errors.OpExecError("Failed to create new component on"
3413                                  " secondary node %s\n"
3414                                  "Full abort, cleanup manually!" %
3415                                  remote_node)
3416
3417       logger.Info("adding new mirror component on primary")
3418       #HARDCODE
3419       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3420                                       _GetInstanceInfoText(instance)):
3421         # remove secondary dev
3422         cfg.SetDiskID(new_drbd, remote_node)
3423         rpc.call_blockdev_remove(remote_node, new_drbd)
3424         raise errors.OpExecError("Failed to create volume on primary!\n"
3425                                  "Full abort, cleanup manually!!")
3426
3427       # the device exists now
3428       # call the primary node to add the mirror to md
3429       logger.Info("adding new mirror component to md")
3430       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3431                                            [new_drbd]):
3432         logger.Error("Can't add mirror compoment to md!")
3433         cfg.SetDiskID(new_drbd, remote_node)
3434         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3435           logger.Error("Can't rollback on secondary")
3436         cfg.SetDiskID(new_drbd, instance.primary_node)
3437         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3438           logger.Error("Can't rollback on primary")
3439         raise errors.OpExecError("Full abort, cleanup manually!!")
3440
3441       dev.children.append(new_drbd)
3442       cfg.AddInstance(instance)
3443
3444     # this can fail as the old devices are degraded and _WaitForSync
3445     # does a combined result over all disks, so we don't check its
3446     # return value
3447     _WaitForSync(cfg, instance, unlock=True)
3448
3449     # so check manually all the devices
3450     for name in iv_names:
3451       dev, child, new_drbd = iv_names[name]
3452       cfg.SetDiskID(dev, instance.primary_node)
3453       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3454       if is_degr:
3455         raise errors.OpExecError("MD device %s is degraded!" % name)
3456       cfg.SetDiskID(new_drbd, instance.primary_node)
3457       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3458       if is_degr:
3459         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3460
3461     for name in iv_names:
3462       dev, child, new_drbd = iv_names[name]
3463       logger.Info("remove mirror %s component" % name)
3464       cfg.SetDiskID(dev, instance.primary_node)
3465       if not rpc.call_blockdev_removechildren(instance.primary_node,
3466                                               dev, [child]):
3467         logger.Error("Can't remove child from mirror, aborting"
3468                      " *this device cleanup*.\nYou need to cleanup manually!!")
3469         continue
3470
3471       for node in child.logical_id[:2]:
3472         logger.Info("remove child device on %s" % node)
3473         cfg.SetDiskID(child, node)
3474         if not rpc.call_blockdev_remove(node, child):
3475           logger.Error("Warning: failed to remove device from node %s,"
3476                        " continuing operation." % node)
3477
3478       dev.children.remove(child)
3479
3480       cfg.AddInstance(instance)
3481
3482
3483 class LUQueryInstanceData(NoHooksLU):
3484   """Query runtime instance data.
3485
3486   """
3487   _OP_REQP = ["instances"]
3488
3489   def CheckPrereq(self):
3490     """Check prerequisites.
3491
3492     This only checks the optional instance list against the existing names.
3493
3494     """
3495     if not isinstance(self.op.instances, list):
3496       raise errors.OpPrereqError("Invalid argument type 'instances'")
3497     if self.op.instances:
3498       self.wanted_instances = []
3499       names = self.op.instances
3500       for name in names:
3501         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3502         if instance is None:
3503           raise errors.OpPrereqError("No such instance name '%s'" % name)
3504       self.wanted_instances.append(instance)
3505     else:
3506       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3507                                in self.cfg.GetInstanceList()]
3508     return
3509
3510
3511   def _ComputeDiskStatus(self, instance, snode, dev):
3512     """Compute block device status.
3513
3514     """
3515     self.cfg.SetDiskID(dev, instance.primary_node)
3516     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3517     if dev.dev_type in constants.LDS_DRBD:
3518       # we change the snode then (otherwise we use the one passed in)
3519       if dev.logical_id[0] == instance.primary_node:
3520         snode = dev.logical_id[1]
3521       else:
3522         snode = dev.logical_id[0]
3523
3524     if snode:
3525       self.cfg.SetDiskID(dev, snode)
3526       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3527     else:
3528       dev_sstatus = None
3529
3530     if dev.children:
3531       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3532                       for child in dev.children]
3533     else:
3534       dev_children = []
3535
3536     data = {
3537       "iv_name": dev.iv_name,
3538       "dev_type": dev.dev_type,
3539       "logical_id": dev.logical_id,
3540       "physical_id": dev.physical_id,
3541       "pstatus": dev_pstatus,
3542       "sstatus": dev_sstatus,
3543       "children": dev_children,
3544       }
3545
3546     return data
3547
3548   def Exec(self, feedback_fn):
3549     """Gather and return data"""
3550     result = {}
3551     for instance in self.wanted_instances:
3552       remote_info = rpc.call_instance_info(instance.primary_node,
3553                                                 instance.name)
3554       if remote_info and "state" in remote_info:
3555         remote_state = "up"
3556       else:
3557         remote_state = "down"
3558       if instance.status == "down":
3559         config_state = "down"
3560       else:
3561         config_state = "up"
3562
3563       disks = [self._ComputeDiskStatus(instance, None, device)
3564                for device in instance.disks]
3565
3566       idict = {
3567         "name": instance.name,
3568         "config_state": config_state,
3569         "run_state": remote_state,
3570         "pnode": instance.primary_node,
3571         "snodes": instance.secondary_nodes,
3572         "os": instance.os,
3573         "memory": instance.memory,
3574         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3575         "disks": disks,
3576         "vcpus": instance.vcpus,
3577         }
3578
3579       result[instance.name] = idict
3580
3581     return result
3582
3583
3584 class LUSetInstanceParms(LogicalUnit):
3585   """Modifies an instances's parameters.
3586
3587   """
3588   HPATH = "instance-modify"
3589   HTYPE = constants.HTYPE_INSTANCE
3590   _OP_REQP = ["instance_name"]
3591
3592   def BuildHooksEnv(self):
3593     """Build hooks env.
3594
3595     This runs on the master, primary and secondaries.
3596
3597     """
3598     args = dict()
3599     if self.mem:
3600       args['memory'] = self.mem
3601     if self.vcpus:
3602       args['vcpus'] = self.vcpus
3603     if self.do_ip or self.do_bridge:
3604       if self.do_ip:
3605         ip = self.ip
3606       else:
3607         ip = self.instance.nics[0].ip
3608       if self.bridge:
3609         bridge = self.bridge
3610       else:
3611         bridge = self.instance.nics[0].bridge
3612       args['nics'] = [(ip, bridge)]
3613     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3614     nl = [self.sstore.GetMasterNode(),
3615           self.instance.primary_node] + list(self.instance.secondary_nodes)
3616     return env, nl, nl
3617
3618   def CheckPrereq(self):
3619     """Check prerequisites.
3620
3621     This only checks the instance list against the existing names.
3622
3623     """
3624     self.mem = getattr(self.op, "mem", None)
3625     self.vcpus = getattr(self.op, "vcpus", None)
3626     self.ip = getattr(self.op, "ip", None)
3627     self.bridge = getattr(self.op, "bridge", None)
3628     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3629       raise errors.OpPrereqError("No changes submitted")
3630     if self.mem is not None:
3631       try:
3632         self.mem = int(self.mem)
3633       except ValueError, err:
3634         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3635     if self.vcpus is not None:
3636       try:
3637         self.vcpus = int(self.vcpus)
3638       except ValueError, err:
3639         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3640     if self.ip is not None:
3641       self.do_ip = True
3642       if self.ip.lower() == "none":
3643         self.ip = None
3644       else:
3645         if not utils.IsValidIP(self.ip):
3646           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3647     else:
3648       self.do_ip = False
3649     self.do_bridge = (self.bridge is not None)
3650
3651     instance = self.cfg.GetInstanceInfo(
3652       self.cfg.ExpandInstanceName(self.op.instance_name))
3653     if instance is None:
3654       raise errors.OpPrereqError("No such instance name '%s'" %
3655                                  self.op.instance_name)
3656     self.op.instance_name = instance.name
3657     self.instance = instance
3658     return
3659
3660   def Exec(self, feedback_fn):
3661     """Modifies an instance.
3662
3663     All parameters take effect only at the next restart of the instance.
3664     """
3665     result = []
3666     instance = self.instance
3667     if self.mem:
3668       instance.memory = self.mem
3669       result.append(("mem", self.mem))
3670     if self.vcpus:
3671       instance.vcpus = self.vcpus
3672       result.append(("vcpus",  self.vcpus))
3673     if self.do_ip:
3674       instance.nics[0].ip = self.ip
3675       result.append(("ip", self.ip))
3676     if self.bridge:
3677       instance.nics[0].bridge = self.bridge
3678       result.append(("bridge", self.bridge))
3679
3680     self.cfg.AddInstance(instance)
3681
3682     return result
3683
3684
3685 class LUQueryExports(NoHooksLU):
3686   """Query the exports list
3687
3688   """
3689   _OP_REQP = []
3690
3691   def CheckPrereq(self):
3692     """Check that the nodelist contains only existing nodes.
3693
3694     """
3695     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3696
3697   def Exec(self, feedback_fn):
3698     """Compute the list of all the exported system images.
3699
3700     Returns:
3701       a dictionary with the structure node->(export-list)
3702       where export-list is a list of the instances exported on
3703       that node.
3704
3705     """
3706     return rpc.call_export_list(self.nodes)
3707
3708
3709 class LUExportInstance(LogicalUnit):
3710   """Export an instance to an image in the cluster.
3711
3712   """
3713   HPATH = "instance-export"
3714   HTYPE = constants.HTYPE_INSTANCE
3715   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3716
3717   def BuildHooksEnv(self):
3718     """Build hooks env.
3719
3720     This will run on the master, primary node and target node.
3721
3722     """
3723     env = {
3724       "EXPORT_NODE": self.op.target_node,
3725       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3726       }
3727     env.update(_BuildInstanceHookEnvByObject(self.instance))
3728     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3729           self.op.target_node]
3730     return env, nl, nl
3731
3732   def CheckPrereq(self):
3733     """Check prerequisites.
3734
3735     This checks that the instance name is a valid one.
3736
3737     """
3738     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3739     self.instance = self.cfg.GetInstanceInfo(instance_name)
3740     if self.instance is None:
3741       raise errors.OpPrereqError("Instance '%s' not found" %
3742                                  self.op.instance_name)
3743
3744     # node verification
3745     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3746     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3747
3748     if self.dst_node is None:
3749       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3750                                  self.op.target_node)
3751     self.op.target_node = self.dst_node.name
3752
3753   def Exec(self, feedback_fn):
3754     """Export an instance to an image in the cluster.
3755
3756     """
3757     instance = self.instance
3758     dst_node = self.dst_node
3759     src_node = instance.primary_node
3760     # shutdown the instance, unless requested not to do so
3761     if self.op.shutdown:
3762       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3763       self.processor.ChainOpCode(op, feedback_fn)
3764
3765     vgname = self.cfg.GetVGName()
3766
3767     snap_disks = []
3768
3769     try:
3770       for disk in instance.disks:
3771         if disk.iv_name == "sda":
3772           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3773           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3774
3775           if not new_dev_name:
3776             logger.Error("could not snapshot block device %s on node %s" %
3777                          (disk.logical_id[1], src_node))
3778           else:
3779             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
3780                                       logical_id=(vgname, new_dev_name),
3781                                       physical_id=(vgname, new_dev_name),
3782                                       iv_name=disk.iv_name)
3783             snap_disks.append(new_dev)
3784
3785     finally:
3786       if self.op.shutdown:
3787         op = opcodes.OpStartupInstance(instance_name=instance.name,
3788                                        force=False)
3789         self.processor.ChainOpCode(op, feedback_fn)
3790
3791     # TODO: check for size
3792
3793     for dev in snap_disks:
3794       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3795                                            instance):
3796         logger.Error("could not export block device %s from node"
3797                      " %s to node %s" %
3798                      (dev.logical_id[1], src_node, dst_node.name))
3799       if not rpc.call_blockdev_remove(src_node, dev):
3800         logger.Error("could not remove snapshot block device %s from"
3801                      " node %s" % (dev.logical_id[1], src_node))
3802
3803     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3804       logger.Error("could not finalize export for instance %s on node %s" %
3805                    (instance.name, dst_node.name))
3806
3807     nodelist = self.cfg.GetNodeList()
3808     nodelist.remove(dst_node.name)
3809
3810     # on one-node clusters nodelist will be empty after the removal
3811     # if we proceed the backup would be removed because OpQueryExports
3812     # substitutes an empty list with the full cluster node list.
3813     if nodelist:
3814       op = opcodes.OpQueryExports(nodes=nodelist)
3815       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3816       for node in exportlist:
3817         if instance.name in exportlist[node]:
3818           if not rpc.call_export_remove(node, instance.name):
3819             logger.Error("could not remove older export for instance %s"
3820                          " on node %s" % (instance.name, node))
3821
3822
3823 class TagsLU(NoHooksLU):
3824   """Generic tags LU.
3825
3826   This is an abstract class which is the parent of all the other tags LUs.
3827
3828   """
3829   def CheckPrereq(self):
3830     """Check prerequisites.
3831
3832     """
3833     if self.op.kind == constants.TAG_CLUSTER:
3834       self.target = self.cfg.GetClusterInfo()
3835     elif self.op.kind == constants.TAG_NODE:
3836       name = self.cfg.ExpandNodeName(self.op.name)
3837       if name is None:
3838         raise errors.OpPrereqError("Invalid node name (%s)" %
3839                                    (self.op.name,))
3840       self.op.name = name
3841       self.target = self.cfg.GetNodeInfo(name)
3842     elif self.op.kind == constants.TAG_INSTANCE:
3843       name = self.cfg.ExpandInstanceName(self.op.name)
3844       if name is None:
3845         raise errors.OpPrereqError("Invalid instance name (%s)" %
3846                                    (self.op.name,))
3847       self.op.name = name
3848       self.target = self.cfg.GetInstanceInfo(name)
3849     else:
3850       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3851                                  str(self.op.kind))
3852
3853
3854 class LUGetTags(TagsLU):
3855   """Returns the tags of a given object.
3856
3857   """
3858   _OP_REQP = ["kind", "name"]
3859
3860   def Exec(self, feedback_fn):
3861     """Returns the tag list.
3862
3863     """
3864     return self.target.GetTags()
3865
3866
3867 class LUAddTags(TagsLU):
3868   """Sets a tag on a given object.
3869
3870   """
3871   _OP_REQP = ["kind", "name", "tags"]
3872
3873   def CheckPrereq(self):
3874     """Check prerequisites.
3875
3876     This checks the type and length of the tag name and value.
3877
3878     """
3879     TagsLU.CheckPrereq(self)
3880     for tag in self.op.tags:
3881       objects.TaggableObject.ValidateTag(tag)
3882
3883   def Exec(self, feedback_fn):
3884     """Sets the tag.
3885
3886     """
3887     try:
3888       for tag in self.op.tags:
3889         self.target.AddTag(tag)
3890     except errors.TagError, err:
3891       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3892     try:
3893       self.cfg.Update(self.target)
3894     except errors.ConfigurationError:
3895       raise errors.OpRetryError("There has been a modification to the"
3896                                 " config file and the operation has been"
3897                                 " aborted. Please retry.")
3898
3899
3900 class LUDelTags(TagsLU):
3901   """Delete a list of tags from a given object.
3902
3903   """
3904   _OP_REQP = ["kind", "name", "tags"]
3905
3906   def CheckPrereq(self):
3907     """Check prerequisites.
3908
3909     This checks that we have the given tag.
3910
3911     """
3912     TagsLU.CheckPrereq(self)
3913     for tag in self.op.tags:
3914       objects.TaggableObject.ValidateTag(tag)
3915     del_tags = frozenset(self.op.tags)
3916     cur_tags = self.target.GetTags()
3917     if not del_tags <= cur_tags:
3918       diff_tags = del_tags - cur_tags
3919       diff_names = ["'%s'" % tag for tag in diff_tags]
3920       diff_names.sort()
3921       raise errors.OpPrereqError("Tag(s) %s not found" %
3922                                  (",".join(diff_names)))
3923
3924   def Exec(self, feedback_fn):
3925     """Remove the tag from the object.
3926
3927     """
3928     for tag in self.op.tags:
3929       self.target.RemoveTag(tag)
3930     try:
3931       self.cfg.Update(self.target)
3932     except errors.ConfigurationError:
3933       raise errors.OpRetryError("There has been a modification to the"
3934                                 " config file and the operation has been"
3935                                 " aborted. Please retry.")