A CheckPrereq method had one unconverted "return 1" statement. Change it to the
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError("Unknown output fields selected: %s"
206                                % ",".join(frozenset(selected).
207                                           difference(all_fields)))
208
209
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211                           memory, vcpus, nics):
212   """Builds instance related env variables for hooks from single variables.
213
214   Args:
215     secondary_nodes: List of secondary nodes as strings
216   """
217   env = {
218     "INSTANCE_NAME": name,
219     "INSTANCE_PRIMARY": primary_node,
220     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221     "INSTANCE_OS_TYPE": os_type,
222     "INSTANCE_STATUS": status,
223     "INSTANCE_MEMORY": memory,
224     "INSTANCE_VCPUS": vcpus,
225   }
226
227   if nics:
228     nic_count = len(nics)
229     for idx, (ip, bridge) in enumerate(nics):
230       if ip is None:
231         ip = ""
232       env["INSTANCE_NIC%d_IP" % idx] = ip
233       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234   else:
235     nic_count = 0
236
237   env["INSTANCE_NIC_COUNT"] = nic_count
238
239   return env
240
241
242 def _BuildInstanceHookEnvByObject(instance, override=None):
243   """Builds instance related env variables for hooks from an object.
244
245   Args:
246     instance: objects.Instance object of instance
247     override: dict of values to override
248   """
249   args = {
250     'name': instance.name,
251     'primary_node': instance.primary_node,
252     'secondary_nodes': instance.secondary_nodes,
253     'os_type': instance.os,
254     'status': instance.os,
255     'memory': instance.memory,
256     'vcpus': instance.vcpus,
257     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258   }
259   if override:
260     args.update(override)
261   return _BuildInstanceHookEnv(**args)
262
263
264 def _UpdateEtcHosts(fullnode, ip):
265   """Ensure a node has a correct entry in /etc/hosts.
266
267   Args:
268     fullnode - Fully qualified domain name of host. (str)
269     ip       - IPv4 address of host (str)
270
271   """
272   node = fullnode.split(".", 1)[0]
273
274   f = open('/etc/hosts', 'r+')
275
276   inthere = False
277
278   save_lines = []
279   add_lines = []
280   removed = False
281
282   while True:
283     rawline = f.readline()
284
285     if not rawline:
286       # End of file
287       break
288
289     line = rawline.split('\n')[0]
290
291     # Strip off comments
292     line = line.split('#')[0]
293
294     if not line:
295       # Entire line was comment, skip
296       save_lines.append(rawline)
297       continue
298
299     fields = line.split()
300
301     haveall = True
302     havesome = False
303     for spec in [ ip, fullnode, node ]:
304       if spec not in fields:
305         haveall = False
306       if spec in fields:
307         havesome = True
308
309     if haveall:
310       inthere = True
311       save_lines.append(rawline)
312       continue
313
314     if havesome and not haveall:
315       # Line (old, or manual?) which is missing some.  Remove.
316       removed = True
317       continue
318
319     save_lines.append(rawline)
320
321   if not inthere:
322     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323
324   if removed:
325     if add_lines:
326       save_lines = save_lines + add_lines
327
328     # We removed a line, write a new file and replace old.
329     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330     newfile = os.fdopen(fd, 'w')
331     newfile.write(''.join(save_lines))
332     newfile.close()
333     os.rename(tmpname, '/etc/hosts')
334
335   elif add_lines:
336     # Simply appending a new line will do the trick.
337     f.seek(0, 2)
338     for add in add_lines:
339       f.write(add)
340
341   f.close()
342
343
344 def _UpdateKnownHosts(fullnode, ip, pubkey):
345   """Ensure a node has a correct known_hosts entry.
346
347   Args:
348     fullnode - Fully qualified domain name of host. (str)
349     ip       - IPv4 address of host (str)
350     pubkey   - the public key of the cluster
351
352   """
353   if os.path.exists('/etc/ssh/ssh_known_hosts'):
354     f = open('/etc/ssh/ssh_known_hosts', 'r+')
355   else:
356     f = open('/etc/ssh/ssh_known_hosts', 'w+')
357
358   inthere = False
359
360   save_lines = []
361   add_lines = []
362   removed = False
363
364   while True:
365     rawline = f.readline()
366     logger.Debug('read %s' % (repr(rawline),))
367
368     if not rawline:
369       # End of file
370       break
371
372     line = rawline.split('\n')[0]
373
374     parts = line.split(' ')
375     fields = parts[0].split(',')
376     key = parts[2]
377
378     haveall = True
379     havesome = False
380     for spec in [ ip, fullnode ]:
381       if spec not in fields:
382         haveall = False
383       if spec in fields:
384         havesome = True
385
386     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387     if haveall and key == pubkey:
388       inthere = True
389       save_lines.append(rawline)
390       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391       continue
392
393     if havesome and (not haveall or key != pubkey):
394       removed = True
395       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396       continue
397
398     save_lines.append(rawline)
399
400   if not inthere:
401     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403
404   if removed:
405     save_lines = save_lines + add_lines
406
407     # Write a new file and replace old.
408     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
409     newfile = os.fdopen(fd, 'w')
410     newfile.write(''.join(save_lines))
411     newfile.close()
412     logger.Debug("Wrote new known_hosts.")
413     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
414
415   elif add_lines:
416     # Simply appending a new line will do the trick.
417     f.seek(0, 2)
418     for add in add_lines:
419       f.write(add)
420
421   f.close()
422
423
424 def _HasValidVG(vglist, vgname):
425   """Checks if the volume group list is valid.
426
427   A non-None return value means there's an error, and the return value
428   is the error message.
429
430   """
431   vgsize = vglist.get(vgname, None)
432   if vgsize is None:
433     return "volume group '%s' missing" % vgname
434   elif vgsize < 20480:
435     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
436             (vgname, vgsize))
437   return None
438
439
440 def _InitSSHSetup(node):
441   """Setup the SSH configuration for the cluster.
442
443
444   This generates a dsa keypair for root, adds the pub key to the
445   permitted hosts and adds the hostkey to its own known hosts.
446
447   Args:
448     node: the name of this host as a fqdn
449
450   """
451   utils.RemoveFile('/root/.ssh/known_hosts')
452
453   if os.path.exists('/root/.ssh/id_dsa'):
454     utils.CreateBackup('/root/.ssh/id_dsa')
455   if os.path.exists('/root/.ssh/id_dsa.pub'):
456     utils.CreateBackup('/root/.ssh/id_dsa.pub')
457
458   utils.RemoveFile('/root/.ssh/id_dsa')
459   utils.RemoveFile('/root/.ssh/id_dsa.pub')
460
461   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
462                          "-f", "/root/.ssh/id_dsa",
463                          "-q", "-N", ""])
464   if result.failed:
465     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
466                              result.output)
467
468   f = open('/root/.ssh/id_dsa.pub', 'r')
469   try:
470     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
471   finally:
472     f.close()
473
474
475 def _InitGanetiServerSetup(ss):
476   """Setup the necessary configuration for the initial node daemon.
477
478   This creates the nodepass file containing the shared password for
479   the cluster and also generates the SSL certificate.
480
481   """
482   # Create pseudo random password
483   randpass = sha.new(os.urandom(64)).hexdigest()
484   # and write it into sstore
485   ss.SetKey(ss.SS_NODED_PASS, randpass)
486
487   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
488                          "-days", str(365*5), "-nodes", "-x509",
489                          "-keyout", constants.SSL_CERT_FILE,
490                          "-out", constants.SSL_CERT_FILE, "-batch"])
491   if result.failed:
492     raise errors.OpExecError("could not generate server ssl cert, command"
493                              " %s had exitcode %s and error message %s" %
494                              (result.cmd, result.exit_code, result.output))
495
496   os.chmod(constants.SSL_CERT_FILE, 0400)
497
498   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
499
500   if result.failed:
501     raise errors.OpExecError("Could not start the node daemon, command %s"
502                              " had exitcode %s and error %s" %
503                              (result.cmd, result.exit_code, result.output))
504
505
506 class LUInitCluster(LogicalUnit):
507   """Initialise the cluster.
508
509   """
510   HPATH = "cluster-init"
511   HTYPE = constants.HTYPE_CLUSTER
512   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
513               "def_bridge", "master_netdev"]
514   REQ_CLUSTER = False
515
516   def BuildHooksEnv(self):
517     """Build hooks env.
518
519     Notes: Since we don't require a cluster, we must manually add
520     ourselves in the post-run node list.
521
522     """
523     env = {
524       "CLUSTER": self.op.cluster_name,
525       "MASTER": self.hostname['hostname_full'],
526       }
527     return env, [], [self.hostname['hostname_full']]
528
529   def CheckPrereq(self):
530     """Verify that the passed name is a valid one.
531
532     """
533     if config.ConfigWriter.IsCluster():
534       raise errors.OpPrereqError("Cluster is already initialised")
535
536     hostname_local = socket.gethostname()
537     self.hostname = hostname = utils.LookupHostname(hostname_local)
538     if not hostname:
539       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
540                                  hostname_local)
541
542     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
543     if not clustername:
544       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
545                                  % self.op.cluster_name)
546
547     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
548     if result.failed:
549       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
550                                  " to %s,\nbut this ip address does not"
551                                  " belong to this host."
552                                  " Aborting." % hostname['ip'])
553
554     secondary_ip = getattr(self.op, "secondary_ip", None)
555     if secondary_ip and not utils.IsValidIP(secondary_ip):
556       raise errors.OpPrereqError("Invalid secondary ip given")
557     if secondary_ip and secondary_ip != hostname['ip']:
558       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
559       if result.failed:
560         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
561                                    "but it does not belong to this host." %
562                                    secondary_ip)
563     self.secondary_ip = secondary_ip
564
565     # checks presence of the volume group given
566     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
567
568     if vgstatus:
569       raise errors.OpPrereqError("Error: %s" % vgstatus)
570
571     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
572                     self.op.mac_prefix):
573       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
574                                  self.op.mac_prefix)
575
576     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
577       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
578                                  self.op.hypervisor_type)
579
580     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
581     if result.failed:
582       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
583                                  (self.op.master_netdev,
584                                   result.output.strip()))
585
586   def Exec(self, feedback_fn):
587     """Initialize the cluster.
588
589     """
590     clustername = self.clustername
591     hostname = self.hostname
592
593     # set up the simple store
594     ss = ssconf.SimpleStore()
595     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
596     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
597     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
598     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
599     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
600
601     # set up the inter-node password and certificate
602     _InitGanetiServerSetup(ss)
603
604     # start the master ip
605     rpc.call_node_start_master(hostname['hostname_full'])
606
607     # set up ssh config and /etc/hosts
608     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
609     try:
610       sshline = f.read()
611     finally:
612       f.close()
613     sshkey = sshline.split(" ")[1]
614
615     _UpdateEtcHosts(hostname['hostname_full'],
616                     hostname['ip'],
617                     )
618
619     _UpdateKnownHosts(hostname['hostname_full'],
620                       hostname['ip'],
621                       sshkey,
622                       )
623
624     _InitSSHSetup(hostname['hostname'])
625
626     # init of cluster config file
627     cfgw = config.ConfigWriter()
628     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
629                     sshkey, self.op.mac_prefix,
630                     self.op.vg_name, self.op.def_bridge)
631
632
633 class LUDestroyCluster(NoHooksLU):
634   """Logical unit for destroying the cluster.
635
636   """
637   _OP_REQP = []
638
639   def CheckPrereq(self):
640     """Check prerequisites.
641
642     This checks whether the cluster is empty.
643
644     Any errors are signalled by raising errors.OpPrereqError.
645
646     """
647     master = self.sstore.GetMasterNode()
648
649     nodelist = self.cfg.GetNodeList()
650     if len(nodelist) != 1 or nodelist[0] != master:
651       raise errors.OpPrereqError("There are still %d node(s) in"
652                                  " this cluster." % (len(nodelist) - 1))
653     instancelist = self.cfg.GetInstanceList()
654     if instancelist:
655       raise errors.OpPrereqError("There are still %d instance(s) in"
656                                  " this cluster." % len(instancelist))
657
658   def Exec(self, feedback_fn):
659     """Destroys the cluster.
660
661     """
662     utils.CreateBackup('/root/.ssh/id_dsa')
663     utils.CreateBackup('/root/.ssh/id_dsa.pub')
664     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
665
666
667 class LUVerifyCluster(NoHooksLU):
668   """Verifies the cluster status.
669
670   """
671   _OP_REQP = []
672
673   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
674                   remote_version, feedback_fn):
675     """Run multiple tests against a node.
676
677     Test list:
678       - compares ganeti version
679       - checks vg existance and size > 20G
680       - checks config file checksum
681       - checks ssh to other nodes
682
683     Args:
684       node: name of the node to check
685       file_list: required list of files
686       local_cksum: dictionary of local files and their checksums
687
688     """
689     # compares ganeti version
690     local_version = constants.PROTOCOL_VERSION
691     if not remote_version:
692       feedback_fn(" - ERROR: connection to %s failed" % (node))
693       return True
694
695     if local_version != remote_version:
696       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
697                       (local_version, node, remote_version))
698       return True
699
700     # checks vg existance and size > 20G
701
702     bad = False
703     if not vglist:
704       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
705                       (node,))
706       bad = True
707     else:
708       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
709       if vgstatus:
710         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
711         bad = True
712
713     # checks config file checksum
714     # checks ssh to any
715
716     if 'filelist' not in node_result:
717       bad = True
718       feedback_fn("  - ERROR: node hasn't returned file checksum data")
719     else:
720       remote_cksum = node_result['filelist']
721       for file_name in file_list:
722         if file_name not in remote_cksum:
723           bad = True
724           feedback_fn("  - ERROR: file '%s' missing" % file_name)
725         elif remote_cksum[file_name] != local_cksum[file_name]:
726           bad = True
727           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
728
729     if 'nodelist' not in node_result:
730       bad = True
731       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
732     else:
733       if node_result['nodelist']:
734         bad = True
735         for node in node_result['nodelist']:
736           feedback_fn("  - ERROR: communication with node '%s': %s" %
737                           (node, node_result['nodelist'][node]))
738     hyp_result = node_result.get('hypervisor', None)
739     if hyp_result is not None:
740       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
741     return bad
742
743   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
744     """Verify an instance.
745
746     This function checks to see if the required block devices are
747     available on the instance's node.
748
749     """
750     bad = False
751
752     instancelist = self.cfg.GetInstanceList()
753     if not instance in instancelist:
754       feedback_fn("  - ERROR: instance %s not in instance list %s" %
755                       (instance, instancelist))
756       bad = True
757
758     instanceconfig = self.cfg.GetInstanceInfo(instance)
759     node_current = instanceconfig.primary_node
760
761     node_vol_should = {}
762     instanceconfig.MapLVsByNode(node_vol_should)
763
764     for node in node_vol_should:
765       for volume in node_vol_should[node]:
766         if node not in node_vol_is or volume not in node_vol_is[node]:
767           feedback_fn("  - ERROR: volume %s missing on node %s" %
768                           (volume, node))
769           bad = True
770
771     if not instanceconfig.status == 'down':
772       if not instance in node_instance[node_current]:
773         feedback_fn("  - ERROR: instance %s not running on node %s" %
774                         (instance, node_current))
775         bad = True
776
777     for node in node_instance:
778       if (not node == node_current):
779         if instance in node_instance[node]:
780           feedback_fn("  - ERROR: instance %s should not run on node %s" %
781                           (instance, node))
782           bad = True
783
784     return not bad
785
786   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
787     """Verify if there are any unknown volumes in the cluster.
788
789     The .os, .swap and backup volumes are ignored. All other volumes are
790     reported as unknown.
791
792     """
793     bad = False
794
795     for node in node_vol_is:
796       for volume in node_vol_is[node]:
797         if node not in node_vol_should or volume not in node_vol_should[node]:
798           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
799                       (volume, node))
800           bad = True
801     return bad
802
803   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
804     """Verify the list of running instances.
805
806     This checks what instances are running but unknown to the cluster.
807
808     """
809     bad = False
810     for node in node_instance:
811       for runninginstance in node_instance[node]:
812         if runninginstance not in instancelist:
813           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
814                           (runninginstance, node))
815           bad = True
816     return bad
817
818   def CheckPrereq(self):
819     """Check prerequisites.
820
821     This has no prerequisites.
822
823     """
824     pass
825
826   def Exec(self, feedback_fn):
827     """Verify integrity of cluster, performing various test on nodes.
828
829     """
830     bad = False
831     feedback_fn("* Verifying global settings")
832     self.cfg.VerifyConfig()
833
834     master = self.sstore.GetMasterNode()
835     vg_name = self.cfg.GetVGName()
836     nodelist = utils.NiceSort(self.cfg.GetNodeList())
837     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
838     node_volume = {}
839     node_instance = {}
840
841     # FIXME: verify OS list
842     # do local checksums
843     file_names = list(self.sstore.GetFileList())
844     file_names.append(constants.SSL_CERT_FILE)
845     file_names.append(constants.CLUSTER_CONF_FILE)
846     local_checksums = utils.FingerprintFiles(file_names)
847
848     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
849     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
850     all_instanceinfo = rpc.call_instance_list(nodelist)
851     all_vglist = rpc.call_vg_list(nodelist)
852     node_verify_param = {
853       'filelist': file_names,
854       'nodelist': nodelist,
855       'hypervisor': None,
856       }
857     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
858     all_rversion = rpc.call_version(nodelist)
859
860     for node in nodelist:
861       feedback_fn("* Verifying node %s" % node)
862       result = self._VerifyNode(node, file_names, local_checksums,
863                                 all_vglist[node], all_nvinfo[node],
864                                 all_rversion[node], feedback_fn)
865       bad = bad or result
866
867       # node_volume
868       volumeinfo = all_volumeinfo[node]
869
870       if type(volumeinfo) != dict:
871         feedback_fn("  - ERROR: connection to %s failed" % (node,))
872         bad = True
873         continue
874
875       node_volume[node] = volumeinfo
876
877       # node_instance
878       nodeinstance = all_instanceinfo[node]
879       if type(nodeinstance) != list:
880         feedback_fn("  - ERROR: connection to %s failed" % (node,))
881         bad = True
882         continue
883
884       node_instance[node] = nodeinstance
885
886     node_vol_should = {}
887
888     for instance in instancelist:
889       feedback_fn("* Verifying instance %s" % instance)
890       result =  self._VerifyInstance(instance, node_volume, node_instance,
891                                      feedback_fn)
892       bad = bad or result
893
894       inst_config = self.cfg.GetInstanceInfo(instance)
895
896       inst_config.MapLVsByNode(node_vol_should)
897
898     feedback_fn("* Verifying orphan volumes")
899     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
900                                        feedback_fn)
901     bad = bad or result
902
903     feedback_fn("* Verifying remaining instances")
904     result = self._VerifyOrphanInstances(instancelist, node_instance,
905                                          feedback_fn)
906     bad = bad or result
907
908     return int(bad)
909
910
911 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
912   """Sleep and poll for an instance's disk to sync.
913
914   """
915   if not instance.disks:
916     return True
917
918   if not oneshot:
919     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
920
921   node = instance.primary_node
922
923   for dev in instance.disks:
924     cfgw.SetDiskID(dev, node)
925
926   retries = 0
927   while True:
928     max_time = 0
929     done = True
930     cumul_degraded = False
931     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
932     if not rstats:
933       logger.ToStderr("Can't get any data from node %s" % node)
934       retries += 1
935       if retries >= 10:
936         raise errors.RemoteError("Can't contact node %s for mirror data,"
937                                  " aborting." % node)
938       time.sleep(6)
939       continue
940     retries = 0
941     for i in range(len(rstats)):
942       mstat = rstats[i]
943       if mstat is None:
944         logger.ToStderr("Can't compute data for node %s/%s" %
945                         (node, instance.disks[i].iv_name))
946         continue
947       perc_done, est_time, is_degraded = mstat
948       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
949       if perc_done is not None:
950         done = False
951         if est_time is not None:
952           rem_time = "%d estimated seconds remaining" % est_time
953           max_time = est_time
954         else:
955           rem_time = "no time estimate"
956         logger.ToStdout("- device %s: %5.2f%% done, %s" %
957                         (instance.disks[i].iv_name, perc_done, rem_time))
958     if done or oneshot:
959       break
960
961     if unlock:
962       utils.Unlock('cmd')
963     try:
964       time.sleep(min(60, max_time))
965     finally:
966       if unlock:
967         utils.Lock('cmd')
968
969   if done:
970     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
971   return not cumul_degraded
972
973
974 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
975   """Check that mirrors are not degraded.
976
977   """
978   cfgw.SetDiskID(dev, node)
979
980   result = True
981   if on_primary or dev.AssembleOnSecondary():
982     rstats = rpc.call_blockdev_find(node, dev)
983     if not rstats:
984       logger.ToStderr("Can't get any data from node %s" % node)
985       result = False
986     else:
987       result = result and (not rstats[5])
988   if dev.children:
989     for child in dev.children:
990       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
991
992   return result
993
994
995 class LUDiagnoseOS(NoHooksLU):
996   """Logical unit for OS diagnose/query.
997
998   """
999   _OP_REQP = []
1000
1001   def CheckPrereq(self):
1002     """Check prerequisites.
1003
1004     This always succeeds, since this is a pure query LU.
1005
1006     """
1007     return
1008
1009   def Exec(self, feedback_fn):
1010     """Compute the list of OSes.
1011
1012     """
1013     node_list = self.cfg.GetNodeList()
1014     node_data = rpc.call_os_diagnose(node_list)
1015     if node_data == False:
1016       raise errors.OpExecError("Can't gather the list of OSes")
1017     return node_data
1018
1019
1020 class LURemoveNode(LogicalUnit):
1021   """Logical unit for removing a node.
1022
1023   """
1024   HPATH = "node-remove"
1025   HTYPE = constants.HTYPE_NODE
1026   _OP_REQP = ["node_name"]
1027
1028   def BuildHooksEnv(self):
1029     """Build hooks env.
1030
1031     This doesn't run on the target node in the pre phase as a failed
1032     node would not allows itself to run.
1033
1034     """
1035     env = {
1036       "NODE_NAME": self.op.node_name,
1037       }
1038     all_nodes = self.cfg.GetNodeList()
1039     all_nodes.remove(self.op.node_name)
1040     return env, all_nodes, all_nodes
1041
1042   def CheckPrereq(self):
1043     """Check prerequisites.
1044
1045     This checks:
1046      - the node exists in the configuration
1047      - it does not have primary or secondary instances
1048      - it's not the master
1049
1050     Any errors are signalled by raising errors.OpPrereqError.
1051
1052     """
1053     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1054     if node is None:
1055       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1056
1057     instance_list = self.cfg.GetInstanceList()
1058
1059     masternode = self.sstore.GetMasterNode()
1060     if node.name == masternode:
1061       raise errors.OpPrereqError("Node is the master node,"
1062                                  " you need to failover first.")
1063
1064     for instance_name in instance_list:
1065       instance = self.cfg.GetInstanceInfo(instance_name)
1066       if node.name == instance.primary_node:
1067         raise errors.OpPrereqError("Instance %s still running on the node,"
1068                                    " please remove first." % instance_name)
1069       if node.name in instance.secondary_nodes:
1070         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1071                                    " please remove first." % instance_name)
1072     self.op.node_name = node.name
1073     self.node = node
1074
1075   def Exec(self, feedback_fn):
1076     """Removes the node from the cluster.
1077
1078     """
1079     node = self.node
1080     logger.Info("stopping the node daemon and removing configs from node %s" %
1081                 node.name)
1082
1083     rpc.call_node_leave_cluster(node.name)
1084
1085     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1086
1087     logger.Info("Removing node %s from config" % node.name)
1088
1089     self.cfg.RemoveNode(node.name)
1090
1091
1092 class LUQueryNodes(NoHooksLU):
1093   """Logical unit for querying nodes.
1094
1095   """
1096   _OP_REQP = ["output_fields"]
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This checks that the fields required are valid output fields.
1102
1103     """
1104     self.dynamic_fields = frozenset(["dtotal", "dfree",
1105                                      "mtotal", "mnode", "mfree"])
1106
1107     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1108                        dynamic=self.dynamic_fields,
1109                        selected=self.op.output_fields)
1110
1111
1112   def Exec(self, feedback_fn):
1113     """Computes the list of nodes and their attributes.
1114
1115     """
1116     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1117     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1118
1119
1120     # begin data gathering
1121
1122     if self.dynamic_fields.intersection(self.op.output_fields):
1123       live_data = {}
1124       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1125       for name in nodenames:
1126         nodeinfo = node_data.get(name, None)
1127         if nodeinfo:
1128           live_data[name] = {
1129             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1130             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1131             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1132             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1133             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1134             }
1135         else:
1136           live_data[name] = {}
1137     else:
1138       live_data = dict.fromkeys(nodenames, {})
1139
1140     node_to_primary = dict.fromkeys(nodenames, 0)
1141     node_to_secondary = dict.fromkeys(nodenames, 0)
1142
1143     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1144       instancelist = self.cfg.GetInstanceList()
1145
1146       for instance in instancelist:
1147         instanceinfo = self.cfg.GetInstanceInfo(instance)
1148         node_to_primary[instanceinfo.primary_node] += 1
1149         for secnode in instanceinfo.secondary_nodes:
1150           node_to_secondary[secnode] += 1
1151
1152     # end data gathering
1153
1154     output = []
1155     for node in nodelist:
1156       node_output = []
1157       for field in self.op.output_fields:
1158         if field == "name":
1159           val = node.name
1160         elif field == "pinst":
1161           val = node_to_primary[node.name]
1162         elif field == "sinst":
1163           val = node_to_secondary[node.name]
1164         elif field == "pip":
1165           val = node.primary_ip
1166         elif field == "sip":
1167           val = node.secondary_ip
1168         elif field in self.dynamic_fields:
1169           val = live_data[node.name].get(field, "?")
1170         else:
1171           raise errors.ParameterError(field)
1172         val = str(val)
1173         node_output.append(val)
1174       output.append(node_output)
1175
1176     return output
1177
1178
1179 class LUQueryNodeVolumes(NoHooksLU):
1180   """Logical unit for getting volumes on node(s).
1181
1182   """
1183   _OP_REQP = ["nodes", "output_fields"]
1184
1185   def CheckPrereq(self):
1186     """Check prerequisites.
1187
1188     This checks that the fields required are valid output fields.
1189
1190     """
1191     self.nodes = _GetWantedNodes(self, self.op.nodes)
1192
1193     _CheckOutputFields(static=["node"],
1194                        dynamic=["phys", "vg", "name", "size", "instance"],
1195                        selected=self.op.output_fields)
1196
1197
1198   def Exec(self, feedback_fn):
1199     """Computes the list of nodes and their attributes.
1200
1201     """
1202     nodenames = utils.NiceSort([node.name for node in self.nodes])
1203     volumes = rpc.call_node_volumes(nodenames)
1204
1205     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1206              in self.cfg.GetInstanceList()]
1207
1208     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1209
1210     output = []
1211     for node in nodenames:
1212       if node not in volumes or not volumes[node]:
1213         continue
1214
1215       node_vols = volumes[node][:]
1216       node_vols.sort(key=lambda vol: vol['dev'])
1217
1218       for vol in node_vols:
1219         node_output = []
1220         for field in self.op.output_fields:
1221           if field == "node":
1222             val = node
1223           elif field == "phys":
1224             val = vol['dev']
1225           elif field == "vg":
1226             val = vol['vg']
1227           elif field == "name":
1228             val = vol['name']
1229           elif field == "size":
1230             val = int(float(vol['size']))
1231           elif field == "instance":
1232             for inst in ilist:
1233               if node not in lv_by_node[inst]:
1234                 continue
1235               if vol['name'] in lv_by_node[inst][node]:
1236                 val = inst.name
1237                 break
1238             else:
1239               val = '-'
1240           else:
1241             raise errors.ParameterError(field)
1242           node_output.append(str(val))
1243
1244         output.append(node_output)
1245
1246     return output
1247
1248
1249 class LUAddNode(LogicalUnit):
1250   """Logical unit for adding node to the cluster.
1251
1252   """
1253   HPATH = "node-add"
1254   HTYPE = constants.HTYPE_NODE
1255   _OP_REQP = ["node_name"]
1256
1257   def BuildHooksEnv(self):
1258     """Build hooks env.
1259
1260     This will run on all nodes before, and on all nodes + the new node after.
1261
1262     """
1263     env = {
1264       "NODE_NAME": self.op.node_name,
1265       "NODE_PIP": self.op.primary_ip,
1266       "NODE_SIP": self.op.secondary_ip,
1267       }
1268     nodes_0 = self.cfg.GetNodeList()
1269     nodes_1 = nodes_0 + [self.op.node_name, ]
1270     return env, nodes_0, nodes_1
1271
1272   def CheckPrereq(self):
1273     """Check prerequisites.
1274
1275     This checks:
1276      - the new node is not already in the config
1277      - it is resolvable
1278      - its parameters (single/dual homed) matches the cluster
1279
1280     Any errors are signalled by raising errors.OpPrereqError.
1281
1282     """
1283     node_name = self.op.node_name
1284     cfg = self.cfg
1285
1286     dns_data = utils.LookupHostname(node_name)
1287     if not dns_data:
1288       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1289
1290     node = dns_data['hostname']
1291     primary_ip = self.op.primary_ip = dns_data['ip']
1292     secondary_ip = getattr(self.op, "secondary_ip", None)
1293     if secondary_ip is None:
1294       secondary_ip = primary_ip
1295     if not utils.IsValidIP(secondary_ip):
1296       raise errors.OpPrereqError("Invalid secondary IP given")
1297     self.op.secondary_ip = secondary_ip
1298     node_list = cfg.GetNodeList()
1299     if node in node_list:
1300       raise errors.OpPrereqError("Node %s is already in the configuration"
1301                                  % node)
1302
1303     for existing_node_name in node_list:
1304       existing_node = cfg.GetNodeInfo(existing_node_name)
1305       if (existing_node.primary_ip == primary_ip or
1306           existing_node.secondary_ip == primary_ip or
1307           existing_node.primary_ip == secondary_ip or
1308           existing_node.secondary_ip == secondary_ip):
1309         raise errors.OpPrereqError("New node ip address(es) conflict with"
1310                                    " existing node %s" % existing_node.name)
1311
1312     # check that the type of the node (single versus dual homed) is the
1313     # same as for the master
1314     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1315     master_singlehomed = myself.secondary_ip == myself.primary_ip
1316     newbie_singlehomed = secondary_ip == primary_ip
1317     if master_singlehomed != newbie_singlehomed:
1318       if master_singlehomed:
1319         raise errors.OpPrereqError("The master has no private ip but the"
1320                                    " new node has one")
1321       else:
1322         raise errors.OpPrereqError("The master has a private ip but the"
1323                                    " new node doesn't have one")
1324
1325     # checks reachablity
1326     command = ["fping", "-q", primary_ip]
1327     result = utils.RunCmd(command)
1328     if result.failed:
1329       raise errors.OpPrereqError("Node not reachable by ping")
1330
1331     if not newbie_singlehomed:
1332       # check reachability from my secondary ip to newbie's secondary ip
1333       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1334       result = utils.RunCmd(command)
1335       if result.failed:
1336         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1337
1338     self.new_node = objects.Node(name=node,
1339                                  primary_ip=primary_ip,
1340                                  secondary_ip=secondary_ip)
1341
1342   def Exec(self, feedback_fn):
1343     """Adds the new node to the cluster.
1344
1345     """
1346     new_node = self.new_node
1347     node = new_node.name
1348
1349     # set up inter-node password and certificate and restarts the node daemon
1350     gntpass = self.sstore.GetNodeDaemonPassword()
1351     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1352       raise errors.OpExecError("ganeti password corruption detected")
1353     f = open(constants.SSL_CERT_FILE)
1354     try:
1355       gntpem = f.read(8192)
1356     finally:
1357       f.close()
1358     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1359     # so we use this to detect an invalid certificate; as long as the
1360     # cert doesn't contain this, the here-document will be correctly
1361     # parsed by the shell sequence below
1362     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1363       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1364     if not gntpem.endswith("\n"):
1365       raise errors.OpExecError("PEM must end with newline")
1366     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1367
1368     # remove first the root's known_hosts file
1369     utils.RemoveFile("/root/.ssh/known_hosts")
1370     # and then connect with ssh to set password and start ganeti-noded
1371     # note that all the below variables are sanitized at this point,
1372     # either by being constants or by the checks above
1373     ss = self.sstore
1374     mycommand = ("umask 077 && "
1375                  "echo '%s' > '%s' && "
1376                  "cat > '%s' << '!EOF.' && \n"
1377                  "%s!EOF.\n%s restart" %
1378                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1379                   constants.SSL_CERT_FILE, gntpem,
1380                   constants.NODE_INITD_SCRIPT))
1381
1382     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1383     if result.failed:
1384       raise errors.OpExecError("Remote command on node %s, error: %s,"
1385                                " output: %s" %
1386                                (node, result.fail_reason, result.output))
1387
1388     # check connectivity
1389     time.sleep(4)
1390
1391     result = rpc.call_version([node])[node]
1392     if result:
1393       if constants.PROTOCOL_VERSION == result:
1394         logger.Info("communication to node %s fine, sw version %s match" %
1395                     (node, result))
1396       else:
1397         raise errors.OpExecError("Version mismatch master version %s,"
1398                                  " node version %s" %
1399                                  (constants.PROTOCOL_VERSION, result))
1400     else:
1401       raise errors.OpExecError("Cannot get version from the new node")
1402
1403     # setup ssh on node
1404     logger.Info("copy ssh key to node %s" % node)
1405     keyarray = []
1406     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1407                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1408                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1409
1410     for i in keyfiles:
1411       f = open(i, 'r')
1412       try:
1413         keyarray.append(f.read())
1414       finally:
1415         f.close()
1416
1417     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1418                                keyarray[3], keyarray[4], keyarray[5])
1419
1420     if not result:
1421       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1422
1423     # Add node to our /etc/hosts, and add key to known_hosts
1424     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1425     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1426                       self.cfg.GetHostKey())
1427
1428     if new_node.secondary_ip != new_node.primary_ip:
1429       result = ssh.SSHCall(node, "root",
1430                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1431       if result.failed:
1432         raise errors.OpExecError("Node claims it doesn't have the"
1433                                  " secondary ip you gave (%s).\n"
1434                                  "Please fix and re-run this command." %
1435                                  new_node.secondary_ip)
1436
1437     # Distribute updated /etc/hosts and known_hosts to all nodes,
1438     # including the node just added
1439     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1440     dist_nodes = self.cfg.GetNodeList() + [node]
1441     if myself.name in dist_nodes:
1442       dist_nodes.remove(myself.name)
1443
1444     logger.Debug("Copying hosts and known_hosts to all nodes")
1445     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1446       result = rpc.call_upload_file(dist_nodes, fname)
1447       for to_node in dist_nodes:
1448         if not result[to_node]:
1449           logger.Error("copy of file %s to node %s failed" %
1450                        (fname, to_node))
1451
1452     to_copy = ss.GetFileList()
1453     for fname in to_copy:
1454       if not ssh.CopyFileToNode(node, fname):
1455         logger.Error("could not copy file %s to node %s" % (fname, node))
1456
1457     logger.Info("adding node %s to cluster.conf" % node)
1458     self.cfg.AddNode(new_node)
1459
1460
1461 class LUMasterFailover(LogicalUnit):
1462   """Failover the master node to the current node.
1463
1464   This is a special LU in that it must run on a non-master node.
1465
1466   """
1467   HPATH = "master-failover"
1468   HTYPE = constants.HTYPE_CLUSTER
1469   REQ_MASTER = False
1470   _OP_REQP = []
1471
1472   def BuildHooksEnv(self):
1473     """Build hooks env.
1474
1475     This will run on the new master only in the pre phase, and on all
1476     the nodes in the post phase.
1477
1478     """
1479     env = {
1480       "NEW_MASTER": self.new_master,
1481       "OLD_MASTER": self.old_master,
1482       }
1483     return env, [self.new_master], self.cfg.GetNodeList()
1484
1485   def CheckPrereq(self):
1486     """Check prerequisites.
1487
1488     This checks that we are not already the master.
1489
1490     """
1491     self.new_master = socket.gethostname()
1492
1493     self.old_master = self.sstore.GetMasterNode()
1494
1495     if self.old_master == self.new_master:
1496       raise errors.OpPrereqError("This commands must be run on the node"
1497                                  " where you want the new master to be.\n"
1498                                  "%s is already the master" %
1499                                  self.old_master)
1500
1501   def Exec(self, feedback_fn):
1502     """Failover the master node.
1503
1504     This command, when run on a non-master node, will cause the current
1505     master to cease being master, and the non-master to become new
1506     master.
1507
1508     """
1509     #TODO: do not rely on gethostname returning the FQDN
1510     logger.Info("setting master to %s, old master: %s" %
1511                 (self.new_master, self.old_master))
1512
1513     if not rpc.call_node_stop_master(self.old_master):
1514       logger.Error("could disable the master role on the old master"
1515                    " %s, please disable manually" % self.old_master)
1516
1517     ss = self.sstore
1518     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1519     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1520                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1521       logger.Error("could not distribute the new simple store master file"
1522                    " to the other nodes, please check.")
1523
1524     if not rpc.call_node_start_master(self.new_master):
1525       logger.Error("could not start the master role on the new master"
1526                    " %s, please check" % self.new_master)
1527       feedback_fn("Error in activating the master IP on the new master,\n"
1528                   "please fix manually.")
1529
1530
1531
1532 class LUQueryClusterInfo(NoHooksLU):
1533   """Query cluster configuration.
1534
1535   """
1536   _OP_REQP = []
1537   REQ_MASTER = False
1538
1539   def CheckPrereq(self):
1540     """No prerequsites needed for this LU.
1541
1542     """
1543     pass
1544
1545   def Exec(self, feedback_fn):
1546     """Return cluster config.
1547
1548     """
1549     result = {
1550       "name": self.sstore.GetClusterName(),
1551       "software_version": constants.RELEASE_VERSION,
1552       "protocol_version": constants.PROTOCOL_VERSION,
1553       "config_version": constants.CONFIG_VERSION,
1554       "os_api_version": constants.OS_API_VERSION,
1555       "export_version": constants.EXPORT_VERSION,
1556       "master": self.sstore.GetMasterNode(),
1557       "architecture": (platform.architecture()[0], platform.machine()),
1558       }
1559
1560     return result
1561
1562
1563 class LUClusterCopyFile(NoHooksLU):
1564   """Copy file to cluster.
1565
1566   """
1567   _OP_REQP = ["nodes", "filename"]
1568
1569   def CheckPrereq(self):
1570     """Check prerequisites.
1571
1572     It should check that the named file exists and that the given list
1573     of nodes is valid.
1574
1575     """
1576     if not os.path.exists(self.op.filename):
1577       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1578
1579     self.nodes = _GetWantedNodes(self, self.op.nodes)
1580
1581   def Exec(self, feedback_fn):
1582     """Copy a file from master to some nodes.
1583
1584     Args:
1585       opts - class with options as members
1586       args - list containing a single element, the file name
1587     Opts used:
1588       nodes - list containing the name of target nodes; if empty, all nodes
1589
1590     """
1591     filename = self.op.filename
1592
1593     myname = socket.gethostname()
1594
1595     for node in self.nodes:
1596       if node == myname:
1597         continue
1598       if not ssh.CopyFileToNode(node, filename):
1599         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1600
1601
1602 class LUDumpClusterConfig(NoHooksLU):
1603   """Return a text-representation of the cluster-config.
1604
1605   """
1606   _OP_REQP = []
1607
1608   def CheckPrereq(self):
1609     """No prerequisites.
1610
1611     """
1612     pass
1613
1614   def Exec(self, feedback_fn):
1615     """Dump a representation of the cluster config to the standard output.
1616
1617     """
1618     return self.cfg.DumpConfig()
1619
1620
1621 class LURunClusterCommand(NoHooksLU):
1622   """Run a command on some nodes.
1623
1624   """
1625   _OP_REQP = ["command", "nodes"]
1626
1627   def CheckPrereq(self):
1628     """Check prerequisites.
1629
1630     It checks that the given list of nodes is valid.
1631
1632     """
1633     self.nodes = _GetWantedNodes(self, self.op.nodes)
1634
1635   def Exec(self, feedback_fn):
1636     """Run a command on some nodes.
1637
1638     """
1639     data = []
1640     for node in self.nodes:
1641       result = utils.RunCmd(["ssh", node.name, self.op.command])
1642       data.append((node.name, result.cmd, result.output, result.exit_code))
1643
1644     return data
1645
1646
1647 class LUActivateInstanceDisks(NoHooksLU):
1648   """Bring up an instance's disks.
1649
1650   """
1651   _OP_REQP = ["instance_name"]
1652
1653   def CheckPrereq(self):
1654     """Check prerequisites.
1655
1656     This checks that the instance is in the cluster.
1657
1658     """
1659     instance = self.cfg.GetInstanceInfo(
1660       self.cfg.ExpandInstanceName(self.op.instance_name))
1661     if instance is None:
1662       raise errors.OpPrereqError("Instance '%s' not known" %
1663                                  self.op.instance_name)
1664     self.instance = instance
1665
1666
1667   def Exec(self, feedback_fn):
1668     """Activate the disks.
1669
1670     """
1671     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1672     if not disks_ok:
1673       raise errors.OpExecError("Cannot activate block devices")
1674
1675     return disks_info
1676
1677
1678 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1679   """Prepare the block devices for an instance.
1680
1681   This sets up the block devices on all nodes.
1682
1683   Args:
1684     instance: a ganeti.objects.Instance object
1685     ignore_secondaries: if true, errors on secondary nodes won't result
1686                         in an error return from the function
1687
1688   Returns:
1689     false if the operation failed
1690     list of (host, instance_visible_name, node_visible_name) if the operation
1691          suceeded with the mapping from node devices to instance devices
1692   """
1693   device_info = []
1694   disks_ok = True
1695   for inst_disk in instance.disks:
1696     master_result = None
1697     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1698       cfg.SetDiskID(node_disk, node)
1699       is_primary = node == instance.primary_node
1700       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1701       if not result:
1702         logger.Error("could not prepare block device %s on node %s (is_pri"
1703                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1704         if is_primary or not ignore_secondaries:
1705           disks_ok = False
1706       if is_primary:
1707         master_result = result
1708     device_info.append((instance.primary_node, inst_disk.iv_name,
1709                         master_result))
1710
1711   return disks_ok, device_info
1712
1713
1714 def _StartInstanceDisks(cfg, instance, force):
1715   """Start the disks of an instance.
1716
1717   """
1718   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1719                                            ignore_secondaries=force)
1720   if not disks_ok:
1721     _ShutdownInstanceDisks(instance, cfg)
1722     if force is not None and not force:
1723       logger.Error("If the message above refers to a secondary node,"
1724                    " you can retry the operation using '--force'.")
1725     raise errors.OpExecError("Disk consistency error")
1726
1727
1728 class LUDeactivateInstanceDisks(NoHooksLU):
1729   """Shutdown an instance's disks.
1730
1731   """
1732   _OP_REQP = ["instance_name"]
1733
1734   def CheckPrereq(self):
1735     """Check prerequisites.
1736
1737     This checks that the instance is in the cluster.
1738
1739     """
1740     instance = self.cfg.GetInstanceInfo(
1741       self.cfg.ExpandInstanceName(self.op.instance_name))
1742     if instance is None:
1743       raise errors.OpPrereqError("Instance '%s' not known" %
1744                                  self.op.instance_name)
1745     self.instance = instance
1746
1747   def Exec(self, feedback_fn):
1748     """Deactivate the disks
1749
1750     """
1751     instance = self.instance
1752     ins_l = rpc.call_instance_list([instance.primary_node])
1753     ins_l = ins_l[instance.primary_node]
1754     if not type(ins_l) is list:
1755       raise errors.OpExecError("Can't contact node '%s'" %
1756                                instance.primary_node)
1757
1758     if self.instance.name in ins_l:
1759       raise errors.OpExecError("Instance is running, can't shutdown"
1760                                " block devices.")
1761
1762     _ShutdownInstanceDisks(instance, self.cfg)
1763
1764
1765 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1766   """Shutdown block devices of an instance.
1767
1768   This does the shutdown on all nodes of the instance.
1769
1770   If the ignore_primary is false, errors on the primary node are
1771   ignored.
1772
1773   """
1774   result = True
1775   for disk in instance.disks:
1776     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1777       cfg.SetDiskID(top_disk, node)
1778       if not rpc.call_blockdev_shutdown(node, top_disk):
1779         logger.Error("could not shutdown block device %s on node %s" %
1780                      (disk.iv_name, node))
1781         if not ignore_primary or node != instance.primary_node:
1782           result = False
1783   return result
1784
1785
1786 class LUStartupInstance(LogicalUnit):
1787   """Starts an instance.
1788
1789   """
1790   HPATH = "instance-start"
1791   HTYPE = constants.HTYPE_INSTANCE
1792   _OP_REQP = ["instance_name", "force"]
1793
1794   def BuildHooksEnv(self):
1795     """Build hooks env.
1796
1797     This runs on master, primary and secondary nodes of the instance.
1798
1799     """
1800     env = {
1801       "FORCE": self.op.force,
1802       }
1803     env.update(_BuildInstanceHookEnvByObject(self.instance))
1804     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1805           list(self.instance.secondary_nodes))
1806     return env, nl, nl
1807
1808   def CheckPrereq(self):
1809     """Check prerequisites.
1810
1811     This checks that the instance is in the cluster.
1812
1813     """
1814     instance = self.cfg.GetInstanceInfo(
1815       self.cfg.ExpandInstanceName(self.op.instance_name))
1816     if instance is None:
1817       raise errors.OpPrereqError("Instance '%s' not known" %
1818                                  self.op.instance_name)
1819
1820     # check bridges existance
1821     brlist = [nic.bridge for nic in instance.nics]
1822     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1823       raise errors.OpPrereqError("one or more target bridges %s does not"
1824                                  " exist on destination node '%s'" %
1825                                  (brlist, instance.primary_node))
1826
1827     self.instance = instance
1828     self.op.instance_name = instance.name
1829
1830   def Exec(self, feedback_fn):
1831     """Start the instance.
1832
1833     """
1834     instance = self.instance
1835     force = self.op.force
1836     extra_args = getattr(self.op, "extra_args", "")
1837
1838     node_current = instance.primary_node
1839
1840     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1841     if not nodeinfo:
1842       raise errors.OpExecError("Could not contact node %s for infos" %
1843                                (node_current))
1844
1845     freememory = nodeinfo[node_current]['memory_free']
1846     memory = instance.memory
1847     if memory > freememory:
1848       raise errors.OpExecError("Not enough memory to start instance"
1849                                " %s on node %s"
1850                                " needed %s MiB, available %s MiB" %
1851                                (instance.name, node_current, memory,
1852                                 freememory))
1853
1854     _StartInstanceDisks(self.cfg, instance, force)
1855
1856     if not rpc.call_instance_start(node_current, instance, extra_args):
1857       _ShutdownInstanceDisks(instance, self.cfg)
1858       raise errors.OpExecError("Could not start instance")
1859
1860     self.cfg.MarkInstanceUp(instance.name)
1861
1862
1863 class LUShutdownInstance(LogicalUnit):
1864   """Shutdown an instance.
1865
1866   """
1867   HPATH = "instance-stop"
1868   HTYPE = constants.HTYPE_INSTANCE
1869   _OP_REQP = ["instance_name"]
1870
1871   def BuildHooksEnv(self):
1872     """Build hooks env.
1873
1874     This runs on master, primary and secondary nodes of the instance.
1875
1876     """
1877     env = _BuildInstanceHookEnvByObject(self.instance)
1878     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1879           list(self.instance.secondary_nodes))
1880     return env, nl, nl
1881
1882   def CheckPrereq(self):
1883     """Check prerequisites.
1884
1885     This checks that the instance is in the cluster.
1886
1887     """
1888     instance = self.cfg.GetInstanceInfo(
1889       self.cfg.ExpandInstanceName(self.op.instance_name))
1890     if instance is None:
1891       raise errors.OpPrereqError("Instance '%s' not known" %
1892                                  self.op.instance_name)
1893     self.instance = instance
1894
1895   def Exec(self, feedback_fn):
1896     """Shutdown the instance.
1897
1898     """
1899     instance = self.instance
1900     node_current = instance.primary_node
1901     if not rpc.call_instance_shutdown(node_current, instance):
1902       logger.Error("could not shutdown instance")
1903
1904     self.cfg.MarkInstanceDown(instance.name)
1905     _ShutdownInstanceDisks(instance, self.cfg)
1906
1907
1908 class LUReinstallInstance(LogicalUnit):
1909   """Reinstall an instance.
1910
1911   """
1912   HPATH = "instance-reinstall"
1913   HTYPE = constants.HTYPE_INSTANCE
1914   _OP_REQP = ["instance_name"]
1915
1916   def BuildHooksEnv(self):
1917     """Build hooks env.
1918
1919     This runs on master, primary and secondary nodes of the instance.
1920
1921     """
1922     env = _BuildInstanceHookEnvByObject(self.instance)
1923     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1924           list(self.instance.secondary_nodes))
1925     return env, nl, nl
1926
1927   def CheckPrereq(self):
1928     """Check prerequisites.
1929
1930     This checks that the instance is in the cluster and is not running.
1931
1932     """
1933     instance = self.cfg.GetInstanceInfo(
1934       self.cfg.ExpandInstanceName(self.op.instance_name))
1935     if instance is None:
1936       raise errors.OpPrereqError("Instance '%s' not known" %
1937                                  self.op.instance_name)
1938     if instance.disk_template == constants.DT_DISKLESS:
1939       raise errors.OpPrereqError("Instance '%s' has no disks" %
1940                                  self.op.instance_name)
1941     if instance.status != "down":
1942       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1943                                  self.op.instance_name)
1944     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1945     if remote_info:
1946       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1947                                  (self.op.instance_name,
1948                                   instance.primary_node))
1949
1950     self.op.os_type = getattr(self.op, "os_type", None)
1951     if self.op.os_type is not None:
1952       # OS verification
1953       pnode = self.cfg.GetNodeInfo(
1954         self.cfg.ExpandNodeName(instance.primary_node))
1955       if pnode is None:
1956         raise errors.OpPrereqError("Primary node '%s' is unknown" %
1957                                    self.op.pnode)
1958       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1959       if not isinstance(os_obj, objects.OS):
1960         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
1961                                    " primary node"  % self.op.os_type)
1962
1963     self.instance = instance
1964
1965   def Exec(self, feedback_fn):
1966     """Reinstall the instance.
1967
1968     """
1969     inst = self.instance
1970
1971     if self.op.os_type is not None:
1972       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1973       inst.os = self.op.os_type
1974       self.cfg.AddInstance(inst)
1975
1976     _StartInstanceDisks(self.cfg, inst, None)
1977     try:
1978       feedback_fn("Running the instance OS create scripts...")
1979       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1980         raise errors.OpExecError("Could not install OS for instance %s "
1981                                  "on node %s" %
1982                                  (inst.name, inst.primary_node))
1983     finally:
1984       _ShutdownInstanceDisks(inst, self.cfg)
1985
1986
1987 class LURemoveInstance(LogicalUnit):
1988   """Remove an instance.
1989
1990   """
1991   HPATH = "instance-remove"
1992   HTYPE = constants.HTYPE_INSTANCE
1993   _OP_REQP = ["instance_name"]
1994
1995   def BuildHooksEnv(self):
1996     """Build hooks env.
1997
1998     This runs on master, primary and secondary nodes of the instance.
1999
2000     """
2001     env = _BuildInstanceHookEnvByObject(self.instance)
2002     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2003           list(self.instance.secondary_nodes))
2004     return env, nl, nl
2005
2006   def CheckPrereq(self):
2007     """Check prerequisites.
2008
2009     This checks that the instance is in the cluster.
2010
2011     """
2012     instance = self.cfg.GetInstanceInfo(
2013       self.cfg.ExpandInstanceName(self.op.instance_name))
2014     if instance is None:
2015       raise errors.OpPrereqError("Instance '%s' not known" %
2016                                  self.op.instance_name)
2017     self.instance = instance
2018
2019   def Exec(self, feedback_fn):
2020     """Remove the instance.
2021
2022     """
2023     instance = self.instance
2024     logger.Info("shutting down instance %s on node %s" %
2025                 (instance.name, instance.primary_node))
2026
2027     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2028       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2029                                (instance.name, instance.primary_node))
2030
2031     logger.Info("removing block devices for instance %s" % instance.name)
2032
2033     _RemoveDisks(instance, self.cfg)
2034
2035     logger.Info("removing instance %s out of cluster config" % instance.name)
2036
2037     self.cfg.RemoveInstance(instance.name)
2038
2039
2040 class LUQueryInstances(NoHooksLU):
2041   """Logical unit for querying instances.
2042
2043   """
2044   _OP_REQP = ["output_fields"]
2045
2046   def CheckPrereq(self):
2047     """Check prerequisites.
2048
2049     This checks that the fields required are valid output fields.
2050
2051     """
2052     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2053     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2054                                "admin_state", "admin_ram",
2055                                "disk_template", "ip", "mac", "bridge"],
2056                        dynamic=self.dynamic_fields,
2057                        selected=self.op.output_fields)
2058
2059   def Exec(self, feedback_fn):
2060     """Computes the list of nodes and their attributes.
2061
2062     """
2063     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2064     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2065                      in instance_names]
2066
2067     # begin data gathering
2068
2069     nodes = frozenset([inst.primary_node for inst in instance_list])
2070
2071     bad_nodes = []
2072     if self.dynamic_fields.intersection(self.op.output_fields):
2073       live_data = {}
2074       node_data = rpc.call_all_instances_info(nodes)
2075       for name in nodes:
2076         result = node_data[name]
2077         if result:
2078           live_data.update(result)
2079         elif result == False:
2080           bad_nodes.append(name)
2081         # else no instance is alive
2082     else:
2083       live_data = dict([(name, {}) for name in instance_names])
2084
2085     # end data gathering
2086
2087     output = []
2088     for instance in instance_list:
2089       iout = []
2090       for field in self.op.output_fields:
2091         if field == "name":
2092           val = instance.name
2093         elif field == "os":
2094           val = instance.os
2095         elif field == "pnode":
2096           val = instance.primary_node
2097         elif field == "snodes":
2098           val = ",".join(instance.secondary_nodes) or "-"
2099         elif field == "admin_state":
2100           if instance.status == "down":
2101             val = "no"
2102           else:
2103             val = "yes"
2104         elif field == "oper_state":
2105           if instance.primary_node in bad_nodes:
2106             val = "(node down)"
2107           else:
2108             if live_data.get(instance.name):
2109               val = "running"
2110             else:
2111               val = "stopped"
2112         elif field == "admin_ram":
2113           val = instance.memory
2114         elif field == "oper_ram":
2115           if instance.primary_node in bad_nodes:
2116             val = "(node down)"
2117           elif instance.name in live_data:
2118             val = live_data[instance.name].get("memory", "?")
2119           else:
2120             val = "-"
2121         elif field == "disk_template":
2122           val = instance.disk_template
2123         elif field == "ip":
2124           val = instance.nics[0].ip
2125         elif field == "bridge":
2126           val = instance.nics[0].bridge
2127         elif field == "mac":
2128           val = instance.nics[0].mac
2129         else:
2130           raise errors.ParameterError(field)
2131         val = str(val)
2132         iout.append(val)
2133       output.append(iout)
2134
2135     return output
2136
2137
2138 class LUFailoverInstance(LogicalUnit):
2139   """Failover an instance.
2140
2141   """
2142   HPATH = "instance-failover"
2143   HTYPE = constants.HTYPE_INSTANCE
2144   _OP_REQP = ["instance_name", "ignore_consistency"]
2145
2146   def BuildHooksEnv(self):
2147     """Build hooks env.
2148
2149     This runs on master, primary and secondary nodes of the instance.
2150
2151     """
2152     env = {
2153       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2154       }
2155     env.update(_BuildInstanceHookEnvByObject(self.instance))
2156     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2157     return env, nl, nl
2158
2159   def CheckPrereq(self):
2160     """Check prerequisites.
2161
2162     This checks that the instance is in the cluster.
2163
2164     """
2165     instance = self.cfg.GetInstanceInfo(
2166       self.cfg.ExpandInstanceName(self.op.instance_name))
2167     if instance is None:
2168       raise errors.OpPrereqError("Instance '%s' not known" %
2169                                  self.op.instance_name)
2170
2171     # check memory requirements on the secondary node
2172     target_node = instance.secondary_nodes[0]
2173     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2174     info = nodeinfo.get(target_node, None)
2175     if not info:
2176       raise errors.OpPrereqError("Cannot get current information"
2177                                  " from node '%s'" % nodeinfo)
2178     if instance.memory > info['memory_free']:
2179       raise errors.OpPrereqError("Not enough memory on target node %s."
2180                                  " %d MB available, %d MB required" %
2181                                  (target_node, info['memory_free'],
2182                                   instance.memory))
2183
2184     # check bridge existance
2185     brlist = [nic.bridge for nic in instance.nics]
2186     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2187       raise errors.OpPrereqError("One or more target bridges %s does not"
2188                                  " exist on destination node '%s'" %
2189                                  (brlist, instance.primary_node))
2190
2191     self.instance = instance
2192
2193   def Exec(self, feedback_fn):
2194     """Failover an instance.
2195
2196     The failover is done by shutting it down on its present node and
2197     starting it on the secondary.
2198
2199     """
2200     instance = self.instance
2201
2202     source_node = instance.primary_node
2203     target_node = instance.secondary_nodes[0]
2204
2205     feedback_fn("* checking disk consistency between source and target")
2206     for dev in instance.disks:
2207       # for remote_raid1, these are md over drbd
2208       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2209         if not self.op.ignore_consistency:
2210           raise errors.OpExecError("Disk %s is degraded on target node,"
2211                                    " aborting failover." % dev.iv_name)
2212
2213     feedback_fn("* checking target node resource availability")
2214     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2215
2216     if not nodeinfo:
2217       raise errors.OpExecError("Could not contact target node %s." %
2218                                target_node)
2219
2220     free_memory = int(nodeinfo[target_node]['memory_free'])
2221     memory = instance.memory
2222     if memory > free_memory:
2223       raise errors.OpExecError("Not enough memory to create instance %s on"
2224                                " node %s. needed %s MiB, available %s MiB" %
2225                                (instance.name, target_node, memory,
2226                                 free_memory))
2227
2228     feedback_fn("* shutting down instance on source node")
2229     logger.Info("Shutting down instance %s on node %s" %
2230                 (instance.name, source_node))
2231
2232     if not rpc.call_instance_shutdown(source_node, instance):
2233       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2234                    " anyway. Please make sure node %s is down"  %
2235                    (instance.name, source_node, source_node))
2236
2237     feedback_fn("* deactivating the instance's disks on source node")
2238     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2239       raise errors.OpExecError("Can't shut down the instance's disks.")
2240
2241     instance.primary_node = target_node
2242     # distribute new instance config to the other nodes
2243     self.cfg.AddInstance(instance)
2244
2245     feedback_fn("* activating the instance's disks on target node")
2246     logger.Info("Starting instance %s on node %s" %
2247                 (instance.name, target_node))
2248
2249     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2250                                              ignore_secondaries=True)
2251     if not disks_ok:
2252       _ShutdownInstanceDisks(instance, self.cfg)
2253       raise errors.OpExecError("Can't activate the instance's disks")
2254
2255     feedback_fn("* starting the instance on the target node")
2256     if not rpc.call_instance_start(target_node, instance, None):
2257       _ShutdownInstanceDisks(instance, self.cfg)
2258       raise errors.OpExecError("Could not start instance %s on node %s." %
2259                                (instance.name, target_node))
2260
2261
2262 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2263   """Create a tree of block devices on the primary node.
2264
2265   This always creates all devices.
2266
2267   """
2268   if device.children:
2269     for child in device.children:
2270       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2271         return False
2272
2273   cfg.SetDiskID(device, node)
2274   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2275   if not new_id:
2276     return False
2277   if device.physical_id is None:
2278     device.physical_id = new_id
2279   return True
2280
2281
2282 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2283   """Create a tree of block devices on a secondary node.
2284
2285   If this device type has to be created on secondaries, create it and
2286   all its children.
2287
2288   If not, just recurse to children keeping the same 'force' value.
2289
2290   """
2291   if device.CreateOnSecondary():
2292     force = True
2293   if device.children:
2294     for child in device.children:
2295       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2296         return False
2297
2298   if not force:
2299     return True
2300   cfg.SetDiskID(device, node)
2301   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2302   if not new_id:
2303     return False
2304   if device.physical_id is None:
2305     device.physical_id = new_id
2306   return True
2307
2308
2309 def _GenerateUniqueNames(cfg, exts):
2310   """Generate a suitable LV name.
2311
2312   This will generate a logical volume name for the given instance.
2313
2314   """
2315   results = []
2316   for val in exts:
2317     new_id = cfg.GenerateUniqueID()
2318     results.append("%s%s" % (new_id, val))
2319   return results
2320
2321
2322 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2323   """Generate a drbd device complete with its children.
2324
2325   """
2326   port = cfg.AllocatePort()
2327   vgname = cfg.GetVGName()
2328   dev_data = objects.Disk(dev_type="lvm", size=size,
2329                           logical_id=(vgname, names[0]))
2330   dev_meta = objects.Disk(dev_type="lvm", size=128,
2331                           logical_id=(vgname, names[1]))
2332   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2333                           logical_id = (primary, secondary, port),
2334                           children = [dev_data, dev_meta])
2335   return drbd_dev
2336
2337
2338 def _GenerateDiskTemplate(cfg, template_name,
2339                           instance_name, primary_node,
2340                           secondary_nodes, disk_sz, swap_sz):
2341   """Generate the entire disk layout for a given template type.
2342
2343   """
2344   #TODO: compute space requirements
2345
2346   vgname = cfg.GetVGName()
2347   if template_name == "diskless":
2348     disks = []
2349   elif template_name == "plain":
2350     if len(secondary_nodes) != 0:
2351       raise errors.ProgrammerError("Wrong template configuration")
2352
2353     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2354     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2355                            logical_id=(vgname, names[0]),
2356                            iv_name = "sda")
2357     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2358                            logical_id=(vgname, names[1]),
2359                            iv_name = "sdb")
2360     disks = [sda_dev, sdb_dev]
2361   elif template_name == "local_raid1":
2362     if len(secondary_nodes) != 0:
2363       raise errors.ProgrammerError("Wrong template configuration")
2364
2365
2366     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2367                                        ".sdb_m1", ".sdb_m2"])
2368     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2369                               logical_id=(vgname, names[0]))
2370     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2371                               logical_id=(vgname, names[1]))
2372     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2373                               size=disk_sz,
2374                               children = [sda_dev_m1, sda_dev_m2])
2375     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2376                               logical_id=(vgname, names[2]))
2377     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2378                               logical_id=(vgname, names[3]))
2379     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2380                               size=swap_sz,
2381                               children = [sdb_dev_m1, sdb_dev_m2])
2382     disks = [md_sda_dev, md_sdb_dev]
2383   elif template_name == "remote_raid1":
2384     if len(secondary_nodes) != 1:
2385       raise errors.ProgrammerError("Wrong template configuration")
2386     remote_node = secondary_nodes[0]
2387     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2388                                        ".sdb_data", ".sdb_meta"])
2389     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2390                                          disk_sz, names[0:2])
2391     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2392                               children = [drbd_sda_dev], size=disk_sz)
2393     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2394                                          swap_sz, names[2:4])
2395     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2396                               children = [drbd_sdb_dev], size=swap_sz)
2397     disks = [md_sda_dev, md_sdb_dev]
2398   else:
2399     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2400   return disks
2401
2402
2403 def _GetInstanceInfoText(instance):
2404   """Compute that text that should be added to the disk's metadata.
2405
2406   """
2407   return "originstname+%s" % instance.name
2408
2409
2410 def _CreateDisks(cfg, instance):
2411   """Create all disks for an instance.
2412
2413   This abstracts away some work from AddInstance.
2414
2415   Args:
2416     instance: the instance object
2417
2418   Returns:
2419     True or False showing the success of the creation process
2420
2421   """
2422   info = _GetInstanceInfoText(instance)
2423
2424   for device in instance.disks:
2425     logger.Info("creating volume %s for instance %s" %
2426               (device.iv_name, instance.name))
2427     #HARDCODE
2428     for secondary_node in instance.secondary_nodes:
2429       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2430                                         info):
2431         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2432                      (device.iv_name, device, secondary_node))
2433         return False
2434     #HARDCODE
2435     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2436       logger.Error("failed to create volume %s on primary!" %
2437                    device.iv_name)
2438       return False
2439   return True
2440
2441
2442 def _RemoveDisks(instance, cfg):
2443   """Remove all disks for an instance.
2444
2445   This abstracts away some work from `AddInstance()` and
2446   `RemoveInstance()`. Note that in case some of the devices couldn't
2447   be remove, the removal will continue with the other ones (compare
2448   with `_CreateDisks()`).
2449
2450   Args:
2451     instance: the instance object
2452
2453   Returns:
2454     True or False showing the success of the removal proces
2455
2456   """
2457   logger.Info("removing block devices for instance %s" % instance.name)
2458
2459   result = True
2460   for device in instance.disks:
2461     for node, disk in device.ComputeNodeTree(instance.primary_node):
2462       cfg.SetDiskID(disk, node)
2463       if not rpc.call_blockdev_remove(node, disk):
2464         logger.Error("could not remove block device %s on node %s,"
2465                      " continuing anyway" %
2466                      (device.iv_name, node))
2467         result = False
2468   return result
2469
2470
2471 class LUCreateInstance(LogicalUnit):
2472   """Create an instance.
2473
2474   """
2475   HPATH = "instance-add"
2476   HTYPE = constants.HTYPE_INSTANCE
2477   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2478               "disk_template", "swap_size", "mode", "start", "vcpus",
2479               "wait_for_sync"]
2480
2481   def BuildHooksEnv(self):
2482     """Build hooks env.
2483
2484     This runs on master, primary and secondary nodes of the instance.
2485
2486     """
2487     env = {
2488       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2489       "INSTANCE_DISK_SIZE": self.op.disk_size,
2490       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2491       "INSTANCE_ADD_MODE": self.op.mode,
2492       }
2493     if self.op.mode == constants.INSTANCE_IMPORT:
2494       env["INSTANCE_SRC_NODE"] = self.op.src_node
2495       env["INSTANCE_SRC_PATH"] = self.op.src_path
2496       env["INSTANCE_SRC_IMAGE"] = self.src_image
2497
2498     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2499       primary_node=self.op.pnode,
2500       secondary_nodes=self.secondaries,
2501       status=self.instance_status,
2502       os_type=self.op.os_type,
2503       memory=self.op.mem_size,
2504       vcpus=self.op.vcpus,
2505       nics=[(self.inst_ip, self.op.bridge)],
2506     ))
2507
2508     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2509           self.secondaries)
2510     return env, nl, nl
2511
2512
2513   def CheckPrereq(self):
2514     """Check prerequisites.
2515
2516     """
2517     if self.op.mode not in (constants.INSTANCE_CREATE,
2518                             constants.INSTANCE_IMPORT):
2519       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2520                                  self.op.mode)
2521
2522     if self.op.mode == constants.INSTANCE_IMPORT:
2523       src_node = getattr(self.op, "src_node", None)
2524       src_path = getattr(self.op, "src_path", None)
2525       if src_node is None or src_path is None:
2526         raise errors.OpPrereqError("Importing an instance requires source"
2527                                    " node and path options")
2528       src_node_full = self.cfg.ExpandNodeName(src_node)
2529       if src_node_full is None:
2530         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2531       self.op.src_node = src_node = src_node_full
2532
2533       if not os.path.isabs(src_path):
2534         raise errors.OpPrereqError("The source path must be absolute")
2535
2536       export_info = rpc.call_export_info(src_node, src_path)
2537
2538       if not export_info:
2539         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2540
2541       if not export_info.has_section(constants.INISECT_EXP):
2542         raise errors.ProgrammerError("Corrupted export config")
2543
2544       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2545       if (int(ei_version) != constants.EXPORT_VERSION):
2546         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2547                                    (ei_version, constants.EXPORT_VERSION))
2548
2549       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2550         raise errors.OpPrereqError("Can't import instance with more than"
2551                                    " one data disk")
2552
2553       # FIXME: are the old os-es, disk sizes, etc. useful?
2554       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2555       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2556                                                          'disk0_dump'))
2557       self.src_image = diskimage
2558     else: # INSTANCE_CREATE
2559       if getattr(self.op, "os_type", None) is None:
2560         raise errors.OpPrereqError("No guest OS specified")
2561
2562     # check primary node
2563     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2564     if pnode is None:
2565       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2566                                  self.op.pnode)
2567     self.op.pnode = pnode.name
2568     self.pnode = pnode
2569     self.secondaries = []
2570     # disk template and mirror node verification
2571     if self.op.disk_template not in constants.DISK_TEMPLATES:
2572       raise errors.OpPrereqError("Invalid disk template name")
2573
2574     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2575       if getattr(self.op, "snode", None) is None:
2576         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2577                                    " a mirror node")
2578
2579       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2580       if snode_name is None:
2581         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2582                                    self.op.snode)
2583       elif snode_name == pnode.name:
2584         raise errors.OpPrereqError("The secondary node cannot be"
2585                                    " the primary node.")
2586       self.secondaries.append(snode_name)
2587
2588     # Check lv size requirements
2589     nodenames = [pnode.name] + self.secondaries
2590     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2591
2592     # Required free disk space as a function of disk and swap space
2593     req_size_dict = {
2594       constants.DT_DISKLESS: 0,
2595       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2596       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2597       # 256 MB are added for drbd metadata, 128MB for each drbd device
2598       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2599     }
2600
2601     if self.op.disk_template not in req_size_dict:
2602       raise errors.ProgrammerError("Disk template '%s' size requirement"
2603                                    " is unknown" %  self.op.disk_template)
2604
2605     req_size = req_size_dict[self.op.disk_template]
2606
2607     for node in nodenames:
2608       info = nodeinfo.get(node, None)
2609       if not info:
2610         raise errors.OpPrereqError("Cannot get current information"
2611                                    " from node '%s'" % nodeinfo)
2612       if req_size > info['vg_free']:
2613         raise errors.OpPrereqError("Not enough disk space on target node %s."
2614                                    " %d MB available, %d MB required" %
2615                                    (node, info['vg_free'], req_size))
2616
2617     # os verification
2618     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2619     if not isinstance(os_obj, objects.OS):
2620       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2621                                  " primary node"  % self.op.os_type)
2622
2623     # instance verification
2624     hostname1 = utils.LookupHostname(self.op.instance_name)
2625     if not hostname1:
2626       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2627                                  self.op.instance_name)
2628
2629     self.op.instance_name = instance_name = hostname1['hostname']
2630     instance_list = self.cfg.GetInstanceList()
2631     if instance_name in instance_list:
2632       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2633                                  instance_name)
2634
2635     ip = getattr(self.op, "ip", None)
2636     if ip is None or ip.lower() == "none":
2637       inst_ip = None
2638     elif ip.lower() == "auto":
2639       inst_ip = hostname1['ip']
2640     else:
2641       if not utils.IsValidIP(ip):
2642         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2643                                    " like a valid IP" % ip)
2644       inst_ip = ip
2645     self.inst_ip = inst_ip
2646
2647     command = ["fping", "-q", hostname1['ip']]
2648     result = utils.RunCmd(command)
2649     if not result.failed:
2650       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2651                                  (hostname1['ip'], instance_name))
2652
2653     # bridge verification
2654     bridge = getattr(self.op, "bridge", None)
2655     if bridge is None:
2656       self.op.bridge = self.cfg.GetDefBridge()
2657     else:
2658       self.op.bridge = bridge
2659
2660     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2661       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2662                                  " destination node '%s'" %
2663                                  (self.op.bridge, pnode.name))
2664
2665     if self.op.start:
2666       self.instance_status = 'up'
2667     else:
2668       self.instance_status = 'down'
2669
2670   def Exec(self, feedback_fn):
2671     """Create and add the instance to the cluster.
2672
2673     """
2674     instance = self.op.instance_name
2675     pnode_name = self.pnode.name
2676
2677     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2678     if self.inst_ip is not None:
2679       nic.ip = self.inst_ip
2680
2681     disks = _GenerateDiskTemplate(self.cfg,
2682                                   self.op.disk_template,
2683                                   instance, pnode_name,
2684                                   self.secondaries, self.op.disk_size,
2685                                   self.op.swap_size)
2686
2687     iobj = objects.Instance(name=instance, os=self.op.os_type,
2688                             primary_node=pnode_name,
2689                             memory=self.op.mem_size,
2690                             vcpus=self.op.vcpus,
2691                             nics=[nic], disks=disks,
2692                             disk_template=self.op.disk_template,
2693                             status=self.instance_status,
2694                             )
2695
2696     feedback_fn("* creating instance disks...")
2697     if not _CreateDisks(self.cfg, iobj):
2698       _RemoveDisks(iobj, self.cfg)
2699       raise errors.OpExecError("Device creation failed, reverting...")
2700
2701     feedback_fn("adding instance %s to cluster config" % instance)
2702
2703     self.cfg.AddInstance(iobj)
2704
2705     if self.op.wait_for_sync:
2706       disk_abort = not _WaitForSync(self.cfg, iobj)
2707     elif iobj.disk_template == "remote_raid1":
2708       # make sure the disks are not degraded (still sync-ing is ok)
2709       time.sleep(15)
2710       feedback_fn("* checking mirrors status")
2711       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2712     else:
2713       disk_abort = False
2714
2715     if disk_abort:
2716       _RemoveDisks(iobj, self.cfg)
2717       self.cfg.RemoveInstance(iobj.name)
2718       raise errors.OpExecError("There are some degraded disks for"
2719                                " this instance")
2720
2721     feedback_fn("creating os for instance %s on node %s" %
2722                 (instance, pnode_name))
2723
2724     if iobj.disk_template != constants.DT_DISKLESS:
2725       if self.op.mode == constants.INSTANCE_CREATE:
2726         feedback_fn("* running the instance OS create scripts...")
2727         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2728           raise errors.OpExecError("could not add os for instance %s"
2729                                    " on node %s" %
2730                                    (instance, pnode_name))
2731
2732       elif self.op.mode == constants.INSTANCE_IMPORT:
2733         feedback_fn("* running the instance OS import scripts...")
2734         src_node = self.op.src_node
2735         src_image = self.src_image
2736         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2737                                                 src_node, src_image):
2738           raise errors.OpExecError("Could not import os for instance"
2739                                    " %s on node %s" %
2740                                    (instance, pnode_name))
2741       else:
2742         # also checked in the prereq part
2743         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2744                                      % self.op.mode)
2745
2746     if self.op.start:
2747       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2748       feedback_fn("* starting instance...")
2749       if not rpc.call_instance_start(pnode_name, iobj, None):
2750         raise errors.OpExecError("Could not start instance")
2751
2752
2753 class LUConnectConsole(NoHooksLU):
2754   """Connect to an instance's console.
2755
2756   This is somewhat special in that it returns the command line that
2757   you need to run on the master node in order to connect to the
2758   console.
2759
2760   """
2761   _OP_REQP = ["instance_name"]
2762
2763   def CheckPrereq(self):
2764     """Check prerequisites.
2765
2766     This checks that the instance is in the cluster.
2767
2768     """
2769     instance = self.cfg.GetInstanceInfo(
2770       self.cfg.ExpandInstanceName(self.op.instance_name))
2771     if instance is None:
2772       raise errors.OpPrereqError("Instance '%s' not known" %
2773                                  self.op.instance_name)
2774     self.instance = instance
2775
2776   def Exec(self, feedback_fn):
2777     """Connect to the console of an instance
2778
2779     """
2780     instance = self.instance
2781     node = instance.primary_node
2782
2783     node_insts = rpc.call_instance_list([node])[node]
2784     if node_insts is False:
2785       raise errors.OpExecError("Can't connect to node %s." % node)
2786
2787     if instance.name not in node_insts:
2788       raise errors.OpExecError("Instance %s is not running." % instance.name)
2789
2790     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2791
2792     hyper = hypervisor.GetHypervisor()
2793     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2794     return node, console_cmd
2795
2796
2797 class LUAddMDDRBDComponent(LogicalUnit):
2798   """Adda new mirror member to an instance's disk.
2799
2800   """
2801   HPATH = "mirror-add"
2802   HTYPE = constants.HTYPE_INSTANCE
2803   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2804
2805   def BuildHooksEnv(self):
2806     """Build hooks env.
2807
2808     This runs on the master, the primary and all the secondaries.
2809
2810     """
2811     env = {
2812       "NEW_SECONDARY": self.op.remote_node,
2813       "DISK_NAME": self.op.disk_name,
2814       }
2815     env.update(_BuildInstanceHookEnvByObject(self.instance))
2816     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2817           self.op.remote_node,] + list(self.instance.secondary_nodes)
2818     return env, nl, nl
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     instance = self.cfg.GetInstanceInfo(
2827       self.cfg.ExpandInstanceName(self.op.instance_name))
2828     if instance is None:
2829       raise errors.OpPrereqError("Instance '%s' not known" %
2830                                  self.op.instance_name)
2831     self.instance = instance
2832
2833     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2834     if remote_node is None:
2835       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2836     self.remote_node = remote_node
2837
2838     if remote_node == instance.primary_node:
2839       raise errors.OpPrereqError("The specified node is the primary node of"
2840                                  " the instance.")
2841
2842     if instance.disk_template != constants.DT_REMOTE_RAID1:
2843       raise errors.OpPrereqError("Instance's disk layout is not"
2844                                  " remote_raid1.")
2845     for disk in instance.disks:
2846       if disk.iv_name == self.op.disk_name:
2847         break
2848     else:
2849       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2850                                  " instance." % self.op.disk_name)
2851     if len(disk.children) > 1:
2852       raise errors.OpPrereqError("The device already has two slave"
2853                                  " devices.\n"
2854                                  "This would create a 3-disk raid1"
2855                                  " which we don't allow.")
2856     self.disk = disk
2857
2858   def Exec(self, feedback_fn):
2859     """Add the mirror component
2860
2861     """
2862     disk = self.disk
2863     instance = self.instance
2864
2865     remote_node = self.remote_node
2866     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2867     names = _GenerateUniqueNames(self.cfg, lv_names)
2868     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2869                                      remote_node, disk.size, names)
2870
2871     logger.Info("adding new mirror component on secondary")
2872     #HARDCODE
2873     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2874                                       _GetInstanceInfoText(instance)):
2875       raise errors.OpExecError("Failed to create new component on secondary"
2876                                " node %s" % remote_node)
2877
2878     logger.Info("adding new mirror component on primary")
2879     #HARDCODE
2880     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2881                                     _GetInstanceInfoText(instance)):
2882       # remove secondary dev
2883       self.cfg.SetDiskID(new_drbd, remote_node)
2884       rpc.call_blockdev_remove(remote_node, new_drbd)
2885       raise errors.OpExecError("Failed to create volume on primary")
2886
2887     # the device exists now
2888     # call the primary node to add the mirror to md
2889     logger.Info("adding new mirror component to md")
2890     if not rpc.call_blockdev_addchild(instance.primary_node,
2891                                            disk, new_drbd):
2892       logger.Error("Can't add mirror compoment to md!")
2893       self.cfg.SetDiskID(new_drbd, remote_node)
2894       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2895         logger.Error("Can't rollback on secondary")
2896       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2897       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2898         logger.Error("Can't rollback on primary")
2899       raise errors.OpExecError("Can't add mirror component to md array")
2900
2901     disk.children.append(new_drbd)
2902
2903     self.cfg.AddInstance(instance)
2904
2905     _WaitForSync(self.cfg, instance)
2906
2907     return 0
2908
2909
2910 class LURemoveMDDRBDComponent(LogicalUnit):
2911   """Remove a component from a remote_raid1 disk.
2912
2913   """
2914   HPATH = "mirror-remove"
2915   HTYPE = constants.HTYPE_INSTANCE
2916   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2917
2918   def BuildHooksEnv(self):
2919     """Build hooks env.
2920
2921     This runs on the master, the primary and all the secondaries.
2922
2923     """
2924     env = {
2925       "DISK_NAME": self.op.disk_name,
2926       "DISK_ID": self.op.disk_id,
2927       "OLD_SECONDARY": self.old_secondary,
2928       }
2929     env.update(_BuildInstanceHookEnvByObject(self.instance))
2930     nl = [self.sstore.GetMasterNode(),
2931           self.instance.primary_node] + list(self.instance.secondary_nodes)
2932     return env, nl, nl
2933
2934   def CheckPrereq(self):
2935     """Check prerequisites.
2936
2937     This checks that the instance is in the cluster.
2938
2939     """
2940     instance = self.cfg.GetInstanceInfo(
2941       self.cfg.ExpandInstanceName(self.op.instance_name))
2942     if instance is None:
2943       raise errors.OpPrereqError("Instance '%s' not known" %
2944                                  self.op.instance_name)
2945     self.instance = instance
2946
2947     if instance.disk_template != constants.DT_REMOTE_RAID1:
2948       raise errors.OpPrereqError("Instance's disk layout is not"
2949                                  " remote_raid1.")
2950     for disk in instance.disks:
2951       if disk.iv_name == self.op.disk_name:
2952         break
2953     else:
2954       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2955                                  " instance." % self.op.disk_name)
2956     for child in disk.children:
2957       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2958         break
2959     else:
2960       raise errors.OpPrereqError("Can't find the device with this port.")
2961
2962     if len(disk.children) < 2:
2963       raise errors.OpPrereqError("Cannot remove the last component from"
2964                                  " a mirror.")
2965     self.disk = disk
2966     self.child = child
2967     if self.child.logical_id[0] == instance.primary_node:
2968       oid = 1
2969     else:
2970       oid = 0
2971     self.old_secondary = self.child.logical_id[oid]
2972
2973   def Exec(self, feedback_fn):
2974     """Remove the mirror component
2975
2976     """
2977     instance = self.instance
2978     disk = self.disk
2979     child = self.child
2980     logger.Info("remove mirror component")
2981     self.cfg.SetDiskID(disk, instance.primary_node)
2982     if not rpc.call_blockdev_removechild(instance.primary_node,
2983                                               disk, child):
2984       raise errors.OpExecError("Can't remove child from mirror.")
2985
2986     for node in child.logical_id[:2]:
2987       self.cfg.SetDiskID(child, node)
2988       if not rpc.call_blockdev_remove(node, child):
2989         logger.Error("Warning: failed to remove device from node %s,"
2990                      " continuing operation." % node)
2991
2992     disk.children.remove(child)
2993     self.cfg.AddInstance(instance)
2994
2995
2996 class LUReplaceDisks(LogicalUnit):
2997   """Replace the disks of an instance.
2998
2999   """
3000   HPATH = "mirrors-replace"
3001   HTYPE = constants.HTYPE_INSTANCE
3002   _OP_REQP = ["instance_name"]
3003
3004   def BuildHooksEnv(self):
3005     """Build hooks env.
3006
3007     This runs on the master, the primary and all the secondaries.
3008
3009     """
3010     env = {
3011       "NEW_SECONDARY": self.op.remote_node,
3012       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3013       }
3014     env.update(_BuildInstanceHookEnvByObject(self.instance))
3015     nl = [self.sstore.GetMasterNode(),
3016           self.instance.primary_node] + list(self.instance.secondary_nodes)
3017     return env, nl, nl
3018
3019   def CheckPrereq(self):
3020     """Check prerequisites.
3021
3022     This checks that the instance is in the cluster.
3023
3024     """
3025     instance = self.cfg.GetInstanceInfo(
3026       self.cfg.ExpandInstanceName(self.op.instance_name))
3027     if instance is None:
3028       raise errors.OpPrereqError("Instance '%s' not known" %
3029                                  self.op.instance_name)
3030     self.instance = instance
3031
3032     if instance.disk_template != constants.DT_REMOTE_RAID1:
3033       raise errors.OpPrereqError("Instance's disk layout is not"
3034                                  " remote_raid1.")
3035
3036     if len(instance.secondary_nodes) != 1:
3037       raise errors.OpPrereqError("The instance has a strange layout,"
3038                                  " expected one secondary but found %d" %
3039                                  len(instance.secondary_nodes))
3040
3041     remote_node = getattr(self.op, "remote_node", None)
3042     if remote_node is None:
3043       remote_node = instance.secondary_nodes[0]
3044     else:
3045       remote_node = self.cfg.ExpandNodeName(remote_node)
3046       if remote_node is None:
3047         raise errors.OpPrereqError("Node '%s' not known" %
3048                                    self.op.remote_node)
3049     if remote_node == instance.primary_node:
3050       raise errors.OpPrereqError("The specified node is the primary node of"
3051                                  " the instance.")
3052     self.op.remote_node = remote_node
3053
3054   def Exec(self, feedback_fn):
3055     """Replace the disks of an instance.
3056
3057     """
3058     instance = self.instance
3059     iv_names = {}
3060     # start of work
3061     remote_node = self.op.remote_node
3062     cfg = self.cfg
3063     vgname = cfg.GetVGName()
3064     for dev in instance.disks:
3065       size = dev.size
3066       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3067       names = _GenerateUniqueNames(cfg, lv_names)
3068       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3069                                        remote_node, size, names)
3070       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3071       logger.Info("adding new mirror component on secondary for %s" %
3072                   dev.iv_name)
3073       #HARDCODE
3074       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3075                                         _GetInstanceInfoText(instance)):
3076         raise errors.OpExecError("Failed to create new component on"
3077                                  " secondary node %s\n"
3078                                  "Full abort, cleanup manually!" %
3079                                  remote_node)
3080
3081       logger.Info("adding new mirror component on primary")
3082       #HARDCODE
3083       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3084                                       _GetInstanceInfoText(instance)):
3085         # remove secondary dev
3086         cfg.SetDiskID(new_drbd, remote_node)
3087         rpc.call_blockdev_remove(remote_node, new_drbd)
3088         raise errors.OpExecError("Failed to create volume on primary!\n"
3089                                  "Full abort, cleanup manually!!")
3090
3091       # the device exists now
3092       # call the primary node to add the mirror to md
3093       logger.Info("adding new mirror component to md")
3094       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3095                                         new_drbd):
3096         logger.Error("Can't add mirror compoment to md!")
3097         cfg.SetDiskID(new_drbd, remote_node)
3098         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3099           logger.Error("Can't rollback on secondary")
3100         cfg.SetDiskID(new_drbd, instance.primary_node)
3101         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3102           logger.Error("Can't rollback on primary")
3103         raise errors.OpExecError("Full abort, cleanup manually!!")
3104
3105       dev.children.append(new_drbd)
3106       cfg.AddInstance(instance)
3107
3108     # this can fail as the old devices are degraded and _WaitForSync
3109     # does a combined result over all disks, so we don't check its
3110     # return value
3111     _WaitForSync(cfg, instance, unlock=True)
3112
3113     # so check manually all the devices
3114     for name in iv_names:
3115       dev, child, new_drbd = iv_names[name]
3116       cfg.SetDiskID(dev, instance.primary_node)
3117       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3118       if is_degr:
3119         raise errors.OpExecError("MD device %s is degraded!" % name)
3120       cfg.SetDiskID(new_drbd, instance.primary_node)
3121       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3122       if is_degr:
3123         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3124
3125     for name in iv_names:
3126       dev, child, new_drbd = iv_names[name]
3127       logger.Info("remove mirror %s component" % name)
3128       cfg.SetDiskID(dev, instance.primary_node)
3129       if not rpc.call_blockdev_removechild(instance.primary_node,
3130                                                 dev, child):
3131         logger.Error("Can't remove child from mirror, aborting"
3132                      " *this device cleanup*.\nYou need to cleanup manually!!")
3133         continue
3134
3135       for node in child.logical_id[:2]:
3136         logger.Info("remove child device on %s" % node)
3137         cfg.SetDiskID(child, node)
3138         if not rpc.call_blockdev_remove(node, child):
3139           logger.Error("Warning: failed to remove device from node %s,"
3140                        " continuing operation." % node)
3141
3142       dev.children.remove(child)
3143
3144       cfg.AddInstance(instance)
3145
3146
3147 class LUQueryInstanceData(NoHooksLU):
3148   """Query runtime instance data.
3149
3150   """
3151   _OP_REQP = ["instances"]
3152
3153   def CheckPrereq(self):
3154     """Check prerequisites.
3155
3156     This only checks the optional instance list against the existing names.
3157
3158     """
3159     if not isinstance(self.op.instances, list):
3160       raise errors.OpPrereqError("Invalid argument type 'instances'")
3161     if self.op.instances:
3162       self.wanted_instances = []
3163       names = self.op.instances
3164       for name in names:
3165         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3166         if instance is None:
3167           raise errors.OpPrereqError("No such instance name '%s'" % name)
3168       self.wanted_instances.append(instance)
3169     else:
3170       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3171                                in self.cfg.GetInstanceList()]
3172     return
3173
3174
3175   def _ComputeDiskStatus(self, instance, snode, dev):
3176     """Compute block device status.
3177
3178     """
3179     self.cfg.SetDiskID(dev, instance.primary_node)
3180     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3181     if dev.dev_type == "drbd":
3182       # we change the snode then (otherwise we use the one passed in)
3183       if dev.logical_id[0] == instance.primary_node:
3184         snode = dev.logical_id[1]
3185       else:
3186         snode = dev.logical_id[0]
3187
3188     if snode:
3189       self.cfg.SetDiskID(dev, snode)
3190       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3191     else:
3192       dev_sstatus = None
3193
3194     if dev.children:
3195       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3196                       for child in dev.children]
3197     else:
3198       dev_children = []
3199
3200     data = {
3201       "iv_name": dev.iv_name,
3202       "dev_type": dev.dev_type,
3203       "logical_id": dev.logical_id,
3204       "physical_id": dev.physical_id,
3205       "pstatus": dev_pstatus,
3206       "sstatus": dev_sstatus,
3207       "children": dev_children,
3208       }
3209
3210     return data
3211
3212   def Exec(self, feedback_fn):
3213     """Gather and return data"""
3214     result = {}
3215     for instance in self.wanted_instances:
3216       remote_info = rpc.call_instance_info(instance.primary_node,
3217                                                 instance.name)
3218       if remote_info and "state" in remote_info:
3219         remote_state = "up"
3220       else:
3221         remote_state = "down"
3222       if instance.status == "down":
3223         config_state = "down"
3224       else:
3225         config_state = "up"
3226
3227       disks = [self._ComputeDiskStatus(instance, None, device)
3228                for device in instance.disks]
3229
3230       idict = {
3231         "name": instance.name,
3232         "config_state": config_state,
3233         "run_state": remote_state,
3234         "pnode": instance.primary_node,
3235         "snodes": instance.secondary_nodes,
3236         "os": instance.os,
3237         "memory": instance.memory,
3238         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3239         "disks": disks,
3240         }
3241
3242       result[instance.name] = idict
3243
3244     return result
3245
3246
3247 class LUQueryNodeData(NoHooksLU):
3248   """Logical unit for querying node data.
3249
3250   """
3251   _OP_REQP = ["nodes"]
3252
3253   def CheckPrereq(self):
3254     """Check prerequisites.
3255
3256     This only checks the optional node list against the existing names.
3257
3258     """
3259     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3260
3261   def Exec(self, feedback_fn):
3262     """Compute and return the list of nodes.
3263
3264     """
3265     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3266              in self.cfg.GetInstanceList()]
3267     result = []
3268     for node in self.wanted_nodes:
3269       result.append((node.name, node.primary_ip, node.secondary_ip,
3270                      [inst.name for inst in ilist
3271                       if inst.primary_node == node.name],
3272                      [inst.name for inst in ilist
3273                       if node.name in inst.secondary_nodes],
3274                      ))
3275     return result
3276
3277
3278 class LUSetInstanceParms(LogicalUnit):
3279   """Modifies an instances's parameters.
3280
3281   """
3282   HPATH = "instance-modify"
3283   HTYPE = constants.HTYPE_INSTANCE
3284   _OP_REQP = ["instance_name"]
3285
3286   def BuildHooksEnv(self):
3287     """Build hooks env.
3288
3289     This runs on the master, primary and secondaries.
3290
3291     """
3292     args = dict()
3293     if self.mem:
3294       args['memory'] = self.mem
3295     if self.vcpus:
3296       args['vcpus'] = self.vcpus
3297     if self.do_ip or self.do_bridge:
3298       if self.do_ip:
3299         ip = self.ip
3300       else:
3301         ip = self.instance.nics[0].ip
3302       if self.bridge:
3303         bridge = self.bridge
3304       else:
3305         bridge = self.instance.nics[0].bridge
3306       args['nics'] = [(ip, bridge)]
3307     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3308     nl = [self.sstore.GetMasterNode(),
3309           self.instance.primary_node] + list(self.instance.secondary_nodes)
3310     return env, nl, nl
3311
3312   def CheckPrereq(self):
3313     """Check prerequisites.
3314
3315     This only checks the instance list against the existing names.
3316
3317     """
3318     self.mem = getattr(self.op, "mem", None)
3319     self.vcpus = getattr(self.op, "vcpus", None)
3320     self.ip = getattr(self.op, "ip", None)
3321     self.bridge = getattr(self.op, "bridge", None)
3322     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3323       raise errors.OpPrereqError("No changes submitted")
3324     if self.mem is not None:
3325       try:
3326         self.mem = int(self.mem)
3327       except ValueError, err:
3328         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3329     if self.vcpus is not None:
3330       try:
3331         self.vcpus = int(self.vcpus)
3332       except ValueError, err:
3333         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3334     if self.ip is not None:
3335       self.do_ip = True
3336       if self.ip.lower() == "none":
3337         self.ip = None
3338       else:
3339         if not utils.IsValidIP(self.ip):
3340           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3341     else:
3342       self.do_ip = False
3343     self.do_bridge = (self.bridge is not None)
3344
3345     instance = self.cfg.GetInstanceInfo(
3346       self.cfg.ExpandInstanceName(self.op.instance_name))
3347     if instance is None:
3348       raise errors.OpPrereqError("No such instance name '%s'" %
3349                                  self.op.instance_name)
3350     self.op.instance_name = instance.name
3351     self.instance = instance
3352     return
3353
3354   def Exec(self, feedback_fn):
3355     """Modifies an instance.
3356
3357     All parameters take effect only at the next restart of the instance.
3358     """
3359     result = []
3360     instance = self.instance
3361     if self.mem:
3362       instance.memory = self.mem
3363       result.append(("mem", self.mem))
3364     if self.vcpus:
3365       instance.vcpus = self.vcpus
3366       result.append(("vcpus",  self.vcpus))
3367     if self.do_ip:
3368       instance.nics[0].ip = self.ip
3369       result.append(("ip", self.ip))
3370     if self.bridge:
3371       instance.nics[0].bridge = self.bridge
3372       result.append(("bridge", self.bridge))
3373
3374     self.cfg.AddInstance(instance)
3375
3376     return result
3377
3378
3379 class LUQueryExports(NoHooksLU):
3380   """Query the exports list
3381
3382   """
3383   _OP_REQP = []
3384
3385   def CheckPrereq(self):
3386     """Check that the nodelist contains only existing nodes.
3387
3388     """
3389     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3390
3391   def Exec(self, feedback_fn):
3392     """Compute the list of all the exported system images.
3393
3394     Returns:
3395       a dictionary with the structure node->(export-list)
3396       where export-list is a list of the instances exported on
3397       that node.
3398
3399     """
3400     return rpc.call_export_list([node.name for node in self.nodes])
3401
3402
3403 class LUExportInstance(LogicalUnit):
3404   """Export an instance to an image in the cluster.
3405
3406   """
3407   HPATH = "instance-export"
3408   HTYPE = constants.HTYPE_INSTANCE
3409   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3410
3411   def BuildHooksEnv(self):
3412     """Build hooks env.
3413
3414     This will run on the master, primary node and target node.
3415
3416     """
3417     env = {
3418       "EXPORT_NODE": self.op.target_node,
3419       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3420       }
3421     env.update(_BuildInstanceHookEnvByObject(self.instance))
3422     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3423           self.op.target_node]
3424     return env, nl, nl
3425
3426   def CheckPrereq(self):
3427     """Check prerequisites.
3428
3429     This checks that the instance name is a valid one.
3430
3431     """
3432     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3433     self.instance = self.cfg.GetInstanceInfo(instance_name)
3434     if self.instance is None:
3435       raise errors.OpPrereqError("Instance '%s' not found" %
3436                                  self.op.instance_name)
3437
3438     # node verification
3439     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3440     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3441
3442     if self.dst_node is None:
3443       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3444                                  self.op.target_node)
3445     self.op.target_node = self.dst_node.name
3446
3447   def Exec(self, feedback_fn):
3448     """Export an instance to an image in the cluster.
3449
3450     """
3451     instance = self.instance
3452     dst_node = self.dst_node
3453     src_node = instance.primary_node
3454     # shutdown the instance, unless requested not to do so
3455     if self.op.shutdown:
3456       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3457       self.processor.ChainOpCode(op, feedback_fn)
3458
3459     vgname = self.cfg.GetVGName()
3460
3461     snap_disks = []
3462
3463     try:
3464       for disk in instance.disks:
3465         if disk.iv_name == "sda":
3466           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3467           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3468
3469           if not new_dev_name:
3470             logger.Error("could not snapshot block device %s on node %s" %
3471                          (disk.logical_id[1], src_node))
3472           else:
3473             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3474                                       logical_id=(vgname, new_dev_name),
3475                                       physical_id=(vgname, new_dev_name),
3476                                       iv_name=disk.iv_name)
3477             snap_disks.append(new_dev)
3478
3479     finally:
3480       if self.op.shutdown:
3481         op = opcodes.OpStartupInstance(instance_name=instance.name,
3482                                        force=False)
3483         self.processor.ChainOpCode(op, feedback_fn)
3484
3485     # TODO: check for size
3486
3487     for dev in snap_disks:
3488       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3489                                            instance):
3490         logger.Error("could not export block device %s from node"
3491                      " %s to node %s" %
3492                      (dev.logical_id[1], src_node, dst_node.name))
3493       if not rpc.call_blockdev_remove(src_node, dev):
3494         logger.Error("could not remove snapshot block device %s from"
3495                      " node %s" % (dev.logical_id[1], src_node))
3496
3497     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3498       logger.Error("could not finalize export for instance %s on node %s" %
3499                    (instance.name, dst_node.name))
3500
3501     nodelist = self.cfg.GetNodeList()
3502     nodelist.remove(dst_node.name)
3503
3504     # on one-node clusters nodelist will be empty after the removal
3505     # if we proceed the backup would be removed because OpQueryExports
3506     # substitutes an empty list with the full cluster node list.
3507     if nodelist:
3508       op = opcodes.OpQueryExports(nodes=nodelist)
3509       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3510       for node in exportlist:
3511         if instance.name in exportlist[node]:
3512           if not rpc.call_export_remove(node, instance.name):
3513             logger.Error("could not remove older export for instance %s"
3514                          " on node %s" % (instance.name, node))
3515
3516
3517 class TagsLU(NoHooksLU):
3518   """Generic tags LU.
3519
3520   This is an abstract class which is the parent of all the other tags LUs.
3521
3522   """
3523   def CheckPrereq(self):
3524     """Check prerequisites.
3525
3526     """
3527     if self.op.kind == constants.TAG_CLUSTER:
3528       self.target = self.cfg.GetClusterInfo()
3529     elif self.op.kind == constants.TAG_NODE:
3530       name = self.cfg.ExpandNodeName(self.op.name)
3531       if name is None:
3532         raise errors.OpPrereqError("Invalid node name (%s)" %
3533                                    (self.op.name,))
3534       self.op.name = name
3535       self.target = self.cfg.GetNodeInfo(name)
3536     elif self.op.kind == constants.TAG_INSTANCE:
3537       name = self.cfg.ExpandInstanceName(name)
3538       if name is None:
3539         raise errors.OpPrereqError("Invalid instance name (%s)" %
3540                                    (self.op.name,))
3541       self.op.name = name
3542       self.target = self.cfg.GetInstanceInfo(name)
3543     else:
3544       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3545                                  str(self.op.kind))
3546
3547
3548 class LUGetTags(TagsLU):
3549   """Returns the tags of a given object.
3550
3551   """
3552   _OP_REQP = ["kind", "name"]
3553
3554   def Exec(self, feedback_fn):
3555     """Returns the tag list.
3556
3557     """
3558     return self.target.GetTags()
3559
3560
3561 class LUAddTag(TagsLU):
3562   """Sets a tag on a given object.
3563
3564   """
3565   _OP_REQP = ["kind", "name", "tag"]
3566
3567   def CheckPrereq(self):
3568     """Check prerequisites.
3569
3570     This checks the type and length of the tag name and value.
3571
3572     """
3573     TagsLU.CheckPrereq(self)
3574     objects.TaggableObject.ValidateTag(self.op.tag)
3575
3576   def Exec(self, feedback_fn):
3577     """Sets the tag.
3578
3579     """
3580     try:
3581       self.target.AddTag(self.op.tag)
3582     except errors.TagError, err:
3583       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3584     try:
3585       self.cfg.Update(self.target)
3586     except errors.ConfigurationError:
3587       raise errors.OpRetryError("There has been a modification to the"
3588                                 " config file and the operation has been"
3589                                 " aborted. Please retry.")
3590
3591
3592 class LUDelTag(TagsLU):
3593   """Delete a tag from a given object.
3594
3595   """
3596   _OP_REQP = ["kind", "name", "tag"]
3597
3598   def CheckPrereq(self):
3599     """Check prerequisites.
3600
3601     This checks that we have the given tag.
3602
3603     """
3604     TagsLU.CheckPrereq(self)
3605     objects.TaggableObject.ValidateTag(self.op.tag)
3606     if self.op.tag not in self.target.GetTags():
3607       raise errors.OpPrereqError("Tag not found")
3608
3609   def Exec(self, feedback_fn):
3610     """Remove the tag from the object.
3611
3612     """
3613     self.target.RemoveTag(self.op.tag)
3614     try:
3615       self.cfg.Update(self.target)
3616     except errors.ConfigurationError:
3617       raise errors.OpRetryError("There has been a modification to the"
3618                                 " config file and the operation has been"
3619                                 " aborted. Please retry.")