Fix the "gnt-cluster getmaster" command by making the LuQueryClusterInfo
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class..
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81                                      attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError, ("Cluster not initialized yet,"
85                                      " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError, ("Commands must be run on the master"
90                                        " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError, ("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206                                  % ",".join(frozenset(selected).
207                                             difference(all_fields)))
208
209
210 def _UpdateEtcHosts(fullnode, ip):
211   """Ensure a node has a correct entry in /etc/hosts.
212
213   Args:
214     fullnode - Fully qualified domain name of host. (str)
215     ip       - IPv4 address of host (str)
216
217   """
218   node = fullnode.split(".", 1)[0]
219
220   f = open('/etc/hosts', 'r+')
221
222   inthere = False
223
224   save_lines = []
225   add_lines = []
226   removed = False
227
228   while True:
229     rawline = f.readline()
230
231     if not rawline:
232       # End of file
233       break
234
235     line = rawline.split('\n')[0]
236
237     # Strip off comments
238     line = line.split('#')[0]
239
240     if not line:
241       # Entire line was comment, skip
242       save_lines.append(rawline)
243       continue
244
245     fields = line.split()
246
247     haveall = True
248     havesome = False
249     for spec in [ ip, fullnode, node ]:
250       if spec not in fields:
251         haveall = False
252       if spec in fields:
253         havesome = True
254
255     if haveall:
256       inthere = True
257       save_lines.append(rawline)
258       continue
259
260     if havesome and not haveall:
261       # Line (old, or manual?) which is missing some.  Remove.
262       removed = True
263       continue
264
265     save_lines.append(rawline)
266
267   if not inthere:
268     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
269
270   if removed:
271     if add_lines:
272       save_lines = save_lines + add_lines
273
274     # We removed a line, write a new file and replace old.
275     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
276     newfile = os.fdopen(fd, 'w')
277     newfile.write(''.join(save_lines))
278     newfile.close()
279     os.rename(tmpname, '/etc/hosts')
280
281   elif add_lines:
282     # Simply appending a new line will do the trick.
283     f.seek(0, 2)
284     for add in add_lines:
285       f.write(add)
286
287   f.close()
288
289
290 def _UpdateKnownHosts(fullnode, ip, pubkey):
291   """Ensure a node has a correct known_hosts entry.
292
293   Args:
294     fullnode - Fully qualified domain name of host. (str)
295     ip       - IPv4 address of host (str)
296     pubkey   - the public key of the cluster
297
298   """
299   if os.path.exists('/etc/ssh/ssh_known_hosts'):
300     f = open('/etc/ssh/ssh_known_hosts', 'r+')
301   else:
302     f = open('/etc/ssh/ssh_known_hosts', 'w+')
303
304   inthere = False
305
306   save_lines = []
307   add_lines = []
308   removed = False
309
310   while True:
311     rawline = f.readline()
312     logger.Debug('read %s' % (repr(rawline),))
313
314     if not rawline:
315       # End of file
316       break
317
318     line = rawline.split('\n')[0]
319
320     parts = line.split(' ')
321     fields = parts[0].split(',')
322     key = parts[2]
323
324     haveall = True
325     havesome = False
326     for spec in [ ip, fullnode ]:
327       if spec not in fields:
328         haveall = False
329       if spec in fields:
330         havesome = True
331
332     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
333     if haveall and key == pubkey:
334       inthere = True
335       save_lines.append(rawline)
336       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
337       continue
338
339     if havesome and (not haveall or key != pubkey):
340       removed = True
341       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
348     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
349
350   if removed:
351     save_lines = save_lines + add_lines
352
353     # Write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     logger.Debug("Wrote new known_hosts.")
359     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
360
361   elif add_lines:
362     # Simply appending a new line will do the trick.
363     f.seek(0, 2)
364     for add in add_lines:
365       f.write(add)
366
367   f.close()
368
369
370 def _HasValidVG(vglist, vgname):
371   """Checks if the volume group list is valid.
372
373   A non-None return value means there's an error, and the return value
374   is the error message.
375
376   """
377   vgsize = vglist.get(vgname, None)
378   if vgsize is None:
379     return "volume group '%s' missing" % vgname
380   elif vgsize < 20480:
381     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
382             (vgname, vgsize))
383   return None
384
385
386 def _InitSSHSetup(node):
387   """Setup the SSH configuration for the cluster.
388
389
390   This generates a dsa keypair for root, adds the pub key to the
391   permitted hosts and adds the hostkey to its own known hosts.
392
393   Args:
394     node: the name of this host as a fqdn
395
396   """
397   utils.RemoveFile('/root/.ssh/known_hosts')
398
399   if os.path.exists('/root/.ssh/id_dsa'):
400     utils.CreateBackup('/root/.ssh/id_dsa')
401   if os.path.exists('/root/.ssh/id_dsa.pub'):
402     utils.CreateBackup('/root/.ssh/id_dsa.pub')
403
404   utils.RemoveFile('/root/.ssh/id_dsa')
405   utils.RemoveFile('/root/.ssh/id_dsa.pub')
406
407   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
408                          "-f", "/root/.ssh/id_dsa",
409                          "-q", "-N", ""])
410   if result.failed:
411     raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
412                                result.output)
413
414   f = open('/root/.ssh/id_dsa.pub', 'r')
415   try:
416     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
417   finally:
418     f.close()
419
420
421 def _InitGanetiServerSetup(ss):
422   """Setup the necessary configuration for the initial node daemon.
423
424   This creates the nodepass file containing the shared password for
425   the cluster and also generates the SSL certificate.
426
427   """
428   # Create pseudo random password
429   randpass = sha.new(os.urandom(64)).hexdigest()
430   # and write it into sstore
431   ss.SetKey(ss.SS_NODED_PASS, randpass)
432
433   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
434                          "-days", str(365*5), "-nodes", "-x509",
435                          "-keyout", constants.SSL_CERT_FILE,
436                          "-out", constants.SSL_CERT_FILE, "-batch"])
437   if result.failed:
438     raise errors.OpExecError, ("could not generate server ssl cert, command"
439                                " %s had exitcode %s and error message %s" %
440                                (result.cmd, result.exit_code, result.output))
441
442   os.chmod(constants.SSL_CERT_FILE, 0400)
443
444   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
445
446   if result.failed:
447     raise errors.OpExecError, ("could not start the node daemon, command %s"
448                                " had exitcode %s and error %s" %
449                                (result.cmd, result.exit_code, result.output))
450
451
452 class LUInitCluster(LogicalUnit):
453   """Initialise the cluster.
454
455   """
456   HPATH = "cluster-init"
457   HTYPE = constants.HTYPE_CLUSTER
458   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
459               "def_bridge", "master_netdev"]
460   REQ_CLUSTER = False
461
462   def BuildHooksEnv(self):
463     """Build hooks env.
464
465     Notes: Since we don't require a cluster, we must manually add
466     ourselves in the post-run node list.
467
468     """
469     env = {"CLUSTER": self.op.cluster_name,
470            "MASTER": self.hostname['hostname_full']}
471     return env, [], [self.hostname['hostname_full']]
472
473   def CheckPrereq(self):
474     """Verify that the passed name is a valid one.
475
476     """
477     if config.ConfigWriter.IsCluster():
478       raise errors.OpPrereqError, ("Cluster is already initialised")
479
480     hostname_local = socket.gethostname()
481     self.hostname = hostname = utils.LookupHostname(hostname_local)
482     if not hostname:
483       raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
484                                    hostname_local)
485
486     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
487     if not clustername:
488       raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
489                                    % self.op.cluster_name)
490
491     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
492     if result.failed:
493       raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
494                                    " to %s,\nbut this ip address does not"
495                                    " belong to this host."
496                                    " Aborting." % hostname['ip'])
497
498     secondary_ip = getattr(self.op, "secondary_ip", None)
499     if secondary_ip and not utils.IsValidIP(secondary_ip):
500       raise errors.OpPrereqError, ("Invalid secondary ip given")
501     if secondary_ip and secondary_ip != hostname['ip']:
502       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
503       if result.failed:
504         raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
505                                      "but it does not belong to this host." %
506                                      secondary_ip)
507     self.secondary_ip = secondary_ip
508
509     # checks presence of the volume group given
510     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
511
512     if vgstatus:
513       raise errors.OpPrereqError, ("Error: %s" % vgstatus)
514
515     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
516                     self.op.mac_prefix):
517       raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
518                                    self.op.mac_prefix)
519
520     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
521       raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
522                                    self.op.hypervisor_type)
523
524     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
525     if result.failed:
526       raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
527                                    (self.op.master_netdev, result.output))
528
529   def Exec(self, feedback_fn):
530     """Initialize the cluster.
531
532     """
533     clustername = self.clustername
534     hostname = self.hostname
535
536     # set up the simple store
537     ss = ssconf.SimpleStore()
538     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
540     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
541     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
543
544     # set up the inter-node password and certificate
545     _InitGanetiServerSetup(ss)
546
547     # start the master ip
548     rpc.call_node_start_master(hostname['hostname_full'])
549
550     # set up ssh config and /etc/hosts
551     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
552     try:
553       sshline = f.read()
554     finally:
555       f.close()
556     sshkey = sshline.split(" ")[1]
557
558     _UpdateEtcHosts(hostname['hostname_full'],
559                     hostname['ip'],
560                     )
561
562     _UpdateKnownHosts(hostname['hostname_full'],
563                       hostname['ip'],
564                       sshkey,
565                       )
566
567     _InitSSHSetup(hostname['hostname'])
568
569     # init of cluster config file
570     cfgw = config.ConfigWriter()
571     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
572                     sshkey, self.op.mac_prefix,
573                     self.op.vg_name, self.op.def_bridge)
574
575
576 class LUDestroyCluster(NoHooksLU):
577   """Logical unit for destroying the cluster.
578
579   """
580   _OP_REQP = []
581
582   def CheckPrereq(self):
583     """Check prerequisites.
584
585     This checks whether the cluster is empty.
586
587     Any errors are signalled by raising errors.OpPrereqError.
588
589     """
590     master = self.sstore.GetMasterNode()
591
592     nodelist = self.cfg.GetNodeList()
593     if len(nodelist) > 0 and nodelist != [master]:
594       raise errors.OpPrereqError, ("There are still %d node(s) in "
595                                    "this cluster." % (len(nodelist) - 1))
596
597   def Exec(self, feedback_fn):
598     """Destroys the cluster.
599
600     """
601     utils.CreateBackup('/root/.ssh/id_dsa')
602     utils.CreateBackup('/root/.ssh/id_dsa.pub')
603     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
604
605
606 class LUVerifyCluster(NoHooksLU):
607   """Verifies the cluster status.
608
609   """
610   _OP_REQP = []
611
612   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
613                   remote_version, feedback_fn):
614     """Run multiple tests against a node.
615
616     Test list:
617       - compares ganeti version
618       - checks vg existance and size > 20G
619       - checks config file checksum
620       - checks ssh to other nodes
621
622     Args:
623       node: name of the node to check
624       file_list: required list of files
625       local_cksum: dictionary of local files and their checksums
626
627     """
628     # compares ganeti version
629     local_version = constants.PROTOCOL_VERSION
630     if not remote_version:
631       feedback_fn(" - ERROR: connection to %s failed" % (node))
632       return True
633
634     if local_version != remote_version:
635       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
636                       (local_version, node, remote_version))
637       return True
638
639     # checks vg existance and size > 20G
640
641     bad = False
642     if not vglist:
643       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
644                       (node,))
645       bad = True
646     else:
647       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
648       if vgstatus:
649         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
650         bad = True
651
652     # checks config file checksum
653     # checks ssh to any
654
655     if 'filelist' not in node_result:
656       bad = True
657       feedback_fn("  - ERROR: node hasn't returned file checksum data")
658     else:
659       remote_cksum = node_result['filelist']
660       for file_name in file_list:
661         if file_name not in remote_cksum:
662           bad = True
663           feedback_fn("  - ERROR: file '%s' missing" % file_name)
664         elif remote_cksum[file_name] != local_cksum[file_name]:
665           bad = True
666           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
667
668     if 'nodelist' not in node_result:
669       bad = True
670       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
671     else:
672       if node_result['nodelist']:
673         bad = True
674         for node in node_result['nodelist']:
675           feedback_fn("  - ERROR: communication with node '%s': %s" %
676                           (node, node_result['nodelist'][node]))
677     hyp_result = node_result.get('hypervisor', None)
678     if hyp_result is not None:
679       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
680     return bad
681
682   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
683     """Verify an instance.
684
685     This function checks to see if the required block devices are
686     available on the instance's node.
687
688     """
689     bad = False
690
691     instancelist = self.cfg.GetInstanceList()
692     if not instance in instancelist:
693       feedback_fn("  - ERROR: instance %s not in instance list %s" %
694                       (instance, instancelist))
695       bad = True
696
697     instanceconfig = self.cfg.GetInstanceInfo(instance)
698     node_current = instanceconfig.primary_node
699
700     node_vol_should = {}
701     instanceconfig.MapLVsByNode(node_vol_should)
702
703     for node in node_vol_should:
704       for volume in node_vol_should[node]:
705         if node not in node_vol_is or volume not in node_vol_is[node]:
706           feedback_fn("  - ERROR: volume %s missing on node %s" %
707                           (volume, node))
708           bad = True
709
710     if not instanceconfig.status == 'down':
711       if not instance in node_instance[node_current]:
712         feedback_fn("  - ERROR: instance %s not running on node %s" %
713                         (instance, node_current))
714         bad = True
715
716     for node in node_instance:
717       if (not node == node_current):
718         if instance in node_instance[node]:
719           feedback_fn("  - ERROR: instance %s should not run on node %s" %
720                           (instance, node))
721           bad = True
722
723     return not bad
724
725   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726     """Verify if there are any unknown volumes in the cluster.
727
728     The .os, .swap and backup volumes are ignored. All other volumes are
729     reported as unknown.
730
731     """
732     bad = False
733
734     for node in node_vol_is:
735       for volume in node_vol_is[node]:
736         if node not in node_vol_should or volume not in node_vol_should[node]:
737           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738                       (volume, node))
739           bad = True
740     return bad
741
742   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743     """Verify the list of running instances.
744
745     This checks what instances are running but unknown to the cluster.
746
747     """
748     bad = False
749     for node in node_instance:
750       for runninginstance in node_instance[node]:
751         if runninginstance not in instancelist:
752           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753                           (runninginstance, node))
754           bad = True
755     return bad
756
757   def CheckPrereq(self):
758     """Check prerequisites.
759
760     This has no prerequisites.
761
762     """
763     pass
764
765   def Exec(self, feedback_fn):
766     """Verify integrity of cluster, performing various test on nodes.
767
768     """
769     bad = False
770     feedback_fn("* Verifying global settings")
771     self.cfg.VerifyConfig()
772
773     master = self.sstore.GetMasterNode()
774     vg_name = self.cfg.GetVGName()
775     nodelist = utils.NiceSort(self.cfg.GetNodeList())
776     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
777     node_volume = {}
778     node_instance = {}
779
780     # FIXME: verify OS list
781     # do local checksums
782     file_names = list(self.sstore.GetFileList())
783     file_names.append(constants.SSL_CERT_FILE)
784     file_names.append(constants.CLUSTER_CONF_FILE)
785     local_checksums = utils.FingerprintFiles(file_names)
786
787     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
788     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
789     all_instanceinfo = rpc.call_instance_list(nodelist)
790     all_vglist = rpc.call_vg_list(nodelist)
791     node_verify_param = {
792       'filelist': file_names,
793       'nodelist': nodelist,
794       'hypervisor': None,
795       }
796     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
797     all_rversion = rpc.call_version(nodelist)
798
799     for node in nodelist:
800       feedback_fn("* Verifying node %s" % node)
801       result = self._VerifyNode(node, file_names, local_checksums,
802                                 all_vglist[node], all_nvinfo[node],
803                                 all_rversion[node], feedback_fn)
804       bad = bad or result
805
806       # node_volume
807       volumeinfo = all_volumeinfo[node]
808
809       if type(volumeinfo) != dict:
810         feedback_fn("  - ERROR: connection to %s failed" % (node,))
811         bad = True
812         continue
813
814       node_volume[node] = volumeinfo
815
816       # node_instance
817       nodeinstance = all_instanceinfo[node]
818       if type(nodeinstance) != list:
819         feedback_fn("  - ERROR: connection to %s failed" % (node,))
820         bad = True
821         continue
822
823       node_instance[node] = nodeinstance
824
825     node_vol_should = {}
826
827     for instance in instancelist:
828       feedback_fn("* Verifying instance %s" % instance)
829       result =  self._VerifyInstance(instance, node_volume, node_instance,
830                                      feedback_fn)
831       bad = bad or result
832
833       inst_config = self.cfg.GetInstanceInfo(instance)
834
835       inst_config.MapLVsByNode(node_vol_should)
836
837     feedback_fn("* Verifying orphan volumes")
838     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
839                                        feedback_fn)
840     bad = bad or result
841
842     feedback_fn("* Verifying remaining instances")
843     result = self._VerifyOrphanInstances(instancelist, node_instance,
844                                          feedback_fn)
845     bad = bad or result
846
847     return int(bad)
848
849
850 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
851   """Sleep and poll for an instance's disk to sync.
852
853   """
854   if not instance.disks:
855     return True
856
857   if not oneshot:
858     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
859
860   node = instance.primary_node
861
862   for dev in instance.disks:
863     cfgw.SetDiskID(dev, node)
864
865   retries = 0
866   while True:
867     max_time = 0
868     done = True
869     cumul_degraded = False
870     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
871     if not rstats:
872       logger.ToStderr("Can't get any data from node %s" % node)
873       retries += 1
874       if retries >= 10:
875         raise errors.RemoteError, ("Can't contact node %s for mirror data,"
876                                    " aborting." % node)
877       time.sleep(6)
878       continue
879     retries = 0
880     for i in range(len(rstats)):
881       mstat = rstats[i]
882       if mstat is None:
883         logger.ToStderr("Can't compute data for node %s/%s" %
884                         (node, instance.disks[i].iv_name))
885         continue
886       perc_done, est_time, is_degraded = mstat
887       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
888       if perc_done is not None:
889         done = False
890         if est_time is not None:
891           rem_time = "%d estimated seconds remaining" % est_time
892           max_time = est_time
893         else:
894           rem_time = "no time estimate"
895         logger.ToStdout("- device %s: %5.2f%% done, %s" %
896                         (instance.disks[i].iv_name, perc_done, rem_time))
897     if done or oneshot:
898       break
899
900     if unlock:
901       utils.Unlock('cmd')
902     try:
903       time.sleep(min(60, max_time))
904     finally:
905       if unlock:
906         utils.Lock('cmd')
907
908   if done:
909     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
910   return not cumul_degraded
911
912
913 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
914   """Check that mirrors are not degraded.
915
916   """
917   cfgw.SetDiskID(dev, node)
918
919   result = True
920   if on_primary or dev.AssembleOnSecondary():
921     rstats = rpc.call_blockdev_find(node, dev)
922     if not rstats:
923       logger.ToStderr("Can't get any data from node %s" % node)
924       result = False
925     else:
926       result = result and (not rstats[5])
927   if dev.children:
928     for child in dev.children:
929       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
930
931   return result
932
933
934 class LUDiagnoseOS(NoHooksLU):
935   """Logical unit for OS diagnose/query.
936
937   """
938   _OP_REQP = []
939
940   def CheckPrereq(self):
941     """Check prerequisites.
942
943     This always succeeds, since this is a pure query LU.
944
945     """
946     return
947
948   def Exec(self, feedback_fn):
949     """Compute the list of OSes.
950
951     """
952     node_list = self.cfg.GetNodeList()
953     node_data = rpc.call_os_diagnose(node_list)
954     if node_data == False:
955       raise errors.OpExecError, "Can't gather the list of OSes"
956     return node_data
957
958
959 class LURemoveNode(LogicalUnit):
960   """Logical unit for removing a node.
961
962   """
963   HPATH = "node-remove"
964   HTYPE = constants.HTYPE_NODE
965   _OP_REQP = ["node_name"]
966
967   def BuildHooksEnv(self):
968     """Build hooks env.
969
970     This doesn't run on the target node in the pre phase as a failed
971     node would not allows itself to run.
972
973     """
974     all_nodes = self.cfg.GetNodeList()
975     all_nodes.remove(self.op.node_name)
976     return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes
977
978   def CheckPrereq(self):
979     """Check prerequisites.
980
981     This checks:
982      - the node exists in the configuration
983      - it does not have primary or secondary instances
984      - it's not the master
985
986     Any errors are signalled by raising errors.OpPrereqError.
987
988     """
989     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
990     if node is None:
991       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
992       return 1
993
994     instance_list = self.cfg.GetInstanceList()
995
996     masternode = self.sstore.GetMasterNode()
997     if node.name == masternode:
998       raise errors.OpPrereqError, ("Node is the master node,"
999                                    " you need to failover first.")
1000
1001     for instance_name in instance_list:
1002       instance = self.cfg.GetInstanceInfo(instance_name)
1003       if node.name == instance.primary_node:
1004         raise errors.OpPrereqError, ("Instance %s still running on the node,"
1005                                      " please remove first." % instance_name)
1006       if node.name in instance.secondary_nodes:
1007         raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1008                                      " please remove first." % instance_name)
1009     self.op.node_name = node.name
1010     self.node = node
1011
1012   def Exec(self, feedback_fn):
1013     """Removes the node from the cluster.
1014
1015     """
1016     node = self.node
1017     logger.Info("stopping the node daemon and removing configs from node %s" %
1018                 node.name)
1019
1020     rpc.call_node_leave_cluster(node.name)
1021
1022     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1023
1024     logger.Info("Removing node %s from config" % node.name)
1025
1026     self.cfg.RemoveNode(node.name)
1027
1028
1029 class LUQueryNodes(NoHooksLU):
1030   """Logical unit for querying nodes.
1031
1032   """
1033   _OP_REQP = ["output_fields"]
1034
1035   def CheckPrereq(self):
1036     """Check prerequisites.
1037
1038     This checks that the fields required are valid output fields.
1039
1040     """
1041     self.dynamic_fields = frozenset(["dtotal", "dfree",
1042                                      "mtotal", "mnode", "mfree"])
1043
1044     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1045                        dynamic=self.dynamic_fields,
1046                        selected=self.op.output_fields)
1047
1048
1049   def Exec(self, feedback_fn):
1050     """Computes the list of nodes and their attributes.
1051
1052     """
1053     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1054     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1055
1056
1057     # begin data gathering
1058
1059     if self.dynamic_fields.intersection(self.op.output_fields):
1060       live_data = {}
1061       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1062       for name in nodenames:
1063         nodeinfo = node_data.get(name, None)
1064         if nodeinfo:
1065           live_data[name] = {
1066             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1067             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1068             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1069             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1070             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1071             }
1072         else:
1073           live_data[name] = {}
1074     else:
1075       live_data = dict.fromkeys(nodenames, {})
1076
1077     node_to_primary = dict.fromkeys(nodenames, 0)
1078     node_to_secondary = dict.fromkeys(nodenames, 0)
1079
1080     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1081       instancelist = self.cfg.GetInstanceList()
1082
1083       for instance in instancelist:
1084         instanceinfo = self.cfg.GetInstanceInfo(instance)
1085         node_to_primary[instanceinfo.primary_node] += 1
1086         for secnode in instanceinfo.secondary_nodes:
1087           node_to_secondary[secnode] += 1
1088
1089     # end data gathering
1090
1091     output = []
1092     for node in nodelist:
1093       node_output = []
1094       for field in self.op.output_fields:
1095         if field == "name":
1096           val = node.name
1097         elif field == "pinst":
1098           val = node_to_primary[node.name]
1099         elif field == "sinst":
1100           val = node_to_secondary[node.name]
1101         elif field == "pip":
1102           val = node.primary_ip
1103         elif field == "sip":
1104           val = node.secondary_ip
1105         elif field in self.dynamic_fields:
1106           val = live_data[node.name].get(field, "?")
1107         else:
1108           raise errors.ParameterError, field
1109         val = str(val)
1110         node_output.append(val)
1111       output.append(node_output)
1112
1113     return output
1114
1115
1116 class LUQueryNodeVolumes(NoHooksLU):
1117   """Logical unit for getting volumes on node(s).
1118
1119   """
1120   _OP_REQP = ["nodes", "output_fields"]
1121
1122   def CheckPrereq(self):
1123     """Check prerequisites.
1124
1125     This checks that the fields required are valid output fields.
1126
1127     """
1128     self.nodes = _GetWantedNodes(self, self.op.nodes)
1129
1130     _CheckOutputFields(static=["node"],
1131                        dynamic=["phys", "vg", "name", "size", "instance"],
1132                        selected=self.op.output_fields)
1133
1134
1135   def Exec(self, feedback_fn):
1136     """Computes the list of nodes and their attributes.
1137
1138     """
1139     nodenames = utils.NiceSort([node.name for node in self.nodes])
1140     volumes = rpc.call_node_volumes(nodenames)
1141
1142     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1143              in self.cfg.GetInstanceList()]
1144
1145     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1146
1147     output = []
1148     for node in nodenames:
1149       node_vols = volumes[node][:]
1150       node_vols.sort(key=lambda vol: vol['dev'])
1151
1152       for vol in node_vols:
1153         node_output = []
1154         for field in self.op.output_fields:
1155           if field == "node":
1156             val = node
1157           elif field == "phys":
1158             val = vol['dev']
1159           elif field == "vg":
1160             val = vol['vg']
1161           elif field == "name":
1162             val = vol['name']
1163           elif field == "size":
1164             val = int(float(vol['size']))
1165           elif field == "instance":
1166             for inst in ilist:
1167               if node not in lv_by_node[inst]:
1168                 continue
1169               if vol['name'] in lv_by_node[inst][node]:
1170                 val = inst.name
1171                 break
1172             else:
1173               val = '-'
1174           else:
1175             raise errors.ParameterError, field
1176           node_output.append(str(val))
1177
1178         output.append(node_output)
1179
1180     return output
1181
1182
1183 class LUAddNode(LogicalUnit):
1184   """Logical unit for adding node to the cluster.
1185
1186   """
1187   HPATH = "node-add"
1188   HTYPE = constants.HTYPE_NODE
1189   _OP_REQP = ["node_name"]
1190
1191   def BuildHooksEnv(self):
1192     """Build hooks env.
1193
1194     This will run on all nodes before, and on all nodes + the new node after.
1195
1196     """
1197     env = {
1198       "NODE_NAME": self.op.node_name,
1199       "NODE_PIP": self.op.primary_ip,
1200       "NODE_SIP": self.op.secondary_ip,
1201       }
1202     nodes_0 = self.cfg.GetNodeList()
1203     nodes_1 = nodes_0 + [self.op.node_name, ]
1204     return env, nodes_0, nodes_1
1205
1206   def CheckPrereq(self):
1207     """Check prerequisites.
1208
1209     This checks:
1210      - the new node is not already in the config
1211      - it is resolvable
1212      - its parameters (single/dual homed) matches the cluster
1213
1214     Any errors are signalled by raising errors.OpPrereqError.
1215
1216     """
1217     node_name = self.op.node_name
1218     cfg = self.cfg
1219
1220     dns_data = utils.LookupHostname(node_name)
1221     if not dns_data:
1222       raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1223
1224     node = dns_data['hostname']
1225     primary_ip = self.op.primary_ip = dns_data['ip']
1226     secondary_ip = getattr(self.op, "secondary_ip", None)
1227     if secondary_ip is None:
1228       secondary_ip = primary_ip
1229     if not utils.IsValidIP(secondary_ip):
1230       raise errors.OpPrereqError, ("Invalid secondary IP given")
1231     self.op.secondary_ip = secondary_ip
1232     node_list = cfg.GetNodeList()
1233     if node in node_list:
1234       raise errors.OpPrereqError, ("Node %s is already in the configuration"
1235                                    % node)
1236
1237     for existing_node_name in node_list:
1238       existing_node = cfg.GetNodeInfo(existing_node_name)
1239       if (existing_node.primary_ip == primary_ip or
1240           existing_node.secondary_ip == primary_ip or
1241           existing_node.primary_ip == secondary_ip or
1242           existing_node.secondary_ip == secondary_ip):
1243         raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1244                                      " existing node %s" % existing_node.name)
1245
1246     # check that the type of the node (single versus dual homed) is the
1247     # same as for the master
1248     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1249     master_singlehomed = myself.secondary_ip == myself.primary_ip
1250     newbie_singlehomed = secondary_ip == primary_ip
1251     if master_singlehomed != newbie_singlehomed:
1252       if master_singlehomed:
1253         raise errors.OpPrereqError, ("The master has no private ip but the"
1254                                      " new node has one")
1255       else:
1256         raise errors.OpPrereqError ("The master has a private ip but the"
1257                                     " new node doesn't have one")
1258
1259     # checks reachablity
1260     command = ["fping", "-q", primary_ip]
1261     result = utils.RunCmd(command)
1262     if result.failed:
1263       raise errors.OpPrereqError, ("Node not reachable by ping")
1264
1265     if not newbie_singlehomed:
1266       # check reachability from my secondary ip to newbie's secondary ip
1267       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1268       result = utils.RunCmd(command)
1269       if result.failed:
1270         raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1271
1272     self.new_node = objects.Node(name=node,
1273                                  primary_ip=primary_ip,
1274                                  secondary_ip=secondary_ip)
1275
1276   def Exec(self, feedback_fn):
1277     """Adds the new node to the cluster.
1278
1279     """
1280     new_node = self.new_node
1281     node = new_node.name
1282
1283     # set up inter-node password and certificate and restarts the node daemon
1284     gntpass = self.sstore.GetNodeDaemonPassword()
1285     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1286       raise errors.OpExecError, ("ganeti password corruption detected")
1287     f = open(constants.SSL_CERT_FILE)
1288     try:
1289       gntpem = f.read(8192)
1290     finally:
1291       f.close()
1292     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1293     # so we use this to detect an invalid certificate; as long as the
1294     # cert doesn't contain this, the here-document will be correctly
1295     # parsed by the shell sequence below
1296     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1297       raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1298     if not gntpem.endswith("\n"):
1299       raise errors.OpExecError, ("PEM must end with newline")
1300     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1301
1302     # remove first the root's known_hosts file
1303     utils.RemoveFile("/root/.ssh/known_hosts")
1304     # and then connect with ssh to set password and start ganeti-noded
1305     # note that all the below variables are sanitized at this point,
1306     # either by being constants or by the checks above
1307     ss = self.sstore
1308     mycommand = ("umask 077 && "
1309                  "echo '%s' > '%s' && "
1310                  "cat > '%s' << '!EOF.' && \n"
1311                  "%s!EOF.\n%s restart" %
1312                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1313                   constants.SSL_CERT_FILE, gntpem,
1314                   constants.NODE_INITD_SCRIPT))
1315
1316     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1317     if result.failed:
1318       raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1319                                  " output: %s" %
1320                                  (node, result.fail_reason, result.output))
1321
1322     # check connectivity
1323     time.sleep(4)
1324
1325     result = rpc.call_version([node])[node]
1326     if result:
1327       if constants.PROTOCOL_VERSION == result:
1328         logger.Info("communication to node %s fine, sw version %s match" %
1329                     (node, result))
1330       else:
1331         raise errors.OpExecError, ("Version mismatch master version %s,"
1332                                    " node version %s" %
1333                                    (constants.PROTOCOL_VERSION, result))
1334     else:
1335       raise errors.OpExecError, ("Cannot get version from the new node")
1336
1337     # setup ssh on node
1338     logger.Info("copy ssh key to node %s" % node)
1339     keyarray = []
1340     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1341                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1342                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1343
1344     for i in keyfiles:
1345       f = open(i, 'r')
1346       try:
1347         keyarray.append(f.read())
1348       finally:
1349         f.close()
1350
1351     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1352                                keyarray[3], keyarray[4], keyarray[5])
1353
1354     if not result:
1355       raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1356
1357     # Add node to our /etc/hosts, and add key to known_hosts
1358     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1359     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1360                       self.cfg.GetHostKey())
1361
1362     if new_node.secondary_ip != new_node.primary_ip:
1363       result = ssh.SSHCall(node, "root",
1364                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1365       if result.failed:
1366         raise errors.OpExecError, ("Node claims it doesn't have the"
1367                                    " secondary ip you gave (%s).\n"
1368                                    "Please fix and re-run this command." %
1369                                    new_node.secondary_ip)
1370
1371     # Distribute updated /etc/hosts and known_hosts to all nodes,
1372     # including the node just added
1373     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1374     dist_nodes = self.cfg.GetNodeList() + [node]
1375     if myself.name in dist_nodes:
1376       dist_nodes.remove(myself.name)
1377
1378     logger.Debug("Copying hosts and known_hosts to all nodes")
1379     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1380       result = rpc.call_upload_file(dist_nodes, fname)
1381       for to_node in dist_nodes:
1382         if not result[to_node]:
1383           logger.Error("copy of file %s to node %s failed" %
1384                        (fname, to_node))
1385
1386     to_copy = ss.GetFileList()
1387     for fname in to_copy:
1388       if not ssh.CopyFileToNode(node, fname):
1389         logger.Error("could not copy file %s to node %s" % (fname, node))
1390
1391     logger.Info("adding node %s to cluster.conf" % node)
1392     self.cfg.AddNode(new_node)
1393
1394
1395 class LUMasterFailover(LogicalUnit):
1396   """Failover the master node to the current node.
1397
1398   This is a special LU in that it must run on a non-master node.
1399
1400   """
1401   HPATH = "master-failover"
1402   HTYPE = constants.HTYPE_CLUSTER
1403   REQ_MASTER = False
1404   _OP_REQP = []
1405
1406   def BuildHooksEnv(self):
1407     """Build hooks env.
1408
1409     This will run on the new master only in the pre phase, and on all
1410     the nodes in the post phase.
1411
1412     """
1413     env = {
1414       "NEW_MASTER": self.new_master,
1415       "OLD_MASTER": self.old_master,
1416       }
1417     return env, [self.new_master], self.cfg.GetNodeList()
1418
1419   def CheckPrereq(self):
1420     """Check prerequisites.
1421
1422     This checks that we are not already the master.
1423
1424     """
1425     self.new_master = socket.gethostname()
1426
1427     self.old_master = self.sstore.GetMasterNode()
1428
1429     if self.old_master == self.new_master:
1430       raise errors.OpPrereqError, ("This commands must be run on the node"
1431                                    " where you want the new master to be.\n"
1432                                    "%s is already the master" %
1433                                    self.old_master)
1434
1435   def Exec(self, feedback_fn):
1436     """Failover the master node.
1437
1438     This command, when run on a non-master node, will cause the current
1439     master to cease being master, and the non-master to become new
1440     master.
1441
1442     """
1443     #TODO: do not rely on gethostname returning the FQDN
1444     logger.Info("setting master to %s, old master: %s" %
1445                 (self.new_master, self.old_master))
1446
1447     if not rpc.call_node_stop_master(self.old_master):
1448       logger.Error("could disable the master role on the old master"
1449                    " %s, please disable manually" % self.old_master)
1450
1451     ss = self.sstore
1452     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1453     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1454                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1455       logger.Error("could not distribute the new simple store master file"
1456                    " to the other nodes, please check.")
1457
1458     if not rpc.call_node_start_master(self.new_master):
1459       logger.Error("could not start the master role on the new master"
1460                    " %s, please check" % self.new_master)
1461       feedback_fn("Error in activating the master IP on the new master,\n"
1462                   "please fix manually.")
1463
1464
1465
1466 class LUQueryClusterInfo(NoHooksLU):
1467   """Query cluster configuration.
1468
1469   """
1470   _OP_REQP = []
1471   REQ_MASTER = False
1472
1473   def CheckPrereq(self):
1474     """No prerequsites needed for this LU.
1475
1476     """
1477     pass
1478
1479   def Exec(self, feedback_fn):
1480     """Return cluster config.
1481
1482     """
1483     result = {
1484       "name": self.sstore.GetClusterName(),
1485       "software_version": constants.RELEASE_VERSION,
1486       "protocol_version": constants.PROTOCOL_VERSION,
1487       "config_version": constants.CONFIG_VERSION,
1488       "os_api_version": constants.OS_API_VERSION,
1489       "export_version": constants.EXPORT_VERSION,
1490       "master": self.sstore.GetMasterNode(),
1491       "architecture": (platform.architecture()[0], platform.machine()),
1492       }
1493
1494     return result
1495
1496
1497 class LUClusterCopyFile(NoHooksLU):
1498   """Copy file to cluster.
1499
1500   """
1501   _OP_REQP = ["nodes", "filename"]
1502
1503   def CheckPrereq(self):
1504     """Check prerequisites.
1505
1506     It should check that the named file exists and that the given list
1507     of nodes is valid.
1508
1509     """
1510     if not os.path.exists(self.op.filename):
1511       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1512
1513     self.nodes = _GetWantedNodes(self, self.op.nodes)
1514
1515   def Exec(self, feedback_fn):
1516     """Copy a file from master to some nodes.
1517
1518     Args:
1519       opts - class with options as members
1520       args - list containing a single element, the file name
1521     Opts used:
1522       nodes - list containing the name of target nodes; if empty, all nodes
1523
1524     """
1525     filename = self.op.filename
1526
1527     myname = socket.gethostname()
1528
1529     for node in self.nodes:
1530       if node == myname:
1531         continue
1532       if not ssh.CopyFileToNode(node, filename):
1533         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1534
1535
1536 class LUDumpClusterConfig(NoHooksLU):
1537   """Return a text-representation of the cluster-config.
1538
1539   """
1540   _OP_REQP = []
1541
1542   def CheckPrereq(self):
1543     """No prerequisites.
1544
1545     """
1546     pass
1547
1548   def Exec(self, feedback_fn):
1549     """Dump a representation of the cluster config to the standard output.
1550
1551     """
1552     return self.cfg.DumpConfig()
1553
1554
1555 class LURunClusterCommand(NoHooksLU):
1556   """Run a command on some nodes.
1557
1558   """
1559   _OP_REQP = ["command", "nodes"]
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     It checks that the given list of nodes is valid.
1565
1566     """
1567     self.nodes = _GetWantedNodes(self, self.op.nodes)
1568
1569   def Exec(self, feedback_fn):
1570     """Run a command on some nodes.
1571
1572     """
1573     data = []
1574     for node in self.nodes:
1575       result = utils.RunCmd(["ssh", node.name, self.op.command])
1576       data.append((node.name, result.cmd, result.output, result.exit_code))
1577
1578     return data
1579
1580
1581 class LUActivateInstanceDisks(NoHooksLU):
1582   """Bring up an instance's disks.
1583
1584   """
1585   _OP_REQP = ["instance_name"]
1586
1587   def CheckPrereq(self):
1588     """Check prerequisites.
1589
1590     This checks that the instance is in the cluster.
1591
1592     """
1593     instance = self.cfg.GetInstanceInfo(
1594       self.cfg.ExpandInstanceName(self.op.instance_name))
1595     if instance is None:
1596       raise errors.OpPrereqError, ("Instance '%s' not known" %
1597                                    self.op.instance_name)
1598     self.instance = instance
1599
1600
1601   def Exec(self, feedback_fn):
1602     """Activate the disks.
1603
1604     """
1605     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1606     if not disks_ok:
1607       raise errors.OpExecError, ("Cannot activate block devices")
1608
1609     return disks_info
1610
1611
1612 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1613   """Prepare the block devices for an instance.
1614
1615   This sets up the block devices on all nodes.
1616
1617   Args:
1618     instance: a ganeti.objects.Instance object
1619     ignore_secondaries: if true, errors on secondary nodes won't result
1620                         in an error return from the function
1621
1622   Returns:
1623     false if the operation failed
1624     list of (host, instance_visible_name, node_visible_name) if the operation
1625          suceeded with the mapping from node devices to instance devices
1626   """
1627   device_info = []
1628   disks_ok = True
1629   for inst_disk in instance.disks:
1630     master_result = None
1631     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1632       cfg.SetDiskID(node_disk, node)
1633       is_primary = node == instance.primary_node
1634       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1635       if not result:
1636         logger.Error("could not prepare block device %s on node %s (is_pri"
1637                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1638         if is_primary or not ignore_secondaries:
1639           disks_ok = False
1640       if is_primary:
1641         master_result = result
1642     device_info.append((instance.primary_node, inst_disk.iv_name,
1643                         master_result))
1644
1645   return disks_ok, device_info
1646
1647
1648 def _StartInstanceDisks(cfg, instance, force):
1649   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1650                                            ignore_secondaries=force)
1651   if not disks_ok:
1652     _ShutdownInstanceDisks(instance, cfg)
1653     if force is not None and not force:
1654       logger.Error("If the message above refers to a secondary node,"
1655                    " you can retry the operation using '--force'.")
1656     raise errors.OpExecError, ("Disk consistency error")
1657
1658
1659 class LUDeactivateInstanceDisks(NoHooksLU):
1660   """Shutdown an instance's disks.
1661
1662   """
1663   _OP_REQP = ["instance_name"]
1664
1665   def CheckPrereq(self):
1666     """Check prerequisites.
1667
1668     This checks that the instance is in the cluster.
1669
1670     """
1671     instance = self.cfg.GetInstanceInfo(
1672       self.cfg.ExpandInstanceName(self.op.instance_name))
1673     if instance is None:
1674       raise errors.OpPrereqError, ("Instance '%s' not known" %
1675                                    self.op.instance_name)
1676     self.instance = instance
1677
1678   def Exec(self, feedback_fn):
1679     """Deactivate the disks
1680
1681     """
1682     instance = self.instance
1683     ins_l = rpc.call_instance_list([instance.primary_node])
1684     ins_l = ins_l[instance.primary_node]
1685     if not type(ins_l) is list:
1686       raise errors.OpExecError, ("Can't contact node '%s'" %
1687                                  instance.primary_node)
1688
1689     if self.instance.name in ins_l:
1690       raise errors.OpExecError, ("Instance is running, can't shutdown"
1691                                  " block devices.")
1692
1693     _ShutdownInstanceDisks(instance, self.cfg)
1694
1695
1696 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1697   """Shutdown block devices of an instance.
1698
1699   This does the shutdown on all nodes of the instance.
1700
1701   If the ignore_primary is false, errors on the primary node are
1702   ignored.
1703
1704   """
1705   result = True
1706   for disk in instance.disks:
1707     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1708       cfg.SetDiskID(top_disk, node)
1709       if not rpc.call_blockdev_shutdown(node, top_disk):
1710         logger.Error("could not shutdown block device %s on node %s" %
1711                      (disk.iv_name, node))
1712         if not ignore_primary or node != instance.primary_node:
1713           result = False
1714   return result
1715
1716
1717 class LUStartupInstance(LogicalUnit):
1718   """Starts an instance.
1719
1720   """
1721   HPATH = "instance-start"
1722   HTYPE = constants.HTYPE_INSTANCE
1723   _OP_REQP = ["instance_name", "force"]
1724
1725   def BuildHooksEnv(self):
1726     """Build hooks env.
1727
1728     This runs on master, primary and secondary nodes of the instance.
1729
1730     """
1731     env = {
1732       "INSTANCE_NAME": self.op.instance_name,
1733       "INSTANCE_PRIMARY": self.instance.primary_node,
1734       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1735       "FORCE": self.op.force,
1736       }
1737     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1738           list(self.instance.secondary_nodes))
1739     return env, nl, nl
1740
1741   def CheckPrereq(self):
1742     """Check prerequisites.
1743
1744     This checks that the instance is in the cluster.
1745
1746     """
1747     instance = self.cfg.GetInstanceInfo(
1748       self.cfg.ExpandInstanceName(self.op.instance_name))
1749     if instance is None:
1750       raise errors.OpPrereqError, ("Instance '%s' not known" %
1751                                    self.op.instance_name)
1752
1753     # check bridges existance
1754     brlist = [nic.bridge for nic in instance.nics]
1755     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1756       raise errors.OpPrereqError, ("one or more target bridges %s does not"
1757                                    " exist on destination node '%s'" %
1758                                    (brlist, instance.primary_node))
1759
1760     self.instance = instance
1761     self.op.instance_name = instance.name
1762
1763   def Exec(self, feedback_fn):
1764     """Start the instance.
1765
1766     """
1767     instance = self.instance
1768     force = self.op.force
1769     extra_args = getattr(self.op, "extra_args", "")
1770
1771     node_current = instance.primary_node
1772
1773     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1774     if not nodeinfo:
1775       raise errors.OpExecError, ("Could not contact node %s for infos" %
1776                                  (node_current))
1777
1778     freememory = nodeinfo[node_current]['memory_free']
1779     memory = instance.memory
1780     if memory > freememory:
1781       raise errors.OpExecError, ("Not enough memory to start instance"
1782                                  " %s on node %s"
1783                                  " needed %s MiB, available %s MiB" %
1784                                  (instance.name, node_current, memory,
1785                                   freememory))
1786
1787     _StartInstanceDisks(self.cfg, instance, force)
1788
1789     if not rpc.call_instance_start(node_current, instance, extra_args):
1790       _ShutdownInstanceDisks(instance, self.cfg)
1791       raise errors.OpExecError, ("Could not start instance")
1792
1793     self.cfg.MarkInstanceUp(instance.name)
1794
1795
1796 class LUShutdownInstance(LogicalUnit):
1797   """Shutdown an instance.
1798
1799   """
1800   HPATH = "instance-stop"
1801   HTYPE = constants.HTYPE_INSTANCE
1802   _OP_REQP = ["instance_name"]
1803
1804   def BuildHooksEnv(self):
1805     """Build hooks env.
1806
1807     This runs on master, primary and secondary nodes of the instance.
1808
1809     """
1810     env = {
1811       "INSTANCE_NAME": self.op.instance_name,
1812       "INSTANCE_PRIMARY": self.instance.primary_node,
1813       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1814       }
1815     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1816           list(self.instance.secondary_nodes))
1817     return env, nl, nl
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks that the instance is in the cluster.
1823
1824     """
1825     instance = self.cfg.GetInstanceInfo(
1826       self.cfg.ExpandInstanceName(self.op.instance_name))
1827     if instance is None:
1828       raise errors.OpPrereqError, ("Instance '%s' not known" %
1829                                    self.op.instance_name)
1830     self.instance = instance
1831
1832   def Exec(self, feedback_fn):
1833     """Shutdown the instance.
1834
1835     """
1836     instance = self.instance
1837     node_current = instance.primary_node
1838     if not rpc.call_instance_shutdown(node_current, instance):
1839       logger.Error("could not shutdown instance")
1840
1841     self.cfg.MarkInstanceDown(instance.name)
1842     _ShutdownInstanceDisks(instance, self.cfg)
1843
1844
1845 class LUReinstallInstance(LogicalUnit):
1846   """Reinstall an instance.
1847
1848   """
1849   HPATH = "instance-reinstall"
1850   HTYPE = constants.HTYPE_INSTANCE
1851   _OP_REQP = ["instance_name"]
1852
1853   def BuildHooksEnv(self):
1854     """Build hooks env.
1855
1856     This runs on master, primary and secondary nodes of the instance.
1857
1858     """
1859     env = {
1860       "INSTANCE_NAME": self.op.instance_name,
1861       "INSTANCE_PRIMARY": self.instance.primary_node,
1862       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1863       }
1864     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1865           list(self.instance.secondary_nodes))
1866     return env, nl, nl
1867
1868   def CheckPrereq(self):
1869     """Check prerequisites.
1870
1871     This checks that the instance is in the cluster and is not running.
1872
1873     """
1874     instance = self.cfg.GetInstanceInfo(
1875       self.cfg.ExpandInstanceName(self.op.instance_name))
1876     if instance is None:
1877       raise errors.OpPrereqError, ("Instance '%s' not known" %
1878                                    self.op.instance_name)
1879     if instance.disk_template == constants.DT_DISKLESS:
1880       raise errors.OpPrereqError, ("Instance '%s' has no disks" %
1881                                    self.op.instance_name)
1882     if instance.status != "down":
1883       raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
1884                                    self.op.instance_name)
1885     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1886     if remote_info:
1887       raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
1888                                    (self.op.instance_name,
1889                                     instance.primary_node))
1890     self.instance = instance
1891
1892   def Exec(self, feedback_fn):
1893     """Reinstall the instance.
1894
1895     """
1896     inst = self.instance
1897
1898     _StartInstanceDisks(self.cfg, inst, None)
1899     try:
1900       feedback_fn("Running the instance OS create scripts...")
1901       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1902         raise errors.OpExecError, ("Could not install OS for instance %s "
1903                                    "on node %s" %
1904                                    (inst.name, inst.primary_node))
1905     finally:
1906       _ShutdownInstanceDisks(inst, self.cfg)
1907
1908
1909 class LURemoveInstance(LogicalUnit):
1910   """Remove an instance.
1911
1912   """
1913   HPATH = "instance-remove"
1914   HTYPE = constants.HTYPE_INSTANCE
1915   _OP_REQP = ["instance_name"]
1916
1917   def BuildHooksEnv(self):
1918     """Build hooks env.
1919
1920     This runs on master, primary and secondary nodes of the instance.
1921
1922     """
1923     env = {
1924       "INSTANCE_NAME": self.op.instance_name,
1925       "INSTANCE_PRIMARY": self.instance.primary_node,
1926       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1927       }
1928     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929           list(self.instance.secondary_nodes))
1930     return env, nl, nl
1931
1932   def CheckPrereq(self):
1933     """Check prerequisites.
1934
1935     This checks that the instance is in the cluster.
1936
1937     """
1938     instance = self.cfg.GetInstanceInfo(
1939       self.cfg.ExpandInstanceName(self.op.instance_name))
1940     if instance is None:
1941       raise errors.OpPrereqError, ("Instance '%s' not known" %
1942                                    self.op.instance_name)
1943     self.instance = instance
1944
1945   def Exec(self, feedback_fn):
1946     """Remove the instance.
1947
1948     """
1949     instance = self.instance
1950     logger.Info("shutting down instance %s on node %s" %
1951                 (instance.name, instance.primary_node))
1952
1953     if not rpc.call_instance_shutdown(instance.primary_node, instance):
1954       raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1955                                  (instance.name, instance.primary_node))
1956
1957     logger.Info("removing block devices for instance %s" % instance.name)
1958
1959     _RemoveDisks(instance, self.cfg)
1960
1961     logger.Info("removing instance %s out of cluster config" % instance.name)
1962
1963     self.cfg.RemoveInstance(instance.name)
1964
1965
1966 class LUQueryInstances(NoHooksLU):
1967   """Logical unit for querying instances.
1968
1969   """
1970   _OP_REQP = ["output_fields"]
1971
1972   def CheckPrereq(self):
1973     """Check prerequisites.
1974
1975     This checks that the fields required are valid output fields.
1976
1977     """
1978     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1979     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
1980                                "admin_state", "admin_ram",
1981                                "disk_template", "ip", "mac", "bridge"],
1982                        dynamic=self.dynamic_fields,
1983                        selected=self.op.output_fields)
1984
1985   def Exec(self, feedback_fn):
1986     """Computes the list of nodes and their attributes.
1987
1988     """
1989     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
1990     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
1991                      in instance_names]
1992
1993     # begin data gathering
1994
1995     nodes = frozenset([inst.primary_node for inst in instance_list])
1996
1997     bad_nodes = []
1998     if self.dynamic_fields.intersection(self.op.output_fields):
1999       live_data = {}
2000       node_data = rpc.call_all_instances_info(nodes)
2001       for name in nodes:
2002         result = node_data[name]
2003         if result:
2004           live_data.update(result)
2005         elif result == False:
2006           bad_nodes.append(name)
2007         # else no instance is alive
2008     else:
2009       live_data = dict([(name, {}) for name in instance_names])
2010
2011     # end data gathering
2012
2013     output = []
2014     for instance in instance_list:
2015       iout = []
2016       for field in self.op.output_fields:
2017         if field == "name":
2018           val = instance.name
2019         elif field == "os":
2020           val = instance.os
2021         elif field == "pnode":
2022           val = instance.primary_node
2023         elif field == "snodes":
2024           val = ",".join(instance.secondary_nodes) or "-"
2025         elif field == "admin_state":
2026           if instance.status == "down":
2027             val = "no"
2028           else:
2029             val = "yes"
2030         elif field == "oper_state":
2031           if instance.primary_node in bad_nodes:
2032             val = "(node down)"
2033           else:
2034             if live_data.get(instance.name):
2035               val = "running"
2036             else:
2037               val = "stopped"
2038         elif field == "admin_ram":
2039           val = instance.memory
2040         elif field == "oper_ram":
2041           if instance.primary_node in bad_nodes:
2042             val = "(node down)"
2043           elif instance.name in live_data:
2044             val = live_data[instance.name].get("memory", "?")
2045           else:
2046             val = "-"
2047         elif field == "disk_template":
2048           val = instance.disk_template
2049         elif field == "ip":
2050           val = instance.nics[0].ip
2051         elif field == "bridge":
2052           val = instance.nics[0].bridge
2053         elif field == "mac":
2054           val = instance.nics[0].mac
2055         else:
2056           raise errors.ParameterError, field
2057         val = str(val)
2058         iout.append(val)
2059       output.append(iout)
2060
2061     return output
2062
2063
2064 class LUFailoverInstance(LogicalUnit):
2065   """Failover an instance.
2066
2067   """
2068   HPATH = "instance-failover"
2069   HTYPE = constants.HTYPE_INSTANCE
2070   _OP_REQP = ["instance_name", "ignore_consistency"]
2071
2072   def BuildHooksEnv(self):
2073     """Build hooks env.
2074
2075     This runs on master, primary and secondary nodes of the instance.
2076
2077     """
2078     env = {
2079       "INSTANCE_NAME": self.op.instance_name,
2080       "INSTANCE_PRIMARY": self.instance.primary_node,
2081       "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2082       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2083       }
2084     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2085     return env, nl, nl
2086
2087   def CheckPrereq(self):
2088     """Check prerequisites.
2089
2090     This checks that the instance is in the cluster.
2091
2092     """
2093     instance = self.cfg.GetInstanceInfo(
2094       self.cfg.ExpandInstanceName(self.op.instance_name))
2095     if instance is None:
2096       raise errors.OpPrereqError, ("Instance '%s' not known" %
2097                                    self.op.instance_name)
2098
2099     # check memory requirements on the secondary node
2100     target_node = instance.secondary_nodes[0]
2101     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2102     info = nodeinfo.get(target_node, None)
2103     if not info:
2104       raise errors.OpPrereqError, ("Cannot get current information"
2105                                    " from node '%s'" % nodeinfo)
2106     if instance.memory > info['memory_free']:
2107       raise errors.OpPrereqError, ("Not enough memory on target node %s."
2108                                    " %d MB available, %d MB required" %
2109                                    (target_node, info['memory_free'],
2110                                     instance.memory))
2111
2112     # check bridge existance
2113     brlist = [nic.bridge for nic in instance.nics]
2114     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2115       raise errors.OpPrereqError, ("one or more target bridges %s does not"
2116                                    " exist on destination node '%s'" %
2117                                    (brlist, instance.primary_node))
2118
2119     self.instance = instance
2120
2121   def Exec(self, feedback_fn):
2122     """Failover an instance.
2123
2124     The failover is done by shutting it down on its present node and
2125     starting it on the secondary.
2126
2127     """
2128     instance = self.instance
2129
2130     source_node = instance.primary_node
2131     target_node = instance.secondary_nodes[0]
2132
2133     feedback_fn("* checking disk consistency between source and target")
2134     for dev in instance.disks:
2135       # for remote_raid1, these are md over drbd
2136       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2137         if not self.op.ignore_consistency:
2138           raise errors.OpExecError, ("Disk %s is degraded on target node,"
2139                                      " aborting failover." % dev.iv_name)
2140
2141     feedback_fn("* checking target node resource availability")
2142     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2143
2144     if not nodeinfo:
2145       raise errors.OpExecError, ("Could not contact target node %s." %
2146                                  target_node)
2147
2148     free_memory = int(nodeinfo[target_node]['memory_free'])
2149     memory = instance.memory
2150     if memory > free_memory:
2151       raise errors.OpExecError, ("Not enough memory to create instance %s on"
2152                                  " node %s. needed %s MiB, available %s MiB" %
2153                                  (instance.name, target_node, memory,
2154                                   free_memory))
2155
2156     feedback_fn("* shutting down instance on source node")
2157     logger.Info("Shutting down instance %s on node %s" %
2158                 (instance.name, source_node))
2159
2160     if not rpc.call_instance_shutdown(source_node, instance):
2161       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2162                    " anyway. Please make sure node %s is down"  %
2163                    (instance.name, source_node, source_node))
2164
2165     feedback_fn("* deactivating the instance's disks on source node")
2166     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2167       raise errors.OpExecError, ("Can't shut down the instance's disks.")
2168
2169     instance.primary_node = target_node
2170     # distribute new instance config to the other nodes
2171     self.cfg.AddInstance(instance)
2172
2173     feedback_fn("* activating the instance's disks on target node")
2174     logger.Info("Starting instance %s on node %s" %
2175                 (instance.name, target_node))
2176
2177     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2178                                              ignore_secondaries=True)
2179     if not disks_ok:
2180       _ShutdownInstanceDisks(instance, self.cfg)
2181       raise errors.OpExecError, ("Can't activate the instance's disks")
2182
2183     feedback_fn("* starting the instance on the target node")
2184     if not rpc.call_instance_start(target_node, instance, None):
2185       _ShutdownInstanceDisks(instance, self.cfg)
2186       raise errors.OpExecError("Could not start instance %s on node %s." %
2187                                (instance.name, target_node))
2188
2189
2190 def _CreateBlockDevOnPrimary(cfg, node, device):
2191   """Create a tree of block devices on the primary node.
2192
2193   This always creates all devices.
2194
2195   """
2196   if device.children:
2197     for child in device.children:
2198       if not _CreateBlockDevOnPrimary(cfg, node, child):
2199         return False
2200
2201   cfg.SetDiskID(device, node)
2202   new_id = rpc.call_blockdev_create(node, device, device.size, True)
2203   if not new_id:
2204     return False
2205   if device.physical_id is None:
2206     device.physical_id = new_id
2207   return True
2208
2209
2210 def _CreateBlockDevOnSecondary(cfg, node, device, force):
2211   """Create a tree of block devices on a secondary node.
2212
2213   If this device type has to be created on secondaries, create it and
2214   all its children.
2215
2216   If not, just recurse to children keeping the same 'force' value.
2217
2218   """
2219   if device.CreateOnSecondary():
2220     force = True
2221   if device.children:
2222     for child in device.children:
2223       if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2224         return False
2225
2226   if not force:
2227     return True
2228   cfg.SetDiskID(device, node)
2229   new_id = rpc.call_blockdev_create(node, device, device.size, False)
2230   if not new_id:
2231     return False
2232   if device.physical_id is None:
2233     device.physical_id = new_id
2234   return True
2235
2236
2237 def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2238   """Generate a drbd device complete with its children.
2239
2240   """
2241   port = cfg.AllocatePort()
2242   base = "%s_%s" % (base, port)
2243   dev_data = objects.Disk(dev_type="lvm", size=size,
2244                           logical_id=(vgname, "%s.data" % base))
2245   dev_meta = objects.Disk(dev_type="lvm", size=128,
2246                           logical_id=(vgname, "%s.meta" % base))
2247   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2248                           logical_id = (primary, secondary, port),
2249                           children = [dev_data, dev_meta])
2250   return drbd_dev
2251
2252
2253 def _GenerateDiskTemplate(cfg, vgname, template_name,
2254                           instance_name, primary_node,
2255                           secondary_nodes, disk_sz, swap_sz):
2256   """Generate the entire disk layout for a given template type.
2257
2258   """
2259   #TODO: compute space requirements
2260
2261   if template_name == "diskless":
2262     disks = []
2263   elif template_name == "plain":
2264     if len(secondary_nodes) != 0:
2265       raise errors.ProgrammerError("Wrong template configuration")
2266     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2267                            logical_id=(vgname, "%s.os" % instance_name),
2268                            iv_name = "sda")
2269     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2270                            logical_id=(vgname, "%s.swap" % instance_name),
2271                            iv_name = "sdb")
2272     disks = [sda_dev, sdb_dev]
2273   elif template_name == "local_raid1":
2274     if len(secondary_nodes) != 0:
2275       raise errors.ProgrammerError("Wrong template configuration")
2276     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2277                               logical_id=(vgname, "%s.os_m1" % instance_name))
2278     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2279                               logical_id=(vgname, "%s.os_m2" % instance_name))
2280     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2281                               size=disk_sz,
2282                               children = [sda_dev_m1, sda_dev_m2])
2283     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2284                               logical_id=(vgname, "%s.swap_m1" %
2285                                           instance_name))
2286     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2287                               logical_id=(vgname, "%s.swap_m2" %
2288                                           instance_name))
2289     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2290                               size=swap_sz,
2291                               children = [sdb_dev_m1, sdb_dev_m2])
2292     disks = [md_sda_dev, md_sdb_dev]
2293   elif template_name == "remote_raid1":
2294     if len(secondary_nodes) != 1:
2295       raise errors.ProgrammerError("Wrong template configuration")
2296     remote_node = secondary_nodes[0]
2297     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2298                                          primary_node, remote_node, disk_sz,
2299                                          "%s-sda" % instance_name)
2300     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2301                               children = [drbd_sda_dev], size=disk_sz)
2302     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2303                                          primary_node, remote_node, swap_sz,
2304                                          "%s-sdb" % instance_name)
2305     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2306                               children = [drbd_sdb_dev], size=swap_sz)
2307     disks = [md_sda_dev, md_sdb_dev]
2308   else:
2309     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2310   return disks
2311
2312
2313 def _CreateDisks(cfg, instance):
2314   """Create all disks for an instance.
2315
2316   This abstracts away some work from AddInstance.
2317
2318   Args:
2319     instance: the instance object
2320
2321   Returns:
2322     True or False showing the success of the creation process
2323
2324   """
2325   for device in instance.disks:
2326     logger.Info("creating volume %s for instance %s" %
2327               (device.iv_name, instance.name))
2328     #HARDCODE
2329     for secondary_node in instance.secondary_nodes:
2330       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2331         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2332                      (device.iv_name, device, secondary_node))
2333         return False
2334     #HARDCODE
2335     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2336       logger.Error("failed to create volume %s on primary!" %
2337                    device.iv_name)
2338       return False
2339   return True
2340
2341
2342 def _RemoveDisks(instance, cfg):
2343   """Remove all disks for an instance.
2344
2345   This abstracts away some work from `AddInstance()` and
2346   `RemoveInstance()`. Note that in case some of the devices couldn't
2347   be remove, the removal will continue with the other ones (compare
2348   with `_CreateDisks()`).
2349
2350   Args:
2351     instance: the instance object
2352
2353   Returns:
2354     True or False showing the success of the removal proces
2355
2356   """
2357   logger.Info("removing block devices for instance %s" % instance.name)
2358
2359   result = True
2360   for device in instance.disks:
2361     for node, disk in device.ComputeNodeTree(instance.primary_node):
2362       cfg.SetDiskID(disk, node)
2363       if not rpc.call_blockdev_remove(node, disk):
2364         logger.Error("could not remove block device %s on node %s,"
2365                      " continuing anyway" %
2366                      (device.iv_name, node))
2367         result = False
2368   return result
2369
2370
2371 class LUCreateInstance(LogicalUnit):
2372   """Create an instance.
2373
2374   """
2375   HPATH = "instance-add"
2376   HTYPE = constants.HTYPE_INSTANCE
2377   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2378               "disk_template", "swap_size", "mode", "start", "vcpus",
2379               "wait_for_sync"]
2380
2381   def BuildHooksEnv(self):
2382     """Build hooks env.
2383
2384     This runs on master, primary and secondary nodes of the instance.
2385
2386     """
2387     env = {
2388       "INSTANCE_NAME": self.op.instance_name,
2389       "INSTANCE_PRIMARY": self.op.pnode,
2390       "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2391       "DISK_TEMPLATE": self.op.disk_template,
2392       "MEM_SIZE": self.op.mem_size,
2393       "DISK_SIZE": self.op.disk_size,
2394       "SWAP_SIZE": self.op.swap_size,
2395       "VCPUS": self.op.vcpus,
2396       "BRIDGE": self.op.bridge,
2397       "INSTANCE_ADD_MODE": self.op.mode,
2398       }
2399     if self.op.mode == constants.INSTANCE_IMPORT:
2400       env["SRC_NODE"] = self.op.src_node
2401       env["SRC_PATH"] = self.op.src_path
2402       env["SRC_IMAGE"] = self.src_image
2403     if self.inst_ip:
2404       env["INSTANCE_IP"] = self.inst_ip
2405
2406     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2407           self.secondaries)
2408     return env, nl, nl
2409
2410
2411   def CheckPrereq(self):
2412     """Check prerequisites.
2413
2414     """
2415     if self.op.mode not in (constants.INSTANCE_CREATE,
2416                             constants.INSTANCE_IMPORT):
2417       raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2418                                    self.op.mode)
2419
2420     if self.op.mode == constants.INSTANCE_IMPORT:
2421       src_node = getattr(self.op, "src_node", None)
2422       src_path = getattr(self.op, "src_path", None)
2423       if src_node is None or src_path is None:
2424         raise errors.OpPrereqError, ("Importing an instance requires source"
2425                                      " node and path options")
2426       src_node_full = self.cfg.ExpandNodeName(src_node)
2427       if src_node_full is None:
2428         raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2429       self.op.src_node = src_node = src_node_full
2430
2431       if not os.path.isabs(src_path):
2432         raise errors.OpPrereqError, ("The source path must be absolute")
2433
2434       export_info = rpc.call_export_info(src_node, src_path)
2435
2436       if not export_info:
2437         raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2438
2439       if not export_info.has_section(constants.INISECT_EXP):
2440         raise errors.ProgrammerError, ("Corrupted export config")
2441
2442       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2443       if (int(ei_version) != constants.EXPORT_VERSION):
2444         raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2445                                      (ei_version, constants.EXPORT_VERSION))
2446
2447       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2448         raise errors.OpPrereqError, ("Can't import instance with more than"
2449                                      " one data disk")
2450
2451       # FIXME: are the old os-es, disk sizes, etc. useful?
2452       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2453       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2454                                                          'disk0_dump'))
2455       self.src_image = diskimage
2456     else: # INSTANCE_CREATE
2457       if getattr(self.op, "os_type", None) is None:
2458         raise errors.OpPrereqError, ("No guest OS specified")
2459
2460     # check primary node
2461     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2462     if pnode is None:
2463       raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
2464                                    self.op.pnode)
2465     self.op.pnode = pnode.name
2466     self.pnode = pnode
2467     self.secondaries = []
2468     # disk template and mirror node verification
2469     if self.op.disk_template not in constants.DISK_TEMPLATES:
2470       raise errors.OpPrereqError, ("Invalid disk template name")
2471
2472     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2473       if getattr(self.op, "snode", None) is None:
2474         raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2475                                      " a mirror node")
2476
2477       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2478       if snode_name is None:
2479         raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2480                                      self.op.snode)
2481       elif snode_name == pnode.name:
2482         raise errors.OpPrereqError, ("The secondary node cannot be"
2483                                      " the primary node.")
2484       self.secondaries.append(snode_name)
2485
2486     # Check lv size requirements
2487     nodenames = [pnode.name] + self.secondaries
2488     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2489
2490     # Required free disk space as a function of disk and swap space
2491     req_size_dict = {
2492       constants.DT_DISKLESS: 0,
2493       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2494       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2495       # 256 MB are added for drbd metadata, 128MB for each drbd device
2496       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2497     }
2498
2499     if self.op.disk_template not in req_size_dict:
2500       raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2501                                      " is unknown" %  self.op.disk_template)
2502
2503     req_size = req_size_dict[self.op.disk_template]
2504
2505     for node in nodenames:
2506       info = nodeinfo.get(node, None)
2507       if not info:
2508         raise errors.OpPrereqError, ("Cannot get current information"
2509                                      " from node '%s'" % nodeinfo)
2510       if req_size > info['vg_free']:
2511         raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2512                                      " %d MB available, %d MB required" %
2513                                      (node, info['vg_free'], req_size))
2514
2515     # os verification
2516     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2517     if not isinstance(os_obj, objects.OS):
2518       raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2519                                    " primary node"  % self.op.os_type)
2520
2521     # instance verification
2522     hostname1 = utils.LookupHostname(self.op.instance_name)
2523     if not hostname1:
2524       raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2525                                    self.op.instance_name)
2526
2527     self.op.instance_name = instance_name = hostname1['hostname']
2528     instance_list = self.cfg.GetInstanceList()
2529     if instance_name in instance_list:
2530       raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2531                                    instance_name)
2532
2533     ip = getattr(self.op, "ip", None)
2534     if ip is None or ip.lower() == "none":
2535       inst_ip = None
2536     elif ip.lower() == "auto":
2537       inst_ip = hostname1['ip']
2538     else:
2539       if not utils.IsValidIP(ip):
2540         raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2541                                      " like a valid IP" % ip)
2542       inst_ip = ip
2543     self.inst_ip = inst_ip
2544
2545     command = ["fping", "-q", hostname1['ip']]
2546     result = utils.RunCmd(command)
2547     if not result.failed:
2548       raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2549                                    (hostname1['ip'], instance_name))
2550
2551     # bridge verification
2552     bridge = getattr(self.op, "bridge", None)
2553     if bridge is None:
2554       self.op.bridge = self.cfg.GetDefBridge()
2555     else:
2556       self.op.bridge = bridge
2557
2558     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2559       raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2560                                    " destination node '%s'" %
2561                                    (self.op.bridge, pnode.name))
2562
2563     if self.op.start:
2564       self.instance_status = 'up'
2565     else:
2566       self.instance_status = 'down'
2567
2568   def Exec(self, feedback_fn):
2569     """Create and add the instance to the cluster.
2570
2571     """
2572     instance = self.op.instance_name
2573     pnode_name = self.pnode.name
2574
2575     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2576     if self.inst_ip is not None:
2577       nic.ip = self.inst_ip
2578
2579     disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2580                                   self.op.disk_template,
2581                                   instance, pnode_name,
2582                                   self.secondaries, self.op.disk_size,
2583                                   self.op.swap_size)
2584
2585     iobj = objects.Instance(name=instance, os=self.op.os_type,
2586                             primary_node=pnode_name,
2587                             memory=self.op.mem_size,
2588                             vcpus=self.op.vcpus,
2589                             nics=[nic], disks=disks,
2590                             disk_template=self.op.disk_template,
2591                             status=self.instance_status,
2592                             )
2593
2594     feedback_fn("* creating instance disks...")
2595     if not _CreateDisks(self.cfg, iobj):
2596       _RemoveDisks(iobj, self.cfg)
2597       raise errors.OpExecError, ("Device creation failed, reverting...")
2598
2599     feedback_fn("adding instance %s to cluster config" % instance)
2600
2601     self.cfg.AddInstance(iobj)
2602
2603     if self.op.wait_for_sync:
2604       disk_abort = not _WaitForSync(self.cfg, iobj)
2605     elif iobj.disk_template == "remote_raid1":
2606       # make sure the disks are not degraded (still sync-ing is ok)
2607       time.sleep(15)
2608       feedback_fn("* checking mirrors status")
2609       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2610     else:
2611       disk_abort = False
2612
2613     if disk_abort:
2614       _RemoveDisks(iobj, self.cfg)
2615       self.cfg.RemoveInstance(iobj.name)
2616       raise errors.OpExecError, ("There are some degraded disks for"
2617                                       " this instance")
2618
2619     feedback_fn("creating os for instance %s on node %s" %
2620                 (instance, pnode_name))
2621
2622     if iobj.disk_template != constants.DT_DISKLESS:
2623       if self.op.mode == constants.INSTANCE_CREATE:
2624         feedback_fn("* running the instance OS create scripts...")
2625         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2626           raise errors.OpExecError, ("could not add os for instance %s"
2627                                           " on node %s" %
2628                                           (instance, pnode_name))
2629
2630       elif self.op.mode == constants.INSTANCE_IMPORT:
2631         feedback_fn("* running the instance OS import scripts...")
2632         src_node = self.op.src_node
2633         src_image = self.src_image
2634         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2635                                                 src_node, src_image):
2636           raise errors.OpExecError, ("Could not import os for instance"
2637                                           " %s on node %s" %
2638                                           (instance, pnode_name))
2639       else:
2640         # also checked in the prereq part
2641         raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2642                                        % self.op.mode)
2643
2644     if self.op.start:
2645       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2646       feedback_fn("* starting instance...")
2647       if not rpc.call_instance_start(pnode_name, iobj, None):
2648         raise errors.OpExecError, ("Could not start instance")
2649
2650
2651 class LUConnectConsole(NoHooksLU):
2652   """Connect to an instance's console.
2653
2654   This is somewhat special in that it returns the command line that
2655   you need to run on the master node in order to connect to the
2656   console.
2657
2658   """
2659   _OP_REQP = ["instance_name"]
2660
2661   def CheckPrereq(self):
2662     """Check prerequisites.
2663
2664     This checks that the instance is in the cluster.
2665
2666     """
2667     instance = self.cfg.GetInstanceInfo(
2668       self.cfg.ExpandInstanceName(self.op.instance_name))
2669     if instance is None:
2670       raise errors.OpPrereqError, ("Instance '%s' not known" %
2671                                    self.op.instance_name)
2672     self.instance = instance
2673
2674   def Exec(self, feedback_fn):
2675     """Connect to the console of an instance
2676
2677     """
2678     instance = self.instance
2679     node = instance.primary_node
2680
2681     node_insts = rpc.call_instance_list([node])[node]
2682     if node_insts is False:
2683       raise errors.OpExecError, ("Can't connect to node %s." % node)
2684
2685     if instance.name not in node_insts:
2686       raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2687
2688     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2689
2690     hyper = hypervisor.GetHypervisor()
2691     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2692     return node, console_cmd
2693
2694
2695 class LUAddMDDRBDComponent(LogicalUnit):
2696   """Adda new mirror member to an instance's disk.
2697
2698   """
2699   HPATH = "mirror-add"
2700   HTYPE = constants.HTYPE_INSTANCE
2701   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2702
2703   def BuildHooksEnv(self):
2704     """Build hooks env.
2705
2706     This runs on the master, the primary and all the secondaries.
2707
2708     """
2709     env = {
2710       "INSTANCE_NAME": self.op.instance_name,
2711       "NEW_SECONDARY": self.op.remote_node,
2712       "DISK_NAME": self.op.disk_name,
2713       }
2714     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2715           self.op.remote_node,] + list(self.instance.secondary_nodes)
2716     return env, nl, nl
2717
2718   def CheckPrereq(self):
2719     """Check prerequisites.
2720
2721     This checks that the instance is in the cluster.
2722
2723     """
2724     instance = self.cfg.GetInstanceInfo(
2725       self.cfg.ExpandInstanceName(self.op.instance_name))
2726     if instance is None:
2727       raise errors.OpPrereqError, ("Instance '%s' not known" %
2728                                    self.op.instance_name)
2729     self.instance = instance
2730
2731     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2732     if remote_node is None:
2733       raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2734     self.remote_node = remote_node
2735
2736     if remote_node == instance.primary_node:
2737       raise errors.OpPrereqError, ("The specified node is the primary node of"
2738                                    " the instance.")
2739
2740     if instance.disk_template != constants.DT_REMOTE_RAID1:
2741       raise errors.OpPrereqError, ("Instance's disk layout is not"
2742                                    " remote_raid1.")
2743     for disk in instance.disks:
2744       if disk.iv_name == self.op.disk_name:
2745         break
2746     else:
2747       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2748                                    " instance." % self.op.disk_name)
2749     if len(disk.children) > 1:
2750       raise errors.OpPrereqError, ("The device already has two slave"
2751                                    " devices.\n"
2752                                    "This would create a 3-disk raid1"
2753                                    " which we don't allow.")
2754     self.disk = disk
2755
2756   def Exec(self, feedback_fn):
2757     """Add the mirror component
2758
2759     """
2760     disk = self.disk
2761     instance = self.instance
2762
2763     remote_node = self.remote_node
2764     new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2765                                      instance.primary_node, remote_node,
2766                                      disk.size, "%s-%s" %
2767                                      (instance.name, self.op.disk_name))
2768
2769     logger.Info("adding new mirror component on secondary")
2770     #HARDCODE
2771     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2772       raise errors.OpExecError, ("Failed to create new component on secondary"
2773                                  " node %s" % remote_node)
2774
2775     logger.Info("adding new mirror component on primary")
2776     #HARDCODE
2777     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2778       # remove secondary dev
2779       self.cfg.SetDiskID(new_drbd, remote_node)
2780       rpc.call_blockdev_remove(remote_node, new_drbd)
2781       raise errors.OpExecError, ("Failed to create volume on primary")
2782
2783     # the device exists now
2784     # call the primary node to add the mirror to md
2785     logger.Info("adding new mirror component to md")
2786     if not rpc.call_blockdev_addchild(instance.primary_node,
2787                                            disk, new_drbd):
2788       logger.Error("Can't add mirror compoment to md!")
2789       self.cfg.SetDiskID(new_drbd, remote_node)
2790       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2791         logger.Error("Can't rollback on secondary")
2792       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2793       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2794         logger.Error("Can't rollback on primary")
2795       raise errors.OpExecError, "Can't add mirror component to md array"
2796
2797     disk.children.append(new_drbd)
2798
2799     self.cfg.AddInstance(instance)
2800
2801     _WaitForSync(self.cfg, instance)
2802
2803     return 0
2804
2805
2806 class LURemoveMDDRBDComponent(LogicalUnit):
2807   """Remove a component from a remote_raid1 disk.
2808
2809   """
2810   HPATH = "mirror-remove"
2811   HTYPE = constants.HTYPE_INSTANCE
2812   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2813
2814   def BuildHooksEnv(self):
2815     """Build hooks env.
2816
2817     This runs on the master, the primary and all the secondaries.
2818
2819     """
2820     env = {
2821       "INSTANCE_NAME": self.op.instance_name,
2822       "DISK_NAME": self.op.disk_name,
2823       "DISK_ID": self.op.disk_id,
2824       "OLD_SECONDARY": self.old_secondary,
2825       }
2826     nl = [self.sstore.GetMasterNode(),
2827           self.instance.primary_node] + list(self.instance.secondary_nodes)
2828     return env, nl, nl
2829
2830   def CheckPrereq(self):
2831     """Check prerequisites.
2832
2833     This checks that the instance is in the cluster.
2834
2835     """
2836     instance = self.cfg.GetInstanceInfo(
2837       self.cfg.ExpandInstanceName(self.op.instance_name))
2838     if instance is None:
2839       raise errors.OpPrereqError, ("Instance '%s' not known" %
2840                                    self.op.instance_name)
2841     self.instance = instance
2842
2843     if instance.disk_template != constants.DT_REMOTE_RAID1:
2844       raise errors.OpPrereqError, ("Instance's disk layout is not"
2845                                    " remote_raid1.")
2846     for disk in instance.disks:
2847       if disk.iv_name == self.op.disk_name:
2848         break
2849     else:
2850       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2851                                    " instance." % self.op.disk_name)
2852     for child in disk.children:
2853       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2854         break
2855     else:
2856       raise errors.OpPrereqError, ("Can't find the device with this port.")
2857
2858     if len(disk.children) < 2:
2859       raise errors.OpPrereqError, ("Cannot remove the last component from"
2860                                    " a mirror.")
2861     self.disk = disk
2862     self.child = child
2863     if self.child.logical_id[0] == instance.primary_node:
2864       oid = 1
2865     else:
2866       oid = 0
2867     self.old_secondary = self.child.logical_id[oid]
2868
2869   def Exec(self, feedback_fn):
2870     """Remove the mirror component
2871
2872     """
2873     instance = self.instance
2874     disk = self.disk
2875     child = self.child
2876     logger.Info("remove mirror component")
2877     self.cfg.SetDiskID(disk, instance.primary_node)
2878     if not rpc.call_blockdev_removechild(instance.primary_node,
2879                                               disk, child):
2880       raise errors.OpExecError, ("Can't remove child from mirror.")
2881
2882     for node in child.logical_id[:2]:
2883       self.cfg.SetDiskID(child, node)
2884       if not rpc.call_blockdev_remove(node, child):
2885         logger.Error("Warning: failed to remove device from node %s,"
2886                      " continuing operation." % node)
2887
2888     disk.children.remove(child)
2889     self.cfg.AddInstance(instance)
2890
2891
2892 class LUReplaceDisks(LogicalUnit):
2893   """Replace the disks of an instance.
2894
2895   """
2896   HPATH = "mirrors-replace"
2897   HTYPE = constants.HTYPE_INSTANCE
2898   _OP_REQP = ["instance_name"]
2899
2900   def BuildHooksEnv(self):
2901     """Build hooks env.
2902
2903     This runs on the master, the primary and all the secondaries.
2904
2905     """
2906     env = {
2907       "INSTANCE_NAME": self.op.instance_name,
2908       "NEW_SECONDARY": self.op.remote_node,
2909       "OLD_SECONDARY": self.instance.secondary_nodes[0],
2910       }
2911     nl = [self.sstore.GetMasterNode(),
2912           self.instance.primary_node] + list(self.instance.secondary_nodes)
2913     return env, nl, nl
2914
2915   def CheckPrereq(self):
2916     """Check prerequisites.
2917
2918     This checks that the instance is in the cluster.
2919
2920     """
2921     instance = self.cfg.GetInstanceInfo(
2922       self.cfg.ExpandInstanceName(self.op.instance_name))
2923     if instance is None:
2924       raise errors.OpPrereqError, ("Instance '%s' not known" %
2925                                    self.op.instance_name)
2926     self.instance = instance
2927
2928     if instance.disk_template != constants.DT_REMOTE_RAID1:
2929       raise errors.OpPrereqError, ("Instance's disk layout is not"
2930                                    " remote_raid1.")
2931
2932     if len(instance.secondary_nodes) != 1:
2933       raise errors.OpPrereqError, ("The instance has a strange layout,"
2934                                    " expected one secondary but found %d" %
2935                                    len(instance.secondary_nodes))
2936
2937     remote_node = getattr(self.op, "remote_node", None)
2938     if remote_node is None:
2939       remote_node = instance.secondary_nodes[0]
2940     else:
2941       remote_node = self.cfg.ExpandNodeName(remote_node)
2942       if remote_node is None:
2943         raise errors.OpPrereqError, ("Node '%s' not known" %
2944                                      self.op.remote_node)
2945     if remote_node == instance.primary_node:
2946       raise errors.OpPrereqError, ("The specified node is the primary node of"
2947                                    " the instance.")
2948     self.op.remote_node = remote_node
2949
2950   def Exec(self, feedback_fn):
2951     """Replace the disks of an instance.
2952
2953     """
2954     instance = self.instance
2955     iv_names = {}
2956     # start of work
2957     remote_node = self.op.remote_node
2958     cfg = self.cfg
2959     vgname = cfg.GetVGName()
2960     for dev in instance.disks:
2961       size = dev.size
2962       new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
2963                                        remote_node, size,
2964                                        "%s-%s" % (instance.name, dev.iv_name))
2965       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
2966       logger.Info("adding new mirror component on secondary for %s" %
2967                   dev.iv_name)
2968       #HARDCODE
2969       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
2970         raise errors.OpExecError, ("Failed to create new component on"
2971                                    " secondary node %s\n"
2972                                    "Full abort, cleanup manually!" %
2973                                    remote_node)
2974
2975       logger.Info("adding new mirror component on primary")
2976       #HARDCODE
2977       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
2978         # remove secondary dev
2979         cfg.SetDiskID(new_drbd, remote_node)
2980         rpc.call_blockdev_remove(remote_node, new_drbd)
2981         raise errors.OpExecError("Failed to create volume on primary!\n"
2982                                  "Full abort, cleanup manually!!")
2983
2984       # the device exists now
2985       # call the primary node to add the mirror to md
2986       logger.Info("adding new mirror component to md")
2987       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
2988                                         new_drbd):
2989         logger.Error("Can't add mirror compoment to md!")
2990         cfg.SetDiskID(new_drbd, remote_node)
2991         if not rpc.call_blockdev_remove(remote_node, new_drbd):
2992           logger.Error("Can't rollback on secondary")
2993         cfg.SetDiskID(new_drbd, instance.primary_node)
2994         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2995           logger.Error("Can't rollback on primary")
2996         raise errors.OpExecError, ("Full abort, cleanup manually!!")
2997
2998       dev.children.append(new_drbd)
2999       cfg.AddInstance(instance)
3000
3001     # this can fail as the old devices are degraded and _WaitForSync
3002     # does a combined result over all disks, so we don't check its
3003     # return value
3004     _WaitForSync(cfg, instance, unlock=True)
3005
3006     # so check manually all the devices
3007     for name in iv_names:
3008       dev, child, new_drbd = iv_names[name]
3009       cfg.SetDiskID(dev, instance.primary_node)
3010       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3011       if is_degr:
3012         raise errors.OpExecError, ("MD device %s is degraded!" % name)
3013       cfg.SetDiskID(new_drbd, instance.primary_node)
3014       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3015       if is_degr:
3016         raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3017
3018     for name in iv_names:
3019       dev, child, new_drbd = iv_names[name]
3020       logger.Info("remove mirror %s component" % name)
3021       cfg.SetDiskID(dev, instance.primary_node)
3022       if not rpc.call_blockdev_removechild(instance.primary_node,
3023                                                 dev, child):
3024         logger.Error("Can't remove child from mirror, aborting"
3025                      " *this device cleanup*.\nYou need to cleanup manually!!")
3026         continue
3027
3028       for node in child.logical_id[:2]:
3029         logger.Info("remove child device on %s" % node)
3030         cfg.SetDiskID(child, node)
3031         if not rpc.call_blockdev_remove(node, child):
3032           logger.Error("Warning: failed to remove device from node %s,"
3033                        " continuing operation." % node)
3034
3035       dev.children.remove(child)
3036
3037       cfg.AddInstance(instance)
3038
3039
3040 class LUQueryInstanceData(NoHooksLU):
3041   """Query runtime instance data.
3042
3043   """
3044   _OP_REQP = ["instances"]
3045
3046   def CheckPrereq(self):
3047     """Check prerequisites.
3048
3049     This only checks the optional instance list against the existing names.
3050
3051     """
3052     if not isinstance(self.op.instances, list):
3053       raise errors.OpPrereqError, "Invalid argument type 'instances'"
3054     if self.op.instances:
3055       self.wanted_instances = []
3056       names = self.op.instances
3057       for name in names:
3058         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3059         if instance is None:
3060           raise errors.OpPrereqError, ("No such instance name '%s'" % name)
3061       self.wanted_instances.append(instance)
3062     else:
3063       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3064                                in self.cfg.GetInstanceList()]
3065     return
3066
3067
3068   def _ComputeDiskStatus(self, instance, snode, dev):
3069     """Compute block device status.
3070
3071     """
3072     self.cfg.SetDiskID(dev, instance.primary_node)
3073     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3074     if dev.dev_type == "drbd":
3075       # we change the snode then (otherwise we use the one passed in)
3076       if dev.logical_id[0] == instance.primary_node:
3077         snode = dev.logical_id[1]
3078       else:
3079         snode = dev.logical_id[0]
3080
3081     if snode:
3082       self.cfg.SetDiskID(dev, snode)
3083       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3084     else:
3085       dev_sstatus = None
3086
3087     if dev.children:
3088       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3089                       for child in dev.children]
3090     else:
3091       dev_children = []
3092
3093     data = {
3094       "iv_name": dev.iv_name,
3095       "dev_type": dev.dev_type,
3096       "logical_id": dev.logical_id,
3097       "physical_id": dev.physical_id,
3098       "pstatus": dev_pstatus,
3099       "sstatus": dev_sstatus,
3100       "children": dev_children,
3101       }
3102
3103     return data
3104
3105   def Exec(self, feedback_fn):
3106     """Gather and return data"""
3107     result = {}
3108     for instance in self.wanted_instances:
3109       remote_info = rpc.call_instance_info(instance.primary_node,
3110                                                 instance.name)
3111       if remote_info and "state" in remote_info:
3112         remote_state = "up"
3113       else:
3114         remote_state = "down"
3115       if instance.status == "down":
3116         config_state = "down"
3117       else:
3118         config_state = "up"
3119
3120       disks = [self._ComputeDiskStatus(instance, None, device)
3121                for device in instance.disks]
3122
3123       idict = {
3124         "name": instance.name,
3125         "config_state": config_state,
3126         "run_state": remote_state,
3127         "pnode": instance.primary_node,
3128         "snodes": instance.secondary_nodes,
3129         "os": instance.os,
3130         "memory": instance.memory,
3131         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3132         "disks": disks,
3133         }
3134
3135       result[instance.name] = idict
3136
3137     return result
3138
3139
3140 class LUQueryNodeData(NoHooksLU):
3141   """Logical unit for querying node data.
3142
3143   """
3144   _OP_REQP = ["nodes"]
3145
3146   def CheckPrereq(self):
3147     """Check prerequisites.
3148
3149     This only checks the optional node list against the existing names.
3150
3151     """
3152     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3153
3154   def Exec(self, feedback_fn):
3155     """Compute and return the list of nodes.
3156
3157     """
3158     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3159              in self.cfg.GetInstanceList()]
3160     result = []
3161     for node in self.wanted_nodes:
3162       result.append((node.name, node.primary_ip, node.secondary_ip,
3163                      [inst.name for inst in ilist
3164                       if inst.primary_node == node.name],
3165                      [inst.name for inst in ilist
3166                       if node.name in inst.secondary_nodes],
3167                      ))
3168     return result
3169
3170
3171 class LUSetInstanceParms(LogicalUnit):
3172   """Modifies an instances's parameters.
3173
3174   """
3175   HPATH = "instance-modify"
3176   HTYPE = constants.HTYPE_INSTANCE
3177   _OP_REQP = ["instance_name"]
3178
3179   def BuildHooksEnv(self):
3180     """Build hooks env.
3181
3182     This runs on the master, primary and secondaries.
3183
3184     """
3185     env = {
3186       "INSTANCE_NAME": self.op.instance_name,
3187       }
3188     if self.mem:
3189       env["MEM_SIZE"] = self.mem
3190     if self.vcpus:
3191       env["VCPUS"] = self.vcpus
3192     if self.do_ip:
3193       env["INSTANCE_IP"] = self.ip
3194     if self.bridge:
3195       env["BRIDGE"] = self.bridge
3196
3197     nl = [self.sstore.GetMasterNode(),
3198           self.instance.primary_node] + list(self.instance.secondary_nodes)
3199
3200     return env, nl, nl
3201
3202   def CheckPrereq(self):
3203     """Check prerequisites.
3204
3205     This only checks the instance list against the existing names.
3206
3207     """
3208     self.mem = getattr(self.op, "mem", None)
3209     self.vcpus = getattr(self.op, "vcpus", None)
3210     self.ip = getattr(self.op, "ip", None)
3211     self.bridge = getattr(self.op, "bridge", None)
3212     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3213       raise errors.OpPrereqError, ("No changes submitted")
3214     if self.mem is not None:
3215       try:
3216         self.mem = int(self.mem)
3217       except ValueError, err:
3218         raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
3219     if self.vcpus is not None:
3220       try:
3221         self.vcpus = int(self.vcpus)
3222       except ValueError, err:
3223         raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
3224     if self.ip is not None:
3225       self.do_ip = True
3226       if self.ip.lower() == "none":
3227         self.ip = None
3228       else:
3229         if not utils.IsValidIP(self.ip):
3230           raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
3231     else:
3232       self.do_ip = False
3233
3234     instance = self.cfg.GetInstanceInfo(
3235       self.cfg.ExpandInstanceName(self.op.instance_name))
3236     if instance is None:
3237       raise errors.OpPrereqError, ("No such instance name '%s'" %
3238                                    self.op.instance_name)
3239     self.op.instance_name = instance.name
3240     self.instance = instance
3241     return
3242
3243   def Exec(self, feedback_fn):
3244     """Modifies an instance.
3245
3246     All parameters take effect only at the next restart of the instance.
3247     """
3248     result = []
3249     instance = self.instance
3250     if self.mem:
3251       instance.memory = self.mem
3252       result.append(("mem", self.mem))
3253     if self.vcpus:
3254       instance.vcpus = self.vcpus
3255       result.append(("vcpus",  self.vcpus))
3256     if self.do_ip:
3257       instance.nics[0].ip = self.ip
3258       result.append(("ip", self.ip))
3259     if self.bridge:
3260       instance.nics[0].bridge = self.bridge
3261       result.append(("bridge", self.bridge))
3262
3263     self.cfg.AddInstance(instance)
3264
3265     return result
3266
3267
3268 class LUQueryExports(NoHooksLU):
3269   """Query the exports list
3270
3271   """
3272   _OP_REQP = []
3273
3274   def CheckPrereq(self):
3275     """Check that the nodelist contains only existing nodes.
3276
3277     """
3278     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3279
3280   def Exec(self, feedback_fn):
3281     """Compute the list of all the exported system images.
3282
3283     Returns:
3284       a dictionary with the structure node->(export-list)
3285       where export-list is a list of the instances exported on
3286       that node.
3287
3288     """
3289     return rpc.call_export_list([node.name for node in self.nodes])
3290
3291
3292 class LUExportInstance(LogicalUnit):
3293   """Export an instance to an image in the cluster.
3294
3295   """
3296   HPATH = "instance-export"
3297   HTYPE = constants.HTYPE_INSTANCE
3298   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3299
3300   def BuildHooksEnv(self):
3301     """Build hooks env.
3302
3303     This will run on the master, primary node and target node.
3304
3305     """
3306     env = {
3307       "INSTANCE_NAME": self.op.instance_name,
3308       "EXPORT_NODE": self.op.target_node,
3309       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3310       }
3311     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3312           self.op.target_node]
3313     return env, nl, nl
3314
3315   def CheckPrereq(self):
3316     """Check prerequisites.
3317
3318     This checks that the instance name is a valid one.
3319
3320     """
3321     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3322     self.instance = self.cfg.GetInstanceInfo(instance_name)
3323     if self.instance is None:
3324       raise errors.OpPrereqError, ("Instance '%s' not found" %
3325                                    self.op.instance_name)
3326
3327     # node verification
3328     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3329     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3330
3331     if self.dst_node is None:
3332       raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
3333                                    self.op.target_node)
3334     self.op.target_node = self.dst_node.name
3335
3336   def Exec(self, feedback_fn):
3337     """Export an instance to an image in the cluster.
3338
3339     """
3340     instance = self.instance
3341     dst_node = self.dst_node
3342     src_node = instance.primary_node
3343     # shutdown the instance, unless requested not to do so
3344     if self.op.shutdown:
3345       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3346       self.processor.ChainOpCode(op, feedback_fn)
3347
3348     vgname = self.cfg.GetVGName()
3349
3350     snap_disks = []
3351
3352     try:
3353       for disk in instance.disks:
3354         if disk.iv_name == "sda":
3355           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3356           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3357
3358           if not new_dev_name:
3359             logger.Error("could not snapshot block device %s on node %s" %
3360                          (disk.logical_id[1], src_node))
3361           else:
3362             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3363                                       logical_id=(vgname, new_dev_name),
3364                                       physical_id=(vgname, new_dev_name),
3365                                       iv_name=disk.iv_name)
3366             snap_disks.append(new_dev)
3367
3368     finally:
3369       if self.op.shutdown:
3370         op = opcodes.OpStartupInstance(instance_name=instance.name,
3371                                        force=False)
3372         self.processor.ChainOpCode(op, feedback_fn)
3373
3374     # TODO: check for size
3375
3376     for dev in snap_disks:
3377       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3378                                            instance):
3379         logger.Error("could not export block device %s from node"
3380                      " %s to node %s" %
3381                      (dev.logical_id[1], src_node, dst_node.name))
3382       if not rpc.call_blockdev_remove(src_node, dev):
3383         logger.Error("could not remove snapshot block device %s from"
3384                      " node %s" % (dev.logical_id[1], src_node))
3385
3386     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3387       logger.Error("could not finalize export for instance %s on node %s" %
3388                    (instance.name, dst_node.name))
3389
3390     nodelist = self.cfg.GetNodeList()
3391     nodelist.remove(dst_node.name)
3392
3393     # on one-node clusters nodelist will be empty after the removal
3394     # if we proceed the backup would be removed because OpQueryExports
3395     # substitutes an empty list with the full cluster node list.
3396     if nodelist:
3397       op = opcodes.OpQueryExports(nodes=nodelist)
3398       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3399       for node in exportlist:
3400         if instance.name in exportlist[node]:
3401           if not rpc.call_export_remove(node, instance.name):
3402             logger.Error("could not remove older export for instance %s"
3403                          " on node %s" % (instance.name, node))