- Implement “gnt-instance reinstall --os-type=…”
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class..
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81                                      attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError, ("Cluster not initialized yet,"
85                                      " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError, ("Commands must be run on the master"
90                                        " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError, ("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206                                  % ",".join(frozenset(selected).
207                                             difference(all_fields)))
208
209
210 def _UpdateEtcHosts(fullnode, ip):
211   """Ensure a node has a correct entry in /etc/hosts.
212
213   Args:
214     fullnode - Fully qualified domain name of host. (str)
215     ip       - IPv4 address of host (str)
216
217   """
218   node = fullnode.split(".", 1)[0]
219
220   f = open('/etc/hosts', 'r+')
221
222   inthere = False
223
224   save_lines = []
225   add_lines = []
226   removed = False
227
228   while True:
229     rawline = f.readline()
230
231     if not rawline:
232       # End of file
233       break
234
235     line = rawline.split('\n')[0]
236
237     # Strip off comments
238     line = line.split('#')[0]
239
240     if not line:
241       # Entire line was comment, skip
242       save_lines.append(rawline)
243       continue
244
245     fields = line.split()
246
247     haveall = True
248     havesome = False
249     for spec in [ ip, fullnode, node ]:
250       if spec not in fields:
251         haveall = False
252       if spec in fields:
253         havesome = True
254
255     if haveall:
256       inthere = True
257       save_lines.append(rawline)
258       continue
259
260     if havesome and not haveall:
261       # Line (old, or manual?) which is missing some.  Remove.
262       removed = True
263       continue
264
265     save_lines.append(rawline)
266
267   if not inthere:
268     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
269
270   if removed:
271     if add_lines:
272       save_lines = save_lines + add_lines
273
274     # We removed a line, write a new file and replace old.
275     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
276     newfile = os.fdopen(fd, 'w')
277     newfile.write(''.join(save_lines))
278     newfile.close()
279     os.rename(tmpname, '/etc/hosts')
280
281   elif add_lines:
282     # Simply appending a new line will do the trick.
283     f.seek(0, 2)
284     for add in add_lines:
285       f.write(add)
286
287   f.close()
288
289
290 def _UpdateKnownHosts(fullnode, ip, pubkey):
291   """Ensure a node has a correct known_hosts entry.
292
293   Args:
294     fullnode - Fully qualified domain name of host. (str)
295     ip       - IPv4 address of host (str)
296     pubkey   - the public key of the cluster
297
298   """
299   if os.path.exists('/etc/ssh/ssh_known_hosts'):
300     f = open('/etc/ssh/ssh_known_hosts', 'r+')
301   else:
302     f = open('/etc/ssh/ssh_known_hosts', 'w+')
303
304   inthere = False
305
306   save_lines = []
307   add_lines = []
308   removed = False
309
310   while True:
311     rawline = f.readline()
312     logger.Debug('read %s' % (repr(rawline),))
313
314     if not rawline:
315       # End of file
316       break
317
318     line = rawline.split('\n')[0]
319
320     parts = line.split(' ')
321     fields = parts[0].split(',')
322     key = parts[2]
323
324     haveall = True
325     havesome = False
326     for spec in [ ip, fullnode ]:
327       if spec not in fields:
328         haveall = False
329       if spec in fields:
330         havesome = True
331
332     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
333     if haveall and key == pubkey:
334       inthere = True
335       save_lines.append(rawline)
336       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
337       continue
338
339     if havesome and (not haveall or key != pubkey):
340       removed = True
341       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
348     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
349
350   if removed:
351     save_lines = save_lines + add_lines
352
353     # Write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     logger.Debug("Wrote new known_hosts.")
359     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
360
361   elif add_lines:
362     # Simply appending a new line will do the trick.
363     f.seek(0, 2)
364     for add in add_lines:
365       f.write(add)
366
367   f.close()
368
369
370 def _HasValidVG(vglist, vgname):
371   """Checks if the volume group list is valid.
372
373   A non-None return value means there's an error, and the return value
374   is the error message.
375
376   """
377   vgsize = vglist.get(vgname, None)
378   if vgsize is None:
379     return "volume group '%s' missing" % vgname
380   elif vgsize < 20480:
381     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
382             (vgname, vgsize))
383   return None
384
385
386 def _InitSSHSetup(node):
387   """Setup the SSH configuration for the cluster.
388
389
390   This generates a dsa keypair for root, adds the pub key to the
391   permitted hosts and adds the hostkey to its own known hosts.
392
393   Args:
394     node: the name of this host as a fqdn
395
396   """
397   utils.RemoveFile('/root/.ssh/known_hosts')
398
399   if os.path.exists('/root/.ssh/id_dsa'):
400     utils.CreateBackup('/root/.ssh/id_dsa')
401   if os.path.exists('/root/.ssh/id_dsa.pub'):
402     utils.CreateBackup('/root/.ssh/id_dsa.pub')
403
404   utils.RemoveFile('/root/.ssh/id_dsa')
405   utils.RemoveFile('/root/.ssh/id_dsa.pub')
406
407   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
408                          "-f", "/root/.ssh/id_dsa",
409                          "-q", "-N", ""])
410   if result.failed:
411     raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
412                                result.output)
413
414   f = open('/root/.ssh/id_dsa.pub', 'r')
415   try:
416     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
417   finally:
418     f.close()
419
420
421 def _InitGanetiServerSetup(ss):
422   """Setup the necessary configuration for the initial node daemon.
423
424   This creates the nodepass file containing the shared password for
425   the cluster and also generates the SSL certificate.
426
427   """
428   # Create pseudo random password
429   randpass = sha.new(os.urandom(64)).hexdigest()
430   # and write it into sstore
431   ss.SetKey(ss.SS_NODED_PASS, randpass)
432
433   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
434                          "-days", str(365*5), "-nodes", "-x509",
435                          "-keyout", constants.SSL_CERT_FILE,
436                          "-out", constants.SSL_CERT_FILE, "-batch"])
437   if result.failed:
438     raise errors.OpExecError, ("could not generate server ssl cert, command"
439                                " %s had exitcode %s and error message %s" %
440                                (result.cmd, result.exit_code, result.output))
441
442   os.chmod(constants.SSL_CERT_FILE, 0400)
443
444   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
445
446   if result.failed:
447     raise errors.OpExecError, ("could not start the node daemon, command %s"
448                                " had exitcode %s and error %s" %
449                                (result.cmd, result.exit_code, result.output))
450
451
452 class LUInitCluster(LogicalUnit):
453   """Initialise the cluster.
454
455   """
456   HPATH = "cluster-init"
457   HTYPE = constants.HTYPE_CLUSTER
458   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
459               "def_bridge", "master_netdev"]
460   REQ_CLUSTER = False
461
462   def BuildHooksEnv(self):
463     """Build hooks env.
464
465     Notes: Since we don't require a cluster, we must manually add
466     ourselves in the post-run node list.
467
468     """
469     env = {"CLUSTER": self.op.cluster_name,
470            "MASTER": self.hostname['hostname_full']}
471     return env, [], [self.hostname['hostname_full']]
472
473   def CheckPrereq(self):
474     """Verify that the passed name is a valid one.
475
476     """
477     if config.ConfigWriter.IsCluster():
478       raise errors.OpPrereqError, ("Cluster is already initialised")
479
480     hostname_local = socket.gethostname()
481     self.hostname = hostname = utils.LookupHostname(hostname_local)
482     if not hostname:
483       raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
484                                    hostname_local)
485
486     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
487     if not clustername:
488       raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
489                                    % self.op.cluster_name)
490
491     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
492     if result.failed:
493       raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
494                                    " to %s,\nbut this ip address does not"
495                                    " belong to this host."
496                                    " Aborting." % hostname['ip'])
497
498     secondary_ip = getattr(self.op, "secondary_ip", None)
499     if secondary_ip and not utils.IsValidIP(secondary_ip):
500       raise errors.OpPrereqError, ("Invalid secondary ip given")
501     if secondary_ip and secondary_ip != hostname['ip']:
502       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
503       if result.failed:
504         raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
505                                      "but it does not belong to this host." %
506                                      secondary_ip)
507     self.secondary_ip = secondary_ip
508
509     # checks presence of the volume group given
510     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
511
512     if vgstatus:
513       raise errors.OpPrereqError, ("Error: %s" % vgstatus)
514
515     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
516                     self.op.mac_prefix):
517       raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
518                                    self.op.mac_prefix)
519
520     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
521       raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
522                                    self.op.hypervisor_type)
523
524     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
525     if result.failed:
526       raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
527                                    (self.op.master_netdev, result.output))
528
529   def Exec(self, feedback_fn):
530     """Initialize the cluster.
531
532     """
533     clustername = self.clustername
534     hostname = self.hostname
535
536     # set up the simple store
537     ss = ssconf.SimpleStore()
538     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
540     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
541     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
543
544     # set up the inter-node password and certificate
545     _InitGanetiServerSetup(ss)
546
547     # start the master ip
548     rpc.call_node_start_master(hostname['hostname_full'])
549
550     # set up ssh config and /etc/hosts
551     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
552     try:
553       sshline = f.read()
554     finally:
555       f.close()
556     sshkey = sshline.split(" ")[1]
557
558     _UpdateEtcHosts(hostname['hostname_full'],
559                     hostname['ip'],
560                     )
561
562     _UpdateKnownHosts(hostname['hostname_full'],
563                       hostname['ip'],
564                       sshkey,
565                       )
566
567     _InitSSHSetup(hostname['hostname'])
568
569     # init of cluster config file
570     cfgw = config.ConfigWriter()
571     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
572                     sshkey, self.op.mac_prefix,
573                     self.op.vg_name, self.op.def_bridge)
574
575
576 class LUDestroyCluster(NoHooksLU):
577   """Logical unit for destroying the cluster.
578
579   """
580   _OP_REQP = []
581
582   def CheckPrereq(self):
583     """Check prerequisites.
584
585     This checks whether the cluster is empty.
586
587     Any errors are signalled by raising errors.OpPrereqError.
588
589     """
590     master = self.sstore.GetMasterNode()
591
592     nodelist = self.cfg.GetNodeList()
593     if len(nodelist) > 0 and nodelist != [master]:
594       raise errors.OpPrereqError, ("There are still %d node(s) in "
595                                    "this cluster." % (len(nodelist) - 1))
596
597   def Exec(self, feedback_fn):
598     """Destroys the cluster.
599
600     """
601     utils.CreateBackup('/root/.ssh/id_dsa')
602     utils.CreateBackup('/root/.ssh/id_dsa.pub')
603     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
604
605
606 class LUVerifyCluster(NoHooksLU):
607   """Verifies the cluster status.
608
609   """
610   _OP_REQP = []
611
612   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
613                   remote_version, feedback_fn):
614     """Run multiple tests against a node.
615
616     Test list:
617       - compares ganeti version
618       - checks vg existance and size > 20G
619       - checks config file checksum
620       - checks ssh to other nodes
621
622     Args:
623       node: name of the node to check
624       file_list: required list of files
625       local_cksum: dictionary of local files and their checksums
626
627     """
628     # compares ganeti version
629     local_version = constants.PROTOCOL_VERSION
630     if not remote_version:
631       feedback_fn(" - ERROR: connection to %s failed" % (node))
632       return True
633
634     if local_version != remote_version:
635       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
636                       (local_version, node, remote_version))
637       return True
638
639     # checks vg existance and size > 20G
640
641     bad = False
642     if not vglist:
643       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
644                       (node,))
645       bad = True
646     else:
647       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
648       if vgstatus:
649         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
650         bad = True
651
652     # checks config file checksum
653     # checks ssh to any
654
655     if 'filelist' not in node_result:
656       bad = True
657       feedback_fn("  - ERROR: node hasn't returned file checksum data")
658     else:
659       remote_cksum = node_result['filelist']
660       for file_name in file_list:
661         if file_name not in remote_cksum:
662           bad = True
663           feedback_fn("  - ERROR: file '%s' missing" % file_name)
664         elif remote_cksum[file_name] != local_cksum[file_name]:
665           bad = True
666           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
667
668     if 'nodelist' not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
671     else:
672       if node_result['nodelist']:
673         bad = True
674         for node in node_result['nodelist']:
675           feedback_fn("  - ERROR: communication with node '%s': %s" %
676                           (node, node_result['nodelist'][node]))
677     hyp_result = node_result.get('hypervisor', None)
678     if hyp_result is not None:
679       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
680     return bad
681
682   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
683     """Verify an instance.
684
685     This function checks to see if the required block devices are
686     available on the instance's node.
687
688     """
689     bad = False
690
691     instancelist = self.cfg.GetInstanceList()
692     if not instance in instancelist:
693       feedback_fn("  - ERROR: instance %s not in instance list %s" %
694                       (instance, instancelist))
695       bad = True
696
697     instanceconfig = self.cfg.GetInstanceInfo(instance)
698     node_current = instanceconfig.primary_node
699
700     node_vol_should = {}
701     instanceconfig.MapLVsByNode(node_vol_should)
702
703     for node in node_vol_should:
704       for volume in node_vol_should[node]:
705         if node not in node_vol_is or volume not in node_vol_is[node]:
706           feedback_fn("  - ERROR: volume %s missing on node %s" %
707                           (volume, node))
708           bad = True
709
710     if not instanceconfig.status == 'down':
711       if not instance in node_instance[node_current]:
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return not bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def CheckPrereq(self):
758     """Check prerequisites.
759
760     This has no prerequisites.
761
762     """
763     pass
764
765   def Exec(self, feedback_fn):
766     """Verify integrity of cluster, performing various test on nodes.
767
768     """
769     bad = False
770     feedback_fn("* Verifying global settings")
771     self.cfg.VerifyConfig()
772
773     master = self.sstore.GetMasterNode()
774     vg_name = self.cfg.GetVGName()
775     nodelist = utils.NiceSort(self.cfg.GetNodeList())
776     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
777     node_volume = {}
778     node_instance = {}
779
780     # FIXME: verify OS list
781     # do local checksums
782     file_names = list(self.sstore.GetFileList())
783     file_names.append(constants.SSL_CERT_FILE)
784     file_names.append(constants.CLUSTER_CONF_FILE)
785     local_checksums = utils.FingerprintFiles(file_names)
786
787     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
788     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
789     all_instanceinfo = rpc.call_instance_list(nodelist)
790     all_vglist = rpc.call_vg_list(nodelist)
791     node_verify_param = {
792       'filelist': file_names,
793       'nodelist': nodelist,
794       'hypervisor': None,
795       }
796     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
797     all_rversion = rpc.call_version(nodelist)
798
799     for node in nodelist:
800       feedback_fn("* Verifying node %s" % node)
801       result = self._VerifyNode(node, file_names, local_checksums,
802                                 all_vglist[node], all_nvinfo[node],
803                                 all_rversion[node], feedback_fn)
804       bad = bad or result
805
806       # node_volume
807       volumeinfo = all_volumeinfo[node]
808
809       if type(volumeinfo) != dict:
810         feedback_fn("  - ERROR: connection to %s failed" % (node,))
811         bad = True
812         continue
813
814       node_volume[node] = volumeinfo
815
816       # node_instance
817       nodeinstance = all_instanceinfo[node]
818       if type(nodeinstance) != list:
819         feedback_fn("  - ERROR: connection to %s failed" % (node,))
820         bad = True
821         continue
822
823       node_instance[node] = nodeinstance
824
825     node_vol_should = {}
826
827     for instance in instancelist:
828       feedback_fn("* Verifying instance %s" % instance)
829       result =  self._VerifyInstance(instance, node_volume, node_instance,
830                                      feedback_fn)
831       bad = bad or result
832
833       inst_config = self.cfg.GetInstanceInfo(instance)
834
835       inst_config.MapLVsByNode(node_vol_should)
836
837     feedback_fn("* Verifying orphan volumes")
838     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
839                                        feedback_fn)
840     bad = bad or result
841
842     feedback_fn("* Verifying remaining instances")
843     result = self._VerifyOrphanInstances(instancelist, node_instance,
844                                          feedback_fn)
845     bad = bad or result
846
847     return int(bad)
848
849
850 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
851   """Sleep and poll for an instance's disk to sync.
852
853   """
854   if not instance.disks:
855     return True
856
857   if not oneshot:
858     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
859
860   node = instance.primary_node
861
862   for dev in instance.disks:
863     cfgw.SetDiskID(dev, node)
864
865   retries = 0
866   while True:
867     max_time = 0
868     done = True
869     cumul_degraded = False
870     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
871     if not rstats:
872       logger.ToStderr("Can't get any data from node %s" % node)
873       retries += 1
874       if retries >= 10:
875         raise errors.RemoteError, ("Can't contact node %s for mirror data,"
876                                    " aborting." % node)
877       time.sleep(6)
878       continue
879     retries = 0
880     for i in range(len(rstats)):
881       mstat = rstats[i]
882       if mstat is None:
883         logger.ToStderr("Can't compute data for node %s/%s" %
884                         (node, instance.disks[i].iv_name))
885         continue
886       perc_done, est_time, is_degraded = mstat
887       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
888       if perc_done is not None:
889         done = False
890         if est_time is not None:
891           rem_time = "%d estimated seconds remaining" % est_time
892           max_time = est_time
893         else:
894           rem_time = "no time estimate"
895         logger.ToStdout("- device %s: %5.2f%% done, %s" %
896                         (instance.disks[i].iv_name, perc_done, rem_time))
897     if done or oneshot:
898       break
899
900     if unlock:
901       utils.Unlock('cmd')
902     try:
903       time.sleep(min(60, max_time))
904     finally:
905       if unlock:
906         utils.Lock('cmd')
907
908   if done:
909     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
910   return not cumul_degraded
911
912
913 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
914   """Check that mirrors are not degraded.
915
916   """
917   cfgw.SetDiskID(dev, node)
918
919   result = True
920   if on_primary or dev.AssembleOnSecondary():
921     rstats = rpc.call_blockdev_find(node, dev)
922     if not rstats:
923       logger.ToStderr("Can't get any data from node %s" % node)
924       result = False
925     else:
926       result = result and (not rstats[5])
927   if dev.children:
928     for child in dev.children:
929       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
930
931   return result
932
933
934 class LUDiagnoseOS(NoHooksLU):
935   """Logical unit for OS diagnose/query.
936
937   """
938   _OP_REQP = []
939
940   def CheckPrereq(self):
941     """Check prerequisites.
942
943     This always succeeds, since this is a pure query LU.
944
945     """
946     return
947
948   def Exec(self, feedback_fn):
949     """Compute the list of OSes.
950
951     """
952     node_list = self.cfg.GetNodeList()
953     node_data = rpc.call_os_diagnose(node_list)
954     if node_data == False:
955       raise errors.OpExecError, "Can't gather the list of OSes"
956     return node_data
957
958
959 class LURemoveNode(LogicalUnit):
960   """Logical unit for removing a node.
961
962   """
963   HPATH = "node-remove"
964   HTYPE = constants.HTYPE_NODE
965   _OP_REQP = ["node_name"]
966
967   def BuildHooksEnv(self):
968     """Build hooks env.
969
970     This doesn't run on the target node in the pre phase as a failed
971     node would not allows itself to run.
972
973     """
974     all_nodes = self.cfg.GetNodeList()
975     all_nodes.remove(self.op.node_name)
976     return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes
977
978   def CheckPrereq(self):
979     """Check prerequisites.
980
981     This checks:
982      - the node exists in the configuration
983      - it does not have primary or secondary instances
984      - it's not the master
985
986     Any errors are signalled by raising errors.OpPrereqError.
987
988     """
989     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
990     if node is None:
991       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
992       return 1
993
994     instance_list = self.cfg.GetInstanceList()
995
996     masternode = self.sstore.GetMasterNode()
997     if node.name == masternode:
998       raise errors.OpPrereqError, ("Node is the master node,"
999                                    " you need to failover first.")
1000
1001     for instance_name in instance_list:
1002       instance = self.cfg.GetInstanceInfo(instance_name)
1003       if node.name == instance.primary_node:
1004         raise errors.OpPrereqError, ("Instance %s still running on the node,"
1005                                      " please remove first." % instance_name)
1006       if node.name in instance.secondary_nodes:
1007         raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1008                                      " please remove first." % instance_name)
1009     self.op.node_name = node.name
1010     self.node = node
1011
1012   def Exec(self, feedback_fn):
1013     """Removes the node from the cluster.
1014
1015     """
1016     node = self.node
1017     logger.Info("stopping the node daemon and removing configs from node %s" %
1018                 node.name)
1019
1020     rpc.call_node_leave_cluster(node.name)
1021
1022     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1023
1024     logger.Info("Removing node %s from config" % node.name)
1025
1026     self.cfg.RemoveNode(node.name)
1027
1028
1029 class LUQueryNodes(NoHooksLU):
1030   """Logical unit for querying nodes.
1031
1032   """
1033   _OP_REQP = ["output_fields"]
1034
1035   def CheckPrereq(self):
1036     """Check prerequisites.
1037
1038     This checks that the fields required are valid output fields.
1039
1040     """
1041     self.dynamic_fields = frozenset(["dtotal", "dfree",
1042                                      "mtotal", "mnode", "mfree"])
1043
1044     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1045                        dynamic=self.dynamic_fields,
1046                        selected=self.op.output_fields)
1047
1048
1049   def Exec(self, feedback_fn):
1050     """Computes the list of nodes and their attributes.
1051
1052     """
1053     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1054     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1055
1056
1057     # begin data gathering
1058
1059     if self.dynamic_fields.intersection(self.op.output_fields):
1060       live_data = {}
1061       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1062       for name in nodenames:
1063         nodeinfo = node_data.get(name, None)
1064         if nodeinfo:
1065           live_data[name] = {
1066             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1067             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1068             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1069             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1070             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1071             }
1072         else:
1073           live_data[name] = {}
1074     else:
1075       live_data = dict.fromkeys(nodenames, {})
1076
1077     node_to_primary = dict.fromkeys(nodenames, 0)
1078     node_to_secondary = dict.fromkeys(nodenames, 0)
1079
1080     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1081       instancelist = self.cfg.GetInstanceList()
1082
1083       for instance in instancelist:
1084         instanceinfo = self.cfg.GetInstanceInfo(instance)
1085         node_to_primary[instanceinfo.primary_node] += 1
1086         for secnode in instanceinfo.secondary_nodes:
1087           node_to_secondary[secnode] += 1
1088
1089     # end data gathering
1090
1091     output = []
1092     for node in nodelist:
1093       node_output = []
1094       for field in self.op.output_fields:
1095         if field == "name":
1096           val = node.name
1097         elif field == "pinst":
1098           val = node_to_primary[node.name]
1099         elif field == "sinst":
1100           val = node_to_secondary[node.name]
1101         elif field == "pip":
1102           val = node.primary_ip
1103         elif field == "sip":
1104           val = node.secondary_ip
1105         elif field in self.dynamic_fields:
1106           val = live_data[node.name].get(field, "?")
1107         else:
1108           raise errors.ParameterError, field
1109         val = str(val)
1110         node_output.append(val)
1111       output.append(node_output)
1112
1113     return output
1114
1115
1116 class LUQueryNodeVolumes(NoHooksLU):
1117   """Logical unit for getting volumes on node(s).
1118
1119   """
1120   _OP_REQP = ["nodes", "output_fields"]
1121
1122   def CheckPrereq(self):
1123     """Check prerequisites.
1124
1125     This checks that the fields required are valid output fields.
1126
1127     """
1128     self.nodes = _GetWantedNodes(self, self.op.nodes)
1129
1130     _CheckOutputFields(static=["node"],
1131                        dynamic=["phys", "vg", "name", "size", "instance"],
1132                        selected=self.op.output_fields)
1133
1134
1135   def Exec(self, feedback_fn):
1136     """Computes the list of nodes and their attributes.
1137
1138     """
1139     nodenames = utils.NiceSort([node.name for node in self.nodes])
1140     volumes = rpc.call_node_volumes(nodenames)
1141
1142     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1143              in self.cfg.GetInstanceList()]
1144
1145     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1146
1147     output = []
1148     for node in nodenames:
1149       node_vols = volumes[node][:]
1150       node_vols.sort(key=lambda vol: vol['dev'])
1151
1152       for vol in node_vols:
1153         node_output = []
1154         for field in self.op.output_fields:
1155           if field == "node":
1156             val = node
1157           elif field == "phys":
1158             val = vol['dev']
1159           elif field == "vg":
1160             val = vol['vg']
1161           elif field == "name":
1162             val = vol['name']
1163           elif field == "size":
1164             val = int(float(vol['size']))
1165           elif field == "instance":
1166             for inst in ilist:
1167               if node not in lv_by_node[inst]:
1168                 continue
1169               if vol['name'] in lv_by_node[inst][node]:
1170                 val = inst.name
1171                 break
1172             else:
1173               val = '-'
1174           else:
1175             raise errors.ParameterError, field
1176           node_output.append(str(val))
1177
1178         output.append(node_output)
1179
1180     return output
1181
1182
1183 class LUAddNode(LogicalUnit):
1184   """Logical unit for adding node to the cluster.
1185
1186   """
1187   HPATH = "node-add"
1188   HTYPE = constants.HTYPE_NODE
1189   _OP_REQP = ["node_name"]
1190
1191   def BuildHooksEnv(self):
1192     """Build hooks env.
1193
1194     This will run on all nodes before, and on all nodes + the new node after.
1195
1196     """
1197     env = {
1198       "NODE_NAME": self.op.node_name,
1199       "NODE_PIP": self.op.primary_ip,
1200       "NODE_SIP": self.op.secondary_ip,
1201       }
1202     nodes_0 = self.cfg.GetNodeList()
1203     nodes_1 = nodes_0 + [self.op.node_name, ]
1204     return env, nodes_0, nodes_1
1205
1206   def CheckPrereq(self):
1207     """Check prerequisites.
1208
1209     This checks:
1210      - the new node is not already in the config
1211      - it is resolvable
1212      - its parameters (single/dual homed) matches the cluster
1213
1214     Any errors are signalled by raising errors.OpPrereqError.
1215
1216     """
1217     node_name = self.op.node_name
1218     cfg = self.cfg
1219
1220     dns_data = utils.LookupHostname(node_name)
1221     if not dns_data:
1222       raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1223
1224     node = dns_data['hostname']
1225     primary_ip = self.op.primary_ip = dns_data['ip']
1226     secondary_ip = getattr(self.op, "secondary_ip", None)
1227     if secondary_ip is None:
1228       secondary_ip = primary_ip
1229     if not utils.IsValidIP(secondary_ip):
1230       raise errors.OpPrereqError, ("Invalid secondary IP given")
1231     self.op.secondary_ip = secondary_ip
1232     node_list = cfg.GetNodeList()
1233     if node in node_list:
1234       raise errors.OpPrereqError, ("Node %s is already in the configuration"
1235                                    % node)
1236
1237     for existing_node_name in node_list:
1238       existing_node = cfg.GetNodeInfo(existing_node_name)
1239       if (existing_node.primary_ip == primary_ip or
1240           existing_node.secondary_ip == primary_ip or
1241           existing_node.primary_ip == secondary_ip or
1242           existing_node.secondary_ip == secondary_ip):
1243         raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1244                                      " existing node %s" % existing_node.name)
1245
1246     # check that the type of the node (single versus dual homed) is the
1247     # same as for the master
1248     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1249     master_singlehomed = myself.secondary_ip == myself.primary_ip
1250     newbie_singlehomed = secondary_ip == primary_ip
1251     if master_singlehomed != newbie_singlehomed:
1252       if master_singlehomed:
1253         raise errors.OpPrereqError, ("The master has no private ip but the"
1254                                      " new node has one")
1255       else:
1256         raise errors.OpPrereqError ("The master has a private ip but the"
1257                                     " new node doesn't have one")
1258
1259     # checks reachablity
1260     command = ["fping", "-q", primary_ip]
1261     result = utils.RunCmd(command)
1262     if result.failed:
1263       raise errors.OpPrereqError, ("Node not reachable by ping")
1264
1265     if not newbie_singlehomed:
1266       # check reachability from my secondary ip to newbie's secondary ip
1267       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1268       result = utils.RunCmd(command)
1269       if result.failed:
1270         raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1271
1272     self.new_node = objects.Node(name=node,
1273                                  primary_ip=primary_ip,
1274                                  secondary_ip=secondary_ip)
1275
1276   def Exec(self, feedback_fn):
1277     """Adds the new node to the cluster.
1278
1279     """
1280     new_node = self.new_node
1281     node = new_node.name
1282
1283     # set up inter-node password and certificate and restarts the node daemon
1284     gntpass = self.sstore.GetNodeDaemonPassword()
1285     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1286       raise errors.OpExecError, ("ganeti password corruption detected")
1287     f = open(constants.SSL_CERT_FILE)
1288     try:
1289       gntpem = f.read(8192)
1290     finally:
1291       f.close()
1292     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1293     # so we use this to detect an invalid certificate; as long as the
1294     # cert doesn't contain this, the here-document will be correctly
1295     # parsed by the shell sequence below
1296     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1297       raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1298     if not gntpem.endswith("\n"):
1299       raise errors.OpExecError, ("PEM must end with newline")
1300     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1301
1302     # remove first the root's known_hosts file
1303     utils.RemoveFile("/root/.ssh/known_hosts")
1304     # and then connect with ssh to set password and start ganeti-noded
1305     # note that all the below variables are sanitized at this point,
1306     # either by being constants or by the checks above
1307     ss = self.sstore
1308     mycommand = ("umask 077 && "
1309                  "echo '%s' > '%s' && "
1310                  "cat > '%s' << '!EOF.' && \n"
1311                  "%s!EOF.\n%s restart" %
1312                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1313                   constants.SSL_CERT_FILE, gntpem,
1314                   constants.NODE_INITD_SCRIPT))
1315
1316     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1317     if result.failed:
1318       raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1319                                  " output: %s" %
1320                                  (node, result.fail_reason, result.output))
1321
1322     # check connectivity
1323     time.sleep(4)
1324
1325     result = rpc.call_version([node])[node]
1326     if result:
1327       if constants.PROTOCOL_VERSION == result:
1328         logger.Info("communication to node %s fine, sw version %s match" %
1329                     (node, result))
1330       else:
1331         raise errors.OpExecError, ("Version mismatch master version %s,"
1332                                    " node version %s" %
1333                                    (constants.PROTOCOL_VERSION, result))
1334     else:
1335       raise errors.OpExecError, ("Cannot get version from the new node")
1336
1337     # setup ssh on node
1338     logger.Info("copy ssh key to node %s" % node)
1339     keyarray = []
1340     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1341                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1342                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1343
1344     for i in keyfiles:
1345       f = open(i, 'r')
1346       try:
1347         keyarray.append(f.read())
1348       finally:
1349         f.close()
1350
1351     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1352                                keyarray[3], keyarray[4], keyarray[5])
1353
1354     if not result:
1355       raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1356
1357     # Add node to our /etc/hosts, and add key to known_hosts
1358     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1359     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1360                       self.cfg.GetHostKey())
1361
1362     if new_node.secondary_ip != new_node.primary_ip:
1363       result = ssh.SSHCall(node, "root",
1364                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1365       if result.failed:
1366         raise errors.OpExecError, ("Node claims it doesn't have the"
1367                                    " secondary ip you gave (%s).\n"
1368                                    "Please fix and re-run this command." %
1369                                    new_node.secondary_ip)
1370
1371     # Distribute updated /etc/hosts and known_hosts to all nodes,
1372     # including the node just added
1373     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1374     dist_nodes = self.cfg.GetNodeList() + [node]
1375     if myself.name in dist_nodes:
1376       dist_nodes.remove(myself.name)
1377
1378     logger.Debug("Copying hosts and known_hosts to all nodes")
1379     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1380       result = rpc.call_upload_file(dist_nodes, fname)
1381       for to_node in dist_nodes:
1382         if not result[to_node]:
1383           logger.Error("copy of file %s to node %s failed" %
1384                        (fname, to_node))
1385
1386     to_copy = ss.GetFileList()
1387     for fname in to_copy:
1388       if not ssh.CopyFileToNode(node, fname):
1389         logger.Error("could not copy file %s to node %s" % (fname, node))
1390
1391     logger.Info("adding node %s to cluster.conf" % node)
1392     self.cfg.AddNode(new_node)
1393
1394
1395 class LUMasterFailover(LogicalUnit):
1396   """Failover the master node to the current node.
1397
1398   This is a special LU in that it must run on a non-master node.
1399
1400   """
1401   HPATH = "master-failover"
1402   HTYPE = constants.HTYPE_CLUSTER
1403   REQ_MASTER = False
1404   _OP_REQP = []
1405
1406   def BuildHooksEnv(self):
1407     """Build hooks env.
1408
1409     This will run on the new master only in the pre phase, and on all
1410     the nodes in the post phase.
1411
1412     """
1413     env = {
1414       "NEW_MASTER": self.new_master,
1415       "OLD_MASTER": self.old_master,
1416       }
1417     return env, [self.new_master], self.cfg.GetNodeList()
1418
1419   def CheckPrereq(self):
1420     """Check prerequisites.
1421
1422     This checks that we are not already the master.
1423
1424     """
1425     self.new_master = socket.gethostname()
1426
1427     self.old_master = self.sstore.GetMasterNode()
1428
1429     if self.old_master == self.new_master:
1430       raise errors.OpPrereqError, ("This commands must be run on the node"
1431                                    " where you want the new master to be.\n"
1432                                    "%s is already the master" %
1433                                    self.old_master)
1434
1435   def Exec(self, feedback_fn):
1436     """Failover the master node.
1437
1438     This command, when run on a non-master node, will cause the current
1439     master to cease being master, and the non-master to become new
1440     master.
1441
1442     """
1443     #TODO: do not rely on gethostname returning the FQDN
1444     logger.Info("setting master to %s, old master: %s" %
1445                 (self.new_master, self.old_master))
1446
1447     if not rpc.call_node_stop_master(self.old_master):
1448       logger.Error("could disable the master role on the old master"
1449                    " %s, please disable manually" % self.old_master)
1450
1451     ss = self.sstore
1452     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1453     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1454                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1455       logger.Error("could not distribute the new simple store master file"
1456                    " to the other nodes, please check.")
1457
1458     if not rpc.call_node_start_master(self.new_master):
1459       logger.Error("could not start the master role on the new master"
1460                    " %s, please check" % self.new_master)
1461       feedback_fn("Error in activating the master IP on the new master,\n"
1462                   "please fix manually.")
1463
1464
1465
1466 class LUQueryClusterInfo(NoHooksLU):
1467   """Query cluster configuration.
1468
1469   """
1470   _OP_REQP = []
1471   REQ_MASTER = False
1472
1473   def CheckPrereq(self):
1474     """No prerequsites needed for this LU.
1475
1476     """
1477     pass
1478
1479   def Exec(self, feedback_fn):
1480     """Return cluster config.
1481
1482     """
1483     result = {
1484       "name": self.sstore.GetClusterName(),
1485       "software_version": constants.RELEASE_VERSION,
1486       "protocol_version": constants.PROTOCOL_VERSION,
1487       "config_version": constants.CONFIG_VERSION,
1488       "os_api_version": constants.OS_API_VERSION,
1489       "export_version": constants.EXPORT_VERSION,
1490       "master": self.sstore.GetMasterNode(),
1491       "architecture": (platform.architecture()[0], platform.machine()),
1492       }
1493
1494     return result
1495
1496
1497 class LUClusterCopyFile(NoHooksLU):
1498   """Copy file to cluster.
1499
1500   """
1501   _OP_REQP = ["nodes", "filename"]
1502
1503   def CheckPrereq(self):
1504     """Check prerequisites.
1505
1506     It should check that the named file exists and that the given list
1507     of nodes is valid.
1508
1509     """
1510     if not os.path.exists(self.op.filename):
1511       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1512
1513     self.nodes = _GetWantedNodes(self, self.op.nodes)
1514
1515   def Exec(self, feedback_fn):
1516     """Copy a file from master to some nodes.
1517
1518     Args:
1519       opts - class with options as members
1520       args - list containing a single element, the file name
1521     Opts used:
1522       nodes - list containing the name of target nodes; if empty, all nodes
1523
1524     """
1525     filename = self.op.filename
1526
1527     myname = socket.gethostname()
1528
1529     for node in self.nodes:
1530       if node == myname:
1531         continue
1532       if not ssh.CopyFileToNode(node, filename):
1533         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1534
1535
1536 class LUDumpClusterConfig(NoHooksLU):
1537   """Return a text-representation of the cluster-config.
1538
1539   """
1540   _OP_REQP = []
1541
1542   def CheckPrereq(self):
1543     """No prerequisites.
1544
1545     """
1546     pass
1547
1548   def Exec(self, feedback_fn):
1549     """Dump a representation of the cluster config to the standard output.
1550
1551     """
1552     return self.cfg.DumpConfig()
1553
1554
1555 class LURunClusterCommand(NoHooksLU):
1556   """Run a command on some nodes.
1557
1558   """
1559   _OP_REQP = ["command", "nodes"]
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     It checks that the given list of nodes is valid.
1565
1566     """
1567     self.nodes = _GetWantedNodes(self, self.op.nodes)
1568
1569   def Exec(self, feedback_fn):
1570     """Run a command on some nodes.
1571
1572     """
1573     data = []
1574     for node in self.nodes:
1575       result = utils.RunCmd(["ssh", node.name, self.op.command])
1576       data.append((node.name, result.cmd, result.output, result.exit_code))
1577
1578     return data
1579
1580
1581 class LUActivateInstanceDisks(NoHooksLU):
1582   """Bring up an instance's disks.
1583
1584   """
1585   _OP_REQP = ["instance_name"]
1586
1587   def CheckPrereq(self):
1588     """Check prerequisites.
1589
1590     This checks that the instance is in the cluster.
1591
1592     """
1593     instance = self.cfg.GetInstanceInfo(
1594       self.cfg.ExpandInstanceName(self.op.instance_name))
1595     if instance is None:
1596       raise errors.OpPrereqError, ("Instance '%s' not known" %
1597                                    self.op.instance_name)
1598     self.instance = instance
1599
1600
1601   def Exec(self, feedback_fn):
1602     """Activate the disks.
1603
1604     """
1605     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1606     if not disks_ok:
1607       raise errors.OpExecError, ("Cannot activate block devices")
1608
1609     return disks_info
1610
1611
1612 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1613   """Prepare the block devices for an instance.
1614
1615   This sets up the block devices on all nodes.
1616
1617   Args:
1618     instance: a ganeti.objects.Instance object
1619     ignore_secondaries: if true, errors on secondary nodes won't result
1620                         in an error return from the function
1621
1622   Returns:
1623     false if the operation failed
1624     list of (host, instance_visible_name, node_visible_name) if the operation
1625          suceeded with the mapping from node devices to instance devices
1626   """
1627   device_info = []
1628   disks_ok = True
1629   for inst_disk in instance.disks:
1630     master_result = None
1631     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1632       cfg.SetDiskID(node_disk, node)
1633       is_primary = node == instance.primary_node
1634       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1635       if not result:
1636         logger.Error("could not prepare block device %s on node %s (is_pri"
1637                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1638         if is_primary or not ignore_secondaries:
1639           disks_ok = False
1640       if is_primary:
1641         master_result = result
1642     device_info.append((instance.primary_node, inst_disk.iv_name,
1643                         master_result))
1644
1645   return disks_ok, device_info
1646
1647
1648 def _StartInstanceDisks(cfg, instance, force):
1649   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1650                                            ignore_secondaries=force)
1651   if not disks_ok:
1652     _ShutdownInstanceDisks(instance, cfg)
1653     if force is not None and not force:
1654       logger.Error("If the message above refers to a secondary node,"
1655                    " you can retry the operation using '--force'.")
1656     raise errors.OpExecError, ("Disk consistency error")
1657
1658
1659 class LUDeactivateInstanceDisks(NoHooksLU):
1660   """Shutdown an instance's disks.
1661
1662   """
1663   _OP_REQP = ["instance_name"]
1664
1665   def CheckPrereq(self):
1666     """Check prerequisites.
1667
1668     This checks that the instance is in the cluster.
1669
1670     """
1671     instance = self.cfg.GetInstanceInfo(
1672       self.cfg.ExpandInstanceName(self.op.instance_name))
1673     if instance is None:
1674       raise errors.OpPrereqError, ("Instance '%s' not known" %
1675                                    self.op.instance_name)
1676     self.instance = instance
1677
1678   def Exec(self, feedback_fn):
1679     """Deactivate the disks
1680
1681     """
1682     instance = self.instance
1683     ins_l = rpc.call_instance_list([instance.primary_node])
1684     ins_l = ins_l[instance.primary_node]
1685     if not type(ins_l) is list:
1686       raise errors.OpExecError, ("Can't contact node '%s'" %
1687                                  instance.primary_node)
1688
1689     if self.instance.name in ins_l:
1690       raise errors.OpExecError, ("Instance is running, can't shutdown"
1691                                  " block devices.")
1692
1693     _ShutdownInstanceDisks(instance, self.cfg)
1694
1695
1696 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1697   """Shutdown block devices of an instance.
1698
1699   This does the shutdown on all nodes of the instance.
1700
1701   If the ignore_primary is false, errors on the primary node are
1702   ignored.
1703
1704   """
1705   result = True
1706   for disk in instance.disks:
1707     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1708       cfg.SetDiskID(top_disk, node)
1709       if not rpc.call_blockdev_shutdown(node, top_disk):
1710         logger.Error("could not shutdown block device %s on node %s" %
1711                      (disk.iv_name, node))
1712         if not ignore_primary or node != instance.primary_node:
1713           result = False
1714   return result
1715
1716
1717 class LUStartupInstance(LogicalUnit):
1718   """Starts an instance.
1719
1720   """
1721   HPATH = "instance-start"
1722   HTYPE = constants.HTYPE_INSTANCE
1723   _OP_REQP = ["instance_name", "force"]
1724
1725   def BuildHooksEnv(self):
1726     """Build hooks env.
1727
1728     This runs on master, primary and secondary nodes of the instance.
1729
1730     """
1731     env = {
1732       "INSTANCE_NAME": self.op.instance_name,
1733       "INSTANCE_PRIMARY": self.instance.primary_node,
1734       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1735       "FORCE": self.op.force,
1736       }
1737     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1738           list(self.instance.secondary_nodes))
1739     return env, nl, nl
1740
1741   def CheckPrereq(self):
1742     """Check prerequisites.
1743
1744     This checks that the instance is in the cluster.
1745
1746     """
1747     instance = self.cfg.GetInstanceInfo(
1748       self.cfg.ExpandInstanceName(self.op.instance_name))
1749     if instance is None:
1750       raise errors.OpPrereqError, ("Instance '%s' not known" %
1751                                    self.op.instance_name)
1752
1753     # check bridges existance
1754     brlist = [nic.bridge for nic in instance.nics]
1755     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1756       raise errors.OpPrereqError, ("one or more target bridges %s does not"
1757                                    " exist on destination node '%s'" %
1758                                    (brlist, instance.primary_node))
1759
1760     self.instance = instance
1761     self.op.instance_name = instance.name
1762
1763   def Exec(self, feedback_fn):
1764     """Start the instance.
1765
1766     """
1767     instance = self.instance
1768     force = self.op.force
1769     extra_args = getattr(self.op, "extra_args", "")
1770
1771     node_current = instance.primary_node
1772
1773     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1774     if not nodeinfo:
1775       raise errors.OpExecError, ("Could not contact node %s for infos" %
1776                                  (node_current))
1777
1778     freememory = nodeinfo[node_current]['memory_free']
1779     memory = instance.memory
1780     if memory > freememory:
1781       raise errors.OpExecError, ("Not enough memory to start instance"
1782                                  " %s on node %s"
1783                                  " needed %s MiB, available %s MiB" %
1784                                  (instance.name, node_current, memory,
1785                                   freememory))
1786
1787     _StartInstanceDisks(self.cfg, instance, force)
1788
1789     if not rpc.call_instance_start(node_current, instance, extra_args):
1790       _ShutdownInstanceDisks(instance, self.cfg)
1791       raise errors.OpExecError, ("Could not start instance")
1792
1793     self.cfg.MarkInstanceUp(instance.name)
1794
1795
1796 class LUShutdownInstance(LogicalUnit):
1797   """Shutdown an instance.
1798
1799   """
1800   HPATH = "instance-stop"
1801   HTYPE = constants.HTYPE_INSTANCE
1802   _OP_REQP = ["instance_name"]
1803
1804   def BuildHooksEnv(self):
1805     """Build hooks env.
1806
1807     This runs on master, primary and secondary nodes of the instance.
1808
1809     """
1810     env = {
1811       "INSTANCE_NAME": self.op.instance_name,
1812       "INSTANCE_PRIMARY": self.instance.primary_node,
1813       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1814       }
1815     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1816           list(self.instance.secondary_nodes))
1817     return env, nl, nl
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks that the instance is in the cluster.
1823
1824     """
1825     instance = self.cfg.GetInstanceInfo(
1826       self.cfg.ExpandInstanceName(self.op.instance_name))
1827     if instance is None:
1828       raise errors.OpPrereqError, ("Instance '%s' not known" %
1829                                    self.op.instance_name)
1830     self.instance = instance
1831
1832   def Exec(self, feedback_fn):
1833     """Shutdown the instance.
1834
1835     """
1836     instance = self.instance
1837     node_current = instance.primary_node
1838     if not rpc.call_instance_shutdown(node_current, instance):
1839       logger.Error("could not shutdown instance")
1840
1841     self.cfg.MarkInstanceDown(instance.name)
1842     _ShutdownInstanceDisks(instance, self.cfg)
1843
1844
1845 class LUReinstallInstance(LogicalUnit):
1846   """Reinstall an instance.
1847
1848   """
1849   HPATH = "instance-reinstall"
1850   HTYPE = constants.HTYPE_INSTANCE
1851   _OP_REQP = ["instance_name"]
1852
1853   def BuildHooksEnv(self):
1854     """Build hooks env.
1855
1856     This runs on master, primary and secondary nodes of the instance.
1857
1858     """
1859     env = {
1860       "INSTANCE_NAME": self.op.instance_name,
1861       "INSTANCE_PRIMARY": self.instance.primary_node,
1862       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1863       }
1864     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1865           list(self.instance.secondary_nodes))
1866     return env, nl, nl
1867
1868   def CheckPrereq(self):
1869     """Check prerequisites.
1870
1871     This checks that the instance is in the cluster and is not running.
1872
1873     """
1874     instance = self.cfg.GetInstanceInfo(
1875       self.cfg.ExpandInstanceName(self.op.instance_name))
1876     if instance is None:
1877       raise errors.OpPrereqError, ("Instance '%s' not known" %
1878                                    self.op.instance_name)
1879     if instance.disk_template == constants.DT_DISKLESS:
1880       raise errors.OpPrereqError, ("Instance '%s' has no disks" %
1881                                    self.op.instance_name)
1882     if instance.status != "down":
1883       raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
1884                                    self.op.instance_name)
1885     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1886     if remote_info:
1887       raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
1888                                    (self.op.instance_name,
1889                                     instance.primary_node))
1890
1891     self.op.os_type = getattr(self.op, "os_type", None)
1892     if self.op.os_type is not None:
1893       # OS verification
1894       pnode = self.cfg.GetNodeInfo(
1895         self.cfg.ExpandNodeName(instance.primary_node))
1896       if pnode is None:
1897         raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
1898                                      self.op.pnode)
1899       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1900       if not isinstance(os_obj, objects.OS):
1901         raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
1902                                      " primary node"  % self.op.os_type)
1903
1904     self.instance = instance
1905
1906   def Exec(self, feedback_fn):
1907     """Reinstall the instance.
1908
1909     """
1910     inst = self.instance
1911
1912     if self.op.os_type is not None:
1913       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1914       inst.os = self.op.os_type
1915       self.cfg.AddInstance(inst)
1916
1917     _StartInstanceDisks(self.cfg, inst, None)
1918     try:
1919       feedback_fn("Running the instance OS create scripts...")
1920       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1921         raise errors.OpExecError, ("Could not install OS for instance %s "
1922                                    "on node %s" %
1923                                    (inst.name, inst.primary_node))
1924     finally:
1925       _ShutdownInstanceDisks(inst, self.cfg)
1926
1927
1928 class LURemoveInstance(LogicalUnit):
1929   """Remove an instance.
1930
1931   """
1932   HPATH = "instance-remove"
1933   HTYPE = constants.HTYPE_INSTANCE
1934   _OP_REQP = ["instance_name"]
1935
1936   def BuildHooksEnv(self):
1937     """Build hooks env.
1938
1939     This runs on master, primary and secondary nodes of the instance.
1940
1941     """
1942     env = {
1943       "INSTANCE_NAME": self.op.instance_name,
1944       "INSTANCE_PRIMARY": self.instance.primary_node,
1945       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1946       }
1947     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1948           list(self.instance.secondary_nodes))
1949     return env, nl, nl
1950
1951   def CheckPrereq(self):
1952     """Check prerequisites.
1953
1954     This checks that the instance is in the cluster.
1955
1956     """
1957     instance = self.cfg.GetInstanceInfo(
1958       self.cfg.ExpandInstanceName(self.op.instance_name))
1959     if instance is None:
1960       raise errors.OpPrereqError, ("Instance '%s' not known" %
1961                                    self.op.instance_name)
1962     self.instance = instance
1963
1964   def Exec(self, feedback_fn):
1965     """Remove the instance.
1966
1967     """
1968     instance = self.instance
1969     logger.Info("shutting down instance %s on node %s" %
1970                 (instance.name, instance.primary_node))
1971
1972     if not rpc.call_instance_shutdown(instance.primary_node, instance):
1973       raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1974                                  (instance.name, instance.primary_node))
1975
1976     logger.Info("removing block devices for instance %s" % instance.name)
1977
1978     _RemoveDisks(instance, self.cfg)
1979
1980     logger.Info("removing instance %s out of cluster config" % instance.name)
1981
1982     self.cfg.RemoveInstance(instance.name)
1983
1984
1985 class LUQueryInstances(NoHooksLU):
1986   """Logical unit for querying instances.
1987
1988   """
1989   _OP_REQP = ["output_fields"]
1990
1991   def CheckPrereq(self):
1992     """Check prerequisites.
1993
1994     This checks that the fields required are valid output fields.
1995
1996     """
1997     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1998     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
1999                                "admin_state", "admin_ram",
2000                                "disk_template", "ip", "mac", "bridge"],
2001                        dynamic=self.dynamic_fields,
2002                        selected=self.op.output_fields)
2003
2004   def Exec(self, feedback_fn):
2005     """Computes the list of nodes and their attributes.
2006
2007     """
2008     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2009     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2010                      in instance_names]
2011
2012     # begin data gathering
2013
2014     nodes = frozenset([inst.primary_node for inst in instance_list])
2015
2016     bad_nodes = []
2017     if self.dynamic_fields.intersection(self.op.output_fields):
2018       live_data = {}
2019       node_data = rpc.call_all_instances_info(nodes)
2020       for name in nodes:
2021         result = node_data[name]
2022         if result:
2023           live_data.update(result)
2024         elif result == False:
2025           bad_nodes.append(name)
2026         # else no instance is alive
2027     else:
2028       live_data = dict([(name, {}) for name in instance_names])
2029
2030     # end data gathering
2031
2032     output = []
2033     for instance in instance_list:
2034       iout = []
2035       for field in self.op.output_fields:
2036         if field == "name":
2037           val = instance.name
2038         elif field == "os":
2039           val = instance.os
2040         elif field == "pnode":
2041           val = instance.primary_node
2042         elif field == "snodes":
2043           val = ",".join(instance.secondary_nodes) or "-"
2044         elif field == "admin_state":
2045           if instance.status == "down":
2046             val = "no"
2047           else:
2048             val = "yes"
2049         elif field == "oper_state":
2050           if instance.primary_node in bad_nodes:
2051             val = "(node down)"
2052           else:
2053             if live_data.get(instance.name):
2054               val = "running"
2055             else:
2056               val = "stopped"
2057         elif field == "admin_ram":
2058           val = instance.memory
2059         elif field == "oper_ram":
2060           if instance.primary_node in bad_nodes:
2061             val = "(node down)"
2062           elif instance.name in live_data:
2063             val = live_data[instance.name].get("memory", "?")
2064           else:
2065             val = "-"
2066         elif field == "disk_template":
2067           val = instance.disk_template
2068         elif field == "ip":
2069           val = instance.nics[0].ip
2070         elif field == "bridge":
2071           val = instance.nics[0].bridge
2072         elif field == "mac":
2073           val = instance.nics[0].mac
2074         else:
2075           raise errors.ParameterError, field
2076         val = str(val)
2077         iout.append(val)
2078       output.append(iout)
2079
2080     return output
2081
2082
2083 class LUFailoverInstance(LogicalUnit):
2084   """Failover an instance.
2085
2086   """
2087   HPATH = "instance-failover"
2088   HTYPE = constants.HTYPE_INSTANCE
2089   _OP_REQP = ["instance_name", "ignore_consistency"]
2090
2091   def BuildHooksEnv(self):
2092     """Build hooks env.
2093
2094     This runs on master, primary and secondary nodes of the instance.
2095
2096     """
2097     env = {
2098       "INSTANCE_NAME": self.op.instance_name,
2099       "INSTANCE_PRIMARY": self.instance.primary_node,
2100       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2101       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2102       }
2103     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2104     return env, nl, nl
2105
2106   def CheckPrereq(self):
2107     """Check prerequisites.
2108
2109     This checks that the instance is in the cluster.
2110
2111     """
2112     instance = self.cfg.GetInstanceInfo(
2113       self.cfg.ExpandInstanceName(self.op.instance_name))
2114     if instance is None:
2115       raise errors.OpPrereqError, ("Instance '%s' not known" %
2116                                    self.op.instance_name)
2117
2118     # check memory requirements on the secondary node
2119     target_node = instance.secondary_nodes[0]
2120     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2121     info = nodeinfo.get(target_node, None)
2122     if not info:
2123       raise errors.OpPrereqError, ("Cannot get current information"
2124                                    " from node '%s'" % nodeinfo)
2125     if instance.memory > info['memory_free']:
2126       raise errors.OpPrereqError, ("Not enough memory on target node %s."
2127                                    " %d MB available, %d MB required" %
2128                                    (target_node, info['memory_free'],
2129                                     instance.memory))
2130
2131     # check bridge existance
2132     brlist = [nic.bridge for nic in instance.nics]
2133     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2134       raise errors.OpPrereqError, ("one or more target bridges %s does not"
2135                                    " exist on destination node '%s'" %
2136                                    (brlist, instance.primary_node))
2137
2138     self.instance = instance
2139
2140   def Exec(self, feedback_fn):
2141     """Failover an instance.
2142
2143     The failover is done by shutting it down on its present node and
2144     starting it on the secondary.
2145
2146     """
2147     instance = self.instance
2148
2149     source_node = instance.primary_node
2150     target_node = instance.secondary_nodes[0]
2151
2152     feedback_fn("* checking disk consistency between source and target")
2153     for dev in instance.disks:
2154       # for remote_raid1, these are md over drbd
2155       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2156         if not self.op.ignore_consistency:
2157           raise errors.OpExecError, ("Disk %s is degraded on target node,"
2158                                      " aborting failover." % dev.iv_name)
2159
2160     feedback_fn("* checking target node resource availability")
2161     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2162
2163     if not nodeinfo:
2164       raise errors.OpExecError, ("Could not contact target node %s." %
2165                                  target_node)
2166
2167     free_memory = int(nodeinfo[target_node]['memory_free'])
2168     memory = instance.memory
2169     if memory > free_memory:
2170       raise errors.OpExecError, ("Not enough memory to create instance %s on"
2171                                  " node %s. needed %s MiB, available %s MiB" %
2172                                  (instance.name, target_node, memory,
2173                                   free_memory))
2174
2175     feedback_fn("* shutting down instance on source node")
2176     logger.Info("Shutting down instance %s on node %s" %
2177                 (instance.name, source_node))
2178
2179     if not rpc.call_instance_shutdown(source_node, instance):
2180       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2181                    " anyway. Please make sure node %s is down"  %
2182                    (instance.name, source_node, source_node))
2183
2184     feedback_fn("* deactivating the instance's disks on source node")
2185     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2186       raise errors.OpExecError, ("Can't shut down the instance's disks.")
2187
2188     instance.primary_node = target_node
2189     # distribute new instance config to the other nodes
2190     self.cfg.AddInstance(instance)
2191
2192     feedback_fn("* activating the instance's disks on target node")
2193     logger.Info("Starting instance %s on node %s" %
2194                 (instance.name, target_node))
2195
2196     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2197                                              ignore_secondaries=True)
2198     if not disks_ok:
2199       _ShutdownInstanceDisks(instance, self.cfg)
2200       raise errors.OpExecError, ("Can't activate the instance's disks")
2201
2202     feedback_fn("* starting the instance on the target node")
2203     if not rpc.call_instance_start(target_node, instance, None):
2204       _ShutdownInstanceDisks(instance, self.cfg)
2205       raise errors.OpExecError("Could not start instance %s on node %s." %
2206                                (instance.name, target_node))
2207
2208
2209 def _CreateBlockDevOnPrimary(cfg, node, device):
2210   """Create a tree of block devices on the primary node.
2211
2212   This always creates all devices.
2213
2214   """
2215   if device.children:
2216     for child in device.children:
2217       if not _CreateBlockDevOnPrimary(cfg, node, child):
2218         return False
2219
2220   cfg.SetDiskID(device, node)
2221   new_id = rpc.call_blockdev_create(node, device, device.size, True)
2222   if not new_id:
2223     return False
2224   if device.physical_id is None:
2225     device.physical_id = new_id
2226   return True
2227
2228
2229 def _CreateBlockDevOnSecondary(cfg, node, device, force):
2230   """Create a tree of block devices on a secondary node.
2231
2232   If this device type has to be created on secondaries, create it and
2233   all its children.
2234
2235   If not, just recurse to children keeping the same 'force' value.
2236
2237   """
2238   if device.CreateOnSecondary():
2239     force = True
2240   if device.children:
2241     for child in device.children:
2242       if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2243         return False
2244
2245   if not force:
2246     return True
2247   cfg.SetDiskID(device, node)
2248   new_id = rpc.call_blockdev_create(node, device, device.size, False)
2249   if not new_id:
2250     return False
2251   if device.physical_id is None:
2252     device.physical_id = new_id
2253   return True
2254
2255
2256 def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2257   """Generate a drbd device complete with its children.
2258
2259   """
2260   port = cfg.AllocatePort()
2261   base = "%s_%s" % (base, port)
2262   dev_data = objects.Disk(dev_type="lvm", size=size,
2263                           logical_id=(vgname, "%s.data" % base))
2264   dev_meta = objects.Disk(dev_type="lvm", size=128,
2265                           logical_id=(vgname, "%s.meta" % base))
2266   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2267                           logical_id = (primary, secondary, port),
2268                           children = [dev_data, dev_meta])
2269   return drbd_dev
2270
2271
2272 def _GenerateDiskTemplate(cfg, vgname, template_name,
2273                           instance_name, primary_node,
2274                           secondary_nodes, disk_sz, swap_sz):
2275   """Generate the entire disk layout for a given template type.
2276
2277   """
2278   #TODO: compute space requirements
2279
2280   if template_name == "diskless":
2281     disks = []
2282   elif template_name == "plain":
2283     if len(secondary_nodes) != 0:
2284       raise errors.ProgrammerError("Wrong template configuration")
2285     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2286                            logical_id=(vgname, "%s.os" % instance_name),
2287                            iv_name = "sda")
2288     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2289                            logical_id=(vgname, "%s.swap" % instance_name),
2290                            iv_name = "sdb")
2291     disks = [sda_dev, sdb_dev]
2292   elif template_name == "local_raid1":
2293     if len(secondary_nodes) != 0:
2294       raise errors.ProgrammerError("Wrong template configuration")
2295     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2296                               logical_id=(vgname, "%s.os_m1" % instance_name))
2297     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2298                               logical_id=(vgname, "%s.os_m2" % instance_name))
2299     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2300                               size=disk_sz,
2301                               children = [sda_dev_m1, sda_dev_m2])
2302     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2303                               logical_id=(vgname, "%s.swap_m1" %
2304                                           instance_name))
2305     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2306                               logical_id=(vgname, "%s.swap_m2" %
2307                                           instance_name))
2308     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2309                               size=swap_sz,
2310                               children = [sdb_dev_m1, sdb_dev_m2])
2311     disks = [md_sda_dev, md_sdb_dev]
2312   elif template_name == "remote_raid1":
2313     if len(secondary_nodes) != 1:
2314       raise errors.ProgrammerError("Wrong template configuration")
2315     remote_node = secondary_nodes[0]
2316     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2317                                          primary_node, remote_node, disk_sz,
2318                                          "%s-sda" % instance_name)
2319     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2320                               children = [drbd_sda_dev], size=disk_sz)
2321     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2322                                          primary_node, remote_node, swap_sz,
2323                                          "%s-sdb" % instance_name)
2324     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2325                               children = [drbd_sdb_dev], size=swap_sz)
2326     disks = [md_sda_dev, md_sdb_dev]
2327   else:
2328     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2329   return disks
2330
2331
2332 def _CreateDisks(cfg, instance):
2333   """Create all disks for an instance.
2334
2335   This abstracts away some work from AddInstance.
2336
2337   Args:
2338     instance: the instance object
2339
2340   Returns:
2341     True or False showing the success of the creation process
2342
2343   """
2344   for device in instance.disks:
2345     logger.Info("creating volume %s for instance %s" %
2346               (device.iv_name, instance.name))
2347     #HARDCODE
2348     for secondary_node in instance.secondary_nodes:
2349       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2350         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2351                      (device.iv_name, device, secondary_node))
2352         return False
2353     #HARDCODE
2354     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2355       logger.Error("failed to create volume %s on primary!" %
2356                    device.iv_name)
2357       return False
2358   return True
2359
2360
2361 def _RemoveDisks(instance, cfg):
2362   """Remove all disks for an instance.
2363
2364   This abstracts away some work from `AddInstance()` and
2365   `RemoveInstance()`. Note that in case some of the devices couldn't
2366   be remove, the removal will continue with the other ones (compare
2367   with `_CreateDisks()`).
2368
2369   Args:
2370     instance: the instance object
2371
2372   Returns:
2373     True or False showing the success of the removal proces
2374
2375   """
2376   logger.Info("removing block devices for instance %s" % instance.name)
2377
2378   result = True
2379   for device in instance.disks:
2380     for node, disk in device.ComputeNodeTree(instance.primary_node):
2381       cfg.SetDiskID(disk, node)
2382       if not rpc.call_blockdev_remove(node, disk):
2383         logger.Error("could not remove block device %s on node %s,"
2384                      " continuing anyway" %
2385                      (device.iv_name, node))
2386         result = False
2387   return result
2388
2389
2390 class LUCreateInstance(LogicalUnit):
2391   """Create an instance.
2392
2393   """
2394   HPATH = "instance-add"
2395   HTYPE = constants.HTYPE_INSTANCE
2396   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2397               "disk_template", "swap_size", "mode", "start", "vcpus",
2398               "wait_for_sync"]
2399
2400   def BuildHooksEnv(self):
2401     """Build hooks env.
2402
2403     This runs on master, primary and secondary nodes of the instance.
2404
2405     """
2406     env = {
2407       "INSTANCE_NAME": self.op.instance_name,
2408       "INSTANCE_PRIMARY": self.op.pnode,
2409       "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2410       "DISK_TEMPLATE": self.op.disk_template,
2411       "MEM_SIZE": self.op.mem_size,
2412       "DISK_SIZE": self.op.disk_size,
2413       "SWAP_SIZE": self.op.swap_size,
2414       "VCPUS": self.op.vcpus,
2415       "BRIDGE": self.op.bridge,
2416       "INSTANCE_ADD_MODE": self.op.mode,
2417       }
2418     if self.op.mode == constants.INSTANCE_IMPORT:
2419       env["SRC_NODE"] = self.op.src_node
2420       env["SRC_PATH"] = self.op.src_path
2421       env["SRC_IMAGE"] = self.src_image
2422     if self.inst_ip:
2423       env["INSTANCE_IP"] = self.inst_ip
2424
2425     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2426           self.secondaries)
2427     return env, nl, nl
2428
2429
2430   def CheckPrereq(self):
2431     """Check prerequisites.
2432
2433     """
2434     if self.op.mode not in (constants.INSTANCE_CREATE,
2435                             constants.INSTANCE_IMPORT):
2436       raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2437                                    self.op.mode)
2438
2439     if self.op.mode == constants.INSTANCE_IMPORT:
2440       src_node = getattr(self.op, "src_node", None)
2441       src_path = getattr(self.op, "src_path", None)
2442       if src_node is None or src_path is None:
2443         raise errors.OpPrereqError, ("Importing an instance requires source"
2444                                      " node and path options")
2445       src_node_full = self.cfg.ExpandNodeName(src_node)
2446       if src_node_full is None:
2447         raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2448       self.op.src_node = src_node = src_node_full
2449
2450       if not os.path.isabs(src_path):
2451         raise errors.OpPrereqError, ("The source path must be absolute")
2452
2453       export_info = rpc.call_export_info(src_node, src_path)
2454
2455       if not export_info:
2456         raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2457
2458       if not export_info.has_section(constants.INISECT_EXP):
2459         raise errors.ProgrammerError, ("Corrupted export config")
2460
2461       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2462       if (int(ei_version) != constants.EXPORT_VERSION):
2463         raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2464                                      (ei_version, constants.EXPORT_VERSION))
2465
2466       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2467         raise errors.OpPrereqError, ("Can't import instance with more than"
2468                                      " one data disk")
2469
2470       # FIXME: are the old os-es, disk sizes, etc. useful?
2471       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2472       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2473                                                          'disk0_dump'))
2474       self.src_image = diskimage
2475     else: # INSTANCE_CREATE
2476       if getattr(self.op, "os_type", None) is None:
2477         raise errors.OpPrereqError, ("No guest OS specified")
2478
2479     # check primary node
2480     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2481     if pnode is None:
2482       raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
2483                                    self.op.pnode)
2484     self.op.pnode = pnode.name
2485     self.pnode = pnode
2486     self.secondaries = []
2487     # disk template and mirror node verification
2488     if self.op.disk_template not in constants.DISK_TEMPLATES:
2489       raise errors.OpPrereqError, ("Invalid disk template name")
2490
2491     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2492       if getattr(self.op, "snode", None) is None:
2493         raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2494                                      " a mirror node")
2495
2496       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2497       if snode_name is None:
2498         raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2499                                      self.op.snode)
2500       elif snode_name == pnode.name:
2501         raise errors.OpPrereqError, ("The secondary node cannot be"
2502                                      " the primary node.")
2503       self.secondaries.append(snode_name)
2504
2505     # Check lv size requirements
2506     nodenames = [pnode.name] + self.secondaries
2507     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2508
2509     # Required free disk space as a function of disk and swap space
2510     req_size_dict = {
2511       constants.DT_DISKLESS: 0,
2512       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2513       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2514       # 256 MB are added for drbd metadata, 128MB for each drbd device
2515       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2516     }
2517
2518     if self.op.disk_template not in req_size_dict:
2519       raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2520                                      " is unknown" %  self.op.disk_template)
2521
2522     req_size = req_size_dict[self.op.disk_template]
2523
2524     for node in nodenames:
2525       info = nodeinfo.get(node, None)
2526       if not info:
2527         raise errors.OpPrereqError, ("Cannot get current information"
2528                                      " from node '%s'" % nodeinfo)
2529       if req_size > info['vg_free']:
2530         raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2531                                      " %d MB available, %d MB required" %
2532                                      (node, info['vg_free'], req_size))
2533
2534     # os verification
2535     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2536     if not isinstance(os_obj, objects.OS):
2537       raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2538                                    " primary node"  % self.op.os_type)
2539
2540     # instance verification
2541     hostname1 = utils.LookupHostname(self.op.instance_name)
2542     if not hostname1:
2543       raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2544                                    self.op.instance_name)
2545
2546     self.op.instance_name = instance_name = hostname1['hostname']
2547     instance_list = self.cfg.GetInstanceList()
2548     if instance_name in instance_list:
2549       raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2550                                    instance_name)
2551
2552     ip = getattr(self.op, "ip", None)
2553     if ip is None or ip.lower() == "none":
2554       inst_ip = None
2555     elif ip.lower() == "auto":
2556       inst_ip = hostname1['ip']
2557     else:
2558       if not utils.IsValidIP(ip):
2559         raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2560                                      " like a valid IP" % ip)
2561       inst_ip = ip
2562     self.inst_ip = inst_ip
2563
2564     command = ["fping", "-q", hostname1['ip']]
2565     result = utils.RunCmd(command)
2566     if not result.failed:
2567       raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2568                                    (hostname1['ip'], instance_name))
2569
2570     # bridge verification
2571     bridge = getattr(self.op, "bridge", None)
2572     if bridge is None:
2573       self.op.bridge = self.cfg.GetDefBridge()
2574     else:
2575       self.op.bridge = bridge
2576
2577     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2578       raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2579                                    " destination node '%s'" %
2580                                    (self.op.bridge, pnode.name))
2581
2582     if self.op.start:
2583       self.instance_status = 'up'
2584     else:
2585       self.instance_status = 'down'
2586
2587   def Exec(self, feedback_fn):
2588     """Create and add the instance to the cluster.
2589
2590     """
2591     instance = self.op.instance_name
2592     pnode_name = self.pnode.name
2593
2594     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2595     if self.inst_ip is not None:
2596       nic.ip = self.inst_ip
2597
2598     disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2599                                   self.op.disk_template,
2600                                   instance, pnode_name,
2601                                   self.secondaries, self.op.disk_size,
2602                                   self.op.swap_size)
2603
2604     iobj = objects.Instance(name=instance, os=self.op.os_type,
2605                             primary_node=pnode_name,
2606                             memory=self.op.mem_size,
2607                             vcpus=self.op.vcpus,
2608                             nics=[nic], disks=disks,
2609                             disk_template=self.op.disk_template,
2610                             status=self.instance_status,
2611                             )
2612
2613     feedback_fn("* creating instance disks...")
2614     if not _CreateDisks(self.cfg, iobj):
2615       _RemoveDisks(iobj, self.cfg)
2616       raise errors.OpExecError, ("Device creation failed, reverting...")
2617
2618     feedback_fn("adding instance %s to cluster config" % instance)
2619
2620     self.cfg.AddInstance(iobj)
2621
2622     if self.op.wait_for_sync:
2623       disk_abort = not _WaitForSync(self.cfg, iobj)
2624     elif iobj.disk_template == "remote_raid1":
2625       # make sure the disks are not degraded (still sync-ing is ok)
2626       time.sleep(15)
2627       feedback_fn("* checking mirrors status")
2628       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2629     else:
2630       disk_abort = False
2631
2632     if disk_abort:
2633       _RemoveDisks(iobj, self.cfg)
2634       self.cfg.RemoveInstance(iobj.name)
2635       raise errors.OpExecError, ("There are some degraded disks for"
2636                                       " this instance")
2637
2638     feedback_fn("creating os for instance %s on node %s" %
2639                 (instance, pnode_name))
2640
2641     if iobj.disk_template != constants.DT_DISKLESS:
2642       if self.op.mode == constants.INSTANCE_CREATE:
2643         feedback_fn("* running the instance OS create scripts...")
2644         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2645           raise errors.OpExecError, ("could not add os for instance %s"
2646                                           " on node %s" %
2647                                           (instance, pnode_name))
2648
2649       elif self.op.mode == constants.INSTANCE_IMPORT:
2650         feedback_fn("* running the instance OS import scripts...")
2651         src_node = self.op.src_node
2652         src_image = self.src_image
2653         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2654                                                 src_node, src_image):
2655           raise errors.OpExecError, ("Could not import os for instance"
2656                                           " %s on node %s" %
2657                                           (instance, pnode_name))
2658       else:
2659         # also checked in the prereq part
2660         raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2661                                        % self.op.mode)
2662
2663     if self.op.start:
2664       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2665       feedback_fn("* starting instance...")
2666       if not rpc.call_instance_start(pnode_name, iobj, None):
2667         raise errors.OpExecError, ("Could not start instance")
2668
2669
2670 class LUConnectConsole(NoHooksLU):
2671   """Connect to an instance's console.
2672
2673   This is somewhat special in that it returns the command line that
2674   you need to run on the master node in order to connect to the
2675   console.
2676
2677   """
2678   _OP_REQP = ["instance_name"]
2679
2680   def CheckPrereq(self):
2681     """Check prerequisites.
2682
2683     This checks that the instance is in the cluster.
2684
2685     """
2686     instance = self.cfg.GetInstanceInfo(
2687       self.cfg.ExpandInstanceName(self.op.instance_name))
2688     if instance is None:
2689       raise errors.OpPrereqError, ("Instance '%s' not known" %
2690                                    self.op.instance_name)
2691     self.instance = instance
2692
2693   def Exec(self, feedback_fn):
2694     """Connect to the console of an instance
2695
2696     """
2697     instance = self.instance
2698     node = instance.primary_node
2699
2700     node_insts = rpc.call_instance_list([node])[node]
2701     if node_insts is False:
2702       raise errors.OpExecError, ("Can't connect to node %s." % node)
2703
2704     if instance.name not in node_insts:
2705       raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2706
2707     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2708
2709     hyper = hypervisor.GetHypervisor()
2710     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2711     return node, console_cmd
2712
2713
2714 class LUAddMDDRBDComponent(LogicalUnit):
2715   """Adda new mirror member to an instance's disk.
2716
2717   """
2718   HPATH = "mirror-add"
2719   HTYPE = constants.HTYPE_INSTANCE
2720   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2721
2722   def BuildHooksEnv(self):
2723     """Build hooks env.
2724
2725     This runs on the master, the primary and all the secondaries.
2726
2727     """
2728     env = {
2729       "INSTANCE_NAME": self.op.instance_name,
2730       "NEW_SECONDARY": self.op.remote_node,
2731       "DISK_NAME": self.op.disk_name,
2732       }
2733     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2734           self.op.remote_node,] + list(self.instance.secondary_nodes)
2735     return env, nl, nl
2736
2737   def CheckPrereq(self):
2738     """Check prerequisites.
2739
2740     This checks that the instance is in the cluster.
2741
2742     """
2743     instance = self.cfg.GetInstanceInfo(
2744       self.cfg.ExpandInstanceName(self.op.instance_name))
2745     if instance is None:
2746       raise errors.OpPrereqError, ("Instance '%s' not known" %
2747                                    self.op.instance_name)
2748     self.instance = instance
2749
2750     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2751     if remote_node is None:
2752       raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2753     self.remote_node = remote_node
2754
2755     if remote_node == instance.primary_node:
2756       raise errors.OpPrereqError, ("The specified node is the primary node of"
2757                                    " the instance.")
2758
2759     if instance.disk_template != constants.DT_REMOTE_RAID1:
2760       raise errors.OpPrereqError, ("Instance's disk layout is not"
2761                                    " remote_raid1.")
2762     for disk in instance.disks:
2763       if disk.iv_name == self.op.disk_name:
2764         break
2765     else:
2766       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2767                                    " instance." % self.op.disk_name)
2768     if len(disk.children) > 1:
2769       raise errors.OpPrereqError, ("The device already has two slave"
2770                                    " devices.\n"
2771                                    "This would create a 3-disk raid1"
2772                                    " which we don't allow.")
2773     self.disk = disk
2774
2775   def Exec(self, feedback_fn):
2776     """Add the mirror component
2777
2778     """
2779     disk = self.disk
2780     instance = self.instance
2781
2782     remote_node = self.remote_node
2783     new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2784                                      instance.primary_node, remote_node,
2785                                      disk.size, "%s-%s" %
2786                                      (instance.name, self.op.disk_name))
2787
2788     logger.Info("adding new mirror component on secondary")
2789     #HARDCODE
2790     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2791       raise errors.OpExecError, ("Failed to create new component on secondary"
2792                                  " node %s" % remote_node)
2793
2794     logger.Info("adding new mirror component on primary")
2795     #HARDCODE
2796     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2797       # remove secondary dev
2798       self.cfg.SetDiskID(new_drbd, remote_node)
2799       rpc.call_blockdev_remove(remote_node, new_drbd)
2800       raise errors.OpExecError, ("Failed to create volume on primary")
2801
2802     # the device exists now
2803     # call the primary node to add the mirror to md
2804     logger.Info("adding new mirror component to md")
2805     if not rpc.call_blockdev_addchild(instance.primary_node,
2806                                            disk, new_drbd):
2807       logger.Error("Can't add mirror compoment to md!")
2808       self.cfg.SetDiskID(new_drbd, remote_node)
2809       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2810         logger.Error("Can't rollback on secondary")
2811       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2812       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2813         logger.Error("Can't rollback on primary")
2814       raise errors.OpExecError, "Can't add mirror component to md array"
2815
2816     disk.children.append(new_drbd)
2817
2818     self.cfg.AddInstance(instance)
2819
2820     _WaitForSync(self.cfg, instance)
2821
2822     return 0
2823
2824
2825 class LURemoveMDDRBDComponent(LogicalUnit):
2826   """Remove a component from a remote_raid1 disk.
2827
2828   """
2829   HPATH = "mirror-remove"
2830   HTYPE = constants.HTYPE_INSTANCE
2831   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2832
2833   def BuildHooksEnv(self):
2834     """Build hooks env.
2835
2836     This runs on the master, the primary and all the secondaries.
2837
2838     """
2839     env = {
2840       "INSTANCE_NAME": self.op.instance_name,
2841       "DISK_NAME": self.op.disk_name,
2842       "DISK_ID": self.op.disk_id,
2843       "OLD_SECONDARY": self.old_secondary,
2844       }
2845     nl = [self.sstore.GetMasterNode(),
2846           self.instance.primary_node] + list(self.instance.secondary_nodes)
2847     return env, nl, nl
2848
2849   def CheckPrereq(self):
2850     """Check prerequisites.
2851
2852     This checks that the instance is in the cluster.
2853
2854     """
2855     instance = self.cfg.GetInstanceInfo(
2856       self.cfg.ExpandInstanceName(self.op.instance_name))
2857     if instance is None:
2858       raise errors.OpPrereqError, ("Instance '%s' not known" %
2859                                    self.op.instance_name)
2860     self.instance = instance
2861
2862     if instance.disk_template != constants.DT_REMOTE_RAID1:
2863       raise errors.OpPrereqError, ("Instance's disk layout is not"
2864                                    " remote_raid1.")
2865     for disk in instance.disks:
2866       if disk.iv_name == self.op.disk_name:
2867         break
2868     else:
2869       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2870                                    " instance." % self.op.disk_name)
2871     for child in disk.children:
2872       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2873         break
2874     else:
2875       raise errors.OpPrereqError, ("Can't find the device with this port.")
2876
2877     if len(disk.children) < 2:
2878       raise errors.OpPrereqError, ("Cannot remove the last component from"
2879                                    " a mirror.")
2880     self.disk = disk
2881     self.child = child
2882     if self.child.logical_id[0] == instance.primary_node:
2883       oid = 1
2884     else:
2885       oid = 0
2886     self.old_secondary = self.child.logical_id[oid]
2887
2888   def Exec(self, feedback_fn):
2889     """Remove the mirror component
2890
2891     """
2892     instance = self.instance
2893     disk = self.disk
2894     child = self.child
2895     logger.Info("remove mirror component")
2896     self.cfg.SetDiskID(disk, instance.primary_node)
2897     if not rpc.call_blockdev_removechild(instance.primary_node,
2898                                               disk, child):
2899       raise errors.OpExecError, ("Can't remove child from mirror.")
2900
2901     for node in child.logical_id[:2]:
2902       self.cfg.SetDiskID(child, node)
2903       if not rpc.call_blockdev_remove(node, child):
2904         logger.Error("Warning: failed to remove device from node %s,"
2905                      " continuing operation." % node)
2906
2907     disk.children.remove(child)
2908     self.cfg.AddInstance(instance)
2909
2910
2911 class LUReplaceDisks(LogicalUnit):
2912   """Replace the disks of an instance.
2913
2914   """
2915   HPATH = "mirrors-replace"
2916   HTYPE = constants.HTYPE_INSTANCE
2917   _OP_REQP = ["instance_name"]
2918
2919   def BuildHooksEnv(self):
2920     """Build hooks env.
2921
2922     This runs on the master, the primary and all the secondaries.
2923
2924     """
2925     env = {
2926       "INSTANCE_NAME": self.op.instance_name,
2927       "NEW_SECONDARY": self.op.remote_node,
2928       "OLD_SECONDARY": self.instance.secondary_nodes[0],
2929       }
2930     nl = [self.sstore.GetMasterNode(),
2931           self.instance.primary_node] + list(self.instance.secondary_nodes)
2932     return env, nl, nl
2933
2934   def CheckPrereq(self):
2935     """Check prerequisites.
2936
2937     This checks that the instance is in the cluster.
2938
2939     """
2940     instance = self.cfg.GetInstanceInfo(
2941       self.cfg.ExpandInstanceName(self.op.instance_name))
2942     if instance is None:
2943       raise errors.OpPrereqError, ("Instance '%s' not known" %
2944                                    self.op.instance_name)
2945     self.instance = instance
2946
2947     if instance.disk_template != constants.DT_REMOTE_RAID1:
2948       raise errors.OpPrereqError, ("Instance's disk layout is not"
2949                                    " remote_raid1.")
2950
2951     if len(instance.secondary_nodes) != 1:
2952       raise errors.OpPrereqError, ("The instance has a strange layout,"
2953                                    " expected one secondary but found %d" %
2954                                    len(instance.secondary_nodes))
2955
2956     remote_node = getattr(self.op, "remote_node", None)
2957     if remote_node is None:
2958       remote_node = instance.secondary_nodes[0]
2959     else:
2960       remote_node = self.cfg.ExpandNodeName(remote_node)
2961       if remote_node is None:
2962         raise errors.OpPrereqError, ("Node '%s' not known" %
2963                                      self.op.remote_node)
2964     if remote_node == instance.primary_node:
2965       raise errors.OpPrereqError, ("The specified node is the primary node of"
2966                                    " the instance.")
2967     self.op.remote_node = remote_node
2968
2969   def Exec(self, feedback_fn):
2970     """Replace the disks of an instance.
2971
2972     """
2973     instance = self.instance
2974     iv_names = {}
2975     # start of work
2976     remote_node = self.op.remote_node
2977     cfg = self.cfg
2978     vgname = cfg.GetVGName()
2979     for dev in instance.disks:
2980       size = dev.size
2981       new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
2982                                        remote_node, size,
2983                                        "%s-%s" % (instance.name, dev.iv_name))
2984       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
2985       logger.Info("adding new mirror component on secondary for %s" %
2986                   dev.iv_name)
2987       #HARDCODE
2988       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
2989         raise errors.OpExecError, ("Failed to create new component on"
2990                                    " secondary node %s\n"
2991                                    "Full abort, cleanup manually!" %
2992                                    remote_node)
2993
2994       logger.Info("adding new mirror component on primary")
2995       #HARDCODE
2996       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
2997         # remove secondary dev
2998         cfg.SetDiskID(new_drbd, remote_node)
2999         rpc.call_blockdev_remove(remote_node, new_drbd)
3000         raise errors.OpExecError("Failed to create volume on primary!\n"
3001                                  "Full abort, cleanup manually!!")
3002
3003       # the device exists now
3004       # call the primary node to add the mirror to md
3005       logger.Info("adding new mirror component to md")
3006       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3007                                         new_drbd):
3008         logger.Error("Can't add mirror compoment to md!")
3009         cfg.SetDiskID(new_drbd, remote_node)
3010         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3011           logger.Error("Can't rollback on secondary")
3012         cfg.SetDiskID(new_drbd, instance.primary_node)
3013         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3014           logger.Error("Can't rollback on primary")
3015         raise errors.OpExecError, ("Full abort, cleanup manually!!")
3016
3017       dev.children.append(new_drbd)
3018       cfg.AddInstance(instance)
3019
3020     # this can fail as the old devices are degraded and _WaitForSync
3021     # does a combined result over all disks, so we don't check its
3022     # return value
3023     _WaitForSync(cfg, instance, unlock=True)
3024
3025     # so check manually all the devices
3026     for name in iv_names:
3027       dev, child, new_drbd = iv_names[name]
3028       cfg.SetDiskID(dev, instance.primary_node)
3029       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3030       if is_degr:
3031         raise errors.OpExecError, ("MD device %s is degraded!" % name)
3032       cfg.SetDiskID(new_drbd, instance.primary_node)
3033       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3034       if is_degr:
3035         raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3036
3037     for name in iv_names:
3038       dev, child, new_drbd = iv_names[name]
3039       logger.Info("remove mirror %s component" % name)
3040       cfg.SetDiskID(dev, instance.primary_node)
3041       if not rpc.call_blockdev_removechild(instance.primary_node,
3042                                                 dev, child):
3043         logger.Error("Can't remove child from mirror, aborting"
3044                      " *this device cleanup*.\nYou need to cleanup manually!!")
3045         continue
3046
3047       for node in child.logical_id[:2]:
3048         logger.Info("remove child device on %s" % node)
3049         cfg.SetDiskID(child, node)
3050         if not rpc.call_blockdev_remove(node, child):
3051           logger.Error("Warning: failed to remove device from node %s,"
3052                        " continuing operation." % node)
3053
3054       dev.children.remove(child)
3055
3056       cfg.AddInstance(instance)
3057
3058
3059 class LUQueryInstanceData(NoHooksLU):
3060   """Query runtime instance data.
3061
3062   """
3063   _OP_REQP = ["instances"]
3064
3065   def CheckPrereq(self):
3066     """Check prerequisites.
3067
3068     This only checks the optional instance list against the existing names.
3069
3070     """
3071     if not isinstance(self.op.instances, list):
3072       raise errors.OpPrereqError, "Invalid argument type 'instances'"
3073     if self.op.instances:
3074       self.wanted_instances = []
3075       names = self.op.instances
3076       for name in names:
3077         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3078         if instance is None:
3079           raise errors.OpPrereqError, ("No such instance name '%s'" % name)
3080       self.wanted_instances.append(instance)
3081     else:
3082       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3083                                in self.cfg.GetInstanceList()]
3084     return
3085
3086
3087   def _ComputeDiskStatus(self, instance, snode, dev):
3088     """Compute block device status.
3089
3090     """
3091     self.cfg.SetDiskID(dev, instance.primary_node)
3092     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3093     if dev.dev_type == "drbd":
3094       # we change the snode then (otherwise we use the one passed in)
3095       if dev.logical_id[0] == instance.primary_node:
3096         snode = dev.logical_id[1]
3097       else:
3098         snode = dev.logical_id[0]
3099
3100     if snode:
3101       self.cfg.SetDiskID(dev, snode)
3102       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3103     else:
3104       dev_sstatus = None
3105
3106     if dev.children:
3107       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3108                       for child in dev.children]
3109     else:
3110       dev_children = []
3111
3112     data = {
3113       "iv_name": dev.iv_name,
3114       "dev_type": dev.dev_type,
3115       "logical_id": dev.logical_id,
3116       "physical_id": dev.physical_id,
3117       "pstatus": dev_pstatus,
3118       "sstatus": dev_sstatus,
3119       "children": dev_children,
3120       }
3121
3122     return data
3123
3124   def Exec(self, feedback_fn):
3125     """Gather and return data"""
3126     result = {}
3127     for instance in self.wanted_instances:
3128       remote_info = rpc.call_instance_info(instance.primary_node,
3129                                                 instance.name)
3130       if remote_info and "state" in remote_info:
3131         remote_state = "up"
3132       else:
3133         remote_state = "down"
3134       if instance.status == "down":
3135         config_state = "down"
3136       else:
3137         config_state = "up"
3138
3139       disks = [self._ComputeDiskStatus(instance, None, device)
3140                for device in instance.disks]
3141
3142       idict = {
3143         "name": instance.name,
3144         "config_state": config_state,
3145         "run_state": remote_state,
3146         "pnode": instance.primary_node,
3147         "snodes": instance.secondary_nodes,
3148         "os": instance.os,
3149         "memory": instance.memory,
3150         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3151         "disks": disks,
3152         }
3153
3154       result[instance.name] = idict
3155
3156     return result
3157
3158
3159 class LUQueryNodeData(NoHooksLU):
3160   """Logical unit for querying node data.
3161
3162   """
3163   _OP_REQP = ["nodes"]
3164
3165   def CheckPrereq(self):
3166     """Check prerequisites.
3167
3168     This only checks the optional node list against the existing names.
3169
3170     """
3171     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3172
3173   def Exec(self, feedback_fn):
3174     """Compute and return the list of nodes.
3175
3176     """
3177     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3178              in self.cfg.GetInstanceList()]
3179     result = []
3180     for node in self.wanted_nodes:
3181       result.append((node.name, node.primary_ip, node.secondary_ip,
3182                      [inst.name for inst in ilist
3183                       if inst.primary_node == node.name],
3184                      [inst.name for inst in ilist
3185                       if node.name in inst.secondary_nodes],
3186                      ))
3187     return result
3188
3189
3190 class LUSetInstanceParms(LogicalUnit):
3191   """Modifies an instances's parameters.
3192
3193   """
3194   HPATH = "instance-modify"
3195   HTYPE = constants.HTYPE_INSTANCE
3196   _OP_REQP = ["instance_name"]
3197
3198   def BuildHooksEnv(self):
3199     """Build hooks env.
3200
3201     This runs on the master, primary and secondaries.
3202
3203     """
3204     env = {
3205       "INSTANCE_NAME": self.op.instance_name,
3206       }
3207     if self.mem:
3208       env["MEM_SIZE"] = self.mem
3209     if self.vcpus:
3210       env["VCPUS"] = self.vcpus
3211     if self.do_ip:
3212       env["INSTANCE_IP"] = self.ip
3213     if self.bridge:
3214       env["BRIDGE"] = self.bridge
3215
3216     nl = [self.sstore.GetMasterNode(),
3217           self.instance.primary_node] + list(self.instance.secondary_nodes)
3218
3219     return env, nl, nl
3220
3221   def CheckPrereq(self):
3222     """Check prerequisites.
3223
3224     This only checks the instance list against the existing names.
3225
3226     """
3227     self.mem = getattr(self.op, "mem", None)
3228     self.vcpus = getattr(self.op, "vcpus", None)
3229     self.ip = getattr(self.op, "ip", None)
3230     self.bridge = getattr(self.op, "bridge", None)
3231     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3232       raise errors.OpPrereqError, ("No changes submitted")
3233     if self.mem is not None:
3234       try:
3235         self.mem = int(self.mem)
3236       except ValueError, err:
3237         raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
3238     if self.vcpus is not None:
3239       try:
3240         self.vcpus = int(self.vcpus)
3241       except ValueError, err:
3242         raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
3243     if self.ip is not None:
3244       self.do_ip = True
3245       if self.ip.lower() == "none":
3246         self.ip = None
3247       else:
3248         if not utils.IsValidIP(self.ip):
3249           raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
3250     else:
3251       self.do_ip = False
3252
3253     instance = self.cfg.GetInstanceInfo(
3254       self.cfg.ExpandInstanceName(self.op.instance_name))
3255     if instance is None:
3256       raise errors.OpPrereqError, ("No such instance name '%s'" %
3257                                    self.op.instance_name)
3258     self.op.instance_name = instance.name
3259     self.instance = instance
3260     return
3261
3262   def Exec(self, feedback_fn):
3263     """Modifies an instance.
3264
3265     All parameters take effect only at the next restart of the instance.
3266     """
3267     result = []
3268     instance = self.instance
3269     if self.mem:
3270       instance.memory = self.mem
3271       result.append(("mem", self.mem))
3272     if self.vcpus:
3273       instance.vcpus = self.vcpus
3274       result.append(("vcpus",  self.vcpus))
3275     if self.do_ip:
3276       instance.nics[0].ip = self.ip
3277       result.append(("ip", self.ip))
3278     if self.bridge:
3279       instance.nics[0].bridge = self.bridge
3280       result.append(("bridge", self.bridge))
3281
3282     self.cfg.AddInstance(instance)
3283
3284     return result
3285
3286
3287 class LUQueryExports(NoHooksLU):
3288   """Query the exports list
3289
3290   """
3291   _OP_REQP = []
3292
3293   def CheckPrereq(self):
3294     """Check that the nodelist contains only existing nodes.
3295
3296     """
3297     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3298
3299   def Exec(self, feedback_fn):
3300     """Compute the list of all the exported system images.
3301
3302     Returns:
3303       a dictionary with the structure node->(export-list)
3304       where export-list is a list of the instances exported on
3305       that node.
3306
3307     """
3308     return rpc.call_export_list([node.name for node in self.nodes])
3309
3310
3311 class LUExportInstance(LogicalUnit):
3312   """Export an instance to an image in the cluster.
3313
3314   """
3315   HPATH = "instance-export"
3316   HTYPE = constants.HTYPE_INSTANCE
3317   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3318
3319   def BuildHooksEnv(self):
3320     """Build hooks env.
3321
3322     This will run on the master, primary node and target node.
3323
3324     """
3325     env = {
3326       "INSTANCE_NAME": self.op.instance_name,
3327       "EXPORT_NODE": self.op.target_node,
3328       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3329       }
3330     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3331           self.op.target_node]
3332     return env, nl, nl
3333
3334   def CheckPrereq(self):
3335     """Check prerequisites.
3336
3337     This checks that the instance name is a valid one.
3338
3339     """
3340     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3341     self.instance = self.cfg.GetInstanceInfo(instance_name)
3342     if self.instance is None:
3343       raise errors.OpPrereqError, ("Instance '%s' not found" %
3344                                    self.op.instance_name)
3345
3346     # node verification
3347     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3348     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3349
3350     if self.dst_node is None:
3351       raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
3352                                    self.op.target_node)
3353     self.op.target_node = self.dst_node.name
3354
3355   def Exec(self, feedback_fn):
3356     """Export an instance to an image in the cluster.
3357
3358     """
3359     instance = self.instance
3360     dst_node = self.dst_node
3361     src_node = instance.primary_node
3362     # shutdown the instance, unless requested not to do so
3363     if self.op.shutdown:
3364       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3365       self.processor.ChainOpCode(op, feedback_fn)
3366
3367     vgname = self.cfg.GetVGName()
3368
3369     snap_disks = []
3370
3371     try:
3372       for disk in instance.disks:
3373         if disk.iv_name == "sda":
3374           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3375           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3376
3377           if not new_dev_name:
3378             logger.Error("could not snapshot block device %s on node %s" %
3379                          (disk.logical_id[1], src_node))
3380           else:
3381             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3382                                       logical_id=(vgname, new_dev_name),
3383                                       physical_id=(vgname, new_dev_name),
3384                                       iv_name=disk.iv_name)
3385             snap_disks.append(new_dev)
3386
3387     finally:
3388       if self.op.shutdown:
3389         op = opcodes.OpStartupInstance(instance_name=instance.name,
3390                                        force=False)
3391         self.processor.ChainOpCode(op, feedback_fn)
3392
3393     # TODO: check for size
3394
3395     for dev in snap_disks:
3396       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3397                                            instance):
3398         logger.Error("could not export block device %s from node"
3399                      " %s to node %s" %
3400                      (dev.logical_id[1], src_node, dst_node.name))
3401       if not rpc.call_blockdev_remove(src_node, dev):
3402         logger.Error("could not remove snapshot block device %s from"
3403                      " node %s" % (dev.logical_id[1], src_node))
3404
3405     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3406       logger.Error("could not finalize export for instance %s on node %s" %
3407                    (instance.name, dst_node.name))
3408
3409     nodelist = self.cfg.GetNodeList()
3410     nodelist.remove(dst_node.name)
3411
3412     # on one-node clusters nodelist will be empty after the removal
3413     # if we proceed the backup would be removed because OpQueryExports
3414     # substitutes an empty list with the full cluster node list.
3415     if nodelist:
3416       op = opcodes.OpQueryExports(nodes=nodelist)
3417       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3418       for node in exportlist:
3419         if instance.name in exportlist[node]:
3420           if not rpc.call_export_remove(node, instance.name):
3421             logger.Error("could not remove older export for instance %s"
3422                          " on node %s" % (instance.name, node))