Fix bridge checking in instance failover
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "OP_TARGET": name,
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   for rawline in f:
390     logger.Debug('read %s' % (repr(rawline),))
391
392     parts = rawline.rstrip('\r\n').split()
393
394     # Ignore unwanted lines
395     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
396       fields = parts[0].split(',')
397       key = parts[2]
398
399       haveall = True
400       havesome = False
401       for spec in [ ip, fullnode ]:
402         if spec not in fields:
403           haveall = False
404         if spec in fields:
405           havesome = True
406
407       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
408       if haveall and key == pubkey:
409         inthere = True
410         save_lines.append(rawline)
411         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
412         continue
413
414       if havesome and (not haveall or key != pubkey):
415         removed = True
416         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
417         continue
418
419     save_lines.append(rawline)
420
421   if not inthere:
422     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
423     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
424
425   if removed:
426     save_lines = save_lines + add_lines
427
428     # Write a new file and replace old.
429     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
430                                    constants.DATA_DIR)
431     newfile = os.fdopen(fd, 'w')
432     try:
433       newfile.write(''.join(save_lines))
434     finally:
435       newfile.close()
436     logger.Debug("Wrote new known_hosts.")
437     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
438
439   elif add_lines:
440     # Simply appending a new line will do the trick.
441     f.seek(0, 2)
442     for add in add_lines:
443       f.write(add)
444
445   f.close()
446
447
448 def _HasValidVG(vglist, vgname):
449   """Checks if the volume group list is valid.
450
451   A non-None return value means there's an error, and the return value
452   is the error message.
453
454   """
455   vgsize = vglist.get(vgname, None)
456   if vgsize is None:
457     return "volume group '%s' missing" % vgname
458   elif vgsize < 20480:
459     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
460             (vgname, vgsize))
461   return None
462
463
464 def _InitSSHSetup(node):
465   """Setup the SSH configuration for the cluster.
466
467
468   This generates a dsa keypair for root, adds the pub key to the
469   permitted hosts and adds the hostkey to its own known hosts.
470
471   Args:
472     node: the name of this host as a fqdn
473
474   """
475   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
476
477   for name in priv_key, pub_key:
478     if os.path.exists(name):
479       utils.CreateBackup(name)
480     utils.RemoveFile(name)
481
482   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
483                          "-f", priv_key,
484                          "-q", "-N", ""])
485   if result.failed:
486     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
487                              result.output)
488
489   f = open(pub_key, 'r')
490   try:
491     utils.AddAuthorizedKey(auth_keys, f.read(8192))
492   finally:
493     f.close()
494
495
496 def _InitGanetiServerSetup(ss):
497   """Setup the necessary configuration for the initial node daemon.
498
499   This creates the nodepass file containing the shared password for
500   the cluster and also generates the SSL certificate.
501
502   """
503   # Create pseudo random password
504   randpass = sha.new(os.urandom(64)).hexdigest()
505   # and write it into sstore
506   ss.SetKey(ss.SS_NODED_PASS, randpass)
507
508   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
509                          "-days", str(365*5), "-nodes", "-x509",
510                          "-keyout", constants.SSL_CERT_FILE,
511                          "-out", constants.SSL_CERT_FILE, "-batch"])
512   if result.failed:
513     raise errors.OpExecError("could not generate server ssl cert, command"
514                              " %s had exitcode %s and error message %s" %
515                              (result.cmd, result.exit_code, result.output))
516
517   os.chmod(constants.SSL_CERT_FILE, 0400)
518
519   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
520
521   if result.failed:
522     raise errors.OpExecError("Could not start the node daemon, command %s"
523                              " had exitcode %s and error %s" %
524                              (result.cmd, result.exit_code, result.output))
525
526
527 def _CheckInstanceBridgesExist(instance):
528   """Check that the brigdes needed by an instance exist.
529
530   """
531   # check bridges existance
532   brlist = [nic.bridge for nic in instance.nics]
533   if not rpc.call_bridges_exist(instance.primary_node, brlist):
534     raise errors.OpPrereqError("one or more target bridges %s does not"
535                                " exist on destination node '%s'" %
536                                (brlist, instance.primary_node))
537
538
539 class LUInitCluster(LogicalUnit):
540   """Initialise the cluster.
541
542   """
543   HPATH = "cluster-init"
544   HTYPE = constants.HTYPE_CLUSTER
545   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
546               "def_bridge", "master_netdev"]
547   REQ_CLUSTER = False
548
549   def BuildHooksEnv(self):
550     """Build hooks env.
551
552     Notes: Since we don't require a cluster, we must manually add
553     ourselves in the post-run node list.
554
555     """
556     env = {"OP_TARGET": self.op.cluster_name}
557     return env, [], [self.hostname.name]
558
559   def CheckPrereq(self):
560     """Verify that the passed name is a valid one.
561
562     """
563     if config.ConfigWriter.IsCluster():
564       raise errors.OpPrereqError("Cluster is already initialised")
565
566     self.hostname = hostname = utils.HostInfo()
567
568     if hostname.ip.startswith("127."):
569       raise errors.OpPrereqError("This host's IP resolves to the private"
570                                  " range (%s). Please fix DNS or /etc/hosts." %
571                                  (hostname.ip,))
572
573     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
574
575     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
576                          constants.DEFAULT_NODED_PORT):
577       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
578                                  " to %s,\nbut this ip address does not"
579                                  " belong to this host."
580                                  " Aborting." % hostname.ip)
581
582     secondary_ip = getattr(self.op, "secondary_ip", None)
583     if secondary_ip and not utils.IsValidIP(secondary_ip):
584       raise errors.OpPrereqError("Invalid secondary ip given")
585     if (secondary_ip and
586         secondary_ip != hostname.ip and
587         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
588                            constants.DEFAULT_NODED_PORT))):
589       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
590                                  "but it does not belong to this host." %
591                                  secondary_ip)
592     self.secondary_ip = secondary_ip
593
594     # checks presence of the volume group given
595     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
596
597     if vgstatus:
598       raise errors.OpPrereqError("Error: %s" % vgstatus)
599
600     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
601                     self.op.mac_prefix):
602       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
603                                  self.op.mac_prefix)
604
605     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
606       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
607                                  self.op.hypervisor_type)
608
609     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
610     if result.failed:
611       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
612                                  (self.op.master_netdev,
613                                   result.output.strip()))
614
615   def Exec(self, feedback_fn):
616     """Initialize the cluster.
617
618     """
619     clustername = self.clustername
620     hostname = self.hostname
621
622     # set up the simple store
623     self.sstore = ss = ssconf.SimpleStore()
624     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
625     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
626     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
627     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
628     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
629
630     # set up the inter-node password and certificate
631     _InitGanetiServerSetup(ss)
632
633     # start the master ip
634     rpc.call_node_start_master(hostname.name)
635
636     # set up ssh config and /etc/hosts
637     f = open(constants.SSH_HOST_RSA_PUB, 'r')
638     try:
639       sshline = f.read()
640     finally:
641       f.close()
642     sshkey = sshline.split(" ")[1]
643
644     _UpdateEtcHosts(hostname.name, hostname.ip)
645
646     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
647
648     _InitSSHSetup(hostname.name)
649
650     # init of cluster config file
651     self.cfg = cfgw = config.ConfigWriter()
652     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
653                     sshkey, self.op.mac_prefix,
654                     self.op.vg_name, self.op.def_bridge)
655
656
657 class LUDestroyCluster(NoHooksLU):
658   """Logical unit for destroying the cluster.
659
660   """
661   _OP_REQP = []
662
663   def CheckPrereq(self):
664     """Check prerequisites.
665
666     This checks whether the cluster is empty.
667
668     Any errors are signalled by raising errors.OpPrereqError.
669
670     """
671     master = self.sstore.GetMasterNode()
672
673     nodelist = self.cfg.GetNodeList()
674     if len(nodelist) != 1 or nodelist[0] != master:
675       raise errors.OpPrereqError("There are still %d node(s) in"
676                                  " this cluster." % (len(nodelist) - 1))
677     instancelist = self.cfg.GetInstanceList()
678     if instancelist:
679       raise errors.OpPrereqError("There are still %d instance(s) in"
680                                  " this cluster." % len(instancelist))
681
682   def Exec(self, feedback_fn):
683     """Destroys the cluster.
684
685     """
686     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
687     utils.CreateBackup(priv_key)
688     utils.CreateBackup(pub_key)
689     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
690
691
692 class LUVerifyCluster(NoHooksLU):
693   """Verifies the cluster status.
694
695   """
696   _OP_REQP = []
697
698   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
699                   remote_version, feedback_fn):
700     """Run multiple tests against a node.
701
702     Test list:
703       - compares ganeti version
704       - checks vg existance and size > 20G
705       - checks config file checksum
706       - checks ssh to other nodes
707
708     Args:
709       node: name of the node to check
710       file_list: required list of files
711       local_cksum: dictionary of local files and their checksums
712
713     """
714     # compares ganeti version
715     local_version = constants.PROTOCOL_VERSION
716     if not remote_version:
717       feedback_fn(" - ERROR: connection to %s failed" % (node))
718       return True
719
720     if local_version != remote_version:
721       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
722                       (local_version, node, remote_version))
723       return True
724
725     # checks vg existance and size > 20G
726
727     bad = False
728     if not vglist:
729       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
730                       (node,))
731       bad = True
732     else:
733       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
734       if vgstatus:
735         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
736         bad = True
737
738     # checks config file checksum
739     # checks ssh to any
740
741     if 'filelist' not in node_result:
742       bad = True
743       feedback_fn("  - ERROR: node hasn't returned file checksum data")
744     else:
745       remote_cksum = node_result['filelist']
746       for file_name in file_list:
747         if file_name not in remote_cksum:
748           bad = True
749           feedback_fn("  - ERROR: file '%s' missing" % file_name)
750         elif remote_cksum[file_name] != local_cksum[file_name]:
751           bad = True
752           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
753
754     if 'nodelist' not in node_result:
755       bad = True
756       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
757     else:
758       if node_result['nodelist']:
759         bad = True
760         for node in node_result['nodelist']:
761           feedback_fn("  - ERROR: communication with node '%s': %s" %
762                           (node, node_result['nodelist'][node]))
763     hyp_result = node_result.get('hypervisor', None)
764     if hyp_result is not None:
765       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
766     return bad
767
768   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
769     """Verify an instance.
770
771     This function checks to see if the required block devices are
772     available on the instance's node.
773
774     """
775     bad = False
776
777     instancelist = self.cfg.GetInstanceList()
778     if not instance in instancelist:
779       feedback_fn("  - ERROR: instance %s not in instance list %s" %
780                       (instance, instancelist))
781       bad = True
782
783     instanceconfig = self.cfg.GetInstanceInfo(instance)
784     node_current = instanceconfig.primary_node
785
786     node_vol_should = {}
787     instanceconfig.MapLVsByNode(node_vol_should)
788
789     for node in node_vol_should:
790       for volume in node_vol_should[node]:
791         if node not in node_vol_is or volume not in node_vol_is[node]:
792           feedback_fn("  - ERROR: volume %s missing on node %s" %
793                           (volume, node))
794           bad = True
795
796     if not instanceconfig.status == 'down':
797       if not instance in node_instance[node_current]:
798         feedback_fn("  - ERROR: instance %s not running on node %s" %
799                         (instance, node_current))
800         bad = True
801
802     for node in node_instance:
803       if (not node == node_current):
804         if instance in node_instance[node]:
805           feedback_fn("  - ERROR: instance %s should not run on node %s" %
806                           (instance, node))
807           bad = True
808
809     return bad
810
811   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
812     """Verify if there are any unknown volumes in the cluster.
813
814     The .os, .swap and backup volumes are ignored. All other volumes are
815     reported as unknown.
816
817     """
818     bad = False
819
820     for node in node_vol_is:
821       for volume in node_vol_is[node]:
822         if node not in node_vol_should or volume not in node_vol_should[node]:
823           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
824                       (volume, node))
825           bad = True
826     return bad
827
828   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
829     """Verify the list of running instances.
830
831     This checks what instances are running but unknown to the cluster.
832
833     """
834     bad = False
835     for node in node_instance:
836       for runninginstance in node_instance[node]:
837         if runninginstance not in instancelist:
838           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
839                           (runninginstance, node))
840           bad = True
841     return bad
842
843   def CheckPrereq(self):
844     """Check prerequisites.
845
846     This has no prerequisites.
847
848     """
849     pass
850
851   def Exec(self, feedback_fn):
852     """Verify integrity of cluster, performing various test on nodes.
853
854     """
855     bad = False
856     feedback_fn("* Verifying global settings")
857     self.cfg.VerifyConfig()
858
859     master = self.sstore.GetMasterNode()
860     vg_name = self.cfg.GetVGName()
861     nodelist = utils.NiceSort(self.cfg.GetNodeList())
862     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
863     node_volume = {}
864     node_instance = {}
865
866     # FIXME: verify OS list
867     # do local checksums
868     file_names = list(self.sstore.GetFileList())
869     file_names.append(constants.SSL_CERT_FILE)
870     file_names.append(constants.CLUSTER_CONF_FILE)
871     local_checksums = utils.FingerprintFiles(file_names)
872
873     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
874     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
875     all_instanceinfo = rpc.call_instance_list(nodelist)
876     all_vglist = rpc.call_vg_list(nodelist)
877     node_verify_param = {
878       'filelist': file_names,
879       'nodelist': nodelist,
880       'hypervisor': None,
881       }
882     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
883     all_rversion = rpc.call_version(nodelist)
884
885     for node in nodelist:
886       feedback_fn("* Verifying node %s" % node)
887       result = self._VerifyNode(node, file_names, local_checksums,
888                                 all_vglist[node], all_nvinfo[node],
889                                 all_rversion[node], feedback_fn)
890       bad = bad or result
891
892       # node_volume
893       volumeinfo = all_volumeinfo[node]
894
895       if type(volumeinfo) != dict:
896         feedback_fn("  - ERROR: connection to %s failed" % (node,))
897         bad = True
898         continue
899
900       node_volume[node] = volumeinfo
901
902       # node_instance
903       nodeinstance = all_instanceinfo[node]
904       if type(nodeinstance) != list:
905         feedback_fn("  - ERROR: connection to %s failed" % (node,))
906         bad = True
907         continue
908
909       node_instance[node] = nodeinstance
910
911     node_vol_should = {}
912
913     for instance in instancelist:
914       feedback_fn("* Verifying instance %s" % instance)
915       result =  self._VerifyInstance(instance, node_volume, node_instance,
916                                      feedback_fn)
917       bad = bad or result
918
919       inst_config = self.cfg.GetInstanceInfo(instance)
920
921       inst_config.MapLVsByNode(node_vol_should)
922
923     feedback_fn("* Verifying orphan volumes")
924     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
925                                        feedback_fn)
926     bad = bad or result
927
928     feedback_fn("* Verifying remaining instances")
929     result = self._VerifyOrphanInstances(instancelist, node_instance,
930                                          feedback_fn)
931     bad = bad or result
932
933     return int(bad)
934
935
936 class LURenameCluster(LogicalUnit):
937   """Rename the cluster.
938
939   """
940   HPATH = "cluster-rename"
941   HTYPE = constants.HTYPE_CLUSTER
942   _OP_REQP = ["name"]
943
944   def BuildHooksEnv(self):
945     """Build hooks env.
946
947     """
948     env = {
949       "OP_TARGET": self.op.sstore.GetClusterName(),
950       "NEW_NAME": self.op.name,
951       }
952     mn = self.sstore.GetMasterNode()
953     return env, [mn], [mn]
954
955   def CheckPrereq(self):
956     """Verify that the passed name is a valid one.
957
958     """
959     hostname = utils.HostInfo(self.op.name)
960
961     new_name = hostname.name
962     self.ip = new_ip = hostname.ip
963     old_name = self.sstore.GetClusterName()
964     old_ip = self.sstore.GetMasterIP()
965     if new_name == old_name and new_ip == old_ip:
966       raise errors.OpPrereqError("Neither the name nor the IP address of the"
967                                  " cluster has changed")
968     if new_ip != old_ip:
969       result = utils.RunCmd(["fping", "-q", new_ip])
970       if not result.failed:
971         raise errors.OpPrereqError("The given cluster IP address (%s) is"
972                                    " reachable on the network. Aborting." %
973                                    new_ip)
974
975     self.op.name = new_name
976
977   def Exec(self, feedback_fn):
978     """Rename the cluster.
979
980     """
981     clustername = self.op.name
982     ip = self.ip
983     ss = self.sstore
984
985     # shutdown the master IP
986     master = ss.GetMasterNode()
987     if not rpc.call_node_stop_master(master):
988       raise errors.OpExecError("Could not disable the master role")
989
990     try:
991       # modify the sstore
992       ss.SetKey(ss.SS_MASTER_IP, ip)
993       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
994
995       # Distribute updated ss config to all nodes
996       myself = self.cfg.GetNodeInfo(master)
997       dist_nodes = self.cfg.GetNodeList()
998       if myself.name in dist_nodes:
999         dist_nodes.remove(myself.name)
1000
1001       logger.Debug("Copying updated ssconf data to all nodes")
1002       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1003         fname = ss.KeyToFilename(keyname)
1004         result = rpc.call_upload_file(dist_nodes, fname)
1005         for to_node in dist_nodes:
1006           if not result[to_node]:
1007             logger.Error("copy of file %s to node %s failed" %
1008                          (fname, to_node))
1009     finally:
1010       if not rpc.call_node_start_master(master):
1011         logger.Error("Could not re-enable the master role on the master,\n"
1012                      "please restart manually.")
1013
1014
1015 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1016   """Sleep and poll for an instance's disk to sync.
1017
1018   """
1019   if not instance.disks:
1020     return True
1021
1022   if not oneshot:
1023     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1024
1025   node = instance.primary_node
1026
1027   for dev in instance.disks:
1028     cfgw.SetDiskID(dev, node)
1029
1030   retries = 0
1031   while True:
1032     max_time = 0
1033     done = True
1034     cumul_degraded = False
1035     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1036     if not rstats:
1037       logger.ToStderr("Can't get any data from node %s" % node)
1038       retries += 1
1039       if retries >= 10:
1040         raise errors.RemoteError("Can't contact node %s for mirror data,"
1041                                  " aborting." % node)
1042       time.sleep(6)
1043       continue
1044     retries = 0
1045     for i in range(len(rstats)):
1046       mstat = rstats[i]
1047       if mstat is None:
1048         logger.ToStderr("Can't compute data for node %s/%s" %
1049                         (node, instance.disks[i].iv_name))
1050         continue
1051       perc_done, est_time, is_degraded = mstat
1052       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1053       if perc_done is not None:
1054         done = False
1055         if est_time is not None:
1056           rem_time = "%d estimated seconds remaining" % est_time
1057           max_time = est_time
1058         else:
1059           rem_time = "no time estimate"
1060         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1061                         (instance.disks[i].iv_name, perc_done, rem_time))
1062     if done or oneshot:
1063       break
1064
1065     if unlock:
1066       utils.Unlock('cmd')
1067     try:
1068       time.sleep(min(60, max_time))
1069     finally:
1070       if unlock:
1071         utils.Lock('cmd')
1072
1073   if done:
1074     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1075   return not cumul_degraded
1076
1077
1078 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1079   """Check that mirrors are not degraded.
1080
1081   """
1082   cfgw.SetDiskID(dev, node)
1083
1084   result = True
1085   if on_primary or dev.AssembleOnSecondary():
1086     rstats = rpc.call_blockdev_find(node, dev)
1087     if not rstats:
1088       logger.ToStderr("Can't get any data from node %s" % node)
1089       result = False
1090     else:
1091       result = result and (not rstats[5])
1092   if dev.children:
1093     for child in dev.children:
1094       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1095
1096   return result
1097
1098
1099 class LUDiagnoseOS(NoHooksLU):
1100   """Logical unit for OS diagnose/query.
1101
1102   """
1103   _OP_REQP = []
1104
1105   def CheckPrereq(self):
1106     """Check prerequisites.
1107
1108     This always succeeds, since this is a pure query LU.
1109
1110     """
1111     return
1112
1113   def Exec(self, feedback_fn):
1114     """Compute the list of OSes.
1115
1116     """
1117     node_list = self.cfg.GetNodeList()
1118     node_data = rpc.call_os_diagnose(node_list)
1119     if node_data == False:
1120       raise errors.OpExecError("Can't gather the list of OSes")
1121     return node_data
1122
1123
1124 class LURemoveNode(LogicalUnit):
1125   """Logical unit for removing a node.
1126
1127   """
1128   HPATH = "node-remove"
1129   HTYPE = constants.HTYPE_NODE
1130   _OP_REQP = ["node_name"]
1131
1132   def BuildHooksEnv(self):
1133     """Build hooks env.
1134
1135     This doesn't run on the target node in the pre phase as a failed
1136     node would not allows itself to run.
1137
1138     """
1139     env = {
1140       "OP_TARGET": self.op.node_name,
1141       "NODE_NAME": self.op.node_name,
1142       }
1143     all_nodes = self.cfg.GetNodeList()
1144     all_nodes.remove(self.op.node_name)
1145     return env, all_nodes, all_nodes
1146
1147   def CheckPrereq(self):
1148     """Check prerequisites.
1149
1150     This checks:
1151      - the node exists in the configuration
1152      - it does not have primary or secondary instances
1153      - it's not the master
1154
1155     Any errors are signalled by raising errors.OpPrereqError.
1156
1157     """
1158     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1159     if node is None:
1160       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1161
1162     instance_list = self.cfg.GetInstanceList()
1163
1164     masternode = self.sstore.GetMasterNode()
1165     if node.name == masternode:
1166       raise errors.OpPrereqError("Node is the master node,"
1167                                  " you need to failover first.")
1168
1169     for instance_name in instance_list:
1170       instance = self.cfg.GetInstanceInfo(instance_name)
1171       if node.name == instance.primary_node:
1172         raise errors.OpPrereqError("Instance %s still running on the node,"
1173                                    " please remove first." % instance_name)
1174       if node.name in instance.secondary_nodes:
1175         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1176                                    " please remove first." % instance_name)
1177     self.op.node_name = node.name
1178     self.node = node
1179
1180   def Exec(self, feedback_fn):
1181     """Removes the node from the cluster.
1182
1183     """
1184     node = self.node
1185     logger.Info("stopping the node daemon and removing configs from node %s" %
1186                 node.name)
1187
1188     rpc.call_node_leave_cluster(node.name)
1189
1190     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1191
1192     logger.Info("Removing node %s from config" % node.name)
1193
1194     self.cfg.RemoveNode(node.name)
1195
1196
1197 class LUQueryNodes(NoHooksLU):
1198   """Logical unit for querying nodes.
1199
1200   """
1201   _OP_REQP = ["output_fields", "names"]
1202
1203   def CheckPrereq(self):
1204     """Check prerequisites.
1205
1206     This checks that the fields required are valid output fields.
1207
1208     """
1209     self.dynamic_fields = frozenset(["dtotal", "dfree",
1210                                      "mtotal", "mnode", "mfree",
1211                                      "bootid"])
1212
1213     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1214                                "pinst_list", "sinst_list",
1215                                "pip", "sip"],
1216                        dynamic=self.dynamic_fields,
1217                        selected=self.op.output_fields)
1218
1219     self.wanted = _GetWantedNodes(self, self.op.names)
1220
1221   def Exec(self, feedback_fn):
1222     """Computes the list of nodes and their attributes.
1223
1224     """
1225     nodenames = self.wanted
1226     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1227
1228     # begin data gathering
1229
1230     if self.dynamic_fields.intersection(self.op.output_fields):
1231       live_data = {}
1232       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1233       for name in nodenames:
1234         nodeinfo = node_data.get(name, None)
1235         if nodeinfo:
1236           live_data[name] = {
1237             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1238             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1239             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1240             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1241             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1242             "bootid": nodeinfo['bootid'],
1243             }
1244         else:
1245           live_data[name] = {}
1246     else:
1247       live_data = dict.fromkeys(nodenames, {})
1248
1249     node_to_primary = dict([(name, set()) for name in nodenames])
1250     node_to_secondary = dict([(name, set()) for name in nodenames])
1251
1252     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1253                              "sinst_cnt", "sinst_list"))
1254     if inst_fields & frozenset(self.op.output_fields):
1255       instancelist = self.cfg.GetInstanceList()
1256
1257       for instance_name in instancelist:
1258         inst = self.cfg.GetInstanceInfo(instance_name)
1259         if inst.primary_node in node_to_primary:
1260           node_to_primary[inst.primary_node].add(inst.name)
1261         for secnode in inst.secondary_nodes:
1262           if secnode in node_to_secondary:
1263             node_to_secondary[secnode].add(inst.name)
1264
1265     # end data gathering
1266
1267     output = []
1268     for node in nodelist:
1269       node_output = []
1270       for field in self.op.output_fields:
1271         if field == "name":
1272           val = node.name
1273         elif field == "pinst_list":
1274           val = list(node_to_primary[node.name])
1275         elif field == "sinst_list":
1276           val = list(node_to_secondary[node.name])
1277         elif field == "pinst_cnt":
1278           val = len(node_to_primary[node.name])
1279         elif field == "sinst_cnt":
1280           val = len(node_to_secondary[node.name])
1281         elif field == "pip":
1282           val = node.primary_ip
1283         elif field == "sip":
1284           val = node.secondary_ip
1285         elif field in self.dynamic_fields:
1286           val = live_data[node.name].get(field, None)
1287         else:
1288           raise errors.ParameterError(field)
1289         node_output.append(val)
1290       output.append(node_output)
1291
1292     return output
1293
1294
1295 class LUQueryNodeVolumes(NoHooksLU):
1296   """Logical unit for getting volumes on node(s).
1297
1298   """
1299   _OP_REQP = ["nodes", "output_fields"]
1300
1301   def CheckPrereq(self):
1302     """Check prerequisites.
1303
1304     This checks that the fields required are valid output fields.
1305
1306     """
1307     self.nodes = _GetWantedNodes(self, self.op.nodes)
1308
1309     _CheckOutputFields(static=["node"],
1310                        dynamic=["phys", "vg", "name", "size", "instance"],
1311                        selected=self.op.output_fields)
1312
1313
1314   def Exec(self, feedback_fn):
1315     """Computes the list of nodes and their attributes.
1316
1317     """
1318     nodenames = self.nodes
1319     volumes = rpc.call_node_volumes(nodenames)
1320
1321     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1322              in self.cfg.GetInstanceList()]
1323
1324     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1325
1326     output = []
1327     for node in nodenames:
1328       if node not in volumes or not volumes[node]:
1329         continue
1330
1331       node_vols = volumes[node][:]
1332       node_vols.sort(key=lambda vol: vol['dev'])
1333
1334       for vol in node_vols:
1335         node_output = []
1336         for field in self.op.output_fields:
1337           if field == "node":
1338             val = node
1339           elif field == "phys":
1340             val = vol['dev']
1341           elif field == "vg":
1342             val = vol['vg']
1343           elif field == "name":
1344             val = vol['name']
1345           elif field == "size":
1346             val = int(float(vol['size']))
1347           elif field == "instance":
1348             for inst in ilist:
1349               if node not in lv_by_node[inst]:
1350                 continue
1351               if vol['name'] in lv_by_node[inst][node]:
1352                 val = inst.name
1353                 break
1354             else:
1355               val = '-'
1356           else:
1357             raise errors.ParameterError(field)
1358           node_output.append(str(val))
1359
1360         output.append(node_output)
1361
1362     return output
1363
1364
1365 class LUAddNode(LogicalUnit):
1366   """Logical unit for adding node to the cluster.
1367
1368   """
1369   HPATH = "node-add"
1370   HTYPE = constants.HTYPE_NODE
1371   _OP_REQP = ["node_name"]
1372
1373   def BuildHooksEnv(self):
1374     """Build hooks env.
1375
1376     This will run on all nodes before, and on all nodes + the new node after.
1377
1378     """
1379     env = {
1380       "OP_TARGET": self.op.node_name,
1381       "NODE_NAME": self.op.node_name,
1382       "NODE_PIP": self.op.primary_ip,
1383       "NODE_SIP": self.op.secondary_ip,
1384       }
1385     nodes_0 = self.cfg.GetNodeList()
1386     nodes_1 = nodes_0 + [self.op.node_name, ]
1387     return env, nodes_0, nodes_1
1388
1389   def CheckPrereq(self):
1390     """Check prerequisites.
1391
1392     This checks:
1393      - the new node is not already in the config
1394      - it is resolvable
1395      - its parameters (single/dual homed) matches the cluster
1396
1397     Any errors are signalled by raising errors.OpPrereqError.
1398
1399     """
1400     node_name = self.op.node_name
1401     cfg = self.cfg
1402
1403     dns_data = utils.HostInfo(node_name)
1404
1405     node = dns_data.name
1406     primary_ip = self.op.primary_ip = dns_data.ip
1407     secondary_ip = getattr(self.op, "secondary_ip", None)
1408     if secondary_ip is None:
1409       secondary_ip = primary_ip
1410     if not utils.IsValidIP(secondary_ip):
1411       raise errors.OpPrereqError("Invalid secondary IP given")
1412     self.op.secondary_ip = secondary_ip
1413     node_list = cfg.GetNodeList()
1414     if node in node_list:
1415       raise errors.OpPrereqError("Node %s is already in the configuration"
1416                                  % node)
1417
1418     for existing_node_name in node_list:
1419       existing_node = cfg.GetNodeInfo(existing_node_name)
1420       if (existing_node.primary_ip == primary_ip or
1421           existing_node.secondary_ip == primary_ip or
1422           existing_node.primary_ip == secondary_ip or
1423           existing_node.secondary_ip == secondary_ip):
1424         raise errors.OpPrereqError("New node ip address(es) conflict with"
1425                                    " existing node %s" % existing_node.name)
1426
1427     # check that the type of the node (single versus dual homed) is the
1428     # same as for the master
1429     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1430     master_singlehomed = myself.secondary_ip == myself.primary_ip
1431     newbie_singlehomed = secondary_ip == primary_ip
1432     if master_singlehomed != newbie_singlehomed:
1433       if master_singlehomed:
1434         raise errors.OpPrereqError("The master has no private ip but the"
1435                                    " new node has one")
1436       else:
1437         raise errors.OpPrereqError("The master has a private ip but the"
1438                                    " new node doesn't have one")
1439
1440     # checks reachablity
1441     if not utils.TcpPing(utils.HostInfo().name,
1442                          primary_ip,
1443                          constants.DEFAULT_NODED_PORT):
1444       raise errors.OpPrereqError("Node not reachable by ping")
1445
1446     if not newbie_singlehomed:
1447       # check reachability from my secondary ip to newbie's secondary ip
1448       if not utils.TcpPing(myself.secondary_ip,
1449                            secondary_ip,
1450                            constants.DEFAULT_NODED_PORT):
1451         raise errors.OpPrereqError(
1452           "Node secondary ip not reachable by TCP based ping to noded port")
1453
1454     self.new_node = objects.Node(name=node,
1455                                  primary_ip=primary_ip,
1456                                  secondary_ip=secondary_ip)
1457
1458   def Exec(self, feedback_fn):
1459     """Adds the new node to the cluster.
1460
1461     """
1462     new_node = self.new_node
1463     node = new_node.name
1464
1465     # set up inter-node password and certificate and restarts the node daemon
1466     gntpass = self.sstore.GetNodeDaemonPassword()
1467     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1468       raise errors.OpExecError("ganeti password corruption detected")
1469     f = open(constants.SSL_CERT_FILE)
1470     try:
1471       gntpem = f.read(8192)
1472     finally:
1473       f.close()
1474     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1475     # so we use this to detect an invalid certificate; as long as the
1476     # cert doesn't contain this, the here-document will be correctly
1477     # parsed by the shell sequence below
1478     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1479       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1480     if not gntpem.endswith("\n"):
1481       raise errors.OpExecError("PEM must end with newline")
1482     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1483
1484     # and then connect with ssh to set password and start ganeti-noded
1485     # note that all the below variables are sanitized at this point,
1486     # either by being constants or by the checks above
1487     ss = self.sstore
1488     mycommand = ("umask 077 && "
1489                  "echo '%s' > '%s' && "
1490                  "cat > '%s' << '!EOF.' && \n"
1491                  "%s!EOF.\n%s restart" %
1492                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1493                   constants.SSL_CERT_FILE, gntpem,
1494                   constants.NODE_INITD_SCRIPT))
1495
1496     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1497     if result.failed:
1498       raise errors.OpExecError("Remote command on node %s, error: %s,"
1499                                " output: %s" %
1500                                (node, result.fail_reason, result.output))
1501
1502     # check connectivity
1503     time.sleep(4)
1504
1505     result = rpc.call_version([node])[node]
1506     if result:
1507       if constants.PROTOCOL_VERSION == result:
1508         logger.Info("communication to node %s fine, sw version %s match" %
1509                     (node, result))
1510       else:
1511         raise errors.OpExecError("Version mismatch master version %s,"
1512                                  " node version %s" %
1513                                  (constants.PROTOCOL_VERSION, result))
1514     else:
1515       raise errors.OpExecError("Cannot get version from the new node")
1516
1517     # setup ssh on node
1518     logger.Info("copy ssh key to node %s" % node)
1519     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1520     keyarray = []
1521     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1522                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1523                 priv_key, pub_key]
1524
1525     for i in keyfiles:
1526       f = open(i, 'r')
1527       try:
1528         keyarray.append(f.read())
1529       finally:
1530         f.close()
1531
1532     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1533                                keyarray[3], keyarray[4], keyarray[5])
1534
1535     if not result:
1536       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1537
1538     # Add node to our /etc/hosts, and add key to known_hosts
1539     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1540     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1541                       self.cfg.GetHostKey())
1542
1543     if new_node.secondary_ip != new_node.primary_ip:
1544       if not rpc.call_node_tcp_ping(new_node.name,
1545                                     constants.LOCALHOST_IP_ADDRESS,
1546                                     new_node.secondary_ip,
1547                                     constants.DEFAULT_NODED_PORT,
1548                                     10, False):
1549         raise errors.OpExecError("Node claims it doesn't have the"
1550                                  " secondary ip you gave (%s).\n"
1551                                  "Please fix and re-run this command." %
1552                                  new_node.secondary_ip)
1553
1554     success, msg = ssh.VerifyNodeHostname(node)
1555     if not success:
1556       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1557                                " than the one the resolver gives: %s.\n"
1558                                "Please fix and re-run this command." %
1559                                (node, msg))
1560
1561     # Distribute updated /etc/hosts and known_hosts to all nodes,
1562     # including the node just added
1563     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1564     dist_nodes = self.cfg.GetNodeList() + [node]
1565     if myself.name in dist_nodes:
1566       dist_nodes.remove(myself.name)
1567
1568     logger.Debug("Copying hosts and known_hosts to all nodes")
1569     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1570       result = rpc.call_upload_file(dist_nodes, fname)
1571       for to_node in dist_nodes:
1572         if not result[to_node]:
1573           logger.Error("copy of file %s to node %s failed" %
1574                        (fname, to_node))
1575
1576     to_copy = ss.GetFileList()
1577     for fname in to_copy:
1578       if not ssh.CopyFileToNode(node, fname):
1579         logger.Error("could not copy file %s to node %s" % (fname, node))
1580
1581     logger.Info("adding node %s to cluster.conf" % node)
1582     self.cfg.AddNode(new_node)
1583
1584
1585 class LUMasterFailover(LogicalUnit):
1586   """Failover the master node to the current node.
1587
1588   This is a special LU in that it must run on a non-master node.
1589
1590   """
1591   HPATH = "master-failover"
1592   HTYPE = constants.HTYPE_CLUSTER
1593   REQ_MASTER = False
1594   _OP_REQP = []
1595
1596   def BuildHooksEnv(self):
1597     """Build hooks env.
1598
1599     This will run on the new master only in the pre phase, and on all
1600     the nodes in the post phase.
1601
1602     """
1603     env = {
1604       "OP_TARGET": self.new_master,
1605       "NEW_MASTER": self.new_master,
1606       "OLD_MASTER": self.old_master,
1607       }
1608     return env, [self.new_master], self.cfg.GetNodeList()
1609
1610   def CheckPrereq(self):
1611     """Check prerequisites.
1612
1613     This checks that we are not already the master.
1614
1615     """
1616     self.new_master = utils.HostInfo().name
1617     self.old_master = self.sstore.GetMasterNode()
1618
1619     if self.old_master == self.new_master:
1620       raise errors.OpPrereqError("This commands must be run on the node"
1621                                  " where you want the new master to be.\n"
1622                                  "%s is already the master" %
1623                                  self.old_master)
1624
1625   def Exec(self, feedback_fn):
1626     """Failover the master node.
1627
1628     This command, when run on a non-master node, will cause the current
1629     master to cease being master, and the non-master to become new
1630     master.
1631
1632     """
1633     #TODO: do not rely on gethostname returning the FQDN
1634     logger.Info("setting master to %s, old master: %s" %
1635                 (self.new_master, self.old_master))
1636
1637     if not rpc.call_node_stop_master(self.old_master):
1638       logger.Error("could disable the master role on the old master"
1639                    " %s, please disable manually" % self.old_master)
1640
1641     ss = self.sstore
1642     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1643     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1644                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1645       logger.Error("could not distribute the new simple store master file"
1646                    " to the other nodes, please check.")
1647
1648     if not rpc.call_node_start_master(self.new_master):
1649       logger.Error("could not start the master role on the new master"
1650                    " %s, please check" % self.new_master)
1651       feedback_fn("Error in activating the master IP on the new master,\n"
1652                   "please fix manually.")
1653
1654
1655
1656 class LUQueryClusterInfo(NoHooksLU):
1657   """Query cluster configuration.
1658
1659   """
1660   _OP_REQP = []
1661   REQ_MASTER = False
1662
1663   def CheckPrereq(self):
1664     """No prerequsites needed for this LU.
1665
1666     """
1667     pass
1668
1669   def Exec(self, feedback_fn):
1670     """Return cluster config.
1671
1672     """
1673     result = {
1674       "name": self.sstore.GetClusterName(),
1675       "software_version": constants.RELEASE_VERSION,
1676       "protocol_version": constants.PROTOCOL_VERSION,
1677       "config_version": constants.CONFIG_VERSION,
1678       "os_api_version": constants.OS_API_VERSION,
1679       "export_version": constants.EXPORT_VERSION,
1680       "master": self.sstore.GetMasterNode(),
1681       "architecture": (platform.architecture()[0], platform.machine()),
1682       }
1683
1684     return result
1685
1686
1687 class LUClusterCopyFile(NoHooksLU):
1688   """Copy file to cluster.
1689
1690   """
1691   _OP_REQP = ["nodes", "filename"]
1692
1693   def CheckPrereq(self):
1694     """Check prerequisites.
1695
1696     It should check that the named file exists and that the given list
1697     of nodes is valid.
1698
1699     """
1700     if not os.path.exists(self.op.filename):
1701       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1702
1703     self.nodes = _GetWantedNodes(self, self.op.nodes)
1704
1705   def Exec(self, feedback_fn):
1706     """Copy a file from master to some nodes.
1707
1708     Args:
1709       opts - class with options as members
1710       args - list containing a single element, the file name
1711     Opts used:
1712       nodes - list containing the name of target nodes; if empty, all nodes
1713
1714     """
1715     filename = self.op.filename
1716
1717     myname = utils.HostInfo().name
1718
1719     for node in self.nodes:
1720       if node == myname:
1721         continue
1722       if not ssh.CopyFileToNode(node, filename):
1723         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1724
1725
1726 class LUDumpClusterConfig(NoHooksLU):
1727   """Return a text-representation of the cluster-config.
1728
1729   """
1730   _OP_REQP = []
1731
1732   def CheckPrereq(self):
1733     """No prerequisites.
1734
1735     """
1736     pass
1737
1738   def Exec(self, feedback_fn):
1739     """Dump a representation of the cluster config to the standard output.
1740
1741     """
1742     return self.cfg.DumpConfig()
1743
1744
1745 class LURunClusterCommand(NoHooksLU):
1746   """Run a command on some nodes.
1747
1748   """
1749   _OP_REQP = ["command", "nodes"]
1750
1751   def CheckPrereq(self):
1752     """Check prerequisites.
1753
1754     It checks that the given list of nodes is valid.
1755
1756     """
1757     self.nodes = _GetWantedNodes(self, self.op.nodes)
1758
1759   def Exec(self, feedback_fn):
1760     """Run a command on some nodes.
1761
1762     """
1763     data = []
1764     for node in self.nodes:
1765       result = ssh.SSHCall(node, "root", self.op.command)
1766       data.append((node, result.output, result.exit_code))
1767
1768     return data
1769
1770
1771 class LUActivateInstanceDisks(NoHooksLU):
1772   """Bring up an instance's disks.
1773
1774   """
1775   _OP_REQP = ["instance_name"]
1776
1777   def CheckPrereq(self):
1778     """Check prerequisites.
1779
1780     This checks that the instance is in the cluster.
1781
1782     """
1783     instance = self.cfg.GetInstanceInfo(
1784       self.cfg.ExpandInstanceName(self.op.instance_name))
1785     if instance is None:
1786       raise errors.OpPrereqError("Instance '%s' not known" %
1787                                  self.op.instance_name)
1788     self.instance = instance
1789
1790
1791   def Exec(self, feedback_fn):
1792     """Activate the disks.
1793
1794     """
1795     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1796     if not disks_ok:
1797       raise errors.OpExecError("Cannot activate block devices")
1798
1799     return disks_info
1800
1801
1802 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1803   """Prepare the block devices for an instance.
1804
1805   This sets up the block devices on all nodes.
1806
1807   Args:
1808     instance: a ganeti.objects.Instance object
1809     ignore_secondaries: if true, errors on secondary nodes won't result
1810                         in an error return from the function
1811
1812   Returns:
1813     false if the operation failed
1814     list of (host, instance_visible_name, node_visible_name) if the operation
1815          suceeded with the mapping from node devices to instance devices
1816   """
1817   device_info = []
1818   disks_ok = True
1819   for inst_disk in instance.disks:
1820     master_result = None
1821     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1822       cfg.SetDiskID(node_disk, node)
1823       is_primary = node == instance.primary_node
1824       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1825       if not result:
1826         logger.Error("could not prepare block device %s on node %s (is_pri"
1827                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1828         if is_primary or not ignore_secondaries:
1829           disks_ok = False
1830       if is_primary:
1831         master_result = result
1832     device_info.append((instance.primary_node, inst_disk.iv_name,
1833                         master_result))
1834
1835   # leave the disks configured for the primary node
1836   # this is a workaround that would be fixed better by
1837   # improving the logical/physical id handling
1838   for disk in instance.disks:
1839     cfg.SetDiskID(disk, instance.primary_node)
1840
1841   return disks_ok, device_info
1842
1843
1844 def _StartInstanceDisks(cfg, instance, force):
1845   """Start the disks of an instance.
1846
1847   """
1848   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1849                                            ignore_secondaries=force)
1850   if not disks_ok:
1851     _ShutdownInstanceDisks(instance, cfg)
1852     if force is not None and not force:
1853       logger.Error("If the message above refers to a secondary node,"
1854                    " you can retry the operation using '--force'.")
1855     raise errors.OpExecError("Disk consistency error")
1856
1857
1858 class LUDeactivateInstanceDisks(NoHooksLU):
1859   """Shutdown an instance's disks.
1860
1861   """
1862   _OP_REQP = ["instance_name"]
1863
1864   def CheckPrereq(self):
1865     """Check prerequisites.
1866
1867     This checks that the instance is in the cluster.
1868
1869     """
1870     instance = self.cfg.GetInstanceInfo(
1871       self.cfg.ExpandInstanceName(self.op.instance_name))
1872     if instance is None:
1873       raise errors.OpPrereqError("Instance '%s' not known" %
1874                                  self.op.instance_name)
1875     self.instance = instance
1876
1877   def Exec(self, feedback_fn):
1878     """Deactivate the disks
1879
1880     """
1881     instance = self.instance
1882     ins_l = rpc.call_instance_list([instance.primary_node])
1883     ins_l = ins_l[instance.primary_node]
1884     if not type(ins_l) is list:
1885       raise errors.OpExecError("Can't contact node '%s'" %
1886                                instance.primary_node)
1887
1888     if self.instance.name in ins_l:
1889       raise errors.OpExecError("Instance is running, can't shutdown"
1890                                " block devices.")
1891
1892     _ShutdownInstanceDisks(instance, self.cfg)
1893
1894
1895 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1896   """Shutdown block devices of an instance.
1897
1898   This does the shutdown on all nodes of the instance.
1899
1900   If the ignore_primary is false, errors on the primary node are
1901   ignored.
1902
1903   """
1904   result = True
1905   for disk in instance.disks:
1906     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1907       cfg.SetDiskID(top_disk, node)
1908       if not rpc.call_blockdev_shutdown(node, top_disk):
1909         logger.Error("could not shutdown block device %s on node %s" %
1910                      (disk.iv_name, node))
1911         if not ignore_primary or node != instance.primary_node:
1912           result = False
1913   return result
1914
1915
1916 class LUStartupInstance(LogicalUnit):
1917   """Starts an instance.
1918
1919   """
1920   HPATH = "instance-start"
1921   HTYPE = constants.HTYPE_INSTANCE
1922   _OP_REQP = ["instance_name", "force"]
1923
1924   def BuildHooksEnv(self):
1925     """Build hooks env.
1926
1927     This runs on master, primary and secondary nodes of the instance.
1928
1929     """
1930     env = {
1931       "FORCE": self.op.force,
1932       }
1933     env.update(_BuildInstanceHookEnvByObject(self.instance))
1934     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1935           list(self.instance.secondary_nodes))
1936     return env, nl, nl
1937
1938   def CheckPrereq(self):
1939     """Check prerequisites.
1940
1941     This checks that the instance is in the cluster.
1942
1943     """
1944     instance = self.cfg.GetInstanceInfo(
1945       self.cfg.ExpandInstanceName(self.op.instance_name))
1946     if instance is None:
1947       raise errors.OpPrereqError("Instance '%s' not known" %
1948                                  self.op.instance_name)
1949
1950     # check bridges existance
1951     _CheckInstanceBridgesExist(instance)
1952
1953     self.instance = instance
1954     self.op.instance_name = instance.name
1955
1956   def Exec(self, feedback_fn):
1957     """Start the instance.
1958
1959     """
1960     instance = self.instance
1961     force = self.op.force
1962     extra_args = getattr(self.op, "extra_args", "")
1963
1964     node_current = instance.primary_node
1965
1966     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1967     if not nodeinfo:
1968       raise errors.OpExecError("Could not contact node %s for infos" %
1969                                (node_current))
1970
1971     freememory = nodeinfo[node_current]['memory_free']
1972     memory = instance.memory
1973     if memory > freememory:
1974       raise errors.OpExecError("Not enough memory to start instance"
1975                                " %s on node %s"
1976                                " needed %s MiB, available %s MiB" %
1977                                (instance.name, node_current, memory,
1978                                 freememory))
1979
1980     _StartInstanceDisks(self.cfg, instance, force)
1981
1982     if not rpc.call_instance_start(node_current, instance, extra_args):
1983       _ShutdownInstanceDisks(instance, self.cfg)
1984       raise errors.OpExecError("Could not start instance")
1985
1986     self.cfg.MarkInstanceUp(instance.name)
1987
1988
1989 class LURebootInstance(LogicalUnit):
1990   """Reboot an instance.
1991
1992   """
1993   HPATH = "instance-reboot"
1994   HTYPE = constants.HTYPE_INSTANCE
1995   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1996
1997   def BuildHooksEnv(self):
1998     """Build hooks env.
1999
2000     This runs on master, primary and secondary nodes of the instance.
2001
2002     """
2003     env = {
2004       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2005       }
2006     env.update(_BuildInstanceHookEnvByObject(self.instance))
2007     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2008           list(self.instance.secondary_nodes))
2009     return env, nl, nl
2010
2011   def CheckPrereq(self):
2012     """Check prerequisites.
2013
2014     This checks that the instance is in the cluster.
2015
2016     """
2017     instance = self.cfg.GetInstanceInfo(
2018       self.cfg.ExpandInstanceName(self.op.instance_name))
2019     if instance is None:
2020       raise errors.OpPrereqError("Instance '%s' not known" %
2021                                  self.op.instance_name)
2022
2023     # check bridges existance
2024     _CheckInstanceBridgesExist(instance)
2025
2026     self.instance = instance
2027     self.op.instance_name = instance.name
2028
2029   def Exec(self, feedback_fn):
2030     """Reboot the instance.
2031
2032     """
2033     instance = self.instance
2034     ignore_secondaries = self.op.ignore_secondaries
2035     reboot_type = self.op.reboot_type
2036     extra_args = getattr(self.op, "extra_args", "")
2037
2038     node_current = instance.primary_node
2039
2040     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2041                            constants.INSTANCE_REBOOT_HARD,
2042                            constants.INSTANCE_REBOOT_FULL]:
2043       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2044                                   (constants.INSTANCE_REBOOT_SOFT,
2045                                    constants.INSTANCE_REBOOT_HARD,
2046                                    constants.INSTANCE_REBOOT_FULL))
2047
2048     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2049                        constants.INSTANCE_REBOOT_HARD]:
2050       if not rpc.call_instance_reboot(node_current, instance,
2051                                       reboot_type, extra_args):
2052         raise errors.OpExecError("Could not reboot instance")
2053     else:
2054       if not rpc.call_instance_shutdown(node_current, instance):
2055         raise errors.OpExecError("could not shutdown instance for full reboot")
2056       _ShutdownInstanceDisks(instance, self.cfg)
2057       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2058       if not rpc.call_instance_start(node_current, instance, extra_args):
2059         _ShutdownInstanceDisks(instance, self.cfg)
2060         raise errors.OpExecError("Could not start instance for full reboot")
2061
2062     self.cfg.MarkInstanceUp(instance.name)
2063
2064
2065 class LUShutdownInstance(LogicalUnit):
2066   """Shutdown an instance.
2067
2068   """
2069   HPATH = "instance-stop"
2070   HTYPE = constants.HTYPE_INSTANCE
2071   _OP_REQP = ["instance_name"]
2072
2073   def BuildHooksEnv(self):
2074     """Build hooks env.
2075
2076     This runs on master, primary and secondary nodes of the instance.
2077
2078     """
2079     env = _BuildInstanceHookEnvByObject(self.instance)
2080     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2081           list(self.instance.secondary_nodes))
2082     return env, nl, nl
2083
2084   def CheckPrereq(self):
2085     """Check prerequisites.
2086
2087     This checks that the instance is in the cluster.
2088
2089     """
2090     instance = self.cfg.GetInstanceInfo(
2091       self.cfg.ExpandInstanceName(self.op.instance_name))
2092     if instance is None:
2093       raise errors.OpPrereqError("Instance '%s' not known" %
2094                                  self.op.instance_name)
2095     self.instance = instance
2096
2097   def Exec(self, feedback_fn):
2098     """Shutdown the instance.
2099
2100     """
2101     instance = self.instance
2102     node_current = instance.primary_node
2103     if not rpc.call_instance_shutdown(node_current, instance):
2104       logger.Error("could not shutdown instance")
2105
2106     self.cfg.MarkInstanceDown(instance.name)
2107     _ShutdownInstanceDisks(instance, self.cfg)
2108
2109
2110 class LUReinstallInstance(LogicalUnit):
2111   """Reinstall an instance.
2112
2113   """
2114   HPATH = "instance-reinstall"
2115   HTYPE = constants.HTYPE_INSTANCE
2116   _OP_REQP = ["instance_name"]
2117
2118   def BuildHooksEnv(self):
2119     """Build hooks env.
2120
2121     This runs on master, primary and secondary nodes of the instance.
2122
2123     """
2124     env = _BuildInstanceHookEnvByObject(self.instance)
2125     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126           list(self.instance.secondary_nodes))
2127     return env, nl, nl
2128
2129   def CheckPrereq(self):
2130     """Check prerequisites.
2131
2132     This checks that the instance is in the cluster and is not running.
2133
2134     """
2135     instance = self.cfg.GetInstanceInfo(
2136       self.cfg.ExpandInstanceName(self.op.instance_name))
2137     if instance is None:
2138       raise errors.OpPrereqError("Instance '%s' not known" %
2139                                  self.op.instance_name)
2140     if instance.disk_template == constants.DT_DISKLESS:
2141       raise errors.OpPrereqError("Instance '%s' has no disks" %
2142                                  self.op.instance_name)
2143     if instance.status != "down":
2144       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2145                                  self.op.instance_name)
2146     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2147     if remote_info:
2148       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2149                                  (self.op.instance_name,
2150                                   instance.primary_node))
2151
2152     self.op.os_type = getattr(self.op, "os_type", None)
2153     if self.op.os_type is not None:
2154       # OS verification
2155       pnode = self.cfg.GetNodeInfo(
2156         self.cfg.ExpandNodeName(instance.primary_node))
2157       if pnode is None:
2158         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2159                                    self.op.pnode)
2160       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2161       if not isinstance(os_obj, objects.OS):
2162         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2163                                    " primary node"  % self.op.os_type)
2164
2165     self.instance = instance
2166
2167   def Exec(self, feedback_fn):
2168     """Reinstall the instance.
2169
2170     """
2171     inst = self.instance
2172
2173     if self.op.os_type is not None:
2174       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2175       inst.os = self.op.os_type
2176       self.cfg.AddInstance(inst)
2177
2178     _StartInstanceDisks(self.cfg, inst, None)
2179     try:
2180       feedback_fn("Running the instance OS create scripts...")
2181       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2182         raise errors.OpExecError("Could not install OS for instance %s "
2183                                  "on node %s" %
2184                                  (inst.name, inst.primary_node))
2185     finally:
2186       _ShutdownInstanceDisks(inst, self.cfg)
2187
2188
2189 class LURenameInstance(LogicalUnit):
2190   """Rename an instance.
2191
2192   """
2193   HPATH = "instance-rename"
2194   HTYPE = constants.HTYPE_INSTANCE
2195   _OP_REQP = ["instance_name", "new_name"]
2196
2197   def BuildHooksEnv(self):
2198     """Build hooks env.
2199
2200     This runs on master, primary and secondary nodes of the instance.
2201
2202     """
2203     env = _BuildInstanceHookEnvByObject(self.instance)
2204     env["INSTANCE_NEW_NAME"] = self.op.new_name
2205     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2206           list(self.instance.secondary_nodes))
2207     return env, nl, nl
2208
2209   def CheckPrereq(self):
2210     """Check prerequisites.
2211
2212     This checks that the instance is in the cluster and is not running.
2213
2214     """
2215     instance = self.cfg.GetInstanceInfo(
2216       self.cfg.ExpandInstanceName(self.op.instance_name))
2217     if instance is None:
2218       raise errors.OpPrereqError("Instance '%s' not known" %
2219                                  self.op.instance_name)
2220     if instance.status != "down":
2221       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2222                                  self.op.instance_name)
2223     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2224     if remote_info:
2225       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2226                                  (self.op.instance_name,
2227                                   instance.primary_node))
2228     self.instance = instance
2229
2230     # new name verification
2231     name_info = utils.HostInfo(self.op.new_name)
2232
2233     self.op.new_name = new_name = name_info.name
2234     if not getattr(self.op, "ignore_ip", False):
2235       command = ["fping", "-q", name_info.ip]
2236       result = utils.RunCmd(command)
2237       if not result.failed:
2238         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2239                                    (name_info.ip, new_name))
2240
2241
2242   def Exec(self, feedback_fn):
2243     """Reinstall the instance.
2244
2245     """
2246     inst = self.instance
2247     old_name = inst.name
2248
2249     self.cfg.RenameInstance(inst.name, self.op.new_name)
2250
2251     # re-read the instance from the configuration after rename
2252     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2253
2254     _StartInstanceDisks(self.cfg, inst, None)
2255     try:
2256       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2257                                           "sda", "sdb"):
2258         msg = ("Could run OS rename script for instance %s\n"
2259                "on node %s\n"
2260                "(but the instance has been renamed in Ganeti)" %
2261                (inst.name, inst.primary_node))
2262         logger.Error(msg)
2263     finally:
2264       _ShutdownInstanceDisks(inst, self.cfg)
2265
2266
2267 class LURemoveInstance(LogicalUnit):
2268   """Remove an instance.
2269
2270   """
2271   HPATH = "instance-remove"
2272   HTYPE = constants.HTYPE_INSTANCE
2273   _OP_REQP = ["instance_name"]
2274
2275   def BuildHooksEnv(self):
2276     """Build hooks env.
2277
2278     This runs on master, primary and secondary nodes of the instance.
2279
2280     """
2281     env = _BuildInstanceHookEnvByObject(self.instance)
2282     nl = [self.sstore.GetMasterNode()]
2283     return env, nl, nl
2284
2285   def CheckPrereq(self):
2286     """Check prerequisites.
2287
2288     This checks that the instance is in the cluster.
2289
2290     """
2291     instance = self.cfg.GetInstanceInfo(
2292       self.cfg.ExpandInstanceName(self.op.instance_name))
2293     if instance is None:
2294       raise errors.OpPrereqError("Instance '%s' not known" %
2295                                  self.op.instance_name)
2296     self.instance = instance
2297
2298   def Exec(self, feedback_fn):
2299     """Remove the instance.
2300
2301     """
2302     instance = self.instance
2303     logger.Info("shutting down instance %s on node %s" %
2304                 (instance.name, instance.primary_node))
2305
2306     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2307       if self.op.ignore_failures:
2308         feedback_fn("Warning: can't shutdown instance")
2309       else:
2310         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2311                                  (instance.name, instance.primary_node))
2312
2313     logger.Info("removing block devices for instance %s" % instance.name)
2314
2315     if not _RemoveDisks(instance, self.cfg):
2316       if self.op.ignore_failures:
2317         feedback_fn("Warning: can't remove instance's disks")
2318       else:
2319         raise errors.OpExecError("Can't remove instance's disks")
2320
2321     logger.Info("removing instance %s out of cluster config" % instance.name)
2322
2323     self.cfg.RemoveInstance(instance.name)
2324
2325
2326 class LUQueryInstances(NoHooksLU):
2327   """Logical unit for querying instances.
2328
2329   """
2330   _OP_REQP = ["output_fields", "names"]
2331
2332   def CheckPrereq(self):
2333     """Check prerequisites.
2334
2335     This checks that the fields required are valid output fields.
2336
2337     """
2338     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2339     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2340                                "admin_state", "admin_ram",
2341                                "disk_template", "ip", "mac", "bridge",
2342                                "sda_size", "sdb_size"],
2343                        dynamic=self.dynamic_fields,
2344                        selected=self.op.output_fields)
2345
2346     self.wanted = _GetWantedInstances(self, self.op.names)
2347
2348   def Exec(self, feedback_fn):
2349     """Computes the list of nodes and their attributes.
2350
2351     """
2352     instance_names = self.wanted
2353     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2354                      in instance_names]
2355
2356     # begin data gathering
2357
2358     nodes = frozenset([inst.primary_node for inst in instance_list])
2359
2360     bad_nodes = []
2361     if self.dynamic_fields.intersection(self.op.output_fields):
2362       live_data = {}
2363       node_data = rpc.call_all_instances_info(nodes)
2364       for name in nodes:
2365         result = node_data[name]
2366         if result:
2367           live_data.update(result)
2368         elif result == False:
2369           bad_nodes.append(name)
2370         # else no instance is alive
2371     else:
2372       live_data = dict([(name, {}) for name in instance_names])
2373
2374     # end data gathering
2375
2376     output = []
2377     for instance in instance_list:
2378       iout = []
2379       for field in self.op.output_fields:
2380         if field == "name":
2381           val = instance.name
2382         elif field == "os":
2383           val = instance.os
2384         elif field == "pnode":
2385           val = instance.primary_node
2386         elif field == "snodes":
2387           val = list(instance.secondary_nodes)
2388         elif field == "admin_state":
2389           val = (instance.status != "down")
2390         elif field == "oper_state":
2391           if instance.primary_node in bad_nodes:
2392             val = None
2393           else:
2394             val = bool(live_data.get(instance.name))
2395         elif field == "admin_ram":
2396           val = instance.memory
2397         elif field == "oper_ram":
2398           if instance.primary_node in bad_nodes:
2399             val = None
2400           elif instance.name in live_data:
2401             val = live_data[instance.name].get("memory", "?")
2402           else:
2403             val = "-"
2404         elif field == "disk_template":
2405           val = instance.disk_template
2406         elif field == "ip":
2407           val = instance.nics[0].ip
2408         elif field == "bridge":
2409           val = instance.nics[0].bridge
2410         elif field == "mac":
2411           val = instance.nics[0].mac
2412         elif field == "sda_size" or field == "sdb_size":
2413           disk = instance.FindDisk(field[:3])
2414           if disk is None:
2415             val = None
2416           else:
2417             val = disk.size
2418         else:
2419           raise errors.ParameterError(field)
2420         iout.append(val)
2421       output.append(iout)
2422
2423     return output
2424
2425
2426 class LUFailoverInstance(LogicalUnit):
2427   """Failover an instance.
2428
2429   """
2430   HPATH = "instance-failover"
2431   HTYPE = constants.HTYPE_INSTANCE
2432   _OP_REQP = ["instance_name", "ignore_consistency"]
2433
2434   def BuildHooksEnv(self):
2435     """Build hooks env.
2436
2437     This runs on master, primary and secondary nodes of the instance.
2438
2439     """
2440     env = {
2441       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2442       }
2443     env.update(_BuildInstanceHookEnvByObject(self.instance))
2444     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2445     return env, nl, nl
2446
2447   def CheckPrereq(self):
2448     """Check prerequisites.
2449
2450     This checks that the instance is in the cluster.
2451
2452     """
2453     instance = self.cfg.GetInstanceInfo(
2454       self.cfg.ExpandInstanceName(self.op.instance_name))
2455     if instance is None:
2456       raise errors.OpPrereqError("Instance '%s' not known" %
2457                                  self.op.instance_name)
2458
2459     if instance.disk_template not in constants.DTS_NET_MIRROR:
2460       raise errors.OpPrereqError("Instance's disk layout is not"
2461                                  " network mirrored, cannot failover.")
2462
2463     secondary_nodes = instance.secondary_nodes
2464     if not secondary_nodes:
2465       raise errors.ProgrammerError("no secondary node but using "
2466                                    "DT_REMOTE_RAID1 template")
2467
2468     # check memory requirements on the secondary node
2469     target_node = secondary_nodes[0]
2470     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2471     info = nodeinfo.get(target_node, None)
2472     if not info:
2473       raise errors.OpPrereqError("Cannot get current information"
2474                                  " from node '%s'" % nodeinfo)
2475     if instance.memory > info['memory_free']:
2476       raise errors.OpPrereqError("Not enough memory on target node %s."
2477                                  " %d MB available, %d MB required" %
2478                                  (target_node, info['memory_free'],
2479                                   instance.memory))
2480
2481     # check bridge existance
2482     brlist = [nic.bridge for nic in instance.nics]
2483     if not rpc.call_bridges_exist(target_node, brlist):
2484       raise errors.OpPrereqError("One or more target bridges %s does not"
2485                                  " exist on destination node '%s'" %
2486                                  (brlist, target_node))
2487
2488     self.instance = instance
2489
2490   def Exec(self, feedback_fn):
2491     """Failover an instance.
2492
2493     The failover is done by shutting it down on its present node and
2494     starting it on the secondary.
2495
2496     """
2497     instance = self.instance
2498
2499     source_node = instance.primary_node
2500     target_node = instance.secondary_nodes[0]
2501
2502     feedback_fn("* checking disk consistency between source and target")
2503     for dev in instance.disks:
2504       # for remote_raid1, these are md over drbd
2505       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2506         if not self.op.ignore_consistency:
2507           raise errors.OpExecError("Disk %s is degraded on target node,"
2508                                    " aborting failover." % dev.iv_name)
2509
2510     feedback_fn("* checking target node resource availability")
2511     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2512
2513     if not nodeinfo:
2514       raise errors.OpExecError("Could not contact target node %s." %
2515                                target_node)
2516
2517     free_memory = int(nodeinfo[target_node]['memory_free'])
2518     memory = instance.memory
2519     if memory > free_memory:
2520       raise errors.OpExecError("Not enough memory to create instance %s on"
2521                                " node %s. needed %s MiB, available %s MiB" %
2522                                (instance.name, target_node, memory,
2523                                 free_memory))
2524
2525     feedback_fn("* shutting down instance on source node")
2526     logger.Info("Shutting down instance %s on node %s" %
2527                 (instance.name, source_node))
2528
2529     if not rpc.call_instance_shutdown(source_node, instance):
2530       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2531                    " anyway. Please make sure node %s is down"  %
2532                    (instance.name, source_node, source_node))
2533
2534     feedback_fn("* deactivating the instance's disks on source node")
2535     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2536       raise errors.OpExecError("Can't shut down the instance's disks.")
2537
2538     instance.primary_node = target_node
2539     # distribute new instance config to the other nodes
2540     self.cfg.AddInstance(instance)
2541
2542     feedback_fn("* activating the instance's disks on target node")
2543     logger.Info("Starting instance %s on node %s" %
2544                 (instance.name, target_node))
2545
2546     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2547                                              ignore_secondaries=True)
2548     if not disks_ok:
2549       _ShutdownInstanceDisks(instance, self.cfg)
2550       raise errors.OpExecError("Can't activate the instance's disks")
2551
2552     feedback_fn("* starting the instance on the target node")
2553     if not rpc.call_instance_start(target_node, instance, None):
2554       _ShutdownInstanceDisks(instance, self.cfg)
2555       raise errors.OpExecError("Could not start instance %s on node %s." %
2556                                (instance.name, target_node))
2557
2558
2559 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2560   """Create a tree of block devices on the primary node.
2561
2562   This always creates all devices.
2563
2564   """
2565   if device.children:
2566     for child in device.children:
2567       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2568         return False
2569
2570   cfg.SetDiskID(device, node)
2571   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2572   if not new_id:
2573     return False
2574   if device.physical_id is None:
2575     device.physical_id = new_id
2576   return True
2577
2578
2579 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2580   """Create a tree of block devices on a secondary node.
2581
2582   If this device type has to be created on secondaries, create it and
2583   all its children.
2584
2585   If not, just recurse to children keeping the same 'force' value.
2586
2587   """
2588   if device.CreateOnSecondary():
2589     force = True
2590   if device.children:
2591     for child in device.children:
2592       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2593         return False
2594
2595   if not force:
2596     return True
2597   cfg.SetDiskID(device, node)
2598   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2599   if not new_id:
2600     return False
2601   if device.physical_id is None:
2602     device.physical_id = new_id
2603   return True
2604
2605
2606 def _GenerateUniqueNames(cfg, exts):
2607   """Generate a suitable LV name.
2608
2609   This will generate a logical volume name for the given instance.
2610
2611   """
2612   results = []
2613   for val in exts:
2614     new_id = cfg.GenerateUniqueID()
2615     results.append("%s%s" % (new_id, val))
2616   return results
2617
2618
2619 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2620   """Generate a drbd device complete with its children.
2621
2622   """
2623   port = cfg.AllocatePort()
2624   vgname = cfg.GetVGName()
2625   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2626                           logical_id=(vgname, names[0]))
2627   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2628                           logical_id=(vgname, names[1]))
2629   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2630                           logical_id = (primary, secondary, port),
2631                           children = [dev_data, dev_meta])
2632   return drbd_dev
2633
2634
2635 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2636   """Generate a drbd8 device complete with its children.
2637
2638   """
2639   port = cfg.AllocatePort()
2640   vgname = cfg.GetVGName()
2641   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2642                           logical_id=(vgname, names[0]))
2643   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2644                           logical_id=(vgname, names[1]))
2645   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2646                           logical_id = (primary, secondary, port),
2647                           children = [dev_data, dev_meta],
2648                           iv_name=iv_name)
2649   return drbd_dev
2650
2651 def _GenerateDiskTemplate(cfg, template_name,
2652                           instance_name, primary_node,
2653                           secondary_nodes, disk_sz, swap_sz):
2654   """Generate the entire disk layout for a given template type.
2655
2656   """
2657   #TODO: compute space requirements
2658
2659   vgname = cfg.GetVGName()
2660   if template_name == "diskless":
2661     disks = []
2662   elif template_name == "plain":
2663     if len(secondary_nodes) != 0:
2664       raise errors.ProgrammerError("Wrong template configuration")
2665
2666     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2667     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2668                            logical_id=(vgname, names[0]),
2669                            iv_name = "sda")
2670     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2671                            logical_id=(vgname, names[1]),
2672                            iv_name = "sdb")
2673     disks = [sda_dev, sdb_dev]
2674   elif template_name == "local_raid1":
2675     if len(secondary_nodes) != 0:
2676       raise errors.ProgrammerError("Wrong template configuration")
2677
2678
2679     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2680                                        ".sdb_m1", ".sdb_m2"])
2681     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2682                               logical_id=(vgname, names[0]))
2683     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2684                               logical_id=(vgname, names[1]))
2685     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2686                               size=disk_sz,
2687                               children = [sda_dev_m1, sda_dev_m2])
2688     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2689                               logical_id=(vgname, names[2]))
2690     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2691                               logical_id=(vgname, names[3]))
2692     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2693                               size=swap_sz,
2694                               children = [sdb_dev_m1, sdb_dev_m2])
2695     disks = [md_sda_dev, md_sdb_dev]
2696   elif template_name == constants.DT_REMOTE_RAID1:
2697     if len(secondary_nodes) != 1:
2698       raise errors.ProgrammerError("Wrong template configuration")
2699     remote_node = secondary_nodes[0]
2700     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2701                                        ".sdb_data", ".sdb_meta"])
2702     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2703                                          disk_sz, names[0:2])
2704     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2705                               children = [drbd_sda_dev], size=disk_sz)
2706     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2707                                          swap_sz, names[2:4])
2708     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2709                               children = [drbd_sdb_dev], size=swap_sz)
2710     disks = [md_sda_dev, md_sdb_dev]
2711   elif template_name == constants.DT_DRBD8:
2712     if len(secondary_nodes) != 1:
2713       raise errors.ProgrammerError("Wrong template configuration")
2714     remote_node = secondary_nodes[0]
2715     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2716                                        ".sdb_data", ".sdb_meta"])
2717     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2718                                          disk_sz, names[0:2], "sda")
2719     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2720                                          swap_sz, names[2:4], "sdb")
2721     disks = [drbd_sda_dev, drbd_sdb_dev]
2722   else:
2723     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2724   return disks
2725
2726
2727 def _GetInstanceInfoText(instance):
2728   """Compute that text that should be added to the disk's metadata.
2729
2730   """
2731   return "originstname+%s" % instance.name
2732
2733
2734 def _CreateDisks(cfg, instance):
2735   """Create all disks for an instance.
2736
2737   This abstracts away some work from AddInstance.
2738
2739   Args:
2740     instance: the instance object
2741
2742   Returns:
2743     True or False showing the success of the creation process
2744
2745   """
2746   info = _GetInstanceInfoText(instance)
2747
2748   for device in instance.disks:
2749     logger.Info("creating volume %s for instance %s" %
2750               (device.iv_name, instance.name))
2751     #HARDCODE
2752     for secondary_node in instance.secondary_nodes:
2753       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2754                                         info):
2755         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2756                      (device.iv_name, device, secondary_node))
2757         return False
2758     #HARDCODE
2759     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2760       logger.Error("failed to create volume %s on primary!" %
2761                    device.iv_name)
2762       return False
2763   return True
2764
2765
2766 def _RemoveDisks(instance, cfg):
2767   """Remove all disks for an instance.
2768
2769   This abstracts away some work from `AddInstance()` and
2770   `RemoveInstance()`. Note that in case some of the devices couldn't
2771   be removed, the removal will continue with the other ones (compare
2772   with `_CreateDisks()`).
2773
2774   Args:
2775     instance: the instance object
2776
2777   Returns:
2778     True or False showing the success of the removal proces
2779
2780   """
2781   logger.Info("removing block devices for instance %s" % instance.name)
2782
2783   result = True
2784   for device in instance.disks:
2785     for node, disk in device.ComputeNodeTree(instance.primary_node):
2786       cfg.SetDiskID(disk, node)
2787       if not rpc.call_blockdev_remove(node, disk):
2788         logger.Error("could not remove block device %s on node %s,"
2789                      " continuing anyway" %
2790                      (device.iv_name, node))
2791         result = False
2792   return result
2793
2794
2795 class LUCreateInstance(LogicalUnit):
2796   """Create an instance.
2797
2798   """
2799   HPATH = "instance-add"
2800   HTYPE = constants.HTYPE_INSTANCE
2801   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2802               "disk_template", "swap_size", "mode", "start", "vcpus",
2803               "wait_for_sync", "ip_check"]
2804
2805   def BuildHooksEnv(self):
2806     """Build hooks env.
2807
2808     This runs on master, primary and secondary nodes of the instance.
2809
2810     """
2811     env = {
2812       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2813       "INSTANCE_DISK_SIZE": self.op.disk_size,
2814       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2815       "INSTANCE_ADD_MODE": self.op.mode,
2816       }
2817     if self.op.mode == constants.INSTANCE_IMPORT:
2818       env["INSTANCE_SRC_NODE"] = self.op.src_node
2819       env["INSTANCE_SRC_PATH"] = self.op.src_path
2820       env["INSTANCE_SRC_IMAGE"] = self.src_image
2821
2822     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2823       primary_node=self.op.pnode,
2824       secondary_nodes=self.secondaries,
2825       status=self.instance_status,
2826       os_type=self.op.os_type,
2827       memory=self.op.mem_size,
2828       vcpus=self.op.vcpus,
2829       nics=[(self.inst_ip, self.op.bridge)],
2830     ))
2831
2832     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2833           self.secondaries)
2834     return env, nl, nl
2835
2836
2837   def CheckPrereq(self):
2838     """Check prerequisites.
2839
2840     """
2841     if self.op.mode not in (constants.INSTANCE_CREATE,
2842                             constants.INSTANCE_IMPORT):
2843       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2844                                  self.op.mode)
2845
2846     if self.op.mode == constants.INSTANCE_IMPORT:
2847       src_node = getattr(self.op, "src_node", None)
2848       src_path = getattr(self.op, "src_path", None)
2849       if src_node is None or src_path is None:
2850         raise errors.OpPrereqError("Importing an instance requires source"
2851                                    " node and path options")
2852       src_node_full = self.cfg.ExpandNodeName(src_node)
2853       if src_node_full is None:
2854         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2855       self.op.src_node = src_node = src_node_full
2856
2857       if not os.path.isabs(src_path):
2858         raise errors.OpPrereqError("The source path must be absolute")
2859
2860       export_info = rpc.call_export_info(src_node, src_path)
2861
2862       if not export_info:
2863         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2864
2865       if not export_info.has_section(constants.INISECT_EXP):
2866         raise errors.ProgrammerError("Corrupted export config")
2867
2868       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2869       if (int(ei_version) != constants.EXPORT_VERSION):
2870         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2871                                    (ei_version, constants.EXPORT_VERSION))
2872
2873       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2874         raise errors.OpPrereqError("Can't import instance with more than"
2875                                    " one data disk")
2876
2877       # FIXME: are the old os-es, disk sizes, etc. useful?
2878       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2879       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2880                                                          'disk0_dump'))
2881       self.src_image = diskimage
2882     else: # INSTANCE_CREATE
2883       if getattr(self.op, "os_type", None) is None:
2884         raise errors.OpPrereqError("No guest OS specified")
2885
2886     # check primary node
2887     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2888     if pnode is None:
2889       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2890                                  self.op.pnode)
2891     self.op.pnode = pnode.name
2892     self.pnode = pnode
2893     self.secondaries = []
2894     # disk template and mirror node verification
2895     if self.op.disk_template not in constants.DISK_TEMPLATES:
2896       raise errors.OpPrereqError("Invalid disk template name")
2897
2898     if self.op.disk_template in constants.DTS_NET_MIRROR:
2899       if getattr(self.op, "snode", None) is None:
2900         raise errors.OpPrereqError("The networked disk templates need"
2901                                    " a mirror node")
2902
2903       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2904       if snode_name is None:
2905         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2906                                    self.op.snode)
2907       elif snode_name == pnode.name:
2908         raise errors.OpPrereqError("The secondary node cannot be"
2909                                    " the primary node.")
2910       self.secondaries.append(snode_name)
2911
2912     # Check lv size requirements
2913     nodenames = [pnode.name] + self.secondaries
2914     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2915
2916     # Required free disk space as a function of disk and swap space
2917     req_size_dict = {
2918       constants.DT_DISKLESS: 0,
2919       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2920       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2921       # 256 MB are added for drbd metadata, 128MB for each drbd device
2922       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2923       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2924     }
2925
2926     if self.op.disk_template not in req_size_dict:
2927       raise errors.ProgrammerError("Disk template '%s' size requirement"
2928                                    " is unknown" %  self.op.disk_template)
2929
2930     req_size = req_size_dict[self.op.disk_template]
2931
2932     for node in nodenames:
2933       info = nodeinfo.get(node, None)
2934       if not info:
2935         raise errors.OpPrereqError("Cannot get current information"
2936                                    " from node '%s'" % nodeinfo)
2937       if req_size > info['vg_free']:
2938         raise errors.OpPrereqError("Not enough disk space on target node %s."
2939                                    " %d MB available, %d MB required" %
2940                                    (node, info['vg_free'], req_size))
2941
2942     # os verification
2943     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2944     if not isinstance(os_obj, objects.OS):
2945       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2946                                  " primary node"  % self.op.os_type)
2947
2948     # instance verification
2949     hostname1 = utils.HostInfo(self.op.instance_name)
2950
2951     self.op.instance_name = instance_name = hostname1.name
2952     instance_list = self.cfg.GetInstanceList()
2953     if instance_name in instance_list:
2954       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2955                                  instance_name)
2956
2957     ip = getattr(self.op, "ip", None)
2958     if ip is None or ip.lower() == "none":
2959       inst_ip = None
2960     elif ip.lower() == "auto":
2961       inst_ip = hostname1.ip
2962     else:
2963       if not utils.IsValidIP(ip):
2964         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2965                                    " like a valid IP" % ip)
2966       inst_ip = ip
2967     self.inst_ip = inst_ip
2968
2969     if self.op.start and not self.op.ip_check:
2970       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2971                                  " adding an instance in start mode")
2972
2973     if self.op.ip_check:
2974       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2975                        constants.DEFAULT_NODED_PORT):
2976         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2977                                    (hostname1.ip, instance_name))
2978
2979     # bridge verification
2980     bridge = getattr(self.op, "bridge", None)
2981     if bridge is None:
2982       self.op.bridge = self.cfg.GetDefBridge()
2983     else:
2984       self.op.bridge = bridge
2985
2986     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2987       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2988                                  " destination node '%s'" %
2989                                  (self.op.bridge, pnode.name))
2990
2991     if self.op.start:
2992       self.instance_status = 'up'
2993     else:
2994       self.instance_status = 'down'
2995
2996   def Exec(self, feedback_fn):
2997     """Create and add the instance to the cluster.
2998
2999     """
3000     instance = self.op.instance_name
3001     pnode_name = self.pnode.name
3002
3003     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3004     if self.inst_ip is not None:
3005       nic.ip = self.inst_ip
3006
3007     disks = _GenerateDiskTemplate(self.cfg,
3008                                   self.op.disk_template,
3009                                   instance, pnode_name,
3010                                   self.secondaries, self.op.disk_size,
3011                                   self.op.swap_size)
3012
3013     iobj = objects.Instance(name=instance, os=self.op.os_type,
3014                             primary_node=pnode_name,
3015                             memory=self.op.mem_size,
3016                             vcpus=self.op.vcpus,
3017                             nics=[nic], disks=disks,
3018                             disk_template=self.op.disk_template,
3019                             status=self.instance_status,
3020                             )
3021
3022     feedback_fn("* creating instance disks...")
3023     if not _CreateDisks(self.cfg, iobj):
3024       _RemoveDisks(iobj, self.cfg)
3025       raise errors.OpExecError("Device creation failed, reverting...")
3026
3027     feedback_fn("adding instance %s to cluster config" % instance)
3028
3029     self.cfg.AddInstance(iobj)
3030
3031     if self.op.wait_for_sync:
3032       disk_abort = not _WaitForSync(self.cfg, iobj)
3033     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3034       # make sure the disks are not degraded (still sync-ing is ok)
3035       time.sleep(15)
3036       feedback_fn("* checking mirrors status")
3037       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3038     else:
3039       disk_abort = False
3040
3041     if disk_abort:
3042       _RemoveDisks(iobj, self.cfg)
3043       self.cfg.RemoveInstance(iobj.name)
3044       raise errors.OpExecError("There are some degraded disks for"
3045                                " this instance")
3046
3047     feedback_fn("creating os for instance %s on node %s" %
3048                 (instance, pnode_name))
3049
3050     if iobj.disk_template != constants.DT_DISKLESS:
3051       if self.op.mode == constants.INSTANCE_CREATE:
3052         feedback_fn("* running the instance OS create scripts...")
3053         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3054           raise errors.OpExecError("could not add os for instance %s"
3055                                    " on node %s" %
3056                                    (instance, pnode_name))
3057
3058       elif self.op.mode == constants.INSTANCE_IMPORT:
3059         feedback_fn("* running the instance OS import scripts...")
3060         src_node = self.op.src_node
3061         src_image = self.src_image
3062         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3063                                                 src_node, src_image):
3064           raise errors.OpExecError("Could not import os for instance"
3065                                    " %s on node %s" %
3066                                    (instance, pnode_name))
3067       else:
3068         # also checked in the prereq part
3069         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3070                                      % self.op.mode)
3071
3072     if self.op.start:
3073       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3074       feedback_fn("* starting instance...")
3075       if not rpc.call_instance_start(pnode_name, iobj, None):
3076         raise errors.OpExecError("Could not start instance")
3077
3078
3079 class LUConnectConsole(NoHooksLU):
3080   """Connect to an instance's console.
3081
3082   This is somewhat special in that it returns the command line that
3083   you need to run on the master node in order to connect to the
3084   console.
3085
3086   """
3087   _OP_REQP = ["instance_name"]
3088
3089   def CheckPrereq(self):
3090     """Check prerequisites.
3091
3092     This checks that the instance is in the cluster.
3093
3094     """
3095     instance = self.cfg.GetInstanceInfo(
3096       self.cfg.ExpandInstanceName(self.op.instance_name))
3097     if instance is None:
3098       raise errors.OpPrereqError("Instance '%s' not known" %
3099                                  self.op.instance_name)
3100     self.instance = instance
3101
3102   def Exec(self, feedback_fn):
3103     """Connect to the console of an instance
3104
3105     """
3106     instance = self.instance
3107     node = instance.primary_node
3108
3109     node_insts = rpc.call_instance_list([node])[node]
3110     if node_insts is False:
3111       raise errors.OpExecError("Can't connect to node %s." % node)
3112
3113     if instance.name not in node_insts:
3114       raise errors.OpExecError("Instance %s is not running." % instance.name)
3115
3116     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3117
3118     hyper = hypervisor.GetHypervisor()
3119     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3120     # build ssh cmdline
3121     argv = ["ssh", "-q", "-t"]
3122     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3123     argv.extend(ssh.BATCH_MODE_OPTS)
3124     argv.append(node)
3125     argv.append(console_cmd)
3126     return "ssh", argv
3127
3128
3129 class LUAddMDDRBDComponent(LogicalUnit):
3130   """Adda new mirror member to an instance's disk.
3131
3132   """
3133   HPATH = "mirror-add"
3134   HTYPE = constants.HTYPE_INSTANCE
3135   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3136
3137   def BuildHooksEnv(self):
3138     """Build hooks env.
3139
3140     This runs on the master, the primary and all the secondaries.
3141
3142     """
3143     env = {
3144       "NEW_SECONDARY": self.op.remote_node,
3145       "DISK_NAME": self.op.disk_name,
3146       }
3147     env.update(_BuildInstanceHookEnvByObject(self.instance))
3148     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3149           self.op.remote_node,] + list(self.instance.secondary_nodes)
3150     return env, nl, nl
3151
3152   def CheckPrereq(self):
3153     """Check prerequisites.
3154
3155     This checks that the instance is in the cluster.
3156
3157     """
3158     instance = self.cfg.GetInstanceInfo(
3159       self.cfg.ExpandInstanceName(self.op.instance_name))
3160     if instance is None:
3161       raise errors.OpPrereqError("Instance '%s' not known" %
3162                                  self.op.instance_name)
3163     self.instance = instance
3164
3165     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3166     if remote_node is None:
3167       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3168     self.remote_node = remote_node
3169
3170     if remote_node == instance.primary_node:
3171       raise errors.OpPrereqError("The specified node is the primary node of"
3172                                  " the instance.")
3173
3174     if instance.disk_template != constants.DT_REMOTE_RAID1:
3175       raise errors.OpPrereqError("Instance's disk layout is not"
3176                                  " remote_raid1.")
3177     for disk in instance.disks:
3178       if disk.iv_name == self.op.disk_name:
3179         break
3180     else:
3181       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3182                                  " instance." % self.op.disk_name)
3183     if len(disk.children) > 1:
3184       raise errors.OpPrereqError("The device already has two slave"
3185                                  " devices.\n"
3186                                  "This would create a 3-disk raid1"
3187                                  " which we don't allow.")
3188     self.disk = disk
3189
3190   def Exec(self, feedback_fn):
3191     """Add the mirror component
3192
3193     """
3194     disk = self.disk
3195     instance = self.instance
3196
3197     remote_node = self.remote_node
3198     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3199     names = _GenerateUniqueNames(self.cfg, lv_names)
3200     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3201                                      remote_node, disk.size, names)
3202
3203     logger.Info("adding new mirror component on secondary")
3204     #HARDCODE
3205     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3206                                       _GetInstanceInfoText(instance)):
3207       raise errors.OpExecError("Failed to create new component on secondary"
3208                                " node %s" % remote_node)
3209
3210     logger.Info("adding new mirror component on primary")
3211     #HARDCODE
3212     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3213                                     _GetInstanceInfoText(instance)):
3214       # remove secondary dev
3215       self.cfg.SetDiskID(new_drbd, remote_node)
3216       rpc.call_blockdev_remove(remote_node, new_drbd)
3217       raise errors.OpExecError("Failed to create volume on primary")
3218
3219     # the device exists now
3220     # call the primary node to add the mirror to md
3221     logger.Info("adding new mirror component to md")
3222     if not rpc.call_blockdev_addchildren(instance.primary_node,
3223                                          disk, [new_drbd]):
3224       logger.Error("Can't add mirror compoment to md!")
3225       self.cfg.SetDiskID(new_drbd, remote_node)
3226       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3227         logger.Error("Can't rollback on secondary")
3228       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3229       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3230         logger.Error("Can't rollback on primary")
3231       raise errors.OpExecError("Can't add mirror component to md array")
3232
3233     disk.children.append(new_drbd)
3234
3235     self.cfg.AddInstance(instance)
3236
3237     _WaitForSync(self.cfg, instance)
3238
3239     return 0
3240
3241
3242 class LURemoveMDDRBDComponent(LogicalUnit):
3243   """Remove a component from a remote_raid1 disk.
3244
3245   """
3246   HPATH = "mirror-remove"
3247   HTYPE = constants.HTYPE_INSTANCE
3248   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3249
3250   def BuildHooksEnv(self):
3251     """Build hooks env.
3252
3253     This runs on the master, the primary and all the secondaries.
3254
3255     """
3256     env = {
3257       "DISK_NAME": self.op.disk_name,
3258       "DISK_ID": self.op.disk_id,
3259       "OLD_SECONDARY": self.old_secondary,
3260       }
3261     env.update(_BuildInstanceHookEnvByObject(self.instance))
3262     nl = [self.sstore.GetMasterNode(),
3263           self.instance.primary_node] + list(self.instance.secondary_nodes)
3264     return env, nl, nl
3265
3266   def CheckPrereq(self):
3267     """Check prerequisites.
3268
3269     This checks that the instance is in the cluster.
3270
3271     """
3272     instance = self.cfg.GetInstanceInfo(
3273       self.cfg.ExpandInstanceName(self.op.instance_name))
3274     if instance is None:
3275       raise errors.OpPrereqError("Instance '%s' not known" %
3276                                  self.op.instance_name)
3277     self.instance = instance
3278
3279     if instance.disk_template != constants.DT_REMOTE_RAID1:
3280       raise errors.OpPrereqError("Instance's disk layout is not"
3281                                  " remote_raid1.")
3282     for disk in instance.disks:
3283       if disk.iv_name == self.op.disk_name:
3284         break
3285     else:
3286       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3287                                  " instance." % self.op.disk_name)
3288     for child in disk.children:
3289       if (child.dev_type == constants.LD_DRBD7 and
3290           child.logical_id[2] == self.op.disk_id):
3291         break
3292     else:
3293       raise errors.OpPrereqError("Can't find the device with this port.")
3294
3295     if len(disk.children) < 2:
3296       raise errors.OpPrereqError("Cannot remove the last component from"
3297                                  " a mirror.")
3298     self.disk = disk
3299     self.child = child
3300     if self.child.logical_id[0] == instance.primary_node:
3301       oid = 1
3302     else:
3303       oid = 0
3304     self.old_secondary = self.child.logical_id[oid]
3305
3306   def Exec(self, feedback_fn):
3307     """Remove the mirror component
3308
3309     """
3310     instance = self.instance
3311     disk = self.disk
3312     child = self.child
3313     logger.Info("remove mirror component")
3314     self.cfg.SetDiskID(disk, instance.primary_node)
3315     if not rpc.call_blockdev_removechildren(instance.primary_node,
3316                                             disk, [child]):
3317       raise errors.OpExecError("Can't remove child from mirror.")
3318
3319     for node in child.logical_id[:2]:
3320       self.cfg.SetDiskID(child, node)
3321       if not rpc.call_blockdev_remove(node, child):
3322         logger.Error("Warning: failed to remove device from node %s,"
3323                      " continuing operation." % node)
3324
3325     disk.children.remove(child)
3326     self.cfg.AddInstance(instance)
3327
3328
3329 class LUReplaceDisks(LogicalUnit):
3330   """Replace the disks of an instance.
3331
3332   """
3333   HPATH = "mirrors-replace"
3334   HTYPE = constants.HTYPE_INSTANCE
3335   _OP_REQP = ["instance_name", "mode", "disks"]
3336
3337   def BuildHooksEnv(self):
3338     """Build hooks env.
3339
3340     This runs on the master, the primary and all the secondaries.
3341
3342     """
3343     env = {
3344       "MODE": self.op.mode,
3345       "NEW_SECONDARY": self.op.remote_node,
3346       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3347       }
3348     env.update(_BuildInstanceHookEnvByObject(self.instance))
3349     nl = [self.sstore.GetMasterNode(),
3350           self.instance.primary_node] + list(self.instance.secondary_nodes)
3351     return env, nl, nl
3352
3353   def CheckPrereq(self):
3354     """Check prerequisites.
3355
3356     This checks that the instance is in the cluster.
3357
3358     """
3359     instance = self.cfg.GetInstanceInfo(
3360       self.cfg.ExpandInstanceName(self.op.instance_name))
3361     if instance is None:
3362       raise errors.OpPrereqError("Instance '%s' not known" %
3363                                  self.op.instance_name)
3364     self.instance = instance
3365
3366     if instance.disk_template not in constants.DTS_NET_MIRROR:
3367       raise errors.OpPrereqError("Instance's disk layout is not"
3368                                  " network mirrored.")
3369
3370     if len(instance.secondary_nodes) != 1:
3371       raise errors.OpPrereqError("The instance has a strange layout,"
3372                                  " expected one secondary but found %d" %
3373                                  len(instance.secondary_nodes))
3374
3375     self.sec_node = instance.secondary_nodes[0]
3376
3377     remote_node = getattr(self.op, "remote_node", None)
3378     if remote_node is not None:
3379       remote_node = self.cfg.ExpandNodeName(remote_node)
3380       if remote_node is None:
3381         raise errors.OpPrereqError("Node '%s' not known" %
3382                                    self.op.remote_node)
3383       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3384     else:
3385       self.remote_node_info = None
3386     if remote_node == instance.primary_node:
3387       raise errors.OpPrereqError("The specified node is the primary node of"
3388                                  " the instance.")
3389     elif remote_node == self.sec_node:
3390       # the user gave the current secondary, switch to
3391       # 'no-replace-secondary' mode
3392       remote_node = None
3393     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3394         self.op.mode != constants.REPLACE_DISK_ALL):
3395       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3396                                  " disks replacement, not individual ones")
3397     if instance.disk_template == constants.DT_DRBD8:
3398       if self.op.mode == constants.REPLACE_DISK_ALL:
3399         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3400                                    " secondary disk replacement, not"
3401                                    " both at once")
3402       elif self.op.mode == constants.REPLACE_DISK_PRI:
3403         if remote_node is not None:
3404           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3405                                      " the secondary while doing a primary"
3406                                      " node disk replacement")
3407         self.tgt_node = instance.primary_node
3408       elif self.op.mode == constants.REPLACE_DISK_SEC:
3409         self.new_node = remote_node # this can be None, in which case
3410                                     # we don't change the secondary
3411         self.tgt_node = instance.secondary_nodes[0]
3412       else:
3413         raise errors.ProgrammerError("Unhandled disk replace mode")
3414
3415     for name in self.op.disks:
3416       if instance.FindDisk(name) is None:
3417         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3418                                    (name, instance.name))
3419     self.op.remote_node = remote_node
3420
3421   def _ExecRR1(self, feedback_fn):
3422     """Replace the disks of an instance.
3423
3424     """
3425     instance = self.instance
3426     iv_names = {}
3427     # start of work
3428     if self.op.remote_node is None:
3429       remote_node = self.sec_node
3430     else:
3431       remote_node = self.op.remote_node
3432     cfg = self.cfg
3433     for dev in instance.disks:
3434       size = dev.size
3435       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3436       names = _GenerateUniqueNames(cfg, lv_names)
3437       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3438                                        remote_node, size, names)
3439       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3440       logger.Info("adding new mirror component on secondary for %s" %
3441                   dev.iv_name)
3442       #HARDCODE
3443       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3444                                         _GetInstanceInfoText(instance)):
3445         raise errors.OpExecError("Failed to create new component on"
3446                                  " secondary node %s\n"
3447                                  "Full abort, cleanup manually!" %
3448                                  remote_node)
3449
3450       logger.Info("adding new mirror component on primary")
3451       #HARDCODE
3452       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3453                                       _GetInstanceInfoText(instance)):
3454         # remove secondary dev
3455         cfg.SetDiskID(new_drbd, remote_node)
3456         rpc.call_blockdev_remove(remote_node, new_drbd)
3457         raise errors.OpExecError("Failed to create volume on primary!\n"
3458                                  "Full abort, cleanup manually!!")
3459
3460       # the device exists now
3461       # call the primary node to add the mirror to md
3462       logger.Info("adding new mirror component to md")
3463       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3464                                            [new_drbd]):
3465         logger.Error("Can't add mirror compoment to md!")
3466         cfg.SetDiskID(new_drbd, remote_node)
3467         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3468           logger.Error("Can't rollback on secondary")
3469         cfg.SetDiskID(new_drbd, instance.primary_node)
3470         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3471           logger.Error("Can't rollback on primary")
3472         raise errors.OpExecError("Full abort, cleanup manually!!")
3473
3474       dev.children.append(new_drbd)
3475       cfg.AddInstance(instance)
3476
3477     # this can fail as the old devices are degraded and _WaitForSync
3478     # does a combined result over all disks, so we don't check its
3479     # return value
3480     _WaitForSync(cfg, instance, unlock=True)
3481
3482     # so check manually all the devices
3483     for name in iv_names:
3484       dev, child, new_drbd = iv_names[name]
3485       cfg.SetDiskID(dev, instance.primary_node)
3486       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3487       if is_degr:
3488         raise errors.OpExecError("MD device %s is degraded!" % name)
3489       cfg.SetDiskID(new_drbd, instance.primary_node)
3490       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3491       if is_degr:
3492         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3493
3494     for name in iv_names:
3495       dev, child, new_drbd = iv_names[name]
3496       logger.Info("remove mirror %s component" % name)
3497       cfg.SetDiskID(dev, instance.primary_node)
3498       if not rpc.call_blockdev_removechildren(instance.primary_node,
3499                                               dev, [child]):
3500         logger.Error("Can't remove child from mirror, aborting"
3501                      " *this device cleanup*.\nYou need to cleanup manually!!")
3502         continue
3503
3504       for node in child.logical_id[:2]:
3505         logger.Info("remove child device on %s" % node)
3506         cfg.SetDiskID(child, node)
3507         if not rpc.call_blockdev_remove(node, child):
3508           logger.Error("Warning: failed to remove device from node %s,"
3509                        " continuing operation." % node)
3510
3511       dev.children.remove(child)
3512
3513       cfg.AddInstance(instance)
3514
3515   def _ExecD8DiskOnly(self, feedback_fn):
3516     """Replace a disk on the primary or secondary for dbrd8.
3517
3518     The algorithm for replace is quite complicated:
3519       - for each disk to be replaced:
3520         - create new LVs on the target node with unique names
3521         - detach old LVs from the drbd device
3522         - rename old LVs to name_replaced.<time_t>
3523         - rename new LVs to old LVs
3524         - attach the new LVs (with the old names now) to the drbd device
3525       - wait for sync across all devices
3526       - for each modified disk:
3527         - remove old LVs (which have the name name_replaces.<time_t>)
3528
3529     Failures are not very well handled.
3530     """
3531     instance = self.instance
3532     iv_names = {}
3533     vgname = self.cfg.GetVGName()
3534     # start of work
3535     cfg = self.cfg
3536     tgt_node = self.tgt_node
3537     for dev in instance.disks:
3538       if not dev.iv_name in self.op.disks:
3539         continue
3540       size = dev.size
3541       cfg.SetDiskID(dev, tgt_node)
3542       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3543       names = _GenerateUniqueNames(cfg, lv_names)
3544       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3545                              logical_id=(vgname, names[0]))
3546       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3547                              logical_id=(vgname, names[1]))
3548       new_lvs = [lv_data, lv_meta]
3549       old_lvs = dev.children
3550       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3551       logger.Info("adding new local storage on %s for %s" %
3552                   (tgt_node, dev.iv_name))
3553       # since we *always* want to create this LV, we use the
3554       # _Create...OnPrimary (which forces the creation), even if we
3555       # are talking about the secondary node
3556       for new_lv in new_lvs:
3557         if not _CreateBlockDevOnPrimary(cfg, tgt_node, new_lv,
3558                                         _GetInstanceInfoText(instance)):
3559           raise errors.OpExecError("Failed to create new LV named '%s' on"
3560                                    " node '%s'" %
3561                                    (new_lv.logical_id[1], tgt_node))
3562
3563       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3564         raise errors.OpExecError("Can't detach drbd from local storage on node"
3565                                  " %s for device %s" % (tgt_node, dev.iv_name))
3566       dev.children = []
3567       cfg.Update(instance)
3568
3569       # ok, we created the new LVs, so now we know we have the needed
3570       # storage; as such, we proceed on the target node to rename
3571       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3572       # using the assumption than logical_id == physical_id (which in
3573       # turn is the unique_id on that node)
3574       temp_suffix = int(time.time())
3575       logger.Info("renaming the old LVs on the target node")
3576       ren_fn = lambda d, suff: (d.physical_id[0],
3577                                 d.physical_id[1] + "_replaced-%s" % suff)
3578       rlist = [(disk, ren_fn(disk, temp_suffix)) for disk in old_lvs]
3579       if not rpc.call_blockdev_rename(tgt_node, rlist):
3580         logger.Error("Can't rename old LVs on node %s" % tgt_node)
3581         do_change_old = False
3582       else:
3583         do_change_old = True
3584       # now we rename the new LVs to the old LVs
3585       logger.Info("renaming the new LVs on the target node")
3586       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3587       if not rpc.call_blockdev_rename(tgt_node, rlist):
3588         logger.Error("Can't rename new LVs on node %s" % tgt_node)
3589       else:
3590         for old, new in zip(old_lvs, new_lvs):
3591           new.logical_id = old.logical_id
3592           cfg.SetDiskID(new, tgt_node)
3593
3594       if do_change_old:
3595         for disk in old_lvs:
3596           disk.logical_id = ren_fn(disk, temp_suffix)
3597           cfg.SetDiskID(disk, tgt_node)
3598
3599       # now that the new lvs have the old name, we can add them to the device
3600       logger.Info("adding new mirror component on %s" % tgt_node)
3601       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3602         logger.Error("Can't add local storage to drbd!")
3603         for new_lv in new_lvs:
3604           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3605             logger.Error("Can't rollback device %s")
3606         return
3607
3608       dev.children = new_lvs
3609       cfg.Update(instance)
3610
3611
3612     # this can fail as the old devices are degraded and _WaitForSync
3613     # does a combined result over all disks, so we don't check its
3614     # return value
3615     logger.Info("Done changing drbd configs, waiting for sync")
3616     _WaitForSync(cfg, instance, unlock=True)
3617
3618     # so check manually all the devices
3619     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3620       cfg.SetDiskID(dev, instance.primary_node)
3621       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3622       if is_degr:
3623         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3624
3625     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3626       logger.Info("remove logical volumes for %s" % name)
3627       for lv in old_lvs:
3628         cfg.SetDiskID(lv, tgt_node)
3629         if not rpc.call_blockdev_remove(tgt_node, lv):
3630           logger.Error("Can't cleanup child device, skipping. You need to"
3631                        " fix manually!")
3632           continue
3633
3634   def _ExecD8Secondary(self, feedback_fn):
3635     """Replace the secondary node for drbd8.
3636
3637     The algorithm for replace is quite complicated:
3638       - for all disks of the instance:
3639         - create new LVs on the new node with same names
3640         - shutdown the drbd device on the old secondary
3641         - disconnect the drbd network on the primary
3642         - create the drbd device on the new secondary
3643         - network attach the drbd on the primary, using an artifice:
3644           the drbd code for Attach() will connect to the network if it
3645           finds a device which is connected to the good local disks but
3646           not network enabled
3647       - wait for sync across all devices
3648       - remove all disks from the old secondary
3649
3650     Failures are not very well handled.
3651     """
3652     instance = self.instance
3653     iv_names = {}
3654     vgname = self.cfg.GetVGName()
3655     # start of work
3656     cfg = self.cfg
3657     old_node = self.tgt_node
3658     new_node = self.new_node
3659     pri_node = instance.primary_node
3660     for dev in instance.disks:
3661       size = dev.size
3662       logger.Info("adding new local storage on %s for %s" %
3663                   (new_node, dev.iv_name))
3664       # since we *always* want to create this LV, we use the
3665       # _Create...OnPrimary (which forces the creation), even if we
3666       # are talking about the secondary node
3667       for new_lv in dev.children:
3668         if not _CreateBlockDevOnPrimary(cfg, new_node, new_lv,
3669                                         _GetInstanceInfoText(instance)):
3670           raise errors.OpExecError("Failed to create new LV named '%s' on"
3671                                    " node '%s'" %
3672                                    (new_lv.logical_id[1], new_node))
3673
3674       # create new devices on new_node
3675       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3676                               logical_id=(pri_node, new_node,
3677                                           dev.logical_id[2]),
3678                               children=dev.children)
3679       if not _CreateBlockDevOnSecondary(cfg, new_node, new_drbd, False,
3680                                       _GetInstanceInfoText(instance)):
3681         raise errors.OpExecError("Failed to create new DRBD on"
3682                                  " node '%s'" % new_node)
3683
3684       # we have new devices, shutdown the drbd on the old secondary
3685       cfg.SetDiskID(dev, old_node)
3686       if not rpc.call_blockdev_shutdown(old_node, dev):
3687         raise errors.OpExecError("Failed to shutdown DRBD on old node")
3688
3689       # we have new storage, we 'rename' the network on the primary
3690       cfg.SetDiskID(dev, pri_node)
3691       # rename to the ip of the new node
3692       new_uid = list(dev.physical_id)
3693       new_uid[2] = self.remote_node_info.secondary_ip
3694       rlist = [(dev, tuple(new_uid))]
3695       if not rpc.call_blockdev_rename(pri_node, rlist):
3696         raise errors.OpExecError("Can't detach re-attach drbd %s on node"
3697                                  " %s from %s to %s" %
3698                                  (dev.iv_name, pri_node, old_node, new_node))
3699       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3700       cfg.SetDiskID(dev, pri_node)
3701       cfg.Update(instance)
3702
3703       iv_names[dev.iv_name] = (dev, dev.children)
3704
3705     # this can fail as the old devices are degraded and _WaitForSync
3706     # does a combined result over all disks, so we don't check its
3707     # return value
3708     logger.Info("Done changing drbd configs, waiting for sync")
3709     _WaitForSync(cfg, instance, unlock=True)
3710
3711     # so check manually all the devices
3712     for name, (dev, old_lvs) in iv_names.iteritems():
3713       cfg.SetDiskID(dev, pri_node)
3714       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3715       if is_degr:
3716         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3717
3718     for name, (dev, old_lvs) in iv_names.iteritems():
3719       logger.Info("remove logical volumes for %s" % name)
3720       for lv in old_lvs:
3721         cfg.SetDiskID(lv, old_node)
3722         if not rpc.call_blockdev_remove(old_node, lv):
3723           logger.Error("Can't cleanup child device, skipping. You need to"
3724                        " fix manually!")
3725           continue
3726
3727   def Exec(self, feedback_fn):
3728     """Execute disk replacement.
3729
3730     This dispatches the disk replacement to the appropriate handler.
3731
3732     """
3733     instance = self.instance
3734     if instance.disk_template == constants.DT_REMOTE_RAID1:
3735       fn = self._ExecRR1
3736     elif instance.disk_template == constants.DT_DRBD8:
3737       if self.op.remote_node is None:
3738         fn = self._ExecD8DiskOnly
3739       else:
3740         fn = self._ExecD8Secondary
3741     else:
3742       raise errors.ProgrammerError("Unhandled disk replacement case")
3743     return fn(feedback_fn)
3744
3745
3746 class LUQueryInstanceData(NoHooksLU):
3747   """Query runtime instance data.
3748
3749   """
3750   _OP_REQP = ["instances"]
3751
3752   def CheckPrereq(self):
3753     """Check prerequisites.
3754
3755     This only checks the optional instance list against the existing names.
3756
3757     """
3758     if not isinstance(self.op.instances, list):
3759       raise errors.OpPrereqError("Invalid argument type 'instances'")
3760     if self.op.instances:
3761       self.wanted_instances = []
3762       names = self.op.instances
3763       for name in names:
3764         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3765         if instance is None:
3766           raise errors.OpPrereqError("No such instance name '%s'" % name)
3767       self.wanted_instances.append(instance)
3768     else:
3769       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3770                                in self.cfg.GetInstanceList()]
3771     return
3772
3773
3774   def _ComputeDiskStatus(self, instance, snode, dev):
3775     """Compute block device status.
3776
3777     """
3778     self.cfg.SetDiskID(dev, instance.primary_node)
3779     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3780     if dev.dev_type in constants.LDS_DRBD:
3781       # we change the snode then (otherwise we use the one passed in)
3782       if dev.logical_id[0] == instance.primary_node:
3783         snode = dev.logical_id[1]
3784       else:
3785         snode = dev.logical_id[0]
3786
3787     if snode:
3788       self.cfg.SetDiskID(dev, snode)
3789       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3790     else:
3791       dev_sstatus = None
3792
3793     if dev.children:
3794       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3795                       for child in dev.children]
3796     else:
3797       dev_children = []
3798
3799     data = {
3800       "iv_name": dev.iv_name,
3801       "dev_type": dev.dev_type,
3802       "logical_id": dev.logical_id,
3803       "physical_id": dev.physical_id,
3804       "pstatus": dev_pstatus,
3805       "sstatus": dev_sstatus,
3806       "children": dev_children,
3807       }
3808
3809     return data
3810
3811   def Exec(self, feedback_fn):
3812     """Gather and return data"""
3813     result = {}
3814     for instance in self.wanted_instances:
3815       remote_info = rpc.call_instance_info(instance.primary_node,
3816                                                 instance.name)
3817       if remote_info and "state" in remote_info:
3818         remote_state = "up"
3819       else:
3820         remote_state = "down"
3821       if instance.status == "down":
3822         config_state = "down"
3823       else:
3824         config_state = "up"
3825
3826       disks = [self._ComputeDiskStatus(instance, None, device)
3827                for device in instance.disks]
3828
3829       idict = {
3830         "name": instance.name,
3831         "config_state": config_state,
3832         "run_state": remote_state,
3833         "pnode": instance.primary_node,
3834         "snodes": instance.secondary_nodes,
3835         "os": instance.os,
3836         "memory": instance.memory,
3837         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3838         "disks": disks,
3839         "vcpus": instance.vcpus,
3840         }
3841
3842       result[instance.name] = idict
3843
3844     return result
3845
3846
3847 class LUSetInstanceParms(LogicalUnit):
3848   """Modifies an instances's parameters.
3849
3850   """
3851   HPATH = "instance-modify"
3852   HTYPE = constants.HTYPE_INSTANCE
3853   _OP_REQP = ["instance_name"]
3854
3855   def BuildHooksEnv(self):
3856     """Build hooks env.
3857
3858     This runs on the master, primary and secondaries.
3859
3860     """
3861     args = dict()
3862     if self.mem:
3863       args['memory'] = self.mem
3864     if self.vcpus:
3865       args['vcpus'] = self.vcpus
3866     if self.do_ip or self.do_bridge:
3867       if self.do_ip:
3868         ip = self.ip
3869       else:
3870         ip = self.instance.nics[0].ip
3871       if self.bridge:
3872         bridge = self.bridge
3873       else:
3874         bridge = self.instance.nics[0].bridge
3875       args['nics'] = [(ip, bridge)]
3876     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3877     nl = [self.sstore.GetMasterNode(),
3878           self.instance.primary_node] + list(self.instance.secondary_nodes)
3879     return env, nl, nl
3880
3881   def CheckPrereq(self):
3882     """Check prerequisites.
3883
3884     This only checks the instance list against the existing names.
3885
3886     """
3887     self.mem = getattr(self.op, "mem", None)
3888     self.vcpus = getattr(self.op, "vcpus", None)
3889     self.ip = getattr(self.op, "ip", None)
3890     self.bridge = getattr(self.op, "bridge", None)
3891     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3892       raise errors.OpPrereqError("No changes submitted")
3893     if self.mem is not None:
3894       try:
3895         self.mem = int(self.mem)
3896       except ValueError, err:
3897         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3898     if self.vcpus is not None:
3899       try:
3900         self.vcpus = int(self.vcpus)
3901       except ValueError, err:
3902         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3903     if self.ip is not None:
3904       self.do_ip = True
3905       if self.ip.lower() == "none":
3906         self.ip = None
3907       else:
3908         if not utils.IsValidIP(self.ip):
3909           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3910     else:
3911       self.do_ip = False
3912     self.do_bridge = (self.bridge is not None)
3913
3914     instance = self.cfg.GetInstanceInfo(
3915       self.cfg.ExpandInstanceName(self.op.instance_name))
3916     if instance is None:
3917       raise errors.OpPrereqError("No such instance name '%s'" %
3918                                  self.op.instance_name)
3919     self.op.instance_name = instance.name
3920     self.instance = instance
3921     return
3922
3923   def Exec(self, feedback_fn):
3924     """Modifies an instance.
3925
3926     All parameters take effect only at the next restart of the instance.
3927     """
3928     result = []
3929     instance = self.instance
3930     if self.mem:
3931       instance.memory = self.mem
3932       result.append(("mem", self.mem))
3933     if self.vcpus:
3934       instance.vcpus = self.vcpus
3935       result.append(("vcpus",  self.vcpus))
3936     if self.do_ip:
3937       instance.nics[0].ip = self.ip
3938       result.append(("ip", self.ip))
3939     if self.bridge:
3940       instance.nics[0].bridge = self.bridge
3941       result.append(("bridge", self.bridge))
3942
3943     self.cfg.AddInstance(instance)
3944
3945     return result
3946
3947
3948 class LUQueryExports(NoHooksLU):
3949   """Query the exports list
3950
3951   """
3952   _OP_REQP = []
3953
3954   def CheckPrereq(self):
3955     """Check that the nodelist contains only existing nodes.
3956
3957     """
3958     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3959
3960   def Exec(self, feedback_fn):
3961     """Compute the list of all the exported system images.
3962
3963     Returns:
3964       a dictionary with the structure node->(export-list)
3965       where export-list is a list of the instances exported on
3966       that node.
3967
3968     """
3969     return rpc.call_export_list(self.nodes)
3970
3971
3972 class LUExportInstance(LogicalUnit):
3973   """Export an instance to an image in the cluster.
3974
3975   """
3976   HPATH = "instance-export"
3977   HTYPE = constants.HTYPE_INSTANCE
3978   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3979
3980   def BuildHooksEnv(self):
3981     """Build hooks env.
3982
3983     This will run on the master, primary node and target node.
3984
3985     """
3986     env = {
3987       "EXPORT_NODE": self.op.target_node,
3988       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3989       }
3990     env.update(_BuildInstanceHookEnvByObject(self.instance))
3991     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3992           self.op.target_node]
3993     return env, nl, nl
3994
3995   def CheckPrereq(self):
3996     """Check prerequisites.
3997
3998     This checks that the instance name is a valid one.
3999
4000     """
4001     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4002     self.instance = self.cfg.GetInstanceInfo(instance_name)
4003     if self.instance is None:
4004       raise errors.OpPrereqError("Instance '%s' not found" %
4005                                  self.op.instance_name)
4006
4007     # node verification
4008     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4009     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4010
4011     if self.dst_node is None:
4012       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4013                                  self.op.target_node)
4014     self.op.target_node = self.dst_node.name
4015
4016   def Exec(self, feedback_fn):
4017     """Export an instance to an image in the cluster.
4018
4019     """
4020     instance = self.instance
4021     dst_node = self.dst_node
4022     src_node = instance.primary_node
4023     # shutdown the instance, unless requested not to do so
4024     if self.op.shutdown:
4025       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4026       self.processor.ChainOpCode(op)
4027
4028     vgname = self.cfg.GetVGName()
4029
4030     snap_disks = []
4031
4032     try:
4033       for disk in instance.disks:
4034         if disk.iv_name == "sda":
4035           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4036           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4037
4038           if not new_dev_name:
4039             logger.Error("could not snapshot block device %s on node %s" %
4040                          (disk.logical_id[1], src_node))
4041           else:
4042             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4043                                       logical_id=(vgname, new_dev_name),
4044                                       physical_id=(vgname, new_dev_name),
4045                                       iv_name=disk.iv_name)
4046             snap_disks.append(new_dev)
4047
4048     finally:
4049       if self.op.shutdown:
4050         op = opcodes.OpStartupInstance(instance_name=instance.name,
4051                                        force=False)
4052         self.processor.ChainOpCode(op)
4053
4054     # TODO: check for size
4055
4056     for dev in snap_disks:
4057       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4058                                            instance):
4059         logger.Error("could not export block device %s from node"
4060                      " %s to node %s" %
4061                      (dev.logical_id[1], src_node, dst_node.name))
4062       if not rpc.call_blockdev_remove(src_node, dev):
4063         logger.Error("could not remove snapshot block device %s from"
4064                      " node %s" % (dev.logical_id[1], src_node))
4065
4066     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4067       logger.Error("could not finalize export for instance %s on node %s" %
4068                    (instance.name, dst_node.name))
4069
4070     nodelist = self.cfg.GetNodeList()
4071     nodelist.remove(dst_node.name)
4072
4073     # on one-node clusters nodelist will be empty after the removal
4074     # if we proceed the backup would be removed because OpQueryExports
4075     # substitutes an empty list with the full cluster node list.
4076     if nodelist:
4077       op = opcodes.OpQueryExports(nodes=nodelist)
4078       exportlist = self.processor.ChainOpCode(op)
4079       for node in exportlist:
4080         if instance.name in exportlist[node]:
4081           if not rpc.call_export_remove(node, instance.name):
4082             logger.Error("could not remove older export for instance %s"
4083                          " on node %s" % (instance.name, node))
4084
4085
4086 class TagsLU(NoHooksLU):
4087   """Generic tags LU.
4088
4089   This is an abstract class which is the parent of all the other tags LUs.
4090
4091   """
4092   def CheckPrereq(self):
4093     """Check prerequisites.
4094
4095     """
4096     if self.op.kind == constants.TAG_CLUSTER:
4097       self.target = self.cfg.GetClusterInfo()
4098     elif self.op.kind == constants.TAG_NODE:
4099       name = self.cfg.ExpandNodeName(self.op.name)
4100       if name is None:
4101         raise errors.OpPrereqError("Invalid node name (%s)" %
4102                                    (self.op.name,))
4103       self.op.name = name
4104       self.target = self.cfg.GetNodeInfo(name)
4105     elif self.op.kind == constants.TAG_INSTANCE:
4106       name = self.cfg.ExpandInstanceName(self.op.name)
4107       if name is None:
4108         raise errors.OpPrereqError("Invalid instance name (%s)" %
4109                                    (self.op.name,))
4110       self.op.name = name
4111       self.target = self.cfg.GetInstanceInfo(name)
4112     else:
4113       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4114                                  str(self.op.kind))
4115
4116
4117 class LUGetTags(TagsLU):
4118   """Returns the tags of a given object.
4119
4120   """
4121   _OP_REQP = ["kind", "name"]
4122
4123   def Exec(self, feedback_fn):
4124     """Returns the tag list.
4125
4126     """
4127     return self.target.GetTags()
4128
4129
4130 class LUAddTags(TagsLU):
4131   """Sets a tag on a given object.
4132
4133   """
4134   _OP_REQP = ["kind", "name", "tags"]
4135
4136   def CheckPrereq(self):
4137     """Check prerequisites.
4138
4139     This checks the type and length of the tag name and value.
4140
4141     """
4142     TagsLU.CheckPrereq(self)
4143     for tag in self.op.tags:
4144       objects.TaggableObject.ValidateTag(tag)
4145
4146   def Exec(self, feedback_fn):
4147     """Sets the tag.
4148
4149     """
4150     try:
4151       for tag in self.op.tags:
4152         self.target.AddTag(tag)
4153     except errors.TagError, err:
4154       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4155     try:
4156       self.cfg.Update(self.target)
4157     except errors.ConfigurationError:
4158       raise errors.OpRetryError("There has been a modification to the"
4159                                 " config file and the operation has been"
4160                                 " aborted. Please retry.")
4161
4162
4163 class LUDelTags(TagsLU):
4164   """Delete a list of tags from a given object.
4165
4166   """
4167   _OP_REQP = ["kind", "name", "tags"]
4168
4169   def CheckPrereq(self):
4170     """Check prerequisites.
4171
4172     This checks that we have the given tag.
4173
4174     """
4175     TagsLU.CheckPrereq(self)
4176     for tag in self.op.tags:
4177       objects.TaggableObject.ValidateTag(tag)
4178     del_tags = frozenset(self.op.tags)
4179     cur_tags = self.target.GetTags()
4180     if not del_tags <= cur_tags:
4181       diff_tags = del_tags - cur_tags
4182       diff_names = ["'%s'" % tag for tag in diff_tags]
4183       diff_names.sort()
4184       raise errors.OpPrereqError("Tag(s) %s not found" %
4185                                  (",".join(diff_names)))
4186
4187   def Exec(self, feedback_fn):
4188     """Remove the tag from the object.
4189
4190     """
4191     for tag in self.op.tags:
4192       self.target.RemoveTag(tag)
4193     try:
4194       self.cfg.Update(self.target)
4195     except errors.ConfigurationError:
4196       raise errors.OpRetryError("There has been a modification to the"
4197                                 " config file and the operation has been"
4198                                 " aborted. Please retry.")