Fix gnt-instance info i1 i2 ...
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge, mac) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276   else:
277     nic_count = 0
278
279   env["INSTANCE_NIC_COUNT"] = nic_count
280
281   return env
282
283
284 def _BuildInstanceHookEnvByObject(instance, override=None):
285   """Builds instance related env variables for hooks from an object.
286
287   Args:
288     instance: objects.Instance object of instance
289     override: dict of values to override
290   """
291   args = {
292     'name': instance.name,
293     'primary_node': instance.primary_node,
294     'secondary_nodes': instance.secondary_nodes,
295     'os_type': instance.os,
296     'status': instance.os,
297     'memory': instance.memory,
298     'vcpus': instance.vcpus,
299     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300   }
301   if override:
302     args.update(override)
303   return _BuildInstanceHookEnv(**args)
304
305
306 def _UpdateKnownHosts(fullnode, ip, pubkey):
307   """Ensure a node has a correct known_hosts entry.
308
309   Args:
310     fullnode - Fully qualified domain name of host. (str)
311     ip       - IPv4 address of host (str)
312     pubkey   - the public key of the cluster
313
314   """
315   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317   else:
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319
320   inthere = False
321
322   save_lines = []
323   add_lines = []
324   removed = False
325
326   for rawline in f:
327     logger.Debug('read %s' % (repr(rawline),))
328
329     parts = rawline.rstrip('\r\n').split()
330
331     # Ignore unwanted lines
332     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333       fields = parts[0].split(',')
334       key = parts[2]
335
336       haveall = True
337       havesome = False
338       for spec in [ ip, fullnode ]:
339         if spec not in fields:
340           haveall = False
341         if spec in fields:
342           havesome = True
343
344       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345       if haveall and key == pubkey:
346         inthere = True
347         save_lines.append(rawline)
348         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349         continue
350
351       if havesome and (not haveall or key != pubkey):
352         removed = True
353         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354         continue
355
356     save_lines.append(rawline)
357
358   if not inthere:
359     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361
362   if removed:
363     save_lines = save_lines + add_lines
364
365     # Write a new file and replace old.
366     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367                                    constants.DATA_DIR)
368     newfile = os.fdopen(fd, 'w')
369     try:
370       newfile.write(''.join(save_lines))
371     finally:
372       newfile.close()
373     logger.Debug("Wrote new known_hosts.")
374     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375
376   elif add_lines:
377     # Simply appending a new line will do the trick.
378     f.seek(0, 2)
379     for add in add_lines:
380       f.write(add)
381
382   f.close()
383
384
385 def _HasValidVG(vglist, vgname):
386   """Checks if the volume group list is valid.
387
388   A non-None return value means there's an error, and the return value
389   is the error message.
390
391   """
392   vgsize = vglist.get(vgname, None)
393   if vgsize is None:
394     return "volume group '%s' missing" % vgname
395   elif vgsize < 20480:
396     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397             (vgname, vgsize))
398   return None
399
400
401 def _InitSSHSetup(node):
402   """Setup the SSH configuration for the cluster.
403
404
405   This generates a dsa keypair for root, adds the pub key to the
406   permitted hosts and adds the hostkey to its own known hosts.
407
408   Args:
409     node: the name of this host as a fqdn
410
411   """
412   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413
414   for name in priv_key, pub_key:
415     if os.path.exists(name):
416       utils.CreateBackup(name)
417     utils.RemoveFile(name)
418
419   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420                          "-f", priv_key,
421                          "-q", "-N", ""])
422   if result.failed:
423     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424                              result.output)
425
426   f = open(pub_key, 'r')
427   try:
428     utils.AddAuthorizedKey(auth_keys, f.read(8192))
429   finally:
430     f.close()
431
432
433 def _InitGanetiServerSetup(ss):
434   """Setup the necessary configuration for the initial node daemon.
435
436   This creates the nodepass file containing the shared password for
437   the cluster and also generates the SSL certificate.
438
439   """
440   # Create pseudo random password
441   randpass = sha.new(os.urandom(64)).hexdigest()
442   # and write it into sstore
443   ss.SetKey(ss.SS_NODED_PASS, randpass)
444
445   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446                          "-days", str(365*5), "-nodes", "-x509",
447                          "-keyout", constants.SSL_CERT_FILE,
448                          "-out", constants.SSL_CERT_FILE, "-batch"])
449   if result.failed:
450     raise errors.OpExecError("could not generate server ssl cert, command"
451                              " %s had exitcode %s and error message %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454   os.chmod(constants.SSL_CERT_FILE, 0400)
455
456   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457
458   if result.failed:
459     raise errors.OpExecError("Could not start the node daemon, command %s"
460                              " had exitcode %s and error %s" %
461                              (result.cmd, result.exit_code, result.output))
462
463
464 def _CheckInstanceBridgesExist(instance):
465   """Check that the brigdes needed by an instance exist.
466
467   """
468   # check bridges existance
469   brlist = [nic.bridge for nic in instance.nics]
470   if not rpc.call_bridges_exist(instance.primary_node, brlist):
471     raise errors.OpPrereqError("one or more target bridges %s does not"
472                                " exist on destination node '%s'" %
473                                (brlist, instance.primary_node))
474
475
476 class LUInitCluster(LogicalUnit):
477   """Initialise the cluster.
478
479   """
480   HPATH = "cluster-init"
481   HTYPE = constants.HTYPE_CLUSTER
482   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483               "def_bridge", "master_netdev"]
484   REQ_CLUSTER = False
485
486   def BuildHooksEnv(self):
487     """Build hooks env.
488
489     Notes: Since we don't require a cluster, we must manually add
490     ourselves in the post-run node list.
491
492     """
493     env = {"OP_TARGET": self.op.cluster_name}
494     return env, [], [self.hostname.name]
495
496   def CheckPrereq(self):
497     """Verify that the passed name is a valid one.
498
499     """
500     if config.ConfigWriter.IsCluster():
501       raise errors.OpPrereqError("Cluster is already initialised")
502
503     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504       if not os.path.exists(constants.VNC_PASSWORD_FILE):
505         raise errors.OpPrereqError("Please prepare the cluster VNC"
506                                    "password file %s" %
507                                    constants.VNC_PASSWORD_FILE)
508
509     self.hostname = hostname = utils.HostInfo()
510
511     if hostname.ip.startswith("127."):
512       raise errors.OpPrereqError("This host's IP resolves to the private"
513                                  " range (%s). Please fix DNS or /etc/hosts." %
514                                  (hostname.ip,))
515
516     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517
518     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519                          constants.DEFAULT_NODED_PORT):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     secondary_ip = getattr(self.op, "secondary_ip", None)
526     if secondary_ip and not utils.IsValidIP(secondary_ip):
527       raise errors.OpPrereqError("Invalid secondary ip given")
528     if (secondary_ip and
529         secondary_ip != hostname.ip and
530         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531                            constants.DEFAULT_NODED_PORT))):
532       raise errors.OpPrereqError("You gave %s as secondary IP,"
533                                  " but it does not belong to this host." %
534                                  secondary_ip)
535     self.secondary_ip = secondary_ip
536
537     # checks presence of the volume group given
538     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539
540     if vgstatus:
541       raise errors.OpPrereqError("Error: %s" % vgstatus)
542
543     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544                     self.op.mac_prefix):
545       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546                                  self.op.mac_prefix)
547
548     if self.op.hypervisor_type not in constants.HYPER_TYPES:
549       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550                                  self.op.hypervisor_type)
551
552     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553     if result.failed:
554       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555                                  (self.op.master_netdev,
556                                   result.output.strip()))
557
558     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560       raise errors.OpPrereqError("Init.d script '%s' missing or not"
561                                  " executable." % constants.NODE_INITD_SCRIPT)
562
563   def Exec(self, feedback_fn):
564     """Initialize the cluster.
565
566     """
567     clustername = self.clustername
568     hostname = self.hostname
569
570     # set up the simple store
571     self.sstore = ss = ssconf.SimpleStore()
572     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577
578     # set up the inter-node password and certificate
579     _InitGanetiServerSetup(ss)
580
581     # start the master ip
582     rpc.call_node_start_master(hostname.name)
583
584     # set up ssh config and /etc/hosts
585     f = open(constants.SSH_HOST_RSA_PUB, 'r')
586     try:
587       sshline = f.read()
588     finally:
589       f.close()
590     sshkey = sshline.split(" ")[1]
591
592     _AddHostToEtcHosts(hostname.name)
593
594     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595
596     _InitSSHSetup(hostname.name)
597
598     # init of cluster config file
599     self.cfg = cfgw = config.ConfigWriter()
600     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601                     sshkey, self.op.mac_prefix,
602                     self.op.vg_name, self.op.def_bridge)
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signalled by raising errors.OpPrereqError.
617
618     """
619     master = self.sstore.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.sstore.GetMasterNode()
635     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636     utils.CreateBackup(priv_key)
637     utils.CreateBackup(pub_key)
638     rpc.call_node_leave_cluster(master)
639
640
641 class LUVerifyCluster(NoHooksLU):
642   """Verifies the cluster status.
643
644   """
645   _OP_REQP = []
646
647   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648                   remote_version, feedback_fn):
649     """Run multiple tests against a node.
650
651     Test list:
652       - compares ganeti version
653       - checks vg existance and size > 20G
654       - checks config file checksum
655       - checks ssh to other nodes
656
657     Args:
658       node: name of the node to check
659       file_list: required list of files
660       local_cksum: dictionary of local files and their checksums
661
662     """
663     # compares ganeti version
664     local_version = constants.PROTOCOL_VERSION
665     if not remote_version:
666       feedback_fn(" - ERROR: connection to %s failed" % (node))
667       return True
668
669     if local_version != remote_version:
670       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671                       (local_version, node, remote_version))
672       return True
673
674     # checks vg existance and size > 20G
675
676     bad = False
677     if not vglist:
678       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679                       (node,))
680       bad = True
681     else:
682       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688     # checks ssh to any
689
690     if 'filelist' not in node_result:
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       remote_cksum = node_result['filelist']
695       for file_name in file_list:
696         if file_name not in remote_cksum:
697           bad = True
698           feedback_fn("  - ERROR: file '%s' missing" % file_name)
699         elif remote_cksum[file_name] != local_cksum[file_name]:
700           bad = True
701           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702
703     if 'nodelist' not in node_result:
704       bad = True
705       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706     else:
707       if node_result['nodelist']:
708         bad = True
709         for node in node_result['nodelist']:
710           feedback_fn("  - ERROR: communication with node '%s': %s" %
711                           (node, node_result['nodelist'][node]))
712     hyp_result = node_result.get('hypervisor', None)
713     if hyp_result is not None:
714       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715     return bad
716
717   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718     """Verify an instance.
719
720     This function checks to see if the required block devices are
721     available on the instance's node.
722
723     """
724     bad = False
725
726     instancelist = self.cfg.GetInstanceList()
727     if not instance in instancelist:
728       feedback_fn("  - ERROR: instance %s not in instance list %s" %
729                       (instance, instancelist))
730       bad = True
731
732     instanceconfig = self.cfg.GetInstanceInfo(instance)
733     node_current = instanceconfig.primary_node
734
735     node_vol_should = {}
736     instanceconfig.MapLVsByNode(node_vol_should)
737
738     for node in node_vol_should:
739       for volume in node_vol_should[node]:
740         if node not in node_vol_is or volume not in node_vol_is[node]:
741           feedback_fn("  - ERROR: volume %s missing on node %s" %
742                           (volume, node))
743           bad = True
744
745     if not instanceconfig.status == 'down':
746       if not instance in node_instance[node_current]:
747         feedback_fn("  - ERROR: instance %s not running on node %s" %
748                         (instance, node_current))
749         bad = True
750
751     for node in node_instance:
752       if (not node == node_current):
753         if instance in node_instance[node]:
754           feedback_fn("  - ERROR: instance %s should not run on node %s" %
755                           (instance, node))
756           bad = True
757
758     return bad
759
760   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761     """Verify if there are any unknown volumes in the cluster.
762
763     The .os, .swap and backup volumes are ignored. All other volumes are
764     reported as unknown.
765
766     """
767     bad = False
768
769     for node in node_vol_is:
770       for volume in node_vol_is[node]:
771         if node not in node_vol_should or volume not in node_vol_should[node]:
772           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773                       (volume, node))
774           bad = True
775     return bad
776
777   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778     """Verify the list of running instances.
779
780     This checks what instances are running but unknown to the cluster.
781
782     """
783     bad = False
784     for node in node_instance:
785       for runninginstance in node_instance[node]:
786         if runninginstance not in instancelist:
787           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788                           (runninginstance, node))
789           bad = True
790     return bad
791
792   def CheckPrereq(self):
793     """Check prerequisites.
794
795     This has no prerequisites.
796
797     """
798     pass
799
800   def Exec(self, feedback_fn):
801     """Verify integrity of cluster, performing various test on nodes.
802
803     """
804     bad = False
805     feedback_fn("* Verifying global settings")
806     for msg in self.cfg.VerifyConfig():
807       feedback_fn("  - ERROR: %s" % msg)
808
809     vg_name = self.cfg.GetVGName()
810     nodelist = utils.NiceSort(self.cfg.GetNodeList())
811     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812     node_volume = {}
813     node_instance = {}
814
815     # FIXME: verify OS list
816     # do local checksums
817     file_names = list(self.sstore.GetFileList())
818     file_names.append(constants.SSL_CERT_FILE)
819     file_names.append(constants.CLUSTER_CONF_FILE)
820     local_checksums = utils.FingerprintFiles(file_names)
821
822     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824     all_instanceinfo = rpc.call_instance_list(nodelist)
825     all_vglist = rpc.call_vg_list(nodelist)
826     node_verify_param = {
827       'filelist': file_names,
828       'nodelist': nodelist,
829       'hypervisor': None,
830       }
831     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832     all_rversion = rpc.call_version(nodelist)
833
834     for node in nodelist:
835       feedback_fn("* Verifying node %s" % node)
836       result = self._VerifyNode(node, file_names, local_checksums,
837                                 all_vglist[node], all_nvinfo[node],
838                                 all_rversion[node], feedback_fn)
839       bad = bad or result
840
841       # node_volume
842       volumeinfo = all_volumeinfo[node]
843
844       if isinstance(volumeinfo, basestring):
845         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846                     (node, volumeinfo[-400:].encode('string_escape')))
847         bad = True
848         node_volume[node] = {}
849       elif not isinstance(volumeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853       else:
854         node_volume[node] = volumeinfo
855
856       # node_instance
857       nodeinstance = all_instanceinfo[node]
858       if type(nodeinstance) != list:
859         feedback_fn("  - ERROR: connection to %s failed" % (node,))
860         bad = True
861         continue
862
863       node_instance[node] = nodeinstance
864
865     node_vol_should = {}
866
867     for instance in instancelist:
868       feedback_fn("* Verifying instance %s" % instance)
869       result =  self._VerifyInstance(instance, node_volume, node_instance,
870                                      feedback_fn)
871       bad = bad or result
872
873       inst_config = self.cfg.GetInstanceInfo(instance)
874
875       inst_config.MapLVsByNode(node_vol_should)
876
877     feedback_fn("* Verifying orphan volumes")
878     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879                                        feedback_fn)
880     bad = bad or result
881
882     feedback_fn("* Verifying remaining instances")
883     result = self._VerifyOrphanInstances(instancelist, node_instance,
884                                          feedback_fn)
885     bad = bad or result
886
887     return int(bad)
888
889
890 class LUVerifyDisks(NoHooksLU):
891   """Verifies the cluster disks status.
892
893   """
894   _OP_REQP = []
895
896   def CheckPrereq(self):
897     """Check prerequisites.
898
899     This has no prerequisites.
900
901     """
902     pass
903
904   def Exec(self, feedback_fn):
905     """Verify integrity of cluster disks.
906
907     """
908     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909
910     vg_name = self.cfg.GetVGName()
911     nodes = utils.NiceSort(self.cfg.GetNodeList())
912     instances = [self.cfg.GetInstanceInfo(name)
913                  for name in self.cfg.GetInstanceList()]
914
915     nv_dict = {}
916     for inst in instances:
917       inst_lvs = {}
918       if (inst.status != "up" or
919           inst.disk_template not in constants.DTS_NET_MIRROR):
920         continue
921       inst.MapLVsByNode(inst_lvs)
922       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923       for node, vol_list in inst_lvs.iteritems():
924         for vol in vol_list:
925           nv_dict[(node, vol)] = inst
926
927     if not nv_dict:
928       return result
929
930     node_lvs = rpc.call_volume_list(nodes, vg_name)
931
932     to_act = set()
933     for node in nodes:
934       # node_volume
935       lvs = node_lvs[node]
936
937       if isinstance(lvs, basestring):
938         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939         res_nlvm[node] = lvs
940       elif not isinstance(lvs, dict):
941         logger.Info("connection to node %s failed or invalid data returned" %
942                     (node,))
943         res_nodes.append(node)
944         continue
945
946       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947         inst = nv_dict.pop((node, lv_name), None)
948         if (not lv_online and inst is not None
949             and inst.name not in res_instances):
950           res_instances.append(inst.name)
951
952     # any leftover items in nv_dict are missing LVs, let's arrange the
953     # data better
954     for key, inst in nv_dict.iteritems():
955       if inst.name not in res_missing:
956         res_missing[inst.name] = []
957       res_missing[inst.name].append(key)
958
959     return result
960
961
962 class LURenameCluster(LogicalUnit):
963   """Rename the cluster.
964
965   """
966   HPATH = "cluster-rename"
967   HTYPE = constants.HTYPE_CLUSTER
968   _OP_REQP = ["name"]
969
970   def BuildHooksEnv(self):
971     """Build hooks env.
972
973     """
974     env = {
975       "OP_TARGET": self.op.sstore.GetClusterName(),
976       "NEW_NAME": self.op.name,
977       }
978     mn = self.sstore.GetMasterNode()
979     return env, [mn], [mn]
980
981   def CheckPrereq(self):
982     """Verify that the passed name is a valid one.
983
984     """
985     hostname = utils.HostInfo(self.op.name)
986
987     new_name = hostname.name
988     self.ip = new_ip = hostname.ip
989     old_name = self.sstore.GetClusterName()
990     old_ip = self.sstore.GetMasterIP()
991     if new_name == old_name and new_ip == old_ip:
992       raise errors.OpPrereqError("Neither the name nor the IP address of the"
993                                  " cluster has changed")
994     if new_ip != old_ip:
995       result = utils.RunCmd(["fping", "-q", new_ip])
996       if not result.failed:
997         raise errors.OpPrereqError("The given cluster IP address (%s) is"
998                                    " reachable on the network. Aborting." %
999                                    new_ip)
1000
1001     self.op.name = new_name
1002
1003   def Exec(self, feedback_fn):
1004     """Rename the cluster.
1005
1006     """
1007     clustername = self.op.name
1008     ip = self.ip
1009     ss = self.sstore
1010
1011     # shutdown the master IP
1012     master = ss.GetMasterNode()
1013     if not rpc.call_node_stop_master(master):
1014       raise errors.OpExecError("Could not disable the master role")
1015
1016     try:
1017       # modify the sstore
1018       ss.SetKey(ss.SS_MASTER_IP, ip)
1019       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020
1021       # Distribute updated ss config to all nodes
1022       myself = self.cfg.GetNodeInfo(master)
1023       dist_nodes = self.cfg.GetNodeList()
1024       if myself.name in dist_nodes:
1025         dist_nodes.remove(myself.name)
1026
1027       logger.Debug("Copying updated ssconf data to all nodes")
1028       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029         fname = ss.KeyToFilename(keyname)
1030         result = rpc.call_upload_file(dist_nodes, fname)
1031         for to_node in dist_nodes:
1032           if not result[to_node]:
1033             logger.Error("copy of file %s to node %s failed" %
1034                          (fname, to_node))
1035     finally:
1036       if not rpc.call_node_start_master(master):
1037         logger.Error("Could not re-enable the master role on the master,"
1038                      " please restart manually.")
1039
1040
1041 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042   """Sleep and poll for an instance's disk to sync.
1043
1044   """
1045   if not instance.disks:
1046     return True
1047
1048   if not oneshot:
1049     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050
1051   node = instance.primary_node
1052
1053   for dev in instance.disks:
1054     cfgw.SetDiskID(dev, node)
1055
1056   retries = 0
1057   while True:
1058     max_time = 0
1059     done = True
1060     cumul_degraded = False
1061     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062     if not rstats:
1063       proc.LogWarning("Can't get any data from node %s" % node)
1064       retries += 1
1065       if retries >= 10:
1066         raise errors.RemoteError("Can't contact node %s for mirror data,"
1067                                  " aborting." % node)
1068       time.sleep(6)
1069       continue
1070     retries = 0
1071     for i in range(len(rstats)):
1072       mstat = rstats[i]
1073       if mstat is None:
1074         proc.LogWarning("Can't compute data for node %s/%s" %
1075                         (node, instance.disks[i].iv_name))
1076         continue
1077       # we ignore the ldisk parameter
1078       perc_done, est_time, is_degraded, _ = mstat
1079       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080       if perc_done is not None:
1081         done = False
1082         if est_time is not None:
1083           rem_time = "%d estimated seconds remaining" % est_time
1084           max_time = est_time
1085         else:
1086           rem_time = "no time estimate"
1087         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088                      (instance.disks[i].iv_name, perc_done, rem_time))
1089     if done or oneshot:
1090       break
1091
1092     if unlock:
1093       utils.Unlock('cmd')
1094     try:
1095       time.sleep(min(60, max_time))
1096     finally:
1097       if unlock:
1098         utils.Lock('cmd')
1099
1100   if done:
1101     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102   return not cumul_degraded
1103
1104
1105 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106   """Check that mirrors are not degraded.
1107
1108   The ldisk parameter, if True, will change the test from the
1109   is_degraded attribute (which represents overall non-ok status for
1110   the device(s)) to the ldisk (representing the local storage status).
1111
1112   """
1113   cfgw.SetDiskID(dev, node)
1114   if ldisk:
1115     idx = 6
1116   else:
1117     idx = 5
1118
1119   result = True
1120   if on_primary or dev.AssembleOnSecondary():
1121     rstats = rpc.call_blockdev_find(node, dev)
1122     if not rstats:
1123       logger.ToStderr("Can't get any data from node %s" % node)
1124       result = False
1125     else:
1126       result = result and (not rstats[idx])
1127   if dev.children:
1128     for child in dev.children:
1129       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130
1131   return result
1132
1133
1134 class LUDiagnoseOS(NoHooksLU):
1135   """Logical unit for OS diagnose/query.
1136
1137   """
1138   _OP_REQP = []
1139
1140   def CheckPrereq(self):
1141     """Check prerequisites.
1142
1143     This always succeeds, since this is a pure query LU.
1144
1145     """
1146     return
1147
1148   def Exec(self, feedback_fn):
1149     """Compute the list of OSes.
1150
1151     """
1152     node_list = self.cfg.GetNodeList()
1153     node_data = rpc.call_os_diagnose(node_list)
1154     if node_data == False:
1155       raise errors.OpExecError("Can't gather the list of OSes")
1156     return node_data
1157
1158
1159 class LURemoveNode(LogicalUnit):
1160   """Logical unit for removing a node.
1161
1162   """
1163   HPATH = "node-remove"
1164   HTYPE = constants.HTYPE_NODE
1165   _OP_REQP = ["node_name"]
1166
1167   def BuildHooksEnv(self):
1168     """Build hooks env.
1169
1170     This doesn't run on the target node in the pre phase as a failed
1171     node would not allows itself to run.
1172
1173     """
1174     env = {
1175       "OP_TARGET": self.op.node_name,
1176       "NODE_NAME": self.op.node_name,
1177       }
1178     all_nodes = self.cfg.GetNodeList()
1179     all_nodes.remove(self.op.node_name)
1180     return env, all_nodes, all_nodes
1181
1182   def CheckPrereq(self):
1183     """Check prerequisites.
1184
1185     This checks:
1186      - the node exists in the configuration
1187      - it does not have primary or secondary instances
1188      - it's not the master
1189
1190     Any errors are signalled by raising errors.OpPrereqError.
1191
1192     """
1193     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194     if node is None:
1195       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196
1197     instance_list = self.cfg.GetInstanceList()
1198
1199     masternode = self.sstore.GetMasterNode()
1200     if node.name == masternode:
1201       raise errors.OpPrereqError("Node is the master node,"
1202                                  " you need to failover first.")
1203
1204     for instance_name in instance_list:
1205       instance = self.cfg.GetInstanceInfo(instance_name)
1206       if node.name == instance.primary_node:
1207         raise errors.OpPrereqError("Instance %s still running on the node,"
1208                                    " please remove first." % instance_name)
1209       if node.name in instance.secondary_nodes:
1210         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211                                    " please remove first." % instance_name)
1212     self.op.node_name = node.name
1213     self.node = node
1214
1215   def Exec(self, feedback_fn):
1216     """Removes the node from the cluster.
1217
1218     """
1219     node = self.node
1220     logger.Info("stopping the node daemon and removing configs from node %s" %
1221                 node.name)
1222
1223     rpc.call_node_leave_cluster(node.name)
1224
1225     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226
1227     logger.Info("Removing node %s from config" % node.name)
1228
1229     self.cfg.RemoveNode(node.name)
1230
1231     _RemoveHostFromEtcHosts(node.name)
1232
1233
1234 class LUQueryNodes(NoHooksLU):
1235   """Logical unit for querying nodes.
1236
1237   """
1238   _OP_REQP = ["output_fields", "names"]
1239
1240   def CheckPrereq(self):
1241     """Check prerequisites.
1242
1243     This checks that the fields required are valid output fields.
1244
1245     """
1246     self.dynamic_fields = frozenset(["dtotal", "dfree",
1247                                      "mtotal", "mnode", "mfree",
1248                                      "bootid"])
1249
1250     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251                                "pinst_list", "sinst_list",
1252                                "pip", "sip"],
1253                        dynamic=self.dynamic_fields,
1254                        selected=self.op.output_fields)
1255
1256     self.wanted = _GetWantedNodes(self, self.op.names)
1257
1258   def Exec(self, feedback_fn):
1259     """Computes the list of nodes and their attributes.
1260
1261     """
1262     nodenames = self.wanted
1263     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264
1265     # begin data gathering
1266
1267     if self.dynamic_fields.intersection(self.op.output_fields):
1268       live_data = {}
1269       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270       for name in nodenames:
1271         nodeinfo = node_data.get(name, None)
1272         if nodeinfo:
1273           live_data[name] = {
1274             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279             "bootid": nodeinfo['bootid'],
1280             }
1281         else:
1282           live_data[name] = {}
1283     else:
1284       live_data = dict.fromkeys(nodenames, {})
1285
1286     node_to_primary = dict([(name, set()) for name in nodenames])
1287     node_to_secondary = dict([(name, set()) for name in nodenames])
1288
1289     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290                              "sinst_cnt", "sinst_list"))
1291     if inst_fields & frozenset(self.op.output_fields):
1292       instancelist = self.cfg.GetInstanceList()
1293
1294       for instance_name in instancelist:
1295         inst = self.cfg.GetInstanceInfo(instance_name)
1296         if inst.primary_node in node_to_primary:
1297           node_to_primary[inst.primary_node].add(inst.name)
1298         for secnode in inst.secondary_nodes:
1299           if secnode in node_to_secondary:
1300             node_to_secondary[secnode].add(inst.name)
1301
1302     # end data gathering
1303
1304     output = []
1305     for node in nodelist:
1306       node_output = []
1307       for field in self.op.output_fields:
1308         if field == "name":
1309           val = node.name
1310         elif field == "pinst_list":
1311           val = list(node_to_primary[node.name])
1312         elif field == "sinst_list":
1313           val = list(node_to_secondary[node.name])
1314         elif field == "pinst_cnt":
1315           val = len(node_to_primary[node.name])
1316         elif field == "sinst_cnt":
1317           val = len(node_to_secondary[node.name])
1318         elif field == "pip":
1319           val = node.primary_ip
1320         elif field == "sip":
1321           val = node.secondary_ip
1322         elif field in self.dynamic_fields:
1323           val = live_data[node.name].get(field, None)
1324         else:
1325           raise errors.ParameterError(field)
1326         node_output.append(val)
1327       output.append(node_output)
1328
1329     return output
1330
1331
1332 class LUQueryNodeVolumes(NoHooksLU):
1333   """Logical unit for getting volumes on node(s).
1334
1335   """
1336   _OP_REQP = ["nodes", "output_fields"]
1337
1338   def CheckPrereq(self):
1339     """Check prerequisites.
1340
1341     This checks that the fields required are valid output fields.
1342
1343     """
1344     self.nodes = _GetWantedNodes(self, self.op.nodes)
1345
1346     _CheckOutputFields(static=["node"],
1347                        dynamic=["phys", "vg", "name", "size", "instance"],
1348                        selected=self.op.output_fields)
1349
1350
1351   def Exec(self, feedback_fn):
1352     """Computes the list of nodes and their attributes.
1353
1354     """
1355     nodenames = self.nodes
1356     volumes = rpc.call_node_volumes(nodenames)
1357
1358     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359              in self.cfg.GetInstanceList()]
1360
1361     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362
1363     output = []
1364     for node in nodenames:
1365       if node not in volumes or not volumes[node]:
1366         continue
1367
1368       node_vols = volumes[node][:]
1369       node_vols.sort(key=lambda vol: vol['dev'])
1370
1371       for vol in node_vols:
1372         node_output = []
1373         for field in self.op.output_fields:
1374           if field == "node":
1375             val = node
1376           elif field == "phys":
1377             val = vol['dev']
1378           elif field == "vg":
1379             val = vol['vg']
1380           elif field == "name":
1381             val = vol['name']
1382           elif field == "size":
1383             val = int(float(vol['size']))
1384           elif field == "instance":
1385             for inst in ilist:
1386               if node not in lv_by_node[inst]:
1387                 continue
1388               if vol['name'] in lv_by_node[inst][node]:
1389                 val = inst.name
1390                 break
1391             else:
1392               val = '-'
1393           else:
1394             raise errors.ParameterError(field)
1395           node_output.append(str(val))
1396
1397         output.append(node_output)
1398
1399     return output
1400
1401
1402 class LUAddNode(LogicalUnit):
1403   """Logical unit for adding node to the cluster.
1404
1405   """
1406   HPATH = "node-add"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This will run on all nodes before, and on all nodes + the new node after.
1414
1415     """
1416     env = {
1417       "OP_TARGET": self.op.node_name,
1418       "NODE_NAME": self.op.node_name,
1419       "NODE_PIP": self.op.primary_ip,
1420       "NODE_SIP": self.op.secondary_ip,
1421       }
1422     nodes_0 = self.cfg.GetNodeList()
1423     nodes_1 = nodes_0 + [self.op.node_name, ]
1424     return env, nodes_0, nodes_1
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the new node is not already in the config
1431      - it is resolvable
1432      - its parameters (single/dual homed) matches the cluster
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node_name = self.op.node_name
1438     cfg = self.cfg
1439
1440     dns_data = utils.HostInfo(node_name)
1441
1442     node = dns_data.name
1443     primary_ip = self.op.primary_ip = dns_data.ip
1444     secondary_ip = getattr(self.op, "secondary_ip", None)
1445     if secondary_ip is None:
1446       secondary_ip = primary_ip
1447     if not utils.IsValidIP(secondary_ip):
1448       raise errors.OpPrereqError("Invalid secondary IP given")
1449     self.op.secondary_ip = secondary_ip
1450     node_list = cfg.GetNodeList()
1451     if node in node_list:
1452       raise errors.OpPrereqError("Node %s is already in the configuration"
1453                                  % node)
1454
1455     for existing_node_name in node_list:
1456       existing_node = cfg.GetNodeInfo(existing_node_name)
1457       if (existing_node.primary_ip == primary_ip or
1458           existing_node.secondary_ip == primary_ip or
1459           existing_node.primary_ip == secondary_ip or
1460           existing_node.secondary_ip == secondary_ip):
1461         raise errors.OpPrereqError("New node ip address(es) conflict with"
1462                                    " existing node %s" % existing_node.name)
1463
1464     # check that the type of the node (single versus dual homed) is the
1465     # same as for the master
1466     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467     master_singlehomed = myself.secondary_ip == myself.primary_ip
1468     newbie_singlehomed = secondary_ip == primary_ip
1469     if master_singlehomed != newbie_singlehomed:
1470       if master_singlehomed:
1471         raise errors.OpPrereqError("The master has no private ip but the"
1472                                    " new node has one")
1473       else:
1474         raise errors.OpPrereqError("The master has a private ip but the"
1475                                    " new node doesn't have one")
1476
1477     # checks reachablity
1478     if not utils.TcpPing(utils.HostInfo().name,
1479                          primary_ip,
1480                          constants.DEFAULT_NODED_PORT):
1481       raise errors.OpPrereqError("Node not reachable by ping")
1482
1483     if not newbie_singlehomed:
1484       # check reachability from my secondary ip to newbie's secondary ip
1485       if not utils.TcpPing(myself.secondary_ip,
1486                            secondary_ip,
1487                            constants.DEFAULT_NODED_PORT):
1488         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489                                    " based ping to noded port")
1490
1491     self.new_node = objects.Node(name=node,
1492                                  primary_ip=primary_ip,
1493                                  secondary_ip=secondary_ip)
1494
1495     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498                                    constants.VNC_PASSWORD_FILE)
1499
1500   def Exec(self, feedback_fn):
1501     """Adds the new node to the cluster.
1502
1503     """
1504     new_node = self.new_node
1505     node = new_node.name
1506
1507     # set up inter-node password and certificate and restarts the node daemon
1508     gntpass = self.sstore.GetNodeDaemonPassword()
1509     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510       raise errors.OpExecError("ganeti password corruption detected")
1511     f = open(constants.SSL_CERT_FILE)
1512     try:
1513       gntpem = f.read(8192)
1514     finally:
1515       f.close()
1516     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517     # so we use this to detect an invalid certificate; as long as the
1518     # cert doesn't contain this, the here-document will be correctly
1519     # parsed by the shell sequence below
1520     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522     if not gntpem.endswith("\n"):
1523       raise errors.OpExecError("PEM must end with newline")
1524     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525
1526     # and then connect with ssh to set password and start ganeti-noded
1527     # note that all the below variables are sanitized at this point,
1528     # either by being constants or by the checks above
1529     ss = self.sstore
1530     mycommand = ("umask 077 && "
1531                  "echo '%s' > '%s' && "
1532                  "cat > '%s' << '!EOF.' && \n"
1533                  "%s!EOF.\n%s restart" %
1534                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535                   constants.SSL_CERT_FILE, gntpem,
1536                   constants.NODE_INITD_SCRIPT))
1537
1538     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539     if result.failed:
1540       raise errors.OpExecError("Remote command on node %s, error: %s,"
1541                                " output: %s" %
1542                                (node, result.fail_reason, result.output))
1543
1544     # check connectivity
1545     time.sleep(4)
1546
1547     result = rpc.call_version([node])[node]
1548     if result:
1549       if constants.PROTOCOL_VERSION == result:
1550         logger.Info("communication to node %s fine, sw version %s match" %
1551                     (node, result))
1552       else:
1553         raise errors.OpExecError("Version mismatch master version %s,"
1554                                  " node version %s" %
1555                                  (constants.PROTOCOL_VERSION, result))
1556     else:
1557       raise errors.OpExecError("Cannot get version from the new node")
1558
1559     # setup ssh on node
1560     logger.Info("copy ssh key to node %s" % node)
1561     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562     keyarray = []
1563     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565                 priv_key, pub_key]
1566
1567     for i in keyfiles:
1568       f = open(i, 'r')
1569       try:
1570         keyarray.append(f.read())
1571       finally:
1572         f.close()
1573
1574     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575                                keyarray[3], keyarray[4], keyarray[5])
1576
1577     if not result:
1578       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579
1580     # Add node to our /etc/hosts, and add key to known_hosts
1581     _AddHostToEtcHosts(new_node.name)
1582
1583     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584                       self.cfg.GetHostKey())
1585
1586     if new_node.secondary_ip != new_node.primary_ip:
1587       if not rpc.call_node_tcp_ping(new_node.name,
1588                                     constants.LOCALHOST_IP_ADDRESS,
1589                                     new_node.secondary_ip,
1590                                     constants.DEFAULT_NODED_PORT,
1591                                     10, False):
1592         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593                                  " you gave (%s). Please fix and re-run this"
1594                                  " command." % new_node.secondary_ip)
1595
1596     success, msg = ssh.VerifyNodeHostname(node)
1597     if not success:
1598       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599                                " than the one the resolver gives: %s."
1600                                " Please fix and re-run this command." %
1601                                (node, msg))
1602
1603     # Distribute updated /etc/hosts and known_hosts to all nodes,
1604     # including the node just added
1605     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606     dist_nodes = self.cfg.GetNodeList() + [node]
1607     if myself.name in dist_nodes:
1608       dist_nodes.remove(myself.name)
1609
1610     logger.Debug("Copying hosts and known_hosts to all nodes")
1611     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1612       result = rpc.call_upload_file(dist_nodes, fname)
1613       for to_node in dist_nodes:
1614         if not result[to_node]:
1615           logger.Error("copy of file %s to node %s failed" %
1616                        (fname, to_node))
1617
1618     to_copy = ss.GetFileList()
1619     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620       to_copy.append(constants.VNC_PASSWORD_FILE)
1621     for fname in to_copy:
1622       if not ssh.CopyFileToNode(node, fname):
1623         logger.Error("could not copy file %s to node %s" % (fname, node))
1624
1625     logger.Info("adding node %s to cluster.conf" % node)
1626     self.cfg.AddNode(new_node)
1627
1628
1629 class LUMasterFailover(LogicalUnit):
1630   """Failover the master node to the current node.
1631
1632   This is a special LU in that it must run on a non-master node.
1633
1634   """
1635   HPATH = "master-failover"
1636   HTYPE = constants.HTYPE_CLUSTER
1637   REQ_MASTER = False
1638   _OP_REQP = []
1639
1640   def BuildHooksEnv(self):
1641     """Build hooks env.
1642
1643     This will run on the new master only in the pre phase, and on all
1644     the nodes in the post phase.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.new_master,
1649       "NEW_MASTER": self.new_master,
1650       "OLD_MASTER": self.old_master,
1651       }
1652     return env, [self.new_master], self.cfg.GetNodeList()
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that we are not already the master.
1658
1659     """
1660     self.new_master = utils.HostInfo().name
1661     self.old_master = self.sstore.GetMasterNode()
1662
1663     if self.old_master == self.new_master:
1664       raise errors.OpPrereqError("This commands must be run on the node"
1665                                  " where you want the new master to be."
1666                                  " %s is already the master" %
1667                                  self.old_master)
1668
1669   def Exec(self, feedback_fn):
1670     """Failover the master node.
1671
1672     This command, when run on a non-master node, will cause the current
1673     master to cease being master, and the non-master to become new
1674     master.
1675
1676     """
1677     #TODO: do not rely on gethostname returning the FQDN
1678     logger.Info("setting master to %s, old master: %s" %
1679                 (self.new_master, self.old_master))
1680
1681     if not rpc.call_node_stop_master(self.old_master):
1682       logger.Error("could disable the master role on the old master"
1683                    " %s, please disable manually" % self.old_master)
1684
1685     ss = self.sstore
1686     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689       logger.Error("could not distribute the new simple store master file"
1690                    " to the other nodes, please check.")
1691
1692     if not rpc.call_node_start_master(self.new_master):
1693       logger.Error("could not start the master role on the new master"
1694                    " %s, please check" % self.new_master)
1695       feedback_fn("Error in activating the master IP on the new master,"
1696                   " please fix manually.")
1697
1698
1699
1700 class LUQueryClusterInfo(NoHooksLU):
1701   """Query cluster configuration.
1702
1703   """
1704   _OP_REQP = []
1705   REQ_MASTER = False
1706
1707   def CheckPrereq(self):
1708     """No prerequsites needed for this LU.
1709
1710     """
1711     pass
1712
1713   def Exec(self, feedback_fn):
1714     """Return cluster config.
1715
1716     """
1717     result = {
1718       "name": self.sstore.GetClusterName(),
1719       "software_version": constants.RELEASE_VERSION,
1720       "protocol_version": constants.PROTOCOL_VERSION,
1721       "config_version": constants.CONFIG_VERSION,
1722       "os_api_version": constants.OS_API_VERSION,
1723       "export_version": constants.EXPORT_VERSION,
1724       "master": self.sstore.GetMasterNode(),
1725       "architecture": (platform.architecture()[0], platform.machine()),
1726       }
1727
1728     return result
1729
1730
1731 class LUClusterCopyFile(NoHooksLU):
1732   """Copy file to cluster.
1733
1734   """
1735   _OP_REQP = ["nodes", "filename"]
1736
1737   def CheckPrereq(self):
1738     """Check prerequisites.
1739
1740     It should check that the named file exists and that the given list
1741     of nodes is valid.
1742
1743     """
1744     if not os.path.exists(self.op.filename):
1745       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746
1747     self.nodes = _GetWantedNodes(self, self.op.nodes)
1748
1749   def Exec(self, feedback_fn):
1750     """Copy a file from master to some nodes.
1751
1752     Args:
1753       opts - class with options as members
1754       args - list containing a single element, the file name
1755     Opts used:
1756       nodes - list containing the name of target nodes; if empty, all nodes
1757
1758     """
1759     filename = self.op.filename
1760
1761     myname = utils.HostInfo().name
1762
1763     for node in self.nodes:
1764       if node == myname:
1765         continue
1766       if not ssh.CopyFileToNode(node, filename):
1767         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768
1769
1770 class LUDumpClusterConfig(NoHooksLU):
1771   """Return a text-representation of the cluster-config.
1772
1773   """
1774   _OP_REQP = []
1775
1776   def CheckPrereq(self):
1777     """No prerequisites.
1778
1779     """
1780     pass
1781
1782   def Exec(self, feedback_fn):
1783     """Dump a representation of the cluster config to the standard output.
1784
1785     """
1786     return self.cfg.DumpConfig()
1787
1788
1789 class LURunClusterCommand(NoHooksLU):
1790   """Run a command on some nodes.
1791
1792   """
1793   _OP_REQP = ["command", "nodes"]
1794
1795   def CheckPrereq(self):
1796     """Check prerequisites.
1797
1798     It checks that the given list of nodes is valid.
1799
1800     """
1801     self.nodes = _GetWantedNodes(self, self.op.nodes)
1802
1803   def Exec(self, feedback_fn):
1804     """Run a command on some nodes.
1805
1806     """
1807     data = []
1808     for node in self.nodes:
1809       result = ssh.SSHCall(node, "root", self.op.command)
1810       data.append((node, result.output, result.exit_code))
1811
1812     return data
1813
1814
1815 class LUActivateInstanceDisks(NoHooksLU):
1816   """Bring up an instance's disks.
1817
1818   """
1819   _OP_REQP = ["instance_name"]
1820
1821   def CheckPrereq(self):
1822     """Check prerequisites.
1823
1824     This checks that the instance is in the cluster.
1825
1826     """
1827     instance = self.cfg.GetInstanceInfo(
1828       self.cfg.ExpandInstanceName(self.op.instance_name))
1829     if instance is None:
1830       raise errors.OpPrereqError("Instance '%s' not known" %
1831                                  self.op.instance_name)
1832     self.instance = instance
1833
1834
1835   def Exec(self, feedback_fn):
1836     """Activate the disks.
1837
1838     """
1839     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840     if not disks_ok:
1841       raise errors.OpExecError("Cannot activate block devices")
1842
1843     return disks_info
1844
1845
1846 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847   """Prepare the block devices for an instance.
1848
1849   This sets up the block devices on all nodes.
1850
1851   Args:
1852     instance: a ganeti.objects.Instance object
1853     ignore_secondaries: if true, errors on secondary nodes won't result
1854                         in an error return from the function
1855
1856   Returns:
1857     false if the operation failed
1858     list of (host, instance_visible_name, node_visible_name) if the operation
1859          suceeded with the mapping from node devices to instance devices
1860   """
1861   device_info = []
1862   disks_ok = True
1863   iname = instance.name
1864   # With the two passes mechanism we try to reduce the window of
1865   # opportunity for the race condition of switching DRBD to primary
1866   # before handshaking occured, but we do not eliminate it
1867
1868   # The proper fix would be to wait (with some limits) until the
1869   # connection has been made and drbd transitions from WFConnection
1870   # into any other network-connected state (Connected, SyncTarget,
1871   # SyncSource, etc.)
1872
1873   # 1st pass, assemble on all nodes in secondary mode
1874   for inst_disk in instance.disks:
1875     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1876       cfg.SetDiskID(node_disk, node)
1877       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1878       if not result:
1879         logger.Error("could not prepare block device %s on node %s"
1880                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1881         if not ignore_secondaries:
1882           disks_ok = False
1883
1884   # FIXME: race condition on drbd migration to primary
1885
1886   # 2nd pass, do only the primary node
1887   for inst_disk in instance.disks:
1888     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1889       if node != instance.primary_node:
1890         continue
1891       cfg.SetDiskID(node_disk, node)
1892       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1893       if not result:
1894         logger.Error("could not prepare block device %s on node %s"
1895                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1896         disks_ok = False
1897     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1898
1899   # leave the disks configured for the primary node
1900   # this is a workaround that would be fixed better by
1901   # improving the logical/physical id handling
1902   for disk in instance.disks:
1903     cfg.SetDiskID(disk, instance.primary_node)
1904
1905   return disks_ok, device_info
1906
1907
1908 def _StartInstanceDisks(cfg, instance, force):
1909   """Start the disks of an instance.
1910
1911   """
1912   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1913                                            ignore_secondaries=force)
1914   if not disks_ok:
1915     _ShutdownInstanceDisks(instance, cfg)
1916     if force is not None and not force:
1917       logger.Error("If the message above refers to a secondary node,"
1918                    " you can retry the operation using '--force'.")
1919     raise errors.OpExecError("Disk consistency error")
1920
1921
1922 class LUDeactivateInstanceDisks(NoHooksLU):
1923   """Shutdown an instance's disks.
1924
1925   """
1926   _OP_REQP = ["instance_name"]
1927
1928   def CheckPrereq(self):
1929     """Check prerequisites.
1930
1931     This checks that the instance is in the cluster.
1932
1933     """
1934     instance = self.cfg.GetInstanceInfo(
1935       self.cfg.ExpandInstanceName(self.op.instance_name))
1936     if instance is None:
1937       raise errors.OpPrereqError("Instance '%s' not known" %
1938                                  self.op.instance_name)
1939     self.instance = instance
1940
1941   def Exec(self, feedback_fn):
1942     """Deactivate the disks
1943
1944     """
1945     instance = self.instance
1946     ins_l = rpc.call_instance_list([instance.primary_node])
1947     ins_l = ins_l[instance.primary_node]
1948     if not type(ins_l) is list:
1949       raise errors.OpExecError("Can't contact node '%s'" %
1950                                instance.primary_node)
1951
1952     if self.instance.name in ins_l:
1953       raise errors.OpExecError("Instance is running, can't shutdown"
1954                                " block devices.")
1955
1956     _ShutdownInstanceDisks(instance, self.cfg)
1957
1958
1959 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1960   """Shutdown block devices of an instance.
1961
1962   This does the shutdown on all nodes of the instance.
1963
1964   If the ignore_primary is false, errors on the primary node are
1965   ignored.
1966
1967   """
1968   result = True
1969   for disk in instance.disks:
1970     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1971       cfg.SetDiskID(top_disk, node)
1972       if not rpc.call_blockdev_shutdown(node, top_disk):
1973         logger.Error("could not shutdown block device %s on node %s" %
1974                      (disk.iv_name, node))
1975         if not ignore_primary or node != instance.primary_node:
1976           result = False
1977   return result
1978
1979
1980 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1981   """Checks if a node has enough free memory.
1982
1983   This function check if a given node has the needed amount of free
1984   memory. In case the node has less memory or we cannot get the
1985   information from the node, this function raise an OpPrereqError
1986   exception.
1987
1988   Args:
1989     - cfg: a ConfigWriter instance
1990     - node: the node name
1991     - reason: string to use in the error message
1992     - requested: the amount of memory in MiB
1993
1994   """
1995   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1996   if not nodeinfo or not isinstance(nodeinfo, dict):
1997     raise errors.OpPrereqError("Could not contact node %s for resource"
1998                              " information" % (node,))
1999
2000   free_mem = nodeinfo[node].get('memory_free')
2001   if not isinstance(free_mem, int):
2002     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2003                              " was '%s'" % (node, free_mem))
2004   if requested > free_mem:
2005     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2006                              " needed %s MiB, available %s MiB" %
2007                              (node, reason, requested, free_mem))
2008
2009
2010 class LUStartupInstance(LogicalUnit):
2011   """Starts an instance.
2012
2013   """
2014   HPATH = "instance-start"
2015   HTYPE = constants.HTYPE_INSTANCE
2016   _OP_REQP = ["instance_name", "force"]
2017
2018   def BuildHooksEnv(self):
2019     """Build hooks env.
2020
2021     This runs on master, primary and secondary nodes of the instance.
2022
2023     """
2024     env = {
2025       "FORCE": self.op.force,
2026       }
2027     env.update(_BuildInstanceHookEnvByObject(self.instance))
2028     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029           list(self.instance.secondary_nodes))
2030     return env, nl, nl
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     instance = self.cfg.GetInstanceInfo(
2039       self.cfg.ExpandInstanceName(self.op.instance_name))
2040     if instance is None:
2041       raise errors.OpPrereqError("Instance '%s' not known" %
2042                                  self.op.instance_name)
2043
2044     # check bridges existance
2045     _CheckInstanceBridgesExist(instance)
2046
2047     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048                          "starting instance %s" % instance.name,
2049                          instance.memory)
2050
2051     self.instance = instance
2052     self.op.instance_name = instance.name
2053
2054   def Exec(self, feedback_fn):
2055     """Start the instance.
2056
2057     """
2058     instance = self.instance
2059     force = self.op.force
2060     extra_args = getattr(self.op, "extra_args", "")
2061
2062     node_current = instance.primary_node
2063
2064     _StartInstanceDisks(self.cfg, instance, force)
2065
2066     if not rpc.call_instance_start(node_current, instance, extra_args):
2067       _ShutdownInstanceDisks(instance, self.cfg)
2068       raise errors.OpExecError("Could not start instance")
2069
2070     self.cfg.MarkInstanceUp(instance.name)
2071
2072
2073 class LURebootInstance(LogicalUnit):
2074   """Reboot an instance.
2075
2076   """
2077   HPATH = "instance-reboot"
2078   HTYPE = constants.HTYPE_INSTANCE
2079   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2080
2081   def BuildHooksEnv(self):
2082     """Build hooks env.
2083
2084     This runs on master, primary and secondary nodes of the instance.
2085
2086     """
2087     env = {
2088       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089       }
2090     env.update(_BuildInstanceHookEnvByObject(self.instance))
2091     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092           list(self.instance.secondary_nodes))
2093     return env, nl, nl
2094
2095   def CheckPrereq(self):
2096     """Check prerequisites.
2097
2098     This checks that the instance is in the cluster.
2099
2100     """
2101     instance = self.cfg.GetInstanceInfo(
2102       self.cfg.ExpandInstanceName(self.op.instance_name))
2103     if instance is None:
2104       raise errors.OpPrereqError("Instance '%s' not known" %
2105                                  self.op.instance_name)
2106
2107     # check bridges existance
2108     _CheckInstanceBridgesExist(instance)
2109
2110     self.instance = instance
2111     self.op.instance_name = instance.name
2112
2113   def Exec(self, feedback_fn):
2114     """Reboot the instance.
2115
2116     """
2117     instance = self.instance
2118     ignore_secondaries = self.op.ignore_secondaries
2119     reboot_type = self.op.reboot_type
2120     extra_args = getattr(self.op, "extra_args", "")
2121
2122     node_current = instance.primary_node
2123
2124     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2125                            constants.INSTANCE_REBOOT_HARD,
2126                            constants.INSTANCE_REBOOT_FULL]:
2127       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2128                                   (constants.INSTANCE_REBOOT_SOFT,
2129                                    constants.INSTANCE_REBOOT_HARD,
2130                                    constants.INSTANCE_REBOOT_FULL))
2131
2132     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2133                        constants.INSTANCE_REBOOT_HARD]:
2134       if not rpc.call_instance_reboot(node_current, instance,
2135                                       reboot_type, extra_args):
2136         raise errors.OpExecError("Could not reboot instance")
2137     else:
2138       if not rpc.call_instance_shutdown(node_current, instance):
2139         raise errors.OpExecError("could not shutdown instance for full reboot")
2140       _ShutdownInstanceDisks(instance, self.cfg)
2141       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2142       if not rpc.call_instance_start(node_current, instance, extra_args):
2143         _ShutdownInstanceDisks(instance, self.cfg)
2144         raise errors.OpExecError("Could not start instance for full reboot")
2145
2146     self.cfg.MarkInstanceUp(instance.name)
2147
2148
2149 class LUShutdownInstance(LogicalUnit):
2150   """Shutdown an instance.
2151
2152   """
2153   HPATH = "instance-stop"
2154   HTYPE = constants.HTYPE_INSTANCE
2155   _OP_REQP = ["instance_name"]
2156
2157   def BuildHooksEnv(self):
2158     """Build hooks env.
2159
2160     This runs on master, primary and secondary nodes of the instance.
2161
2162     """
2163     env = _BuildInstanceHookEnvByObject(self.instance)
2164     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165           list(self.instance.secondary_nodes))
2166     return env, nl, nl
2167
2168   def CheckPrereq(self):
2169     """Check prerequisites.
2170
2171     This checks that the instance is in the cluster.
2172
2173     """
2174     instance = self.cfg.GetInstanceInfo(
2175       self.cfg.ExpandInstanceName(self.op.instance_name))
2176     if instance is None:
2177       raise errors.OpPrereqError("Instance '%s' not known" %
2178                                  self.op.instance_name)
2179     self.instance = instance
2180
2181   def Exec(self, feedback_fn):
2182     """Shutdown the instance.
2183
2184     """
2185     instance = self.instance
2186     node_current = instance.primary_node
2187     if not rpc.call_instance_shutdown(node_current, instance):
2188       logger.Error("could not shutdown instance")
2189
2190     self.cfg.MarkInstanceDown(instance.name)
2191     _ShutdownInstanceDisks(instance, self.cfg)
2192
2193
2194 class LUReinstallInstance(LogicalUnit):
2195   """Reinstall an instance.
2196
2197   """
2198   HPATH = "instance-reinstall"
2199   HTYPE = constants.HTYPE_INSTANCE
2200   _OP_REQP = ["instance_name"]
2201
2202   def BuildHooksEnv(self):
2203     """Build hooks env.
2204
2205     This runs on master, primary and secondary nodes of the instance.
2206
2207     """
2208     env = _BuildInstanceHookEnvByObject(self.instance)
2209     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210           list(self.instance.secondary_nodes))
2211     return env, nl, nl
2212
2213   def CheckPrereq(self):
2214     """Check prerequisites.
2215
2216     This checks that the instance is in the cluster and is not running.
2217
2218     """
2219     instance = self.cfg.GetInstanceInfo(
2220       self.cfg.ExpandInstanceName(self.op.instance_name))
2221     if instance is None:
2222       raise errors.OpPrereqError("Instance '%s' not known" %
2223                                  self.op.instance_name)
2224     if instance.disk_template == constants.DT_DISKLESS:
2225       raise errors.OpPrereqError("Instance '%s' has no disks" %
2226                                  self.op.instance_name)
2227     if instance.status != "down":
2228       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2229                                  self.op.instance_name)
2230     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2231     if remote_info:
2232       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2233                                  (self.op.instance_name,
2234                                   instance.primary_node))
2235
2236     self.op.os_type = getattr(self.op, "os_type", None)
2237     if self.op.os_type is not None:
2238       # OS verification
2239       pnode = self.cfg.GetNodeInfo(
2240         self.cfg.ExpandNodeName(instance.primary_node))
2241       if pnode is None:
2242         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2243                                    self.op.pnode)
2244       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2245       if not os_obj:
2246         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2247                                    " primary node"  % self.op.os_type)
2248
2249     self.instance = instance
2250
2251   def Exec(self, feedback_fn):
2252     """Reinstall the instance.
2253
2254     """
2255     inst = self.instance
2256
2257     if self.op.os_type is not None:
2258       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2259       inst.os = self.op.os_type
2260       self.cfg.AddInstance(inst)
2261
2262     _StartInstanceDisks(self.cfg, inst, None)
2263     try:
2264       feedback_fn("Running the instance OS create scripts...")
2265       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2266         raise errors.OpExecError("Could not install OS for instance %s"
2267                                  " on node %s" %
2268                                  (inst.name, inst.primary_node))
2269     finally:
2270       _ShutdownInstanceDisks(inst, self.cfg)
2271
2272
2273 class LURenameInstance(LogicalUnit):
2274   """Rename an instance.
2275
2276   """
2277   HPATH = "instance-rename"
2278   HTYPE = constants.HTYPE_INSTANCE
2279   _OP_REQP = ["instance_name", "new_name"]
2280
2281   def BuildHooksEnv(self):
2282     """Build hooks env.
2283
2284     This runs on master, primary and secondary nodes of the instance.
2285
2286     """
2287     env = _BuildInstanceHookEnvByObject(self.instance)
2288     env["INSTANCE_NEW_NAME"] = self.op.new_name
2289     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290           list(self.instance.secondary_nodes))
2291     return env, nl, nl
2292
2293   def CheckPrereq(self):
2294     """Check prerequisites.
2295
2296     This checks that the instance is in the cluster and is not running.
2297
2298     """
2299     instance = self.cfg.GetInstanceInfo(
2300       self.cfg.ExpandInstanceName(self.op.instance_name))
2301     if instance is None:
2302       raise errors.OpPrereqError("Instance '%s' not known" %
2303                                  self.op.instance_name)
2304     if instance.status != "down":
2305       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2306                                  self.op.instance_name)
2307     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2308     if remote_info:
2309       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2310                                  (self.op.instance_name,
2311                                   instance.primary_node))
2312     self.instance = instance
2313
2314     # new name verification
2315     name_info = utils.HostInfo(self.op.new_name)
2316
2317     self.op.new_name = new_name = name_info.name
2318     if not getattr(self.op, "ignore_ip", False):
2319       command = ["fping", "-q", name_info.ip]
2320       result = utils.RunCmd(command)
2321       if not result.failed:
2322         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2323                                    (name_info.ip, new_name))
2324
2325
2326   def Exec(self, feedback_fn):
2327     """Reinstall the instance.
2328
2329     """
2330     inst = self.instance
2331     old_name = inst.name
2332
2333     self.cfg.RenameInstance(inst.name, self.op.new_name)
2334
2335     # re-read the instance from the configuration after rename
2336     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2337
2338     _StartInstanceDisks(self.cfg, inst, None)
2339     try:
2340       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2341                                           "sda", "sdb"):
2342         msg = ("Could run OS rename script for instance %s on node %s (but the"
2343                " instance has been renamed in Ganeti)" %
2344                (inst.name, inst.primary_node))
2345         logger.Error(msg)
2346     finally:
2347       _ShutdownInstanceDisks(inst, self.cfg)
2348
2349
2350 class LURemoveInstance(LogicalUnit):
2351   """Remove an instance.
2352
2353   """
2354   HPATH = "instance-remove"
2355   HTYPE = constants.HTYPE_INSTANCE
2356   _OP_REQP = ["instance_name"]
2357
2358   def BuildHooksEnv(self):
2359     """Build hooks env.
2360
2361     This runs on master, primary and secondary nodes of the instance.
2362
2363     """
2364     env = _BuildInstanceHookEnvByObject(self.instance)
2365     nl = [self.sstore.GetMasterNode()]
2366     return env, nl, nl
2367
2368   def CheckPrereq(self):
2369     """Check prerequisites.
2370
2371     This checks that the instance is in the cluster.
2372
2373     """
2374     instance = self.cfg.GetInstanceInfo(
2375       self.cfg.ExpandInstanceName(self.op.instance_name))
2376     if instance is None:
2377       raise errors.OpPrereqError("Instance '%s' not known" %
2378                                  self.op.instance_name)
2379     self.instance = instance
2380
2381   def Exec(self, feedback_fn):
2382     """Remove the instance.
2383
2384     """
2385     instance = self.instance
2386     logger.Info("shutting down instance %s on node %s" %
2387                 (instance.name, instance.primary_node))
2388
2389     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2390       if self.op.ignore_failures:
2391         feedback_fn("Warning: can't shutdown instance")
2392       else:
2393         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2394                                  (instance.name, instance.primary_node))
2395
2396     logger.Info("removing block devices for instance %s" % instance.name)
2397
2398     if not _RemoveDisks(instance, self.cfg):
2399       if self.op.ignore_failures:
2400         feedback_fn("Warning: can't remove instance's disks")
2401       else:
2402         raise errors.OpExecError("Can't remove instance's disks")
2403
2404     logger.Info("removing instance %s out of cluster config" % instance.name)
2405
2406     self.cfg.RemoveInstance(instance.name)
2407
2408
2409 class LUQueryInstances(NoHooksLU):
2410   """Logical unit for querying instances.
2411
2412   """
2413   _OP_REQP = ["output_fields", "names"]
2414
2415   def CheckPrereq(self):
2416     """Check prerequisites.
2417
2418     This checks that the fields required are valid output fields.
2419
2420     """
2421     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2422     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2423                                "admin_state", "admin_ram",
2424                                "disk_template", "ip", "mac", "bridge",
2425                                "sda_size", "sdb_size", "vcpus"],
2426                        dynamic=self.dynamic_fields,
2427                        selected=self.op.output_fields)
2428
2429     self.wanted = _GetWantedInstances(self, self.op.names)
2430
2431   def Exec(self, feedback_fn):
2432     """Computes the list of nodes and their attributes.
2433
2434     """
2435     instance_names = self.wanted
2436     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2437                      in instance_names]
2438
2439     # begin data gathering
2440
2441     nodes = frozenset([inst.primary_node for inst in instance_list])
2442
2443     bad_nodes = []
2444     if self.dynamic_fields.intersection(self.op.output_fields):
2445       live_data = {}
2446       node_data = rpc.call_all_instances_info(nodes)
2447       for name in nodes:
2448         result = node_data[name]
2449         if result:
2450           live_data.update(result)
2451         elif result == False:
2452           bad_nodes.append(name)
2453         # else no instance is alive
2454     else:
2455       live_data = dict([(name, {}) for name in instance_names])
2456
2457     # end data gathering
2458
2459     output = []
2460     for instance in instance_list:
2461       iout = []
2462       for field in self.op.output_fields:
2463         if field == "name":
2464           val = instance.name
2465         elif field == "os":
2466           val = instance.os
2467         elif field == "pnode":
2468           val = instance.primary_node
2469         elif field == "snodes":
2470           val = list(instance.secondary_nodes)
2471         elif field == "admin_state":
2472           val = (instance.status != "down")
2473         elif field == "oper_state":
2474           if instance.primary_node in bad_nodes:
2475             val = None
2476           else:
2477             val = bool(live_data.get(instance.name))
2478         elif field == "status":
2479           if instance.primary_node in bad_nodes:
2480             val = "ERROR_nodedown"
2481           else:
2482             running = bool(live_data.get(instance.name))
2483             if running:
2484               if instance.status != "down":
2485                 val = "running"
2486               else:
2487                 val = "ERROR_up"
2488             else:
2489               if instance.status != "down":
2490                 val = "ERROR_down"
2491               else:
2492                 val = "ADMIN_down"
2493         elif field == "admin_ram":
2494           val = instance.memory
2495         elif field == "oper_ram":
2496           if instance.primary_node in bad_nodes:
2497             val = None
2498           elif instance.name in live_data:
2499             val = live_data[instance.name].get("memory", "?")
2500           else:
2501             val = "-"
2502         elif field == "disk_template":
2503           val = instance.disk_template
2504         elif field == "ip":
2505           val = instance.nics[0].ip
2506         elif field == "bridge":
2507           val = instance.nics[0].bridge
2508         elif field == "mac":
2509           val = instance.nics[0].mac
2510         elif field == "sda_size" or field == "sdb_size":
2511           disk = instance.FindDisk(field[:3])
2512           if disk is None:
2513             val = None
2514           else:
2515             val = disk.size
2516         elif field == "vcpus":
2517           val = instance.vcpus
2518         else:
2519           raise errors.ParameterError(field)
2520         iout.append(val)
2521       output.append(iout)
2522
2523     return output
2524
2525
2526 class LUFailoverInstance(LogicalUnit):
2527   """Failover an instance.
2528
2529   """
2530   HPATH = "instance-failover"
2531   HTYPE = constants.HTYPE_INSTANCE
2532   _OP_REQP = ["instance_name", "ignore_consistency"]
2533
2534   def BuildHooksEnv(self):
2535     """Build hooks env.
2536
2537     This runs on master, primary and secondary nodes of the instance.
2538
2539     """
2540     env = {
2541       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2542       }
2543     env.update(_BuildInstanceHookEnvByObject(self.instance))
2544     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2545     return env, nl, nl
2546
2547   def CheckPrereq(self):
2548     """Check prerequisites.
2549
2550     This checks that the instance is in the cluster.
2551
2552     """
2553     instance = self.cfg.GetInstanceInfo(
2554       self.cfg.ExpandInstanceName(self.op.instance_name))
2555     if instance is None:
2556       raise errors.OpPrereqError("Instance '%s' not known" %
2557                                  self.op.instance_name)
2558
2559     if instance.disk_template not in constants.DTS_NET_MIRROR:
2560       raise errors.OpPrereqError("Instance's disk layout is not"
2561                                  " network mirrored, cannot failover.")
2562
2563     secondary_nodes = instance.secondary_nodes
2564     if not secondary_nodes:
2565       raise errors.ProgrammerError("no secondary node but using "
2566                                    "DT_REMOTE_RAID1 template")
2567
2568     target_node = secondary_nodes[0]
2569     # check memory requirements on the secondary node
2570     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2571                          instance.name, instance.memory)
2572
2573     # check bridge existance
2574     brlist = [nic.bridge for nic in instance.nics]
2575     if not rpc.call_bridges_exist(target_node, brlist):
2576       raise errors.OpPrereqError("One or more target bridges %s does not"
2577                                  " exist on destination node '%s'" %
2578                                  (brlist, target_node))
2579
2580     self.instance = instance
2581
2582   def Exec(self, feedback_fn):
2583     """Failover an instance.
2584
2585     The failover is done by shutting it down on its present node and
2586     starting it on the secondary.
2587
2588     """
2589     instance = self.instance
2590
2591     source_node = instance.primary_node
2592     target_node = instance.secondary_nodes[0]
2593
2594     feedback_fn("* checking disk consistency between source and target")
2595     for dev in instance.disks:
2596       # for remote_raid1, these are md over drbd
2597       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2598         if not self.op.ignore_consistency:
2599           raise errors.OpExecError("Disk %s is degraded on target node,"
2600                                    " aborting failover." % dev.iv_name)
2601
2602     feedback_fn("* shutting down instance on source node")
2603     logger.Info("Shutting down instance %s on node %s" %
2604                 (instance.name, source_node))
2605
2606     if not rpc.call_instance_shutdown(source_node, instance):
2607       if self.op.ignore_consistency:
2608         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2609                      " anyway. Please make sure node %s is down"  %
2610                      (instance.name, source_node, source_node))
2611       else:
2612         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2613                                  (instance.name, source_node))
2614
2615     feedback_fn("* deactivating the instance's disks on source node")
2616     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2617       raise errors.OpExecError("Can't shut down the instance's disks.")
2618
2619     instance.primary_node = target_node
2620     # distribute new instance config to the other nodes
2621     self.cfg.AddInstance(instance)
2622
2623     feedback_fn("* activating the instance's disks on target node")
2624     logger.Info("Starting instance %s on node %s" %
2625                 (instance.name, target_node))
2626
2627     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2628                                              ignore_secondaries=True)
2629     if not disks_ok:
2630       _ShutdownInstanceDisks(instance, self.cfg)
2631       raise errors.OpExecError("Can't activate the instance's disks")
2632
2633     feedback_fn("* starting the instance on the target node")
2634     if not rpc.call_instance_start(target_node, instance, None):
2635       _ShutdownInstanceDisks(instance, self.cfg)
2636       raise errors.OpExecError("Could not start instance %s on node %s." %
2637                                (instance.name, target_node))
2638
2639
2640 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2641   """Create a tree of block devices on the primary node.
2642
2643   This always creates all devices.
2644
2645   """
2646   if device.children:
2647     for child in device.children:
2648       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2649         return False
2650
2651   cfg.SetDiskID(device, node)
2652   new_id = rpc.call_blockdev_create(node, device, device.size,
2653                                     instance.name, True, info)
2654   if not new_id:
2655     return False
2656   if device.physical_id is None:
2657     device.physical_id = new_id
2658   return True
2659
2660
2661 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2662   """Create a tree of block devices on a secondary node.
2663
2664   If this device type has to be created on secondaries, create it and
2665   all its children.
2666
2667   If not, just recurse to children keeping the same 'force' value.
2668
2669   """
2670   if device.CreateOnSecondary():
2671     force = True
2672   if device.children:
2673     for child in device.children:
2674       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2675                                         child, force, info):
2676         return False
2677
2678   if not force:
2679     return True
2680   cfg.SetDiskID(device, node)
2681   new_id = rpc.call_blockdev_create(node, device, device.size,
2682                                     instance.name, False, info)
2683   if not new_id:
2684     return False
2685   if device.physical_id is None:
2686     device.physical_id = new_id
2687   return True
2688
2689
2690 def _GenerateUniqueNames(cfg, exts):
2691   """Generate a suitable LV name.
2692
2693   This will generate a logical volume name for the given instance.
2694
2695   """
2696   results = []
2697   for val in exts:
2698     new_id = cfg.GenerateUniqueID()
2699     results.append("%s%s" % (new_id, val))
2700   return results
2701
2702
2703 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2704   """Generate a drbd device complete with its children.
2705
2706   """
2707   port = cfg.AllocatePort()
2708   vgname = cfg.GetVGName()
2709   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2710                           logical_id=(vgname, names[0]))
2711   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2712                           logical_id=(vgname, names[1]))
2713   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2714                           logical_id = (primary, secondary, port),
2715                           children = [dev_data, dev_meta])
2716   return drbd_dev
2717
2718
2719 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2720   """Generate a drbd8 device complete with its children.
2721
2722   """
2723   port = cfg.AllocatePort()
2724   vgname = cfg.GetVGName()
2725   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2726                           logical_id=(vgname, names[0]))
2727   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2728                           logical_id=(vgname, names[1]))
2729   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2730                           logical_id = (primary, secondary, port),
2731                           children = [dev_data, dev_meta],
2732                           iv_name=iv_name)
2733   return drbd_dev
2734
2735 def _GenerateDiskTemplate(cfg, template_name,
2736                           instance_name, primary_node,
2737                           secondary_nodes, disk_sz, swap_sz):
2738   """Generate the entire disk layout for a given template type.
2739
2740   """
2741   #TODO: compute space requirements
2742
2743   vgname = cfg.GetVGName()
2744   if template_name == "diskless":
2745     disks = []
2746   elif template_name == "plain":
2747     if len(secondary_nodes) != 0:
2748       raise errors.ProgrammerError("Wrong template configuration")
2749
2750     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2751     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2752                            logical_id=(vgname, names[0]),
2753                            iv_name = "sda")
2754     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2755                            logical_id=(vgname, names[1]),
2756                            iv_name = "sdb")
2757     disks = [sda_dev, sdb_dev]
2758   elif template_name == "local_raid1":
2759     if len(secondary_nodes) != 0:
2760       raise errors.ProgrammerError("Wrong template configuration")
2761
2762
2763     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2764                                        ".sdb_m1", ".sdb_m2"])
2765     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2766                               logical_id=(vgname, names[0]))
2767     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2768                               logical_id=(vgname, names[1]))
2769     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2770                               size=disk_sz,
2771                               children = [sda_dev_m1, sda_dev_m2])
2772     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2773                               logical_id=(vgname, names[2]))
2774     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2775                               logical_id=(vgname, names[3]))
2776     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2777                               size=swap_sz,
2778                               children = [sdb_dev_m1, sdb_dev_m2])
2779     disks = [md_sda_dev, md_sdb_dev]
2780   elif template_name == constants.DT_REMOTE_RAID1:
2781     if len(secondary_nodes) != 1:
2782       raise errors.ProgrammerError("Wrong template configuration")
2783     remote_node = secondary_nodes[0]
2784     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2785                                        ".sdb_data", ".sdb_meta"])
2786     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2787                                          disk_sz, names[0:2])
2788     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2789                               children = [drbd_sda_dev], size=disk_sz)
2790     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2791                                          swap_sz, names[2:4])
2792     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2793                               children = [drbd_sdb_dev], size=swap_sz)
2794     disks = [md_sda_dev, md_sdb_dev]
2795   elif template_name == constants.DT_DRBD8:
2796     if len(secondary_nodes) != 1:
2797       raise errors.ProgrammerError("Wrong template configuration")
2798     remote_node = secondary_nodes[0]
2799     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2800                                        ".sdb_data", ".sdb_meta"])
2801     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2802                                          disk_sz, names[0:2], "sda")
2803     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2804                                          swap_sz, names[2:4], "sdb")
2805     disks = [drbd_sda_dev, drbd_sdb_dev]
2806   else:
2807     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2808   return disks
2809
2810
2811 def _GetInstanceInfoText(instance):
2812   """Compute that text that should be added to the disk's metadata.
2813
2814   """
2815   return "originstname+%s" % instance.name
2816
2817
2818 def _CreateDisks(cfg, instance):
2819   """Create all disks for an instance.
2820
2821   This abstracts away some work from AddInstance.
2822
2823   Args:
2824     instance: the instance object
2825
2826   Returns:
2827     True or False showing the success of the creation process
2828
2829   """
2830   info = _GetInstanceInfoText(instance)
2831
2832   for device in instance.disks:
2833     logger.Info("creating volume %s for instance %s" %
2834               (device.iv_name, instance.name))
2835     #HARDCODE
2836     for secondary_node in instance.secondary_nodes:
2837       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2838                                         device, False, info):
2839         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2840                      (device.iv_name, device, secondary_node))
2841         return False
2842     #HARDCODE
2843     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2844                                     instance, device, info):
2845       logger.Error("failed to create volume %s on primary!" %
2846                    device.iv_name)
2847       return False
2848   return True
2849
2850
2851 def _RemoveDisks(instance, cfg):
2852   """Remove all disks for an instance.
2853
2854   This abstracts away some work from `AddInstance()` and
2855   `RemoveInstance()`. Note that in case some of the devices couldn't
2856   be removed, the removal will continue with the other ones (compare
2857   with `_CreateDisks()`).
2858
2859   Args:
2860     instance: the instance object
2861
2862   Returns:
2863     True or False showing the success of the removal proces
2864
2865   """
2866   logger.Info("removing block devices for instance %s" % instance.name)
2867
2868   result = True
2869   for device in instance.disks:
2870     for node, disk in device.ComputeNodeTree(instance.primary_node):
2871       cfg.SetDiskID(disk, node)
2872       if not rpc.call_blockdev_remove(node, disk):
2873         logger.Error("could not remove block device %s on node %s,"
2874                      " continuing anyway" %
2875                      (device.iv_name, node))
2876         result = False
2877   return result
2878
2879
2880 class LUCreateInstance(LogicalUnit):
2881   """Create an instance.
2882
2883   """
2884   HPATH = "instance-add"
2885   HTYPE = constants.HTYPE_INSTANCE
2886   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2887               "disk_template", "swap_size", "mode", "start", "vcpus",
2888               "wait_for_sync", "ip_check", "mac"]
2889
2890   def BuildHooksEnv(self):
2891     """Build hooks env.
2892
2893     This runs on master, primary and secondary nodes of the instance.
2894
2895     """
2896     env = {
2897       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2898       "INSTANCE_DISK_SIZE": self.op.disk_size,
2899       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2900       "INSTANCE_ADD_MODE": self.op.mode,
2901       }
2902     if self.op.mode == constants.INSTANCE_IMPORT:
2903       env["INSTANCE_SRC_NODE"] = self.op.src_node
2904       env["INSTANCE_SRC_PATH"] = self.op.src_path
2905       env["INSTANCE_SRC_IMAGE"] = self.src_image
2906
2907     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2908       primary_node=self.op.pnode,
2909       secondary_nodes=self.secondaries,
2910       status=self.instance_status,
2911       os_type=self.op.os_type,
2912       memory=self.op.mem_size,
2913       vcpus=self.op.vcpus,
2914       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2915     ))
2916
2917     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2918           self.secondaries)
2919     return env, nl, nl
2920
2921
2922   def CheckPrereq(self):
2923     """Check prerequisites.
2924
2925     """
2926     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2927       if not hasattr(self.op, attr):
2928         setattr(self.op, attr, None)
2929
2930     if self.op.mode not in (constants.INSTANCE_CREATE,
2931                             constants.INSTANCE_IMPORT):
2932       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2933                                  self.op.mode)
2934
2935     if self.op.mode == constants.INSTANCE_IMPORT:
2936       src_node = getattr(self.op, "src_node", None)
2937       src_path = getattr(self.op, "src_path", None)
2938       if src_node is None or src_path is None:
2939         raise errors.OpPrereqError("Importing an instance requires source"
2940                                    " node and path options")
2941       src_node_full = self.cfg.ExpandNodeName(src_node)
2942       if src_node_full is None:
2943         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2944       self.op.src_node = src_node = src_node_full
2945
2946       if not os.path.isabs(src_path):
2947         raise errors.OpPrereqError("The source path must be absolute")
2948
2949       export_info = rpc.call_export_info(src_node, src_path)
2950
2951       if not export_info:
2952         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2953
2954       if not export_info.has_section(constants.INISECT_EXP):
2955         raise errors.ProgrammerError("Corrupted export config")
2956
2957       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2958       if (int(ei_version) != constants.EXPORT_VERSION):
2959         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2960                                    (ei_version, constants.EXPORT_VERSION))
2961
2962       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2963         raise errors.OpPrereqError("Can't import instance with more than"
2964                                    " one data disk")
2965
2966       # FIXME: are the old os-es, disk sizes, etc. useful?
2967       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2968       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2969                                                          'disk0_dump'))
2970       self.src_image = diskimage
2971     else: # INSTANCE_CREATE
2972       if getattr(self.op, "os_type", None) is None:
2973         raise errors.OpPrereqError("No guest OS specified")
2974
2975     # check primary node
2976     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2977     if pnode is None:
2978       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2979                                  self.op.pnode)
2980     self.op.pnode = pnode.name
2981     self.pnode = pnode
2982     self.secondaries = []
2983     # disk template and mirror node verification
2984     if self.op.disk_template not in constants.DISK_TEMPLATES:
2985       raise errors.OpPrereqError("Invalid disk template name")
2986
2987     if self.op.disk_template in constants.DTS_NET_MIRROR:
2988       if getattr(self.op, "snode", None) is None:
2989         raise errors.OpPrereqError("The networked disk templates need"
2990                                    " a mirror node")
2991
2992       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2993       if snode_name is None:
2994         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2995                                    self.op.snode)
2996       elif snode_name == pnode.name:
2997         raise errors.OpPrereqError("The secondary node cannot be"
2998                                    " the primary node.")
2999       self.secondaries.append(snode_name)
3000
3001     # Required free disk space as a function of disk and swap space
3002     req_size_dict = {
3003       constants.DT_DISKLESS: None,
3004       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3005       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3006       # 256 MB are added for drbd metadata, 128MB for each drbd device
3007       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3008       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3009     }
3010
3011     if self.op.disk_template not in req_size_dict:
3012       raise errors.ProgrammerError("Disk template '%s' size requirement"
3013                                    " is unknown" %  self.op.disk_template)
3014
3015     req_size = req_size_dict[self.op.disk_template]
3016
3017     # Check lv size requirements
3018     if req_size is not None:
3019       nodenames = [pnode.name] + self.secondaries
3020       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3021       for node in nodenames:
3022         info = nodeinfo.get(node, None)
3023         if not info:
3024           raise errors.OpPrereqError("Cannot get current information"
3025                                      " from node '%s'" % nodeinfo)
3026         vg_free = info.get('vg_free', None)
3027         if not isinstance(vg_free, int):
3028           raise errors.OpPrereqError("Can't compute free disk space on"
3029                                      " node %s" % node)
3030         if req_size > info['vg_free']:
3031           raise errors.OpPrereqError("Not enough disk space on target node %s."
3032                                      " %d MB available, %d MB required" %
3033                                      (node, info['vg_free'], req_size))
3034
3035     # os verification
3036     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3037     if not os_obj:
3038       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3039                                  " primary node"  % self.op.os_type)
3040
3041     if self.op.kernel_path == constants.VALUE_NONE:
3042       raise errors.OpPrereqError("Can't set instance kernel to none")
3043
3044     # instance verification
3045     hostname1 = utils.HostInfo(self.op.instance_name)
3046
3047     self.op.instance_name = instance_name = hostname1.name
3048     instance_list = self.cfg.GetInstanceList()
3049     if instance_name in instance_list:
3050       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3051                                  instance_name)
3052
3053     ip = getattr(self.op, "ip", None)
3054     if ip is None or ip.lower() == "none":
3055       inst_ip = None
3056     elif ip.lower() == "auto":
3057       inst_ip = hostname1.ip
3058     else:
3059       if not utils.IsValidIP(ip):
3060         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3061                                    " like a valid IP" % ip)
3062       inst_ip = ip
3063     self.inst_ip = inst_ip
3064
3065     if self.op.start and not self.op.ip_check:
3066       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3067                                  " adding an instance in start mode")
3068
3069     if self.op.ip_check:
3070       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3071                        constants.DEFAULT_NODED_PORT):
3072         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3073                                    (hostname1.ip, instance_name))
3074
3075     # MAC address verification
3076     if self.op.mac != "auto":
3077       if not utils.IsValidMac(self.op.mac.lower()):
3078         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3079                                    self.op.mac)
3080
3081     # bridge verification
3082     bridge = getattr(self.op, "bridge", None)
3083     if bridge is None:
3084       self.op.bridge = self.cfg.GetDefBridge()
3085     else:
3086       self.op.bridge = bridge
3087
3088     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3089       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3090                                  " destination node '%s'" %
3091                                  (self.op.bridge, pnode.name))
3092
3093     # boot order verification
3094     if self.op.hvm_boot_order is not None:
3095       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3096         raise errors.OpPrereqError("invalid boot order specified,"
3097                                    " must be one or more of [acdn]")
3098
3099     if self.op.start:
3100       self.instance_status = 'up'
3101     else:
3102       self.instance_status = 'down'
3103
3104   def Exec(self, feedback_fn):
3105     """Create and add the instance to the cluster.
3106
3107     """
3108     instance = self.op.instance_name
3109     pnode_name = self.pnode.name
3110
3111     if self.op.mac == "auto":
3112       mac_address = self.cfg.GenerateMAC()
3113     else:
3114       mac_address = self.op.mac
3115
3116     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3117     if self.inst_ip is not None:
3118       nic.ip = self.inst_ip
3119
3120     ht_kind = self.sstore.GetHypervisorType()
3121     if ht_kind in constants.HTS_REQ_PORT:
3122       network_port = self.cfg.AllocatePort()
3123     else:
3124       network_port = None
3125
3126     disks = _GenerateDiskTemplate(self.cfg,
3127                                   self.op.disk_template,
3128                                   instance, pnode_name,
3129                                   self.secondaries, self.op.disk_size,
3130                                   self.op.swap_size)
3131
3132     iobj = objects.Instance(name=instance, os=self.op.os_type,
3133                             primary_node=pnode_name,
3134                             memory=self.op.mem_size,
3135                             vcpus=self.op.vcpus,
3136                             nics=[nic], disks=disks,
3137                             disk_template=self.op.disk_template,
3138                             status=self.instance_status,
3139                             network_port=network_port,
3140                             kernel_path=self.op.kernel_path,
3141                             initrd_path=self.op.initrd_path,
3142                             hvm_boot_order=self.op.hvm_boot_order,
3143                             )
3144
3145     feedback_fn("* creating instance disks...")
3146     if not _CreateDisks(self.cfg, iobj):
3147       _RemoveDisks(iobj, self.cfg)
3148       raise errors.OpExecError("Device creation failed, reverting...")
3149
3150     feedback_fn("adding instance %s to cluster config" % instance)
3151
3152     self.cfg.AddInstance(iobj)
3153
3154     if self.op.wait_for_sync:
3155       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3156     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3157       # make sure the disks are not degraded (still sync-ing is ok)
3158       time.sleep(15)
3159       feedback_fn("* checking mirrors status")
3160       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3161     else:
3162       disk_abort = False
3163
3164     if disk_abort:
3165       _RemoveDisks(iobj, self.cfg)
3166       self.cfg.RemoveInstance(iobj.name)
3167       raise errors.OpExecError("There are some degraded disks for"
3168                                " this instance")
3169
3170     feedback_fn("creating os for instance %s on node %s" %
3171                 (instance, pnode_name))
3172
3173     if iobj.disk_template != constants.DT_DISKLESS:
3174       if self.op.mode == constants.INSTANCE_CREATE:
3175         feedback_fn("* running the instance OS create scripts...")
3176         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3177           raise errors.OpExecError("could not add os for instance %s"
3178                                    " on node %s" %
3179                                    (instance, pnode_name))
3180
3181       elif self.op.mode == constants.INSTANCE_IMPORT:
3182         feedback_fn("* running the instance OS import scripts...")
3183         src_node = self.op.src_node
3184         src_image = self.src_image
3185         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3186                                                 src_node, src_image):
3187           raise errors.OpExecError("Could not import os for instance"
3188                                    " %s on node %s" %
3189                                    (instance, pnode_name))
3190       else:
3191         # also checked in the prereq part
3192         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3193                                      % self.op.mode)
3194
3195     if self.op.start:
3196       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3197       feedback_fn("* starting instance...")
3198       if not rpc.call_instance_start(pnode_name, iobj, None):
3199         raise errors.OpExecError("Could not start instance")
3200
3201
3202 class LUConnectConsole(NoHooksLU):
3203   """Connect to an instance's console.
3204
3205   This is somewhat special in that it returns the command line that
3206   you need to run on the master node in order to connect to the
3207   console.
3208
3209   """
3210   _OP_REQP = ["instance_name"]
3211
3212   def CheckPrereq(self):
3213     """Check prerequisites.
3214
3215     This checks that the instance is in the cluster.
3216
3217     """
3218     instance = self.cfg.GetInstanceInfo(
3219       self.cfg.ExpandInstanceName(self.op.instance_name))
3220     if instance is None:
3221       raise errors.OpPrereqError("Instance '%s' not known" %
3222                                  self.op.instance_name)
3223     self.instance = instance
3224
3225   def Exec(self, feedback_fn):
3226     """Connect to the console of an instance
3227
3228     """
3229     instance = self.instance
3230     node = instance.primary_node
3231
3232     node_insts = rpc.call_instance_list([node])[node]
3233     if node_insts is False:
3234       raise errors.OpExecError("Can't connect to node %s." % node)
3235
3236     if instance.name not in node_insts:
3237       raise errors.OpExecError("Instance %s is not running." % instance.name)
3238
3239     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3240
3241     hyper = hypervisor.GetHypervisor()
3242     console_cmd = hyper.GetShellCommandForConsole(instance)
3243     # build ssh cmdline
3244     argv = ["ssh", "-q", "-t"]
3245     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3246     argv.extend(ssh.BATCH_MODE_OPTS)
3247     argv.append(node)
3248     argv.append(console_cmd)
3249     return "ssh", argv
3250
3251
3252 class LUAddMDDRBDComponent(LogicalUnit):
3253   """Adda new mirror member to an instance's disk.
3254
3255   """
3256   HPATH = "mirror-add"
3257   HTYPE = constants.HTYPE_INSTANCE
3258   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3259
3260   def BuildHooksEnv(self):
3261     """Build hooks env.
3262
3263     This runs on the master, the primary and all the secondaries.
3264
3265     """
3266     env = {
3267       "NEW_SECONDARY": self.op.remote_node,
3268       "DISK_NAME": self.op.disk_name,
3269       }
3270     env.update(_BuildInstanceHookEnvByObject(self.instance))
3271     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3272           self.op.remote_node,] + list(self.instance.secondary_nodes)
3273     return env, nl, nl
3274
3275   def CheckPrereq(self):
3276     """Check prerequisites.
3277
3278     This checks that the instance is in the cluster.
3279
3280     """
3281     instance = self.cfg.GetInstanceInfo(
3282       self.cfg.ExpandInstanceName(self.op.instance_name))
3283     if instance is None:
3284       raise errors.OpPrereqError("Instance '%s' not known" %
3285                                  self.op.instance_name)
3286     self.instance = instance
3287
3288     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3289     if remote_node is None:
3290       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3291     self.remote_node = remote_node
3292
3293     if remote_node == instance.primary_node:
3294       raise errors.OpPrereqError("The specified node is the primary node of"
3295                                  " the instance.")
3296
3297     if instance.disk_template != constants.DT_REMOTE_RAID1:
3298       raise errors.OpPrereqError("Instance's disk layout is not"
3299                                  " remote_raid1.")
3300     for disk in instance.disks:
3301       if disk.iv_name == self.op.disk_name:
3302         break
3303     else:
3304       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3305                                  " instance." % self.op.disk_name)
3306     if len(disk.children) > 1:
3307       raise errors.OpPrereqError("The device already has two slave devices."
3308                                  " This would create a 3-disk raid1 which we"
3309                                  " don't allow.")
3310     self.disk = disk
3311
3312   def Exec(self, feedback_fn):
3313     """Add the mirror component
3314
3315     """
3316     disk = self.disk
3317     instance = self.instance
3318
3319     remote_node = self.remote_node
3320     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3321     names = _GenerateUniqueNames(self.cfg, lv_names)
3322     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3323                                      remote_node, disk.size, names)
3324
3325     logger.Info("adding new mirror component on secondary")
3326     #HARDCODE
3327     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3328                                       new_drbd, False,
3329                                       _GetInstanceInfoText(instance)):
3330       raise errors.OpExecError("Failed to create new component on secondary"
3331                                " node %s" % remote_node)
3332
3333     logger.Info("adding new mirror component on primary")
3334     #HARDCODE
3335     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3336                                     instance, new_drbd,
3337                                     _GetInstanceInfoText(instance)):
3338       # remove secondary dev
3339       self.cfg.SetDiskID(new_drbd, remote_node)
3340       rpc.call_blockdev_remove(remote_node, new_drbd)
3341       raise errors.OpExecError("Failed to create volume on primary")
3342
3343     # the device exists now
3344     # call the primary node to add the mirror to md
3345     logger.Info("adding new mirror component to md")
3346     if not rpc.call_blockdev_addchildren(instance.primary_node,
3347                                          disk, [new_drbd]):
3348       logger.Error("Can't add mirror compoment to md!")
3349       self.cfg.SetDiskID(new_drbd, remote_node)
3350       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3351         logger.Error("Can't rollback on secondary")
3352       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3353       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3354         logger.Error("Can't rollback on primary")
3355       raise errors.OpExecError("Can't add mirror component to md array")
3356
3357     disk.children.append(new_drbd)
3358
3359     self.cfg.AddInstance(instance)
3360
3361     _WaitForSync(self.cfg, instance, self.proc)
3362
3363     return 0
3364
3365
3366 class LURemoveMDDRBDComponent(LogicalUnit):
3367   """Remove a component from a remote_raid1 disk.
3368
3369   """
3370   HPATH = "mirror-remove"
3371   HTYPE = constants.HTYPE_INSTANCE
3372   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3373
3374   def BuildHooksEnv(self):
3375     """Build hooks env.
3376
3377     This runs on the master, the primary and all the secondaries.
3378
3379     """
3380     env = {
3381       "DISK_NAME": self.op.disk_name,
3382       "DISK_ID": self.op.disk_id,
3383       "OLD_SECONDARY": self.old_secondary,
3384       }
3385     env.update(_BuildInstanceHookEnvByObject(self.instance))
3386     nl = [self.sstore.GetMasterNode(),
3387           self.instance.primary_node] + list(self.instance.secondary_nodes)
3388     return env, nl, nl
3389
3390   def CheckPrereq(self):
3391     """Check prerequisites.
3392
3393     This checks that the instance is in the cluster.
3394
3395     """
3396     instance = self.cfg.GetInstanceInfo(
3397       self.cfg.ExpandInstanceName(self.op.instance_name))
3398     if instance is None:
3399       raise errors.OpPrereqError("Instance '%s' not known" %
3400                                  self.op.instance_name)
3401     self.instance = instance
3402
3403     if instance.disk_template != constants.DT_REMOTE_RAID1:
3404       raise errors.OpPrereqError("Instance's disk layout is not"
3405                                  " remote_raid1.")
3406     for disk in instance.disks:
3407       if disk.iv_name == self.op.disk_name:
3408         break
3409     else:
3410       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3411                                  " instance." % self.op.disk_name)
3412     for child in disk.children:
3413       if (child.dev_type == constants.LD_DRBD7 and
3414           child.logical_id[2] == self.op.disk_id):
3415         break
3416     else:
3417       raise errors.OpPrereqError("Can't find the device with this port.")
3418
3419     if len(disk.children) < 2:
3420       raise errors.OpPrereqError("Cannot remove the last component from"
3421                                  " a mirror.")
3422     self.disk = disk
3423     self.child = child
3424     if self.child.logical_id[0] == instance.primary_node:
3425       oid = 1
3426     else:
3427       oid = 0
3428     self.old_secondary = self.child.logical_id[oid]
3429
3430   def Exec(self, feedback_fn):
3431     """Remove the mirror component
3432
3433     """
3434     instance = self.instance
3435     disk = self.disk
3436     child = self.child
3437     logger.Info("remove mirror component")
3438     self.cfg.SetDiskID(disk, instance.primary_node)
3439     if not rpc.call_blockdev_removechildren(instance.primary_node,
3440                                             disk, [child]):
3441       raise errors.OpExecError("Can't remove child from mirror.")
3442
3443     for node in child.logical_id[:2]:
3444       self.cfg.SetDiskID(child, node)
3445       if not rpc.call_blockdev_remove(node, child):
3446         logger.Error("Warning: failed to remove device from node %s,"
3447                      " continuing operation." % node)
3448
3449     disk.children.remove(child)
3450     self.cfg.AddInstance(instance)
3451
3452
3453 class LUReplaceDisks(LogicalUnit):
3454   """Replace the disks of an instance.
3455
3456   """
3457   HPATH = "mirrors-replace"
3458   HTYPE = constants.HTYPE_INSTANCE
3459   _OP_REQP = ["instance_name", "mode", "disks"]
3460
3461   def BuildHooksEnv(self):
3462     """Build hooks env.
3463
3464     This runs on the master, the primary and all the secondaries.
3465
3466     """
3467     env = {
3468       "MODE": self.op.mode,
3469       "NEW_SECONDARY": self.op.remote_node,
3470       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3471       }
3472     env.update(_BuildInstanceHookEnvByObject(self.instance))
3473     nl = [
3474       self.sstore.GetMasterNode(),
3475       self.instance.primary_node,
3476       ]
3477     if self.op.remote_node is not None:
3478       nl.append(self.op.remote_node)
3479     return env, nl, nl
3480
3481   def CheckPrereq(self):
3482     """Check prerequisites.
3483
3484     This checks that the instance is in the cluster.
3485
3486     """
3487     instance = self.cfg.GetInstanceInfo(
3488       self.cfg.ExpandInstanceName(self.op.instance_name))
3489     if instance is None:
3490       raise errors.OpPrereqError("Instance '%s' not known" %
3491                                  self.op.instance_name)
3492     self.instance = instance
3493     self.op.instance_name = instance.name
3494
3495     if instance.disk_template not in constants.DTS_NET_MIRROR:
3496       raise errors.OpPrereqError("Instance's disk layout is not"
3497                                  " network mirrored.")
3498
3499     if len(instance.secondary_nodes) != 1:
3500       raise errors.OpPrereqError("The instance has a strange layout,"
3501                                  " expected one secondary but found %d" %
3502                                  len(instance.secondary_nodes))
3503
3504     self.sec_node = instance.secondary_nodes[0]
3505
3506     remote_node = getattr(self.op, "remote_node", None)
3507     if remote_node is not None:
3508       remote_node = self.cfg.ExpandNodeName(remote_node)
3509       if remote_node is None:
3510         raise errors.OpPrereqError("Node '%s' not known" %
3511                                    self.op.remote_node)
3512       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3513     else:
3514       self.remote_node_info = None
3515     if remote_node == instance.primary_node:
3516       raise errors.OpPrereqError("The specified node is the primary node of"
3517                                  " the instance.")
3518     elif remote_node == self.sec_node:
3519       if self.op.mode == constants.REPLACE_DISK_SEC:
3520         # this is for DRBD8, where we can't execute the same mode of
3521         # replacement as for drbd7 (no different port allocated)
3522         raise errors.OpPrereqError("Same secondary given, cannot execute"
3523                                    " replacement")
3524       # the user gave the current secondary, switch to
3525       # 'no-replace-secondary' mode for drbd7
3526       remote_node = None
3527     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3528         self.op.mode != constants.REPLACE_DISK_ALL):
3529       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3530                                  " disks replacement, not individual ones")
3531     if instance.disk_template == constants.DT_DRBD8:
3532       if (self.op.mode == constants.REPLACE_DISK_ALL and
3533           remote_node is not None):
3534         # switch to replace secondary mode
3535         self.op.mode = constants.REPLACE_DISK_SEC
3536
3537       if self.op.mode == constants.REPLACE_DISK_ALL:
3538         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3539                                    " secondary disk replacement, not"
3540                                    " both at once")
3541       elif self.op.mode == constants.REPLACE_DISK_PRI:
3542         if remote_node is not None:
3543           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3544                                      " the secondary while doing a primary"
3545                                      " node disk replacement")
3546         self.tgt_node = instance.primary_node
3547         self.oth_node = instance.secondary_nodes[0]
3548       elif self.op.mode == constants.REPLACE_DISK_SEC:
3549         self.new_node = remote_node # this can be None, in which case
3550                                     # we don't change the secondary
3551         self.tgt_node = instance.secondary_nodes[0]
3552         self.oth_node = instance.primary_node
3553       else:
3554         raise errors.ProgrammerError("Unhandled disk replace mode")
3555
3556     for name in self.op.disks:
3557       if instance.FindDisk(name) is None:
3558         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3559                                    (name, instance.name))
3560     self.op.remote_node = remote_node
3561
3562   def _ExecRR1(self, feedback_fn):
3563     """Replace the disks of an instance.
3564
3565     """
3566     instance = self.instance
3567     iv_names = {}
3568     # start of work
3569     if self.op.remote_node is None:
3570       remote_node = self.sec_node
3571     else:
3572       remote_node = self.op.remote_node
3573     cfg = self.cfg
3574     for dev in instance.disks:
3575       size = dev.size
3576       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3577       names = _GenerateUniqueNames(cfg, lv_names)
3578       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3579                                        remote_node, size, names)
3580       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3581       logger.Info("adding new mirror component on secondary for %s" %
3582                   dev.iv_name)
3583       #HARDCODE
3584       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3585                                         new_drbd, False,
3586                                         _GetInstanceInfoText(instance)):
3587         raise errors.OpExecError("Failed to create new component on secondary"
3588                                  " node %s. Full abort, cleanup manually!" %
3589                                  remote_node)
3590
3591       logger.Info("adding new mirror component on primary")
3592       #HARDCODE
3593       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3594                                       instance, new_drbd,
3595                                       _GetInstanceInfoText(instance)):
3596         # remove secondary dev
3597         cfg.SetDiskID(new_drbd, remote_node)
3598         rpc.call_blockdev_remove(remote_node, new_drbd)
3599         raise errors.OpExecError("Failed to create volume on primary!"
3600                                  " Full abort, cleanup manually!!")
3601
3602       # the device exists now
3603       # call the primary node to add the mirror to md
3604       logger.Info("adding new mirror component to md")
3605       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3606                                            [new_drbd]):
3607         logger.Error("Can't add mirror compoment to md!")
3608         cfg.SetDiskID(new_drbd, remote_node)
3609         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3610           logger.Error("Can't rollback on secondary")
3611         cfg.SetDiskID(new_drbd, instance.primary_node)
3612         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3613           logger.Error("Can't rollback on primary")
3614         raise errors.OpExecError("Full abort, cleanup manually!!")
3615
3616       dev.children.append(new_drbd)
3617       cfg.AddInstance(instance)
3618
3619     # this can fail as the old devices are degraded and _WaitForSync
3620     # does a combined result over all disks, so we don't check its
3621     # return value
3622     _WaitForSync(cfg, instance, self.proc, unlock=True)
3623
3624     # so check manually all the devices
3625     for name in iv_names:
3626       dev, child, new_drbd = iv_names[name]
3627       cfg.SetDiskID(dev, instance.primary_node)
3628       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3629       if is_degr:
3630         raise errors.OpExecError("MD device %s is degraded!" % name)
3631       cfg.SetDiskID(new_drbd, instance.primary_node)
3632       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3633       if is_degr:
3634         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3635
3636     for name in iv_names:
3637       dev, child, new_drbd = iv_names[name]
3638       logger.Info("remove mirror %s component" % name)
3639       cfg.SetDiskID(dev, instance.primary_node)
3640       if not rpc.call_blockdev_removechildren(instance.primary_node,
3641                                               dev, [child]):
3642         logger.Error("Can't remove child from mirror, aborting"
3643                      " *this device cleanup*.\nYou need to cleanup manually!!")
3644         continue
3645
3646       for node in child.logical_id[:2]:
3647         logger.Info("remove child device on %s" % node)
3648         cfg.SetDiskID(child, node)
3649         if not rpc.call_blockdev_remove(node, child):
3650           logger.Error("Warning: failed to remove device from node %s,"
3651                        " continuing operation." % node)
3652
3653       dev.children.remove(child)
3654
3655       cfg.AddInstance(instance)
3656
3657   def _ExecD8DiskOnly(self, feedback_fn):
3658     """Replace a disk on the primary or secondary for dbrd8.
3659
3660     The algorithm for replace is quite complicated:
3661       - for each disk to be replaced:
3662         - create new LVs on the target node with unique names
3663         - detach old LVs from the drbd device
3664         - rename old LVs to name_replaced.<time_t>
3665         - rename new LVs to old LVs
3666         - attach the new LVs (with the old names now) to the drbd device
3667       - wait for sync across all devices
3668       - for each modified disk:
3669         - remove old LVs (which have the name name_replaces.<time_t>)
3670
3671     Failures are not very well handled.
3672
3673     """
3674     steps_total = 6
3675     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3676     instance = self.instance
3677     iv_names = {}
3678     vgname = self.cfg.GetVGName()
3679     # start of work
3680     cfg = self.cfg
3681     tgt_node = self.tgt_node
3682     oth_node = self.oth_node
3683
3684     # Step: check device activation
3685     self.proc.LogStep(1, steps_total, "check device existence")
3686     info("checking volume groups")
3687     my_vg = cfg.GetVGName()
3688     results = rpc.call_vg_list([oth_node, tgt_node])
3689     if not results:
3690       raise errors.OpExecError("Can't list volume groups on the nodes")
3691     for node in oth_node, tgt_node:
3692       res = results.get(node, False)
3693       if not res or my_vg not in res:
3694         raise errors.OpExecError("Volume group '%s' not found on %s" %
3695                                  (my_vg, node))
3696     for dev in instance.disks:
3697       if not dev.iv_name in self.op.disks:
3698         continue
3699       for node in tgt_node, oth_node:
3700         info("checking %s on %s" % (dev.iv_name, node))
3701         cfg.SetDiskID(dev, node)
3702         if not rpc.call_blockdev_find(node, dev):
3703           raise errors.OpExecError("Can't find device %s on node %s" %
3704                                    (dev.iv_name, node))
3705
3706     # Step: check other node consistency
3707     self.proc.LogStep(2, steps_total, "check peer consistency")
3708     for dev in instance.disks:
3709       if not dev.iv_name in self.op.disks:
3710         continue
3711       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3712       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3713                                    oth_node==instance.primary_node):
3714         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3715                                  " to replace disks on this node (%s)" %
3716                                  (oth_node, tgt_node))
3717
3718     # Step: create new storage
3719     self.proc.LogStep(3, steps_total, "allocate new storage")
3720     for dev in instance.disks:
3721       if not dev.iv_name in self.op.disks:
3722         continue
3723       size = dev.size
3724       cfg.SetDiskID(dev, tgt_node)
3725       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3726       names = _GenerateUniqueNames(cfg, lv_names)
3727       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3728                              logical_id=(vgname, names[0]))
3729       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3730                              logical_id=(vgname, names[1]))
3731       new_lvs = [lv_data, lv_meta]
3732       old_lvs = dev.children
3733       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3734       info("creating new local storage on %s for %s" %
3735            (tgt_node, dev.iv_name))
3736       # since we *always* want to create this LV, we use the
3737       # _Create...OnPrimary (which forces the creation), even if we
3738       # are talking about the secondary node
3739       for new_lv in new_lvs:
3740         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3741                                         _GetInstanceInfoText(instance)):
3742           raise errors.OpExecError("Failed to create new LV named '%s' on"
3743                                    " node '%s'" %
3744                                    (new_lv.logical_id[1], tgt_node))
3745
3746     # Step: for each lv, detach+rename*2+attach
3747     self.proc.LogStep(4, steps_total, "change drbd configuration")
3748     for dev, old_lvs, new_lvs in iv_names.itervalues():
3749       info("detaching %s drbd from local storage" % dev.iv_name)
3750       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3751         raise errors.OpExecError("Can't detach drbd from local storage on node"
3752                                  " %s for device %s" % (tgt_node, dev.iv_name))
3753       #dev.children = []
3754       #cfg.Update(instance)
3755
3756       # ok, we created the new LVs, so now we know we have the needed
3757       # storage; as such, we proceed on the target node to rename
3758       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3759       # using the assumption than logical_id == physical_id (which in
3760       # turn is the unique_id on that node)
3761
3762       # FIXME(iustin): use a better name for the replaced LVs
3763       temp_suffix = int(time.time())
3764       ren_fn = lambda d, suff: (d.physical_id[0],
3765                                 d.physical_id[1] + "_replaced-%s" % suff)
3766       # build the rename list based on what LVs exist on the node
3767       rlist = []
3768       for to_ren in old_lvs:
3769         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3770         if find_res is not None: # device exists
3771           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3772
3773       info("renaming the old LVs on the target node")
3774       if not rpc.call_blockdev_rename(tgt_node, rlist):
3775         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3776       # now we rename the new LVs to the old LVs
3777       info("renaming the new LVs on the target node")
3778       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3779       if not rpc.call_blockdev_rename(tgt_node, rlist):
3780         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3781
3782       for old, new in zip(old_lvs, new_lvs):
3783         new.logical_id = old.logical_id
3784         cfg.SetDiskID(new, tgt_node)
3785
3786       for disk in old_lvs:
3787         disk.logical_id = ren_fn(disk, temp_suffix)
3788         cfg.SetDiskID(disk, tgt_node)
3789
3790       # now that the new lvs have the old name, we can add them to the device
3791       info("adding new mirror component on %s" % tgt_node)
3792       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3793         for new_lv in new_lvs:
3794           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3795             warning("Can't rollback device %s", hint="manually cleanup unused"
3796                     " logical volumes")
3797         raise errors.OpExecError("Can't add local storage to drbd")
3798
3799       dev.children = new_lvs
3800       cfg.Update(instance)
3801
3802     # Step: wait for sync
3803
3804     # this can fail as the old devices are degraded and _WaitForSync
3805     # does a combined result over all disks, so we don't check its
3806     # return value
3807     self.proc.LogStep(5, steps_total, "sync devices")
3808     _WaitForSync(cfg, instance, self.proc, unlock=True)
3809
3810     # so check manually all the devices
3811     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3812       cfg.SetDiskID(dev, instance.primary_node)
3813       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3814       if is_degr:
3815         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3816
3817     # Step: remove old storage
3818     self.proc.LogStep(6, steps_total, "removing old storage")
3819     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3820       info("remove logical volumes for %s" % name)
3821       for lv in old_lvs:
3822         cfg.SetDiskID(lv, tgt_node)
3823         if not rpc.call_blockdev_remove(tgt_node, lv):
3824           warning("Can't remove old LV", hint="manually remove unused LVs")
3825           continue
3826
3827   def _ExecD8Secondary(self, feedback_fn):
3828     """Replace the secondary node for drbd8.
3829
3830     The algorithm for replace is quite complicated:
3831       - for all disks of the instance:
3832         - create new LVs on the new node with same names
3833         - shutdown the drbd device on the old secondary
3834         - disconnect the drbd network on the primary
3835         - create the drbd device on the new secondary
3836         - network attach the drbd on the primary, using an artifice:
3837           the drbd code for Attach() will connect to the network if it
3838           finds a device which is connected to the good local disks but
3839           not network enabled
3840       - wait for sync across all devices
3841       - remove all disks from the old secondary
3842
3843     Failures are not very well handled.
3844
3845     """
3846     steps_total = 6
3847     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3848     instance = self.instance
3849     iv_names = {}
3850     vgname = self.cfg.GetVGName()
3851     # start of work
3852     cfg = self.cfg
3853     old_node = self.tgt_node
3854     new_node = self.new_node
3855     pri_node = instance.primary_node
3856
3857     # Step: check device activation
3858     self.proc.LogStep(1, steps_total, "check device existence")
3859     info("checking volume groups")
3860     my_vg = cfg.GetVGName()
3861     results = rpc.call_vg_list([pri_node, new_node])
3862     if not results:
3863       raise errors.OpExecError("Can't list volume groups on the nodes")
3864     for node in pri_node, new_node:
3865       res = results.get(node, False)
3866       if not res or my_vg not in res:
3867         raise errors.OpExecError("Volume group '%s' not found on %s" %
3868                                  (my_vg, node))
3869     for dev in instance.disks:
3870       if not dev.iv_name in self.op.disks:
3871         continue
3872       info("checking %s on %s" % (dev.iv_name, pri_node))
3873       cfg.SetDiskID(dev, pri_node)
3874       if not rpc.call_blockdev_find(pri_node, dev):
3875         raise errors.OpExecError("Can't find device %s on node %s" %
3876                                  (dev.iv_name, pri_node))
3877
3878     # Step: check other node consistency
3879     self.proc.LogStep(2, steps_total, "check peer consistency")
3880     for dev in instance.disks:
3881       if not dev.iv_name in self.op.disks:
3882         continue
3883       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3884       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3885         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3886                                  " unsafe to replace the secondary" %
3887                                  pri_node)
3888
3889     # Step: create new storage
3890     self.proc.LogStep(3, steps_total, "allocate new storage")
3891     for dev in instance.disks:
3892       size = dev.size
3893       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3894       # since we *always* want to create this LV, we use the
3895       # _Create...OnPrimary (which forces the creation), even if we
3896       # are talking about the secondary node
3897       for new_lv in dev.children:
3898         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3899                                         _GetInstanceInfoText(instance)):
3900           raise errors.OpExecError("Failed to create new LV named '%s' on"
3901                                    " node '%s'" %
3902                                    (new_lv.logical_id[1], new_node))
3903
3904       iv_names[dev.iv_name] = (dev, dev.children)
3905
3906     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3907     for dev in instance.disks:
3908       size = dev.size
3909       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3910       # create new devices on new_node
3911       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3912                               logical_id=(pri_node, new_node,
3913                                           dev.logical_id[2]),
3914                               children=dev.children)
3915       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3916                                         new_drbd, False,
3917                                       _GetInstanceInfoText(instance)):
3918         raise errors.OpExecError("Failed to create new DRBD on"
3919                                  " node '%s'" % new_node)
3920
3921     for dev in instance.disks:
3922       # we have new devices, shutdown the drbd on the old secondary
3923       info("shutting down drbd for %s on old node" % dev.iv_name)
3924       cfg.SetDiskID(dev, old_node)
3925       if not rpc.call_blockdev_shutdown(old_node, dev):
3926         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3927                 hint="Please cleanup this device manually as soon as possible")
3928
3929     info("detaching primary drbds from the network (=> standalone)")
3930     done = 0
3931     for dev in instance.disks:
3932       cfg.SetDiskID(dev, pri_node)
3933       # set the physical (unique in bdev terms) id to None, meaning
3934       # detach from network
3935       dev.physical_id = (None,) * len(dev.physical_id)
3936       # and 'find' the device, which will 'fix' it to match the
3937       # standalone state
3938       if rpc.call_blockdev_find(pri_node, dev):
3939         done += 1
3940       else:
3941         warning("Failed to detach drbd %s from network, unusual case" %
3942                 dev.iv_name)
3943
3944     if not done:
3945       # no detaches succeeded (very unlikely)
3946       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3947
3948     # if we managed to detach at least one, we update all the disks of
3949     # the instance to point to the new secondary
3950     info("updating instance configuration")
3951     for dev in instance.disks:
3952       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3953       cfg.SetDiskID(dev, pri_node)
3954     cfg.Update(instance)
3955
3956     # and now perform the drbd attach
3957     info("attaching primary drbds to new secondary (standalone => connected)")
3958     failures = []
3959     for dev in instance.disks:
3960       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3961       # since the attach is smart, it's enough to 'find' the device,
3962       # it will automatically activate the network, if the physical_id
3963       # is correct
3964       cfg.SetDiskID(dev, pri_node)
3965       if not rpc.call_blockdev_find(pri_node, dev):
3966         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3967                 "please do a gnt-instance info to see the status of disks")
3968
3969     # this can fail as the old devices are degraded and _WaitForSync
3970     # does a combined result over all disks, so we don't check its
3971     # return value
3972     self.proc.LogStep(5, steps_total, "sync devices")
3973     _WaitForSync(cfg, instance, self.proc, unlock=True)
3974
3975     # so check manually all the devices
3976     for name, (dev, old_lvs) in iv_names.iteritems():
3977       cfg.SetDiskID(dev, pri_node)
3978       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3979       if is_degr:
3980         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3981
3982     self.proc.LogStep(6, steps_total, "removing old storage")
3983     for name, (dev, old_lvs) in iv_names.iteritems():
3984       info("remove logical volumes for %s" % name)
3985       for lv in old_lvs:
3986         cfg.SetDiskID(lv, old_node)
3987         if not rpc.call_blockdev_remove(old_node, lv):
3988           warning("Can't remove LV on old secondary",
3989                   hint="Cleanup stale volumes by hand")
3990
3991   def Exec(self, feedback_fn):
3992     """Execute disk replacement.
3993
3994     This dispatches the disk replacement to the appropriate handler.
3995
3996     """
3997     instance = self.instance
3998     if instance.disk_template == constants.DT_REMOTE_RAID1:
3999       fn = self._ExecRR1
4000     elif instance.disk_template == constants.DT_DRBD8:
4001       if self.op.remote_node is None:
4002         fn = self._ExecD8DiskOnly
4003       else:
4004         fn = self._ExecD8Secondary
4005     else:
4006       raise errors.ProgrammerError("Unhandled disk replacement case")
4007     return fn(feedback_fn)
4008
4009
4010 class LUQueryInstanceData(NoHooksLU):
4011   """Query runtime instance data.
4012
4013   """
4014   _OP_REQP = ["instances"]
4015
4016   def CheckPrereq(self):
4017     """Check prerequisites.
4018
4019     This only checks the optional instance list against the existing names.
4020
4021     """
4022     if not isinstance(self.op.instances, list):
4023       raise errors.OpPrereqError("Invalid argument type 'instances'")
4024     if self.op.instances:
4025       self.wanted_instances = []
4026       names = self.op.instances
4027       for name in names:
4028         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4029         if instance is None:
4030           raise errors.OpPrereqError("No such instance name '%s'" % name)
4031         self.wanted_instances.append(instance)
4032     else:
4033       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4034                                in self.cfg.GetInstanceList()]
4035     return
4036
4037
4038   def _ComputeDiskStatus(self, instance, snode, dev):
4039     """Compute block device status.
4040
4041     """
4042     self.cfg.SetDiskID(dev, instance.primary_node)
4043     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4044     if dev.dev_type in constants.LDS_DRBD:
4045       # we change the snode then (otherwise we use the one passed in)
4046       if dev.logical_id[0] == instance.primary_node:
4047         snode = dev.logical_id[1]
4048       else:
4049         snode = dev.logical_id[0]
4050
4051     if snode:
4052       self.cfg.SetDiskID(dev, snode)
4053       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4054     else:
4055       dev_sstatus = None
4056
4057     if dev.children:
4058       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4059                       for child in dev.children]
4060     else:
4061       dev_children = []
4062
4063     data = {
4064       "iv_name": dev.iv_name,
4065       "dev_type": dev.dev_type,
4066       "logical_id": dev.logical_id,
4067       "physical_id": dev.physical_id,
4068       "pstatus": dev_pstatus,
4069       "sstatus": dev_sstatus,
4070       "children": dev_children,
4071       }
4072
4073     return data
4074
4075   def Exec(self, feedback_fn):
4076     """Gather and return data"""
4077     result = {}
4078     for instance in self.wanted_instances:
4079       remote_info = rpc.call_instance_info(instance.primary_node,
4080                                                 instance.name)
4081       if remote_info and "state" in remote_info:
4082         remote_state = "up"
4083       else:
4084         remote_state = "down"
4085       if instance.status == "down":
4086         config_state = "down"
4087       else:
4088         config_state = "up"
4089
4090       disks = [self._ComputeDiskStatus(instance, None, device)
4091                for device in instance.disks]
4092
4093       idict = {
4094         "name": instance.name,
4095         "config_state": config_state,
4096         "run_state": remote_state,
4097         "pnode": instance.primary_node,
4098         "snodes": instance.secondary_nodes,
4099         "os": instance.os,
4100         "memory": instance.memory,
4101         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4102         "disks": disks,
4103         "network_port": instance.network_port,
4104         "vcpus": instance.vcpus,
4105         "kernel_path": instance.kernel_path,
4106         "initrd_path": instance.initrd_path,
4107         "hvm_boot_order": instance.hvm_boot_order,
4108         }
4109
4110       result[instance.name] = idict
4111
4112     return result
4113
4114
4115 class LUSetInstanceParms(LogicalUnit):
4116   """Modifies an instances's parameters.
4117
4118   """
4119   HPATH = "instance-modify"
4120   HTYPE = constants.HTYPE_INSTANCE
4121   _OP_REQP = ["instance_name"]
4122
4123   def BuildHooksEnv(self):
4124     """Build hooks env.
4125
4126     This runs on the master, primary and secondaries.
4127
4128     """
4129     args = dict()
4130     if self.mem:
4131       args['memory'] = self.mem
4132     if self.vcpus:
4133       args['vcpus'] = self.vcpus
4134     if self.do_ip or self.do_bridge or self.mac:
4135       if self.do_ip:
4136         ip = self.ip
4137       else:
4138         ip = self.instance.nics[0].ip
4139       if self.bridge:
4140         bridge = self.bridge
4141       else:
4142         bridge = self.instance.nics[0].bridge
4143       if self.mac:
4144         mac = self.mac
4145       else:
4146         mac = self.instance.nics[0].mac
4147       args['nics'] = [(ip, bridge, mac)]
4148     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4149     nl = [self.sstore.GetMasterNode(),
4150           self.instance.primary_node] + list(self.instance.secondary_nodes)
4151     return env, nl, nl
4152
4153   def CheckPrereq(self):
4154     """Check prerequisites.
4155
4156     This only checks the instance list against the existing names.
4157
4158     """
4159     self.mem = getattr(self.op, "mem", None)
4160     self.vcpus = getattr(self.op, "vcpus", None)
4161     self.ip = getattr(self.op, "ip", None)
4162     self.mac = getattr(self.op, "mac", None)
4163     self.bridge = getattr(self.op, "bridge", None)
4164     self.kernel_path = getattr(self.op, "kernel_path", None)
4165     self.initrd_path = getattr(self.op, "initrd_path", None)
4166     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4167     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4168                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4169     if all_parms.count(None) == len(all_parms):
4170       raise errors.OpPrereqError("No changes submitted")
4171     if self.mem is not None:
4172       try:
4173         self.mem = int(self.mem)
4174       except ValueError, err:
4175         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4176     if self.vcpus is not None:
4177       try:
4178         self.vcpus = int(self.vcpus)
4179       except ValueError, err:
4180         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4181     if self.ip is not None:
4182       self.do_ip = True
4183       if self.ip.lower() == "none":
4184         self.ip = None
4185       else:
4186         if not utils.IsValidIP(self.ip):
4187           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4188     else:
4189       self.do_ip = False
4190     self.do_bridge = (self.bridge is not None)
4191     if self.mac is not None:
4192       if self.cfg.IsMacInUse(self.mac):
4193         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4194                                    self.mac)
4195       if not utils.IsValidMac(self.mac):
4196         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4197
4198     if self.kernel_path is not None:
4199       self.do_kernel_path = True
4200       if self.kernel_path == constants.VALUE_NONE:
4201         raise errors.OpPrereqError("Can't set instance to no kernel")
4202
4203       if self.kernel_path != constants.VALUE_DEFAULT:
4204         if not os.path.isabs(self.kernel_path):
4205           raise errors.OpPrereqError("The kernel path must be an absolute"
4206                                     " filename")
4207     else:
4208       self.do_kernel_path = False
4209
4210     if self.initrd_path is not None:
4211       self.do_initrd_path = True
4212       if self.initrd_path not in (constants.VALUE_NONE,
4213                                   constants.VALUE_DEFAULT):
4214         if not os.path.isabs(self.initrd_path):
4215           raise errors.OpPrereqError("The initrd path must be an absolute"
4216                                     " filename")
4217     else:
4218       self.do_initrd_path = False
4219
4220     # boot order verification
4221     if self.hvm_boot_order is not None:
4222       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4223         if len(self.hvm_boot_order.strip("acdn")) != 0:
4224           raise errors.OpPrereqError("invalid boot order specified,"
4225                                      " must be one or more of [acdn]"
4226                                      " or 'default'")
4227
4228     instance = self.cfg.GetInstanceInfo(
4229       self.cfg.ExpandInstanceName(self.op.instance_name))
4230     if instance is None:
4231       raise errors.OpPrereqError("No such instance name '%s'" %
4232                                  self.op.instance_name)
4233     self.op.instance_name = instance.name
4234     self.instance = instance
4235     return
4236
4237   def Exec(self, feedback_fn):
4238     """Modifies an instance.
4239
4240     All parameters take effect only at the next restart of the instance.
4241     """
4242     result = []
4243     instance = self.instance
4244     if self.mem:
4245       instance.memory = self.mem
4246       result.append(("mem", self.mem))
4247     if self.vcpus:
4248       instance.vcpus = self.vcpus
4249       result.append(("vcpus",  self.vcpus))
4250     if self.do_ip:
4251       instance.nics[0].ip = self.ip
4252       result.append(("ip", self.ip))
4253     if self.bridge:
4254       instance.nics[0].bridge = self.bridge
4255       result.append(("bridge", self.bridge))
4256     if self.mac:
4257       instance.nics[0].mac = self.mac
4258       result.append(("mac", self.mac))
4259     if self.do_kernel_path:
4260       instance.kernel_path = self.kernel_path
4261       result.append(("kernel_path", self.kernel_path))
4262     if self.do_initrd_path:
4263       instance.initrd_path = self.initrd_path
4264       result.append(("initrd_path", self.initrd_path))
4265     if self.hvm_boot_order:
4266       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4267         instance.hvm_boot_order = None
4268       else:
4269         instance.hvm_boot_order = self.hvm_boot_order
4270       result.append(("hvm_boot_order", self.hvm_boot_order))
4271
4272     self.cfg.AddInstance(instance)
4273
4274     return result
4275
4276
4277 class LUQueryExports(NoHooksLU):
4278   """Query the exports list
4279
4280   """
4281   _OP_REQP = []
4282
4283   def CheckPrereq(self):
4284     """Check that the nodelist contains only existing nodes.
4285
4286     """
4287     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4288
4289   def Exec(self, feedback_fn):
4290     """Compute the list of all the exported system images.
4291
4292     Returns:
4293       a dictionary with the structure node->(export-list)
4294       where export-list is a list of the instances exported on
4295       that node.
4296
4297     """
4298     return rpc.call_export_list(self.nodes)
4299
4300
4301 class LUExportInstance(LogicalUnit):
4302   """Export an instance to an image in the cluster.
4303
4304   """
4305   HPATH = "instance-export"
4306   HTYPE = constants.HTYPE_INSTANCE
4307   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4308
4309   def BuildHooksEnv(self):
4310     """Build hooks env.
4311
4312     This will run on the master, primary node and target node.
4313
4314     """
4315     env = {
4316       "EXPORT_NODE": self.op.target_node,
4317       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4318       }
4319     env.update(_BuildInstanceHookEnvByObject(self.instance))
4320     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4321           self.op.target_node]
4322     return env, nl, nl
4323
4324   def CheckPrereq(self):
4325     """Check prerequisites.
4326
4327     This checks that the instance name is a valid one.
4328
4329     """
4330     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4331     self.instance = self.cfg.GetInstanceInfo(instance_name)
4332     if self.instance is None:
4333       raise errors.OpPrereqError("Instance '%s' not found" %
4334                                  self.op.instance_name)
4335
4336     # node verification
4337     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4338     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4339
4340     if self.dst_node is None:
4341       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4342                                  self.op.target_node)
4343     self.op.target_node = self.dst_node.name
4344
4345   def Exec(self, feedback_fn):
4346     """Export an instance to an image in the cluster.
4347
4348     """
4349     instance = self.instance
4350     dst_node = self.dst_node
4351     src_node = instance.primary_node
4352     # shutdown the instance, unless requested not to do so
4353     if self.op.shutdown:
4354       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4355       self.proc.ChainOpCode(op)
4356
4357     vgname = self.cfg.GetVGName()
4358
4359     snap_disks = []
4360
4361     try:
4362       for disk in instance.disks:
4363         if disk.iv_name == "sda":
4364           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4365           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4366
4367           if not new_dev_name:
4368             logger.Error("could not snapshot block device %s on node %s" %
4369                          (disk.logical_id[1], src_node))
4370           else:
4371             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4372                                       logical_id=(vgname, new_dev_name),
4373                                       physical_id=(vgname, new_dev_name),
4374                                       iv_name=disk.iv_name)
4375             snap_disks.append(new_dev)
4376
4377     finally:
4378       if self.op.shutdown:
4379         op = opcodes.OpStartupInstance(instance_name=instance.name,
4380                                        force=False)
4381         self.proc.ChainOpCode(op)
4382
4383     # TODO: check for size
4384
4385     for dev in snap_disks:
4386       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4387                                            instance):
4388         logger.Error("could not export block device %s from node"
4389                      " %s to node %s" %
4390                      (dev.logical_id[1], src_node, dst_node.name))
4391       if not rpc.call_blockdev_remove(src_node, dev):
4392         logger.Error("could not remove snapshot block device %s from"
4393                      " node %s" % (dev.logical_id[1], src_node))
4394
4395     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4396       logger.Error("could not finalize export for instance %s on node %s" %
4397                    (instance.name, dst_node.name))
4398
4399     nodelist = self.cfg.GetNodeList()
4400     nodelist.remove(dst_node.name)
4401
4402     # on one-node clusters nodelist will be empty after the removal
4403     # if we proceed the backup would be removed because OpQueryExports
4404     # substitutes an empty list with the full cluster node list.
4405     if nodelist:
4406       op = opcodes.OpQueryExports(nodes=nodelist)
4407       exportlist = self.proc.ChainOpCode(op)
4408       for node in exportlist:
4409         if instance.name in exportlist[node]:
4410           if not rpc.call_export_remove(node, instance.name):
4411             logger.Error("could not remove older export for instance %s"
4412                          " on node %s" % (instance.name, node))
4413
4414
4415 class TagsLU(NoHooksLU):
4416   """Generic tags LU.
4417
4418   This is an abstract class which is the parent of all the other tags LUs.
4419
4420   """
4421   def CheckPrereq(self):
4422     """Check prerequisites.
4423
4424     """
4425     if self.op.kind == constants.TAG_CLUSTER:
4426       self.target = self.cfg.GetClusterInfo()
4427     elif self.op.kind == constants.TAG_NODE:
4428       name = self.cfg.ExpandNodeName(self.op.name)
4429       if name is None:
4430         raise errors.OpPrereqError("Invalid node name (%s)" %
4431                                    (self.op.name,))
4432       self.op.name = name
4433       self.target = self.cfg.GetNodeInfo(name)
4434     elif self.op.kind == constants.TAG_INSTANCE:
4435       name = self.cfg.ExpandInstanceName(self.op.name)
4436       if name is None:
4437         raise errors.OpPrereqError("Invalid instance name (%s)" %
4438                                    (self.op.name,))
4439       self.op.name = name
4440       self.target = self.cfg.GetInstanceInfo(name)
4441     else:
4442       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4443                                  str(self.op.kind))
4444
4445
4446 class LUGetTags(TagsLU):
4447   """Returns the tags of a given object.
4448
4449   """
4450   _OP_REQP = ["kind", "name"]
4451
4452   def Exec(self, feedback_fn):
4453     """Returns the tag list.
4454
4455     """
4456     return self.target.GetTags()
4457
4458
4459 class LUSearchTags(NoHooksLU):
4460   """Searches the tags for a given pattern.
4461
4462   """
4463   _OP_REQP = ["pattern"]
4464
4465   def CheckPrereq(self):
4466     """Check prerequisites.
4467
4468     This checks the pattern passed for validity by compiling it.
4469
4470     """
4471     try:
4472       self.re = re.compile(self.op.pattern)
4473     except re.error, err:
4474       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4475                                  (self.op.pattern, err))
4476
4477   def Exec(self, feedback_fn):
4478     """Returns the tag list.
4479
4480     """
4481     cfg = self.cfg
4482     tgts = [("/cluster", cfg.GetClusterInfo())]
4483     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4484     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4485     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4486     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4487     results = []
4488     for path, target in tgts:
4489       for tag in target.GetTags():
4490         if self.re.search(tag):
4491           results.append((path, tag))
4492     return results
4493
4494
4495 class LUAddTags(TagsLU):
4496   """Sets a tag on a given object.
4497
4498   """
4499   _OP_REQP = ["kind", "name", "tags"]
4500
4501   def CheckPrereq(self):
4502     """Check prerequisites.
4503
4504     This checks the type and length of the tag name and value.
4505
4506     """
4507     TagsLU.CheckPrereq(self)
4508     for tag in self.op.tags:
4509       objects.TaggableObject.ValidateTag(tag)
4510
4511   def Exec(self, feedback_fn):
4512     """Sets the tag.
4513
4514     """
4515     try:
4516       for tag in self.op.tags:
4517         self.target.AddTag(tag)
4518     except errors.TagError, err:
4519       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4520     try:
4521       self.cfg.Update(self.target)
4522     except errors.ConfigurationError:
4523       raise errors.OpRetryError("There has been a modification to the"
4524                                 " config file and the operation has been"
4525                                 " aborted. Please retry.")
4526
4527
4528 class LUDelTags(TagsLU):
4529   """Delete a list of tags from a given object.
4530
4531   """
4532   _OP_REQP = ["kind", "name", "tags"]
4533
4534   def CheckPrereq(self):
4535     """Check prerequisites.
4536
4537     This checks that we have the given tag.
4538
4539     """
4540     TagsLU.CheckPrereq(self)
4541     for tag in self.op.tags:
4542       objects.TaggableObject.ValidateTag(tag)
4543     del_tags = frozenset(self.op.tags)
4544     cur_tags = self.target.GetTags()
4545     if not del_tags <= cur_tags:
4546       diff_tags = del_tags - cur_tags
4547       diff_names = ["'%s'" % tag for tag in diff_tags]
4548       diff_names.sort()
4549       raise errors.OpPrereqError("Tag(s) %s not found" %
4550                                  (",".join(diff_names)))
4551
4552   def Exec(self, feedback_fn):
4553     """Remove the tag from the object.
4554
4555     """
4556     for tag in self.op.tags:
4557       self.target.RemoveTag(tag)
4558     try:
4559       self.cfg.Update(self.target)
4560     except errors.ConfigurationError:
4561       raise errors.OpRetryError("There has been a modification to the"
4562                                 " config file and the operation has been"
4563                                 " aborted. Please retry.")
4564
4565 class LUTestDelay(NoHooksLU):
4566   """Sleep for a specified amount of time.
4567
4568   This LU sleeps on the master and/or nodes for a specified amoutn of
4569   time.
4570
4571   """
4572   _OP_REQP = ["duration", "on_master", "on_nodes"]
4573
4574   def CheckPrereq(self):
4575     """Check prerequisites.
4576
4577     This checks that we have a good list of nodes and/or the duration
4578     is valid.
4579
4580     """
4581
4582     if self.op.on_nodes:
4583       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4584
4585   def Exec(self, feedback_fn):
4586     """Do the actual sleep.
4587
4588     """
4589     if self.op.on_master:
4590       if not utils.TestDelay(self.op.duration):
4591         raise errors.OpExecError("Error during master delay test")
4592     if self.op.on_nodes:
4593       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4594       if not result:
4595         raise errors.OpExecError("Complete failure from rpc call")
4596       for node, node_result in result.items():
4597         if not node_result:
4598           raise errors.OpExecError("Failure during rpc call to node %s,"
4599                                    " result: %s" % (node, node_result))