Reduce the chance of DRBD errors with stale primaries
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge, mac) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276   else:
277     nic_count = 0
278
279   env["INSTANCE_NIC_COUNT"] = nic_count
280
281   return env
282
283
284 def _BuildInstanceHookEnvByObject(instance, override=None):
285   """Builds instance related env variables for hooks from an object.
286
287   Args:
288     instance: objects.Instance object of instance
289     override: dict of values to override
290   """
291   args = {
292     'name': instance.name,
293     'primary_node': instance.primary_node,
294     'secondary_nodes': instance.secondary_nodes,
295     'os_type': instance.os,
296     'status': instance.os,
297     'memory': instance.memory,
298     'vcpus': instance.vcpus,
299     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300   }
301   if override:
302     args.update(override)
303   return _BuildInstanceHookEnv(**args)
304
305
306 def _UpdateKnownHosts(fullnode, ip, pubkey):
307   """Ensure a node has a correct known_hosts entry.
308
309   Args:
310     fullnode - Fully qualified domain name of host. (str)
311     ip       - IPv4 address of host (str)
312     pubkey   - the public key of the cluster
313
314   """
315   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317   else:
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319
320   inthere = False
321
322   save_lines = []
323   add_lines = []
324   removed = False
325
326   for rawline in f:
327     logger.Debug('read %s' % (repr(rawline),))
328
329     parts = rawline.rstrip('\r\n').split()
330
331     # Ignore unwanted lines
332     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333       fields = parts[0].split(',')
334       key = parts[2]
335
336       haveall = True
337       havesome = False
338       for spec in [ ip, fullnode ]:
339         if spec not in fields:
340           haveall = False
341         if spec in fields:
342           havesome = True
343
344       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345       if haveall and key == pubkey:
346         inthere = True
347         save_lines.append(rawline)
348         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349         continue
350
351       if havesome and (not haveall or key != pubkey):
352         removed = True
353         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354         continue
355
356     save_lines.append(rawline)
357
358   if not inthere:
359     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361
362   if removed:
363     save_lines = save_lines + add_lines
364
365     # Write a new file and replace old.
366     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367                                    constants.DATA_DIR)
368     newfile = os.fdopen(fd, 'w')
369     try:
370       newfile.write(''.join(save_lines))
371     finally:
372       newfile.close()
373     logger.Debug("Wrote new known_hosts.")
374     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375
376   elif add_lines:
377     # Simply appending a new line will do the trick.
378     f.seek(0, 2)
379     for add in add_lines:
380       f.write(add)
381
382   f.close()
383
384
385 def _HasValidVG(vglist, vgname):
386   """Checks if the volume group list is valid.
387
388   A non-None return value means there's an error, and the return value
389   is the error message.
390
391   """
392   vgsize = vglist.get(vgname, None)
393   if vgsize is None:
394     return "volume group '%s' missing" % vgname
395   elif vgsize < 20480:
396     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397             (vgname, vgsize))
398   return None
399
400
401 def _InitSSHSetup(node):
402   """Setup the SSH configuration for the cluster.
403
404
405   This generates a dsa keypair for root, adds the pub key to the
406   permitted hosts and adds the hostkey to its own known hosts.
407
408   Args:
409     node: the name of this host as a fqdn
410
411   """
412   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413
414   for name in priv_key, pub_key:
415     if os.path.exists(name):
416       utils.CreateBackup(name)
417     utils.RemoveFile(name)
418
419   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420                          "-f", priv_key,
421                          "-q", "-N", ""])
422   if result.failed:
423     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424                              result.output)
425
426   f = open(pub_key, 'r')
427   try:
428     utils.AddAuthorizedKey(auth_keys, f.read(8192))
429   finally:
430     f.close()
431
432
433 def _InitGanetiServerSetup(ss):
434   """Setup the necessary configuration for the initial node daemon.
435
436   This creates the nodepass file containing the shared password for
437   the cluster and also generates the SSL certificate.
438
439   """
440   # Create pseudo random password
441   randpass = sha.new(os.urandom(64)).hexdigest()
442   # and write it into sstore
443   ss.SetKey(ss.SS_NODED_PASS, randpass)
444
445   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446                          "-days", str(365*5), "-nodes", "-x509",
447                          "-keyout", constants.SSL_CERT_FILE,
448                          "-out", constants.SSL_CERT_FILE, "-batch"])
449   if result.failed:
450     raise errors.OpExecError("could not generate server ssl cert, command"
451                              " %s had exitcode %s and error message %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454   os.chmod(constants.SSL_CERT_FILE, 0400)
455
456   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457
458   if result.failed:
459     raise errors.OpExecError("Could not start the node daemon, command %s"
460                              " had exitcode %s and error %s" %
461                              (result.cmd, result.exit_code, result.output))
462
463
464 def _CheckInstanceBridgesExist(instance):
465   """Check that the brigdes needed by an instance exist.
466
467   """
468   # check bridges existance
469   brlist = [nic.bridge for nic in instance.nics]
470   if not rpc.call_bridges_exist(instance.primary_node, brlist):
471     raise errors.OpPrereqError("one or more target bridges %s does not"
472                                " exist on destination node '%s'" %
473                                (brlist, instance.primary_node))
474
475
476 class LUInitCluster(LogicalUnit):
477   """Initialise the cluster.
478
479   """
480   HPATH = "cluster-init"
481   HTYPE = constants.HTYPE_CLUSTER
482   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483               "def_bridge", "master_netdev"]
484   REQ_CLUSTER = False
485
486   def BuildHooksEnv(self):
487     """Build hooks env.
488
489     Notes: Since we don't require a cluster, we must manually add
490     ourselves in the post-run node list.
491
492     """
493     env = {"OP_TARGET": self.op.cluster_name}
494     return env, [], [self.hostname.name]
495
496   def CheckPrereq(self):
497     """Verify that the passed name is a valid one.
498
499     """
500     if config.ConfigWriter.IsCluster():
501       raise errors.OpPrereqError("Cluster is already initialised")
502
503     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504       if not os.path.exists(constants.VNC_PASSWORD_FILE):
505         raise errors.OpPrereqError("Please prepare the cluster VNC"
506                                    "password file %s" %
507                                    constants.VNC_PASSWORD_FILE)
508
509     self.hostname = hostname = utils.HostInfo()
510
511     if hostname.ip.startswith("127."):
512       raise errors.OpPrereqError("This host's IP resolves to the private"
513                                  " range (%s). Please fix DNS or /etc/hosts." %
514                                  (hostname.ip,))
515
516     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517
518     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519                          constants.DEFAULT_NODED_PORT):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     secondary_ip = getattr(self.op, "secondary_ip", None)
526     if secondary_ip and not utils.IsValidIP(secondary_ip):
527       raise errors.OpPrereqError("Invalid secondary ip given")
528     if (secondary_ip and
529         secondary_ip != hostname.ip and
530         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531                            constants.DEFAULT_NODED_PORT))):
532       raise errors.OpPrereqError("You gave %s as secondary IP,"
533                                  " but it does not belong to this host." %
534                                  secondary_ip)
535     self.secondary_ip = secondary_ip
536
537     # checks presence of the volume group given
538     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539
540     if vgstatus:
541       raise errors.OpPrereqError("Error: %s" % vgstatus)
542
543     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544                     self.op.mac_prefix):
545       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546                                  self.op.mac_prefix)
547
548     if self.op.hypervisor_type not in constants.HYPER_TYPES:
549       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550                                  self.op.hypervisor_type)
551
552     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553     if result.failed:
554       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555                                  (self.op.master_netdev,
556                                   result.output.strip()))
557
558     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560       raise errors.OpPrereqError("Init.d script '%s' missing or not"
561                                  " executable." % constants.NODE_INITD_SCRIPT)
562
563   def Exec(self, feedback_fn):
564     """Initialize the cluster.
565
566     """
567     clustername = self.clustername
568     hostname = self.hostname
569
570     # set up the simple store
571     self.sstore = ss = ssconf.SimpleStore()
572     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577
578     # set up the inter-node password and certificate
579     _InitGanetiServerSetup(ss)
580
581     # start the master ip
582     rpc.call_node_start_master(hostname.name)
583
584     # set up ssh config and /etc/hosts
585     f = open(constants.SSH_HOST_RSA_PUB, 'r')
586     try:
587       sshline = f.read()
588     finally:
589       f.close()
590     sshkey = sshline.split(" ")[1]
591
592     _AddHostToEtcHosts(hostname.name)
593
594     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595
596     _InitSSHSetup(hostname.name)
597
598     # init of cluster config file
599     self.cfg = cfgw = config.ConfigWriter()
600     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601                     sshkey, self.op.mac_prefix,
602                     self.op.vg_name, self.op.def_bridge)
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signalled by raising errors.OpPrereqError.
617
618     """
619     master = self.sstore.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.sstore.GetMasterNode()
635     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636     utils.CreateBackup(priv_key)
637     utils.CreateBackup(pub_key)
638     rpc.call_node_leave_cluster(master)
639
640
641 class LUVerifyCluster(NoHooksLU):
642   """Verifies the cluster status.
643
644   """
645   _OP_REQP = []
646
647   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648                   remote_version, feedback_fn):
649     """Run multiple tests against a node.
650
651     Test list:
652       - compares ganeti version
653       - checks vg existance and size > 20G
654       - checks config file checksum
655       - checks ssh to other nodes
656
657     Args:
658       node: name of the node to check
659       file_list: required list of files
660       local_cksum: dictionary of local files and their checksums
661
662     """
663     # compares ganeti version
664     local_version = constants.PROTOCOL_VERSION
665     if not remote_version:
666       feedback_fn(" - ERROR: connection to %s failed" % (node))
667       return True
668
669     if local_version != remote_version:
670       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671                       (local_version, node, remote_version))
672       return True
673
674     # checks vg existance and size > 20G
675
676     bad = False
677     if not vglist:
678       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679                       (node,))
680       bad = True
681     else:
682       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688     # checks ssh to any
689
690     if 'filelist' not in node_result:
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       remote_cksum = node_result['filelist']
695       for file_name in file_list:
696         if file_name not in remote_cksum:
697           bad = True
698           feedback_fn("  - ERROR: file '%s' missing" % file_name)
699         elif remote_cksum[file_name] != local_cksum[file_name]:
700           bad = True
701           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702
703     if 'nodelist' not in node_result:
704       bad = True
705       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706     else:
707       if node_result['nodelist']:
708         bad = True
709         for node in node_result['nodelist']:
710           feedback_fn("  - ERROR: communication with node '%s': %s" %
711                           (node, node_result['nodelist'][node]))
712     hyp_result = node_result.get('hypervisor', None)
713     if hyp_result is not None:
714       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715     return bad
716
717   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718     """Verify an instance.
719
720     This function checks to see if the required block devices are
721     available on the instance's node.
722
723     """
724     bad = False
725
726     instancelist = self.cfg.GetInstanceList()
727     if not instance in instancelist:
728       feedback_fn("  - ERROR: instance %s not in instance list %s" %
729                       (instance, instancelist))
730       bad = True
731
732     instanceconfig = self.cfg.GetInstanceInfo(instance)
733     node_current = instanceconfig.primary_node
734
735     node_vol_should = {}
736     instanceconfig.MapLVsByNode(node_vol_should)
737
738     for node in node_vol_should:
739       for volume in node_vol_should[node]:
740         if node not in node_vol_is or volume not in node_vol_is[node]:
741           feedback_fn("  - ERROR: volume %s missing on node %s" %
742                           (volume, node))
743           bad = True
744
745     if not instanceconfig.status == 'down':
746       if not instance in node_instance[node_current]:
747         feedback_fn("  - ERROR: instance %s not running on node %s" %
748                         (instance, node_current))
749         bad = True
750
751     for node in node_instance:
752       if (not node == node_current):
753         if instance in node_instance[node]:
754           feedback_fn("  - ERROR: instance %s should not run on node %s" %
755                           (instance, node))
756           bad = True
757
758     return bad
759
760   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761     """Verify if there are any unknown volumes in the cluster.
762
763     The .os, .swap and backup volumes are ignored. All other volumes are
764     reported as unknown.
765
766     """
767     bad = False
768
769     for node in node_vol_is:
770       for volume in node_vol_is[node]:
771         if node not in node_vol_should or volume not in node_vol_should[node]:
772           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773                       (volume, node))
774           bad = True
775     return bad
776
777   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778     """Verify the list of running instances.
779
780     This checks what instances are running but unknown to the cluster.
781
782     """
783     bad = False
784     for node in node_instance:
785       for runninginstance in node_instance[node]:
786         if runninginstance not in instancelist:
787           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788                           (runninginstance, node))
789           bad = True
790     return bad
791
792   def CheckPrereq(self):
793     """Check prerequisites.
794
795     This has no prerequisites.
796
797     """
798     pass
799
800   def Exec(self, feedback_fn):
801     """Verify integrity of cluster, performing various test on nodes.
802
803     """
804     bad = False
805     feedback_fn("* Verifying global settings")
806     for msg in self.cfg.VerifyConfig():
807       feedback_fn("  - ERROR: %s" % msg)
808
809     vg_name = self.cfg.GetVGName()
810     nodelist = utils.NiceSort(self.cfg.GetNodeList())
811     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812     node_volume = {}
813     node_instance = {}
814
815     # FIXME: verify OS list
816     # do local checksums
817     file_names = list(self.sstore.GetFileList())
818     file_names.append(constants.SSL_CERT_FILE)
819     file_names.append(constants.CLUSTER_CONF_FILE)
820     local_checksums = utils.FingerprintFiles(file_names)
821
822     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824     all_instanceinfo = rpc.call_instance_list(nodelist)
825     all_vglist = rpc.call_vg_list(nodelist)
826     node_verify_param = {
827       'filelist': file_names,
828       'nodelist': nodelist,
829       'hypervisor': None,
830       }
831     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832     all_rversion = rpc.call_version(nodelist)
833
834     for node in nodelist:
835       feedback_fn("* Verifying node %s" % node)
836       result = self._VerifyNode(node, file_names, local_checksums,
837                                 all_vglist[node], all_nvinfo[node],
838                                 all_rversion[node], feedback_fn)
839       bad = bad or result
840
841       # node_volume
842       volumeinfo = all_volumeinfo[node]
843
844       if isinstance(volumeinfo, basestring):
845         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846                     (node, volumeinfo[-400:].encode('string_escape')))
847         bad = True
848         node_volume[node] = {}
849       elif not isinstance(volumeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853       else:
854         node_volume[node] = volumeinfo
855
856       # node_instance
857       nodeinstance = all_instanceinfo[node]
858       if type(nodeinstance) != list:
859         feedback_fn("  - ERROR: connection to %s failed" % (node,))
860         bad = True
861         continue
862
863       node_instance[node] = nodeinstance
864
865     node_vol_should = {}
866
867     for instance in instancelist:
868       feedback_fn("* Verifying instance %s" % instance)
869       result =  self._VerifyInstance(instance, node_volume, node_instance,
870                                      feedback_fn)
871       bad = bad or result
872
873       inst_config = self.cfg.GetInstanceInfo(instance)
874
875       inst_config.MapLVsByNode(node_vol_should)
876
877     feedback_fn("* Verifying orphan volumes")
878     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879                                        feedback_fn)
880     bad = bad or result
881
882     feedback_fn("* Verifying remaining instances")
883     result = self._VerifyOrphanInstances(instancelist, node_instance,
884                                          feedback_fn)
885     bad = bad or result
886
887     return int(bad)
888
889
890 class LUVerifyDisks(NoHooksLU):
891   """Verifies the cluster disks status.
892
893   """
894   _OP_REQP = []
895
896   def CheckPrereq(self):
897     """Check prerequisites.
898
899     This has no prerequisites.
900
901     """
902     pass
903
904   def Exec(self, feedback_fn):
905     """Verify integrity of cluster disks.
906
907     """
908     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909
910     vg_name = self.cfg.GetVGName()
911     nodes = utils.NiceSort(self.cfg.GetNodeList())
912     instances = [self.cfg.GetInstanceInfo(name)
913                  for name in self.cfg.GetInstanceList()]
914
915     nv_dict = {}
916     for inst in instances:
917       inst_lvs = {}
918       if (inst.status != "up" or
919           inst.disk_template not in constants.DTS_NET_MIRROR):
920         continue
921       inst.MapLVsByNode(inst_lvs)
922       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923       for node, vol_list in inst_lvs.iteritems():
924         for vol in vol_list:
925           nv_dict[(node, vol)] = inst
926
927     if not nv_dict:
928       return result
929
930     node_lvs = rpc.call_volume_list(nodes, vg_name)
931
932     to_act = set()
933     for node in nodes:
934       # node_volume
935       lvs = node_lvs[node]
936
937       if isinstance(lvs, basestring):
938         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939         res_nlvm[node] = lvs
940       elif not isinstance(lvs, dict):
941         logger.Info("connection to node %s failed or invalid data returned" %
942                     (node,))
943         res_nodes.append(node)
944         continue
945
946       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947         inst = nv_dict.pop((node, lv_name), None)
948         if (not lv_online and inst is not None
949             and inst.name not in res_instances):
950           res_instances.append(inst.name)
951
952     # any leftover items in nv_dict are missing LVs, let's arrange the
953     # data better
954     for key, inst in nv_dict.iteritems():
955       if inst.name not in res_missing:
956         res_missing[inst.name] = []
957       res_missing[inst.name].append(key)
958
959     return result
960
961
962 class LURenameCluster(LogicalUnit):
963   """Rename the cluster.
964
965   """
966   HPATH = "cluster-rename"
967   HTYPE = constants.HTYPE_CLUSTER
968   _OP_REQP = ["name"]
969
970   def BuildHooksEnv(self):
971     """Build hooks env.
972
973     """
974     env = {
975       "OP_TARGET": self.op.sstore.GetClusterName(),
976       "NEW_NAME": self.op.name,
977       }
978     mn = self.sstore.GetMasterNode()
979     return env, [mn], [mn]
980
981   def CheckPrereq(self):
982     """Verify that the passed name is a valid one.
983
984     """
985     hostname = utils.HostInfo(self.op.name)
986
987     new_name = hostname.name
988     self.ip = new_ip = hostname.ip
989     old_name = self.sstore.GetClusterName()
990     old_ip = self.sstore.GetMasterIP()
991     if new_name == old_name and new_ip == old_ip:
992       raise errors.OpPrereqError("Neither the name nor the IP address of the"
993                                  " cluster has changed")
994     if new_ip != old_ip:
995       result = utils.RunCmd(["fping", "-q", new_ip])
996       if not result.failed:
997         raise errors.OpPrereqError("The given cluster IP address (%s) is"
998                                    " reachable on the network. Aborting." %
999                                    new_ip)
1000
1001     self.op.name = new_name
1002
1003   def Exec(self, feedback_fn):
1004     """Rename the cluster.
1005
1006     """
1007     clustername = self.op.name
1008     ip = self.ip
1009     ss = self.sstore
1010
1011     # shutdown the master IP
1012     master = ss.GetMasterNode()
1013     if not rpc.call_node_stop_master(master):
1014       raise errors.OpExecError("Could not disable the master role")
1015
1016     try:
1017       # modify the sstore
1018       ss.SetKey(ss.SS_MASTER_IP, ip)
1019       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020
1021       # Distribute updated ss config to all nodes
1022       myself = self.cfg.GetNodeInfo(master)
1023       dist_nodes = self.cfg.GetNodeList()
1024       if myself.name in dist_nodes:
1025         dist_nodes.remove(myself.name)
1026
1027       logger.Debug("Copying updated ssconf data to all nodes")
1028       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029         fname = ss.KeyToFilename(keyname)
1030         result = rpc.call_upload_file(dist_nodes, fname)
1031         for to_node in dist_nodes:
1032           if not result[to_node]:
1033             logger.Error("copy of file %s to node %s failed" %
1034                          (fname, to_node))
1035     finally:
1036       if not rpc.call_node_start_master(master):
1037         logger.Error("Could not re-enable the master role on the master,"
1038                      " please restart manually.")
1039
1040
1041 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042   """Sleep and poll for an instance's disk to sync.
1043
1044   """
1045   if not instance.disks:
1046     return True
1047
1048   if not oneshot:
1049     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050
1051   node = instance.primary_node
1052
1053   for dev in instance.disks:
1054     cfgw.SetDiskID(dev, node)
1055
1056   retries = 0
1057   while True:
1058     max_time = 0
1059     done = True
1060     cumul_degraded = False
1061     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062     if not rstats:
1063       proc.LogWarning("Can't get any data from node %s" % node)
1064       retries += 1
1065       if retries >= 10:
1066         raise errors.RemoteError("Can't contact node %s for mirror data,"
1067                                  " aborting." % node)
1068       time.sleep(6)
1069       continue
1070     retries = 0
1071     for i in range(len(rstats)):
1072       mstat = rstats[i]
1073       if mstat is None:
1074         proc.LogWarning("Can't compute data for node %s/%s" %
1075                         (node, instance.disks[i].iv_name))
1076         continue
1077       # we ignore the ldisk parameter
1078       perc_done, est_time, is_degraded, _ = mstat
1079       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080       if perc_done is not None:
1081         done = False
1082         if est_time is not None:
1083           rem_time = "%d estimated seconds remaining" % est_time
1084           max_time = est_time
1085         else:
1086           rem_time = "no time estimate"
1087         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088                      (instance.disks[i].iv_name, perc_done, rem_time))
1089     if done or oneshot:
1090       break
1091
1092     if unlock:
1093       utils.Unlock('cmd')
1094     try:
1095       time.sleep(min(60, max_time))
1096     finally:
1097       if unlock:
1098         utils.Lock('cmd')
1099
1100   if done:
1101     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102   return not cumul_degraded
1103
1104
1105 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106   """Check that mirrors are not degraded.
1107
1108   The ldisk parameter, if True, will change the test from the
1109   is_degraded attribute (which represents overall non-ok status for
1110   the device(s)) to the ldisk (representing the local storage status).
1111
1112   """
1113   cfgw.SetDiskID(dev, node)
1114   if ldisk:
1115     idx = 6
1116   else:
1117     idx = 5
1118
1119   result = True
1120   if on_primary or dev.AssembleOnSecondary():
1121     rstats = rpc.call_blockdev_find(node, dev)
1122     if not rstats:
1123       logger.ToStderr("Can't get any data from node %s" % node)
1124       result = False
1125     else:
1126       result = result and (not rstats[idx])
1127   if dev.children:
1128     for child in dev.children:
1129       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130
1131   return result
1132
1133
1134 class LUDiagnoseOS(NoHooksLU):
1135   """Logical unit for OS diagnose/query.
1136
1137   """
1138   _OP_REQP = []
1139
1140   def CheckPrereq(self):
1141     """Check prerequisites.
1142
1143     This always succeeds, since this is a pure query LU.
1144
1145     """
1146     return
1147
1148   def Exec(self, feedback_fn):
1149     """Compute the list of OSes.
1150
1151     """
1152     node_list = self.cfg.GetNodeList()
1153     node_data = rpc.call_os_diagnose(node_list)
1154     if node_data == False:
1155       raise errors.OpExecError("Can't gather the list of OSes")
1156     return node_data
1157
1158
1159 class LURemoveNode(LogicalUnit):
1160   """Logical unit for removing a node.
1161
1162   """
1163   HPATH = "node-remove"
1164   HTYPE = constants.HTYPE_NODE
1165   _OP_REQP = ["node_name"]
1166
1167   def BuildHooksEnv(self):
1168     """Build hooks env.
1169
1170     This doesn't run on the target node in the pre phase as a failed
1171     node would not allows itself to run.
1172
1173     """
1174     env = {
1175       "OP_TARGET": self.op.node_name,
1176       "NODE_NAME": self.op.node_name,
1177       }
1178     all_nodes = self.cfg.GetNodeList()
1179     all_nodes.remove(self.op.node_name)
1180     return env, all_nodes, all_nodes
1181
1182   def CheckPrereq(self):
1183     """Check prerequisites.
1184
1185     This checks:
1186      - the node exists in the configuration
1187      - it does not have primary or secondary instances
1188      - it's not the master
1189
1190     Any errors are signalled by raising errors.OpPrereqError.
1191
1192     """
1193     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194     if node is None:
1195       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196
1197     instance_list = self.cfg.GetInstanceList()
1198
1199     masternode = self.sstore.GetMasterNode()
1200     if node.name == masternode:
1201       raise errors.OpPrereqError("Node is the master node,"
1202                                  " you need to failover first.")
1203
1204     for instance_name in instance_list:
1205       instance = self.cfg.GetInstanceInfo(instance_name)
1206       if node.name == instance.primary_node:
1207         raise errors.OpPrereqError("Instance %s still running on the node,"
1208                                    " please remove first." % instance_name)
1209       if node.name in instance.secondary_nodes:
1210         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211                                    " please remove first." % instance_name)
1212     self.op.node_name = node.name
1213     self.node = node
1214
1215   def Exec(self, feedback_fn):
1216     """Removes the node from the cluster.
1217
1218     """
1219     node = self.node
1220     logger.Info("stopping the node daemon and removing configs from node %s" %
1221                 node.name)
1222
1223     rpc.call_node_leave_cluster(node.name)
1224
1225     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226
1227     logger.Info("Removing node %s from config" % node.name)
1228
1229     self.cfg.RemoveNode(node.name)
1230
1231     _RemoveHostFromEtcHosts(node.name)
1232
1233
1234 class LUQueryNodes(NoHooksLU):
1235   """Logical unit for querying nodes.
1236
1237   """
1238   _OP_REQP = ["output_fields", "names"]
1239
1240   def CheckPrereq(self):
1241     """Check prerequisites.
1242
1243     This checks that the fields required are valid output fields.
1244
1245     """
1246     self.dynamic_fields = frozenset(["dtotal", "dfree",
1247                                      "mtotal", "mnode", "mfree",
1248                                      "bootid"])
1249
1250     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251                                "pinst_list", "sinst_list",
1252                                "pip", "sip"],
1253                        dynamic=self.dynamic_fields,
1254                        selected=self.op.output_fields)
1255
1256     self.wanted = _GetWantedNodes(self, self.op.names)
1257
1258   def Exec(self, feedback_fn):
1259     """Computes the list of nodes and their attributes.
1260
1261     """
1262     nodenames = self.wanted
1263     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264
1265     # begin data gathering
1266
1267     if self.dynamic_fields.intersection(self.op.output_fields):
1268       live_data = {}
1269       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270       for name in nodenames:
1271         nodeinfo = node_data.get(name, None)
1272         if nodeinfo:
1273           live_data[name] = {
1274             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279             "bootid": nodeinfo['bootid'],
1280             }
1281         else:
1282           live_data[name] = {}
1283     else:
1284       live_data = dict.fromkeys(nodenames, {})
1285
1286     node_to_primary = dict([(name, set()) for name in nodenames])
1287     node_to_secondary = dict([(name, set()) for name in nodenames])
1288
1289     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290                              "sinst_cnt", "sinst_list"))
1291     if inst_fields & frozenset(self.op.output_fields):
1292       instancelist = self.cfg.GetInstanceList()
1293
1294       for instance_name in instancelist:
1295         inst = self.cfg.GetInstanceInfo(instance_name)
1296         if inst.primary_node in node_to_primary:
1297           node_to_primary[inst.primary_node].add(inst.name)
1298         for secnode in inst.secondary_nodes:
1299           if secnode in node_to_secondary:
1300             node_to_secondary[secnode].add(inst.name)
1301
1302     # end data gathering
1303
1304     output = []
1305     for node in nodelist:
1306       node_output = []
1307       for field in self.op.output_fields:
1308         if field == "name":
1309           val = node.name
1310         elif field == "pinst_list":
1311           val = list(node_to_primary[node.name])
1312         elif field == "sinst_list":
1313           val = list(node_to_secondary[node.name])
1314         elif field == "pinst_cnt":
1315           val = len(node_to_primary[node.name])
1316         elif field == "sinst_cnt":
1317           val = len(node_to_secondary[node.name])
1318         elif field == "pip":
1319           val = node.primary_ip
1320         elif field == "sip":
1321           val = node.secondary_ip
1322         elif field in self.dynamic_fields:
1323           val = live_data[node.name].get(field, None)
1324         else:
1325           raise errors.ParameterError(field)
1326         node_output.append(val)
1327       output.append(node_output)
1328
1329     return output
1330
1331
1332 class LUQueryNodeVolumes(NoHooksLU):
1333   """Logical unit for getting volumes on node(s).
1334
1335   """
1336   _OP_REQP = ["nodes", "output_fields"]
1337
1338   def CheckPrereq(self):
1339     """Check prerequisites.
1340
1341     This checks that the fields required are valid output fields.
1342
1343     """
1344     self.nodes = _GetWantedNodes(self, self.op.nodes)
1345
1346     _CheckOutputFields(static=["node"],
1347                        dynamic=["phys", "vg", "name", "size", "instance"],
1348                        selected=self.op.output_fields)
1349
1350
1351   def Exec(self, feedback_fn):
1352     """Computes the list of nodes and their attributes.
1353
1354     """
1355     nodenames = self.nodes
1356     volumes = rpc.call_node_volumes(nodenames)
1357
1358     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359              in self.cfg.GetInstanceList()]
1360
1361     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362
1363     output = []
1364     for node in nodenames:
1365       if node not in volumes or not volumes[node]:
1366         continue
1367
1368       node_vols = volumes[node][:]
1369       node_vols.sort(key=lambda vol: vol['dev'])
1370
1371       for vol in node_vols:
1372         node_output = []
1373         for field in self.op.output_fields:
1374           if field == "node":
1375             val = node
1376           elif field == "phys":
1377             val = vol['dev']
1378           elif field == "vg":
1379             val = vol['vg']
1380           elif field == "name":
1381             val = vol['name']
1382           elif field == "size":
1383             val = int(float(vol['size']))
1384           elif field == "instance":
1385             for inst in ilist:
1386               if node not in lv_by_node[inst]:
1387                 continue
1388               if vol['name'] in lv_by_node[inst][node]:
1389                 val = inst.name
1390                 break
1391             else:
1392               val = '-'
1393           else:
1394             raise errors.ParameterError(field)
1395           node_output.append(str(val))
1396
1397         output.append(node_output)
1398
1399     return output
1400
1401
1402 class LUAddNode(LogicalUnit):
1403   """Logical unit for adding node to the cluster.
1404
1405   """
1406   HPATH = "node-add"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This will run on all nodes before, and on all nodes + the new node after.
1414
1415     """
1416     env = {
1417       "OP_TARGET": self.op.node_name,
1418       "NODE_NAME": self.op.node_name,
1419       "NODE_PIP": self.op.primary_ip,
1420       "NODE_SIP": self.op.secondary_ip,
1421       }
1422     nodes_0 = self.cfg.GetNodeList()
1423     nodes_1 = nodes_0 + [self.op.node_name, ]
1424     return env, nodes_0, nodes_1
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the new node is not already in the config
1431      - it is resolvable
1432      - its parameters (single/dual homed) matches the cluster
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node_name = self.op.node_name
1438     cfg = self.cfg
1439
1440     dns_data = utils.HostInfo(node_name)
1441
1442     node = dns_data.name
1443     primary_ip = self.op.primary_ip = dns_data.ip
1444     secondary_ip = getattr(self.op, "secondary_ip", None)
1445     if secondary_ip is None:
1446       secondary_ip = primary_ip
1447     if not utils.IsValidIP(secondary_ip):
1448       raise errors.OpPrereqError("Invalid secondary IP given")
1449     self.op.secondary_ip = secondary_ip
1450     node_list = cfg.GetNodeList()
1451     if node in node_list:
1452       raise errors.OpPrereqError("Node %s is already in the configuration"
1453                                  % node)
1454
1455     for existing_node_name in node_list:
1456       existing_node = cfg.GetNodeInfo(existing_node_name)
1457       if (existing_node.primary_ip == primary_ip or
1458           existing_node.secondary_ip == primary_ip or
1459           existing_node.primary_ip == secondary_ip or
1460           existing_node.secondary_ip == secondary_ip):
1461         raise errors.OpPrereqError("New node ip address(es) conflict with"
1462                                    " existing node %s" % existing_node.name)
1463
1464     # check that the type of the node (single versus dual homed) is the
1465     # same as for the master
1466     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467     master_singlehomed = myself.secondary_ip == myself.primary_ip
1468     newbie_singlehomed = secondary_ip == primary_ip
1469     if master_singlehomed != newbie_singlehomed:
1470       if master_singlehomed:
1471         raise errors.OpPrereqError("The master has no private ip but the"
1472                                    " new node has one")
1473       else:
1474         raise errors.OpPrereqError("The master has a private ip but the"
1475                                    " new node doesn't have one")
1476
1477     # checks reachablity
1478     if not utils.TcpPing(utils.HostInfo().name,
1479                          primary_ip,
1480                          constants.DEFAULT_NODED_PORT):
1481       raise errors.OpPrereqError("Node not reachable by ping")
1482
1483     if not newbie_singlehomed:
1484       # check reachability from my secondary ip to newbie's secondary ip
1485       if not utils.TcpPing(myself.secondary_ip,
1486                            secondary_ip,
1487                            constants.DEFAULT_NODED_PORT):
1488         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489                                    " based ping to noded port")
1490
1491     self.new_node = objects.Node(name=node,
1492                                  primary_ip=primary_ip,
1493                                  secondary_ip=secondary_ip)
1494
1495     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498                                    constants.VNC_PASSWORD_FILE)
1499
1500   def Exec(self, feedback_fn):
1501     """Adds the new node to the cluster.
1502
1503     """
1504     new_node = self.new_node
1505     node = new_node.name
1506
1507     # set up inter-node password and certificate and restarts the node daemon
1508     gntpass = self.sstore.GetNodeDaemonPassword()
1509     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510       raise errors.OpExecError("ganeti password corruption detected")
1511     f = open(constants.SSL_CERT_FILE)
1512     try:
1513       gntpem = f.read(8192)
1514     finally:
1515       f.close()
1516     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517     # so we use this to detect an invalid certificate; as long as the
1518     # cert doesn't contain this, the here-document will be correctly
1519     # parsed by the shell sequence below
1520     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522     if not gntpem.endswith("\n"):
1523       raise errors.OpExecError("PEM must end with newline")
1524     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525
1526     # and then connect with ssh to set password and start ganeti-noded
1527     # note that all the below variables are sanitized at this point,
1528     # either by being constants or by the checks above
1529     ss = self.sstore
1530     mycommand = ("umask 077 && "
1531                  "echo '%s' > '%s' && "
1532                  "cat > '%s' << '!EOF.' && \n"
1533                  "%s!EOF.\n%s restart" %
1534                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535                   constants.SSL_CERT_FILE, gntpem,
1536                   constants.NODE_INITD_SCRIPT))
1537
1538     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539     if result.failed:
1540       raise errors.OpExecError("Remote command on node %s, error: %s,"
1541                                " output: %s" %
1542                                (node, result.fail_reason, result.output))
1543
1544     # check connectivity
1545     time.sleep(4)
1546
1547     result = rpc.call_version([node])[node]
1548     if result:
1549       if constants.PROTOCOL_VERSION == result:
1550         logger.Info("communication to node %s fine, sw version %s match" %
1551                     (node, result))
1552       else:
1553         raise errors.OpExecError("Version mismatch master version %s,"
1554                                  " node version %s" %
1555                                  (constants.PROTOCOL_VERSION, result))
1556     else:
1557       raise errors.OpExecError("Cannot get version from the new node")
1558
1559     # setup ssh on node
1560     logger.Info("copy ssh key to node %s" % node)
1561     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562     keyarray = []
1563     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565                 priv_key, pub_key]
1566
1567     for i in keyfiles:
1568       f = open(i, 'r')
1569       try:
1570         keyarray.append(f.read())
1571       finally:
1572         f.close()
1573
1574     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575                                keyarray[3], keyarray[4], keyarray[5])
1576
1577     if not result:
1578       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579
1580     # Add node to our /etc/hosts, and add key to known_hosts
1581     _AddHostToEtcHosts(new_node.name)
1582
1583     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584                       self.cfg.GetHostKey())
1585
1586     if new_node.secondary_ip != new_node.primary_ip:
1587       if not rpc.call_node_tcp_ping(new_node.name,
1588                                     constants.LOCALHOST_IP_ADDRESS,
1589                                     new_node.secondary_ip,
1590                                     constants.DEFAULT_NODED_PORT,
1591                                     10, False):
1592         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593                                  " you gave (%s). Please fix and re-run this"
1594                                  " command." % new_node.secondary_ip)
1595
1596     success, msg = ssh.VerifyNodeHostname(node)
1597     if not success:
1598       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599                                " than the one the resolver gives: %s."
1600                                " Please fix and re-run this command." %
1601                                (node, msg))
1602
1603     # Distribute updated /etc/hosts and known_hosts to all nodes,
1604     # including the node just added
1605     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606     dist_nodes = self.cfg.GetNodeList() + [node]
1607     if myself.name in dist_nodes:
1608       dist_nodes.remove(myself.name)
1609
1610     logger.Debug("Copying hosts and known_hosts to all nodes")
1611     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1612       result = rpc.call_upload_file(dist_nodes, fname)
1613       for to_node in dist_nodes:
1614         if not result[to_node]:
1615           logger.Error("copy of file %s to node %s failed" %
1616                        (fname, to_node))
1617
1618     to_copy = ss.GetFileList()
1619     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620       to_copy.append(constants.VNC_PASSWORD_FILE)
1621     for fname in to_copy:
1622       if not ssh.CopyFileToNode(node, fname):
1623         logger.Error("could not copy file %s to node %s" % (fname, node))
1624
1625     logger.Info("adding node %s to cluster.conf" % node)
1626     self.cfg.AddNode(new_node)
1627
1628
1629 class LUMasterFailover(LogicalUnit):
1630   """Failover the master node to the current node.
1631
1632   This is a special LU in that it must run on a non-master node.
1633
1634   """
1635   HPATH = "master-failover"
1636   HTYPE = constants.HTYPE_CLUSTER
1637   REQ_MASTER = False
1638   _OP_REQP = []
1639
1640   def BuildHooksEnv(self):
1641     """Build hooks env.
1642
1643     This will run on the new master only in the pre phase, and on all
1644     the nodes in the post phase.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.new_master,
1649       "NEW_MASTER": self.new_master,
1650       "OLD_MASTER": self.old_master,
1651       }
1652     return env, [self.new_master], self.cfg.GetNodeList()
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that we are not already the master.
1658
1659     """
1660     self.new_master = utils.HostInfo().name
1661     self.old_master = self.sstore.GetMasterNode()
1662
1663     if self.old_master == self.new_master:
1664       raise errors.OpPrereqError("This commands must be run on the node"
1665                                  " where you want the new master to be."
1666                                  " %s is already the master" %
1667                                  self.old_master)
1668
1669   def Exec(self, feedback_fn):
1670     """Failover the master node.
1671
1672     This command, when run on a non-master node, will cause the current
1673     master to cease being master, and the non-master to become new
1674     master.
1675
1676     """
1677     #TODO: do not rely on gethostname returning the FQDN
1678     logger.Info("setting master to %s, old master: %s" %
1679                 (self.new_master, self.old_master))
1680
1681     if not rpc.call_node_stop_master(self.old_master):
1682       logger.Error("could disable the master role on the old master"
1683                    " %s, please disable manually" % self.old_master)
1684
1685     ss = self.sstore
1686     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689       logger.Error("could not distribute the new simple store master file"
1690                    " to the other nodes, please check.")
1691
1692     if not rpc.call_node_start_master(self.new_master):
1693       logger.Error("could not start the master role on the new master"
1694                    " %s, please check" % self.new_master)
1695       feedback_fn("Error in activating the master IP on the new master,"
1696                   " please fix manually.")
1697
1698
1699
1700 class LUQueryClusterInfo(NoHooksLU):
1701   """Query cluster configuration.
1702
1703   """
1704   _OP_REQP = []
1705   REQ_MASTER = False
1706
1707   def CheckPrereq(self):
1708     """No prerequsites needed for this LU.
1709
1710     """
1711     pass
1712
1713   def Exec(self, feedback_fn):
1714     """Return cluster config.
1715
1716     """
1717     result = {
1718       "name": self.sstore.GetClusterName(),
1719       "software_version": constants.RELEASE_VERSION,
1720       "protocol_version": constants.PROTOCOL_VERSION,
1721       "config_version": constants.CONFIG_VERSION,
1722       "os_api_version": constants.OS_API_VERSION,
1723       "export_version": constants.EXPORT_VERSION,
1724       "master": self.sstore.GetMasterNode(),
1725       "architecture": (platform.architecture()[0], platform.machine()),
1726       }
1727
1728     return result
1729
1730
1731 class LUClusterCopyFile(NoHooksLU):
1732   """Copy file to cluster.
1733
1734   """
1735   _OP_REQP = ["nodes", "filename"]
1736
1737   def CheckPrereq(self):
1738     """Check prerequisites.
1739
1740     It should check that the named file exists and that the given list
1741     of nodes is valid.
1742
1743     """
1744     if not os.path.exists(self.op.filename):
1745       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746
1747     self.nodes = _GetWantedNodes(self, self.op.nodes)
1748
1749   def Exec(self, feedback_fn):
1750     """Copy a file from master to some nodes.
1751
1752     Args:
1753       opts - class with options as members
1754       args - list containing a single element, the file name
1755     Opts used:
1756       nodes - list containing the name of target nodes; if empty, all nodes
1757
1758     """
1759     filename = self.op.filename
1760
1761     myname = utils.HostInfo().name
1762
1763     for node in self.nodes:
1764       if node == myname:
1765         continue
1766       if not ssh.CopyFileToNode(node, filename):
1767         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768
1769
1770 class LUDumpClusterConfig(NoHooksLU):
1771   """Return a text-representation of the cluster-config.
1772
1773   """
1774   _OP_REQP = []
1775
1776   def CheckPrereq(self):
1777     """No prerequisites.
1778
1779     """
1780     pass
1781
1782   def Exec(self, feedback_fn):
1783     """Dump a representation of the cluster config to the standard output.
1784
1785     """
1786     return self.cfg.DumpConfig()
1787
1788
1789 class LURunClusterCommand(NoHooksLU):
1790   """Run a command on some nodes.
1791
1792   """
1793   _OP_REQP = ["command", "nodes"]
1794
1795   def CheckPrereq(self):
1796     """Check prerequisites.
1797
1798     It checks that the given list of nodes is valid.
1799
1800     """
1801     self.nodes = _GetWantedNodes(self, self.op.nodes)
1802
1803   def Exec(self, feedback_fn):
1804     """Run a command on some nodes.
1805
1806     """
1807     data = []
1808     for node in self.nodes:
1809       result = ssh.SSHCall(node, "root", self.op.command)
1810       data.append((node, result.output, result.exit_code))
1811
1812     return data
1813
1814
1815 class LUActivateInstanceDisks(NoHooksLU):
1816   """Bring up an instance's disks.
1817
1818   """
1819   _OP_REQP = ["instance_name"]
1820
1821   def CheckPrereq(self):
1822     """Check prerequisites.
1823
1824     This checks that the instance is in the cluster.
1825
1826     """
1827     instance = self.cfg.GetInstanceInfo(
1828       self.cfg.ExpandInstanceName(self.op.instance_name))
1829     if instance is None:
1830       raise errors.OpPrereqError("Instance '%s' not known" %
1831                                  self.op.instance_name)
1832     self.instance = instance
1833
1834
1835   def Exec(self, feedback_fn):
1836     """Activate the disks.
1837
1838     """
1839     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840     if not disks_ok:
1841       raise errors.OpExecError("Cannot activate block devices")
1842
1843     return disks_info
1844
1845
1846 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847   """Prepare the block devices for an instance.
1848
1849   This sets up the block devices on all nodes.
1850
1851   Args:
1852     instance: a ganeti.objects.Instance object
1853     ignore_secondaries: if true, errors on secondary nodes won't result
1854                         in an error return from the function
1855
1856   Returns:
1857     false if the operation failed
1858     list of (host, instance_visible_name, node_visible_name) if the operation
1859          suceeded with the mapping from node devices to instance devices
1860   """
1861   device_info = []
1862   disks_ok = True
1863   iname = instance.name
1864   # With the two passes mechanism we try to reduce the window of
1865   # opportunity for the race condition of switching DRBD to primary
1866   # before handshaking occured, but we do not eliminate it
1867
1868   # The proper fix would be to wait (with some limits) until the
1869   # connection has been made and drbd transitions from WFConnection
1870   # into any other network-connected state (Connected, SyncTarget,
1871   # SyncSource, etc.)
1872
1873   # 1st pass, assemble on all nodes in secondary mode
1874   for inst_disk in instance.disks:
1875     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1876       cfg.SetDiskID(node_disk, node)
1877       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1878       if not result:
1879         logger.Error("could not prepare block device %s on node %s"
1880                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1881         if not ignore_secondaries:
1882           disks_ok = False
1883
1884   # FIXME: race condition on drbd migration to primary
1885
1886   # 2nd pass, do only the primary node
1887   for inst_disk in instance.disks:
1888     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1889       if node != instance.primary_node:
1890         continue
1891       cfg.SetDiskID(node_disk, node)
1892       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1893       if not result:
1894         logger.Error("could not prepare block device %s on node %s"
1895                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1896         disks_ok = False
1897     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1898
1899   # leave the disks configured for the primary node
1900   # this is a workaround that would be fixed better by
1901   # improving the logical/physical id handling
1902   for disk in instance.disks:
1903     cfg.SetDiskID(disk, instance.primary_node)
1904
1905   return disks_ok, device_info
1906
1907
1908 def _StartInstanceDisks(cfg, instance, force):
1909   """Start the disks of an instance.
1910
1911   """
1912   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1913                                            ignore_secondaries=force)
1914   if not disks_ok:
1915     _ShutdownInstanceDisks(instance, cfg)
1916     if force is not None and not force:
1917       logger.Error("If the message above refers to a secondary node,"
1918                    " you can retry the operation using '--force'.")
1919     raise errors.OpExecError("Disk consistency error")
1920
1921
1922 class LUDeactivateInstanceDisks(NoHooksLU):
1923   """Shutdown an instance's disks.
1924
1925   """
1926   _OP_REQP = ["instance_name"]
1927
1928   def CheckPrereq(self):
1929     """Check prerequisites.
1930
1931     This checks that the instance is in the cluster.
1932
1933     """
1934     instance = self.cfg.GetInstanceInfo(
1935       self.cfg.ExpandInstanceName(self.op.instance_name))
1936     if instance is None:
1937       raise errors.OpPrereqError("Instance '%s' not known" %
1938                                  self.op.instance_name)
1939     self.instance = instance
1940
1941   def Exec(self, feedback_fn):
1942     """Deactivate the disks
1943
1944     """
1945     instance = self.instance
1946     ins_l = rpc.call_instance_list([instance.primary_node])
1947     ins_l = ins_l[instance.primary_node]
1948     if not type(ins_l) is list:
1949       raise errors.OpExecError("Can't contact node '%s'" %
1950                                instance.primary_node)
1951
1952     if self.instance.name in ins_l:
1953       raise errors.OpExecError("Instance is running, can't shutdown"
1954                                " block devices.")
1955
1956     _ShutdownInstanceDisks(instance, self.cfg)
1957
1958
1959 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1960   """Shutdown block devices of an instance.
1961
1962   This does the shutdown on all nodes of the instance.
1963
1964   If the ignore_primary is false, errors on the primary node are
1965   ignored.
1966
1967   """
1968   result = True
1969   for disk in instance.disks:
1970     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1971       cfg.SetDiskID(top_disk, node)
1972       if not rpc.call_blockdev_shutdown(node, top_disk):
1973         logger.Error("could not shutdown block device %s on node %s" %
1974                      (disk.iv_name, node))
1975         if not ignore_primary or node != instance.primary_node:
1976           result = False
1977   return result
1978
1979
1980 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1981   """Checks if a node has enough free memory.
1982
1983   This function check if a given node has the needed amount of free
1984   memory. In case the node has less memory or we cannot get the
1985   information from the node, this function raise an OpPrereqError
1986   exception.
1987
1988   Args:
1989     - cfg: a ConfigWriter instance
1990     - node: the node name
1991     - reason: string to use in the error message
1992     - requested: the amount of memory in MiB
1993
1994   """
1995   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1996   if not nodeinfo or not isinstance(nodeinfo, dict):
1997     raise errors.OpPrereqError("Could not contact node %s for resource"
1998                              " information" % (node,))
1999
2000   free_mem = nodeinfo[node].get('memory_free')
2001   if not isinstance(free_mem, int):
2002     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2003                              " was '%s'" % (node, free_mem))
2004   if requested > free_mem:
2005     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2006                              " needed %s MiB, available %s MiB" %
2007                              (node, reason, requested, free_mem))
2008
2009
2010 class LUStartupInstance(LogicalUnit):
2011   """Starts an instance.
2012
2013   """
2014   HPATH = "instance-start"
2015   HTYPE = constants.HTYPE_INSTANCE
2016   _OP_REQP = ["instance_name", "force"]
2017
2018   def BuildHooksEnv(self):
2019     """Build hooks env.
2020
2021     This runs on master, primary and secondary nodes of the instance.
2022
2023     """
2024     env = {
2025       "FORCE": self.op.force,
2026       }
2027     env.update(_BuildInstanceHookEnvByObject(self.instance))
2028     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029           list(self.instance.secondary_nodes))
2030     return env, nl, nl
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     instance = self.cfg.GetInstanceInfo(
2039       self.cfg.ExpandInstanceName(self.op.instance_name))
2040     if instance is None:
2041       raise errors.OpPrereqError("Instance '%s' not known" %
2042                                  self.op.instance_name)
2043
2044     # check bridges existance
2045     _CheckInstanceBridgesExist(instance)
2046
2047     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048                          "starting instance %s" % instance.name,
2049                          instance.memory)
2050
2051     self.instance = instance
2052     self.op.instance_name = instance.name
2053
2054   def Exec(self, feedback_fn):
2055     """Start the instance.
2056
2057     """
2058     instance = self.instance
2059     force = self.op.force
2060     extra_args = getattr(self.op, "extra_args", "")
2061
2062     node_current = instance.primary_node
2063
2064     _StartInstanceDisks(self.cfg, instance, force)
2065
2066     if not rpc.call_instance_start(node_current, instance, extra_args):
2067       _ShutdownInstanceDisks(instance, self.cfg)
2068       raise errors.OpExecError("Could not start instance")
2069
2070     self.cfg.MarkInstanceUp(instance.name)
2071
2072
2073 class LURebootInstance(LogicalUnit):
2074   """Reboot an instance.
2075
2076   """
2077   HPATH = "instance-reboot"
2078   HTYPE = constants.HTYPE_INSTANCE
2079   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2080
2081   def BuildHooksEnv(self):
2082     """Build hooks env.
2083
2084     This runs on master, primary and secondary nodes of the instance.
2085
2086     """
2087     env = {
2088       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089       }
2090     env.update(_BuildInstanceHookEnvByObject(self.instance))
2091     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092           list(self.instance.secondary_nodes))
2093     return env, nl, nl
2094
2095   def CheckPrereq(self):
2096     """Check prerequisites.
2097
2098     This checks that the instance is in the cluster.
2099
2100     """
2101     instance = self.cfg.GetInstanceInfo(
2102       self.cfg.ExpandInstanceName(self.op.instance_name))
2103     if instance is None:
2104       raise errors.OpPrereqError("Instance '%s' not known" %
2105                                  self.op.instance_name)
2106
2107     # check bridges existance
2108     _CheckInstanceBridgesExist(instance)
2109
2110     self.instance = instance
2111     self.op.instance_name = instance.name
2112
2113   def Exec(self, feedback_fn):
2114     """Reboot the instance.
2115
2116     """
2117     instance = self.instance
2118     ignore_secondaries = self.op.ignore_secondaries
2119     reboot_type = self.op.reboot_type
2120     extra_args = getattr(self.op, "extra_args", "")
2121
2122     node_current = instance.primary_node
2123
2124     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2125                            constants.INSTANCE_REBOOT_HARD,
2126                            constants.INSTANCE_REBOOT_FULL]:
2127       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2128                                   (constants.INSTANCE_REBOOT_SOFT,
2129                                    constants.INSTANCE_REBOOT_HARD,
2130                                    constants.INSTANCE_REBOOT_FULL))
2131
2132     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2133                        constants.INSTANCE_REBOOT_HARD]:
2134       if not rpc.call_instance_reboot(node_current, instance,
2135                                       reboot_type, extra_args):
2136         raise errors.OpExecError("Could not reboot instance")
2137     else:
2138       if not rpc.call_instance_shutdown(node_current, instance):
2139         raise errors.OpExecError("could not shutdown instance for full reboot")
2140       _ShutdownInstanceDisks(instance, self.cfg)
2141       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2142       if not rpc.call_instance_start(node_current, instance, extra_args):
2143         _ShutdownInstanceDisks(instance, self.cfg)
2144         raise errors.OpExecError("Could not start instance for full reboot")
2145
2146     self.cfg.MarkInstanceUp(instance.name)
2147
2148
2149 class LUShutdownInstance(LogicalUnit):
2150   """Shutdown an instance.
2151
2152   """
2153   HPATH = "instance-stop"
2154   HTYPE = constants.HTYPE_INSTANCE
2155   _OP_REQP = ["instance_name"]
2156
2157   def BuildHooksEnv(self):
2158     """Build hooks env.
2159
2160     This runs on master, primary and secondary nodes of the instance.
2161
2162     """
2163     env = _BuildInstanceHookEnvByObject(self.instance)
2164     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165           list(self.instance.secondary_nodes))
2166     return env, nl, nl
2167
2168   def CheckPrereq(self):
2169     """Check prerequisites.
2170
2171     This checks that the instance is in the cluster.
2172
2173     """
2174     instance = self.cfg.GetInstanceInfo(
2175       self.cfg.ExpandInstanceName(self.op.instance_name))
2176     if instance is None:
2177       raise errors.OpPrereqError("Instance '%s' not known" %
2178                                  self.op.instance_name)
2179     self.instance = instance
2180
2181   def Exec(self, feedback_fn):
2182     """Shutdown the instance.
2183
2184     """
2185     instance = self.instance
2186     node_current = instance.primary_node
2187     if not rpc.call_instance_shutdown(node_current, instance):
2188       logger.Error("could not shutdown instance")
2189
2190     self.cfg.MarkInstanceDown(instance.name)
2191     _ShutdownInstanceDisks(instance, self.cfg)
2192
2193
2194 class LUReinstallInstance(LogicalUnit):
2195   """Reinstall an instance.
2196
2197   """
2198   HPATH = "instance-reinstall"
2199   HTYPE = constants.HTYPE_INSTANCE
2200   _OP_REQP = ["instance_name"]
2201
2202   def BuildHooksEnv(self):
2203     """Build hooks env.
2204
2205     This runs on master, primary and secondary nodes of the instance.
2206
2207     """
2208     env = _BuildInstanceHookEnvByObject(self.instance)
2209     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210           list(self.instance.secondary_nodes))
2211     return env, nl, nl
2212
2213   def CheckPrereq(self):
2214     """Check prerequisites.
2215
2216     This checks that the instance is in the cluster and is not running.
2217
2218     """
2219     instance = self.cfg.GetInstanceInfo(
2220       self.cfg.ExpandInstanceName(self.op.instance_name))
2221     if instance is None:
2222       raise errors.OpPrereqError("Instance '%s' not known" %
2223                                  self.op.instance_name)
2224     if instance.disk_template == constants.DT_DISKLESS:
2225       raise errors.OpPrereqError("Instance '%s' has no disks" %
2226                                  self.op.instance_name)
2227     if instance.status != "down":
2228       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2229                                  self.op.instance_name)
2230     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2231     if remote_info:
2232       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2233                                  (self.op.instance_name,
2234                                   instance.primary_node))
2235
2236     self.op.os_type = getattr(self.op, "os_type", None)
2237     if self.op.os_type is not None:
2238       # OS verification
2239       pnode = self.cfg.GetNodeInfo(
2240         self.cfg.ExpandNodeName(instance.primary_node))
2241       if pnode is None:
2242         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2243                                    self.op.pnode)
2244       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2245       if not os_obj:
2246         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2247                                    " primary node"  % self.op.os_type)
2248
2249     self.instance = instance
2250
2251   def Exec(self, feedback_fn):
2252     """Reinstall the instance.
2253
2254     """
2255     inst = self.instance
2256
2257     if self.op.os_type is not None:
2258       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2259       inst.os = self.op.os_type
2260       self.cfg.AddInstance(inst)
2261
2262     _StartInstanceDisks(self.cfg, inst, None)
2263     try:
2264       feedback_fn("Running the instance OS create scripts...")
2265       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2266         raise errors.OpExecError("Could not install OS for instance %s"
2267                                  " on node %s" %
2268                                  (inst.name, inst.primary_node))
2269     finally:
2270       _ShutdownInstanceDisks(inst, self.cfg)
2271
2272
2273 class LURenameInstance(LogicalUnit):
2274   """Rename an instance.
2275
2276   """
2277   HPATH = "instance-rename"
2278   HTYPE = constants.HTYPE_INSTANCE
2279   _OP_REQP = ["instance_name", "new_name"]
2280
2281   def BuildHooksEnv(self):
2282     """Build hooks env.
2283
2284     This runs on master, primary and secondary nodes of the instance.
2285
2286     """
2287     env = _BuildInstanceHookEnvByObject(self.instance)
2288     env["INSTANCE_NEW_NAME"] = self.op.new_name
2289     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290           list(self.instance.secondary_nodes))
2291     return env, nl, nl
2292
2293   def CheckPrereq(self):
2294     """Check prerequisites.
2295
2296     This checks that the instance is in the cluster and is not running.
2297
2298     """
2299     instance = self.cfg.GetInstanceInfo(
2300       self.cfg.ExpandInstanceName(self.op.instance_name))
2301     if instance is None:
2302       raise errors.OpPrereqError("Instance '%s' not known" %
2303                                  self.op.instance_name)
2304     if instance.status != "down":
2305       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2306                                  self.op.instance_name)
2307     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2308     if remote_info:
2309       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2310                                  (self.op.instance_name,
2311                                   instance.primary_node))
2312     self.instance = instance
2313
2314     # new name verification
2315     name_info = utils.HostInfo(self.op.new_name)
2316
2317     self.op.new_name = new_name = name_info.name
2318     if not getattr(self.op, "ignore_ip", False):
2319       command = ["fping", "-q", name_info.ip]
2320       result = utils.RunCmd(command)
2321       if not result.failed:
2322         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2323                                    (name_info.ip, new_name))
2324
2325
2326   def Exec(self, feedback_fn):
2327     """Reinstall the instance.
2328
2329     """
2330     inst = self.instance
2331     old_name = inst.name
2332
2333     self.cfg.RenameInstance(inst.name, self.op.new_name)
2334
2335     # re-read the instance from the configuration after rename
2336     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2337
2338     _StartInstanceDisks(self.cfg, inst, None)
2339     try:
2340       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2341                                           "sda", "sdb"):
2342         msg = ("Could run OS rename script for instance %s on node %s (but the"
2343                " instance has been renamed in Ganeti)" %
2344                (inst.name, inst.primary_node))
2345         logger.Error(msg)
2346     finally:
2347       _ShutdownInstanceDisks(inst, self.cfg)
2348
2349
2350 class LURemoveInstance(LogicalUnit):
2351   """Remove an instance.
2352
2353   """
2354   HPATH = "instance-remove"
2355   HTYPE = constants.HTYPE_INSTANCE
2356   _OP_REQP = ["instance_name"]
2357
2358   def BuildHooksEnv(self):
2359     """Build hooks env.
2360
2361     This runs on master, primary and secondary nodes of the instance.
2362
2363     """
2364     env = _BuildInstanceHookEnvByObject(self.instance)
2365     nl = [self.sstore.GetMasterNode()]
2366     return env, nl, nl
2367
2368   def CheckPrereq(self):
2369     """Check prerequisites.
2370
2371     This checks that the instance is in the cluster.
2372
2373     """
2374     instance = self.cfg.GetInstanceInfo(
2375       self.cfg.ExpandInstanceName(self.op.instance_name))
2376     if instance is None:
2377       raise errors.OpPrereqError("Instance '%s' not known" %
2378                                  self.op.instance_name)
2379     self.instance = instance
2380
2381   def Exec(self, feedback_fn):
2382     """Remove the instance.
2383
2384     """
2385     instance = self.instance
2386     logger.Info("shutting down instance %s on node %s" %
2387                 (instance.name, instance.primary_node))
2388
2389     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2390       if self.op.ignore_failures:
2391         feedback_fn("Warning: can't shutdown instance")
2392       else:
2393         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2394                                  (instance.name, instance.primary_node))
2395
2396     logger.Info("removing block devices for instance %s" % instance.name)
2397
2398     if not _RemoveDisks(instance, self.cfg):
2399       if self.op.ignore_failures:
2400         feedback_fn("Warning: can't remove instance's disks")
2401       else:
2402         raise errors.OpExecError("Can't remove instance's disks")
2403
2404     logger.Info("removing instance %s out of cluster config" % instance.name)
2405
2406     self.cfg.RemoveInstance(instance.name)
2407
2408
2409 class LUQueryInstances(NoHooksLU):
2410   """Logical unit for querying instances.
2411
2412   """
2413   _OP_REQP = ["output_fields", "names"]
2414
2415   def CheckPrereq(self):
2416     """Check prerequisites.
2417
2418     This checks that the fields required are valid output fields.
2419
2420     """
2421     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2422     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2423                                "admin_state", "admin_ram",
2424                                "disk_template", "ip", "mac", "bridge",
2425                                "sda_size", "sdb_size", "vcpus"],
2426                        dynamic=self.dynamic_fields,
2427                        selected=self.op.output_fields)
2428
2429     self.wanted = _GetWantedInstances(self, self.op.names)
2430
2431   def Exec(self, feedback_fn):
2432     """Computes the list of nodes and their attributes.
2433
2434     """
2435     instance_names = self.wanted
2436     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2437                      in instance_names]
2438
2439     # begin data gathering
2440
2441     nodes = frozenset([inst.primary_node for inst in instance_list])
2442
2443     bad_nodes = []
2444     if self.dynamic_fields.intersection(self.op.output_fields):
2445       live_data = {}
2446       node_data = rpc.call_all_instances_info(nodes)
2447       for name in nodes:
2448         result = node_data[name]
2449         if result:
2450           live_data.update(result)
2451         elif result == False:
2452           bad_nodes.append(name)
2453         # else no instance is alive
2454     else:
2455       live_data = dict([(name, {}) for name in instance_names])
2456
2457     # end data gathering
2458
2459     output = []
2460     for instance in instance_list:
2461       iout = []
2462       for field in self.op.output_fields:
2463         if field == "name":
2464           val = instance.name
2465         elif field == "os":
2466           val = instance.os
2467         elif field == "pnode":
2468           val = instance.primary_node
2469         elif field == "snodes":
2470           val = list(instance.secondary_nodes)
2471         elif field == "admin_state":
2472           val = (instance.status != "down")
2473         elif field == "oper_state":
2474           if instance.primary_node in bad_nodes:
2475             val = None
2476           else:
2477             val = bool(live_data.get(instance.name))
2478         elif field == "admin_ram":
2479           val = instance.memory
2480         elif field == "oper_ram":
2481           if instance.primary_node in bad_nodes:
2482             val = None
2483           elif instance.name in live_data:
2484             val = live_data[instance.name].get("memory", "?")
2485           else:
2486             val = "-"
2487         elif field == "disk_template":
2488           val = instance.disk_template
2489         elif field == "ip":
2490           val = instance.nics[0].ip
2491         elif field == "bridge":
2492           val = instance.nics[0].bridge
2493         elif field == "mac":
2494           val = instance.nics[0].mac
2495         elif field == "sda_size" or field == "sdb_size":
2496           disk = instance.FindDisk(field[:3])
2497           if disk is None:
2498             val = None
2499           else:
2500             val = disk.size
2501         elif field == "vcpus":
2502           val = instance.vcpus
2503         else:
2504           raise errors.ParameterError(field)
2505         iout.append(val)
2506       output.append(iout)
2507
2508     return output
2509
2510
2511 class LUFailoverInstance(LogicalUnit):
2512   """Failover an instance.
2513
2514   """
2515   HPATH = "instance-failover"
2516   HTYPE = constants.HTYPE_INSTANCE
2517   _OP_REQP = ["instance_name", "ignore_consistency"]
2518
2519   def BuildHooksEnv(self):
2520     """Build hooks env.
2521
2522     This runs on master, primary and secondary nodes of the instance.
2523
2524     """
2525     env = {
2526       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2527       }
2528     env.update(_BuildInstanceHookEnvByObject(self.instance))
2529     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2530     return env, nl, nl
2531
2532   def CheckPrereq(self):
2533     """Check prerequisites.
2534
2535     This checks that the instance is in the cluster.
2536
2537     """
2538     instance = self.cfg.GetInstanceInfo(
2539       self.cfg.ExpandInstanceName(self.op.instance_name))
2540     if instance is None:
2541       raise errors.OpPrereqError("Instance '%s' not known" %
2542                                  self.op.instance_name)
2543
2544     if instance.disk_template not in constants.DTS_NET_MIRROR:
2545       raise errors.OpPrereqError("Instance's disk layout is not"
2546                                  " network mirrored, cannot failover.")
2547
2548     secondary_nodes = instance.secondary_nodes
2549     if not secondary_nodes:
2550       raise errors.ProgrammerError("no secondary node but using "
2551                                    "DT_REMOTE_RAID1 template")
2552
2553     target_node = secondary_nodes[0]
2554     # check memory requirements on the secondary node
2555     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2556                          instance.name, instance.memory)
2557
2558     # check bridge existance
2559     brlist = [nic.bridge for nic in instance.nics]
2560     if not rpc.call_bridges_exist(target_node, brlist):
2561       raise errors.OpPrereqError("One or more target bridges %s does not"
2562                                  " exist on destination node '%s'" %
2563                                  (brlist, target_node))
2564
2565     self.instance = instance
2566
2567   def Exec(self, feedback_fn):
2568     """Failover an instance.
2569
2570     The failover is done by shutting it down on its present node and
2571     starting it on the secondary.
2572
2573     """
2574     instance = self.instance
2575
2576     source_node = instance.primary_node
2577     target_node = instance.secondary_nodes[0]
2578
2579     feedback_fn("* checking disk consistency between source and target")
2580     for dev in instance.disks:
2581       # for remote_raid1, these are md over drbd
2582       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2583         if not self.op.ignore_consistency:
2584           raise errors.OpExecError("Disk %s is degraded on target node,"
2585                                    " aborting failover." % dev.iv_name)
2586
2587     feedback_fn("* shutting down instance on source node")
2588     logger.Info("Shutting down instance %s on node %s" %
2589                 (instance.name, source_node))
2590
2591     if not rpc.call_instance_shutdown(source_node, instance):
2592       if self.op.ignore_consistency:
2593         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2594                      " anyway. Please make sure node %s is down"  %
2595                      (instance.name, source_node, source_node))
2596       else:
2597         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2598                                  (instance.name, source_node))
2599
2600     feedback_fn("* deactivating the instance's disks on source node")
2601     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2602       raise errors.OpExecError("Can't shut down the instance's disks.")
2603
2604     instance.primary_node = target_node
2605     # distribute new instance config to the other nodes
2606     self.cfg.AddInstance(instance)
2607
2608     feedback_fn("* activating the instance's disks on target node")
2609     logger.Info("Starting instance %s on node %s" %
2610                 (instance.name, target_node))
2611
2612     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2613                                              ignore_secondaries=True)
2614     if not disks_ok:
2615       _ShutdownInstanceDisks(instance, self.cfg)
2616       raise errors.OpExecError("Can't activate the instance's disks")
2617
2618     feedback_fn("* starting the instance on the target node")
2619     if not rpc.call_instance_start(target_node, instance, None):
2620       _ShutdownInstanceDisks(instance, self.cfg)
2621       raise errors.OpExecError("Could not start instance %s on node %s." %
2622                                (instance.name, target_node))
2623
2624
2625 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2626   """Create a tree of block devices on the primary node.
2627
2628   This always creates all devices.
2629
2630   """
2631   if device.children:
2632     for child in device.children:
2633       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2634         return False
2635
2636   cfg.SetDiskID(device, node)
2637   new_id = rpc.call_blockdev_create(node, device, device.size,
2638                                     instance.name, True, info)
2639   if not new_id:
2640     return False
2641   if device.physical_id is None:
2642     device.physical_id = new_id
2643   return True
2644
2645
2646 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2647   """Create a tree of block devices on a secondary node.
2648
2649   If this device type has to be created on secondaries, create it and
2650   all its children.
2651
2652   If not, just recurse to children keeping the same 'force' value.
2653
2654   """
2655   if device.CreateOnSecondary():
2656     force = True
2657   if device.children:
2658     for child in device.children:
2659       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2660                                         child, force, info):
2661         return False
2662
2663   if not force:
2664     return True
2665   cfg.SetDiskID(device, node)
2666   new_id = rpc.call_blockdev_create(node, device, device.size,
2667                                     instance.name, False, info)
2668   if not new_id:
2669     return False
2670   if device.physical_id is None:
2671     device.physical_id = new_id
2672   return True
2673
2674
2675 def _GenerateUniqueNames(cfg, exts):
2676   """Generate a suitable LV name.
2677
2678   This will generate a logical volume name for the given instance.
2679
2680   """
2681   results = []
2682   for val in exts:
2683     new_id = cfg.GenerateUniqueID()
2684     results.append("%s%s" % (new_id, val))
2685   return results
2686
2687
2688 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2689   """Generate a drbd device complete with its children.
2690
2691   """
2692   port = cfg.AllocatePort()
2693   vgname = cfg.GetVGName()
2694   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2695                           logical_id=(vgname, names[0]))
2696   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2697                           logical_id=(vgname, names[1]))
2698   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2699                           logical_id = (primary, secondary, port),
2700                           children = [dev_data, dev_meta])
2701   return drbd_dev
2702
2703
2704 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2705   """Generate a drbd8 device complete with its children.
2706
2707   """
2708   port = cfg.AllocatePort()
2709   vgname = cfg.GetVGName()
2710   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2711                           logical_id=(vgname, names[0]))
2712   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2713                           logical_id=(vgname, names[1]))
2714   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2715                           logical_id = (primary, secondary, port),
2716                           children = [dev_data, dev_meta],
2717                           iv_name=iv_name)
2718   return drbd_dev
2719
2720 def _GenerateDiskTemplate(cfg, template_name,
2721                           instance_name, primary_node,
2722                           secondary_nodes, disk_sz, swap_sz):
2723   """Generate the entire disk layout for a given template type.
2724
2725   """
2726   #TODO: compute space requirements
2727
2728   vgname = cfg.GetVGName()
2729   if template_name == "diskless":
2730     disks = []
2731   elif template_name == "plain":
2732     if len(secondary_nodes) != 0:
2733       raise errors.ProgrammerError("Wrong template configuration")
2734
2735     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2736     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2737                            logical_id=(vgname, names[0]),
2738                            iv_name = "sda")
2739     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740                            logical_id=(vgname, names[1]),
2741                            iv_name = "sdb")
2742     disks = [sda_dev, sdb_dev]
2743   elif template_name == "local_raid1":
2744     if len(secondary_nodes) != 0:
2745       raise errors.ProgrammerError("Wrong template configuration")
2746
2747
2748     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2749                                        ".sdb_m1", ".sdb_m2"])
2750     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2751                               logical_id=(vgname, names[0]))
2752     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2753                               logical_id=(vgname, names[1]))
2754     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2755                               size=disk_sz,
2756                               children = [sda_dev_m1, sda_dev_m2])
2757     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2758                               logical_id=(vgname, names[2]))
2759     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2760                               logical_id=(vgname, names[3]))
2761     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2762                               size=swap_sz,
2763                               children = [sdb_dev_m1, sdb_dev_m2])
2764     disks = [md_sda_dev, md_sdb_dev]
2765   elif template_name == constants.DT_REMOTE_RAID1:
2766     if len(secondary_nodes) != 1:
2767       raise errors.ProgrammerError("Wrong template configuration")
2768     remote_node = secondary_nodes[0]
2769     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2770                                        ".sdb_data", ".sdb_meta"])
2771     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2772                                          disk_sz, names[0:2])
2773     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2774                               children = [drbd_sda_dev], size=disk_sz)
2775     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2776                                          swap_sz, names[2:4])
2777     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2778                               children = [drbd_sdb_dev], size=swap_sz)
2779     disks = [md_sda_dev, md_sdb_dev]
2780   elif template_name == constants.DT_DRBD8:
2781     if len(secondary_nodes) != 1:
2782       raise errors.ProgrammerError("Wrong template configuration")
2783     remote_node = secondary_nodes[0]
2784     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2785                                        ".sdb_data", ".sdb_meta"])
2786     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2787                                          disk_sz, names[0:2], "sda")
2788     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2789                                          swap_sz, names[2:4], "sdb")
2790     disks = [drbd_sda_dev, drbd_sdb_dev]
2791   else:
2792     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2793   return disks
2794
2795
2796 def _GetInstanceInfoText(instance):
2797   """Compute that text that should be added to the disk's metadata.
2798
2799   """
2800   return "originstname+%s" % instance.name
2801
2802
2803 def _CreateDisks(cfg, instance):
2804   """Create all disks for an instance.
2805
2806   This abstracts away some work from AddInstance.
2807
2808   Args:
2809     instance: the instance object
2810
2811   Returns:
2812     True or False showing the success of the creation process
2813
2814   """
2815   info = _GetInstanceInfoText(instance)
2816
2817   for device in instance.disks:
2818     logger.Info("creating volume %s for instance %s" %
2819               (device.iv_name, instance.name))
2820     #HARDCODE
2821     for secondary_node in instance.secondary_nodes:
2822       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2823                                         device, False, info):
2824         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2825                      (device.iv_name, device, secondary_node))
2826         return False
2827     #HARDCODE
2828     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2829                                     instance, device, info):
2830       logger.Error("failed to create volume %s on primary!" %
2831                    device.iv_name)
2832       return False
2833   return True
2834
2835
2836 def _RemoveDisks(instance, cfg):
2837   """Remove all disks for an instance.
2838
2839   This abstracts away some work from `AddInstance()` and
2840   `RemoveInstance()`. Note that in case some of the devices couldn't
2841   be removed, the removal will continue with the other ones (compare
2842   with `_CreateDisks()`).
2843
2844   Args:
2845     instance: the instance object
2846
2847   Returns:
2848     True or False showing the success of the removal proces
2849
2850   """
2851   logger.Info("removing block devices for instance %s" % instance.name)
2852
2853   result = True
2854   for device in instance.disks:
2855     for node, disk in device.ComputeNodeTree(instance.primary_node):
2856       cfg.SetDiskID(disk, node)
2857       if not rpc.call_blockdev_remove(node, disk):
2858         logger.Error("could not remove block device %s on node %s,"
2859                      " continuing anyway" %
2860                      (device.iv_name, node))
2861         result = False
2862   return result
2863
2864
2865 class LUCreateInstance(LogicalUnit):
2866   """Create an instance.
2867
2868   """
2869   HPATH = "instance-add"
2870   HTYPE = constants.HTYPE_INSTANCE
2871   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2872               "disk_template", "swap_size", "mode", "start", "vcpus",
2873               "wait_for_sync", "ip_check", "mac"]
2874
2875   def BuildHooksEnv(self):
2876     """Build hooks env.
2877
2878     This runs on master, primary and secondary nodes of the instance.
2879
2880     """
2881     env = {
2882       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2883       "INSTANCE_DISK_SIZE": self.op.disk_size,
2884       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2885       "INSTANCE_ADD_MODE": self.op.mode,
2886       }
2887     if self.op.mode == constants.INSTANCE_IMPORT:
2888       env["INSTANCE_SRC_NODE"] = self.op.src_node
2889       env["INSTANCE_SRC_PATH"] = self.op.src_path
2890       env["INSTANCE_SRC_IMAGE"] = self.src_image
2891
2892     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2893       primary_node=self.op.pnode,
2894       secondary_nodes=self.secondaries,
2895       status=self.instance_status,
2896       os_type=self.op.os_type,
2897       memory=self.op.mem_size,
2898       vcpus=self.op.vcpus,
2899       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2900     ))
2901
2902     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2903           self.secondaries)
2904     return env, nl, nl
2905
2906
2907   def CheckPrereq(self):
2908     """Check prerequisites.
2909
2910     """
2911     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2912       if not hasattr(self.op, attr):
2913         setattr(self.op, attr, None)
2914
2915     if self.op.mode not in (constants.INSTANCE_CREATE,
2916                             constants.INSTANCE_IMPORT):
2917       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2918                                  self.op.mode)
2919
2920     if self.op.mode == constants.INSTANCE_IMPORT:
2921       src_node = getattr(self.op, "src_node", None)
2922       src_path = getattr(self.op, "src_path", None)
2923       if src_node is None or src_path is None:
2924         raise errors.OpPrereqError("Importing an instance requires source"
2925                                    " node and path options")
2926       src_node_full = self.cfg.ExpandNodeName(src_node)
2927       if src_node_full is None:
2928         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2929       self.op.src_node = src_node = src_node_full
2930
2931       if not os.path.isabs(src_path):
2932         raise errors.OpPrereqError("The source path must be absolute")
2933
2934       export_info = rpc.call_export_info(src_node, src_path)
2935
2936       if not export_info:
2937         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2938
2939       if not export_info.has_section(constants.INISECT_EXP):
2940         raise errors.ProgrammerError("Corrupted export config")
2941
2942       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2943       if (int(ei_version) != constants.EXPORT_VERSION):
2944         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2945                                    (ei_version, constants.EXPORT_VERSION))
2946
2947       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2948         raise errors.OpPrereqError("Can't import instance with more than"
2949                                    " one data disk")
2950
2951       # FIXME: are the old os-es, disk sizes, etc. useful?
2952       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2953       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2954                                                          'disk0_dump'))
2955       self.src_image = diskimage
2956     else: # INSTANCE_CREATE
2957       if getattr(self.op, "os_type", None) is None:
2958         raise errors.OpPrereqError("No guest OS specified")
2959
2960     # check primary node
2961     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2962     if pnode is None:
2963       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2964                                  self.op.pnode)
2965     self.op.pnode = pnode.name
2966     self.pnode = pnode
2967     self.secondaries = []
2968     # disk template and mirror node verification
2969     if self.op.disk_template not in constants.DISK_TEMPLATES:
2970       raise errors.OpPrereqError("Invalid disk template name")
2971
2972     if self.op.disk_template in constants.DTS_NET_MIRROR:
2973       if getattr(self.op, "snode", None) is None:
2974         raise errors.OpPrereqError("The networked disk templates need"
2975                                    " a mirror node")
2976
2977       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2978       if snode_name is None:
2979         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2980                                    self.op.snode)
2981       elif snode_name == pnode.name:
2982         raise errors.OpPrereqError("The secondary node cannot be"
2983                                    " the primary node.")
2984       self.secondaries.append(snode_name)
2985
2986     # Required free disk space as a function of disk and swap space
2987     req_size_dict = {
2988       constants.DT_DISKLESS: None,
2989       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2990       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2991       # 256 MB are added for drbd metadata, 128MB for each drbd device
2992       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2993       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2994     }
2995
2996     if self.op.disk_template not in req_size_dict:
2997       raise errors.ProgrammerError("Disk template '%s' size requirement"
2998                                    " is unknown" %  self.op.disk_template)
2999
3000     req_size = req_size_dict[self.op.disk_template]
3001
3002     # Check lv size requirements
3003     if req_size is not None:
3004       nodenames = [pnode.name] + self.secondaries
3005       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3006       for node in nodenames:
3007         info = nodeinfo.get(node, None)
3008         if not info:
3009           raise errors.OpPrereqError("Cannot get current information"
3010                                      " from node '%s'" % nodeinfo)
3011         vg_free = info.get('vg_free', None)
3012         if not isinstance(vg_free, int):
3013           raise errors.OpPrereqError("Can't compute free disk space on"
3014                                      " node %s" % node)
3015         if req_size > info['vg_free']:
3016           raise errors.OpPrereqError("Not enough disk space on target node %s."
3017                                      " %d MB available, %d MB required" %
3018                                      (node, info['vg_free'], req_size))
3019
3020     # os verification
3021     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3022     if not os_obj:
3023       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3024                                  " primary node"  % self.op.os_type)
3025
3026     if self.op.kernel_path == constants.VALUE_NONE:
3027       raise errors.OpPrereqError("Can't set instance kernel to none")
3028
3029     # instance verification
3030     hostname1 = utils.HostInfo(self.op.instance_name)
3031
3032     self.op.instance_name = instance_name = hostname1.name
3033     instance_list = self.cfg.GetInstanceList()
3034     if instance_name in instance_list:
3035       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3036                                  instance_name)
3037
3038     ip = getattr(self.op, "ip", None)
3039     if ip is None or ip.lower() == "none":
3040       inst_ip = None
3041     elif ip.lower() == "auto":
3042       inst_ip = hostname1.ip
3043     else:
3044       if not utils.IsValidIP(ip):
3045         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3046                                    " like a valid IP" % ip)
3047       inst_ip = ip
3048     self.inst_ip = inst_ip
3049
3050     if self.op.start and not self.op.ip_check:
3051       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3052                                  " adding an instance in start mode")
3053
3054     if self.op.ip_check:
3055       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3056                        constants.DEFAULT_NODED_PORT):
3057         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3058                                    (hostname1.ip, instance_name))
3059
3060     # MAC address verification
3061     if self.op.mac != "auto":
3062       if not utils.IsValidMac(self.op.mac.lower()):
3063         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3064                                    self.op.mac)
3065
3066     # bridge verification
3067     bridge = getattr(self.op, "bridge", None)
3068     if bridge is None:
3069       self.op.bridge = self.cfg.GetDefBridge()
3070     else:
3071       self.op.bridge = bridge
3072
3073     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3074       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3075                                  " destination node '%s'" %
3076                                  (self.op.bridge, pnode.name))
3077
3078     # boot order verification
3079     if self.op.hvm_boot_order is not None:
3080       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3081         raise errors.OpPrereqError("invalid boot order specified,"
3082                                    " must be one or more of [acdn]")
3083
3084     if self.op.start:
3085       self.instance_status = 'up'
3086     else:
3087       self.instance_status = 'down'
3088
3089   def Exec(self, feedback_fn):
3090     """Create and add the instance to the cluster.
3091
3092     """
3093     instance = self.op.instance_name
3094     pnode_name = self.pnode.name
3095
3096     if self.op.mac == "auto":
3097       mac_address = self.cfg.GenerateMAC()
3098     else:
3099       mac_address = self.op.mac
3100
3101     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3102     if self.inst_ip is not None:
3103       nic.ip = self.inst_ip
3104
3105     ht_kind = self.sstore.GetHypervisorType()
3106     if ht_kind in constants.HTS_REQ_PORT:
3107       network_port = self.cfg.AllocatePort()
3108     else:
3109       network_port = None
3110
3111     disks = _GenerateDiskTemplate(self.cfg,
3112                                   self.op.disk_template,
3113                                   instance, pnode_name,
3114                                   self.secondaries, self.op.disk_size,
3115                                   self.op.swap_size)
3116
3117     iobj = objects.Instance(name=instance, os=self.op.os_type,
3118                             primary_node=pnode_name,
3119                             memory=self.op.mem_size,
3120                             vcpus=self.op.vcpus,
3121                             nics=[nic], disks=disks,
3122                             disk_template=self.op.disk_template,
3123                             status=self.instance_status,
3124                             network_port=network_port,
3125                             kernel_path=self.op.kernel_path,
3126                             initrd_path=self.op.initrd_path,
3127                             hvm_boot_order=self.op.hvm_boot_order,
3128                             )
3129
3130     feedback_fn("* creating instance disks...")
3131     if not _CreateDisks(self.cfg, iobj):
3132       _RemoveDisks(iobj, self.cfg)
3133       raise errors.OpExecError("Device creation failed, reverting...")
3134
3135     feedback_fn("adding instance %s to cluster config" % instance)
3136
3137     self.cfg.AddInstance(iobj)
3138
3139     if self.op.wait_for_sync:
3140       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3141     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3142       # make sure the disks are not degraded (still sync-ing is ok)
3143       time.sleep(15)
3144       feedback_fn("* checking mirrors status")
3145       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3146     else:
3147       disk_abort = False
3148
3149     if disk_abort:
3150       _RemoveDisks(iobj, self.cfg)
3151       self.cfg.RemoveInstance(iobj.name)
3152       raise errors.OpExecError("There are some degraded disks for"
3153                                " this instance")
3154
3155     feedback_fn("creating os for instance %s on node %s" %
3156                 (instance, pnode_name))
3157
3158     if iobj.disk_template != constants.DT_DISKLESS:
3159       if self.op.mode == constants.INSTANCE_CREATE:
3160         feedback_fn("* running the instance OS create scripts...")
3161         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3162           raise errors.OpExecError("could not add os for instance %s"
3163                                    " on node %s" %
3164                                    (instance, pnode_name))
3165
3166       elif self.op.mode == constants.INSTANCE_IMPORT:
3167         feedback_fn("* running the instance OS import scripts...")
3168         src_node = self.op.src_node
3169         src_image = self.src_image
3170         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3171                                                 src_node, src_image):
3172           raise errors.OpExecError("Could not import os for instance"
3173                                    " %s on node %s" %
3174                                    (instance, pnode_name))
3175       else:
3176         # also checked in the prereq part
3177         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3178                                      % self.op.mode)
3179
3180     if self.op.start:
3181       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3182       feedback_fn("* starting instance...")
3183       if not rpc.call_instance_start(pnode_name, iobj, None):
3184         raise errors.OpExecError("Could not start instance")
3185
3186
3187 class LUConnectConsole(NoHooksLU):
3188   """Connect to an instance's console.
3189
3190   This is somewhat special in that it returns the command line that
3191   you need to run on the master node in order to connect to the
3192   console.
3193
3194   """
3195   _OP_REQP = ["instance_name"]
3196
3197   def CheckPrereq(self):
3198     """Check prerequisites.
3199
3200     This checks that the instance is in the cluster.
3201
3202     """
3203     instance = self.cfg.GetInstanceInfo(
3204       self.cfg.ExpandInstanceName(self.op.instance_name))
3205     if instance is None:
3206       raise errors.OpPrereqError("Instance '%s' not known" %
3207                                  self.op.instance_name)
3208     self.instance = instance
3209
3210   def Exec(self, feedback_fn):
3211     """Connect to the console of an instance
3212
3213     """
3214     instance = self.instance
3215     node = instance.primary_node
3216
3217     node_insts = rpc.call_instance_list([node])[node]
3218     if node_insts is False:
3219       raise errors.OpExecError("Can't connect to node %s." % node)
3220
3221     if instance.name not in node_insts:
3222       raise errors.OpExecError("Instance %s is not running." % instance.name)
3223
3224     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3225
3226     hyper = hypervisor.GetHypervisor()
3227     console_cmd = hyper.GetShellCommandForConsole(instance)
3228     # build ssh cmdline
3229     argv = ["ssh", "-q", "-t"]
3230     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3231     argv.extend(ssh.BATCH_MODE_OPTS)
3232     argv.append(node)
3233     argv.append(console_cmd)
3234     return "ssh", argv
3235
3236
3237 class LUAddMDDRBDComponent(LogicalUnit):
3238   """Adda new mirror member to an instance's disk.
3239
3240   """
3241   HPATH = "mirror-add"
3242   HTYPE = constants.HTYPE_INSTANCE
3243   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3244
3245   def BuildHooksEnv(self):
3246     """Build hooks env.
3247
3248     This runs on the master, the primary and all the secondaries.
3249
3250     """
3251     env = {
3252       "NEW_SECONDARY": self.op.remote_node,
3253       "DISK_NAME": self.op.disk_name,
3254       }
3255     env.update(_BuildInstanceHookEnvByObject(self.instance))
3256     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3257           self.op.remote_node,] + list(self.instance.secondary_nodes)
3258     return env, nl, nl
3259
3260   def CheckPrereq(self):
3261     """Check prerequisites.
3262
3263     This checks that the instance is in the cluster.
3264
3265     """
3266     instance = self.cfg.GetInstanceInfo(
3267       self.cfg.ExpandInstanceName(self.op.instance_name))
3268     if instance is None:
3269       raise errors.OpPrereqError("Instance '%s' not known" %
3270                                  self.op.instance_name)
3271     self.instance = instance
3272
3273     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3274     if remote_node is None:
3275       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3276     self.remote_node = remote_node
3277
3278     if remote_node == instance.primary_node:
3279       raise errors.OpPrereqError("The specified node is the primary node of"
3280                                  " the instance.")
3281
3282     if instance.disk_template != constants.DT_REMOTE_RAID1:
3283       raise errors.OpPrereqError("Instance's disk layout is not"
3284                                  " remote_raid1.")
3285     for disk in instance.disks:
3286       if disk.iv_name == self.op.disk_name:
3287         break
3288     else:
3289       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3290                                  " instance." % self.op.disk_name)
3291     if len(disk.children) > 1:
3292       raise errors.OpPrereqError("The device already has two slave devices."
3293                                  " This would create a 3-disk raid1 which we"
3294                                  " don't allow.")
3295     self.disk = disk
3296
3297   def Exec(self, feedback_fn):
3298     """Add the mirror component
3299
3300     """
3301     disk = self.disk
3302     instance = self.instance
3303
3304     remote_node = self.remote_node
3305     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3306     names = _GenerateUniqueNames(self.cfg, lv_names)
3307     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3308                                      remote_node, disk.size, names)
3309
3310     logger.Info("adding new mirror component on secondary")
3311     #HARDCODE
3312     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3313                                       new_drbd, False,
3314                                       _GetInstanceInfoText(instance)):
3315       raise errors.OpExecError("Failed to create new component on secondary"
3316                                " node %s" % remote_node)
3317
3318     logger.Info("adding new mirror component on primary")
3319     #HARDCODE
3320     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3321                                     instance, new_drbd,
3322                                     _GetInstanceInfoText(instance)):
3323       # remove secondary dev
3324       self.cfg.SetDiskID(new_drbd, remote_node)
3325       rpc.call_blockdev_remove(remote_node, new_drbd)
3326       raise errors.OpExecError("Failed to create volume on primary")
3327
3328     # the device exists now
3329     # call the primary node to add the mirror to md
3330     logger.Info("adding new mirror component to md")
3331     if not rpc.call_blockdev_addchildren(instance.primary_node,
3332                                          disk, [new_drbd]):
3333       logger.Error("Can't add mirror compoment to md!")
3334       self.cfg.SetDiskID(new_drbd, remote_node)
3335       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3336         logger.Error("Can't rollback on secondary")
3337       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3338       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3339         logger.Error("Can't rollback on primary")
3340       raise errors.OpExecError("Can't add mirror component to md array")
3341
3342     disk.children.append(new_drbd)
3343
3344     self.cfg.AddInstance(instance)
3345
3346     _WaitForSync(self.cfg, instance, self.proc)
3347
3348     return 0
3349
3350
3351 class LURemoveMDDRBDComponent(LogicalUnit):
3352   """Remove a component from a remote_raid1 disk.
3353
3354   """
3355   HPATH = "mirror-remove"
3356   HTYPE = constants.HTYPE_INSTANCE
3357   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3358
3359   def BuildHooksEnv(self):
3360     """Build hooks env.
3361
3362     This runs on the master, the primary and all the secondaries.
3363
3364     """
3365     env = {
3366       "DISK_NAME": self.op.disk_name,
3367       "DISK_ID": self.op.disk_id,
3368       "OLD_SECONDARY": self.old_secondary,
3369       }
3370     env.update(_BuildInstanceHookEnvByObject(self.instance))
3371     nl = [self.sstore.GetMasterNode(),
3372           self.instance.primary_node] + list(self.instance.secondary_nodes)
3373     return env, nl, nl
3374
3375   def CheckPrereq(self):
3376     """Check prerequisites.
3377
3378     This checks that the instance is in the cluster.
3379
3380     """
3381     instance = self.cfg.GetInstanceInfo(
3382       self.cfg.ExpandInstanceName(self.op.instance_name))
3383     if instance is None:
3384       raise errors.OpPrereqError("Instance '%s' not known" %
3385                                  self.op.instance_name)
3386     self.instance = instance
3387
3388     if instance.disk_template != constants.DT_REMOTE_RAID1:
3389       raise errors.OpPrereqError("Instance's disk layout is not"
3390                                  " remote_raid1.")
3391     for disk in instance.disks:
3392       if disk.iv_name == self.op.disk_name:
3393         break
3394     else:
3395       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3396                                  " instance." % self.op.disk_name)
3397     for child in disk.children:
3398       if (child.dev_type == constants.LD_DRBD7 and
3399           child.logical_id[2] == self.op.disk_id):
3400         break
3401     else:
3402       raise errors.OpPrereqError("Can't find the device with this port.")
3403
3404     if len(disk.children) < 2:
3405       raise errors.OpPrereqError("Cannot remove the last component from"
3406                                  " a mirror.")
3407     self.disk = disk
3408     self.child = child
3409     if self.child.logical_id[0] == instance.primary_node:
3410       oid = 1
3411     else:
3412       oid = 0
3413     self.old_secondary = self.child.logical_id[oid]
3414
3415   def Exec(self, feedback_fn):
3416     """Remove the mirror component
3417
3418     """
3419     instance = self.instance
3420     disk = self.disk
3421     child = self.child
3422     logger.Info("remove mirror component")
3423     self.cfg.SetDiskID(disk, instance.primary_node)
3424     if not rpc.call_blockdev_removechildren(instance.primary_node,
3425                                             disk, [child]):
3426       raise errors.OpExecError("Can't remove child from mirror.")
3427
3428     for node in child.logical_id[:2]:
3429       self.cfg.SetDiskID(child, node)
3430       if not rpc.call_blockdev_remove(node, child):
3431         logger.Error("Warning: failed to remove device from node %s,"
3432                      " continuing operation." % node)
3433
3434     disk.children.remove(child)
3435     self.cfg.AddInstance(instance)
3436
3437
3438 class LUReplaceDisks(LogicalUnit):
3439   """Replace the disks of an instance.
3440
3441   """
3442   HPATH = "mirrors-replace"
3443   HTYPE = constants.HTYPE_INSTANCE
3444   _OP_REQP = ["instance_name", "mode", "disks"]
3445
3446   def BuildHooksEnv(self):
3447     """Build hooks env.
3448
3449     This runs on the master, the primary and all the secondaries.
3450
3451     """
3452     env = {
3453       "MODE": self.op.mode,
3454       "NEW_SECONDARY": self.op.remote_node,
3455       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3456       }
3457     env.update(_BuildInstanceHookEnvByObject(self.instance))
3458     nl = [
3459       self.sstore.GetMasterNode(),
3460       self.instance.primary_node,
3461       ]
3462     if self.op.remote_node is not None:
3463       nl.append(self.op.remote_node)
3464     return env, nl, nl
3465
3466   def CheckPrereq(self):
3467     """Check prerequisites.
3468
3469     This checks that the instance is in the cluster.
3470
3471     """
3472     instance = self.cfg.GetInstanceInfo(
3473       self.cfg.ExpandInstanceName(self.op.instance_name))
3474     if instance is None:
3475       raise errors.OpPrereqError("Instance '%s' not known" %
3476                                  self.op.instance_name)
3477     self.instance = instance
3478     self.op.instance_name = instance.name
3479
3480     if instance.disk_template not in constants.DTS_NET_MIRROR:
3481       raise errors.OpPrereqError("Instance's disk layout is not"
3482                                  " network mirrored.")
3483
3484     if len(instance.secondary_nodes) != 1:
3485       raise errors.OpPrereqError("The instance has a strange layout,"
3486                                  " expected one secondary but found %d" %
3487                                  len(instance.secondary_nodes))
3488
3489     self.sec_node = instance.secondary_nodes[0]
3490
3491     remote_node = getattr(self.op, "remote_node", None)
3492     if remote_node is not None:
3493       remote_node = self.cfg.ExpandNodeName(remote_node)
3494       if remote_node is None:
3495         raise errors.OpPrereqError("Node '%s' not known" %
3496                                    self.op.remote_node)
3497       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3498     else:
3499       self.remote_node_info = None
3500     if remote_node == instance.primary_node:
3501       raise errors.OpPrereqError("The specified node is the primary node of"
3502                                  " the instance.")
3503     elif remote_node == self.sec_node:
3504       if self.op.mode == constants.REPLACE_DISK_SEC:
3505         # this is for DRBD8, where we can't execute the same mode of
3506         # replacement as for drbd7 (no different port allocated)
3507         raise errors.OpPrereqError("Same secondary given, cannot execute"
3508                                    " replacement")
3509       # the user gave the current secondary, switch to
3510       # 'no-replace-secondary' mode for drbd7
3511       remote_node = None
3512     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3513         self.op.mode != constants.REPLACE_DISK_ALL):
3514       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3515                                  " disks replacement, not individual ones")
3516     if instance.disk_template == constants.DT_DRBD8:
3517       if (self.op.mode == constants.REPLACE_DISK_ALL and
3518           remote_node is not None):
3519         # switch to replace secondary mode
3520         self.op.mode = constants.REPLACE_DISK_SEC
3521
3522       if self.op.mode == constants.REPLACE_DISK_ALL:
3523         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3524                                    " secondary disk replacement, not"
3525                                    " both at once")
3526       elif self.op.mode == constants.REPLACE_DISK_PRI:
3527         if remote_node is not None:
3528           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3529                                      " the secondary while doing a primary"
3530                                      " node disk replacement")
3531         self.tgt_node = instance.primary_node
3532         self.oth_node = instance.secondary_nodes[0]
3533       elif self.op.mode == constants.REPLACE_DISK_SEC:
3534         self.new_node = remote_node # this can be None, in which case
3535                                     # we don't change the secondary
3536         self.tgt_node = instance.secondary_nodes[0]
3537         self.oth_node = instance.primary_node
3538       else:
3539         raise errors.ProgrammerError("Unhandled disk replace mode")
3540
3541     for name in self.op.disks:
3542       if instance.FindDisk(name) is None:
3543         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3544                                    (name, instance.name))
3545     self.op.remote_node = remote_node
3546
3547   def _ExecRR1(self, feedback_fn):
3548     """Replace the disks of an instance.
3549
3550     """
3551     instance = self.instance
3552     iv_names = {}
3553     # start of work
3554     if self.op.remote_node is None:
3555       remote_node = self.sec_node
3556     else:
3557       remote_node = self.op.remote_node
3558     cfg = self.cfg
3559     for dev in instance.disks:
3560       size = dev.size
3561       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3562       names = _GenerateUniqueNames(cfg, lv_names)
3563       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3564                                        remote_node, size, names)
3565       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3566       logger.Info("adding new mirror component on secondary for %s" %
3567                   dev.iv_name)
3568       #HARDCODE
3569       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3570                                         new_drbd, False,
3571                                         _GetInstanceInfoText(instance)):
3572         raise errors.OpExecError("Failed to create new component on secondary"
3573                                  " node %s. Full abort, cleanup manually!" %
3574                                  remote_node)
3575
3576       logger.Info("adding new mirror component on primary")
3577       #HARDCODE
3578       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3579                                       instance, new_drbd,
3580                                       _GetInstanceInfoText(instance)):
3581         # remove secondary dev
3582         cfg.SetDiskID(new_drbd, remote_node)
3583         rpc.call_blockdev_remove(remote_node, new_drbd)
3584         raise errors.OpExecError("Failed to create volume on primary!"
3585                                  " Full abort, cleanup manually!!")
3586
3587       # the device exists now
3588       # call the primary node to add the mirror to md
3589       logger.Info("adding new mirror component to md")
3590       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3591                                            [new_drbd]):
3592         logger.Error("Can't add mirror compoment to md!")
3593         cfg.SetDiskID(new_drbd, remote_node)
3594         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3595           logger.Error("Can't rollback on secondary")
3596         cfg.SetDiskID(new_drbd, instance.primary_node)
3597         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3598           logger.Error("Can't rollback on primary")
3599         raise errors.OpExecError("Full abort, cleanup manually!!")
3600
3601       dev.children.append(new_drbd)
3602       cfg.AddInstance(instance)
3603
3604     # this can fail as the old devices are degraded and _WaitForSync
3605     # does a combined result over all disks, so we don't check its
3606     # return value
3607     _WaitForSync(cfg, instance, self.proc, unlock=True)
3608
3609     # so check manually all the devices
3610     for name in iv_names:
3611       dev, child, new_drbd = iv_names[name]
3612       cfg.SetDiskID(dev, instance.primary_node)
3613       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3614       if is_degr:
3615         raise errors.OpExecError("MD device %s is degraded!" % name)
3616       cfg.SetDiskID(new_drbd, instance.primary_node)
3617       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3618       if is_degr:
3619         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3620
3621     for name in iv_names:
3622       dev, child, new_drbd = iv_names[name]
3623       logger.Info("remove mirror %s component" % name)
3624       cfg.SetDiskID(dev, instance.primary_node)
3625       if not rpc.call_blockdev_removechildren(instance.primary_node,
3626                                               dev, [child]):
3627         logger.Error("Can't remove child from mirror, aborting"
3628                      " *this device cleanup*.\nYou need to cleanup manually!!")
3629         continue
3630
3631       for node in child.logical_id[:2]:
3632         logger.Info("remove child device on %s" % node)
3633         cfg.SetDiskID(child, node)
3634         if not rpc.call_blockdev_remove(node, child):
3635           logger.Error("Warning: failed to remove device from node %s,"
3636                        " continuing operation." % node)
3637
3638       dev.children.remove(child)
3639
3640       cfg.AddInstance(instance)
3641
3642   def _ExecD8DiskOnly(self, feedback_fn):
3643     """Replace a disk on the primary or secondary for dbrd8.
3644
3645     The algorithm for replace is quite complicated:
3646       - for each disk to be replaced:
3647         - create new LVs on the target node with unique names
3648         - detach old LVs from the drbd device
3649         - rename old LVs to name_replaced.<time_t>
3650         - rename new LVs to old LVs
3651         - attach the new LVs (with the old names now) to the drbd device
3652       - wait for sync across all devices
3653       - for each modified disk:
3654         - remove old LVs (which have the name name_replaces.<time_t>)
3655
3656     Failures are not very well handled.
3657
3658     """
3659     steps_total = 6
3660     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3661     instance = self.instance
3662     iv_names = {}
3663     vgname = self.cfg.GetVGName()
3664     # start of work
3665     cfg = self.cfg
3666     tgt_node = self.tgt_node
3667     oth_node = self.oth_node
3668
3669     # Step: check device activation
3670     self.proc.LogStep(1, steps_total, "check device existence")
3671     info("checking volume groups")
3672     my_vg = cfg.GetVGName()
3673     results = rpc.call_vg_list([oth_node, tgt_node])
3674     if not results:
3675       raise errors.OpExecError("Can't list volume groups on the nodes")
3676     for node in oth_node, tgt_node:
3677       res = results.get(node, False)
3678       if not res or my_vg not in res:
3679         raise errors.OpExecError("Volume group '%s' not found on %s" %
3680                                  (my_vg, node))
3681     for dev in instance.disks:
3682       if not dev.iv_name in self.op.disks:
3683         continue
3684       for node in tgt_node, oth_node:
3685         info("checking %s on %s" % (dev.iv_name, node))
3686         cfg.SetDiskID(dev, node)
3687         if not rpc.call_blockdev_find(node, dev):
3688           raise errors.OpExecError("Can't find device %s on node %s" %
3689                                    (dev.iv_name, node))
3690
3691     # Step: check other node consistency
3692     self.proc.LogStep(2, steps_total, "check peer consistency")
3693     for dev in instance.disks:
3694       if not dev.iv_name in self.op.disks:
3695         continue
3696       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3697       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3698                                    oth_node==instance.primary_node):
3699         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3700                                  " to replace disks on this node (%s)" %
3701                                  (oth_node, tgt_node))
3702
3703     # Step: create new storage
3704     self.proc.LogStep(3, steps_total, "allocate new storage")
3705     for dev in instance.disks:
3706       if not dev.iv_name in self.op.disks:
3707         continue
3708       size = dev.size
3709       cfg.SetDiskID(dev, tgt_node)
3710       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3711       names = _GenerateUniqueNames(cfg, lv_names)
3712       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3713                              logical_id=(vgname, names[0]))
3714       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3715                              logical_id=(vgname, names[1]))
3716       new_lvs = [lv_data, lv_meta]
3717       old_lvs = dev.children
3718       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3719       info("creating new local storage on %s for %s" %
3720            (tgt_node, dev.iv_name))
3721       # since we *always* want to create this LV, we use the
3722       # _Create...OnPrimary (which forces the creation), even if we
3723       # are talking about the secondary node
3724       for new_lv in new_lvs:
3725         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3726                                         _GetInstanceInfoText(instance)):
3727           raise errors.OpExecError("Failed to create new LV named '%s' on"
3728                                    " node '%s'" %
3729                                    (new_lv.logical_id[1], tgt_node))
3730
3731     # Step: for each lv, detach+rename*2+attach
3732     self.proc.LogStep(4, steps_total, "change drbd configuration")
3733     for dev, old_lvs, new_lvs in iv_names.itervalues():
3734       info("detaching %s drbd from local storage" % dev.iv_name)
3735       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3736         raise errors.OpExecError("Can't detach drbd from local storage on node"
3737                                  " %s for device %s" % (tgt_node, dev.iv_name))
3738       #dev.children = []
3739       #cfg.Update(instance)
3740
3741       # ok, we created the new LVs, so now we know we have the needed
3742       # storage; as such, we proceed on the target node to rename
3743       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3744       # using the assumption than logical_id == physical_id (which in
3745       # turn is the unique_id on that node)
3746
3747       # FIXME(iustin): use a better name for the replaced LVs
3748       temp_suffix = int(time.time())
3749       ren_fn = lambda d, suff: (d.physical_id[0],
3750                                 d.physical_id[1] + "_replaced-%s" % suff)
3751       # build the rename list based on what LVs exist on the node
3752       rlist = []
3753       for to_ren in old_lvs:
3754         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3755         if find_res is not None: # device exists
3756           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3757
3758       info("renaming the old LVs on the target node")
3759       if not rpc.call_blockdev_rename(tgt_node, rlist):
3760         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3761       # now we rename the new LVs to the old LVs
3762       info("renaming the new LVs on the target node")
3763       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3764       if not rpc.call_blockdev_rename(tgt_node, rlist):
3765         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3766
3767       for old, new in zip(old_lvs, new_lvs):
3768         new.logical_id = old.logical_id
3769         cfg.SetDiskID(new, tgt_node)
3770
3771       for disk in old_lvs:
3772         disk.logical_id = ren_fn(disk, temp_suffix)
3773         cfg.SetDiskID(disk, tgt_node)
3774
3775       # now that the new lvs have the old name, we can add them to the device
3776       info("adding new mirror component on %s" % tgt_node)
3777       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3778         for new_lv in new_lvs:
3779           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3780             warning("Can't rollback device %s", hint="manually cleanup unused"
3781                     " logical volumes")
3782         raise errors.OpExecError("Can't add local storage to drbd")
3783
3784       dev.children = new_lvs
3785       cfg.Update(instance)
3786
3787     # Step: wait for sync
3788
3789     # this can fail as the old devices are degraded and _WaitForSync
3790     # does a combined result over all disks, so we don't check its
3791     # return value
3792     self.proc.LogStep(5, steps_total, "sync devices")
3793     _WaitForSync(cfg, instance, self.proc, unlock=True)
3794
3795     # so check manually all the devices
3796     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3797       cfg.SetDiskID(dev, instance.primary_node)
3798       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3799       if is_degr:
3800         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3801
3802     # Step: remove old storage
3803     self.proc.LogStep(6, steps_total, "removing old storage")
3804     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3805       info("remove logical volumes for %s" % name)
3806       for lv in old_lvs:
3807         cfg.SetDiskID(lv, tgt_node)
3808         if not rpc.call_blockdev_remove(tgt_node, lv):
3809           warning("Can't remove old LV", hint="manually remove unused LVs")
3810           continue
3811
3812   def _ExecD8Secondary(self, feedback_fn):
3813     """Replace the secondary node for drbd8.
3814
3815     The algorithm for replace is quite complicated:
3816       - for all disks of the instance:
3817         - create new LVs on the new node with same names
3818         - shutdown the drbd device on the old secondary
3819         - disconnect the drbd network on the primary
3820         - create the drbd device on the new secondary
3821         - network attach the drbd on the primary, using an artifice:
3822           the drbd code for Attach() will connect to the network if it
3823           finds a device which is connected to the good local disks but
3824           not network enabled
3825       - wait for sync across all devices
3826       - remove all disks from the old secondary
3827
3828     Failures are not very well handled.
3829
3830     """
3831     steps_total = 6
3832     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3833     instance = self.instance
3834     iv_names = {}
3835     vgname = self.cfg.GetVGName()
3836     # start of work
3837     cfg = self.cfg
3838     old_node = self.tgt_node
3839     new_node = self.new_node
3840     pri_node = instance.primary_node
3841
3842     # Step: check device activation
3843     self.proc.LogStep(1, steps_total, "check device existence")
3844     info("checking volume groups")
3845     my_vg = cfg.GetVGName()
3846     results = rpc.call_vg_list([pri_node, new_node])
3847     if not results:
3848       raise errors.OpExecError("Can't list volume groups on the nodes")
3849     for node in pri_node, new_node:
3850       res = results.get(node, False)
3851       if not res or my_vg not in res:
3852         raise errors.OpExecError("Volume group '%s' not found on %s" %
3853                                  (my_vg, node))
3854     for dev in instance.disks:
3855       if not dev.iv_name in self.op.disks:
3856         continue
3857       info("checking %s on %s" % (dev.iv_name, pri_node))
3858       cfg.SetDiskID(dev, pri_node)
3859       if not rpc.call_blockdev_find(pri_node, dev):
3860         raise errors.OpExecError("Can't find device %s on node %s" %
3861                                  (dev.iv_name, pri_node))
3862
3863     # Step: check other node consistency
3864     self.proc.LogStep(2, steps_total, "check peer consistency")
3865     for dev in instance.disks:
3866       if not dev.iv_name in self.op.disks:
3867         continue
3868       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3869       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3870         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3871                                  " unsafe to replace the secondary" %
3872                                  pri_node)
3873
3874     # Step: create new storage
3875     self.proc.LogStep(3, steps_total, "allocate new storage")
3876     for dev in instance.disks:
3877       size = dev.size
3878       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3879       # since we *always* want to create this LV, we use the
3880       # _Create...OnPrimary (which forces the creation), even if we
3881       # are talking about the secondary node
3882       for new_lv in dev.children:
3883         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3884                                         _GetInstanceInfoText(instance)):
3885           raise errors.OpExecError("Failed to create new LV named '%s' on"
3886                                    " node '%s'" %
3887                                    (new_lv.logical_id[1], new_node))
3888
3889       iv_names[dev.iv_name] = (dev, dev.children)
3890
3891     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3892     for dev in instance.disks:
3893       size = dev.size
3894       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3895       # create new devices on new_node
3896       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3897                               logical_id=(pri_node, new_node,
3898                                           dev.logical_id[2]),
3899                               children=dev.children)
3900       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3901                                         new_drbd, False,
3902                                       _GetInstanceInfoText(instance)):
3903         raise errors.OpExecError("Failed to create new DRBD on"
3904                                  " node '%s'" % new_node)
3905
3906     for dev in instance.disks:
3907       # we have new devices, shutdown the drbd on the old secondary
3908       info("shutting down drbd for %s on old node" % dev.iv_name)
3909       cfg.SetDiskID(dev, old_node)
3910       if not rpc.call_blockdev_shutdown(old_node, dev):
3911         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3912                 hint="Please cleanup this device manually as soon as possible")
3913
3914     info("detaching primary drbds from the network (=> standalone)")
3915     done = 0
3916     for dev in instance.disks:
3917       cfg.SetDiskID(dev, pri_node)
3918       # set the physical (unique in bdev terms) id to None, meaning
3919       # detach from network
3920       dev.physical_id = (None,) * len(dev.physical_id)
3921       # and 'find' the device, which will 'fix' it to match the
3922       # standalone state
3923       if rpc.call_blockdev_find(pri_node, dev):
3924         done += 1
3925       else:
3926         warning("Failed to detach drbd %s from network, unusual case" %
3927                 dev.iv_name)
3928
3929     if not done:
3930       # no detaches succeeded (very unlikely)
3931       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3932
3933     # if we managed to detach at least one, we update all the disks of
3934     # the instance to point to the new secondary
3935     info("updating instance configuration")
3936     for dev in instance.disks:
3937       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3938       cfg.SetDiskID(dev, pri_node)
3939     cfg.Update(instance)
3940
3941     # and now perform the drbd attach
3942     info("attaching primary drbds to new secondary (standalone => connected)")
3943     failures = []
3944     for dev in instance.disks:
3945       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3946       # since the attach is smart, it's enough to 'find' the device,
3947       # it will automatically activate the network, if the physical_id
3948       # is correct
3949       cfg.SetDiskID(dev, pri_node)
3950       if not rpc.call_blockdev_find(pri_node, dev):
3951         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3952                 "please do a gnt-instance info to see the status of disks")
3953
3954     # this can fail as the old devices are degraded and _WaitForSync
3955     # does a combined result over all disks, so we don't check its
3956     # return value
3957     self.proc.LogStep(5, steps_total, "sync devices")
3958     _WaitForSync(cfg, instance, self.proc, unlock=True)
3959
3960     # so check manually all the devices
3961     for name, (dev, old_lvs) in iv_names.iteritems():
3962       cfg.SetDiskID(dev, pri_node)
3963       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3964       if is_degr:
3965         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3966
3967     self.proc.LogStep(6, steps_total, "removing old storage")
3968     for name, (dev, old_lvs) in iv_names.iteritems():
3969       info("remove logical volumes for %s" % name)
3970       for lv in old_lvs:
3971         cfg.SetDiskID(lv, old_node)
3972         if not rpc.call_blockdev_remove(old_node, lv):
3973           warning("Can't remove LV on old secondary",
3974                   hint="Cleanup stale volumes by hand")
3975
3976   def Exec(self, feedback_fn):
3977     """Execute disk replacement.
3978
3979     This dispatches the disk replacement to the appropriate handler.
3980
3981     """
3982     instance = self.instance
3983     if instance.disk_template == constants.DT_REMOTE_RAID1:
3984       fn = self._ExecRR1
3985     elif instance.disk_template == constants.DT_DRBD8:
3986       if self.op.remote_node is None:
3987         fn = self._ExecD8DiskOnly
3988       else:
3989         fn = self._ExecD8Secondary
3990     else:
3991       raise errors.ProgrammerError("Unhandled disk replacement case")
3992     return fn(feedback_fn)
3993
3994
3995 class LUQueryInstanceData(NoHooksLU):
3996   """Query runtime instance data.
3997
3998   """
3999   _OP_REQP = ["instances"]
4000
4001   def CheckPrereq(self):
4002     """Check prerequisites.
4003
4004     This only checks the optional instance list against the existing names.
4005
4006     """
4007     if not isinstance(self.op.instances, list):
4008       raise errors.OpPrereqError("Invalid argument type 'instances'")
4009     if self.op.instances:
4010       self.wanted_instances = []
4011       names = self.op.instances
4012       for name in names:
4013         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4014         if instance is None:
4015           raise errors.OpPrereqError("No such instance name '%s'" % name)
4016       self.wanted_instances.append(instance)
4017     else:
4018       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4019                                in self.cfg.GetInstanceList()]
4020     return
4021
4022
4023   def _ComputeDiskStatus(self, instance, snode, dev):
4024     """Compute block device status.
4025
4026     """
4027     self.cfg.SetDiskID(dev, instance.primary_node)
4028     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4029     if dev.dev_type in constants.LDS_DRBD:
4030       # we change the snode then (otherwise we use the one passed in)
4031       if dev.logical_id[0] == instance.primary_node:
4032         snode = dev.logical_id[1]
4033       else:
4034         snode = dev.logical_id[0]
4035
4036     if snode:
4037       self.cfg.SetDiskID(dev, snode)
4038       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4039     else:
4040       dev_sstatus = None
4041
4042     if dev.children:
4043       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4044                       for child in dev.children]
4045     else:
4046       dev_children = []
4047
4048     data = {
4049       "iv_name": dev.iv_name,
4050       "dev_type": dev.dev_type,
4051       "logical_id": dev.logical_id,
4052       "physical_id": dev.physical_id,
4053       "pstatus": dev_pstatus,
4054       "sstatus": dev_sstatus,
4055       "children": dev_children,
4056       }
4057
4058     return data
4059
4060   def Exec(self, feedback_fn):
4061     """Gather and return data"""
4062     result = {}
4063     for instance in self.wanted_instances:
4064       remote_info = rpc.call_instance_info(instance.primary_node,
4065                                                 instance.name)
4066       if remote_info and "state" in remote_info:
4067         remote_state = "up"
4068       else:
4069         remote_state = "down"
4070       if instance.status == "down":
4071         config_state = "down"
4072       else:
4073         config_state = "up"
4074
4075       disks = [self._ComputeDiskStatus(instance, None, device)
4076                for device in instance.disks]
4077
4078       idict = {
4079         "name": instance.name,
4080         "config_state": config_state,
4081         "run_state": remote_state,
4082         "pnode": instance.primary_node,
4083         "snodes": instance.secondary_nodes,
4084         "os": instance.os,
4085         "memory": instance.memory,
4086         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4087         "disks": disks,
4088         "network_port": instance.network_port,
4089         "vcpus": instance.vcpus,
4090         "kernel_path": instance.kernel_path,
4091         "initrd_path": instance.initrd_path,
4092         "hvm_boot_order": instance.hvm_boot_order,
4093         }
4094
4095       result[instance.name] = idict
4096
4097     return result
4098
4099
4100 class LUSetInstanceParms(LogicalUnit):
4101   """Modifies an instances's parameters.
4102
4103   """
4104   HPATH = "instance-modify"
4105   HTYPE = constants.HTYPE_INSTANCE
4106   _OP_REQP = ["instance_name"]
4107
4108   def BuildHooksEnv(self):
4109     """Build hooks env.
4110
4111     This runs on the master, primary and secondaries.
4112
4113     """
4114     args = dict()
4115     if self.mem:
4116       args['memory'] = self.mem
4117     if self.vcpus:
4118       args['vcpus'] = self.vcpus
4119     if self.do_ip or self.do_bridge:
4120       if self.do_ip:
4121         ip = self.ip
4122       else:
4123         ip = self.instance.nics[0].ip
4124       if self.bridge:
4125         bridge = self.bridge
4126       else:
4127         bridge = self.instance.nics[0].bridge
4128       args['nics'] = [(ip, bridge)]
4129     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4130     nl = [self.sstore.GetMasterNode(),
4131           self.instance.primary_node] + list(self.instance.secondary_nodes)
4132     return env, nl, nl
4133
4134   def CheckPrereq(self):
4135     """Check prerequisites.
4136
4137     This only checks the instance list against the existing names.
4138
4139     """
4140     self.mem = getattr(self.op, "mem", None)
4141     self.vcpus = getattr(self.op, "vcpus", None)
4142     self.ip = getattr(self.op, "ip", None)
4143     self.mac = getattr(self.op, "mac", None)
4144     self.bridge = getattr(self.op, "bridge", None)
4145     self.kernel_path = getattr(self.op, "kernel_path", None)
4146     self.initrd_path = getattr(self.op, "initrd_path", None)
4147     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4148     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4149                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4150     if all_parms.count(None) == len(all_parms):
4151       raise errors.OpPrereqError("No changes submitted")
4152     if self.mem is not None:
4153       try:
4154         self.mem = int(self.mem)
4155       except ValueError, err:
4156         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4157     if self.vcpus is not None:
4158       try:
4159         self.vcpus = int(self.vcpus)
4160       except ValueError, err:
4161         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4162     if self.ip is not None:
4163       self.do_ip = True
4164       if self.ip.lower() == "none":
4165         self.ip = None
4166       else:
4167         if not utils.IsValidIP(self.ip):
4168           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4169     else:
4170       self.do_ip = False
4171     self.do_bridge = (self.bridge is not None)
4172     if self.mac is not None:
4173       if self.cfg.IsMacInUse(self.mac):
4174         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4175                                    self.mac)
4176       if not utils.IsValidMac(self.mac):
4177         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4178
4179     if self.kernel_path is not None:
4180       self.do_kernel_path = True
4181       if self.kernel_path == constants.VALUE_NONE:
4182         raise errors.OpPrereqError("Can't set instance to no kernel")
4183
4184       if self.kernel_path != constants.VALUE_DEFAULT:
4185         if not os.path.isabs(self.kernel_path):
4186           raise errors.OpPrereqError("The kernel path must be an absolute"
4187                                     " filename")
4188     else:
4189       self.do_kernel_path = False
4190
4191     if self.initrd_path is not None:
4192       self.do_initrd_path = True
4193       if self.initrd_path not in (constants.VALUE_NONE,
4194                                   constants.VALUE_DEFAULT):
4195         if not os.path.isabs(self.initrd_path):
4196           raise errors.OpPrereqError("The initrd path must be an absolute"
4197                                     " filename")
4198     else:
4199       self.do_initrd_path = False
4200
4201     # boot order verification
4202     if self.hvm_boot_order is not None:
4203       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4204         if len(self.hvm_boot_order.strip("acdn")) != 0:
4205           raise errors.OpPrereqError("invalid boot order specified,"
4206                                      " must be one or more of [acdn]"
4207                                      " or 'default'")
4208
4209     instance = self.cfg.GetInstanceInfo(
4210       self.cfg.ExpandInstanceName(self.op.instance_name))
4211     if instance is None:
4212       raise errors.OpPrereqError("No such instance name '%s'" %
4213                                  self.op.instance_name)
4214     self.op.instance_name = instance.name
4215     self.instance = instance
4216     return
4217
4218   def Exec(self, feedback_fn):
4219     """Modifies an instance.
4220
4221     All parameters take effect only at the next restart of the instance.
4222     """
4223     result = []
4224     instance = self.instance
4225     if self.mem:
4226       instance.memory = self.mem
4227       result.append(("mem", self.mem))
4228     if self.vcpus:
4229       instance.vcpus = self.vcpus
4230       result.append(("vcpus",  self.vcpus))
4231     if self.do_ip:
4232       instance.nics[0].ip = self.ip
4233       result.append(("ip", self.ip))
4234     if self.bridge:
4235       instance.nics[0].bridge = self.bridge
4236       result.append(("bridge", self.bridge))
4237     if self.mac:
4238       instance.nics[0].mac = self.mac
4239       result.append(("mac", self.mac))
4240     if self.do_kernel_path:
4241       instance.kernel_path = self.kernel_path
4242       result.append(("kernel_path", self.kernel_path))
4243     if self.do_initrd_path:
4244       instance.initrd_path = self.initrd_path
4245       result.append(("initrd_path", self.initrd_path))
4246     if self.hvm_boot_order:
4247       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4248         instance.hvm_boot_order = None
4249       else:
4250         instance.hvm_boot_order = self.hvm_boot_order
4251       result.append(("hvm_boot_order", self.hvm_boot_order))
4252
4253     self.cfg.AddInstance(instance)
4254
4255     return result
4256
4257
4258 class LUQueryExports(NoHooksLU):
4259   """Query the exports list
4260
4261   """
4262   _OP_REQP = []
4263
4264   def CheckPrereq(self):
4265     """Check that the nodelist contains only existing nodes.
4266
4267     """
4268     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4269
4270   def Exec(self, feedback_fn):
4271     """Compute the list of all the exported system images.
4272
4273     Returns:
4274       a dictionary with the structure node->(export-list)
4275       where export-list is a list of the instances exported on
4276       that node.
4277
4278     """
4279     return rpc.call_export_list(self.nodes)
4280
4281
4282 class LUExportInstance(LogicalUnit):
4283   """Export an instance to an image in the cluster.
4284
4285   """
4286   HPATH = "instance-export"
4287   HTYPE = constants.HTYPE_INSTANCE
4288   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4289
4290   def BuildHooksEnv(self):
4291     """Build hooks env.
4292
4293     This will run on the master, primary node and target node.
4294
4295     """
4296     env = {
4297       "EXPORT_NODE": self.op.target_node,
4298       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4299       }
4300     env.update(_BuildInstanceHookEnvByObject(self.instance))
4301     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4302           self.op.target_node]
4303     return env, nl, nl
4304
4305   def CheckPrereq(self):
4306     """Check prerequisites.
4307
4308     This checks that the instance name is a valid one.
4309
4310     """
4311     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4312     self.instance = self.cfg.GetInstanceInfo(instance_name)
4313     if self.instance is None:
4314       raise errors.OpPrereqError("Instance '%s' not found" %
4315                                  self.op.instance_name)
4316
4317     # node verification
4318     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4319     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4320
4321     if self.dst_node is None:
4322       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4323                                  self.op.target_node)
4324     self.op.target_node = self.dst_node.name
4325
4326   def Exec(self, feedback_fn):
4327     """Export an instance to an image in the cluster.
4328
4329     """
4330     instance = self.instance
4331     dst_node = self.dst_node
4332     src_node = instance.primary_node
4333     # shutdown the instance, unless requested not to do so
4334     if self.op.shutdown:
4335       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4336       self.proc.ChainOpCode(op)
4337
4338     vgname = self.cfg.GetVGName()
4339
4340     snap_disks = []
4341
4342     try:
4343       for disk in instance.disks:
4344         if disk.iv_name == "sda":
4345           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4346           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4347
4348           if not new_dev_name:
4349             logger.Error("could not snapshot block device %s on node %s" %
4350                          (disk.logical_id[1], src_node))
4351           else:
4352             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4353                                       logical_id=(vgname, new_dev_name),
4354                                       physical_id=(vgname, new_dev_name),
4355                                       iv_name=disk.iv_name)
4356             snap_disks.append(new_dev)
4357
4358     finally:
4359       if self.op.shutdown:
4360         op = opcodes.OpStartupInstance(instance_name=instance.name,
4361                                        force=False)
4362         self.proc.ChainOpCode(op)
4363
4364     # TODO: check for size
4365
4366     for dev in snap_disks:
4367       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4368                                            instance):
4369         logger.Error("could not export block device %s from node"
4370                      " %s to node %s" %
4371                      (dev.logical_id[1], src_node, dst_node.name))
4372       if not rpc.call_blockdev_remove(src_node, dev):
4373         logger.Error("could not remove snapshot block device %s from"
4374                      " node %s" % (dev.logical_id[1], src_node))
4375
4376     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4377       logger.Error("could not finalize export for instance %s on node %s" %
4378                    (instance.name, dst_node.name))
4379
4380     nodelist = self.cfg.GetNodeList()
4381     nodelist.remove(dst_node.name)
4382
4383     # on one-node clusters nodelist will be empty after the removal
4384     # if we proceed the backup would be removed because OpQueryExports
4385     # substitutes an empty list with the full cluster node list.
4386     if nodelist:
4387       op = opcodes.OpQueryExports(nodes=nodelist)
4388       exportlist = self.proc.ChainOpCode(op)
4389       for node in exportlist:
4390         if instance.name in exportlist[node]:
4391           if not rpc.call_export_remove(node, instance.name):
4392             logger.Error("could not remove older export for instance %s"
4393                          " on node %s" % (instance.name, node))
4394
4395
4396 class TagsLU(NoHooksLU):
4397   """Generic tags LU.
4398
4399   This is an abstract class which is the parent of all the other tags LUs.
4400
4401   """
4402   def CheckPrereq(self):
4403     """Check prerequisites.
4404
4405     """
4406     if self.op.kind == constants.TAG_CLUSTER:
4407       self.target = self.cfg.GetClusterInfo()
4408     elif self.op.kind == constants.TAG_NODE:
4409       name = self.cfg.ExpandNodeName(self.op.name)
4410       if name is None:
4411         raise errors.OpPrereqError("Invalid node name (%s)" %
4412                                    (self.op.name,))
4413       self.op.name = name
4414       self.target = self.cfg.GetNodeInfo(name)
4415     elif self.op.kind == constants.TAG_INSTANCE:
4416       name = self.cfg.ExpandInstanceName(self.op.name)
4417       if name is None:
4418         raise errors.OpPrereqError("Invalid instance name (%s)" %
4419                                    (self.op.name,))
4420       self.op.name = name
4421       self.target = self.cfg.GetInstanceInfo(name)
4422     else:
4423       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4424                                  str(self.op.kind))
4425
4426
4427 class LUGetTags(TagsLU):
4428   """Returns the tags of a given object.
4429
4430   """
4431   _OP_REQP = ["kind", "name"]
4432
4433   def Exec(self, feedback_fn):
4434     """Returns the tag list.
4435
4436     """
4437     return self.target.GetTags()
4438
4439
4440 class LUSearchTags(NoHooksLU):
4441   """Searches the tags for a given pattern.
4442
4443   """
4444   _OP_REQP = ["pattern"]
4445
4446   def CheckPrereq(self):
4447     """Check prerequisites.
4448
4449     This checks the pattern passed for validity by compiling it.
4450
4451     """
4452     try:
4453       self.re = re.compile(self.op.pattern)
4454     except re.error, err:
4455       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4456                                  (self.op.pattern, err))
4457
4458   def Exec(self, feedback_fn):
4459     """Returns the tag list.
4460
4461     """
4462     cfg = self.cfg
4463     tgts = [("/cluster", cfg.GetClusterInfo())]
4464     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4465     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4466     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4467     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4468     results = []
4469     for path, target in tgts:
4470       for tag in target.GetTags():
4471         if self.re.search(tag):
4472           results.append((path, tag))
4473     return results
4474
4475
4476 class LUAddTags(TagsLU):
4477   """Sets a tag on a given object.
4478
4479   """
4480   _OP_REQP = ["kind", "name", "tags"]
4481
4482   def CheckPrereq(self):
4483     """Check prerequisites.
4484
4485     This checks the type and length of the tag name and value.
4486
4487     """
4488     TagsLU.CheckPrereq(self)
4489     for tag in self.op.tags:
4490       objects.TaggableObject.ValidateTag(tag)
4491
4492   def Exec(self, feedback_fn):
4493     """Sets the tag.
4494
4495     """
4496     try:
4497       for tag in self.op.tags:
4498         self.target.AddTag(tag)
4499     except errors.TagError, err:
4500       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4501     try:
4502       self.cfg.Update(self.target)
4503     except errors.ConfigurationError:
4504       raise errors.OpRetryError("There has been a modification to the"
4505                                 " config file and the operation has been"
4506                                 " aborted. Please retry.")
4507
4508
4509 class LUDelTags(TagsLU):
4510   """Delete a list of tags from a given object.
4511
4512   """
4513   _OP_REQP = ["kind", "name", "tags"]
4514
4515   def CheckPrereq(self):
4516     """Check prerequisites.
4517
4518     This checks that we have the given tag.
4519
4520     """
4521     TagsLU.CheckPrereq(self)
4522     for tag in self.op.tags:
4523       objects.TaggableObject.ValidateTag(tag)
4524     del_tags = frozenset(self.op.tags)
4525     cur_tags = self.target.GetTags()
4526     if not del_tags <= cur_tags:
4527       diff_tags = del_tags - cur_tags
4528       diff_names = ["'%s'" % tag for tag in diff_tags]
4529       diff_names.sort()
4530       raise errors.OpPrereqError("Tag(s) %s not found" %
4531                                  (",".join(diff_names)))
4532
4533   def Exec(self, feedback_fn):
4534     """Remove the tag from the object.
4535
4536     """
4537     for tag in self.op.tags:
4538       self.target.RemoveTag(tag)
4539     try:
4540       self.cfg.Update(self.target)
4541     except errors.ConfigurationError:
4542       raise errors.OpRetryError("There has been a modification to the"
4543                                 " config file and the operation has been"
4544                                 " aborted. Please retry.")